This blog post is for folks interested in learning how to use Golang and AWS Lambda to build a serverless solution. You will be using the aws-lambda-go library along with the AWS Go SDK v2 for an application that will process records from an Amazon Kinesis data stream and store them in a DynamoDB table. But that's not all! You will also use Go bindings for AWS CDK to implement "Infrastructure-as-code" for the entire solution and deploy it with the AWS CDK CLI.
What's covered?
- Introduction
- Prerequisites
- Use AWS CDK to deploy the solution
- Verify the solution
- Code walk through
- Wrap up
Introduction
Amazon Kinesis is a platform for real-time data processing, ingestion, and analysis. Kinesis Data Streams is a serverless streaming data service (part of the Kinesis streaming data platform, along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics) that enables developers to collect, process, and analyze large amounts of data in real-time from various sources such as social media, IoT devices, logs, and more. AWS Lambda, on the other hand, is a serverless compute service that allows developers to run their code without having to manage the underlying infrastructure.
The integration of Amazon Kinesis with AWS Lambda provides an efficient way to process and analyze large data streams in real-time. A Kinesis data stream is a set of shards, and each shard contains a sequence of data records. A Lambda function can act as a consumer application and process data from a Kinesis data stream. You can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out. For standard iterators, Lambda polls each shard in your Kinesis stream for records over HTTP. The event source mapping shares read throughput with the other consumers of the shard.
Amazon Kinesis and AWS Lambda can be used together to build many solutions including real-time analytics (allowing businesses to make informed decisions), log processing (use logs to proactively identify and address issues in server/applications etc. before they become critical), IoT data processing (analyze device data in real-time and trigger actions based on the results), clickstream analysis (provide insights into user behavior), fraud detection (detect and prevent fraudulent card transactions) and more.
As always, the code is available on GitHub
Prerequisites
Before you proceed, make sure you have the Go programming language (v1.18 or higher) and AWS CDK installed.
Clone the GitHub repository and change to the right directory:
git clone https://github.com/abhirockzz/kinesis-lambda-events-golang
cd kinesis-lambda-events-golang
Use AWS CDK to deploy the solution
To start the deployment, invoke cdk deploy
and wait for a bit. You will see a list of resources that will be created and will need to provide your confirmation to proceed.
cd cdk
cdk deploy
# output
Bundling asset KinesisLambdaGolangStack/kinesis-function/Code/Stage...
✨ Synthesis time: 5.94s
This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:
//.... omitted
Do you wish to deploy these changes (y/n)? y
This will start creating the AWS resources required for our application.
If you want to see the AWS CloudFormation template which will be used behind the scenes, run
cdk synth
and check the cdk.out folder.
You can keep track of the progress in the terminal or navigate to the AWS console: CloudFormation > Stacks > KinesisLambdaGolangStack
Once all the resources are created, you can try out the application. You should have:
- A Lambda function
- A Kinesis stream
- A DynamoDB table
- along with a few other components (like IAM roles etc.)
Verify the solution
You can check the table and Kinesis stream info in the stack output (in the terminal or the Outputs tab in the AWS CloudFormation console for your Stack):
Publish a few messages to the Kinesis stream. For the purposes of this demo, you can use the AWS CLI:
export KINESIS_STREAM=<enter the Kinesis stream name from cloudformation output>
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new york"}' | base64)
Check the DynamoDB table to confirm that the records have been stored. You can use the AWS console or the AWS CLI aws dynamodb scan --table-name <enter the table name from cloudformation output>
Don't forget to clean up
Once you're done, to delete all the services, simply use:
cdk destroy
#output prompt (choose 'y' to continue)
Are you sure you want to delete: KinesisLambdaGolangStack (y/n)?
You were able to set up and try the complete solution. Before we wrap up, let's quickly walk through some of the important parts of the code to get a better understanding of what's going on behind the scenes.
Code walk through
Some of the code (error handling, logging etc.) has been omitted for brevity since we only want to focus on the important parts.
AWS CDK
You can refer to the CDK code here
We start by creating the DynamoDB table:
table := awsdynamodb.NewTable(stack, jsii.String("dynamodb-table"),
&awsdynamodb.TableProps{
PartitionKey: &awsdynamodb.Attribute{
Name: jsii.String("email"),
Type: awsdynamodb.AttributeType_STRING},
})
table.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
We create the Lambda function (CDK will take care of building and deploying the function) and make sure we provide it appropriate permissions to write to the DynamoDB table.
function := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("kinesis-function"),
&awscdklambdagoalpha.GoFunctionProps{
Runtime: awslambda.Runtime_GO_1_X(),
Environment: &map[string]*string{"TABLE_NAME": table.TableName()},
Entry: jsii.String(functionDir),
})
table.GrantWriteData(function)
Then, we create the Kinesis stream and add that as an event source to the Lambda function.
kinesisStream := awskinesis.NewStream(stack, jsii.String("lambda-test-stream"), nil)
function.AddEventSource(awslambdaeventsources.NewKinesisEventSource(kinesisStream, &awslambdaeventsources.KinesisEventSourceProps{
StartingPosition: awslambda.StartingPosition_LATEST,
}))
Finally, we export the Kinesis stream and DynamoDB table name as CloudFormation outputs.
awscdk.NewCfnOutput(stack, jsii.String("kinesis-stream-name"),
&awscdk.CfnOutputProps{
ExportName: jsii.String("kinesis-stream-name"),
Value: kinesisStream.StreamName()})
awscdk.NewCfnOutput(stack, jsii.String("dynamodb-table-name"),
&awscdk.CfnOutputProps{
ExportName: jsii.String("dynamodb-table-name"),
Value: table.TableName()})
Lambda function
You can refer to the Lambda Function code here
The Lambda function handler iterates over each record in the Kinesis event, and for each of them:
- Unmarshals the JSON payload in the Kinesis stream into a Go struct
- Stores the stream data partition key as the primary key attribute (email) of the DynamoDB table
- The rest of the information is picked up from the stream data and also stored in the table
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	for _, record := range kinesisEvent.Records {
		data := record.Kinesis.Data

		var user CreateUserInfo
		err := json.Unmarshal(data, &user)
		if err != nil {
			return err
		}

		item, err := attributevalue.MarshalMap(user)
		if err != nil {
			return err
		}

		item["email"] = &types.AttributeValueMemberS{Value: record.Kinesis.PartitionKey}

		_, err = client.PutItem(ctx, &dynamodb.PutItemInput{
			TableName: aws.String(table),
			Item:      item,
		})
		if err != nil {
			return err
		}
	}
	return nil
}
type CreateUserInfo struct {
Name string `json:"name"`
City string `json:"city"`
}
Wrap up
In this blog, you saw an example of how to use Lambda to process messages in a Kinesis stream and store them in DynamoDB, thanks to the Kinesis and Lambda integration. The entire infrastructure life-cycle was automated using AWS CDK.
All this was done using the Go programming language, which is well supported by AWS Lambda, DynamoDB, and AWS CDK.
Happy building!