Este tutorial detalla el proceso para convertir modelos de procesamiento de lenguaje natural (NLP) entreandos con PyTorch 2.9 en servicios robustos y escalables. Cubriremos la optimización del modelo, la creación de APIs y el despliegue en contenedores.
- Configuración del Entorno de Ejecución
2.1. Imagen Base Preconfigurada
Utilizaremos la imagen PyTorch-CUDA-v2.9, que incluye PyTorch 2.9, CUDA, y bibliotecas esenciales como NumPy y Pandas. Esta imagen garantiza compatibilidad y evita conflictos de dependencias.
2.2. Verificación del Entorno
Para confirmar que el entorno está operativo y la GPU es accesible, ejecute el siguiente script de verificación:
# verificacion_entorno.py
import torch as pt
version_pt = pt.__version__
cuda_disp = pt.cuda.is_available()
print(f"Versión de PyTorch: {version_pt}")
print(f"GPU disponible: {cuda_disp}")
if cuda_disp:
info_gpu = pt.cuda.get_device_properties(0)
print(f"Dispositivo GPU: {info_gpu.name}")
num_gpus = pt.cuda.device_count()
print(f"Cantidad de GPUs: {num_gpus}")
# Ejecución de una operación matricial de prueba
dispositivo = pt.device("cuda")
matriz_a = pt.randn(800, 800, device=dispositivo)
matriz_b = pt.randn(800, 800, device=dispositivo)
producto = pt.mm(matriz_a, matriz_b)
print(f"Producto matricial completado. Dimensión: {producto.shape}")
- Preparación y Optimización del Modelo NLP
3.1. Modelo Base para Clasificación de Texto
Supongamos un modelo de clasificación basado en transformers. El siguiente código define una arquitectura simple:
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
class ClasificadorTexto(nn.Module):
def __init__(self, nombre_modelo="bert-base-uncased", num_etiquetas=5):
super().__init__()
self.transformador = AutoModel.from_pretrained(nombre_modelo)
self.capas_dropout = nn.Dropout(p=0.1)
self.capa_clasificacion = nn.Linear(self.transformador.config.hidden_size, num_etiquetas)
def forward(self, ids_entrada, mascara_atencion):
salidas = self.transformador(input_ids=ids_entrada, attention_mask=mascara_atencion)
vector_agregado = salidas.pooler_output
vector_agregado = self.capas_dropout(vector_agregado)
logits = self.capa_clasificacion(vector_agregado)
return logits
3.2. Técnicas de Optimización para Inferencia
Serialización con TorchScript: Mejora el rendimiento mediante la compilación gráfica del modelo.
# guardar_modelo_optimizado.py
import torch as pt
def serializar_modelo(modelo, tokenizador, directorio_salida):
modelo.eval()
# Texto de ejemplo para el rastreo
texto_ejemplo = "Texto de ejemplo para la compilación del modelo."
entradas = tokenizador(texto_ejemplo, return_tensors="pt", padding=True, truncation=True, max_length=128)
modelo_rastreado = pt.jit.trace(modelo, (entradas["input_ids"], entradas["attention_mask"]))
pt.jit.save(modelo_rastreado, f"{directorio_salida}/modelo_compilado.pt")
tokenizador.save_pretrained(directorio_salida)
# Guardar pesos originales como respaldo
pt.save(modelo.state_dict(), f"{directorio_salida}/pesos_modelo.pth")
print(f"Modelo serializado en: {directorio_salida}")
Procesamiento por Lotes (Batching): Aumenta el rendimiento al procesar múltiples entradas simultáneamente.
# inferencia_lotes.py
import torch as pt
class ClasificadorLotes:
def __init__(self, ruta_modelo):
self.modelo_compilado = pt.jit.load(f"{ruta_modelo}/modelo_compilado.pt")
self.tokenizador = AutoTokenizer.from_pretrained(ruta_modelo)
def predecir_lote(self, lista_textos, tamano_lote=32, longitud_max=128):
resultados_acumulados = []
total_textos = len(lista_textos)
for indice_inicio in range(0, total_textos, tamano_lote):
lote_textos = lista_textos[indice_inicio:indice_inicio + tamano_lote]
codificaciones = self.tokenizador(
lote_textos,
return_tensors="pt",
padding=True,
truncation=True,
max_length=longitud_max
)
with pt.no_grad():
salidas_logits = self.modelo_compilado(codificaciones["input_ids"], codificaciones["attention_mask"])
etiquetas_predichas = pt.argmax(salidas_logits, dim=-1)
resultados_acumulados.extend(etiquetas_predichas.cpu().tolist())
return resultados_acumulados
Cuantización Dinámica: Reduce el consumo de memoria sin una recalibración compleja.
# cuantizar_modelo.py
import torch as pt
import torch.nn as nn
def aplicar_cuantizacion(modelo_original):
modelo_original.eval()
modelo_cuantizado = pt.quantization.quantize_dynamic(
modelo_original,
{nn.Linear}, # Cuantizar solo capas lineales
dtype=pt.qint8
)
return modelo_cuantizado
- Implementación del Servicio API
4.1. API REST con FastAPI
Utilizamos FastAPI para crear un servicio web asíncrono. El siguiente código muestra la estructura básica de la aplicación:
# servicio_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import asyncio
# Modelos de datos para la API
class SolicitudPrediccion(BaseModel):
textos: List[str]
tamano_lote: int = 32
class RespuestaPrediccion(BaseModel):
predicciones: List[int]
tiempo_procesamiento: float
# Inicialización de la aplicación y modelo global
aplicacion = FastAPI(title="Servicio de Clasificación NLP")
modelo_global = None
@aplicacion.on_event("startup")
async def cargar_modelo_al_iniciar():
global modelo_global
print("Cargando modelo optimizado...")
# Cargar instancia de ClasificadorLotes
modelo_global = ClasificadorLotes("./modelo_optimizado")
print("Modelo listo para servir.")
@aplicacion.post("/predecir", response_model=RespuestaPrediccion)
async def realizar_prediccion(solicitud: SolicitudPrediccion):
if modelo_global is None:
raise HTTPException(status_code=503, detail="Modelo no disponible.")
marca_tiempo_inicio = asyncio.get_event_loop().time()
try:
# Ejecutar inferencia en un hilo separado para no bloquear el event loop
bucle = asyncio.get_event_loop()
predicciones = await bucle.run_in_executor(
None, # Usar executor por defecto
modelo_global.predecir_lote,
solicitud.textos,
solicitud.tamano_lote
)
tiempo_total = asyncio.get_event_loop().time() - marca_tiempo_inicio
return RespuestaPrediccion(predicciones=predicciones, tiempo_procesamiento=tiempo_total)
except Exception as error_interno:
raise HTTPException(status_code=500, detail=f"Error en la predicción: {str(error_interno)}")
@aplicacion.get("/estado")
async def verificar_salud():
return {
"servicio": "activo",
"modelo_cargado": modelo_global is not None,
"gpu_disponible": pt.cuda.is_available()
}
4.2. Monitoreo y Registro
Un servicio productivo requiere métricas. Se puede integrar un middleware para registrar tiempos de respuesta:
# monitoreo.py
import time
import logging
# Configurar logging básico
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("nlp-service")
# Middleware de ejemplo para FastAPI
async def middleware_registrar_peticiones(request, call_next):
inicio = time.time()
respuesta = await call_next(request)
duracion = time.time() - inicio
logger.info(f"{request.method} {request.url.path} | Estado: {respuesta.status_code} | Duración: {duracion:.3f}s")
return respuesta
4.3. Containerización con Docker
Para un despliegue reproducible, creamos un Dockerfile:
# Dockerfile
FROM pytorch/pytorch:2.9.0-cuda12.1-cudnn8-runtime
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "servicio_api:aplicacion", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
El archivo requirements.txt contendría dependencias específicas como fastapi, uvicorn, transformers, etc.
- Optimización y Operaciones en Producción
5.1. Pruebas de Carga
Antes del despliegue final, es crucial evaluar el rendimiento bajo carga concurrente. Se pueden usar herramientas como locust o scripts personalizados con aiohttp.
5.2. Gestión de Recursos en Kubernetes
Para alta disponibilidad, se define un manifiesto de despliegue en Kubernetes:
# despliegue-k8s.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: servicio-nlp
spec:
replicas: 2
selector:
matchLabels:
app: nlp-svc
template:
metadata:
labels:
app: nlp-svc
spec:
containers:
- name: contenedor-nlp
image: registry.ejemplo.com/servicio-nlp:1.0
resources:
requests:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: 1
limits:
memory: "8Gi"
cpu: "4"
livenessProbe:
httpGet:
path: /estado
port: 8000
initialDelaySeconds: 40
periodSeconds: 15
5.3. Solución a Problemas Comunes
Falta de memoria en GPU: Implementar un sistema de lotes adpatativo que reduzca el tamaño del lote si ocurre un error de memoria.
# prediccion_adaptativa.py
import torch as pt
def predecir_con_lote_adaptativo(lista_textos, modelo, tamano_inicial=32):
tamano_actual = tamano_inicial
resultados_finales = []
indice = 0
total = len(lista_textos)
while indice < total:
try:
lote = lista_textos[indice:indice + tamano_actual]
res = modelo.predecir_lote(lote, tamano_lote=tamano_actual)
resultados_finales.extend(res)
indice += tamano_actual
# Si tuvo éxito, se puede intentar aumentar el tamaño para la siguiente iteración
except pt.cuda.OutOfMemoryError:
# Reducir el tamaño del lote y reintentar
tamano_actual = max(1, tamano_actual // 2)
print(f"Memoria insuficiente. Reduciendo tamaño de lote a {tamano_actual}.")
return resultados_finales