Al revisar el entorno de pruebas, se observó que Kafka acumuló más de un millón de datos, lo que llevó a sincronizarlos con la base de HBase de pruebas. Durante este proceso, se identificaron problemas de rendimiento en las inserciones.
El código inicial verificaba la existencia de la tabla en cadda inserción, lo que introducía un retraso significativo. Por ejemplo, la función para insertar datos realizaba una inicialización completa:
def insertarDatosEnHBase(nombreTabla: String, familiaColumnas: String, clave: String, datosMapa: Map[String, String]): Unit = {
val tabla: Table = obtenerTablaHBase(nombreTabla, familiaColumnas)
try {
val rowkey = generarClaveFilaMD5(clave)
val insercion = new Put(rowkey)
if (datosMapa.nonEmpty) {
datosMapa.foreach { case (k, v) =>
insercion.addColumn(Bytes.toBytes(familiaColumnas), Bytes.toBytes(k), Bytes.toBytes(v))
}
}
tabla.put(insercion)
} catch {
case e: Exception => e.printStackTrace()
} finally {
tabla.close()
}
}
La función obtenerTablaHBase se definía de la siguiente manera, realizando comprobaciones costosas:
def obtenerTablaHBase(nombreTabla: String, familiaColumnas: String): Table = {
val descriptorTabla = new HTableDescriptor(TableName.valueOf(nombreTabla))
val descriptorColumna = new HColumnDescriptor(familiaColumnas)
descriptorTabla.addFamily(descriptorColumna)
if (!admin.tableExists(TableName.valueOf(nombreTabla))) {
crearTablaHBase(conexion, nombreTabla, 10, Array(familiaColumnas))
}
conexion.getTable(TableName.valueOf(nombreTabla))
}
Este enfoque resultaba en tiempos de 50 a 70 milisegundos por inserción, afectando el rendimiento general con grandes volúmenes de datos. Para optimizar, se trasladó la inicialización a nivel de objeto, evitando repetir la verificación de la tabla en cada operación. La configuración y las referencias se gestionaron de manera estática:
object GestorHBase {
private val configuracion: Configuration = HBaseConfiguration.create()
configuracion.set("hbase.zookeeper.quorum", "zoo1,zoo2,zoo3")
configuracion.set("hbase.master", "hbase-master:16000")
configuracion.set("hbase.zookeeper.property.clientPort", "2181")
private val conexion: Connection = ConnectionFactory.createConnection(configuracion)
private val admin: Admin = conexion.getAdmin
val descriptorTabla = new HTableDescriptor(TableName.valueOf("tabla_ordenes"))
val descriptorColumna = new HColumnDescriptor("cf_datos")
descriptorTabla.addFamily(descriptorColumna)
if (!admin.tableExists(TableName.valueOf("tabla_ordenes"))) {
crearTablaHBase(conexion, "tabla_ordenes", 10, Array("cf_datos"))
}
}
Con este cambio, las inserciones posteriores reutilizaron la conexión existente, reduciendo drásticamente los tiempos de procesamiento. Pruebas mostraron que 2000 registros se insertaron en aproximadamenet 10000 milisegundos, frente a los 140000 milisegundos iniciales, logrando una mejora de rendimiento notable.