El módulo Stream de Elixir es fundamental para procesar colecciones de datos de manera perezosa y eficiente. Sin embargo, su naturaleza calculada puede conducir a comportamientos inesperados, especialmente en la gestión de recursos. Un problema común surge cuando la función de limpieza (after_fun) proporcionada a Stream.resource/3 se ejecuta más de una vez, lo que potencialmente agota recursos o provoca errores.
Comprender este comportamiento requiere reconocer que un Stream es una receta de cálculo, no una secuencia de datos almacenada. Cada vez que se consume el Stream (por ejemplo, con Enum.take/2), se reevalúa la definición completa, incluyendo las funciones de inicio y limpieza. Por lo tanto, un Stream basado en recursos no debe reutilizarse si se desea que la limpieza ocurra exactamente una vez.
Escenario Problemático
Considere una función que crea un Stream para leer un archivo línea por línea. La intención es abrir el archivo al principio y cerrarlo al final del consumo.
def crear_stream_de_archivo(ruta) do
Stream.resource(
fn -> File.open!(ruta) end, # start_fun: abre el archivo
fn handle -> # next_fun: lee una línea
case IO.read(handle, :line) do
nil -> {:halt, handle}
linea -> {[linea], handle}
end
end,
fn handle -> IO.puts("Cerrando handle"); File.close(handle) end # after_fun
)
end
stream = crear_stream_de_archivo("datos.txt")
Enum.take(stream, 2) # Primera enumeración: abre, lee, cierra.
Enum.take(stream, 2) # Segunda enumeración: intenta cerrar un handle ya cerrado.
En la segunda llamada a Enum.take/2, se intenta ejecutar after_fun nuevamente sobre un recurso ya liberado, lo que genera un error.
Estrategias de Solución
Existen varios enfoques para diseñar Streams que manejen recursos de forma segura y predecible.
1. Consumo Único y Explotación
La solución más directa es tratar el Stream como un valor de un solo uso. Esto se alinea con el patrón estándar de la librería Elixir, como se observa en File.stream!/3. La lógica de procesamiento debe envolverse en una función que cree y consuma el Stream inmediatamente.
def procesar_archivo(ruta, limite) do
File.stream!(ruta) # Crea un Stream nuevo cada vez que se llama.
|> Stream.take(limite)
|> Enum.to_list()
end
# Cada invocación gestiona su propio ciclo de vida del recurso.
primer_lote = procesar_archivo("datos.txt", 10)
segundo_lote = procesar_archivo("datos.txt", 5)
Este enfoque es simple, seguro para concurrencia y evita completamente los problemas de recursos compartidos.
2. Separación de la Lógica de Consumo
Si se necesita aplicar múltiples operaciones al mismo conjunto de datos, la estrategia consiste en extraer primero los datos en memoria y luego aplicar las transformaciones de Stream sobre esa lista materializada. Esto evita la reapertura del recurso subyacente.
lineas = "datos.txt" |> File.stream!() |> Enum.to_list()
# Ahora se puede crear múltiples Streams sobre la lista en memoria.
stream1 = Stream.map(lineas, &String.upcase/1)
stream2 = Stream.filter(lineas, &(String.length(&1) > 5))
Este método es ideal cuando el conjunto de datos cabe razonablemente en la memoria.
3. Uso de un Proceso de Estado (Avanzado)
Para escenarios complejos donde el recurso es costoso de inicializar (como una conexión de base de datos) y se requiere su reutilización controlada, se puede encapsular el estado en un proceso supervisor, como un Agent.
defmodule GestorDeRecursos do
use Agent
def start_link(_opts) do
Agent.start_link(fn -> %{recurso: nil, contador: 0} end, name: __MODULE__)
end
def obtener_recurso do
Agent.get_and_update(__MODULE__, fn estado ->
if estado.recurso do
{estado.recurso, %{estado | contador: estado.contador + 1}}
else
recurso = inicializar_recurso_pesado() # Ej: conexión a DB
{recurso, %{recurso: recurso, contador: 1}}
end
end)
end
def liberar_si_necesario do
Agent.update(__MODULE__, fn estado ->
nuevo_contador = estado.contador - 1
if nuevo_contador <= 0 do
cerrar_recurso_pesado(estado.recurso) # Ej: cerrar conexión
%{recurso: nil, contador: 0}
else
%{estado | contador: nuevo_contador}
end
end)
end
end
def crear_stream_con_recurso_compartido do
Stream.resource(
fn -> GestorDeRecursos.obtener_recurso() end,
fn recurso -> {producir_dato(recurso), recurso} end,
fn _recurso -> GestorDeRecursos.liberar_si_necesario() end
)
end
Este patrón permite múltiples enumeraciones concurrentes sobre el mismo recurso, con una limpieza basada en conteo de referencias. Sin embargo, incrementa significativamente la complejidad.
Recomendaciones Clave
- Claridad en el diseeño: Decida si su Stream es para consumo único (gessiona recursos) o reutilizable (solo transforma datos en memoria).
- Preferir
File.stream!: Para archivos, utilice siempre la función de la librería estándar que implementa el patrón de consumo único. - Evitar estado global oculto: No utilice el truco del
Agentpara recursos simples. Resérvelo para casos justificados de recursos pesados y compartidos. - Depuración: Si sospecha de una limpieza múltiple, inserte sentencias de registro (
IO.inspect) dentro de suafter_funpara rastrear su ejecución.
El modelo de evaluación perezosa de Stream es poderoso pero exige una gestión explícita del ciclo de vida de los recursos. Adoptar el patrón de "Stream como valer efímero" para operaciones con recursos la mayoría de los errores relacionados con after_fun.