Guía Completa de Integración de Kafka con Spring Boot para Mensajería

Apache Kafka se ha consolidado como el estándar de facto para sistemas de mensajería distribuidos. Este artículo te guiará a través de la integración práctica de Kafka con Spring Boot, cubirendo el desarrollo completo de productores y consumidores.

Conceptos Clave de Kafka

  • Productor (Producer): Entidad que envía mensajes.
  • Consumidor (Consumer): Antidad que recibe y procesa mensajes.
  • Broker: Servidor individual dentro del clúster de Kafka.
  • Topic: Categoría o feed con nombre al que se publican los mensajes.
  • Partición (Partition): División de un Topic para permitir el procesamiento paralelo.
  • Grupo de Consumidores (Consumer Group): Conjunto de consumidores que colaboran para consumir un Topic, permitiendo la distribución de la carga.

Instalación de Kafka con Docker


# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Integración de Kafka con Spring Boot

Añade la dependencia de spring-kafka a tu archivo pom.xml:


<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configura los parámetros de Kafka en tu archivo de propiedades (application.yml):


# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: mi-grupo-consumidores
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Implementación del Productor

Crea un serivcio para enviar mensajes al Topic especificado.


@Service
public class MensajeroPedido {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MensajeroPedido(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void enviarInfoPedido(String idPedido) {
        String payload = String.format("{\"idPedido\":\"%s\",\"timestamp\":\"%s\"}",
                                        idPedido, LocalDateTime.now());
        // Envía el mensaje al topic "gestion-pedidos", usando idPedido como clave
        kafkaTemplate.send("gestion-pedidos", idPedido, payload);
        System.out.println("Mensaje enviado: " + payload);
    }
}

Implementación del Consumidor

Utiliza la anotación @KafkaListener para crear un consumidor que escuche en un Topic específico.


@Component
public class ReceptorPedido {

    @KafkaListener(topics = "gestion-pedidos", groupId = "mi-grupo-consumidores")
    public void procesarMensajePedido(ConsumerRecord<String, String> registro) {
        System.out.println("Mensaje recibido:");
        System.out.println("  Clave: " + registro.key());
        System.out.println("  Valor: " + registro.value());
        System.out.println("  Partición: " + registro.partition());
        System.out.println("  Offset: " + registro.offset());
    }
}

Confirmación Manual de Offset

Para garantizar la fiabilidad, puedes configurar la confirmación manual de offset después de procesar cada mensaje.


@Component
public class ReceptorPedidoConfirmacionManual {

    @KafkaListener(topics = "gestion-pedidos", groupId = "mi-grupo-consumidores")
    public void procesarMensajeConConfirmacion(
            ConsumerRecord<String, String> registro,
            Acknowledgment confirmacion) {
        try {
            procesarDetallesPedido(registro.value());
            confirmacion.acknowledge(); // Confirma el procesamiento exitoso
        } catch (Exception ex) {
            // Si falla el procesamiento, el mensaje puede ser reintentado
            System.err.println("Fallo al procesar el mensaje con clave: " + registro.key() + ". Error: " + ex.getMessage());
        }
    }

    // Método auxiliar para simular el procesamiento
    private void procesarDetallesPedido(String detalles) {
        System.out.println("Procesando: " + detalles);
        // Simula lógica de negocio que podría fallar
        if (detalles.contains("error")) {
             throw new RuntimeException("Simulación de error en procesamiento");
        }
    }
}

Habilita el modo de confirmación manual en la configuración de Spring Boot:


spring:
  kafka:
    listener:
      ack-mode: MANUAL # Habilita la confirmación manual

Comandos Útiles de Kafka

A continuación, se presentan algunos comandos básicos para interactuar con Kafka desde la línea de comandos:


# Crear un Topic
kafka-topics.sh --create --topic gestion-pedidos \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# Listar Topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Listar Grupos de Consumidores
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# Consumidor de consola para ver mensajes en tiempo real
kafka-console-consumer.sh --topic gestion-pedidos \
  --bootstrap-server localhost:9092 --from-beginning

Kafka es fundamental para la comunicación asíncrona en arquitecturas de microservicios. Los puntos clave incluyen el patrón Productor/Consumidor para el desacoplamiento, las Particiones para el paralelismo, los Grupos de Consumidores para el balanceo de carga y la confirmación manual de offset para la fiabilidad de los mensajes.

Etiquetas: Kafka spring-boot mensajería Microservicios Docker

Publicado el 6-24 17:13