Caso de Desarrollo para AIGlasses para Navegación: Implementación de Monitoreo Remoto mediante Mini Programa de WeChat

Caso de Desarrollo para AIGlasses para Navegación: Implementación de Monitoreo Remoto mediante Mini Programa de WeChat

  1. Introducción: De Local a Remoto, Desbloqueando Más Posibilidades para las Gafas Inteligentes

Si estás utilizando o desarrollando AIGlasses para Navegación, ya has experimentado sus potentes capacidades de navegación e interacción locales. Ya sea para reconocimiento de aceras para invidentes, detección de semáforos o búsqueda de objetos, este sistema puede ejecutarse de manera estable en un servidor local a través de una interfaz web que proporciona retroalimentación intuitiva.

Pero imagina este escenario: un familiar de un usuario con discapacidad visual quiere conocer en todo momento el estado de seguridad del usuario cuando está fuera; o un equipo de servicios comunitarios necesita monitorear simultáneamente el estado de funcionamiento de múltiples dispositivos. En estos casos, depender únicamente de la interfaz web local resulta poco conveniente.

Este es precisamente el tema que exploraremos hoy: cómo extender las capacidades de monitoreo de estado de AIGlasses para Navegación desde el servidor local hasta un Mini Programa de WeChat. A través de este caso, aprenderás cómo agregar un "tablero de control remoto" a este sistema de gafas inteligentes, permitiendo que información clave esté al alcance de la mano.

En este artículo, te guiaremos paso a paso para implementar esta función, desde el diseño de la arquitectura hasta la implementación del código, para que finalmente puedas tener un Mini Programa de WeChat que puede verificar el estado del dispositivo en cualquier momento y recibir alertas importantes. Todo el proceso no requiere modificaciones de hardware complejas, solo necesita agregar algunos "códigos puente" sobre el sistema existente.

  1. ¿Por qué se necesita el monitoreo remoto?

Antes de profundizar en los detalles técnicos, analicemos los problemas prácticos que puede resolver el monitoreo remoto de estado.

2.1 Escenarios de Aplicación Práctica

Escenario uno: Modo de Cuidado Familiar La abuela Zhang tiene problemas de visión, pero le gusta pasear por el complejo residencial cada tarde. Su hija, Xiao Wang, trabaja en otra ciudad y siempre se preocupa por la seguridad de su madre. A través del Mini Programa de WeChat, Xiao Wang puede verificar en cualquier momento:

  • Si su madre está utilizando las gafas para navegación
  • El entorno de ubicación actual (a través de los tipos de escena detectados por las gafas)
  • Si la batería del dispositivo tiene suficiente carga
  • Cuándo encontró un obstáculo por última vez

Escenario dos: Gestión de Servicios Comunitarios Un centro comunitario que sirve a personas con discapacidad visual gestiona 20 dispositivos AIGlasses. El administrador necesita:

  • Verificar en lote el estado en línea de todos los dispositivos
  • Localizar rápidamente los dispositivos con fallos
  • Estadísticas de frecuencia de uso y duración del servicio
  • Recibir notificaciones oportunas cuando los dispositivos presenten anomalías

Escenario tres: Depuración y Mantenimiento por Desarrolladores Como desarrollador, es posible que necesites:

  • Verificar los registros de ejecución del servicio de forma remota
  • Monitorear las llamadas a la API y los límites restantes
  • Diagnosticar rápidamente cuando los usuarios encuentran problemas
  • Recopilar datos de uso para optimizar los algoritmos

2.2 Valor Técnico

Desde una perspectiva técnica, el monitoreo remoto aporta varias ventajas clave:

  1. Tiempo real: los cambios de estado pueden enviarse inmediatamente al Mini Programa, sin necesidad de que el usuario consulte activamente
  2. Conveniencia: los usuarios no necesitan abrir una computadora o recordar la IP del servidor, pueden verlo directamente en WeChat
  3. Escalabilidad: un sistema de monitoreo puede servir a múltiples dispositivos, facilitando la implementación a gran escala
  4. Persistencia de datos: el estado histórico puede almacenarse para análisis e informes
  5. Diseño de Arquitectura General

Para implementar el monitoreo remoto, necesitamos agregar varios componentes al sistema existente. A continuación se muestra el diagrama de arquitectura general:

Sistema Existente (AIGlasses para Navegación)       Nuevos Componentes                     Mini Programa de WeChat
┌─────────────────────┐           ┌─────────────────────┐        ┌─────────────────────┐
│                     │           │                     │        │                     │
│  ┌─────────────┐   │           │  ┌─────────────┐   │        │ Página de Estado   │
│  │ Programa    │   │  HTTP     │  │ Servicio de │   │  WebSocket │ Tarjetas de Estado  │
│  │ Principal   │───┼───────────┼─▶│ Recolección  │───┼─────────┼▶ en Tiempo Real     │
│  │ app_main.py │   │           │  │ de Estado   │   │         │ Lista de Historial │
│  └─────────────┘   │           │  └─────────────┘   │        │ Centro de Alertas   │
│                     │           │                     │        └─────────────────────┘
│  ┌─────────────┐   │           │  ┌─────────────┐   │
│  │ Interfaz    │   │           │  │ API de      │   │
│  │ Web         │   │           │  │ Datos       │   │
│  │ index.html  │───┘           │  │ data_api.py │   │
│  └─────────────┘               │  └─────────────┘   │
│                                 │                     │
│  ┌─────────────┐               │  ┌─────────────┐   │
│  │ Archivos de │               │  │ Base de     │   │
│  │ Modelos     │               │  │ Datos       │   │
│  │ model/      │               │  │ (SQLite)    │   │
│  └─────────────┘               │  └─────────────┘   │
└─────────────────────┘           └─────────────────────┘


3.1 Descripción de Responsabilidades de los Componentes

Servicio de Recolección de Estado (recolector_estado.py) Este es el "ojo" de todo el sistema, responsable de:

  • Obtener periódicamente datos de estado del programa principal de AIGlasses
  • Procesar y analizar los datos brutos
  • Almacenar el estado procesado en la base de datos
  • Detectar estados anómalos y activar alertas

Servicio de API de Datos (api_datos.py) Esta es la "boca" del sistema, responsable de:

  • Proporcionar interfaces RESTful para que el Mini Programa las llame
  • Soportar WebSocket para implementar empuje en tiempo real
  • Manejar autenticación y control de permisos de usuario
  • Devolver datos de estado formateados

Base de Datos (SQLite) Esta es la "memoria" del sistema, responsable de:

  • Almacenar registros históricos de estado
  • Almacenar información de configuración del dispositivo
  • Almacenar datos de usuarios y permisos
  • Soportar consultas y análisis rápidos

Mini Programa de WeChat Este es el "control remoto" del usuario, responsable de:

  • Mostrar información de estado en tiempo real
  • Mostrar registros históricos de estado
  • Recibir y mostrar notificaciones de alertas
  • Proporcionar funciones de control simples
  1. Implementación del Servicio de Recolección de Estado

El servicio de recolección de estado es el puente entre el sistema AIGlasses existente y el sistema de monitoreo. Primero, necesitamos entender qué información de estado proporciona AIGlasses.

4.1 Interfaz de Estado del Sistema Existente

Al revisar el código de AIGlasses para Navegación, descubrimos que el sistema ya muestra mucha información de estado a través de la interfaz web. Necesitamos "extraer" esta información.

Primero, creemos un script de recolección de estado:

# recolector_estado.py
import json
import time
import sqlite3
import requests
from datetime import datetime
import logging
from typing import Dict, Any, Optional

# Configuración de registro
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class RecolectorEstado:
    def __init__(self, url_base: str = "http://localhost:8081"):
        """
        Inicializar el recolector de estado
        
        Args:
            url_base: Dirección del servicio web de AIGlasses
        """
        self.url_base = url_base
        self.ruta_db = "estado_aiglasses.db"
        self.inicializar_base_datos()
        
    def inicializar_base_datos(self):
        """Inicializar la estructura de tablas de la base de datos"""
        conn = sqlite3.connect(self.ruta_db)
        cursor = conn.cursor()
        
        # Crear tabla de registros de estado
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS estado_dispositivo (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            id_dispositivo TEXT,
            estado_servicio TEXT,
            api_configurada INTEGER,
            modelos_cargados TEXT,
            archivos_audio INTEGER,
            camara_conectada INTEGER,
            modo_actual TEXT,
            fps REAL,
            uso_cpu REAL,
            uso_memoria REAL,
            ultimo_obstaculo DATETIME,
            ultima_navegacion TEXT,
            nivel_bateria INTEGER
        )
        ''')
        
        # Crear tabla de alertas
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS alertas (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            id_dispositivo TEXT,
            tipo_alerta TEXT,
            nivel_alerta TEXT,
            mensaje TEXT,
            resuelto INTEGER DEFAULT 0
        )
        ''')
        
        # Crear tabla de información de dispositivos
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS dispositivos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            id_dispositivo TEXT UNIQUE,
            nombre_dispositivo TEXT,
            nombre_propietario TEXT,
            ubicacion TEXT,
            ultima_actividad DATETIME,
            activo INTEGER DEFAULT 1
        )
        ''')
        
        conn.commit()
        conn.close()
        logger.info("Inicialización de base de datos completada")
    
    def recolectar_estado(self) -> Optional[Dict[str, Any]]:
        """
        Recopilar información de estado del sistema AIGlasses
        
        Returns:
            Diccionario de datos de estado, si falla la recolección devuelve None
        """
        try:
            # Intentar obtener información de estado de la interfaz web
            # Aquí se necesita ajustar según la estructura real de la página web
            datos_estado = {
                "timestamp": datetime.now().isoformat(),
                "id_dispositivo": self.obtener_id_dispositivo(),
                "estado_servicio": self.verificar_estado_servicio(),
                "api_configurada": self.verificar_configuracion_api(),
                "modelos_cargados": self.obtener_modelos_cargados(),
                "archivos_audio": self.contar_archivos_audio(),
                "camara_conectada": self.verificar_conexion_camara(),
                "modo_actual": self.obtener_modo_actual(),
                "fps": self.obtener_fps(),
                "metricas_sistema": self.obtener_metricas_sistema(),
                "estado_navegacion": self.obtener_estado_navegacion()
            }
            
            # Guardar en la base de datos
            self.guardar_en_base_datos(datos_estado)
            
            # Verificar estados anómalos
            self.verificar_alertas(datos_estado)
            
            logger.info(f"Recolección de estado exitosa: {datos_estado['id_dispositivo']}")
            return datos_estado
            
        except Exception as e:
            logger.error(f"Fallo en la recolección de estado: {str(e)}")
            return None
    
    def verificar_estado_servicio(self) -> str:
        """Verificar el estado de ejecución del servicio principal"""
        try:
            # Verificar estado del servicio a través de Supervisor
            import subprocess
            resultado = subprocess.run(
                ["supervisorctl", "status", "aiglasses"],
                capture_output=True,
                text=True
            )
            if "RUNNING" in resultado.stdout:
                return "ejecutando"
            else:
                return "detenido"
        except:
            return "desconocido"
    
    def verificar_configuracion_api(self) -> bool:
        """Verificar el estado de configuración de la API"""
        try:
            # Leer archivo de configuración de API
            import os
            archivo_clave_api = "/root/AIGlasses_para_Navegacion/.api_key.json"
            if os.path.exists(archivo_clave_api):
                with open(archivo_clave_api, 'r') as f:
                    config = json.load(f)
                    return bool(config.get("api_key"))
            return False
        except:
            return False
    
    def obtener_modelos_cargados(self) -> str:
        """Obtener la lista de modelos cargados"""
        # Aquí se necesita obtener según la lógica real de carga de modelos
        # Versión simplificada: verificar si existen archivos de modelo
        modelos = []
        archivos_modelo = [
            "yolo-seg.pt",
            "yoloe-11l-seg.pt", 
            "shoppingbest5.pt",
            "trafficlight.pt",
            "hand_landmarker.task"
        ]
        
        for modelo in archivos_modelo:
            ruta_modelo = f"/root/AIGlasses_para_Navegacion/modelo/{modelo}"
            if os.path.exists(ruta_modelo):
                modelos.append(modelo.split('.')[0])
        
        return ",".join(modelos) if modelos else "ninguno"
    
    def guardar_en_base_datos(self, datos_estado: Dict[str, Any]):
        """Guardar datos de estado en la base de datos"""
        conn = sqlite3.connect(self.ruta_db)
        cursor = conn.cursor()
        
        cursor.execute('''
        INSERT INTO estado_dispositivo 
        (id_dispositivo, estado_servicio, api_configurada, modelos_cargados, 
         archivos_audio, camara_conectada, modo_actual, fps, 
         uso_cpu, uso_memoria, ultimo_obstaculo, ultima_navegacion, nivel_bateria)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            datos_estado.get("id_dispositivo", "predeterminado"),
            datos_estado.get("estado_servicio", "desconocido"),
            1 if datos_estado.get("api_configurada") else 0,
            datos_estado.get("modelos_cargados", ""),
            datos_estado.get("archivos_audio", 0),
            1 if datos_estado.get("camara_conectada") else 0,
            datos_estado.get("modo_actual", "inactivo"),
            datos_estado.get("fps", 0.0),
            datos_estado.get("metricas_sistema", {}).get("uso_cpu", 0.0),
            datos_estado.get("metricas_sistema", {}).get("uso_memoria", 0.0),
            datos_estado.get("estado_navegacion", {}).get("ultimo_obstaculo"),
            datos_estado.get("estado_navegacion", {}).get("ultima_comando"),
            datos_estado.get("nivel_bateria", 100)
        ))
        
        conn.commit()
        conn.close()
    
    def verificar_alertas(self, datos_estado: Dict[str, Any]):
        """Verificar estados anómalos y registrar alertas"""
        alertas = []
        
        # Verificar estado del servicio
        if datos_estado.get("estado_servicio") != "ejecutando":
            alertas.append({
                "tipo": "servicio_detenido",
                "nivel": "critico",
                "mensaje": "El servicio principal se ha detenido"
            })
        
        # Verificar configuración de API
        if not datos_estado.get("api_configurada"):
            alertas.append({
                "tipo": "api_no_configurada", 
                "nivel": "advertencia",
                "mensaje": "La clave API no está configurada, la función de voz no está disponible"
            })
        
        # Verificar carga de modelos
        modelos = datos_estado.get("modelos_cargados", "")
        if modelos == "ninguno" or not modelos:
            alertas.append({
                "tipo": "sin_modelos_cargados",
                "nivel": "critico",
                "mensaje": "No se ha cargado ningún modelo de IA"
            })
        
        # Verificar conexión de cámara
        if not datos_estado.get("camara_conectada"):
            alertas.append({
                "tipo": "camara_desconectada",
                "nivel": "advertencia", 
                "mensaje": "La cámara no está conectada, las funciones visuales están limitadas"
            })
        
        # Guardar alertas en la base de datos
        if alertas:
            self.guardar_alertas(alertas, datos_estado.get("id_dispositivo", "predeterminado"))
    
    def guardar_alertas(self, alertas: list, id_dispositivo: str):
        """Guardar alertas en la base de datos"""
        conn = sqlite3.connect(self.ruta_db)
        cursor = conn.cursor()
        
        for alerta in alertas:
            cursor.execute('''
            INSERT INTO alertas (id_dispositivo, tipo_alerta, nivel_alerta, mensaje)
            VALUES (?, ?, ?, ?)
            ''', (id_dispositivo, alerta["tipo"], alerta["nivel"], alerta["mensaje"]))
        
        conn.commit()
        conn.close()
        logger.warning(f"Registro de alertas: {len(alertas)} alertas")
    
    def iniciar_recoleccion(self, intervalo: int = 30):
        """
        Comenzar a recolectar estado periódicamente
        
        Args:
            intervalo: Intervalo de recolección (segundos)
        """
        logger.info(f"Iniciando recolección de estado, intervalo: {intervalo} segundos")
        
        while True:
            try:
                self.recolectar_estado()
                time.sleep(intervalo)
            except KeyboardInterrupt:
                logger.info("Señal de detención recibida, saliendo del bucle de recolección")
                break
            except Exception as e:
                logger.error(f"Error en el bucle de recolección: {str(e)}")
                time.sleep(intervalo)

if __name__ == "__main__":
    # Ejemplo de uso
    recolector = RecolectorEstado()
    
    # Prueba de recolección única
    estado = recolector.recolectar_estado()
    if estado:
        print("Estado actual:")
        print(json.dumps(estado, indent=2, ensure_ascii=False))
    
    # Comenzar recolección periódica (en implementación real, esto debería ejecutarse como servicio)
    # recolector.iniciar_recoleccion(intervalo=30)

Este servicio de recolección de estado verificará periódicamente los indicadores clave del sistema y guardará los datos en una base de datos SQLite. También detectará estados anómalos y generará alertas.

4.2 Integración con el Sistema Existente

Para que el servicio de recolección de estado pueda obtener información más detallada, necesitamos hacer algunas modificaciones menores al sistema AIGlasses existente.

Primero, agreguemos una interfaz de exportación de estado en app_principal.py:

# Agregar el siguiente código en app_principal.py

@app.route('/api/estado', methods=['GET'])
def obtener_estado_sistema():
    """Interfaz para obtener el estado del sistema"""
    estado = {
        "servicio": "ejecutando",
        "modelos": {
            "acera_invidente": cargador_modelos.obtener_estado_modelo("acera_invidente"),
            "semaforo": cargador_modelos.obtener_estado_modelo("semaforo"),
            "deteccion_objetos": cargador_modelos.obtener_estado_modelo("deteccion_objetos")
        },
        "hardware": {
            "camara": gestor_camara.obtener_estado_camara(),
            "audio_entrada": gestor_audio.obtener_estado_entrada(),
            "audio_salida": gestor_audio.obtener_estado_salida()
        },
        "navegacion": {
            "modo_actual": controlador_navegacion.obtener_modo_actual(),
            "ultima_comando": controlador_navegacion.obtener_ultimo_comando(),
            "ultimo_obstaculo": controlador_navegacion.obtener_ultimo_obstaculo()
        },
        "rendimiento": {
            "fps": monitor_rendimiento.obtener_fps_actual(),
            "uso_cpu": monitor_rendimiento.obtener_uso_cpu(),
            "uso_memoria": monitor_rendimiento.obtener_uso_memoria()
        },
        "timestamp": datetime.now().isoformat()
    }
    return jsonify(estado)

Luego, necesitamos crear un simple módulo de monitoreo de rendimiento:

# monitor_rendimiento.py
import psutil
import time
from collections import deque

class MonitorRendimiento:
    def __init__(self, tamano_ventana=60):
        self.historial_fps = deque(maxlen=tamano_ventana)
        self.ultimo_tiempo_fotograma = time.time()
        
    def actualizar_fotograma(self):
        """Actualizar cálculo de FPS"""
        tiempo_actual = time.time()
        intervalo_fotograma = tiempo_actual - self.ultimo_tiempo_fotograma
        if intervalo_fotograma > 0:
            fps = 1.0 / intervalo_fotograma
            self.historial_fps.append(fps)
        self.ultimo_tiempo_fotograma = tiempo_actual
    
    def obtener_fps_actual(self):
        """Obtener FPS actual"""
        if len(self.historial_fps) == 0:
            return 0.0
        return sum(self.historial_fps) / len(self.historial_fps)
    
    def obtener_uso_cpu(self):
        """Obtener uso de CPU"""
        return psutil.cpu_percent(interval=0.1)
    
    def obtener_uso_memoria(self):
        """Obtener uso de memoria"""
        memoria = psutil.virtual_memory()
        return memoria.percent

  1. Implementación del Servicio de API de Datos

Con los datos de estado, ahora necesitamos crear un servicio API para que el Mini Programa de WeChat pueda acceder a estos datos.

5.1 Diseño de RESTful API

Usamos Flask para crear un servicio API simple:

# api_datos.py
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_socketio import SocketIO, emit
import sqlite3
from datetime import datetime, timedelta
import json
import logging

app = Flask(__name__)
CORS(app)  # Permitir solicitudes跨域
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuración de registro
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def obtener_conexion_db():
    """Obtener conexión a la base de datos"""
    conn = sqlite3.connect('estado_aiglasses.db')
    conn.row_factory = sqlite3.Row  # Devolver resultados en formato de diccionario
    return conn

@app.route('/api/estado/actual', methods=['GET'])
def obtener_estado_actual():
    """Obtener el estado actual del dispositivo"""
    id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
    
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    # Obtener estado más reciente
    cursor.execute('''
    SELECT * FROM estado_dispositivo 
    WHERE id_dispositivo = ? 
    ORDER BY timestamp DESC 
    LIMIT 1
    ''', (id_dispositivo,))
    
    fila = cursor.fetchone()
    conn.close()
    
    if fila:
        estado = dict(fila)
        # Convertir tipos de datos
        estado['api_configurada'] = bool(estado['api_configurada'])
        estado['camara_conectada'] = bool(estado['camara_conectada'])
        return jsonify({"exito": True, "datos": estado})
    else:
        return jsonify({"exito": False, "mensaje": "No se encontró el estado del dispositivo"}), 404

@app.route('/api/estado/historial', methods=['GET'])
def obtener_historial_estado():
    """Obtener historial de estado del dispositivo"""
    id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
    horas = int(request.args.get('horas', 24))  # Consultar por defecto las últimas 24 horas
    
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    # Calcular rango de tiempo
    tiempo_inicio = datetime.now() - timedelta(hours=horas)
    
    cursor.execute('''
    SELECT 
        timestamp,
        estado_servicio,
        api_configurada,
        camara_conectada,
        modo_actual,
        fps,
        uso_cpu,
        uso_memoria
    FROM estado_dispositivo 
    WHERE id_dispositivo = ? AND timestamp >= ?
    ORDER BY timestamp ASC
    ''', (id_dispositivo, tiempo_inicio.isoformat()))
    
    filas = cursor.fetchall()
    conn.close()
    
    historial = [dict(fila) for fila in filas]
    return jsonify({"exito": True, "datos": historial})

@app.route('/api/alertas', methods=['GET'])
def obtener_alertas():
    """Obtener información de alertas"""
    id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
    solo_no_resueltas = request.args.get('solo_no_resueltas', 'true').lower() == 'true'
    
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    if solo_no_resueltas:
        cursor.execute('''
        SELECT * FROM alertas 
        WHERE id_dispositivo = ? AND resuelto = 0
        ORDER BY timestamp DESC
        ''', (id_dispositivo,))
    else:
        cursor.execute('''
        SELECT * FROM alertas 
        WHERE id_dispositivo = ?
        ORDER BY timestamp DESC
        LIMIT 50
        ''', (id_dispositivo,))
    
    filas = cursor.fetchall()
    conn.close()
    
    alertas = [dict(fila) for fila in filas]
    return jsonify({"exito": True, "datos": alertas})

@app.route('/api/alertas/<int:id_alerta>/resolver', methods=['POST'])
def resolver_alerta(id_alerta):
    """Marcar alerta como resuelta"""
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    cursor.execute('''
    UPDATE alertas SET resuelto = 1 
    WHERE id = ?
    ''', (id_alerta,))
    
    conn.commit()
    afectadas = cursor.rowcount
    conn.close()
    
    if afectadas > 0:
        # Notificar a todos los clientes conectados a través de WebSocket
        socketio.emit('alerta_resuelta', {'id_alerta': id_alerta})
        return jsonify({"exito": True, "mensaje": "La alerta se ha marcado como resuelta"})
    else:
        return jsonify({"exito": False, "mensaje": "No se encontró la alerta especificada"}), 404

@app.route('/api/dispositivos', methods=['GET'])
def obtener_dispositivos():
    """Obtener lista de todos los dispositivos"""
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    cursor.execute('''
    SELECT 
        d.*,
        s.estado_servicio as ultimo_estado,
        s.timestamp as ultima_actividad
    FROM dispositivos d
    LEFT JOIN estado_dispositivo s ON d.id_dispositivo = s.id_dispositivo
    WHERE s.timestamp = (
        SELECT MAX(timestamp) 
        FROM estado_dispositivo 
        WHERE id_dispositivo = d.id_dispositivo
    )
    ORDER BY d.nombre_dispositivo
    ''')
    
    filas = cursor.fetchall()
    conn.close()
    
    dispositivos = []
    for fila in filas:
        dispositivo = dict(fila)
        # Calcular si el dispositivo está en línea (datos en los últimos 5 minutos)
        if dispositivo['ultima_actividad']:
            ultima_actividad = datetime.fromisoformat(dispositivo['ultima_actividad'])
            esta_en_linea = (datetime.now() - ultima_actividad).total_seconds() < 300
            dispositivo['esta_en_linea'] = esta_en_linea
        else:
            dispositivo['esta_en_linea'] = False
        
        dispositivos.append(dispositivo)
    
    return jsonify({"exito": True, "datos": dispositivos})

@app.route('/api/dispositivos', methods=['POST'])
def registrar_dispositivo():
    """Registrar nuevo dispositivo"""
    datos = request.json
    
    if not datos or 'id_dispositivo' not in datos:
        return jsonify({"exito": False, "mensaje": "Faltan parámetros requeridos"}), 400
    
    conn = obtener_conexion_db()
    cursor = conn.cursor()
    
    try:
        cursor.execute('''
        INSERT OR REPLACE INTO dispositivos 
        (id_dispositivo, nombre_dispositivo, nombre_propietario, ubicacion, ultima_actividad, activo)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            datos['id_dispositivo'],
            datos.get('nombre_dispositivo', 'Dispositivo sin nombre'),
            datos.get('nombre_propietario', ''),
            datos.get('ubicacion', ''),
            datetime.now().isoformat(),
            1
        ))
        
        conn.commit()
        id_dispositivo = datos['id_dispositivo']
        
        # Notificar registro de nuevo dispositivo a través de WebSocket
        socketio.emit('dispositivo_registrado', {
            'id_dispositivo': id_dispositivo,
            'nombre_dispositivo': datos.get('nombre_dispositivo', 'Dispositivo sin nombre')
        })
        
        return jsonify({
            "exito": True, 
            "mensaje": "Dispositivo registrado exitosamente",
            "id_dispositivo": id_dispositivo
        })
        
    except Exception as e:
        conn.rollback()
        logger.error(f"Fallo en el registro del dispositivo: {str(e)}")
        return jsonify({"exito": False, "mensaje": f"Registro fallido: {str(e)}"}), 500
    finally:
        conn.close()

# Manejadores de eventos WebSocket
@socketio.on('conectar')
def manejar_conexion():
    """Evento de conexión del cliente"""
    logger.info(f"Cliente conectado: {request.sid}")
    emit('conectado', {'mensaje': 'Conexión exitosa'})

@socketio.on('suscribir_dispositivo')
def manejar_suscripcion_dispositivo(datos):
    """Suscripción a actualizaciones de estado del dispositivo"""
    id_dispositivo = datos.get('id_dispositivo')
    if id_dispositivo:
        logger.info(f"Cliente suscrito al dispositivo: {id_dispositivo}")
        # Agregar el cliente a la sala correspondiente al dispositivo
        socketio.server.enter_room(request.sid, f"dispositivo_{id_dispositivo}")
        emit('suscrito', {'id_dispositivo': id_dispositivo})

@socketio.on('desconectar')
def manejar_desconexion():
    """Evento de desconexión del cliente"""
    logger.info(f"Cliente desconectado: {request.sid}")

# Funciones de empuje de estado (llamadas por el servicio de recolección de estado)
def empujar_actualizacion_estado(id_dispositivo, datos_estado):
    """Empujar actualización de estado a todos los clientes suscritos al dispositivo"""
    socketio.emit('actualizacion_estado', {
        'id_dispositivo': id_dispositivo,
        'estado': datos_estado,
        'timestamp': datetime.now().isoformat()
    }, room=f"dispositivo_{id_dispositivo}")

def empujar_nueva_alerta(id_dispositivo, datos_alerta):
    """Empujar nueva alerta a todos los clientes suscritos al dispositivo"""
    socketio.emit('nueva_alerta', {
        'id_dispositivo': id_dispositivo,
        'alerta': datos_alerta,
        'timestamp': datetime.now().isoformat()
    }, room=f"dispositivo_{id_dispositivo}")

if __name__ == '__main__':
    # Iniciar servicio API
    logger.info("Iniciando servicio de datos API...")
    socketio.run(app, host='0.0.0.0', port=8082, debug=True)

Este servicio API proporciona las siguientes interfaces clave:

  1. Obtener estado actual (/api/estado/actual) - Obtener el estado más reciente del dispositivo
  2. Obtener historial (/api/estado/historial) - Obtener el historial de estado en un período de tiempo específico
  3. Obtener alertas (/api/alertas) - Obtener registros de alertas del dispositivo
  4. Gestión de dispositivos (/api/dispositivos) - Registrar y consultar información de dispositivos
  5. Interfaz WebSocket - Empujar actualizaciones de estado y alertas en tiempo real

5.2 Integración de Empuje de Estado

Necesitamos modificar el servicio de recolección de estado para llamar a las funciones de empuje del servicio API cuando se recopile un nuevo estado:

# En recolector_estado.py, agregar

class RecolectorEstado:
    # ... código anterior sin cambios ...
    
    def __init__(self, url_base: str = "http://localhost:8081", 
                 url_api: str = "http://localhost:8082"):
        self.url_base = url_base
        self.url_api = url_api  # Nuevo: dirección del servicio API
        self.ruta_db = "estado_aiglasses.db"
        self.inicializar_base_datos()
    
    def recolectar_estado(self) -> Optional[Dict[str, Any]]:
        """Recopilar información de estado del sistema AIGlasses"""
        try:
            # ... código de recolección de estado sin cambios ...
            
            # Guardar en la base de datos
            self.guardar_en_base_datos(datos_estado)
            
            # Verificar estados anómalos
            nuevas_alertas = self.verificar_alertas(datos_estado)
            
            # Empujar al servicio API (si está configurado)
            if self.url_api:
                self.empujar_a_api(datos_estado, nuevas_alertas)
            
            logger.info(f"Recolección de estado exitosa: {datos_estado['id_dispositivo']}")
            return datos_estado
            
        except Exception as e:
            logger.error(f"Fallo en la recolección de estado: {str(e)}")
            return None
    
    def empujar_a_api(self, datos_estado: Dict[str, Any], nuevas_alertas: list):
        """Empujar estado al servicio API"""
        try:
            # Empujar actualización de estado
            url_empuje = f"{self.url_api}/empujar/estado"
            respuesta = requests.post(url_empuje, json=datos_estado, timeout=5)
            
            if respuesta.status_code == 200:
                logger.debug("Empuje de estado exitoso")
            else:
                logger.warning(f"Fallo en el empuje de estado: {respuesta.status_code}")
            
            # Empujar nuevas alertas
            if nuevas_alertas:
                for alerta in nuevas_alertas:
                    url_alerta = f"{self.url_api}/empujar/alerta"
                    datos_alerta = {
                        "id_dispositivo": datos_estado.get("id_dispositivo"),
                        **alerta
                    }
                    requests.post(url_alerta, json=datos_alerta, timeout=5)
                    
        except requests.exceptions.RequestException as e:
            logger.warning(f"Fallo en el empuje de API: {str(e)}")

  1. Desarrollo del Mini Programa de WeChat

Ahora desarrollemos la parte frontal del Mini Programa de WeChat. El Mini Programa proporcionará las siguientes funciones:

  • Visualización en tiempo real del estado del dispositivo
  • Gráficos de historial de estado
  • Centro de notificaciones de alertas
  • Interfaz de gestión de dispositivos

6.1 Estructura de Páginas del Mini Programa

Estructura del directorio del Mini Programa:
pages/
├── index/                    # Página de inicio - Resumen de dispositivos
│   ├── index.js
│   ├── index.json
│   ├── index.wxml
│   └── index.wxss
├── dispositivo/                   # Página de detalles del dispositivo
│   ├── dispositivo.js
│   ├── dispositivo.json
│   ├── dispositivo.wxml
│   └── dispositivo.wxss
├── alertas/                   # Centro de alertas
│   ├── alertas.js
│   ├── alertas.json
│   ├── alertas.wxml
│   └── alertas.wxss
└── configuracion/                 # Página de configuración
    ├── configuracion.js
    ├── configuracion.json
    ├── configuracion.wxml
    └── configuracion.wxss


6.2 Implementación de la Página de Inicio (Resumen de Dispositivos)

<!-- pages/index/index.wxml -->
<view class="container">
  <!-- Barra de estado superior -->
  <view class="barra-estado">
    <view class="item-estado">
      <text class="etiqueta-estado">Dispositivos en línea</text>
      <text class="valor-estado">{{conteoEnLinea}}/{{totalDispositivos}}</text>
    </view>
    <view class="item-estado">
      <text class="etiqueta-estado">Alertas activas</text>
      <text class="valor-estado alerta">{{alertasActivas}}</text>
    </view>
  </view>

  <!-- Lista de dispositivos -->
  <scroll-view class="lista-dispositivos" scroll-y>
    <block wx:for="{{dispositivos}}" wx:key="id_dispositivo">
      <view class="tarjeta-dispositivo" bindtap="navegarADetalle" data-id="{{item.id_dispositivo}}">
        <view class="encabezado-dispositivo">
          <view class="info-dispositivo">
            <text class="nombre-dispositivo">{{item.nombre_dispositivo}}</text>
            <text class="propietario-dispositivo">{{item.nombre_propietario}}</text>
          </view>
          <view class="estado-dispositivo">
            <view class="indicador-estado {{item.esta_en_linea ? 'en-linea' : 'fuera-linea'}}"></view>
            <text class="texto-estado">{{item.esta_en_linea ? 'En línea' : 'Fuera de línea'}}</text>
          </view>
        </view>
        
        <view class="detalles-dispositivo">
          <view class="item-detalle">
            <text class="etiqueta-detalle">Última actividad</text>
            <text class="valor-detalle">{{formatearTiempo(item.ultima_actividad)}}</text>
          </view>
          <view class="item-detalle">
            <text class="etiqueta-detalle">Modo de operación</text>
            <text class="valor-detalle">{{item.ultimo_estado || 'Desconocido'}}</text>
          </view>
          <view class="item-detalle">
            <text class="etiqueta-detalle">Ubicación</text>
            <text class="valor-detalle">{{item.ubicacion || 'No configurada'}}</text>
          </view>
        </view>
        
        <!-- Mostrar indicador de alerta si hay alertas -->
        <block wx:if="{{item.conteo_alertas > 0}}">
          <view class="indicador-alerta">
            <text>{{item.conteo_alertas}} alertas</text>
          </view>
        </block>
      </view>
    </block>
  </scroll-view>

  <!-- Botón para agregar dispositivo -->
  <view class="btn-agregar-dispositivo" bindtap="mostrarModalAgregarDispositivo">
    <text>+ Agregar dispositivo</text>
  </view>
</view>

// pages/index/index.js
Page({
  data: {
    dispositivos: [],
    conteoEnLinea: 0,
    totalDispositivos: 0,
    alertasActivas: 0,
    socketConectado: false
  },

  onLoad() {
    this.cargarDispositivos();
    this.conectarWebSocket();
  },

  onShow() {
    // Refrescar datos al mostrar la página
    this.cargarDispositivos();
  },

  cargarDispositivos() {
    const that = this;
    
    // Llamar a la API para obtener la lista de dispositivos
    wx.request({
      url: 'http://tu-servidor-ip:8082/api/dispositivos',
      method: 'GET',
      success(res) {
        if (res.data.exito) {
          const dispositivos = res.data.datos;
          const conteoEnLinea = dispositivos.filter(d => d.esta_en_linea).length;
          const alertasActivas = dispositivos.reduce((sum, dispositivo) => sum + (dispositivo.conteo_alertas || 0), 0);
          
          that.setData({
            dispositivos: dispositivos,
            conteoEnLinea: conteoEnLinea,
            totalDispositivos: dispositivos.length,
            alertasActivas: alertasActivas
          });
        }
      },
      fail(err) {
        wx.showToast({
          title: 'Carga fallida',
          icon: 'error'
        });
      }
    });
  },

  conectarWebSocket() {
    const that = this;
    
    // Conectar WebSocket
    wx.connectSocket({
      url: 'ws://tu-servidor-ip:8082/socket.io/',
      success() {
        that.setData({ socketConectado: true });
      }
    });

    // Escuchar mensajes WebSocket
    wx.onSocketMessage((res) => {
      const datos = JSON.parse(res.data);
      
      switch(datos.tipo) {
        case 'actualizacion_estado':
          that.manejarActualizacionEstado(datos);
          break;
        case 'nueva_alerta':
          that.manejarNuevaAlerta(datos);
          break;
        case 'dispositivo_registrado':
          that.manejarDispositivoRegistrado(datos);
          break;
      }
    });

    // Escuchar cierre de conexión
    wx.onSocketClose(() => {
      that.setData({ socketConectado: false });
      // Intentar reconectar
      setTimeout(() => {
        that.conectarWebSocket();
      }, 3000);
    });
  },

  manejarActualizacionEstado(datos) {
    // Actualizar estado del dispositivo
    const dispositivos = this.data.dispositivos.map(dispositivo => {
      if (dispositivo.id_dispositivo === datos.id_dispositivo) {
        return {
          ...dispositivo,
          ultimo_estado: datos.estado.modo_actual,
          ultima_actividad: datos.timestamp,
          esta_en_linea: true
        };
      }
      return dispositivo;
    });
    
    this.setData({
      dispositivos: dispositivos,
      conteoEnLinea: dispositivos.filter(d => d.esta_en_linea).length
    });
  },

  manejarNuevaAlerta(datos) {
    // Mostrar notificación de nueva alerta
    wx.showModal({
      title: 'Nueva alerta',
      content: `Dispositivo ${datos.id_dispositivo}: ${datos.alerta.mensaje}`,
      showCancel: false
    });
    
    // Actualizar conteo de alertas
    this.setData({
      alertasActivas: this.data.alertasActivas + 1
    });
  },

  manejarDispositivoRegistrado(datos) {
    // Refrescar lista de dispositivos
    this.cargarDispositivos();
    
    wx.showToast({
      title: `Nuevo dispositivo ${datos.nombre_dispositivo} registrado`,
      icon: 'success'
    });
  },

  navegarADetalle(e) {
    const idDispositivo = e.currentTarget.dataset.id;
    wx.navigateTo({
      url: `/pages/dispositivo/dispositivo?id_dispositivo=${idDispositivo}`
    });
  },

  mostrarModalAgregarDispositivo() {
    wx.navigateTo({
      url: '/pages/configuracion/configuracion'
    });
  },

  formatearTiempo(timestamp) {
    if (!timestamp) return 'Nunca en línea';
    
    const fecha = new Date(timestamp);
    const ahora = new Date();
    const diff = ahora - fecha;
    
    if (diff < 60000) { // 1 minuto
      return 'Ahora mismo';
    } else if (diff < 3600000) { // 1 hora
      return `${Math.floor(diff / 60000)} minutos atrás`;
    } else if (diff < 86400000) { // 1 día
      return `${Math.floor(diff / 3600000)} horas atrás`;
    } else {
      return `${Math.floor(diff / 86400000)} días atrás`;
    }
  }
});

6.3 Implementación de la Página de Detalles del Dispositivo

<!-- pages/dispositivo/dispositivo.wxml -->
<view class="container">
  <!-- Información del encabezado del dispositivo -->
  <view class="encabezado-dispositivo">
    <view class="titulo-dispositivo">
      <text class="nombre-dispositivo">{{infoDispositivo.nombre_dispositivo}}</text>
      <view class="estado-dispositivo {{infoDispositivo.esta_en_linea ? 'en-linea' : 'fuera-linea'}}">
        <text>{{infoDispositivo.esta_en_linea ? 'En línea' : 'Fuera de línea'}}</text>
      </view>
    </view>
    <text class="id-dispositivo">ID: {{infoDispositivo.id_dispositivo}}</text>
  </view>

  <!-- Tarjetas de estado en tiempo real -->
  <view class="tarjetas-estado">
    <view class="tarjeta-estado">
      <text class="titulo-tarjeta">Estado del servicio</text>
      <text class="valor-tarjeta {{estadoActual.estado_servicio}}">
        {{estadoActual.estado_servicio === 'ejecutando' ? 'Ejecutando' : 'Detenido'}}
      </text>
    </view>
    
    <view class="tarjeta-estado">
      <text class="titulo-tarjeta">API configurada</text>
      <text class="valor-tarjeta {{estadoActual.api_configurada ? 'configurada' : 'no-configurada'}}">
        {{estadoActual.api_configurada ? 'Configurada' : 'No configurada'}}
      </text>
    </view>
    
    <view class="tarjeta-estado">
      <text class="titulo-tarjeta">Cámara</text>
      <text class="valor-tarjeta {{estadoActual.camara_conectada ? 'conectada' : 'desconectada'}}">
        {{estadoActual.camara_conectada ? 'Conectada' : 'Desconectada'}}
      </text>
    </view>
    
    <view class="tarjeta-estado">
      <text class="titulo-tarjeta">Modo actual</text>
      <text class="valor-tarjeta">{{estadoActual.modo_actual || 'Espera'}}</text>
    </view>
  </view>

  <!-- Métricas de rendimiento -->
  <view class="seccion-rendimiento">
    <text class="titulo-seccion">Métricas de rendimiento</text>
    <view class="cuadricula-rendimiento">
      <view class="item-metrica">
        <text class="etiqueta-metrica">FPS</text>
        <text class="valor-metrica">{{estadoActual.fps.toFixed(1)}}</text>
      </view>
      <view class="item-metrica">
        <text class="etiqueta-metrica">Uso de CPU</text>
        <text class="valor-metrica">{{estadoActual.uso_cpu.toFixed(1)}}%</text>
      </view>
      <view class="item-metrica">
        <text class="etiqueta-metrica">Uso de memoria</text>
        <text class="valor-metrica">{{estadoActual.uso_memoria.toFixed(1)}}%</text>
      </view>
      <view class="item-metrica">
        <text class="etiqueta-metrica">Nivel de batería</text>
        <text class="valor-metrica">{{estadoActual.nivel_bateria}}%</text>
      </view>
    </view>
  </view>

  <!-- Estado de modelos de IA -->
  <view class="seccion-modelos">
    <text class="titulo-seccion">Estado de modelos de IA</text>
    <view class="lista-modelos">
      <block wx:for="{{modelosCargados}}" wx:key="index">
        <view class="item-modelo">
          <text class="nombre-modelo">{{item}}</text>
          <view class="estado-modelo cargado"></view>
        </view>
      </block>
    </view>
  </view>

  <!-- Gráfico histórico -->
  <view class="seccion-grafico">
    <text class="titulo-seccion">Estado histórico (últimas 24 horas)</text>
    <canvas canvas-id="graficoEstado" class="canvas-grafico"></canvas>
  </view>

  <!-- Botones de acción -->
  <view class="botones-accion">
    <button class="btn btn-actualizar" bindtap="actualizarEstado">Actualizar estado</button>
    <button class="btn btn-historial" bindtap="verHistorial">Ver historial</button>
    <button class="btn btn-alerta" bindtap="verAlertas">Ver alertas</button>
  </view>
</view>

// pages/dispositivo/dispositivo.js
import * as echarts from '../../ec-canvas/echarts';

Page({
  data: {
    idDispositivo: '',
    infoDispositivo: {},
    estadoActual: {
      estado_servicio: 'desconocido',
      api_configurada: false,
      camara_conectada: false,
      modo_actual: 'inactivo',
      fps: 0,
      uso_cpu: 0,
      uso_memoria: 0,
      nivel_bateria: 100
    },
    modelosCargados: [],
    grafico: null
  },

  onLoad(options) {
    this.setData({
      idDispositivo: options.id_dispositivo || 'predeterminado'
    });
    
    this.cargarInfoDispositivo();
    this.cargarEstadoActual();
    this.cargarGraficoHistorial();
    this.suscribirADispositivo();
  },

  cargarInfoDispositivo() {
    const that = this;
    
    wx.request({
      url: `http://tu-servidor-ip:8082/api/dispositivos`,
      method: 'GET',
      success(res) {
        if (res.data.exito) {
          const dispositivo = res.data.datos.find(d => d.id_dispositivo === that.data.idDispositivo);
          if (dispositivo) {
            that.setData({ infoDispositivo: dispositivo });
          }
        }
      }
    });
  },

  cargarEstadoActual() {
    const that = this;
    
    wx.request({
      url: `http://tu-servidor-ip:8082/api/estado/actual?id_dispositivo=${this.data.idDispositivo}`,
      method: 'GET',
      success(res) {
        if (res.data.exito) {
          const estado = res.data.datos;
          const modelos = estado.modelos_cargados ? estado.modelos_cargados.split(',') : [];
          
          that.setData({
            estadoActual: estado,
            modelosCargados: modelos
          });
        }
      }
    });
  },

  cargarGraficoHistorial() {
    const that = this;
    
    wx.request({
      url: `http://tu-servidor-ip:8082/api/estado/historial?id_dispositivo=${this.data.idDispositivo}&horas=24`,
      method: 'GET',
      success(res) {
        if (res.data.exito) {
          that.renderizarGrafico(res.data.datos);
        }
      }
    });
  },

  renderizarGrafico(datosHistorial) {
    // Usar echarts para renderizar el gráfico
    const componenteGrafico = this.selectComponent('#graficoEstado');
    
    if (!componenteGrafico) {
      setTimeout(() => this.renderizarGrafico(datosHistorial), 100);
      return;
    }

    const timestamps = datosHistorial.map(item => {
      const fecha = new Date(item.timestamp);
      return `${fecha.getHours()}:${fecha.getMinutes().toString().padStart(2, '0')}`;
    });

    const fpsData = datosHistorial.map(item => item.fps || 0);
    const cpuData = datosHistorial.map(item => item.uso_cpu || 0);
    const memoryData = datosHistorial.map(item => item.uso_memoria || 0);

    const option = {
      tooltip: {
        trigger: 'axis',
        axisPointer: {
          type: 'cross'
        }
      },
      legend: {
        data: ['FPS', 'Uso de CPU%', 'Uso de memoria%']
      },
      xAxis: {
        type: 'category',
        data: timestamps,
        axisLabel: {
          rotate: 45
        }
      },
      yAxis: [
        {
          type: 'value',
          name: 'FPS',
          position: 'left'
        },
        {
          type: 'value',
          name: 'Uso%',
          position: 'right',
          max: 100
        }
      ],
      series: [
        {
          name: 'FPS',
          type: 'line',
          yAxisIndex: 0,
          data: fpsData,
          smooth: true,
          lineStyle: {
            color: '#5470c6'
          }
        },
        {
          name: 'Uso de CPU%',
          type: 'line',
          yAxisIndex: 1,
          data: cpuData,
          smooth: true,
          lineStyle: {
            color: '#91cc75'
          }
        },
        {
          name: 'Uso de memoria%',
          type: 'line',
          yAxisIndex: 1,
          data: memoryData,
          smooth: true,
          lineStyle: {
            color: '#fac858'
          }
        }
      ]
    };

    componenteGrafico.init((canvas, width, height) => {
      const grafico = echarts.init(canvas, null, {
        width: width,
        height: height
      });
      grafico.setOption(option);
      this.setData({ grafico: grafico });
      return grafico;
    });
  },

  suscribirADispositivo() {
    // Suscribir a actualizaciones de estado del dispositivo a través de WebSocket
    if (wx.onSocketMessage) {
      wx.onSocketMessage((res) => {
        const datos = JSON.parse(res.data);
        if (datos.tipo === 'actualizacion_estado' && datos.id_dispositivo === this.data.idDispositivo) {
          this.manejarActualizacionEstado(datos.estado);
        }
      });
    }
  },

  manejarActualizacionEstado(nuevoEstado) {
    // Actualizar estado actual
    this.setData({
      estadoActual: {
        ...this.data.estadoActual,
        ...nuevoEstado
      }
    });
    
    // Mostrar indicador de actualización
    wx.showToast({
      title: 'Estado actualizado',
      icon: 'success',
      duration: 1000
    });
  },

  actualizarEstado() {
    this.cargarEstadoActual();
    wx.showToast({
      title: 'Actualizando...',
      icon: 'loading'
    });
  },

  verHistorial() {
    wx.navigateTo({
      url: `/pages/historial/historial?id_dispositivo=${this.data.idDispositivo}`
    });
  },

  verAlertas() {
    wx.navigateTo({
      url: `/pages/alertas/alertas?id_dispositivo=${this.data.idDispositivo}`
    });
  }
});

6.4 Implementación del Centro de Alertas

// pages/alertas/alertas.js
Page({
  data: {
    idDispositivo: '',
    alertas: [],
    filtro: 'todos' // todos, no-resueltas, resueltas
  },

  onLoad(options) {
    this.setData({
      idDispositivo: options.id_dispositivo || 'todos'
    });
    
    this.cargarAlertas();
    this.suscribirAlertas();
  },

  cargarAlertas() {
    const that = this;
    let url = `http://tu-servidor-ip:8082/api/alertas`;
    
    if (this.data.idDispositivo !== 'todos') {
      url += `?id_dispositivo=${this.data.idDispositivo}`;
    }
    
    if (this.data.filtro === 'no-resueltas') {
      url += `${this.data.idDispositivo !== 'todos' ? '&' : '?'}solo_no_resueltas=true`;
    }
    
    wx.request({
      url: url,
      method: 'GET',
      success(res) {
        if (res.data.exito) {
          that.setData({ alertas: res.data.datos });
        }
      }
    });
  },

  suscribirAlertas() {
    // Suscribir a nuevas alertas
    if (wx.onSocketMessage) {
      wx.onSocketMessage((res) => {
        const datos = JSON.parse(res.data);
        if (datos.tipo === 'nueva_alerta') {
          // Si se muestran todos los dispositivos o el dispositivo actual, agregar la nueva alerta
          if (this.data.idDispositivo === 'todos' || datos.id_dispositivo === this.data.idDispositivo) {
            const alertas = [datos.alerta, ...this.data.alertas];
            this.setData({ alertas: alertas.slice(0, 50) }); // Mantener solo las 50 más recientes
            
            // Mostrar notificación
            wx.showModal({
              title: 'Nueva alerta',
              content: `Dispositivo ${datos.id_dispositivo}: ${datos.alerta.mensaje}`,
              showCancel: false
            });
          }
        }
      });
    }
  },

  resolverAlerta(e) {
    const idAlerta = e.currentTarget.dataset.id;
    const that = this;
    
    wx.request({
      url: `http://tu-servidor-ip:8082/api/alertas/${idAlerta}/resolver`,
      method: 'POST',
      success(res) {
        if (res.data.exito) {
          wx.showToast({
            title: 'Marcada como resuelta',
            icon: 'success'
          });
          
          // Actualizar datos locales
          const alertas = that.data.alertas.map(alerta => {
            if (alerta.id === idAlerta) {
              return { ...alerta, resuelto: 1 };
            }
            return alerta;
          });
          
          that.setData({ alertas });
        }
      }
    });
  },

  cambiarFiltro(e) {
    const filtro = e.currentTarget.dataset.filtro;
    this.setData({ filtro }, () => {
      this.cargarAlertas();
    });
  }
});

  1. Despliegue y Configuración

7.1 Despliegue del Servidor

Despleguemos los servicios de recolección de estado y API de datos en el servidor:

# 1. Crear directorio de despliegue
mkdir -p /opt/monitor_aiglasses
cd /opt/monitor_aiglasses

# 2. Copiar archivos de código
cp /ruta/a/recolector_estado.py .
cp /ruta/a/api_datos.py .
cp /ruta/a/monitor_rendimiento.py .

# 3. Instalar dependencias
pip install flask flask-cors flask-socketio psutil

# 4. Crear configuración de Supervisor
sudo nano /etc/supervisor/conf.d/monitor_aiglasses.conf

Contenido de la configuración de Supervisor:

[program:recolector_estado]
command=python3 /opt/monitor_aiglasses/recolector_estado.py
directory=/opt/monitor_aiglasses
autostart=true
autorestart=true
stderr_logfile=/var/log/recolector_estado.err.log
stdout_logfile=/var/log/recolector_estado.out.log

[program:api_datos]
command=python3 /opt/monitor_aiglasses/api_datos.py
directory=/opt/monitor_aiglasses
autostart=true
autorestart=true
stderr_logfile=/var/log/api_datos.err.log
stdout_logfile=/var/log/api_datos.out.log


# 5. Recargar configuración de Supervisor
sudo supervisorctl reread
sudo supervisorctl update

# 6. Iniciar servicios
sudo supervisorctl start recolector_estado
sudo supervisorctl start api_datos

# 7. Verificar estado de los servicios
sudo supervisorctl status

7.2 Configuración del Mini Programa de WeChat

Configuración en la consola de administración del Mini Programa:

  1. Configuración de dominios del servidor:
  • En «Desarrollo» -> «Configuración de desarrollo» -> «Dominios del servidor», agregar el dominio de tu servidor API
  • Dominio legal para solicitud: https://tu-dominio
  • Dominio legal para socket: wss://tu-dominio
  1. Obtener AppID del Mini Programa:
  • En «Configuración» -> «Configuración básica», obtener el AppID
  • Se usará para autenticación y vinculación de dispositivos posteriormente
  1. Configuración de dominios de negocio (si se necesita WebView):
  • En «Desarrollo» -> «Configuración de desarrollo» -> «Dominios de negocio», agregar dominios necesarios

7.3 Configuración de Seguridad

Para garantizar la seguridad del sistema, necesitamos agregar algunas medidas de seguridad:

# middleware_seguridad.py
import hashlib
import hmac
import time
from functools import wraps
from flask import request, jsonify

class MiddlewareSeguridad:
    def __init__(self, app, clave_secreta):
        self.app = app
        self.clave_secreta = clave_secreta.encode('utf-8')
        
    def generar_firma(self, datos: dict, timestamp: int) -> str:
        """Generar firma de solicitud"""
        # Ordenar por nombre de parámetro
        params_ordenados = sorted(datos.items())
        param_str = '&'.join([f'{k}={v}' for k, v in params_ordenados])
        
        # Agregar timestamp
        sign_str = f'{param_str}&timestamp={timestamp}'
        
        # Generar firma usando HMAC-SHA256
        firma = hmac.new(
            self.clave_secreta,
            sign_str.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return firma
    
    def verificar_solicitud(self):
        """Verificar legitimidad de la solicitud"""
        # Obtener firma y timestamp
        firma = request.headers.get('X-Firma')
        timestamp = request.headers.get('X-Timestamp')
        
        if not firma or not timestamp:
            return False, "Falta firma o timestamp"
        
        # Verificar si el timestamp está en el período válido (5 minutos)
        try:
            ts = int(timestamp)
            if abs(time.time() - ts) > 300:  # 5 minutos
                return False, "Solicitud expirada"
        except ValueError:
            return False, "Formato de timestamp inválido"
        
        # Obtener parámetros de solicitud
        if request.method == 'GET':
            datos = request.args.to_dict()
        else:
            datos = request.get_json() or {}
        
        # Generar firma
        firma_esperada = self.generar_firma(datos, ts)
        
        # Comparar firmas
        if not hmac.compare_digest(firma, firma_esperada):
            return False, "Verificación de firma fallida"
        
        return True, "Verificación exitosa"
    
    def requerir_autenticacion(self, f):
        """Decorador de autenticación"""
        @wraps(f)
        def funcion_decorada(*args, **kwargs):
            # Saltar ciertas interfaces que no requieren autenticación
            if request.path in ['/api/iniciar_sesion', '/api/registrarse']:
                return f(*args, **kwargs)
            
            # Verificar solicitud
            valida, mensaje = self.verificar_solicitud()
            if not valida:
                return jsonify({
                    "exito": False,
                    "mensaje": mensaje,
                    "codigo": 401
                }), 401
            
            return f(*args, **kwargs)
        return funcion_decorada

# Usar en api_datos.py
from middleware_seguridad import MiddlewareSeguridad

app = Flask(__name__)
seguridad = MiddlewareSeguridad(app, "tu-clave-secreta-aqui")

# Proteger interfaces que requieren autenticación
@app.route('/api/estado/actual', methods=['GET'])
@seguridad.requerir_autenticacion
def obtener_estado_actual():
    # ... código original ...

  1. Efecto de Aplicación Práctica

8.1 Visualización de la Interfaz de Monitoreo

Una vez completado el despliegue, el Mini Programa de WeChat proporcionará las siguientes interfaces de monitoreo:

Página de Inicio - Resumen de Dispositivos

  • Muestra el estado en línea de todos los dispositivos registrados
  • Estadísticas en tiempo real del número de dispositivos en línea y alertas activas
  • Vista rápida de la información básica y última actividad de cada dispositivo
  • Hacer clic en la tarjeta del dispositivo para ir a la página de detalles

Página de Detalles del Dispositivo

  • Muestra en tiempo real varios indicadores de estado del dispositivo
  • Visualización de datos de rendimiento (FPS, CPU, memoria)
  • Ver los modelos de IA cargados
  • Gráfico de datos históricos de 24 horas
  • Botones de acción para actualizar estado y ver historial/alertas

Centro de Alertas

  • Recibir y mostrar alertas del dispositivo en tiempo real
  • Filtrar alertas por dispositivo, tipo o estado
  • Marcar alertas como resueltas
  • Consultar registros históricos de alertas

8.2 Casos de Uso Práctico

Caso uno: Centro de Servicios Comunitarios Un centro comunitario que sirve a personas con discapacidad visual gestiona 20 dispositivos AIGlasses. El administrador, a través del Mini Programa:

  • Verifica en tiempo real el estado en línea de todos los dispositivos
  • Recibe notificaciones inmediatas cuando los dispositivos presentan anomalías
  • Estadísticas de uso de dispositivos para optimizar horarios de servicio
  • Asistencia remota a usuarios para resolver problemas con dispositivos

Caso dos: Cuidado Familiar La Sra. Li proporcionó AIGlasses a su padre con problemas de visión. A través del Mini Programa:

  • Verifica en cualquier momento si su padre está utilizando el dispositivo
  • Conoce el entorno de ubicación de su padre (a través de los tipos de escena detectados por las gafas)
  • Recibe recordatorios cuando la batería del dispositivo es baja
  • Ve los registros de actividad reciente de su padre

Caso tres: Mantenimiento por Desarrolladores Los desarrolladores, a través del Mini Programa:

  • Monitorean el estado de ejecución de todos los dispositivos desplegados
  • Localizan rápidamente dispositivos con fallos
  • Recopilan datos de uso para optimizar algoritmos
  • Depuración y actualización remota de configuraciones
  1. Resumen y Perspectivas

9.1 Resumen de la Implementación

A través de los pasos de este artículo, hemos logrado exitosamente agregar capacidades de monitoreo remoto al sistema AIGlasses para Navegación:

  1. Servicio de Recolección de Estado: Recopila periódicamente el estado del dispositivo, detecta anomalías y genera alertas
  2. Servicio de API de Datos: Proporciona interfaces RESTful y WebSocket, soporta empuje de datos en tiempo real
  3. Mini Programa de WeChat: Proporciona una interfaz de usuario amigable para mostrar en tiempo real el estado del dispositivo y datos históricos
  4. Mecanismos de Seguridad: Agrega verificación de firma de solicitud para garantizar la seguridad de la transmisión de datos

Las ventajas de esta solución son:

  • No intrusiva: no necesita modificar el código principal de AIGlasses
  • Tiempo real: implementa actualizaciones de estado a nivel de segundo a través de WebSocket
  • Escalable: soporta la monitorización de múltiples dispositivos, facilitando la implementación a gran escala
  • Facil de usar: el Mini Programa de WeChat no requiere instalación, escanear es usar

9.2 Direcciones de Expansión Futura

Basado en la implementación actual, se pueden expandir aún más las funciones:

1. Más Indicadores de Monitoreo

  • Agregar monitoreo de calidad de conexión de red
  • Monitoreo del tiempo de duración de la batería
  • Estadísticas de frecuencia de uso de cada función
  • Registro de rutas y trayectorias de navegación

2. Funciones Avanzadas de Alerta

  • Soporte para reglas de alerta personalizadas
  • Clasificación de alertas (urgente, importante, general)
  • Múltiples métodos de notificación (plantillas de mensaje de WeChat, SMS, llamada telefónica)
  • Reglas de procesamiento automático de alertas

3. Informes y Análisis de Datos

  • Generación de informes de uso de dispositivos
  • Análisis de patrones de comportamiento del usuario
  • Predicción de tiempos de mantenimiento de dispositivos
  • Sugerencias de optimización del rendimiento de algoritmos

4. Funciones de Control Remoto

  • Reinicio remoto de servicios del dispositivo
  • Actualización remota de configuraciones del dispositivo
  • Ejecución de comandos de diagnóstico remoto
  • Asistencia remota para depuración

5. Gestión de Permisos de Múltiples Usuarios

  • Control de permisos por rol (administrador, operaciones, usuario)
  • Gestión de agrupación de dispositivos
  • Auditoría de registros de operación
  • Control de acceso a datos

9.3 Recomendaciones para Desarrolladores

Al implementar proyectos similares, se recomienda prestar atención a los siguientes puntos:

  1. Seguridad de datos primero: asegurar que todos los datos transmitidos estén encriptados y que las interfaces API tengan mecanismos de autenticación adecuados
  2. Considerar condiciones de red: las redes móviles pueden ser inestables, es necesario implementar reconexión automática y caché local
  3. Experiencia de usuario prioritaria: la interfaz de monitoreo debe ser simple e intuitiva, la información importante debe ser visible a simple vista
  4. Optimización de rendimiento: diseñar razonablemente la frecuencia de recolección de datos para evitar afectar el rendimiento del sistema original
  5. Mantenibilidad: el código debe tener buena documentación y comentarios para facilitar el mantenimiento y la expansión posteriores

A través de este sistema de monitoreo remoto, AIGlasses para Navegación evoluciona de una aplicación local independiente a un sistema de dispositivo inteligente que se puede gestionar y monitorear de forma remota. Esto no solo mejora la experiencia del usuario, sino que también proporciona conveniencia para el despliegue y mantenimiento a gran escala de dispositivos.

Obtener más imágenes de IA

¿Quieres explorar más imágenes de IA y escenarios de aplicación? Visita la Plaza de Imágenes de CSDN, que ofrece una rica variedad de imágenes preconfiguradas, que cubren múltiples campos como razonamiento de modelos grandes, generación de imágenes, generación de video, ajuste fino de modelos, etc., con implementación de un solo clic.

Etiquetas: gafas inteligentes monitoreo remoto mini programa wechat IoT desarrollo de aplicaciones

Publicado el 6-14 03:58