Apache Kafka se ha consolidado como el estándar de facto para sistemas de mensajería distribuidos. Este artículo te guiará a través de la integración práctica de Kafka con Spring Boot, cubirendo el desarrollo completo de productores y consumidores.
Conceptos Clave de Kafka
- Productor (Producer): Entidad que envía mensajes.
- Consumidor (Consumer): Antidad que recibe y procesa mensajes.
- Broker: Servidor individual dentro del clúster de Kafka.
- Topic: Categoría o feed con nombre al que se publican los mensajes.
- Partición (Partition): División de un Topic para permitir el procesamiento paralelo.
- Grupo de Consumidores (Consumer Group): Conjunto de consumidores que colaboran para consumir un Topic, permitiendo la distribución de la carga.
Instalación de Kafka con Docker
# docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
Integración de Kafka con Spring Boot
Añade la dependencia de spring-kafka a tu archivo pom.xml:
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configura los parámetros de Kafka en tu archivo de propiedades (application.yml):
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: mi-grupo-consumidores
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Implementación del Productor
Crea un serivcio para enviar mensajes al Topic especificado.
@Service
public class MensajeroPedido {
private final KafkaTemplate<String, String> kafkaTemplate;
public MensajeroPedido(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void enviarInfoPedido(String idPedido) {
String payload = String.format("{\"idPedido\":\"%s\",\"timestamp\":\"%s\"}",
idPedido, LocalDateTime.now());
// Envía el mensaje al topic "gestion-pedidos", usando idPedido como clave
kafkaTemplate.send("gestion-pedidos", idPedido, payload);
System.out.println("Mensaje enviado: " + payload);
}
}
Implementación del Consumidor
Utiliza la anotación @KafkaListener para crear un consumidor que escuche en un Topic específico.
@Component
public class ReceptorPedido {
@KafkaListener(topics = "gestion-pedidos", groupId = "mi-grupo-consumidores")
public void procesarMensajePedido(ConsumerRecord<String, String> registro) {
System.out.println("Mensaje recibido:");
System.out.println(" Clave: " + registro.key());
System.out.println(" Valor: " + registro.value());
System.out.println(" Partición: " + registro.partition());
System.out.println(" Offset: " + registro.offset());
}
}
Confirmación Manual de Offset
Para garantizar la fiabilidad, puedes configurar la confirmación manual de offset después de procesar cada mensaje.
@Component
public class ReceptorPedidoConfirmacionManual {
@KafkaListener(topics = "gestion-pedidos", groupId = "mi-grupo-consumidores")
public void procesarMensajeConConfirmacion(
ConsumerRecord<String, String> registro,
Acknowledgment confirmacion) {
try {
procesarDetallesPedido(registro.value());
confirmacion.acknowledge(); // Confirma el procesamiento exitoso
} catch (Exception ex) {
// Si falla el procesamiento, el mensaje puede ser reintentado
System.err.println("Fallo al procesar el mensaje con clave: " + registro.key() + ". Error: " + ex.getMessage());
}
}
// Método auxiliar para simular el procesamiento
private void procesarDetallesPedido(String detalles) {
System.out.println("Procesando: " + detalles);
// Simula lógica de negocio que podría fallar
if (detalles.contains("error")) {
throw new RuntimeException("Simulación de error en procesamiento");
}
}
}
Habilita el modo de confirmación manual en la configuración de Spring Boot:
spring:
kafka:
listener:
ack-mode: MANUAL # Habilita la confirmación manual
Comandos Útiles de Kafka
A continuación, se presentan algunos comandos básicos para interactuar con Kafka desde la línea de comandos:
# Crear un Topic
kafka-topics.sh --create --topic gestion-pedidos \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# Listar Topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Listar Grupos de Consumidores
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# Consumidor de consola para ver mensajes en tiempo real
kafka-console-consumer.sh --topic gestion-pedidos \
--bootstrap-server localhost:9092 --from-beginning
Kafka es fundamental para la comunicación asíncrona en arquitecturas de microservicios. Los puntos clave incluyen el patrón Productor/Consumidor para el desacoplamiento, las Particiones para el paralelismo, los Grupos de Consumidores para el balanceo de carga y la confirmación manual de offset para la fiabilidad de los mensajes.