CountDownLatch y CyclicBarrier: Sincronización de hilos en sistemas de reconciliación

En un sistema de reconciliación para pedidos en línea, el objetivo es verificar discrepancias entre órdenes de compra y envíos. El flujo actual involucra consultar una base de datos de órdenes, luego una de envíos, comparar ambas y almacenar diferencias. Inicialmente, este proceso se ejecuta de forma secuencial en un solo hilo, lo que puede resultar lento para grandes volúmenes de datos.

El código básico del flujo secuencial es el siguiente:

while (existenOrdenesPendientes()) {
    List<Orden> ordenes = obtenerOrdenes();
    List<Envio> envios = obtenerEnvios();
    List<Discrepancia> diferencias = comparar(ordenes, envios);
    almacenarDiferencias(diferencias);
}

Para mejorar el rendimiento, se puede paralelizar la consulta de órdenes y envíos, ya que son operaciones independeintes. Una primera aproximación usa hilos con el método join para sincronización:

while (existenOrdenesPendientes()) {
    Thread hiloOrdenes = new Thread(() -> {
        List<Orden> ordenes = obtenerOrdenes();
    });
    Thread hiloEnvios = new Thread(() -> {
        List<Envio> envios = obtenerEnvios();
    });
    hiloOrdenes.start();
    hiloEnvios.start();
    hiloOrdenes.join();
    hiloEnvios.join();
    List<Discrepancia> diferencias = comparar(ordenes, envios);
    almacenarDiferencias(diferencias);
}

Esta solución reduce el tiempo al ejecutar consultas en paralelo, pero crear hilos en cada iteración es costoso. Se puede usar un pool de hilos para reutilizarlos, pero join ya no es directo. Aquí se introduce un contador manual para sincronización: un hilo principal espera hasta que ambas consultas terminen, usando un contador que decrementa al finalizar cada tarea.

Java ofrece CountDownLatch para simplificar esto. Se inicializa con un contador (por ejemplo, 2 para dos tareas), los hilos llaman a countDown al completar, y el hilo principal espera con await. Ejemplo con pool de hilos:

ExecutorService pool = Executors.newFixedThreadPool(2);
while (existenOrdenesPendientes()) {
    CountDownLatch latch = new CountDownLatch(2);
    pool.execute(() -> {
        List<Orden> ordenes = obtenerOrdenes();
        latch.countDown();
    });
    pool.execute(() -> {
        List<Envio> envios = obtenerEnvios();
        latch.countDown();
    });
    latch.await();
    List<Discrepancia> diferencias = comparar(ordenes, envios);
    almacenarDiferencias(diferencias);
}

Para mayor eficiencia, se puede solapar la reconciliación de una iteración con las consultas de la siguiente. Esto se modela como un sistema productor-consumidor con dos colas sincronizadas: una para órdenes y otra para envíos. Los productores insertan datos en las colas, y un consumidor extrae elementos correspondientes para reconciliación.

Para garantizar que los productores avancen al mismo ritmo y notifiquen al consumidor, se usa CyclicBarrier. Este sincroniza múltiples hilos en un punto común y puede ejecutar una acción al alcanzar el umbral (por ejemplo, iniciar la reconciliación). La barrera se resetea automáticamente, permitiendo ciclos continuos. Implementación:

Vector<Orden> colaOrdenes = new Vector<>();
Vector<Envio> colaEnvios = new Vector<>();
ExecutorService poolReconciliacion = Executors.newFixedThreadPool(1);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    poolReconciliacion.execute(() -> reconciliar());
});

void reconciliar() {
    Orden orden = colaOrdenes.remove(0);
    Envio envio = colaEnvios.remove(0);
    List<Discrepancia> diferencias = comparar(orden, envio);
    almacenarDiferencias(diferencias);
}

void iniciarProcesamiento() {
    Thread productorOrdenes = new Thread(() -> {
        while (existenOrdenesPendientes()) {
            colaOrdenes.add(obtenerOrdenes());
            barrier.await();
        }
    });
    Thread productorEnvios = new Thread(() -> {
        while (existenOrdenesPendientes()) {
            colaEnvios.add(obtenerEnvios());
            barrier.await();
        }
    });
    productorOrdenes.start();
    productorEnvios.start();
}

Este enfoque asegura que la consulta de órdenes y envíos se mantenga sincronizada, permitiendo reconciliaciones paralelas y mejorando el throughput del sistema.

Etiquetas: CountDownLatch CyclicBarrier hilos java ExecutorService

Publicado el 7-4 01:25