Implementación del patrón Publicar/Suscribir de Redis en Spring Boot

El patrón Publicar/Suscribir de Redis permite desacoplar completamente a los publicadores y suscriptores. A continuación, se detalla su integración en Spring Boot.

  1. Configuración de dependencias y propiedades

Primero, se añade la dependencia de Spring Data Redis y se configuran las propiedades de conexión. Se utiliza Jedis como cliente, excluyendo Lettuce por defecto.

<!-- Dependencia para Redis en pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

En application.yml, se establece la configuración del servidor Redis:

spring:
  redis:
    database: 5
    host: 127.0.0.1
    port: 6379
    password: 123456
    jedis:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
  1. Configuración de componentes principales

Se define una clase de configuración para registrar los contenedores de mensajes, adaptadores y plantillas Redis en el contexto de Spring.

@Configuration
public class ConfiguracionRedis {

    @Bean
    public RedisMessageListenerContainer contenedorMensajes(
            RedisConnectionFactory conexionFactory,
            MessageListenerAdapter adaptadorPrincipal,
            MessageListenerAdapter adaptadorSecundario) {
        RedisMessageListenerContainer contenedor = new RedisMessageListenerContainer();
        contenedor.setConnectionFactory(conexionFactory);
        contenedor.addMessageListener(adaptadorPrincipal, new PatternTopic("canal_principal"));
        contenedor.addMessageListener(adaptadorSecundario, new PatternTopic("canal_secundario"));
        return contenedor;
    }

    @Bean
    public MessageListenerAdapter adaptadorPrincipal(ReceptorMensajes receptor) {
        return new MessageListenerAdapter(receptor, "procesarMensajePrincipal");
    }

    @Bean
    public MessageListenerAdapter adaptadorSecundario(ReceptorMensajes receptor) {
        return new MessageListenerAdapter(receptor, "procesarMensajeSecundario");
    }

    @Bean
    public StringRedisTemplate plantillaCadenaRedis(RedisConnectionFactory conexionFactory) {
        return new StringRedisTemplate(conexionFactory);
    }

    @Bean
    public RedisTemplate<String, Object> plantillaRedis(RedisConnectionFactory conexionFactory) {
        Jackson2JsonRedisSerializer<Object> serializador = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapeador = new ObjectMapper();
        mapeador.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapeador.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializador.setObjectMapper(mapeador);
        RedisTemplate<String, Object> plantilla = new RedisTemplate<>();
        plantilla.setConnectionFactory(conexionFactory);
        plantilla.setKeySerializer(serializador);
        plantilla.setValueSerializer(serializador);
        plantilla.setHashKeySerializer(serializador);
        plantilla.setHashValueSerializer(serializador);
        plantilla.afterPropertiesSet();
        return plantilla;
    }
}

El adaptador MessageListenerAdapter utiliza reflexión para invocar métodos en POJOs comunes, permitiendo el manejo de mensajes sin implementar interfaces específicas.

  1. Publicación de mensajes

Se crea un componente programado para anviar mensajes a canales Redis a intervalos regulares.

@EnableScheduling
@Component
public class EmisorMensajes {

    @Autowired
    private StringRedisTemplate plantillaRedis;

    @Scheduled(fixedRate = 2000)
    public void transmitir() {
        plantillaRedis.convertAndSend("canal_principal", String.valueOf(Math.random()));
        plantillaRedis.convertAndSend("canal_secundario", String.valueOf(Math.random()));
    }
}
  1. Manejo de mensajes

Un POJO simple puede recibir mensajes mediante métodos designados, invocados a través del adaptador.

@Component
public class ReceptorMensajes {

    public void procesarMensajePrincipal(String contenido) {
        System.out.println("Mensaje recibido en canal_principal: " + contenido);
    }

    public void procesarMensajeSecundario(String contenido) {
        System.out.println("Mensaje recibido en canal_secundario: " + contenido);
    }
}
  1. Enfoque alternativo con control de concurrencia

Para escenarios más robustos, se puede integrar un CountDownLatch para sincronizar el consumo de mensajes.

Configuración del listener

@Configuration
public class ConfiguracionListenerRedis {

    @Bean
    public RedisMessageListenerContainer contenedorListener(
            RedisConnectionFactory conexionFactory,
            MessageListenerAdapter adaptadorA,
            MessageListenerAdapter adaptadorB) {
        RedisMessageListenerContainer contenedor = new RedisMessageListenerContainer();
        contenedor.setConnectionFactory(conexionFactory);
        contenedor.addMessageListener(adaptadorA, new PatternTopic("topico_alertas"));
        contenedor.addMessageListener(adaptadorB, new PatternTopic("topico_logs"));
        return contenedor;
    }

    @Bean
    public MessageListenerAdapter adaptadorA(ConsumidorMensajes consumidor) {
        return new MessageListenerAdapter(consumidor, "consumirAlerta");
    }

    @Bean
    public MessageListenerAdapter adaptadorB(ConsumidorMensajes consumidor) {
        return new MessageListenerAdapter(consumidor, "consumirLog");
    }

    @Bean
    public ConsumidorMensajes consumidor(CountDownLatch cerrojo) {
        return new ConsumidorMensajes(cerrojo);
    }

    @Bean
    public CountDownLatch cerrojo() {
        return new CountDownLatch(1);
    }
}

Implementación del consumidor

public class ConsumidorMensajes {
    private static final Logger log = LoggerFactory.getLogger(ConsumidorMensajes.class);
    private final CountDownLatch cerrojo;

    @Autowired
    public ConsumidorMensajes(CountDownLatch cerrojo) {
        this.cerrojo = cerrojo;
    }

    public void consumirAlerta(String mensaje) {
        log.info("Procesando alerta: {}", mensaje);
        // Lógica de negocio aquí
        cerrojo.countDown();
    }

    public void consumirLog(String mensaje) {
        log.info("Procesando log: {}", mensaje);
        // Lógica de negocio aquí
        cerrojo.countDown();
    }
}

Publicación vía controlador REST

@RestController
@RequestMapping("/api")
public class ControladorPublicador {
    private static final Logger log = LoggerFactory.getLogger(ControladorPublicador.class);

    @Autowired
    private RedisTemplate<String, Object> plantillaRedis;

    @GetMapping("/publicar/{tema}")
    public String publicar(@PathVariable String tema) {
        plantillaRedis.convertAndSend("topico_alertas", "Alerta en tema: " + tema);
        plantillaRedis.convertAndSend("topico_logs", "Log registrado para: " + tema);
        log.info("Mensajes publicados.");
        return "Publicación exitosa";
    }
}

Etiquetas: Spring Boot Redis Publicar/Suscribir java Jedis

Publicado el 6-14 22:23