Introducción al uso de ThreadPoolExecutor en Java

En el desarrollo de aplicaciones concurrentes en Java, es preferible utilizar ThreadPoolExecutor directamente en lugar de las fábricas de Executors. Este enfoque ofrece mayor control y evita riesgos de agotamiento de recursos, como se observa en ciertas configuraciones predefinidas que pueden provocar excepciones de memoria o desbordamiento de pila.

ThreadPoolExecutor se construye mediante varios parámetros que definen su comportamiento:

  • corePoolSize: Número de hilos base que permanecen activos en el pool.
  • maximumPoolSize: Límite máximo de hilos que pueden existir, incluso durante picos de carga.
  • keepAliveTime: Período de inactividad tras el cual los hilos adicionales se eliminan, reduciéndose hasta el tamaño del núcleo.
  • unit: Unidad de tiempo para keepAliveTime, como TimeUnit.SECONDS o TimeUnit.MILLISECONDS.
  • workQueue: Cola de tareas pendientes. Ejemplos incluyen ArrayBlockingQueue (limitada por tamaño) y SynchronousQueue (sin almacenamiento).
  • threadFactory: Factoría para crear instancias de hilos, permitiendo personalización.
  • handler: Política de rechazo cuando el pool y la cola están saturados, como AbortPolicy (lanza excepción) o DiscardOldestPolicy (descarta la tarea más antigua).

La lógica de ejecución sigue un orden específico: primero se utilizan los hilos del núcleo, luego se encolan las tareas, y si la cola se llena, se crean hilos adicionales hasta el máximo. Si el máximo se alcanza y la cola está llena, se aplica la política de rechazo.

Ejemplo básico

package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EjemploPoolHilos {
    private static final int TIEMPO_ENVIO_TAREA = 2;
    private static final int NUM_MAX_TAREAS = 10;

    public static void main(String[] args) {
        ThreadPoolExecutor ejecutor = new ThreadPoolExecutor(
            2, 4, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(3),
            new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        for (int i = 1; i <= NUM_MAX_TAREAS; i++) {
            try {
                String tareaId = "Tarea-" + i;
                System.out.println("Enviando " + tareaId);
                ejecutor.execute(new TareaWorker(tareaId));
                Thread.sleep(TIEMPO_ENVIO_TAREA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

class TareaWorker implements Runnable {
    private static final int TIEMPO_PROCESAMIENTO = 2000;
    private final String datosTarea;

    TareaWorker(String datos) {
        this.datosTarea = datos;
    }

    @Override
    public void run() {
        System.out.println("Hilo: " + Thread.currentThread().getName());
        System.out.println("Iniciando procesamiento de: " + datosTarea);
        try {
            Thread.sleep(TIEMPO_PROCESAMIENTO);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

En este ejemplo, se crea un pool con un núcleo de 2 hilos, un máximo de 4 y una cola de 3 elementos. Las tareas se envían periódicamente para simular una carga controlada. Ajustar los tiempos de envío y procesamiento permite observar cómo el pool gestiona la concurrencia.

Manejo de cola con límite

package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EjemploColaLimitada {
    private static final int PROFUNDIDAD_COLA = 4;

    public void ejecutar() {
        ThreadPoolExecutor ejecutor = new ThreadPoolExecutor(
            2, 4, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(PROFUNDIDAD_COLA),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            while (ejecutor.getQueue().size() >= PROFUNDIDAD_COLA) {
                System.out.println("Cola llena, esperando...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Agregando tarea " + i);
            ejecutor.execute(new Tarea(i));
        }
        ejecutor.shutdown();
    }

    public static void main(String[] args) {
        new EjemploColaLimitada().ejecutar();
    }

    static class Tarea implements Runnable {
        private final int identificador;

        Tarea(int id) {
            this.identificador = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread() + " procesando: " + identificador);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Este segundo ejemplo muestra una implementación con control de saturación: se verifica el tamaño de la cola antes de añadir tareas, evitando desbordamientos. La política CallerRunsPolicy se usa aquí para que el hilo que envía la tarea la ejecute si el pool está sobrecargado, proporcionando un mecanismo de retroceso suave.

La configuración de ThreadPoolExecutor permite equilibrar rendimiento y uso de recursos según las necesidades de la aplicación. Explorar diferentes políticas y parámetros ayuda a optimizar el manejo de tareas concurrentes en entonros de producción.

Etiquetas: java ThreadPoolExecutor concurrencia Pool de hilos Colas bloqueantes

Publicado el 7-3 22:24