ThreadPoolExecutor es una herramienta del módulo concurrent.futures en Python que permite ejecutar múltiples tareas de forma concurrente usando un pool de hilos. Es ideal para operaciones de I/O-bound, como solicitudes de red o lectura de archivos, ya que optimiza el uso de recursos y evita la sobrecarga de crear y destruir hilos continuamente.
Implementación Básica de ThreadPoolExecutor
Para empezar, importa el módulo y crea un pool de hilos especificando el número máximo de workers:
from concurrent.futures import ThreadPoolExecutor
# Crear un pool con hasta 3 hilos
pool = ThreadPoolExecutor(max_workers=3)
Puedes enviar tareas individuales con submit, que devuelve un objeto Future para obtener resultados más tarde:
import time
def procesar(datos):
print(f"Procesando {datos}")
time.sleep(1) # Simular trabajo de I/O
return f"Resultado de {datos}"
# Enviar una tarea
futuro = pool.submit(procesar, "lote_A")
print(futuro.result()) # Bloquea hasta que termine
Alternativamente, map permite enviar múltiples tareas de una vez, similar a la función map integrada:
lista_datos = ["lote_B", "lote_C", "lote_D"]
resultados = pool.map(procesar, lista_datos)
for res in resultados:
print(res) # Los resultados se procesan en orden de envío
Manejo de Objetos Future
Los objetos Future ofrecan métodos útiles para monitorear tareas:
result(timeout=None): Espera el resultado con un límite de tiempo opcional.done(): Verifica si la tarea ha finalizado.add_done_callback(fn): Ejecuta una función cuando la tarea se completa.
Ejemplo con callback y verificación de estado:
def al_completar(fut):
print(f"Tarea finalizada: {fut.result()}")
futuro2 = pool.submit(procesar, "lote_E")
futuro2.add_done_callback(al_completar)
while not futuro2.done():
time.sleep(0.5)
print("Esperando...")
Cierre del Pool
Usa shutdown(wait=True) para cerrar el pool después de que todas las tareas terminen. Si wait=False, el pool se cierra inmediatamente y las tareas pendientes se cancelan.
pool.shutdown(wait=True)
print("Pool cerrado tras completar todas las tareas.")
Uso de as_completed para Resultados en Orden de Finalización
La función as_completed del mismo módulo itera sobre los objetos Future en el orden en que se completan, no en el orden de envío. Esto es útil para procesar resultados tan pronto como estén disponibles.
from concurrent.futures import as_completed
with ThreadPoolExecutor(max_workers=2) as ejecutor:
tareas = [ejecutor.submit(procesar, f"elemento_{i}") for i in range(4)]
for futuro in as_completed(tareas):
print(futuro.result()) # Los resultados se muestran según se completen
Este método soporta control de tiempo con el parámetro timeout y manejo de excepciones mediante bloques try-except en el bucle.
Comparación de Métodos
Elige el enfoque según tus necesidades:
submit+result: Control granular, resultados en orden de envío.map: Sintaxis concisa, resultados en orden de envío, fijo el número de tareas.as_completed: Resultados en orden de finalización, ideal para tareas con tiempos variables.
Ejemplo Práctico: Incremento Concurrente con Lock
A continuación, un programa que usa múltiples hilos para incrementar una variable compartida. Se emplea un bloqueo (Lock) para evitar condiciones de carrera.
from concurrent.futures import ThreadPoolExecutor
import threading
contador_global = 0 # Variable compartida
bloqueo = threading.Lock()
def incrementar(id_tarea, repeticiones):
global contador_global
for _ in range(repeticiones):
with bloqueo: # Adquirir y liberar el bloqueo automáticamente
contador_global += 1
print(f"Tarea {id_tarea} completada")
# Crear pool y ejecutar tareas
with ThreadPoolExecutor(max_workers=5) as ejecutor:
for idx in range(5):
ejecutor.submit(incrementar, idx, 100)
print(f"Valor final: {contador_global}") # Debe ser 500
Este código asegura que el contador final sea exacto gracias al bloqueo. El uso de with simplifica la gestión del recurso.