En entornos de streaming con Apache Kafka y Apache Spark, es común encontrarse con excepciones relacionadas con la configuración del consumidor. Un error frecuente es la IllegalStateException que indica que el consumidor no está suscrito a ningún tópico ni asignado a ninguna partición. Esto ocurre al intentar obtaner datos sin haber establecido previamente una suscripción o asignación válida.
La raíz del problema radica en el ciclo de vida del consumidor de Kafka. Antes de invocar el método poll para recuperar registros, el consumidor debe estar vinculado a al menos un tópico o partición. Internamente, al realizar poll, el consumidor intenta usar el último offset confirmado como punto de inicio para la lectura, el cual puede configurarse automáticamente o mediante el método seek.
Al revisar el código fuente del consumider, se observa una verificación explícita en el método poll:
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0) {
throw new IllegalArgumentException("El timeout no puede ser negativo");
}
// Validación de suscripción o asignación
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("El consumidor no está suscrito a ningún tópico ni asignado a ninguna partición");
}
long initialTime = time.milliseconds();
long remainingTime = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords = pollOnce(remainingTime);
if (!fetchedRecords.isEmpty()) {
fetcher.sendFetches();
client.pollNoWakeup();
if (this.interceptors == null) {
return new ConsumerRecords<>(fetchedRecords);
} else {
return this.interceptors.onConsume(new ConsumerRecords<>(fetchedRecords));
}
}
long elapsed = time.milliseconds() - initialTime;
remainingTime = timeout - elapsed;
} while (remainingTime > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
En aplicaciones de Spark Streaming con Kafka, un enfoque incorrecto es utilizar asignaciones directas sin suscripciones para tópicos nuevos. Por ejemplo, el siguiente código puede fallar si no se manejan correctamente los offsets:
val kafkaConfig = Map[String, Object]("bootstrap.servers" -> "localhost:9092")
val topicList = Set("mi_topico")
val initialOffsets = Map(new TopicPartition("mi_topico", 0) -> 0L)
val directStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Assign[String, String](
initialOffsets.keys.toSeq, kafkaConfig, initialOffsets
)
)
Para tópicos nuevos o cuando se desea una gestión automática de offsets, se recomienda usar el método Subscribe. Esto permite que el consumidor se suscriba dinámicamente y gestione las particiones de manera eficiente. Un ejemplo corregido sería:
val streamingContext = new StreamingContext(conf, Seconds(5))
val parameters = Map[String, Object]("bootstrap.servers" -> "kafka-broker:9092", "group.id" -> "spark-consumer")
val topics = Seq("topico_eventos")
val subscribedStream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, parameters)
)
Al usar Subscribe, el consumidor se registra automáticamente en el coordinador de Kafka y recibe asignaciones de particiones según sea necesario, evitando el error de falta de suscripción.