En el procesamiento por lotes con Apache Flink, las salidas de datos (sinks) se utilizan para dirigir los resultados a distintos destinos. Dos enfoques comunes son las basadas en colecciones locales y las basadas en archivos, cada una con sus aplicaciones específicas.
Salidas Basadas en Colecciones Locales
Este método permite anviar datos a la consola o a estructuras en memoria durante el desarrollo y pruebas. A continuación, se muestra un ejemplo con variables y estructuras modificadas:
// Configurar el entorno de ejecución
val entornoEjecucion = ExecutionEnvironment.getExecutionEnvironment
// Definir datos de ejemplo: (edad, nombre, estatura)
val datosPersonas: DataSet[(Int, String, Double)] = entornoEjecucion.fromElements(
(20, "maria", 175.5),
(22, "pedro", 180.0),
(19, "ana", 165.2)
)
// Enviar a la salida estándar
datosPersonas.print()
// Enviar a la salida de error estándar
datosPersonas.printToErr()
// Recolectar datos en el cliente y mostrar
println(datosPersonas.collect())
Salidas Basadas en Archivos
Flink soporta la escritura en sistemas de archivos locales o distribuidos como HDFS, con formatos como texto o CSV. La configuración del paralelismo determina si la ruta se interpreta como directorio o archivo único.
Escritura en Archivos Locales
Se utiliza el método writeAsText con modos de escritura como OVERWRITE para sobrescribir o NO_OVERWRITE para evitar conflictos.
// Preparar el entorno
val entorno = ExecutionEnvironment.getExecutionEnvironment
// Crear un conjunto de datos con estructura de mapa
val conjuntoMapas: DataSource[Map[Int, String]] = entorno.fromElements(
Map(1 -> "spark", 2 -> "flink")
)
// Escribir en un archivo local, ajustando el paralelismo a 1 para tratar la ruta como archivo
conjuntoMapas.setParallelism(1)
.writeAsText("resultado/datos_salida.txt", WriteMode.OVERWRITE)
entorno.execute()
Escritura en HDFS
El mismo método se aplica para HDFS, especificando la ruta completa del sistema de archivos distribuido.
val entornoHDFS = ExecutionEnvironment.getExecutionEnvironment
val datosParaHDFS = entornoHDFS.fromElements(Map(3 -> "hadoop", 4 -> "kafka"))
datosParaHDFS.setParallelism(1)
.writeAsText("hdfs://servidor:9000/salida/archivo.txt", WriteMode.OVERWRITE)
entornoHDFS.execute()
Ordenación y Salida a Sistemas Externos
Antes de escribir los datos, es posible ordenarlos por campos específicos usando sortPartition, lo que resulta útil para organizar la salida.
val entornoOrden = ExecutionEnvironment.getExecutionEnvironment
val datosOrden: DataSet[(Int, String, Double)] = entornoOrden.fromElements(
(20, "maria", 175.5),
(22, "pedro", 180.0),
(19, "ana", 165.2)
)
// Ordenar por edad de forma ascendente (índice 0)
datosOrden.sortPartition(0, Order.ASCENDING).print()
// Ordenar por nombre de forma descendente (índice 1)
datosOrden.sortPartition(1, Order.DESCENDING).print()
// Ordenamiento múltiple: primero por edad ascendente, luego por estatura descendente
datosOrden.sortPartition(0, Order.ASCENDING)
.sortPartition(2, Order.DESCENDING)
.print()
// Ejemplo con una case class y campos anidados
case class Persona(nombre: String, edad: Int)
val datosCompuestos: DataSet[(Persona, Double)] = entornoOrden.fromElements(
(Persona("maria", 20), 175.5),
(Persona("pedro", 22), 180.0),
(Persona("ana", 19), 165.2)
)
// Ordenar por la edad de la persona y escribir en HDFS
val datosOrdenados = datosCompuestos.sortPartition("_1.edad", Order.ASCENDING)
.setParallelism(1)
// Escribir como archivo de texto
datosOrdenados.writeAsText("hdfs://servidor:9000/salida/personas_orden.txt", WriteMode.OVERWRITE)
entornoOrden.execute()
// Escribir como archivo CSV con delimitadores personalizados
datosOrdenados.writeAsCsv(
"hdfs://servidor:9000/salida/personas_orden.csv",
rowDelimiter = "\n",
fieldDelimiter = ";",
WriteMode.OVERWRITE
)
entornoOrden.execute()