Errores de Memoria por Uso Inadecuado de Hilos en Java

Este artículo analiza un incidente de producción causado por el uso incorrecto de ExecutorCompletionService, lo que llevó a un OutOfMemoryError (OOM). Se detallan el problema, su causa raíz, y se comparan las implementaciones con ExecutorService para comprender mejor el escenario de uso adecuado de ExecutorCompletionService.

Resumen del Incidente

El problema comenzó con un pequeño número de usuarios experimentando fallos al acceder a la página de inicio de la aplicación. En poco menos de una hora, el servicio de la página de inicio se volvió inaccesible para la mayoría de los usuarios. El incidente se resolvió mediante un rollback del código.

Causa Raíz

La causa fundamental del OOM fue la omisión de llamar a los métodos take() o poll() en el ExecutorCompletionService después de enviar tareas. Esto impide que los resultados de las tareas completadas sean retirados de la cola interna, provocando una acumulación que consume la memoria del heap.

Código Problemático:


import java.util.concurrent.*;

public class FaultyCompletionService {

   public static void test() throws InterruptedException, ExecutionException {
       Executor executor = Executors.newFixedThreadPool(3);
       CompletionService<string> service = new ExecutorCompletionService<>(executor);

       service.submit(() -> {
           return "Resultado de la Tarea -- " + Thread.currentThread().getName();
       });

       // ¡Falta llamar a take() o poll() para consumir los resultados!
       // executor.shutdown(); // Aunque se apague el executor, los resultados permanecen en la cola interna.
   }

   public static void main(String[] args) {
       try {
           test();
           System.out.println("Tarea enviada. Si se llama repetidamente, puede causar OOM.");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 </string>

Código Correcto:


import java.util.concurrent.*;

public class CorrectCompletionService {

   public static void test() throws InterruptedException, ExecutionException {
       Executor executor = Executors.newFixedThreadPool(3);
       CompletionService<string> service = new ExecutorCompletionService<>(executor);

       service.submit(() -> {
           return "Resultado de la Tarea -- " + Thread.currentThread().getName();
       });

       // Consumir el resultado para liberar memoria
       String result = service.take().get(); // take() espera hasta que una tarea esté lista
       System.out.println("Resultado obtenido: " + result);

       executor.shutdown();
   }

   public static void main(String[] args) {
       try {
           test();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 </string>

La acumulación de memoria debido a la no extracción de resultados puede ser gradual y difícil de detectar, especialmente con un volumen de llamadas bajo. En escenarios de alto tráfico, la falta de liberación de memoria puede llevar rápidamente a un OOM.

Comparativa: ExecutorService vs. ExecutorCompletionService

Escenario con ExecutorService:

Cuando se utiliza ExecutorService directamente, cada tarea completada devuelve un objeto Future. Si se recolectan todos estos Futures en una lista y luego se itera sobre ellos llamando a get(), el hilo principal se bloqueará hasta que la tarea más lenta de la lista se complete, incluso si otras tareas ya han terminado.

Ejemplo con ExecutorService:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceExample {

   public static void processTasks() throws Exception {
       ExecutorService executorService = Executors.newFixedThreadPool(3);
       List<future>> futureResults = new ArrayList<>();

       System.out.println("Iniciando envío de tareas...");

       futureResults.add(executorService.submit(() -> {
           System.out.println("Tarea Larga iniciada (10s)...");
           TimeUnit.SECONDS.sleep(10);
           System.out.println("Tarea Larga completada.");
           return "Resultado Tarea Larga";
       }));

       futureResults.add(executorService.submit(() -> {
           System.out.println("Tarea Corta iniciada (3s)...");
           TimeUnit.SECONDS.sleep(3);
           System.out.println("Tarea Corta completada.");
           return "Resultado Tarea Corta";
       }));

       futureResults.add(executorService.submit(() -> {
           System.out.println("Tarea Media iniciada (6s)...");
           TimeUnit.SECONDS.sleep(6);
           System.out.println("Tarea Media completada.");
           return "Resultado Tarea Media";
       }));

       System.out.println("Todas las tareas enviadas. Esperando resultados...");

       try {
           for (Future<string> future : futureResults) {
               // El hilo principal se bloqueará aquí hasta que la tarea actual esté lista.
               // Si la primera tarea es la más larga, todas las demás tendrán que esperar.
               String result = future.get();
               System.out.println("Procesando: " + result);
           }
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           executorService.shutdown();
       }
   }

   public static void main(String[] args) {
       try {
           processTasks();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 </string></future>

En este modelo, el hilo principal espera secuencialmente los resultados, lo que puede ser ineficiente si las tareas terminan en órdenes diferentes.

Escenario con ExecutorCompletionService:

ExecutorCompletionService actúa como un "planificador de tareas". Cuando se envía tareas, ExecutorCompletionService las gestiona internamente y las coloca en una cola (completionQueue) una vez que están completadas. El método take() permite recuperar el siguiente resultado de tarea completada tan pronto como esté disponible, sin importar el orden en que fueron enviadas.

Ejemplo con ExecutorCompletionService:


import java.util.concurrent.*;

public class CompletionServiceExample {

   public static void processTasksEfficiently() throws Exception {
       ExecutorService executorService = Executors.newFixedThreadPool(3);
       ExecutorCompletionService<string> completionService = new ExecutorCompletionService<>(executorService);

       System.out.println("Iniciando envío de tareas...");

       completionService.submit(() -> {
           System.out.println("Tarea Larga iniciada (10s)...");
           TimeUnit.SECONDS.sleep(10);
           System.out.println("Tarea Larga completada.");
           return "Resultado Tarea Larga";
       });

       completionService.submit(() -> {
           System.out.println("Tarea Corta iniciada (3s)...");
           TimeUnit.SECONDS.sleep(3);
           System.out.println("Tarea Corta completada.");
           return "Resultado Tarea Corta";
       });

       completionService.submit(() -> {
           System.out.println("Tarea Media iniciada (6s)...");
           TimeUnit.SECONDS.sleep(6);
           System.out.println("Tarea Media completada.");
           return "Resultado Tarea Media";
       });

       System.out.println("Todas las tareas enviadas. Esperando resultados disponibles...");

       try {
           // Se sabe que se enviaron 3 tareas.
           for (int i = 0; i < 3; i++) {
               // take() espera hasta que UNA tarea esté completada y disponible.
               String result = completionService.take().get();
               System.out.println("Procesando: " + result);
           }
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           executorService.shutdown();
       }
   }

   public static void main(String[] args) {
       try {
           processTasksEfficiently();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 </string>

Este enfoque es más eficiente para escenarios como llamadas RPC a múltiples servicios, donde se desea procesar las respuestas a medida que llegan, en lugar de esperar la respuesta más lenta.

Análisis Interno de ExecutorCompletionService

ExecutorCompletionService envuelve un ExecutorService y utiliza una cola interna (completionQueue) para almacenar los resultados de las tareas completadas. Cuando una tarea finaliza, su Future se coloca en esta cola gracias a la sobrescritura del método done() en una clase interna llamada QueueingFuture.

El problema surge porque los elementos en la completionQueue solo se eliminan cuando se llama a take() o poll(). Si estas llamadas no se realizan, los objetos Future y sus resultados permanecen en memoria, acumulándose con el tiempo y provocando un OOM.

Lecciones Aprendidas y Buenas Prácticas

Antes del Despliegue:

  • Revisiones de Código Rigurosas: Asegurar que otro desarrollador revise el código para detectar errores lógicos o de uso de APIs.
  • Registros de Despliegue Claros: Mantener un registro de las versiones desplegables para facilitar los rollbacks.
  • Estrategias de Degradación: Definir planes para la degradación del servicio si el rollback no es inmediatamente factible o si el problema persiste.

Después del Despliegue:

  • Monitorización Continua de Memoria: Prestar atención al uso de memoria, que a menudo se ignora en favor de la CPU.
  • Monitorización de Métricas Clave: Vigilar el uso de CPU, la actividad del recolector de basura (GC), el número de hilos, y métricas de rendimiento como TP99 y TP999.
  • Alertas Proactivas: Configurar alertas para picos de memoria, uso de CPU, y degradación del renddimiento.

Conclusión: Utilice ExecutorCompletionService cuando necesite procesar resultados de tareas asíncronas a medida que estén disponibles. Si no necesita los resultados de las tareas, evite usar ExecutorCompletionService. Si lo usa y no necesita los resultados, asegúrese de llamar a take() o poll() para evitar la acumulación de objetos en memoria y prevenir OOMs.

Etiquetas: java Multithreading ExecutorCompletionService OutOfMemoryError Java Concurrency

Publicado el 6-10 19:32