Real-Time Data Processing with MongoDB Change Streams and Python

Mai Chi Bao - Sep 12 - - Dev Community

Introduction

Change streams in MongoDB allow your application to react to real-time data changes instantly. In this blog post, I’ll show you how to set up and use change streams with Python, without diving too deep into theory. We'll create a simple program that listens to database events, focusing on inserts first, then extending it to other event types.

Getting Started with Change Streams

Change streams let your app listen to specific database events, like inserts or updates, and respond immediately. Imagine a scenario where a user updates their profile; with change streams, you can instantly reflect this change across your app without needing the user to refresh the page. Before this feature, you had to constantly poll the database or use complex methods like tailing the MongoDB Oplog. Change streams simplify this by providing a more user-friendly API.

What Happens Without Change Streams

Let's say I have an API to upload invoices. The flow is that customers will upload an image of the invoice to MongoDB, then we extract the information with AI and update the invoice. Here's an example of the code for uploading an invoice:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
Enter fullscreen mode Exit fullscreen mode

First I will wrap the pymongo inside a class, just in case :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Enter fullscreen mode Exit fullscreen mode

A reasonable question might be: why not wait until the AI model processes the image before updating? The problem is that it takes around 4-5 minutes to process, and we don't want to affect the user experience.

How about Kafka?

Another option could be using Kafka. We could publish the image to a Kafka topic, and another service would process the data.

Pros:

  • Decouples the upload and processing services.
  • Efficient for large-scale, real-time data processing.
  • Improved user experience: Users get an immediate response after uploading the invoice. The processing is handled asynchronously.

Cons:

  • Introduces additional complexity.
  • Requires setup and maintenance of Kafka infrastructure.
  • May be overkill for small-scale applications.

Here’s a basic implementation to demonstrate using Kafka to handle the invoice upload process.

The user uploads an invoice through an API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Enter fullscreen mode Exit fullscreen mode

The Kafka consumer listens to the invoice_topic. When it receives a message, it processes the invoice (e.g., extracting information from the image) and updates the corresponding document in MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
Enter fullscreen mode Exit fullscreen mode

Flow Summary:

  1. Upload Invoice: The user uploads an invoice through the API.
  2. Save to MongoDB: The invoice document is saved in MongoDB.
  3. Send Message to Kafka: A message containing the invoice details is sent to a Kafka topic (invoice_topic).
  4. Kafka Consumer Processes Invoice: A Kafka consumer listens to invoice_topic, processes the invoice, and updates the corresponding document in MongoDB with the extracted information.

Wow, I can't believe I managed to write this on my own! It really highlights the effort involved. And that's not even considering the complexities of managing and configuring the three services: MongoDB, Kafka, and the Invoice service.

Invoice Processing with MongoDB Change Streams

Here's the complete code rewritten in Markdown to demonstrate MongoDB change streams, including additional methods and functions to handle invoice processing triggered by the change stream.

First, I used mongo Docker, so I need to config the mongo cluster on docker, If you don't use docker you can follow the steps on their official page

      docker run -d \
         --name mongodb \
         -v /data/test-change-streams:/data/db \
         -p 27017:27017 \
         mongo:latest \
         mongod --replSet test-change-streams --logpath /data/db/mongodb.log --dbpath /data/db --port 27017

      docker exec -it mongodb mongosh --eval 'config = {_id: "test-change-streams",members: [{ _id : 0, host : "localhost:27017"}]};rs.initiate(config);'
      docker exec -it mongodb mongosh --eval "rs.status()"

Enter fullscreen mode Exit fullscreen mode

We'll start by creating a MongoDB wrapper class that handles database operations such as creating documents and listening to change streams.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream(self):
        change_stream = change_stream = self.collection.watch([{
                '$match': {
                    'operationType': { '$in': ['insert'] }
                }
            }])
        return change_stream
Enter fullscreen mode Exit fullscreen mode

To make it easy I add process_invoice inside the MongoDatabase class. But you should leave it somewhere else

The upload API should be like the original one.

config_path='config/config.yaml'
config = read_config(path=config_path)

mongo_db = MongoDatabase(config_path=config_path)
change_stream = None
change_stream_thread = None

ocr_reader = OcrReader(config_path=config_path, translator=GoogleTranslator())
invoice_extractor = OpenAIExtractor(config_path=config_path)

def process_change_stream(ocr_reader, invoice_extractor, config):
    global change_stream
    for change in change_stream:
        if change['operationType'] == 'insert':
            document_id = change['fullDocument']['_id']
            document = mongo_db.get_document_by_id(document_id=str(document_id))
            base64_img = document['invoice_image_base64']

            new_data = extract_invoice_info(base64_img=base64_img, ocr_reader=ocr_reader,
                                            invoice_extractor=invoice_extractor, config=config)

            mongo_db.update_document_by_id(document_id, new_data)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global change_stream, change_stream_thread
    change_stream = mongo_db.start_change_stream()
    change_stream_thread = threading.Thread(target=process_change_stream, 
                                            args=(ocr_reader, invoice_extractor, config))
    change_stream_thread.start()

    yield  # This is where the FastAPI app runs

    # Shutdown
    if change_stream:
        change_stream.close()
    if change_stream_thread:
        change_stream_thread.join()

app = FastAPI(lifespan=lifespan)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
Enter fullscreen mode Exit fullscreen mode

Flow Summary:

  1. User Uploads Invoice: The user uploads an invoice through the API.
  2. Save to MongoDB: The invoice document is saved in MongoDB.
  3. MongoDB Change Stream Triggered: The MongoDB change stream detects the insertion of the new document.
  4. Invoice Processing: The change stream triggers the process_invoice function, which processes the invoice and updates the document in MongoDB with the extracted information.

Let's see the result using Mongo Compass

Before: After uploading the invoice, its status remains "not extracted," indicating that only basic information like created_at and the base64_image has been saved.

Image description

After: Just seconds after the upload, MongoDB automatically triggers the extraction process. Now, the invoice's full details are available, as shown by the updated last_modified_at timestamp.

Image description

Conclusion

With MongoDB change streams, you can efficiently process real-time changes in your database. Extending this example, you can handle various events such as updates and deletes, making your application more reactive and responsive.

The code might have some issues since I haven't tested it thoroughly. If you're interested, you can explore my repo multilanguage_invoice_ocr for a deeper dive. If you'd like to learn more about this project or how to build a SaaS application, feel free to leave a comment!

Reference:

. . . . .
Terabox Video Player