Para asegurar que los eventos se envían y consumen en un orden totalmente consistente en Apache Kafka, es fundamental entender cómo funciona la partición de mensajes y la asignación de consumidores.
Uso de Particiones en Kafka
-
Particionamiento de Temas:
- Kafka organiza los mensajes en particiones dentro de un tema. Cada partición mantiene el orden de los mensajes que recibe, lo que significa que los mensajes se procesan en el orden en que fueron enviados a esa partición.
- Para garantizar el orden, es crucial que todos los mensajes relacionados con un mismo contexto (por ejemplo, un ID de usuario o un ID de transacción) se envíen a la misma partición. Esto se logra utilizando una clave de partición al enviar mensajes. Kafka utiliza esta clave para determinar a qué partición enviar el mensaje mediante una función hash[1][5].
-
Claves de Mensaje:
- Al enviar un mensaje, se puede especificar una clave. Todos los mensajes con la misma clave serán enviados a la misma partición, lo que asegura que sean consumidos en el mismo orden en que fueron producidos. Por ejemplo, si se usa el ID del usuario como clave, todos los eventos relacionados con ese usuario irán a la misma partición.
Grupos de Consumidores
-
Asignación de Consumidores:
- Los consumidores en Kafka se agrupan en grupos de consumidores. Cada grupo puede tener múltiples consumidores, pero cada partición solo puede ser leída por un consumidor dentro del grupo a la vez.
- Esto significa que si se tienen más consumidores que particiones, algunos consumidores estarán inactivos. Para mantener el orden y maximizar la eficiencia, es recomendable tener al menos tantas particiones como consumidores en el grupo.
-
Manejo de Offsets:
- Kafka almacena el estado de lectura de cada consumidor mediante offsets, que son identificadores numéricos incrementales para cada mensaje dentro de una partición. Esto permite que los consumidores retomen la lectura desde donde lo dejaron en caso de fallos.
Estrategias Adicionales
- Evitar Sobrecargas: Al elegir claves de partición, es importante considerar la distribución del tráfico para evitar que algunas particiones estén sobrecargadas mientras otras están subutilizadas.
- Replicación y Tolerancia a Fallos: Asegúrate de configurar una replicación adecuada (mayor a 1) para las particiones, lo que no solo mejora la disponibilidad sino también la resiliencia del sistema ante fallos.
Para implementar un sistema de producción y consumo de mensajes en Kafka utilizando Avro, asegurando que los mensajes se procesen en orden y manejando posibles fallos, aquí tienes un ejemplo completo. Este incluye la definición del esquema Avro, el código del productor y consumidor, así como estrategias para manejar errores.
Esquema Avro
Primero, definimos el esquema Avro para nuestro payload. Crearemos un archivo llamado user_signed_up.avsc que describe la estructura del mensaje.
{
"type": "record",
"name": "UserSignedUp",
"namespace": "com.example",
"fields": [
{ "name": "userId", "type": "int" },
{ "name": "userEmail", "type": "string" },
{ "name": "timestamp", "type": "string" } // Formato ISO 8601
]
}
Generación de la Clave
Para asegurar el orden en la producción y consumo de mensajes, utilizaremos una clave estructurada como tipo-mensaje-fecha, por ejemplo: user-signed-up-2024-11-04.
Productor Kafka
Aquí tienes el código del productor que envía mensajes a Kafka utilizando el esquema Avro:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
public class AvroProducer {
private final KafkaProducer<String, byte[]> producer;
private final Schema schema;
public AvroProducer(String bootstrapServers) throws IOException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Establecer la propiedad de reintentos, Número de reintentos
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// Asegura que todos los réplicas reconozcan la escritura,
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// Solo un mensaje a la vez
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// Habilitar idempotencia, no quiero enviar duplicados
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
this.producer = new KafkaProducer<>(properties);
this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
}
public void sendMessage(String topic, int userId, String userEmail) {
GenericRecord record = new GenericData.Record(schema);
record.put("userId", userId);
record.put("userEmail", userEmail);
record.put("timestamp", java.time.Instant.now().toString());
String key = String.format("user-signed-up-%s", java.time.LocalDate.now());
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
**handleFailure(exception, producerRecord);
** } else {
System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
}
});
}
private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
// Log the error for monitoring
System.err.println("Error sending message: " + exception.getMessage());
// Implement local persistence as a fallback
saveToLocalStorage(producerRecord);
// Optionally: Notify an external monitoring system or alert
}
private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
try {
// Persist the failed message to a local file or database for later processing
Files.write(new File("failed_messages.log").toPath(),
(record.key() + ": " + new String(record.value()) + "\n").getBytes(),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
} catch (IOException e) {
System.err.println("Error saving to local storage: " + e.getMessage());
}
}
private byte[] serialize(GenericRecord record) {
// Crear un ByteArrayOutputStream para almacenar los bytes serializados
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// Crear un escritor de datos para el registro Avro
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
// Crear un encoder para escribir en el ByteArrayOutputStream
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
try {
// Escribir el registro en el encoder
datumWriter.write(record, encoder);
// Finalizar la escritura
encoder.flush();
} catch (IOException e) {
throw new AvroSerializationException("Error serializing Avro record", e);
}
// Devolver los bytes serializados
return outputStream.toByteArray();
}
public void close() {
producer.close();
}
}
Consideraciones sobre Reintentos
**Es importante tener en cuenta que al habilitar reintentos, puede haber un riesgo de reordenamiento de mensajes si no se maneja adecuadamente.
Para evitar esto:
**max.in.flight.requests.per.connection: Puedes establecer esta propiedad en 1 para garantizar que los mensajes se envíen uno a la vez y se procesen en orden. Sin embargo, esto puede afectar el rendimiento.
Con esta configuración y manejo adecuado de errores, puedes asegurar que tu productor Kafka sea más robusto y capaz de manejar fallos en la producción de mensajes mientras mantienes el orden necesario.
**Consumidor Kafka
**El consumidor que lee y procesa los mensajes:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumer {
private final KafkaConsumer<String, byte[]> consumer;
private final Schema schema;
public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
this.consumer = new KafkaConsumer<>(properties);
this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
}
public void consume(String topic) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
try {
processMessage(record.value());
} catch (Exception e) {
handleProcessingError(e, record);
}
}
}
}
private void processMessage(byte[] data) throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
var decoder = DecoderFactory.get().binaryDecoder(data, null);
GenericRecord record = reader.read(null, decoder);
System.out.printf("Consumido mensaje: %s - %s - %s%n",
record.get("userId"),
record.get("userEmail"),
record.get("timestamp"));
}
private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
System.err.println("Error processing message: " + e.getMessage());
// Implement logic to save failed messages for later processing
saveFailedMessage(record);
}
private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
try {
// Persist the failed message to a local file or database for later processing
Files.write(new File("failed_consumed_messages.log").toPath(),
(record.key() + ": " + new String(record.value()) + "\n").getBytes(),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key());
} catch (IOException e) {
System.err.println("Error saving consumed message to local storage: " + e.getMessage());
}
}
public void close() {
consumer.close();
}
}
Ejemplo Realista de Claves
En un entorno con muchos eventos distintos y muchas particiones distintas, una clave realista podría ser algo como:
user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04
Esto permite que todos los eventos relacionados con un tipo específico en una fecha específica se envíen a la misma partición y se procesen en orden. Además, puedes diversificar las claves incluyendo más detalles si es necesario (como un ID de sesión o transacción).
Con esta implementación y estrategias para manejar fallos y asegurar el orden de los mensajes en Kafka utilizando Avro, puedes construir un sistema robusto y eficiente para gestionar eventos.
Ahora un Productor y un consumidor Kafka algo más capaz.
Productor Kafka con Circuit Breaker, Persistencia Local y DLQ.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Properties;
public class AvroProducer {
private final KafkaProducer<String, byte[]> producer;
private final Schema schema;
private final CircuitBreaker circuitBreaker;
public AvroProducer(String bootstrapServers) throws IOException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
this.producer = new KafkaProducer<>(properties);
this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
// Configuración del Circuit Breaker
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // porcentaje de fallos para abrir el circuito
.waitDurationInOpenState(Duration.ofSeconds(30)) // tiempo en estado abierto
.slidingWindowSize(10) // número de llamadas para calcular el porcentaje de fallos
.build();
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
this.circuitBreaker = registry.circuitBreaker("producerCircuitBreaker");
}
public void sendMessage(String topic, int userId, String userEmail) {
GenericRecord record = new GenericData.Record(schema);
record.put("userId", userId);
record.put("userEmail", userEmail);
record.put("timestamp", java.time.Instant.now().toString());
String key = String.format("user-signed-up-%s", java.time.LocalDate.now());
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));
circuitBreaker.executeRunnable(() -> {
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
handleFailure(exception, producerRecord);
} else {
System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset());
}
});
});
}
private byte[] serialize(GenericRecord record) throws AvroSerializationException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
try {
datumWriter.write(record, encoder);
encoder.flush();
} catch (IOException e) {
throw new AvroSerializationException("Error serializing Avro record", e);
}
return outputStream.toByteArray();
}
private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
System.err.println("Error al enviar el mensaje: " + exception.getMessage());
// Guardar el mensaje en una cola de errores (DLQ)
saveToDeadLetterQueue(producerRecord);
// Guardar en almacenamiento local como respaldo
saveToLocalStorage(producerRecord);
}
private void saveToDeadLetterQueue(ProducerRecord<String, byte[]> record) {
try {
Files.write(new File("dead_letter_queue.log").toPath(),
(record.key() + ": " + new String(record.value()) + "\n").getBytes(),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
System.out.println("Mensaje guardado en Dead Letter Queue: " + record.key());
} catch (IOException e) {
System.err.println("Error guardando en DLQ: " + e.getMessage());
}
}
private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
try {
Files.write(new File("failed_messages.log").toPath(),
(record.key() + ": " + new String(record.value()) + "\n").getBytes(),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
System.out.println("Mensaje guardado localmente para reenvío: " + record.key());
} catch (IOException e) {
System.err.println("Error guardando en almacenamiento local: " + e.getMessage());
}
}
public void close() {
producer.close();
}
}
Consumidor Kafka con Manejo de DLQ.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumer {
private final KafkaConsumer<String, byte[]> consumer;
private final Schema schema;
public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
this.consumer = new KafkaConsumer<>(properties);
this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
}
public void consume(String topic) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
try {
processMessage(record.value());
} catch (Exception e) {
handleProcessingError(e, record);
}
}
processDeadLetterQueue(); // Procesar mensajes en la DLQ después de consumir del tópico principal
}
}
private void processMessage(byte[] data) throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
var decoder = DecoderFactory.get().binaryDecoder(data, null);
GenericRecord record = reader.read(null, decoder);
System.out.printf("Consumido mensaje: %s - %s - %s%n",
record.get("userId"),
record.get("userEmail"),
record.get("timestamp"));
}
private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
System.err.println("Error procesando el mensaje: " + e.getMessage());
// Guardar el mensaje fallido en la DLQ
saveFailedMessage(record);
}
private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
try {
Files.write(new File("dead_letter_queue.log").toPath(),
(record.key() + ": " + new String(record.value()) + "\n").getBytes(),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
System.out.println("Mensaje consumido guardado en Dead Letter Queue: " + record.key());
// Opcionalmente puedes eliminar el mensaje del tópico original si es necesario
// consumer.commitSync(); // Confirmar el procesamiento si es necesario
} catch (IOException e) {
System.err.println("Error guardando mensaje consumido en DLQ: " + e.getMessage());
}
}
private void processDeadLetterQueue() {
try {
Files.lines(new File("dead_letter_queue.log").toPath()).forEach(line -> {
System.out.println("Procesando mensaje desde la Dead Letter Queue: " + line);
// Aquí puedes implementar la lógica para reintentar o procesar mensajes desde la DLQ.
});
} catch (IOException e) {
System.err.println("Error procesando la Dead Letter Queue: " + e.getMessage());
}
}
public void close() {
consumer.close();
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-avro-example</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<java.version>17</java.version>
<kafka.version>3.5.1</kafka.version>
<avro.version>1.11.0</avro.version>
<resilience4j.version>1.7.1</resilience4j.version>
<slf4j.version>1.7.36</slf4j.version>
</properties>
<dependencies>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<!-- Confluent Kafka Avro Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.0</version> <!-- Asegúrate de que esta versión sea compatible con tu versión de Kafka -->
</dependency>
<!-- Resilience4j -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Logging Implementation (Logback) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version>
</dependency>
<!-- JUnit for Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Explicación del Código
Circuit Breaker:
Se utiliza Resilience4j para gestionar el circuito breaker del productor. Se configura un umbral de tasa de fallo y un tiempo de espera en estado abierto.
Persistencia Local y DLQ:
Los mensajes fallidos se guardan tanto en un archivo local (failed_messages.log) como en una cola de errores (dead_letter_queue.log).
Manejo de Errores:
En el productor y consumidor se manejan los errores adecuadamente y se registran.
Procesamiento de la DLQ:
El consumidor también procesa los mensajes almacenados en la DLQ después de consumir mensajes del tópico principal.
Logging:
Se utilizan mensajes System.err y System.out para registrar errores y eventos importantes.
Consideraciones Finales
Con esta implementación:
Se asegura que los mensajes sean enviados y procesados de manera resiliente.
Se proporciona un manejo adecuado para errores temporales o persistentes.
La lógica permite una recuperación efectiva mediante el uso de una Dead Letter Queue.
El circuito breaker ayuda a evitar que el sistema se sature ante fallos prolongados.
Este enfoque crea un sistema robusto que puede manejar problemas físicos y lógicos mientras mantiene la entrega ordenada de mensajes en Kafka.