Persistencia de mensajes MQTT en OpenTSDB mediante EMQ X

OpenTSDB es una base de datos de series temporales (TSDB) distribuida y escalable que utiliza HBase como capa de almacenamiento subyacente. Al aprovechar las características de almacenamiento en columnas de HBase, OpenTSDB es capaz de procesar millones de operaciones de lectura y escritura por segundo, lo que la convierte en una solución robusta para entornos industriales.

En el ecosistema del Internet de las Cosas (IoT), el volumen de datos generado por sensores y registros de transacciones crece de manera exponencial. Las bases de datos de series temporales están diseñadas específicamente para manejar esta carga, ofreciendo altas tasas de ingesta, consultas analíticas rápidas sobre grandes volúmenes de datos y algoritmos de compresión optimizados para este tipo de información.

Configuración del conector en EMQ X

Para integrar EMQ X con OpenTSDB, es necesario configurar el plugin de persistencia correspondiente. Si ha instalado EMQ X mediante paquetes RPM, el archivo de configuración se localiza generalmente en /etc/emqx/plugins/emqx_backend_opentsdb.conf. Este plugin está diseñado específicamente para la persistencia de mensajes publicados en el bróker.

A continuación, se detalla la configuración de la conexión, el tamaño del pool de procesos y la estrategia de escritura por lotes (batching):

## Dirección de acceso al servidor OpenTSDB
backend.opentsdb.pool_principal.server = 127.0.0.1:4242

## Tamaño del pool de conexiones
backend.opentsdb.pool_principal.pool_size = 12

## Tamaño máximo del lote para la operación 'put'
backend.opentsdb.pool_principal.max_batch_size = 50

## Regla de captura de mensajes mediante filtro de topic
backend.opentsdb.hook.message.publish.1 = {"topic": "sensores/#", "action": {"function": "on_message_publish"}, "pool": "pool_principal"}

Para activar el backend y aplicar los parámetros definidos, se debe ejecutar el siguiente comando desde la terminal:

emqx_ctl plugins load emqx_backend_opentsdb

Definición de plantillas de datos

Dado que los mensajes MQTT poseen una estructura libre y OpenTSDB requiere un formato de punto de datos (DataPoint) específico, el back end utiliza un sistema de plantillas basado en archivos .tmpl. Esto permite transformar el payload JSON del mensaje en una estructura compatible.

El archivo de plantilla se ubica en data/templates/emqx_backend_opentsdb_example.tmpl. A continuación se muestra un ejemplo de cómo estructurar la transformación para datos de telemetría:

{
    "telemetria_industrial": {
        "measurement": "$topic",
        "tags": {
            "sensor_id": ["$payload", "metricas", "$0", "id"],
            "zona": ["$payload", "metricas", "$0", "area"],
            "nivel_qos": "$qos",
            "cliente_id": "$from"
        },
        "value": ["$payload", "metricas", "$0", "lectura"],
        "timestamp": "$timestamp"
    }
}

En este esquema, los campos measurement y value son obligatorios. El sistema permite el uso de marcadores de posición para extraer información dinámica:

  • $topic: El tópico donde se publicó el mensaje.
  • $qos: El nivel de calidad de servicio.
  • $from: El identificador del cliente que envía el mensaje.
  • $payload.*: Permite acceder a objetos anidados dentro de un JSON. Por ejemplo, ["$payload", "metricas", "id"] accedería al valor de la clave 'id' dentro del objeto 'metricas'.

Ejemplo de flujo de datos

Supongamos que un dispositivo publica un mensaje en el tópico sensores/clima con el siguiente cuerpo JSON:

{
  "metricas": [
    {
      "lectura": 24.5,
      "id": "termometro_01",
      "area": "almacen_norte"
    },
    {
      "lectura": 26.2,
      "id": "termometro_02",
      "area": "almacen_sur"
    }
  ]
}

Tras procesar el mensaje con la plantilla anterior, el backend generará los siguientes puntos de datos para OpenTSDB:

[
  {
    "measurement": "sensores/clima",
    "tags": {
      "cliente_id": "dispositivo_iot_abc",
      "sensor_id": "termometro_01",
      "nivel_qos": "1",
      "zona": "almacen_norte"
    },
    "value": "24.5",
    "timestamp": "1672531200000"
  },
  {
    "measurement": "sensores/clima",
    "tags": {
      "cliente_id": "dispositivo_iot_abc",
      "sensor_id": "termometro_02",
      "nivel_qos": "1",
      "zona": "almacen_sur"
    },
    "value": "26.2",
    "timestamp": "1672531200000"
  }
]

Para verificar la integración, puede utilizar la herramienta WebSocket dentro del panel de control de EMQ X para publicar mensajes que coincidan con el filtro de tópicos configurado y observar la persistencia inmediata en la instancia de OpenTSDB.

Etiquetas: EMQ X OpenTSDB MQTT IoT HBase

Publicado el 6-24 18:00