Creación y Eliminación de Tablas Kudu con Spark

Creación de Tablas Kudu mediante Spark

El proceso para definir una tabla en Kudu utilizando Spark consta de 5 pasos principales:

  1. Especificar el nombre de la tabla
  2. Definir el esquema (schema)
  3. Establecer las claves primarias
  4. Confiugrar las opciones importantes; por ejemplo: el esquema de particionamiento
  5. Invocar la API de creación de tablas

A continuación se presenta un ejemplo práctico de implementación:


import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import collection.JavaConverters._

object OperacionesKudu {
  def main(args: Array[String]): Unit = {
    // Configuración de Spark
    val configuracionSpark = new SparkConf().setAppName("ProcesamientoDatos")
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
    val contextoSpark = SparkContext.getOrCreate(configuracionSpark)
    val sqlContext = SparkSession.builder().config(configuracionSpark).getOrCreate().sqlContext
    
    // Inicialización del contexto Kudu
    val contextoKudu = new KuduContext("maestro01:7051,maestro02:7051,maestro03:7051", contextoSpark)
    
    // Paso 1: Definir nombre de tabla
    val nombreTablaKudu = "empleados_datos"
    
    // Paso 2: Definir esquema
    val esquemaTabla = StructType(
        StructField("idEmpleado", StringType, false) ::
        StructField("nombre", StringType, false) ::
        StructField("genero", StringType, true) ::
        StructField("edad", IntegerType, true) :: Nil
    )
    
    // Paso 3: Definir clave primaria
    val clavePrimaria = Seq("idEmpleado")
    
    // Paso 4: Configurar opciones de particionamiento
    val opcionesTabla = new CreateTableOptions()
    opcionesTabla.
      setRangePartitionColumns(List("nombre").asJava).
      setNumReplicas(3)
    
    // Paso 5: Crear la tabla
    contextoKudu.createTable(
      nombreTablaKudu, esquemaTabla, clavePrimaria, opcionesTabla)
  }
}

Es importante tener en cuenta que al definir opciones para tablas Kudu, cuando se especifica una lista de columnas para particionamiento por rangos, es necesario utilizar el método "asJava". Esto se debe a que la API de Kudu Java requiere objetos Java (java.util.List) en lugar de listas de Scala. Para que esté disponible este método, es imprescindible importar la librería JavaConverters.

Una vez creada la tabla, puede verificarse a través de la interfaz de usuario de Kudu en http://<nombre-host-maestro>:8051/tables. Al hacer clic en el ID de la tabla, será posible visualizar el esquema y la información de particionamiento.

Eliminación de Tablas Kudu con Spark

Para eliminar una tabla existente en Kudu utilizando Spark, primero se debe verificar su eixstencia y luego proceder con la eliminación:


object EliminarTabla {
  def main(args: Array[String]): Unit = {
    // Configuración de Spark
    val configuracionSpark = new SparkConf().setAppName("EliminacionDatos")
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
    val contextoSpark = SparkContext.getOrCreate(configuracionSpark)
    val sqlContext = SparkSession.builder().config(configuracionSpark).getOrCreate().sqlContext
    
    // Inicialización del contexto Kudu
    val contextoKudu = new KuduContext("maestro01:7051,maestro02:7051,maestro03:7051", contextoSpark)
    
    // Especificar el nombre de la tabla a eliminar
    val nombreTablaEliminar = "empleados_datos"
    
    // Verificar existencia y eliminar si existe
    if (contextoKudu.tableExists(nombreTablaEliminar)) {
      contextoKudu.deleteTable(nombreTablaEliminar)
      println(s"Tabla $nombreTablaEliminar eliminada exitosamente")
    } else {
      println(s"La tabla $nombreTablaEliminar no existe")
    }
  }
}

Etiquetas: Spark Kudu big data DDL Apache

Publicado el 6-28 01:53