The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I will cover in this blog is how to use the Scan API with the DynamoDB Go SDK.
To scan a table, we need some data to begin with! So in the process, I will also go into how to use the Batch API to write bulk data in DynamoDB. You can use the BatchWriteItem API to create or delete items in batches (of up to twenty five) and you can combine these operations across multiple tables.
We will start simple and gradually improve our approach to use the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate incremental improvements. Finally I will wrap up by highlighting some of the considerations while using these operations.
You can refer to the code on GitHub
Before you proceed...
... make sure to create a DynamoDB table called users with:
- partition key email (data type String) and
- On-Demand capacity mode.
Also, I want to call out a few things to set the context:
- The table was created in us-east-1 and tests were executed from an EC2 instance in us-east-1 as well
- Since these are general tests instead of specialised benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
- The tests include marshalling (converting Go struct to DynamoDB data types) for BatchWriteItem operations and un-marshalling (converting from DynamoDB data types back to Go struct) for the Scan operation.
Let's start off by exploring the BatchWriteItem API. This way we will have data to work with for the Scan operations as well.
Win-win!
Importing data in batches
Since you can combine up to 25 items in a single invocation, using a batch approach for bulk data imports is much better compared to invoking PutItem in a loop (or even in parallel).
Here is a basic example of how you would use BatchWriteItem:
func basicBatchImport() {
	startTime := time.Now()
	cities := []string{"NJ", "NY", "ohio"}

	batch := make(map[string][]types.WriteRequest)
	var requests []types.WriteRequest

	// build a single batch of 25 put requests (the maximum allowed per call)
	for i := 1; i <= 25; i++ {
		user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
		item, err := attributevalue.MarshalMap(user)
		if err != nil {
			log.Fatal("marshal map failed ", err)
		}
		requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
	}
	batch[table] = requests

	op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
		RequestItems: batch,
	})
	if err != nil {
		log.Fatal("batch write error ", err)
	}
	log.Println("batch insert done")

	// UnprocessedItems is keyed by table name, so count the requests for our table
	unprocessed := len(op.UnprocessedItems[table])
	if unprocessed != 0 {
		log.Println("there were", unprocessed, "unprocessed records")
	}
	log.Println("inserted", 25-unprocessed, "records in", time.Since(startTime).Seconds(), "seconds")
}
With BatchWriteItemInput, we can define the operations we want to perform in the batch - here we are just going to perform PutRequests (which is encapsulated within another type called WriteRequest).
We assemble the WriteRequests in a slice and finally put them in a map with key being the table name - this is exactly what the RequestItems attribute in BatchWriteItemInput needs.
In this case we are dealing with a single table but you could execute operations on multiple tables.
In this example we just dealt with one batch of 25 records (maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough - here is an example:
func basicBatchImport2(total int) {
	startTime := time.Now()
	cities := []string{"NJ", "NY", "ohio"}
	batchSize := 25
	processed := total

	for num := 1; num <= total; num = num + batchSize {
		batch := make(map[string][]types.WriteRequest)
		var requests []types.WriteRequest

		start := num
		end := num + batchSize - 1
		if end > total {
			end = total // the last batch may hold fewer than 25 records
		}

		for i := start; i <= end; i++ {
			user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
			item, err := attributevalue.MarshalMap(user)
			if err != nil {
				log.Fatal("marshal map failed ", err)
			}
			requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
		}
		batch[table] = requests

		op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
			RequestItems: batch,
		})
		if err != nil {
			log.Fatal("batch write error ", err)
		}

		// UnprocessedItems is keyed by table name: count the requests, not the map entries
		processed = processed - len(op.UnprocessedItems[table])
	}

	log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")
	if processed != total {
		log.Println("there were", (total - processed), "unprocessed records")
	}
}
I tried this with 50000 records (which means 2000 batches) and it took approximately 15 seconds. But we can do much better!
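As a side note on the splitting step: it can be isolated in a small helper. Below is a minimal sketch, assuming the records are already in a slice. The chunk function is a hypothetical helper (it is not part of the SDK) and needs Go 1.18+ for generics. It also covers totals that are not a multiple of 25, in which case the last batch is simply shorter.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// Hypothetical helper for illustration, not an SDK function.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	batches := make([][]T, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items) // the last batch may be smaller
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	emails := make([]string, 60)
	for i := range emails {
		emails[i] = fmt.Sprintf("user%d@foo.com", i)
	}
	// 60 records become batches of 25, 25 and 10
	for i, b := range chunk(emails, 25) {
		fmt.Println("batch", i, "has", len(b), "items")
	}
}
```

Each returned batch could then be marshalled and sent with a single BatchWriteItem call.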
Parallel batch import
Instead of processing each batch sequentially, we can spin up a goroutine for each batch:
func parallelBatchImport(numRecords int) {
	startTime := time.Now()
	cities := []string{"NJ", "NY", "ohio"}
	batchSize := 25

	var wg sync.WaitGroup
	// goroutines update this concurrently, so it is only touched via sync/atomic
	var unprocessed int64

	for num := 1; num <= numRecords; num = num + batchSize {
		start := num
		end := num + batchSize - 1
		if end > numRecords {
			end = numRecords // the last batch may hold fewer than 25 records
		}

		wg.Add(1)
		go func(s, e int) {
			defer wg.Done()

			batch := make(map[string][]types.WriteRequest)
			var requests []types.WriteRequest

			for i := s; i <= e; i++ {
				user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
				item, err := attributevalue.MarshalMap(user)
				if err != nil {
					log.Fatal("marshal map failed ", err)
				}
				requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
			}
			batch[table] = requests

			op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
				RequestItems: batch,
			})
			if err != nil {
				log.Fatal("batch write error ", err)
			}

			// UnprocessedItems is keyed by table name: count the requests for our table
			atomic.AddInt64(&unprocessed, int64(len(op.UnprocessedItems[table])))
		}(start, end)
	}

	log.Println("waiting for all batches to finish....")
	wg.Wait()

	processed := numRecords - int(atomic.LoadInt64(&unprocessed))
	log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")
	if processed != numRecords {
		log.Println("there were", (numRecords - processed), "unprocessed records")
	}
}
The results improved by a good margin. Here is what I got, on average:
- inserted 50000 records in ~ 2.5 seconds
- inserted 100000 records in ~ 4.5 to 5 seconds
- inserted 150000 records in less than 9.5 seconds
- inserted 200000 records in less than 11.5 seconds
There may be unprocessed records in a batch. This example detects these records, but the retry logic has been skipped to keep things simple. Ideally you should have an (exponential back-off based) retry mechanism for handling unprocessed records as well.
To insert more data, I ran the parallelBatchImport function (above) in loops. For example:
for i := 1; i <= 100; i++ {
	parallelBatchImport(50000)
}
Alright, let's move ahead. Now that we have some data, let's try ...
... the Scan API
This is what basic usage looks like:
func scan() {
	startTime := time.Now()

	op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
		TableName:              aws.String(table),
		ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
	})
	if err != nil {
		log.Fatal("scan failed", err)
	}

	// convert each item from DynamoDB data types back to a Go struct
	for _, i := range op.Items {
		var u User
		err := attributevalue.UnmarshalMap(i, &u)
		if err != nil {
			log.Fatal("unmarshal failed", err)
		}
	}

	// a non-empty LastEvaluatedKey means the scan stopped before the end of the table
	if op.LastEvaluatedKey != nil {
		log.Println("all items have not been scanned")
	}

	log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
	log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}
Just provide the table (or secondary index) name and you are good to go! But there is a chance that you will not get all the items because of API limits (1 MB worth of data per invocation). In my case, it took about 0.5 secs for approximately 15000 records - the rest of the items were skipped because the 1 MB limit was reached.
Using Pagination
To handle the limitation around data, the Scan API returns LastEvaluatedKey in its output to point to the last processed record. All you need to do is invoke Scan again, with the value for the ExclusiveStartKey attribute set to the one for LastEvaluatedKey.
Using the paginated scan approach, it took me approximately 100 secs to scan ~ 7.5 million records.
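The pagination loop itself is short. Here is a minimal sketch of it that runs without AWS access: the page and scanFunc types and the fakeTable helper are hypothetical stand-ins for the Scan output and client.Scan, with keys and items simplified to strings. The fake assumes unique items and a page size greater than zero.

```go
package main

import "fmt"

// page mirrors the two parts of a Scan response that matter for pagination.
type page struct {
	Items            []string
	LastEvaluatedKey string // empty once the last page has been returned
}

// scanFunc stands in for client.Scan; its argument plays the role of
// ExclusiveStartKey. (Hypothetical type, so the loop can run without AWS.)
type scanFunc func(exclusiveStartKey string) (page, error)

// scanAll keeps calling scan until no LastEvaluatedKey is returned.
func scanAll(scan scanFunc) ([]string, error) {
	var all []string
	startKey := "" // empty on the first call: start from the beginning
	for {
		p, err := scan(startKey)
		if err != nil {
			return all, err
		}
		all = append(all, p.Items...)
		if p.LastEvaluatedKey == "" {
			return all, nil // nothing left to scan
		}
		startKey = p.LastEvaluatedKey
	}
}

// fakeTable returns a scanFunc that serves items in pages of pageSize.
func fakeTable(items []string, pageSize int) scanFunc {
	return func(startKey string) (page, error) {
		start := 0
		if startKey != "" {
			for i, it := range items {
				if it == startKey {
					start = i + 1 // resume right after the last evaluated item
					break
				}
			}
		}
		end := start + pageSize
		if end >= len(items) {
			return page{Items: items[start:]}, nil
		}
		return page{Items: items[start:end], LastEvaluatedKey: items[end-1]}, nil
	}
}

func main() {
	items := []string{"a@foo.com", "b@foo.com", "c@foo.com", "d@foo.com", "e@foo.com"}
	all, err := scanAll(fakeTable(items, 2))
	fmt.Println("scanned", len(all), "items, error:", err)
}
```

With the real SDK the loop has the same shape: set ExclusiveStartKey on the ScanInput to the LastEvaluatedKey of the previous output, and stop once LastEvaluatedKey comes back empty.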
Parallel Scan
Pagination helps, but it's still a sequential process. There is a lot of scope for improvement. Thankfully, Scan allows you to adopt a parallelized approach i.e. you can use multiple workers (goroutines in this case) to process data in parallel!
func parallelScan(pageSize, totalWorkers int) {
	log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
	startTime := time.Now()

	// segment goroutines add to this concurrently, so it is updated via sync/atomic
	var total int64
	var wg sync.WaitGroup
	wg.Add(totalWorkers)

	for i := 0; i < totalWorkers; i++ {
		// start a goroutine for each segment
		go func(segId int) {
			defer wg.Done()
			var segTotal int

			scip := &dynamodb.ScanInput{
				TableName:     aws.String(table),
				Limit:         aws.Int32(int32(pageSize)),
				Segment:       aws.Int32(int32(segId)),
				TotalSegments: aws.Int32(int32(totalWorkers)),
			}

			for {
				op, err := client.Scan(context.Background(), scip)
				if err != nil {
					log.Fatal("scan failed ", err)
				}
				segTotal = segTotal + int(op.Count)

				for _, i := range op.Items {
					var u User
					err := attributevalue.UnmarshalMap(i, &u)
					if err != nil {
						log.Fatal("unmarshal failed ", err)
					}
				}

				if len(op.LastEvaluatedKey) == 0 {
					log.Println("[ segment", segId, "] finished")
					atomic.AddInt64(&total, int64(segTotal))
					log.Println("total records processed by segment", segId, "=", segTotal)
					return
				}
				// continue this segment from where the previous page stopped
				scip.ExclusiveStartKey = op.LastEvaluatedKey
			}
		}(i)
	}

	log.Println("waiting...")
	wg.Wait()
	log.Println("done...")
	log.Println("scanned", atomic.LoadInt64(&total), "items in", time.Since(startTime).Seconds(), "seconds")
}
Segment and TotalSegments attributes are the key to how the Scan API enables parallelism. TotalSegments is nothing but the number of threads/goroutines/worker-processes that need to be spawned and Segment is a unique identifier for each of them (a zero-based index, from 0 to TotalSegments - 1).
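To make the contract between the workers concrete, here is a small sketch that runs without AWS access. The scanSegment function is a hypothetical stand-in for a full (paginated) Scan of one segment. In this sketch, item i belongs to segment i % totalSegments, which is only an assumption for illustration, since DynamoDB decides the real split internally. The point is that the segments are disjoint and together cover the whole table, so the per-segment counts add up to the total.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// scanSegment stands in for a complete Scan of one segment and returns
// how many items that segment contains. (Hypothetical helper.)
func scanSegment(items []int, segment, totalSegments int) int {
	count := 0
	for _, it := range items {
		if it%totalSegments == segment {
			count++
		}
	}
	return count
}

// parallelCount starts one goroutine per segment and adds up their counts.
func parallelCount(items []int, totalSegments int) int64 {
	var total int64
	var wg sync.WaitGroup
	for seg := 0; seg < totalSegments; seg++ {
		wg.Add(1)
		go func(segment int) {
			defer wg.Done()
			n := scanSegment(items, segment, totalSegments)
			atomic.AddInt64(&total, int64(n)) // safe concurrent update
		}(seg)
	}
	wg.Wait()
	return total
}

func main() {
	items := make([]int, 1000)
	for i := range items {
		items[i] = i
	}
	fmt.Println("items seen across 4 segments:", parallelCount(items, 4))
}
```

The shared counter is updated with sync/atomic because several goroutines write to it at the same time.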
In my tests, the Scan performance remained (almost) constant at 37-40 seconds (average) for ~ 7.5 million records (I tried a variety of page size and goroutine combinations).
How many TotalSegments do I need to configure???
To tune the appropriate number of parallel threads/workers, you might need to experiment a bit. A lot might depend on your client environment.
- Do you have enough compute resources?
- Some environments/runtimes might have managed thread-pools, so you will have to comply with those
So, you will need to try things out to find the optimum parallelism for your use case. One way to think about it could be to choose one segment (single worker/thread/goroutine) per unit of data (say a segment for every GB of data you want to scan).
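That rule of thumb can be written down as a small function. This is only a sketch of a starting point, not a tuned recommendation: segmentsFor and bytesPerSegment are hypothetical names, the 1 GiB unit is the assumption from the paragraph above, and maxWorkers is a cap you would pick based on your client environment.

```go
package main

import "fmt"

// Assumption: one segment per GiB of data, as a starting point for experiments.
const bytesPerSegment = 1 << 30

// segmentsFor suggests a value for TotalSegments: one segment per GiB of
// table data (rounded up), at least 1 and at most maxWorkers (if maxWorkers > 0).
func segmentsFor(tableSizeBytes int64, maxWorkers int) int {
	n := int((tableSizeBytes + bytesPerSegment - 1) / bytesPerSegment)
	if n < 1 {
		n = 1
	}
	if maxWorkers > 0 && n > maxWorkers {
		n = maxWorkers
	}
	return n
}

func main() {
	// a table slightly larger than 5 GiB, on a client that can run 16 workers
	fmt.Println("suggested TotalSegments:", segmentsFor(5*(1<<30)+1, 16))
}
```

The table size could be taken from the DescribeTable output (TableSizeBytes), keeping in mind that it is an approximate value.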
Wrap up - API considerations
Both Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.
With Batch APIs:
- There are certain limits:
  - No more than 25 requests in a batch
  - Individual item in a batch should not exceed 400KB
  - Total size of items in a single BatchWriteItem cannot be more than 16MB
- BatchWriteItem cannot update items
- You cannot specify conditions on individual put and delete requests
- It does not return deleted items in the response
- If there are failed operations, you can access them via the UnprocessedItems response parameter
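The size limits listed above can be checked on the client before a batch is sent. Here is a minimal sketch, assuming you already know the size in bytes of each item. The validateBatch function and the constant names are hypothetical, and treating 1 KB as 1024 bytes is an assumption.

```go
package main

import "fmt"

const (
	maxRequests   = 25               // requests per BatchWriteItem call
	maxItemBytes  = 400 * 1024       // assumption: 400KB counted as 400 * 1024 bytes
	maxBatchBytes = 16 * 1024 * 1024 // assumption: 16MB counted as 16 * 1024 * 1024 bytes
)

// validateBatch takes the size in bytes of every item in a planned batch
// and reports the first limit that would be exceeded.
func validateBatch(itemSizes []int) error {
	if len(itemSizes) > maxRequests {
		return fmt.Errorf("batch has %d requests, limit is %d", len(itemSizes), maxRequests)
	}
	total := 0
	for i, size := range itemSizes {
		if size > maxItemBytes {
			return fmt.Errorf("item %d is %d bytes, limit is %d", i, size, maxItemBytes)
		}
		total += size
	}
	// With at most 25 items of at most 400KB each, item data alone stays under
	// this limit; the check is kept only to mirror the documented list.
	if total > maxBatchBytes {
		return fmt.Errorf("batch is %d bytes, limit is %d", total, maxBatchBytes)
	}
	return nil
}

func main() {
	fmt.Println(validateBatch(make([]int, 25)))      // within the limits
	fmt.Println(validateBatch(make([]int, 26)))      // too many requests
	fmt.Println(validateBatch([]int{400*1024 + 1})) // one item too large
}
```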
Use Scan wisely
Since a Scan operation goes over the entire table (or secondary index), it's highly likely that it consumes a large chunk of the provisioned throughput, especially if it's a large table. For that reason, Scan should be your last resort. Check whether Query API (or BatchGetItem) works for your use-case.
The same applies to parallel Scan.
There are a few ways in which you can further narrow down the results: by using a Filter Expression, a Limit parameter (as demonstrated earlier) or a ProjectionExpression to return only a subset of attributes. Keep in mind that a filter expression is applied after the items have been read, so it trims the response but not the capacity consumed by the Scan.
That's all for this blog. I hope you found it useful.
Until next time, Happy coding!