Introduction
Change streams in MongoDB allow your application to react to real-time data changes instantly. In this blog post, I’ll show you how to set up and use change streams with Python, without diving too deep into theory. We'll create a simple program that listens to database events, focusing on inserts first, then extending it to other event types.
Getting Started with Change Streams
Change streams let your app listen to specific database events, like inserts or updates, and respond immediately. Imagine a scenario where a user updates their profile; with change streams, you can instantly reflect this change across your app without needing the user to refresh the page. Before this feature, you had to constantly poll the database or use complex methods like tailing the MongoDB Oplog. Change streams simplify this by providing a more user-friendly API.
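To make the contrast with polling concrete, here is a minimal sketch of a listener loop. It assumes `collection` is a pymongo `Collection` on a replica set; `listen_for_inserts`, `INSERT_ONLY_PIPELINE`, and `handler` are names made up for this illustration, not part of the project shown later.

```python
from typing import Any, Callable, Dict, List

# Aggregation pipeline that keeps only insert events.
INSERT_ONLY_PIPELINE: List[Dict[str, Any]] = [
    {"$match": {"operationType": "insert"}}
]


def listen_for_inserts(collection: Any, handler: Callable[[Dict[str, Any]], None]) -> None:
    """Block on the change stream and pass every inserted document to `handler`.

    `collection` is assumed to be a pymongo Collection; for this sketch, any
    object whose `watch()` returns an iterable context manager will do.
    """
    with collection.watch(INSERT_ONLY_PIPELINE) as stream:
        for change in stream:
            # Insert events carry the new document under "fullDocument".
            handler(change["fullDocument"])
```

With a real connection this would be called as something like `listen_for_inserts(client["mydb"]["invoices"], print)`. The call blocks, so it normally runs in its own thread or process.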
What Happens Without Change Streams
Let's say I have an API to upload invoices. The flow is that customers will upload an image of the invoice to MongoDB, then we extract the information with AI and update the invoice. Here's an example of the code for uploading an invoice:
from typing import Any, Dict

from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import PyMongoError


class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        # (read_config is a project helper that is not shown here)
        self.config_path = config_path
        self.config = read_config(path=self.config_path)
        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)

    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            # _id is stored as an ObjectId, so convert the string ID before matching
            self.collection.update_one({"_id": ObjectId(document_id)}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
I wrapped pymongo inside a class first, just in case :)) The upload endpoint then looks like this:
from datetime import datetime, timezone

from fastapi import Request, status
from fastapi.responses import JSONResponse

# app, mongo_db, is_base64 and valid_base64_image are defined elsewhere in the project


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")
        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )
        # Build the invoice document; the extracted fields stay empty for now
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }
        # MongoDB generates the _id, which is reused as the invoice UUID
        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})
        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
A reasonable question might be: why not wait until the AI model processes the image before updating? The problem is that it takes around 4-5 minutes to process, and we don't want to affect the user experience.
How about Kafka?
Another option could be using Kafka. We could publish the image to a Kafka topic, and another service would process the data.
Pros:
- Decouples the upload and processing services.
- Efficient for large-scale, real-time data processing.
- Improved user experience: Users get an immediate response after uploading the invoice. The processing is handled asynchronously.
Cons:
- Introduces additional complexity.
- Requires setup and maintenance of Kafka infrastructure.
- May be overkill for small-scale applications.
Here’s a basic implementation to demonstrate using Kafka to handle the invoice upload process.
The user uploads an invoice through an API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.
import json
from datetime import datetime, timezone

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from kafka import KafkaProducer

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # default=str turns the ObjectId and datetime fields into strings,
    # which json.dumps cannot serialize on its own
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")
        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }
        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})
        # Send a message to Kafka topic (insert_one has added "_id" to invoice_document)
        producer.send('invoice_topic', invoice_document)
        producer.flush()
        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
The Kafka consumer listens to the invoice_topic. When it receives a message, it processes the invoice (e.g., extracting information from the image) and updates the corresponding document in MongoDB.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value
    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data,
        "status": "extracted"
    })
    print(f"Processed and updated invoice: {invoice_uuid}")
Flow Summary:
- Upload Invoice: The user uploads an invoice through the API.
- Save to MongoDB: The invoice document is saved in MongoDB.
- Send Message to Kafka: A message containing the invoice details is sent to a Kafka topic (invoice_topic).
- Kafka Consumer Processes Invoice: A Kafka consumer listens to invoice_topic, processes the invoice, and updates the corresponding document in MongoDB with the extracted information.
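The Kafka message does not have to carry the whole document. Kafka limits message size to about 1 MB by default, and a base64 image can easily exceed that, so one option is to send only a reference to the invoice and let the consumer load the image from MongoDB. The sketch below illustrates that idea; `build_invoice_message` and `parse_invoice_message` are hypothetical helpers, not part of the original project.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def build_invoice_message(invoice_uuid: str, user_uuid: str, created_at: datetime) -> bytes:
    """Serialize a small, JSON-safe message that references the invoice by ID."""
    payload: Dict[str, Any] = {
        "invoice_uuid": invoice_uuid,
        "created_by": user_uuid,
        # datetime is not JSON serializable, so send an ISO 8601 string instead.
        "created_at": created_at.isoformat(),
    }
    return json.dumps(payload).encode("utf-8")


def parse_invoice_message(raw: bytes) -> Dict[str, Any]:
    """Inverse of build_invoice_message, for use on the consumer side."""
    return json.loads(raw.decode("utf-8"))
```

With this approach the producer would be created without a `value_serializer` and would send the bytes directly, and the consumer would look up the document by `invoice_uuid` before running the extraction.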
Wow, I can't believe I managed to write this on my own! It really highlights the effort involved. And that's not even considering the complexities of managing and configuring the three services: MongoDB, Kafka, and the Invoice service.
Invoice Processing with MongoDB Change Streams
Here's the same flow rewritten with MongoDB change streams, including the additional methods and functions that handle invoice processing when the change stream fires.
First, I run MongoDB in Docker, so I need to configure it as a replica set inside the container (change streams only work on replica sets and sharded clusters). If you don't use Docker, you can follow the steps on the official MongoDB page.
docker run -d \
  --name mongodb \
  -v /data/test-change-streams:/data/db \
  -p 27017:27017 \
  mongo:latest \
  mongod --replSet test-change-streams --logpath /data/db/mongodb.log --dbpath /data/db --port 27017

docker exec -it mongodb mongosh --eval 'config = {_id: "test-change-streams",members: [{ _id : 0, host : "localhost:27017"}]};rs.initiate(config);'

docker exec -it mongodb mongosh --eval "rs.status()"
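Besides `rs.status()`, the replica set can also be checked from Python before opening a change stream. The sketch below relies on the `hello` server command, whose response includes a `setName` field on replica set members. `is_replica_set_member` is a hypothetical helper name, and `client` is assumed to be a pymongo `MongoClient`.

```python
from typing import Any, Dict, Optional


def replica_set_name(hello_response: Dict[str, Any]) -> Optional[str]:
    """Return the replica set name from a `hello` response, or None.

    Replica set members report their set name under "setName";
    a standalone mongod omits the field.
    """
    return hello_response.get("setName")


def is_replica_set_member(client: Any) -> bool:
    """Ask the server behind `client` (assumed: pymongo MongoClient) for its set name."""
    response = client.admin.command("hello")
    return replica_set_name(response) is not None
```

Calling `is_replica_set_member(MongoClient("mongodb://localhost:27017"))` at startup is one way to fail early with a clear message instead of hitting an error on the first `watch()` call.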
We'll start by creating a MongoDB wrapper class that handles database operations such as creating documents and listening to change streams.
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml


class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]
            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream(self):
        # Only insert events pass the $match stage; other operations are ignored
        change_stream = self.collection.watch([{
            '$match': {
                'operationType': {'$in': ['insert']}
            }
        }])
        return change_stream
To keep things simple, I put process_invoice inside the MongoDatabase class, but in a real project it should live somewhere else.
The upload API stays the same as the original one.
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request

config_path = 'config/config.yaml'
config = read_config(path=config_path)
mongo_db = MongoDatabase(config_path=config_path)
change_stream = None
change_stream_thread = None
ocr_reader = OcrReader(config_path=config_path, translator=GoogleTranslator())
invoice_extractor = OpenAIExtractor(config_path=config_path)


def process_change_stream(ocr_reader, invoice_extractor, config):
    global change_stream
    for change in change_stream:
        if change['operationType'] == 'insert':
            # Insert events already carry the new document in fullDocument
            document = change['fullDocument']
            document_id = document['_id']
            base64_img = document['invoice_image_base64']
            new_data = extract_invoice_info(base64_img=base64_img, ocr_reader=ocr_reader,
                                            invoice_extractor=invoice_extractor, config=config)
            mongo_db.update_document_by_id(document_id, new_data)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global change_stream, change_stream_thread
    change_stream = mongo_db.start_change_stream()
    change_stream_thread = threading.Thread(target=process_change_stream,
                                            args=(ocr_reader, invoice_extractor, config))
    change_stream_thread.start()
    yield  # This is where the FastAPI app runs
    # Shutdown
    if change_stream:
        change_stream.close()
    if change_stream_thread:
        change_stream_thread.join()


app = FastAPI(lifespan=lifespan)


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        # same code as before
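One thing to watch in the shutdown path is that the worker thread may be blocked waiting for the next event. A possible alternative, sketched below under the assumption that `stream` is a pymongo `ChangeStream` (which exposes `alive`, `try_next()` and `close()`), is to poll with `try_next()` and check a `threading.Event` between polls. `run_until_stopped` is a hypothetical name, not something from the project.

```python
import threading
from typing import Any, Callable, Dict


def run_until_stopped(
    stream: Any,
    handler: Callable[[Dict[str, Any]], None],
    stop_event: threading.Event,
) -> None:
    """Poll `stream` for changes until `stop_event` is set or the stream dies."""
    try:
        while stream.alive and not stop_event.is_set():
            # try_next() returns None when no change is available yet,
            # which gives the loop a chance to re-check stop_event.
            change = stream.try_next()
            if change is not None:
                handler(change)
    finally:
        # The thread that reads the stream is also the one that closes it.
        stream.close()
```

In the lifespan handler above, the shutdown step would then become `stop_event.set()` followed by `change_stream_thread.join()`, so the stream is never closed from a different thread than the one iterating it.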
Flow Summary:
- User Uploads Invoice: The user uploads an invoice through the API.
- Save to MongoDB: The invoice document is saved in MongoDB.
- MongoDB Change Stream Triggered: The MongoDB change stream detects the insertion of the new document.
- Invoice Processing: The process_change_stream loop picks up the insert event, runs the extraction on the invoice, and updates the document in MongoDB with the extracted information.
Let's see the result in MongoDB Compass.
Before: After uploading the invoice, its status remains "not extracted," indicating that only basic information like created_at and the base64 image has been saved.
After: Just seconds after the upload, the change stream triggers the extraction process. Once it finishes, the invoice's full details are available, as shown by the updated last_modified_at timestamp.
Conclusion
With MongoDB change streams, you can efficiently process real-time changes in your database. Extending this example, you can handle various events such as updates and deletes, making your application more reactive and responsive.
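As a rough sketch of that extension, the `$match` stage can list several operation types, and each event can be routed to its own handler. The names `build_pipeline` and `dispatch_change` are made up for this illustration. Two details are worth keeping in mind: delete events only carry `documentKey`, and update events only include `fullDocument` if the stream is opened with `full_document="updateLookup"`.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


def build_pipeline(operation_types: List[str]) -> List[Dict[str, Any]]:
    """Build a $match stage that keeps only the given operation types."""
    return [{"$match": {"operationType": {"$in": list(operation_types)}}}]


def dispatch_change(change: Dict[str, Any], handlers: Dict[str, Handler]) -> bool:
    """Route one change event to the handler registered for its operation type.

    Returns True if a handler ran, False if the event type was not registered.
    """
    handler = handlers.get(change["operationType"])
    if handler is None:
        return False
    handler(change)
    return True
```

In the invoice example the processing step itself updates the document, so a handler registered for update events would need a guard (for instance, checking the status field) to avoid reacting to its own writes.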
The code might have some issues since I haven't tested it thoroughly. If you're interested, you can explore my repo multilanguage_invoice_ocr for a deeper dive. If you'd like to learn more about this project or how to build a SaaS application, feel free to leave a comment!