Salidas de Datos para Procesamiento por Lotes en Apache Flink

En el procesamiento por lotes con Apache Flink, las salidas de datos (sinks) se utilizan para dirigir los resultados a distintos destinos. Dos enfoques comunes son las basadas en colecciones locales y las basadas en archivos, cada una con sus aplicaciones específicas.

Salidas Basadas en Colecciones Locales

Este método permite anviar datos a la consola o a estructuras en memoria durante el desarrollo y pruebas. A continuación, se muestra un ejemplo con variables y estructuras modificadas:

// Configurar el entorno de ejecución
val entornoEjecucion = ExecutionEnvironment.getExecutionEnvironment

// Definir datos de ejemplo: (edad, nombre, estatura)
val datosPersonas: DataSet[(Int, String, Double)] = entornoEjecucion.fromElements(
  (20, "maria", 175.5),
  (22, "pedro", 180.0),
  (19, "ana", 165.2)
)

// Enviar a la salida estándar
datosPersonas.print()

// Enviar a la salida de error estándar
datosPersonas.printToErr()

// Recolectar datos en el cliente y mostrar
println(datosPersonas.collect())

Salidas Basadas en Archivos

Flink soporta la escritura en sistemas de archivos locales o distribuidos como HDFS, con formatos como texto o CSV. La configuración del paralelismo determina si la ruta se interpreta como directorio o archivo único.

Escritura en Archivos Locales

Se utiliza el método writeAsText con modos de escritura como OVERWRITE para sobrescribir o NO_OVERWRITE para evitar conflictos.

// Preparar el entorno
val entorno = ExecutionEnvironment.getExecutionEnvironment

// Crear un conjunto de datos con estructura de mapa
val conjuntoMapas: DataSource[Map[Int, String]] = entorno.fromElements(
  Map(1 -> "spark", 2 -> "flink")
)

// Escribir en un archivo local, ajustando el paralelismo a 1 para tratar la ruta como archivo
conjuntoMapas.setParallelism(1)
  .writeAsText("resultado/datos_salida.txt", WriteMode.OVERWRITE)

entorno.execute()

Escritura en HDFS

El mismo método se aplica para HDFS, especificando la ruta completa del sistema de archivos distribuido.

val entornoHDFS = ExecutionEnvironment.getExecutionEnvironment
val datosParaHDFS = entornoHDFS.fromElements(Map(3 -> "hadoop", 4 -> "kafka"))

datosParaHDFS.setParallelism(1)
  .writeAsText("hdfs://servidor:9000/salida/archivo.txt", WriteMode.OVERWRITE)

entornoHDFS.execute()

Ordenación y Salida a Sistemas Externos

Antes de escribir los datos, es posible ordenarlos por campos específicos usando sortPartition, lo que resulta útil para organizar la salida.

val entornoOrden = ExecutionEnvironment.getExecutionEnvironment
val datosOrden: DataSet[(Int, String, Double)] = entornoOrden.fromElements(
  (20, "maria", 175.5),
  (22, "pedro", 180.0),
  (19, "ana", 165.2)
)

// Ordenar por edad de forma ascendente (índice 0)
datosOrden.sortPartition(0, Order.ASCENDING).print()

// Ordenar por nombre de forma descendente (índice 1)
datosOrden.sortPartition(1, Order.DESCENDING).print()

// Ordenamiento múltiple: primero por edad ascendente, luego por estatura descendente
datosOrden.sortPartition(0, Order.ASCENDING)
  .sortPartition(2, Order.DESCENDING)
  .print()

// Ejemplo con una case class y campos anidados
case class Persona(nombre: String, edad: Int)
val datosCompuestos: DataSet[(Persona, Double)] = entornoOrden.fromElements(
  (Persona("maria", 20), 175.5),
  (Persona("pedro", 22), 180.0),
  (Persona("ana", 19), 165.2)
)

// Ordenar por la edad de la persona y escribir en HDFS
val datosOrdenados = datosCompuestos.sortPartition("_1.edad", Order.ASCENDING)
  .setParallelism(1)

// Escribir como archivo de texto
datosOrdenados.writeAsText("hdfs://servidor:9000/salida/personas_orden.txt", WriteMode.OVERWRITE)
entornoOrden.execute()

// Escribir como archivo CSV con delimitadores personalizados
datosOrdenados.writeAsCsv(
  "hdfs://servidor:9000/salida/personas_orden.csv",
  rowDelimiter = "\n",
  fieldDelimiter = ";",
  WriteMode.OVERWRITE
)
entornoOrden.execute()

Etiquetas: Apache Flink Procesamiento por Lotes Salida de Datos scala HDFS

Publicado el 6-30 00:51