Instalación y Configuración de Kafka
Tabla de Contenidos
- Descarga
- Prerrequisitos (instalación de Zookeeper)
- Instalación
- Subida y descompresión
- Modificación de archivos de configuración
- Distribución del paquete de instalación a otros nodos
- Creación de enlaces simbólicos
- Modificación de variables de entorno
- Inicio
- Inicio del clúster de Zookeeper
- Inicio del servicio del clúster Kafka
- Creación de un topic
- Visualización de información de réplicas del topic
- Visualización de topics existentes
- Envío de mensajes con el productor
- Consumo de mensajes con el consumidor
Descarga
Las direcciones de descarga son:
Prerrequisitos (instalación de Zookeeper)
Para la instalación de Zookeeper, consultar: http://www.cnblogs.com/qinygunzong/p/8634335.html#_label4_0
Instalación
Para este ejemplo, utilizaremos la versión kafka_2.11-0.8.2.0
Subida y descompresión
[usuario@servidor1 ~]$ tar -zxvf kafka_2.11-0.8.2.0.tgz -C aplicaciones
[usuario@servidor1 ~]$ cd aplicaciones/
[usuario@servidor1 aplicaciones]$ ln -s kafka_2.11-0.8.2.0/ kafka
Modificación de archivos de configuración
Entrar al directorio de cnofiguración de Kafka:
[usuario@servidor1 ~]$ cd aplicaciones/kafka/config/
El archivo principal a considerar es server.properties. Aunque hay muchos archivos en el directorio, incluyendo archivos de Zookeeper, es recomendable usar un clúster de Zookeeper independiente en lugar del que viene con Kafka.
En server.properties, asegúrate de que broker.id y host.name sean diferentes en cada nodo:
// Identificador único de esta máquina en el clúster, similar a myid de Zookeeper
broker.id=0
// Puerto por defecto para el servicio de Kafka
port=9092
// Parámetro generalmente deshabilitado. En la versión 0.8.1 había un bug con la resolución DNS
host.name=servidor1
// Número de hilos para el manejo de red en el broker
num.network.threads=3
// Número de hilos para el manejo de I/O en el broker
num.io.threads=8
// Tamaño del buffer de envío. Los datos no se envían de inmediato, se almacenan en el buffer hasta alcanzar un tamaño determinado
socket.send.buffer.bytes=102400
// Tamaño del buffer de recepción de Kafka. Los datos se serializan a disco cuando alcanzan cierto tamaño
socket.receive.buffer.bytes=102400
// Número máximo de solicitudes para mensajes o envío de mensajes a Kafka. No debe exceder el tamaño del heap de Java
socket.request.max.bytes=104857600
// Directorio para almacenar los mensajes. Puede ser una expresión con comas. num.io.threads debe ser mayor que el número de directorios
log.dirs=/home/usuario/log/kafka-logs
// Número de particiones por defecto para un topic
num.partitions=1
// Número de hilos por directorio de datos para la recuperación de logs
num.recovery.threads.per.data.dir=1
// Tiempo máximo de retención de mensajes por defecto (168 horas, 7 días)
log.retention.hours=168
// Tamaño máximo de un segmento de log. Cuando se supera este valor, Kafka inicia un nuevo archivo
log.segment.bytes=1073741824
// Intervalo de verificación del tiempo de expiración de logs (300000 ms = 5 minutos)
log.retention.check.interval.ms=300000
// Habilitación de la compresión de logs (generalmente no es necesaria)
log.cleaner.enable=false
// Configuración de la conexión a Zookeeper
zookeeper.connect=192.168.123.102:2181,192.168.123.103:2181,192.168.123.104:2181
// Tiempo de espera para la conexión a Zookeeper
zookeeper.connection.timeout.ms=6000
En producer.properties:
metadata.broker.list=192.168.123.102:9092,192.168.123.103:9092,192.168.123.104:9092
En consumer.properties:
zookeeper.connect=192.168.123.102:2181,192.168.123.103:2181,192.168.123.104:2181
Distribución del paquete de instalación a otros nodos
[usuario@servidor1 aplicaciones]$ scp -r kafka_2.11-0.8.2.0/ servidor2:$PWD
[usuario@servidor1 aplicaciones]$ scp -r kafka_2.11-0.8.2.0/ servidor3:$PWD
[usuario@servidor1 aplicaciones]$ scp -r kafka_2.11-0.8.2.0/ servidor4:$PWD
Creación de enlaces simbólicos
[usuario@servidor1 aplicaciones]$ ln -s kafka_2.11-0.8.2.0/ kafka
Modificación de variables de entorno
[usuario@servidor1 ~]$ vi .bashrc
#Kafka
export KAFKA_HOME=/home/usuario/aplicaciones/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Guardar y aplicar los cambios inmediatamente:
[usuario@servidor1 ~]$ source ~/.bashrc
Inicio
Inicio del clúster de Zookeeper
Ejecutar en todos los nodos de Zookeeper:
[usuario@servidor1 ~]$ zkServer.sh start
Inicio del servicio del clúster Kafka
[usuario@servidor1 kafka]$ bin/kafka-server-start.sh config/server.properties
Repetir este proceso en los nodos servidor2, servidor3 y servidor4.
Creación de un topic
[usuario@servidor1 kafka]$ bin/kafka-topics.sh --create --zookeeper servidor1:2181 --replication-factor 3 --partitions 3 --topic topic_ejemplo
Visualización de información de réplicas del topic
[usuario@servidor1 kafka]$ bin/kafka-topics.sh --describe --zookeeper servidor1:2181 --topic topic_ejemplo
Visualización de topics existentes
[usuario@servidor1 kafka]$ bin/kafka-topics.sh --list --zookeeper servidor1:2181
Envío de mensajes con el productor
[usuario@servidor1 kafka]$ bin/kafka-console-producer.sh --broker-list servidor1:9092 --topic topic_ejemplo
En la consola de servidor1, escribir mensajes para ver que son recibidos.
Consumo de mensajes con el consumidor
En servidor2, consumir los mensajes:
[usuario@servidor2 kafka]$ bin/kafka-console-consumer.sh --zookeeper servidor1:2181 --from-beginning --topic topic_ejemplo