This blog post demonstrates how to auto-scale your DynamoDB Streams consumer applications on Kubernetes. You will work with a Java application that uses the DynamoDB Streams Kinesis adapter library to consume change data events from a DynamoDB table. It will be deployed to an Amazon EKS cluster and will be scaled automatically using KEDA.
The application includes an implementation of the com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor interface that processes data from the DynamoDB stream and replicates it to another (target) DynamoDB table - this is just used as an example. We will use the AWS CLI to produce data to the DynamoDB table and observe the scaling of the application.
The code is available in this GitHub repository.
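To give a sense of what that looks like, here is a trimmed-down sketch of such a record processor. The class name, error handling, and environment variable usage are illustrative (the repository contains the actual implementation), but the APIs shown are the KCL 1.x v2 record processor interface and the DynamoDB Streams Kinesis adapter:
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class ReplicatingRecordProcessor implements IRecordProcessor {

    private final AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
    private final String targetTable = System.getenv("TARGET_TABLE_NAME");   // target (replica) table
    private final String instanceName = System.getenv("INSTANCE_NAME");      // Pod name, set via the Deployment manifest

    @Override
    public void initialize(InitializationInput input) {
        // called once when this processor takes over a shard (lease)
    }

    @Override
    public void processRecords(ProcessRecordsInput input) {
        for (Record record : input.getRecords()) {
            // the adapter wraps each DynamoDB Streams record as a Kinesis Record
            com.amazonaws.services.dynamodbv2.model.Record ddbRecord =
                    ((RecordAdapter) record).getInternalObject();
            Map<String, AttributeValue> newImage = ddbRecord.getDynamodb().getNewImage();
            if (newImage == null) {
                continue; // e.g. REMOVE events carry no new image
            }
            Map<String, AttributeValue> item = new HashMap<>(newImage);
            item.put("processed_by", new AttributeValue(instanceName)); // used later to verify scaling
            dynamoDB.putItem(targetTable, item); // replicate to the target table
        }
        try {
            input.getCheckpointer().checkpoint(); // record progress for this shard
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void shutdown(ShutdownInput input) {
        // a good place to checkpoint when the lease is lost or the worker shuts down gracefully
    }
}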
What's covered?
- Introduction
- What is KEDA?
- Prerequisites
- Setup and configure KEDA on EKS
- Configure IAM Roles
- Deploy DynamoDB Streams consumer application to EKS
- DynamoDB Streams consumer app autoscaling in action with KEDA
- Delete resources
- Conclusion
Introduction
Amazon DynamoDB is a fully managed database service that provides fast and predictable performance with seamless scalability. With DynamoDB Streams, you can leverage Change Data Capture (CDC) to get notified about changes to DynamoDB table data in real-time. This makes it possible to easily build applications that react to changes in the underlying database without the need for complex polling or querying.
DynamoDB offers two streaming models for change data capture:
- Kinesis Data Streams for DynamoDB
- DynamoDB Streams
With Kinesis Data Streams, you can capture item-level modifications in any DynamoDB table and replicate them to a Kinesis data stream. DynamoDB Streams, on the other hand, captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours.
We will make use of the native DynamoDB Streams capability. Even with DynamoDB Streams, there are multiple options to choose from when it comes to consuming the change data events:
- Use the low-level DynamoDB Streams API to read the change data events from the stream.
- Use an AWS Lambda trigger
- Use the DynamoDB Streams Kinesis adapter library
Our application will leverage DynamoDB Streams along with the DynamoDB Streams Kinesis adapter library, which uses the Kinesis Client Library (KCL) 1.x, to consume change data events from a DynamoDB table.
Horizontal scalability with Kinesis Client Library
The Kinesis Client Library ensures that for every shard there is a record processor running and processing that shard. KCL helps take care of many of the complex tasks associated with distributed computing and scalability. It connects to the data stream, enumerates the shards within the data stream, and uses leases to coordinate shard associations with its consumer applications.
A record processor is instantiated for every shard that a worker manages. KCL pulls data records from the data stream, pushes the records to the corresponding record processor and checkpoints processed records. More importantly, it balances shard-worker associations (leases) when the worker instance count changes or when the data stream is re-sharded (shards are split or merged). This means that you are able to scale your DynamoDB Streams application by simply adding more instances, since KCL will automatically balance the shards across them.
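To make this concrete, here is a rough sketch (not the exact code from the repository) of how the DynamoDB Streams Kinesis adapter and a KCL Worker can be wired together. The environment variable names correspond to the ones used in the Kubernetes Deployment manifest later in this post, and the record processor factory supplies the processor sketched earlier:
import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class ConsumerApp {

    public static void main(String[] args) {
        String streamArn = System.getenv("SOURCE_TABLE_STREAM_ARN"); // DynamoDB stream ARN of the source table
        String appName = System.getenv("APPLICATION_NAME");          // also used as the name of the KCL lease ("control") table

        AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder.defaultClient();
        AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.defaultClient();
        AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder.defaultClient();

        // the adapter makes the DynamoDB stream look like a Kinesis stream to KCL
        AmazonDynamoDBStreamsAdapterClient adapterClient =
                new AmazonDynamoDBStreamsAdapterClient(streamsClient);

        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                appName, streamArn, new DefaultAWSCredentialsProviderChain(),
                "worker-" + UUID.randomUUID())
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        // the Worker enumerates shards, manages leases and runs one record processor per shard it owns
        Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
                ReplicatingRecordProcessor::new, config, adapterClient, dynamoDBClient, cloudWatchClient);
        worker.run();
    }
}
Note that the application name passed to KCL matters: it is also used as the name of the lease ("control") table that KCL creates in DynamoDB, which you will see later in this post.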
But, you still need a way to scale your applications when the load increases. Of course, you could do it manually or build a custom solution to get this done.
This is where KEDA comes in.
What is KEDA?
KEDA is a Kubernetes-based event-driven autoscaling component that can monitor event sources like DynamoDB Streams and scale the underlying Deployments (and Pods) based on the number of events needing to be processed. It's built on top of native Kubernetes primitives such as the Horizontal Pod Autoscaler and can be added to any Kubernetes cluster. Here is a high-level overview of its key components (you can refer to the KEDA documentation for a deep-dive):
From KEDA documentation - https://keda.sh/docs/2.8/concepts/
- The keda-operator-metrics-apiserver component in KEDA acts as a Kubernetes metrics server that exposes metrics for the Horizontal Pod Autoscaler.
- A KEDA Scaler integrates with an external system (such as Redis) to fetch these metrics (e.g. the length of a List) to drive auto scaling of any container in Kubernetes based on the number of events needing to be processed.
- The role of the keda-operator component is to activate and deactivate Deployments, i.e. scale to and from zero.
You will see the DynamoDB Streams scaler in action - it scales based on the shard count of a DynamoDB stream.
Now let's move on to the practical part of this tutorial.
Prerequisites
In addition to an AWS account, you will need to have the AWS CLI, kubectl and Docker installed.
Set up an EKS cluster and create DynamoDB tables
There are a variety of ways in which you can create an Amazon EKS cluster. I prefer using the eksctl CLI because of the convenience it offers. Creating an EKS cluster using eksctl can be as easy as this:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
For details, refer to the Getting started with Amazon EKS – eksctl guide.
Create a DynamoDB table with streams enabled to persist application data and access the change data feed. You can use the AWS CLI to create a table with the following command:
aws dynamodb create-table \
--table-name users \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
We will need to create another table that will serve as a replica for the first table.
aws dynamodb create-table \
--table-name users_replica \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--billing-mode PAY_PER_REQUEST
Clone this GitHub repository and change to the right directory:
git clone https://github.com/abhirockzz/dynamodb-streams-keda-autoscale
cd dynamodb-streams-keda-autoscale
Ok let's get started!
Setup and configure KEDA on EKS
For the purposes of this tutorial, you will use YAML files to deploy KEDA. But you could also use Helm charts.
Install KEDA:
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
Verify the installation:
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
Configure IAM Roles
The KEDA operator as well as the DynamoDB Streams consumer application need to invoke AWS APIs. Since both will run as Deployments in EKS, we will use IAM Roles for Service Accounts (IRSA) to provide the necessary permissions.
In our particular scenario:
- The KEDA operator needs to be able to get information about the DynamoDB table and Stream.
- The application (the KCL 1.x library, to be specific) needs to interact with Kinesis and DynamoDB - it needs a bunch of IAM permissions to do so.
Configure IRSA for KEDA operator
Set your AWS Account ID and OIDC Identity provider as environment variables:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
Create a JSON file with Trusted Entities for the role:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json
Now, create the IAM role and attach the policy (take a look at the policy_dynamodb_streams_keda.json file for details):
export ROLE_NAME=keda-operator-dynamodb-streams-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for DynamoDB streams KEDA scaler on EKS"
aws iam create-policy --policy-name keda-dynamodb-streams-policy --policy-document file://policy_dynamodb_streams_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-dynamodb-streams-policy
Associate the IAM role and Service Account:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
You will need to restart the KEDA operator Deployment for this to take effect:
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-dynamodb-streams-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
Configure IRSA for the DynamoDB Streams consumer application
Start by creating a Kubernetes Service Account:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: dynamodb-streams-consumer-app-sa
EOF
Create a JSON file with Trusted Entities for the role:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:default:dynamodb-streams-consumer-app-sa"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust.json
Now, create the IAM role and attach the policy. Update the policy.json file and enter the region and AWS account details.
export ROLE_NAME=dynamodb-streams-consumer-app-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for DynamoDB Streams consumer app on EKS"
aws iam create-policy --policy-name dynamodb-streams-consumer-app-policy --policy-document file://policy.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/dynamodb-streams-consumer-app-policy
Associate the IAM role and Service Account:
kubectl annotate serviceaccount -n default dynamodb-streams-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/dynamodb-streams-consumer-app-sa
The core infrastructure is now ready. Let's prepare and deploy the consumer application.
Deploy DynamoDB Streams consumer application to EKS
We first need to build the Docker image and push it to ECR (you can refer to the Dockerfile for details).
Build and push the Docker image to ECR
# create runnable JAR file
mvn clean compile assembly\:single
# build docker image
docker build -t dynamodb-streams-consumer-app .
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# login to the private ECR registry
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
# create a private ECR repo
aws ecr create-repository --repository-name dynamodb-streams-consumer-app --region us-east-1
# tag and push the image
docker tag dynamodb-streams-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
Deploy the consumer application
Update consumer.yaml to include the Docker image you just pushed to ECR and the ARN of the DynamoDB stream for the source table. The rest of the manifest remains the same.
To retrieve the ARN for the stream, run the following command:
aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn'
The consumer.yaml Deployment manifest looks like this:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dynamodb-streams-kcl-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dynamodb-streams-kcl-consumer-app
  template:
    metadata:
      labels:
        app: dynamodb-streams-kcl-consumer-app
    spec:
      serviceAccountName: dynamodb-streams-consumer-app-sa
      containers:
        - name: dynamodb-streams-kcl-consumer-app
          image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
          imagePullPolicy: Always
          env:
            - name: TARGET_TABLE_NAME
              value: users_replica
            - name: APPLICATION_NAME
              value: dynamodb-streams-kcl-app-demo
            - name: SOURCE_TABLE_STREAM_ARN
              value: <enter ARN>
            - name: AWS_REGION
              value: us-east-1
            - name: INSTANCE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
Create the Deployment:
kubectl apply -f consumer.yaml
# verify Pod transition to Running state
kubectl get pods -w
DynamoDB Streams consumer app autoscaling in action with KEDA
Now that you've deployed the consumer application, the KCL adapter library should jump into action. The first thing it will do is create a "control table" in DynamoDB - its name will be the same as the application name (which in this case is dynamodb-streams-kcl-app-demo).
It might take a few minutes for the initial co-ordination to happen and the table to get created. You can check the logs of the consumer application to see the progress.
kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})
Once the lease allocation is complete, check the table and note the leaseOwner attribute:
aws dynamodb scan --table-name dynamodb-streams-kcl-app-demo
Add data to the DynamoDB table
Let's now add data to the source DynamoDB table (users).
You can use the producer.sh script for this.
export TABLE_NAME=users
./producer.sh
Check consumer logs to see the messages being processed:
kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})
Check the target table (users_replica) to confirm that the DynamoDB Streams consumer application has indeed replicated the data.
aws dynamodb scan --table-name users_replica
Notice the value of the processed_by attribute? It's the same as the consumer application Pod name. This will make it easier for us to verify the end-to-end autoscaling process.
Create the KEDA scaler
Use the scaler definition:
kubectl apply -f keda-dynamodb-streams-scaler.yaml
Here is the ScaledObject definition. Notice that it's targeting the dynamodb-streams-kcl-consumer-app Deployment (the one we just created) and that shardCount is set to 2:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: aws-dynamodb-streams-scaledobject
spec:
  scaleTargetRef:
    name: dynamodb-streams-kcl-consumer-app
  triggers:
    - type: aws-dynamodb-streams
      metadata:
        awsRegion: us-east-1
        tableName: users
        shardCount: "2"
        identityOwner: "operator"
Note on the shardCount attribute:
We are using a shardCount value of 2. This is very important to note since we are using the DynamoDB Streams Kinesis adapter library with KCL 1.x, which supports "up to 2 simultaneous consumers per shard". This means that you cannot have more than two consumer application instances processing the same DynamoDB stream shard.
However, this KEDA scaler configuration will ensure that there is one Pod for every two shards. So, for example, if there are four shards, the application will be scaled out to two Pods. If there are six shards, there will be three Pods, and so on. Of course, you can choose to have one Pod for every shard by setting shardCount to 1.
To keep track of the number of shards in the DynamoDB stream, you can run the following command:
aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards | length'
I have used the jq utility to parse the JSON output of the AWS CLI commands.
If you want the shard details:
aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards'
Verify DynamoDB streams consumer application auto-scaling
We started off with one Pod of our application. But, thanks to KEDA, we should now see additional Pods coming up automatically to match the processing requirements of the consumer application.
To confirm, check the number of Pods:
kubectl get pods -l=app=dynamodb-streams-kcl-consumer-app
Most likely, you will see four shards in the DynamoDB stream and two Pods. This can change (increase/decrease) depending on the rate at which data is produced to the DynamoDB table.
Just like before, validate the data in the DynamoDB target table (users_replica) and note the processed_by attribute. Since we have scaled out to additional Pods, the value should be different for each record since each Pod will process a subset of the messages from the DynamoDB change stream.
Also, make sure to check the dynamodb-streams-kcl-app-demo control table in DynamoDB - you should see an update to the leaseOwner attribute reflecting the fact that there are now two Pods consuming from the DynamoDB stream.
Once you have verified the end-to-end solution, you can clean up the resources to avoid incurring any additional charges.
Delete resources
Delete the EKS cluster and DynamoDB tables.
eksctl delete cluster --name <enter cluster name>
aws dynamodb delete-table --table-name users
aws dynamodb delete-table --table-name users_replica
Conclusion
Use cases you should experiment with
- Scale further up - How can you make DynamoDB Streams increase its number of shards? What happens to the number of consumer instance Pods?
- Scale down - What happens when the shard capacity of the DynamoDB stream decreases?
In this post, we demonstrated how to use KEDA and DynamoDB Streams, combining two powerful techniques (change data capture and auto-scaling) to build scalable, event-driven systems that can adapt based on the data processing needs of your application.