Recientemente, me embarqué en un proyecto donde necesitaba integrar Azure EventHub en una aplicación construida con Astro , utilizando React en el frontend y Next.js en el backend. Quiero compartir cómo logré esta integración, especialmente enfocándome en cómo leer datos desde una tabla en Azure Storage Account y cómo producir y consumir eventos con Azure EventHub.
Comencemos por el backend. Tenía una función que necesitaba leer datos de una tabla en Azure Storage Account y luego enviar uno de esos registros como un evento a Azure EventHub. Aquí está el código esencial de esa función:
export default async function handler(req, res) {
const authHeader = req.headers['authorization'] || '';
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
console.error('Error de autorización');
return res.status(401).end('Unauthorized');
}
try {
const { TableClient } = require('@azure/data-tables');
const { EventHubProducerClient } = require('@azure/event-hubs');
// Variables de entorno necesarias
const connectionString = process.env.AZURE_STORAGE_CONNECTION_STRING;
const tableName = process.env.AZURE_STORAGE_TABLE_IA_SEARCH_NAME;
const eventHubConnectionString = process.env.EVENT_HUB_CONNECTION_STRING;
const eventHubName = process.env.EVENT_HUB_NAME_RECOMENDATIONS;
// Verificación de variables de entorno
if (!connectionString || !tableName || !eventHubConnectionString || !eventHubName) {
throw new Error('Variables de entorno no definidas.');
}
// Creación del cliente de tabla y obtención de entidades
const tableClient = TableClient.fromConnectionString(connectionString, tableName);
const entities = [];
const iter = tableClient.listEntities();
for await (const entity of iter) {
entities.push(entity);
}
// Verificación de entidades disponibles
if (entities.length <= 1) {
throw new Error('No hay suficientes entidades disponibles para enviar.');
}
// Ordenamiento y selección de una entidad aleatoria
entities.sort((a, b) => new Date(b.date) - new Date(a.date));
entities.shift(); // Eliminamos la entidad más reciente
const randomIndex = Math.floor(Math.random() * entities.length);
const randomEntity = entities[randomIndex];
// Creación del productor de EventHub y envío del evento
const producer = new EventHubProducerClient(eventHubConnectionString, eventHubName);
const batch = await producer.createBatch();
batch.tryAdd({
body: randomEntity,
properties: {
targetConsumerGroup: 'Recommendation',
},
});
await producer.sendBatch(batch);
await producer.close();
console.log(`Mensaje enviado para la entidad con RowKey ${randomEntity.rowKey}`);
res.status(200).json({ message: 'Mensaje enviado correctamente.', entity: randomEntity });
} catch (error) {
console.error('Error en el cron job:', error);
res.status(500).json({ error: error.message });
}
}
Este código comienza verificando una autorización básica, luego conecta con Azure Table Storage para obtener las entidades almacenadas. Después de asegurarse de que hay suficientes datos, selecciona una entidad aleatoria y la envía como un evento a Azure EventHub utilizando EventHubProducerClient
.
Ahora, en el backend también necesitaba una forma de consumir esos eventos y proporcionarlos al frontend. Para ello, utilicé el siguiente código:
import { enableCors } from "@/src/middleware/enableCors";
import { methodValidator } from "@/src/utils/methodValidator";
import { EventHubConsumerClient, earliestEventPosition } from '@azure/event-hubs';
const connectionString = process.env.EVENT_HUB_CONNECTION_STRING;
const eventHubName = process.env.EVENT_HUB_NAME_RECOMENDATIONS;
const consumerGroup = '$Default';
async function fetchNotifications() {
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
let notifications = [];
try {
console.log('Connecting to Event Hubs...');
const subscription = consumerClient.subscribe({
processEvents: async (events) => {
for (const event of events) {
try {
const message = event.body;
const messageId = event.properties?.messageId || null;
notifications.push({ ...message, messageId });
console.log(`Received message`);
} catch (error) {
console.error('Error processing event:', error);
}
}
},
processError: async (err) => {
console.error('Error receiving messages from Event Hubs:', err);
}
}, { startPosition: earliestEventPosition });
await new Promise((resolve) => setTimeout(resolve, 5000));
await subscription.close();
await consumerClient.close();
console.log('Consumer disconnected');
} catch (error) {
console.error('Error fetching notifications from Event Hubs:', error);
}
notifications.sort((a, b) => new Date(b.date) - new Date(a.date));
return notifications.slice(0, 5);
}
async function notificationHandler(req, res) {
await methodValidator(req, res, 'POST');
if (res.headersSent) {
return;
}
try {
const notifications = await fetchNotifications();
console.log(`Notifications: ${JSON.stringify(notifications)}`);
res.status(200).json({ notifications });
} catch (error) {
console.error('Error fetching notifications:', error);
res.status(500).json({ error: 'Error al obtener las notificaciones del usuario' });
}
}
export default enableCors(notificationHandler);
Esta función se conecta a Azure EventHub utilizando EventHubConsumerClient
, suscribiéndose al flujo de eventos y recopilando las notificaciones. Después de un breve periodo, cierra la conexión y devuelve las notificaciones más recientes.
En el frontend, quería mostrar estas notificaciones al usuario. Utilicé Astro con React para construir la interfaz. Aquí está el componente React principal:
import { useContext, useState, useEffect } from 'react';
import { AuthContext } from './LoginContext';
import { SiGithub } from 'react-icons/si';
import './Login.scss';
const API_BASE_URL = import.meta.env.PUBLIC_API_BASE_URL;
export default function Login() {
const { state, dispatch } = useContext(AuthContext);
const [data, setData] = useState({ errorMessage: '', isLoading: false });
const [notifications, setNotifications] = useState([]);
useEffect(() => {
const fetchNotifications = async () => {
try {
const response = await fetch(`${API_BASE_URL}/api/notification`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
});
if (!response.ok) throw new Error('Error fetching notifications');
const data = await response.json();
setNotifications(data.notifications || []);
} catch (error) {
console.error('Error fetching notifications:', error);
setData({
...data,
errorMessage: 'Error fetching notifications',
});
}
};
if (state.isLoggedIn && state.user?.login) {
fetchNotifications();
}
}, [state.isLoggedIn, state.user?.login]);
const logout = () => {
dispatch({ type: 'LOGOUT' });
window.location.reload(true);
};
if (state.isLoggedIn) {
return (
<div className="login">
<div className="login-image-container">
<img
src={state.user?.avatar_url}
alt="Imagen de usuario"
className="login-image"
loading="eager"
decoding="async"
/>
</div>
<div className="login-details">
<div className="text">
<p>
Hola, <strong>{state.user?.login}</strong>
</p>
</div>
<div className="input-container">
<strong>Últimas novedades</strong>
{notifications.length > 0 ? (
notifications.map((notification, index) => (
<div
key={index}
className="notification"
onClick={() => {
window.location.href = `/${notification.RowKey}`;
}}
>
<div
className="notification-image"
style={{
backgroundImage: `url(${notification.image})`,
}}
></div>
<div className="notification-content">
<p className="notification-title">{notification.title}</p>
<p className="notification-description">{notification.description}</p>
</div>
</div>
))
) : (
<span>No hay novedades recientes</span>
)}
</div>
<div className="github_btn" onClick={logout}>
<SiGithub />
<button>Cerrar sesión</button>
</div>
</div>
</div>
);
}
if (data.isLoading) {
return (
<div className="login">
<div className="spinner-icon" />
</div>
);
}
return (
<div className="login">
<span className="error">{data.errorMessage}</span>
<div className="flex flex-col items-center justify-center gap-4">
<span>
<strong>¡Bienvenido!</strong> Por favor, inicia sesión para continuar.
</span>
<a
className="github_btn"
onClick={() => {
setData({ ...data, errorMessage: '' });
}}
href={`https://github.com/login/oauth/authorize?scope=user&client_id=${client_id}&redirect_uri=${redirect_uri}`}
>
<SiGithub />
<span>Iniciar sesión</span>
</a>
</div>
</div>
);
}
Este componente maneja la autenticación del usuario y muestra las notificaciones obtenidas desde el backend. Utiliza useEffect
para llamar a la API de notificaciones cuando el usuario está autenticado.
La clave aquí es cómo el frontend interactúa con el backend para obtener las notificaciones que hemos consumido de Azure EventHub. Al hacer una solicitud a nuestra ruta API /api/notification
, obtenemos las últimas notificaciones y las mostramos al usuario.
Este enfoque me permitió integrar Azure EventHub de manera eficiente en mi aplicación, utilizando Astro con React en el frontend y Next.js en el backend. La comunicación entre el frontend y el backend es fluida, y Azure EventHub maneja el flujo de eventos de forma robusta.