Creación de Tablas Kudu mediante Spark
El proceso para definir una tabla en Kudu utilizando Spark consta de 5 pasos principales:
- Especificar el nombre de la tabla
- Definir el esquema (schema)
- Establecer las claves primarias
- Confiugrar las opciones importantes; por ejemplo: el esquema de particionamiento
- Invocar la API de creación de tablas
A continuación se presenta un ejemplo práctico de implementación:
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import collection.JavaConverters._
object OperacionesKudu {
def main(args: Array[String]): Unit = {
// Configuración de Spark
val configuracionSpark = new SparkConf().setAppName("ProcesamientoDatos")
.setMaster("local")
.set("spark.worker.timeout", "500")
.set("spark.cores.max", "10")
.set("spark.rpc.askTimeout", "600s")
.set("spark.network.timeout", "600s")
.set("spark.task.maxFailures", "1")
.set("spark.speculation", "false")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val contextoSpark = SparkContext.getOrCreate(configuracionSpark)
val sqlContext = SparkSession.builder().config(configuracionSpark).getOrCreate().sqlContext
// Inicialización del contexto Kudu
val contextoKudu = new KuduContext("maestro01:7051,maestro02:7051,maestro03:7051", contextoSpark)
// Paso 1: Definir nombre de tabla
val nombreTablaKudu = "empleados_datos"
// Paso 2: Definir esquema
val esquemaTabla = StructType(
StructField("idEmpleado", StringType, false) ::
StructField("nombre", StringType, false) ::
StructField("genero", StringType, true) ::
StructField("edad", IntegerType, true) :: Nil
)
// Paso 3: Definir clave primaria
val clavePrimaria = Seq("idEmpleado")
// Paso 4: Configurar opciones de particionamiento
val opcionesTabla = new CreateTableOptions()
opcionesTabla.
setRangePartitionColumns(List("nombre").asJava).
setNumReplicas(3)
// Paso 5: Crear la tabla
contextoKudu.createTable(
nombreTablaKudu, esquemaTabla, clavePrimaria, opcionesTabla)
}
}
Es importante tener en cuenta que al definir opciones para tablas Kudu, cuando se especifica una lista de columnas para particionamiento por rangos, es necesario utilizar el método "asJava". Esto se debe a que la API de Kudu Java requiere objetos Java (java.util.List) en lugar de listas de Scala. Para que esté disponible este método, es imprescindible importar la librería JavaConverters.
Una vez creada la tabla, puede verificarse a través de la interfaz de usuario de Kudu en http://<nombre-host-maestro>:8051/tables. Al hacer clic en el ID de la tabla, será posible visualizar el esquema y la información de particionamiento.
Eliminación de Tablas Kudu con Spark
Para eliminar una tabla existente en Kudu utilizando Spark, primero se debe verificar su eixstencia y luego proceder con la eliminación:
object EliminarTabla {
def main(args: Array[String]): Unit = {
// Configuración de Spark
val configuracionSpark = new SparkConf().setAppName("EliminacionDatos")
.setMaster("local")
.set("spark.worker.timeout", "500")
.set("spark.cores.max", "10")
.set("spark.rpc.askTimeout", "600s")
.set("spark.network.timeout", "600s")
.set("spark.task.maxFailures", "1")
.set("spark.speculation", "false")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val contextoSpark = SparkContext.getOrCreate(configuracionSpark)
val sqlContext = SparkSession.builder().config(configuracionSpark).getOrCreate().sqlContext
// Inicialización del contexto Kudu
val contextoKudu = new KuduContext("maestro01:7051,maestro02:7051,maestro03:7051", contextoSpark)
// Especificar el nombre de la tabla a eliminar
val nombreTablaEliminar = "empleados_datos"
// Verificar existencia y eliminar si existe
if (contextoKudu.tableExists(nombreTablaEliminar)) {
contextoKudu.deleteTable(nombreTablaEliminar)
println(s"Tabla $nombreTablaEliminar eliminada exitosamente")
} else {
println(s"La tabla $nombreTablaEliminar no existe")
}
}
}