En este post, te mostraré cómo hemos integrado un sistema de notificaciones usando Kafka en un proyecto que utiliza Astro para el frontend y Next.js para el backend. El objetivo es publicar un mensaje en un tópico de Kafka cada vez que se genera un post en formato .md
y notificar a los usuarios en su panel de usuario sobre las últimas novedades. A continuación, se explica el proceso de cómo y cuándo se publica un mensaje en el tópico de Kafka.
Proceso de compilación en Astro
Configurar variables de entorno : Utilizamos
dotenv
para cargar las variables de entorno necesarias para conectar con Kafka y Azure Storage.Configuración de Kafka : Configuramos el cliente de Kafka con autenticación SSL y SASL, y creamos un productor para enviar mensajes.
Registrar el esquema en el Schema Registry : Utilizamos
SchemaRegistry
de Confluent para registrar el esquema Avro que define la estructura de los mensajes de los posts.Procesar y subir los posts : Leemos los archivos
.md
del directorio de posts, extraemos los metadatos usandogray-matter
, y verificamos si la entidad ya existe en Azure Table Storage. Si la entidad no existe, publicamos un mensaje en el tópico de Kafka con la información del post.
Script de compilación
Este script se ejecuta durante el proceso de compilación de la web en Astro, cuando se va a desplegar. Aquí se describe cómo se procesa cada post y se publica en Kafka si no existe en Azure Table Storage.
require('dotenv').config();
const fs = require('fs');
const path = require('path');
const matter = require('gray-matter');
const { Kafka, logLevel } = require('kafkajs');
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
const { TableClient } = require('@azure/data-tables');
const connectionString = process.env.AZURE_STORAGE_CONNECTION_STRING;
const tableName = process.env.AZURE_STORAGE_TABLE_IA_SEARCH_NAME;
const kafka = new Kafka({
brokers: [process.env.KAFKA_BROKERS],
ssl: true,
sasl: {
mechanism: process.env.KAFKA_SASL_MECHANISM,
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
},
logLevel: logLevel.ERROR,
});
const producer = kafka.producer();
const topic = 'Notifications';
const registry = new SchemaRegistry({
host: process.env.SCHEMA_REGISTRY_HOST,
auth: {
username: process.env.SCHEMA_REGISTRY_USERNAME,
password: process.env.SCHEMA_REGISTRY_PASSWORD,
},
});
const tableClient = TableClient.fromConnectionString(connectionString, tableName);
async function createTableIfNotExists() {
try {
await tableClient.createTable();
console.log(`Table ${tableName} created`);
} catch (error) {
if (error.statusCode === 409) {
console.log(`Table ${tableName} already exists`);
} else {
throw error;
}
}
}
async function entityExists(partitionKey, rowKey) {
try {
await tableClient.getEntity(partitionKey, rowKey);
return true;
} catch (error) {
if (error.statusCode === 404) {
return false;
} else {
throw error;
}
}
}
async function uploadPosts() {
await createTableIfNotExists();
await producer.connect();
const schema = `{
"type": "record",
"name": "BlogPost",
"namespace": "com.upstash.avro",
"fields": [
{ "name": "PartitionKey", "type": "string" },
{ "name": "RowKey", "type": "string" },
{ "name": "title", "type": "string" },
{ "name": "description", "type": "string" },
{ "name": "date", "type": "string" },
{ "name": "categories", "type": { "type": "array", "items": "string" } },
{ "name": "tags", "type": { "type": "array", "items": "string" } },
{ "name": "image", "type": "string" }
]
}`;
const { id: schemaId } = await registry.register({ type: SchemaType.AVRO, schema: schema });
const postsDir = path.join(__dirname, '..', 'src', 'content', 'posts');
const files = fs.readdirSync(postsDir);
for (const file of files) {
if (path.extname(file) === '.md') {
const content = fs.readFileSync(path.join(postsDir, file), 'utf8');
const { data } = matter(content);
const { title, description, date, categories, tags, image } = data;
const entity = {
partitionKey: 'post',
rowKey: path.basename(file, '.md'),
title,
description,
date: new Date(date).toISOString(),
categories: JSON.stringify(categories),
tags: JSON.stringify(tags),
image,
};
if (!(await entityExists(entity.partitionKey, entity.rowKey))) {
const message = {
PartitionKey: entity.partitionKey,
RowKey: entity.rowKey,
title: entity.title,
description: entity.description,
date: entity.date,
categories: JSON.parse(entity.categories),
tags: JSON.parse(entity.tags),
image: entity.image,
};
const encodedValue = await registry.encode(schemaId, message);
await producer.send({
topic,
messages: [{ key: entity.rowKey, value: encodedValue }],
});
console.log(`Message sent for post: ${entity.rowKey}`);
await tableClient.createEntity(entity);
console.log(`Entity with RowKey ${entity.rowKey} created`);
} else {
console.log(`Entity with RowKey ${entity.rowKey} already exists`);
}
}
}
await producer.disconnect();
}
uploadPosts().catch(console.error);
¿Cuándo se publica en el tópico de Kafka?
El mensaje se publica en el tópico de Kafka solo si el post no existe ya en Azure Table Storage. Este es el proceso detallado:
Lectura de Archivos : Se leen todos los archivos
.md
del directorio de posts.Extracción de Metadatos : Para cada archivo
.md
, se extraen los metadatos usandogray-matter
.Verificación en Azure Table Storage : Se verifica si una entidad con la misma
PartitionKey
yRowKey
ya existe en Azure Table Storage.Publicación en Kafka : Si la entidad no existe, se construye un mensaje con los metadatos del post, se codifica usando el esquema registrado en Confluent Schema Registry y se publica en el tópico de Kafka.
Creación de la Entidad : Después de publicar el mensaje, se crea la entidad en Azure Table Storage.
Este enfoque asegura que solo los posts nuevos se publiquen en el tópico de Kafka, evitando duplicados.
Endpoint en Next.js para obtener notificaciones
En el backend, hemos creado un endpoint en Next.js que consume los mensajes del tópico de Kafka y los devuelve ordenados por fecha.
import { enableCors } from "@/src/middleware/enableCors";
import { methodValidator } from "@/src/utils/methodValidator";
import { Kafka, logLevel } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
const kafka = new Kafka({
brokers: [process.env.KAFKA_BROKERS],
ssl: true,
sasl: {
mechanism: process.env.KAFKA_SASL_MECHANISM,
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD
},
logLevel: logLevel.ERROR,
});
const registry = new SchemaRegistry({
host: process.env.SCHEMA_REGISTRY_HOST,
auth: {
username: process.env.SCHEMA_REGISTRY_USERNAME,
password: process.env.SCHEMA_REGISTRY_PASSWORD
}
});
async function fetchNotifications() {
const consumer = kafka.consumer({ groupId: `user-notifications-${Date.now()}` });
let notifications = [];
try {
await consumer.connect();
await consumer.subscribe({ topic: 'Notifications', fromBeginning: true });
const runConsumer = new Promise((resolve, reject) => {
let isConsumed = false;
consumer.run({
eachMessage: async ({ message }) => {
try {
const decodedMessage = await registry.decode(message.value);
const messageId = message.key ? message.key.toString() : null;
notifications.push({ ...decodedMessage, messageId });
isConsumed = true;
} catch (error) {
reject(error);
}
}
});
setTimeout(async () => {
await consumer.disconnect();
if (isConsumed) {
resolve();
} else {
reject(new Error('No messages consumed within the timeout period'));
}
}, 5000);
});
await runConsumer;
} catch (error) {
console.error('Error fetching notifications:', error);
}
notifications.sort((a, b) => new Date(b.date) - new Date(a.date));
return notifications.slice
(0, 5);
}
async function notificationHandler(req, res) {
await methodValidator(req, res, 'POST');
if (res.headersSent) {
return;
}
try {
const notifications = await fetchNotifications();
res.status(200).json({ notifications });
} catch (error) {
res.status(500).json({ error: 'Error al obtener las notificaciones del usuario' });
}
}
export default enableCors(notificationHandler);
Componente de React para el frontend en Astro
Finalmente, en el frontend de Astro, hemos creado un componente en React que se encarga de mostrar las notificaciones a los usuarios cuando inician sesión.
import { useContext, useState, useEffect } from 'react';
import { AuthContext } from './LoginContext';
const API_BASE_URL = import.meta.env.PUBLIC_API_BASE_URL;
export default function Login() {
const { state, dispatch } = useContext(AuthContext);
const [notifications, setNotifications] = useState([]);
useEffect(() => {
const fetchNotifications = async () => {
try {
const response = await fetch(`${API_BASE_URL}/api/notification`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
});
if (!response.ok) throw new Error('Error fetching notifications');
const data = await response.json();
setNotifications(data.notifications || []);
} catch (error) {
console.error('Error fetching notifications:', error);
}
};
if (state.isLoggedIn) {
fetchNotifications();
}
}, [state.isLoggedIn]);
const logout = () => {
dispatch({ type: 'LOGOUT' });
window.location.reload();
};
if (state.isLoggedIn) {
return (
<div className="login">
<div className="login-details">
<strong className="text">Últimas novedades</strong>
{notifications.length > 0 ? (
notifications.map((notification, index) => (
<div
key={index}
className="notification"
onClick={() => {
window.location.href = `/${notification.RowKey}`;
}}
>
<img src={notification.image} alt={notification.title} className="notification-image" />
<div className="notification-content">
<p className="notification-title">{notification.title}</p>
<p className="notification-description">{notification.description}</p>
</div>
</div>
))
) : (
<p>No hay novedades recientes</p>
)}
<button onClick={logout}>Cerrar sesión</button>
</div>
</div>
);
}
return null;
}
Con esta configuración, logramos notificar a los usuarios sobre las últimas novedades cada vez que se genera un nuevo post en formato .md
, usando Kafka como sistema de mensajería. Esta solución proporciona una manera eficiente y escalable de manejar las notificaciones en tiempo real.