Structured Streaming ofrece una interfaz que permite monitorear el ciclo de vida completo de las consultas de streaming. Esta funcionalidad resulta particularmente útil para implementar sistemas de monitoreo y alertas en tiempo real.
La interfaz StreamingQueryListener
Esta clase abstracta proporciona tres métodos que se invocan en diferentes momentos del procesamiento:
abstract class StreamingQueryListener {
def onQueryStarted(event: QueryStartedEvent): Unit
def onQueryProgress(event: QueryProgressEvent): Unit
def onQueryTerminated(event: QueryTerminatedEvent): Unit
}
Descripción de los métodos
- onQueryStarted: Se ejecuta de forma síncrona cuando inicia una consulta de streaming
- onQueryProgress: Se invoca de forma asíncrona cuando hay actualizaciones en el estado del procesamiento
- onQueryTerminated: Se ejecuta cuando una consulta finaliza, ya sea exitosamente o por un error
Casos de uso prácticos
Esta funcionalidad es especialmente valiosa para implementar alertas automáticas. Por ejemplo, dentro del método onQueryStarted se pueden evaluar condiciones específicas para determinar si es necesario enviar notificaciones. Del mismo modo, en onQueryTerminated se puede acceder a los detalles del error a través del campo exception:
@InterfaceStability.Evolving
class QueryTerminatedEvent private[sql](
val id: UUID,
val runId: UUID,
val exception: Option[String]) extends Event
Ejemplo de implementación
A continuación se presenta un ejemplo completo que demuestra cómo registrar un listener y mointorear múltiples consultas de streaming:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
object MonitorDemo {
def main(args: Array[String]): Unit = {
val session = SparkSession
.builder()
.appName("StreamingMonitor")
.master("local[4]")
.getOrCreate()
session.sparkContext.setLogLevel("ERROR")
val escuchador = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println(s"[INICIO] Consulta iniciada: ${event.id}")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
println(s"[PROCESO] ${event.progress.name} - Lotes: ${event.progress.batchId}")
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
val mensajeError = event.exception.getOrElse("Finalización normal")
println(s"[FINALIZADO] Consulta ${event.id} - Razón: $mensajeError")
}
}
session.streams.addListener(escuchador)
val flujoRapido = session.readStream
.format("rate")
.load()
.writeStream()
.format("console")
.trigger(Trigger.ProcessingTime(3.seconds))
.option("truncate", false)
.start()
val flujoLento = session.readStream
.format("rate")
.load()
.writeStream()
.format("console")
.trigger(Trigger.ProcessingTime(8.seconds))
.option("truncate", false)
.start()
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
val detenerFlujoRapido = new Runnable {
override def run(): Unit = {
println("Deteniendo flujo rápido...")
flujoRapido.stop()
}
}
val programarDetencion = Executors.newSingleThreadScheduledExecutor()
programarDetencion.scheduleWithFixedDelay(detenerFlujoRapido, 12, 100, TimeUnit.SECONDS)
session.streams.awaitAnyTermination()
session.streams.resetTerminated()
if (!flujoLento.awaitTermination(30, TimeUnit.SECONDS)) {
flujoLento.stop()
}
println("Todas las consultas finalizadas")
session.stop()
}
}
Este ejemplo crea dos flujos de datos con diferentes intervalos de activación, registra un listener personalizado que registra los eventos en consola, y programa la detención automática del primer flujo después de 12 segundso.