Python ThreadPoolExecutor para Concurrencia con Hilos

ThreadPoolExecutor es una herramienta del módulo concurrent.futures en Python que permite ejecutar múltiples tareas de forma concurrente usando un pool de hilos. Es ideal para operaciones de I/O-bound, como solicitudes de red o lectura de archivos, ya que optimiza el uso de recursos y evita la sobrecarga de crear y destruir hilos continuamente.

Implementación Básica de ThreadPoolExecutor

Para empezar, importa el módulo y crea un pool de hilos especificando el número máximo de workers:

from concurrent.futures import ThreadPoolExecutor

# Crear un pool con hasta 3 hilos
pool = ThreadPoolExecutor(max_workers=3)

Puedes enviar tareas individuales con submit, que devuelve un objeto Future para obtener resultados más tarde:

import time

def procesar(datos):
    print(f"Procesando {datos}")
    time.sleep(1)  # Simular trabajo de I/O
    return f"Resultado de {datos}"

# Enviar una tarea
futuro = pool.submit(procesar, "lote_A")
print(futuro.result())  # Bloquea hasta que termine

Alternativamente, map permite enviar múltiples tareas de una vez, similar a la función map integrada:

lista_datos = ["lote_B", "lote_C", "lote_D"]
resultados = pool.map(procesar, lista_datos)
for res in resultados:
    print(res)  # Los resultados se procesan en orden de envío

Manejo de Objetos Future

Los objetos Future ofrecan métodos útiles para monitorear tareas:

  • result(timeout=None): Espera el resultado con un límite de tiempo opcional.
  • done(): Verifica si la tarea ha finalizado.
  • add_done_callback(fn): Ejecuta una función cuando la tarea se completa.

Ejemplo con callback y verificación de estado:

def al_completar(fut):
    print(f"Tarea finalizada: {fut.result()}")

futuro2 = pool.submit(procesar, "lote_E")
futuro2.add_done_callback(al_completar)

while not futuro2.done():
    time.sleep(0.5)
    print("Esperando...")

Cierre del Pool

Usa shutdown(wait=True) para cerrar el pool después de que todas las tareas terminen. Si wait=False, el pool se cierra inmediatamente y las tareas pendientes se cancelan.

pool.shutdown(wait=True)
print("Pool cerrado tras completar todas las tareas.")

Uso de as_completed para Resultados en Orden de Finalización

La función as_completed del mismo módulo itera sobre los objetos Future en el orden en que se completan, no en el orden de envío. Esto es útil para procesar resultados tan pronto como estén disponibles.

from concurrent.futures import as_completed

with ThreadPoolExecutor(max_workers=2) as ejecutor:
    tareas = [ejecutor.submit(procesar, f"elemento_{i}") for i in range(4)]
    
    for futuro in as_completed(tareas):
        print(futuro.result())  # Los resultados se muestran según se completen

Este método soporta control de tiempo con el parámetro timeout y manejo de excepciones mediante bloques try-except en el bucle.

Comparación de Métodos

Elige el enfoque según tus necesidades:

  • submit + result: Control granular, resultados en orden de envío.
  • map: Sintaxis concisa, resultados en orden de envío, fijo el número de tareas.
  • as_completed: Resultados en orden de finalización, ideal para tareas con tiempos variables.

Ejemplo Práctico: Incremento Concurrente con Lock

A continuación, un programa que usa múltiples hilos para incrementar una variable compartida. Se emplea un bloqueo (Lock) para evitar condiciones de carrera.

from concurrent.futures import ThreadPoolExecutor
import threading

contador_global = 0  # Variable compartida
bloqueo = threading.Lock()

def incrementar(id_tarea, repeticiones):
    global contador_global
    for _ in range(repeticiones):
        with bloqueo:  # Adquirir y liberar el bloqueo automáticamente
            contador_global += 1
    print(f"Tarea {id_tarea} completada")

# Crear pool y ejecutar tareas
with ThreadPoolExecutor(max_workers=5) as ejecutor:
    for idx in range(5):
        ejecutor.submit(incrementar, idx, 100)

print(f"Valor final: {contador_global}")  # Debe ser 500

Este código asegura que el contador final sea exacto gracias al bloqueo. El uso de with simplifica la gestión del recurso.

Etiquetas: Python ThreadPoolExecutor concurrent.futures Multithreading I/O-bound

Publicado el 6-3 17:16