Event-driven architecture has become increasingly popular in modern application design, making it possible to build scalable, efficient systems that react to data changes in real time.
In this article, I will explain how to build an event-driven pipeline using Amazon DynamoDB, Kinesis Data Streams, Kinesis Data Firehose, AWS Lambda, and Amazon S3.
Overview of the Pipeline
The goal of this pipeline is to react to changes in a DynamoDB table, process the change data (optionally with a Lambda function), and store the result in S3. Here's a high-level look at the flow:
- DynamoDB – Your source of data.
- Kinesis Data Streams – Streams the change events from DynamoDB.
- Kinesis Data Firehose – Transforms and routes the streaming data.
- AWS Lambda (optional) – Processes data for real-time events or complex transformations.
- S3 – Stores processed data in scalable, durable storage.
Part 1: Setting up the environment
Amazon DynamoDB is a serverless, fully managed NoSQL database with single-digit millisecond performance at any scale. It is well suited to high-performance applications that require consistent, fast data access, for example financial services, gaming, and streaming applications.
Set up Amazon Kinesis Data Streams
Amazon Kinesis Data Streams is a fully managed, serverless streaming data service that makes it easy to elastically ingest and store logs, events, clickstreams, and other forms of streaming data in real time.
Kinesis Data Streams is particularly helpful when you need to handle massive volumes of streaming data. It allows multiple consumers to read from the stream simultaneously, making it possible to apply multiple transformations or route the data to different destinations.
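For readers who prefer to script this step instead of using the console, here is a minimal sketch of how the table could be connected to a stream with boto3. The function name `stream_table_changes` and the table and stream names in the usage note are hypothetical, and the on-demand capacity mode is my assumption, not something this walkthrough requires.

```python
def stream_table_changes(dynamodb, kinesis, table_name, stream_name):
    """Create a Kinesis data stream and send the table's change events to it.

    `dynamodb` and `kinesis` are boto3 clients created by the caller.
    """
    # Assumption: on-demand mode, so no shard count has to be chosen.
    kinesis.create_stream(
        StreamName=stream_name,
        StreamModeDetails={"StreamMode": "ON_DEMAND"},
    )
    # Block until the stream has been created and is usable.
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)

    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    stream_arn = summary["StreamDescriptionSummary"]["StreamARN"]

    # From here on, every item-level change in the table is written to the stream.
    dynamodb.enable_kinesis_streaming_destination(
        TableName=table_name,
        StreamArn=stream_arn,
    )
    return stream_arn
```

It could be called as `stream_table_changes(boto3.client("dynamodb"), boto3.client("kinesis"), "Orders", "orders-changes")`, where both names are placeholders for your own resources.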
Set up Amazon Data Firehose
Source - Amazon Kinesis Data Streams
Destination - Amazon S3
Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, Amazon OpenSearch Serverless, Splunk, Apache Iceberg Tables, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers, including Datadog, Dynatrace, LogicMonitor, MongoDB, New Relic, Coralogix, and Elastic.
In the Data Firehose console, choose Amazon Kinesis Data Streams as the source and Amazon S3 as the destination.
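The same source and destination choice can be expressed as a request to the Firehose API. The sketch below only builds the request; the helper name `build_delivery_stream_request`, the buffering values, and the use of a single IAM role for both reading the stream and writing to the bucket are assumptions made for illustration.

```python
def build_delivery_stream_request(name, kinesis_stream_arn, bucket_arn, role_arn):
    """Build keyword arguments for firehose.create_delivery_stream()."""
    return {
        "DeliveryStreamName": name,
        # Read from an existing Kinesis data stream instead of direct PUT calls.
        "DeliveryStreamType": "KinesisStreamAsSource",
        "KinesisStreamSourceConfiguration": {
            "KinesisStreamARN": kinesis_stream_arn,
            "RoleARN": role_arn,
        },
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,
            "BucketARN": bucket_arn,
            # Assumption: flush after 5 MB or 60 seconds, whichever comes first.
            "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 60},
        },
    }
```

With real ARNs in hand, the result can be passed along as `boto3.client("firehose").create_delivery_stream(**request)`.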
Create an S3 bucket
Amazon S3 is a highly scalable and durable object storage service. S3 allows you to store structured, semi-structured, and unstructured data at virtually unlimited scale.
S3 is ideal for long-term data storage or batch processing use cases. Data in S3 can then be used for analytics, reporting, or further batch processing via services like AWS Glue or Amazon Athena.
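If you would rather create the bucket from code, a small sketch with boto3 could look like this. The helper name `create_bucket` is hypothetical; the one detail worth knowing is that the `us-east-1` region must be created without a location constraint.

```python
def create_bucket(s3, bucket_name, region):
    """Create an S3 bucket in the given region using a boto3 S3 client."""
    kwargs = {"Bucket": bucket_name}
    if region != "us-east-1":
        # Every region except us-east-1 needs an explicit location constraint.
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3.create_bucket(**kwargs)
    return kwargs
```

The function returns the arguments it used, which makes it easy to log or inspect what was sent.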
Part 2: Insert and Manage the data
Insert data into the DynamoDB table.
Four items have been inserted into the table.
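The items can also be inserted from a script. The sketch below is only an illustration: the four sample items, the `order_id` key, and the helper name `insert_items` are invented for this example and are not the data used in the walkthrough.

```python
# Hypothetical sample data; replace with items that match your table's key schema.
SAMPLE_ITEMS = [
    {"order_id": "1001", "customer": "alice", "amount": 25},
    {"order_id": "1002", "customer": "bob", "amount": 40},
    {"order_id": "1003", "customer": "carol", "amount": 15},
    {"order_id": "1004", "customer": "dave", "amount": 60},
]


def insert_items(table, items):
    """Write each item to a DynamoDB table and return how many were written.

    `table` is a boto3 Table resource, e.g. boto3.resource("dynamodb").Table("Orders").
    """
    for item in items:
        # Each successful write produces one change record in the Kinesis stream.
        table.put_item(Item=item)
    return len(items)
```

Integers are used for the amounts because the boto3 resource API does not accept Python floats; use `decimal.Decimal` for fractional values.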
Create a Lambda function and choose the blueprint option.
Select the blueprint "Process records sent to an Amazon Data Firehose stream" with the Python 3.10 runtime.
Lambda is a serverless compute service that triggers based on events. In this case, it processes each data batch from Firehose and can perform more complex transformations or trigger alerts/notifications.
After you have created the function, go to the configuration settings and increase the timeout from the default 3 seconds to 3 minutes.
This is useful because the function may involve longer processing tasks, such as handling large batches of records.
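The timeout change can also be made through the Lambda API. Here is a minimal sketch with boto3; the helper name `set_function_timeout` is hypothetical, and 180 seconds simply mirrors the 3 minutes chosen in the console.

```python
def set_function_timeout(lambda_client, function_name, seconds=180):
    """Update a Lambda function's timeout and return the value now in effect."""
    response = lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=seconds,  # Lambda expects the timeout in seconds
    )
    return response["Timeout"]
```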
The function handles the incoming event, which contains records from Firehose. Each record has a recordId and data. The function processes each record and returns the result back to Firehose for further downstream operations.
import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        # Firehose delivers each record's data base64-encoded
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here
        print(f"the actual data is : {payload}")

        # Re-encode the payload and mark the record as successfully processed
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
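To give an idea of what the "custom processing" step might contain, here is a hedged sketch of a variant of the handler. It assumes each payload is the JSON change record that DynamoDB writes to Kinesis, with `eventName`, `tableName`, and `dynamodb.NewImage` fields. The output row layout, the decision to drop `REMOVE` events, and the helper name `transform_record` are my own choices for illustration and are not part of the blueprint.

```python
import base64
import json


def transform_record(record):
    """Convert one Firehose record into a newline-terminated JSON row."""
    try:
        change = json.loads(base64.b64decode(record["data"]))
        if not isinstance(change, dict):
            raise ValueError("payload is not a JSON object")
    except ValueError:
        # Unparseable input: report the failure to Firehose, keep the original data.
        return {"recordId": record["recordId"], "result": "ProcessingFailed",
                "data": record["data"]}

    if change.get("eventName") == "REMOVE":
        # Assumption: deletions are not wanted in S3, so they are dropped.
        return {"recordId": record["recordId"], "result": "Dropped",
                "data": record["data"]}

    row = {
        "event": change.get("eventName"),
        "table": change.get("tableName"),
        "item": change.get("dynamodb", {}).get("NewImage", {}),
    }
    # A trailing newline keeps rows separated when Firehose concatenates records.
    line = json.dumps(row) + "\n"
    return {
        "recordId": record["recordId"],
        "result": "Ok",
        "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
    }


def lambda_handler(event, context):
    return {"records": [transform_record(r) for r in event["records"]]}
```

Every input record gets exactly one output record with the same `recordId`, and the `result` field tells Firehose whether to deliver it (`Ok`), skip it (`Dropped`), or treat it as a processing failure (`ProcessingFailed`).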
In the Amazon Data Firehose configuration tab, there is a "Transform and convert records" option.
Select the Lambda function we created earlier. This function is part of a real-time processing pipeline where you can make on-the-fly changes to the data before it is permanently stored or consumed by other services.
Insert more records into the DynamoDB table.
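In API terms, selecting the function in the console corresponds to a `ProcessingConfiguration` on the S3 destination. The following is a sketch of how that could be applied to an existing Firehose stream with boto3; the helper names are hypothetical, and it assumes the stream has a single destination and that the Firehose role is already allowed to invoke the function.

```python
def lambda_processing_config(lambda_arn):
    """Build the ProcessingConfiguration that points Firehose at a Lambda function."""
    return {
        "Enabled": True,
        "Processors": [
            {
                "Type": "Lambda",
                "Parameters": [
                    {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn}
                ],
            }
        ],
    }


def attach_transform(firehose, stream_name, lambda_arn):
    """Enable Lambda transformation on the stream's first (assumed only) destination."""
    description = firehose.describe_delivery_stream(
        DeliveryStreamName=stream_name
    )["DeliveryStreamDescription"]

    firehose.update_destination(
        DeliveryStreamName=stream_name,
        # The current version ID guards against concurrent configuration changes.
        CurrentDeliveryStreamVersionId=description["VersionId"],
        DestinationId=description["Destinations"][0]["DestinationId"],
        ExtendedS3DestinationUpdate={
            "ProcessingConfiguration": lambda_processing_config(lambda_arn)
        },
    )
```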
After Lambda processes the data, it is delivered to the S3 bucket via Firehose. S3 acts as the final storage layer in this pipeline.
All items initially created in the DynamoDB table are now stored as objects (files) in the S3 bucket.
These stored files can be used for data analysis, archiving, or further processing, such as feeding into machine learning models, querying with Amazon Athena, or visualizing with Amazon QuickSight.
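As a quick way to check what was delivered, the objects can be read back with boto3. This is a sketch under two assumptions: the objects are uncompressed, and the records inside them are newline-delimited (if the transformation does not append a newline, each object will come back as a single line). The helper name `read_delivered_records` is hypothetical.

```python
def read_delivered_records(s3, bucket, prefix=""):
    """Yield (object key, line) pairs for every non-empty line under a prefix.

    `s3` is a boto3 S3 client created by the caller.
    """
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages without matching objects have no "Contents" key.
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            for line in body.decode("utf-8").splitlines():
                if line.strip():
                    yield obj["Key"], line
```

Passing a `prefix` narrows the listing to a subset of objects, which helps once the bucket has accumulated many deliveries.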
Summary:
This architecture demonstrates how AWS services can work together in an event-driven, serverless pipeline. The flow begins with changes in DynamoDB, which are streamed in real time through Kinesis Data Streams and Firehose, processed by Lambda if needed, and finally stored in S3.