Caso de Desarrollo para AIGlasses para Navegación: Implementación de Monitoreo Remoto mediante Mini Programa de WeChat
- Introducción: De Local a Remoto, Desbloqueando Más Posibilidades para las Gafas Inteligentes
Si estás utilizando o desarrollando AIGlasses para Navegación, ya has experimentado sus potentes capacidades de navegación e interacción locales. Ya sea para reconocimiento de aceras para invidentes, detección de semáforos o búsqueda de objetos, este sistema puede ejecutarse de manera estable en un servidor local a través de una interfaz web que proporciona retroalimentación intuitiva.
Pero imagina este escenario: un familiar de un usuario con discapacidad visual quiere conocer en todo momento el estado de seguridad del usuario cuando está fuera; o un equipo de servicios comunitarios necesita monitorear simultáneamente el estado de funcionamiento de múltiples dispositivos. En estos casos, depender únicamente de la interfaz web local resulta poco conveniente.
Este es precisamente el tema que exploraremos hoy: cómo extender las capacidades de monitoreo de estado de AIGlasses para Navegación desde el servidor local hasta un Mini Programa de WeChat. A través de este caso, aprenderás cómo agregar un "tablero de control remoto" a este sistema de gafas inteligentes, permitiendo que información clave esté al alcance de la mano.
En este artículo, te guiaremos paso a paso para implementar esta función, desde el diseño de la arquitectura hasta la implementación del código, para que finalmente puedas tener un Mini Programa de WeChat que puede verificar el estado del dispositivo en cualquier momento y recibir alertas importantes. Todo el proceso no requiere modificaciones de hardware complejas, solo necesita agregar algunos "códigos puente" sobre el sistema existente.
- ¿Por qué se necesita el monitoreo remoto?
Antes de profundizar en los detalles técnicos, analicemos los problemas prácticos que puede resolver el monitoreo remoto de estado.
2.1 Escenarios de Aplicación Práctica
Escenario uno: Modo de Cuidado Familiar La abuela Zhang tiene problemas de visión, pero le gusta pasear por el complejo residencial cada tarde. Su hija, Xiao Wang, trabaja en otra ciudad y siempre se preocupa por la seguridad de su madre. A través del Mini Programa de WeChat, Xiao Wang puede verificar en cualquier momento:
- Si su madre está utilizando las gafas para navegación
- El entorno de ubicación actual (a través de los tipos de escena detectados por las gafas)
- Si la batería del dispositivo tiene suficiente carga
- Cuándo encontró un obstáculo por última vez
Escenario dos: Gestión de Servicios Comunitarios Un centro comunitario que sirve a personas con discapacidad visual gestiona 20 dispositivos AIGlasses. El administrador necesita:
- Verificar en lote el estado en línea de todos los dispositivos
- Localizar rápidamente los dispositivos con fallos
- Estadísticas de frecuencia de uso y duración del servicio
- Recibir notificaciones oportunas cuando los dispositivos presenten anomalías
Escenario tres: Depuración y Mantenimiento por Desarrolladores Como desarrollador, es posible que necesites:
- Verificar los registros de ejecución del servicio de forma remota
- Monitorear las llamadas a la API y los límites restantes
- Diagnosticar rápidamente cuando los usuarios encuentran problemas
- Recopilar datos de uso para optimizar los algoritmos
2.2 Valor Técnico
Desde una perspectiva técnica, el monitoreo remoto aporta varias ventajas clave:
- Tiempo real: los cambios de estado pueden enviarse inmediatamente al Mini Programa, sin necesidad de que el usuario consulte activamente
- Conveniencia: los usuarios no necesitan abrir una computadora o recordar la IP del servidor, pueden verlo directamente en WeChat
- Escalabilidad: un sistema de monitoreo puede servir a múltiples dispositivos, facilitando la implementación a gran escala
- Persistencia de datos: el estado histórico puede almacenarse para análisis e informes
- Diseño de Arquitectura General
Para implementar el monitoreo remoto, necesitamos agregar varios componentes al sistema existente. A continuación se muestra el diagrama de arquitectura general:
Sistema Existente (AIGlasses para Navegación) Nuevos Componentes Mini Programa de WeChat
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ Página de Estado │
│ │ Programa │ │ HTTP │ │ Servicio de │ │ WebSocket │ Tarjetas de Estado │
│ │ Principal │───┼───────────┼─▶│ Recolección │───┼─────────┼▶ en Tiempo Real │
│ │ app_main.py │ │ │ │ de Estado │ │ │ Lista de Historial │
│ └─────────────┘ │ │ └─────────────┘ │ │ Centro de Alertas │
│ │ │ │ └─────────────────────┘
│ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Interfaz │ │ │ │ API de │ │
│ │ Web │ │ │ │ Datos │ │
│ │ index.html │───┘ │ │ data_api.py │ │
│ └─────────────┘ │ └─────────────┘ │
│ │ │
│ ┌─────────────┐ │ ┌─────────────┐ │
│ │ Archivos de │ │ │ Base de │ │
│ │ Modelos │ │ │ Datos │ │
│ │ model/ │ │ │ (SQLite) │ │
│ └─────────────┘ │ └─────────────┘ │
└─────────────────────┘ └─────────────────────┘
3.1 Descripción de Responsabilidades de los Componentes
Servicio de Recolección de Estado (recolector_estado.py) Este es el "ojo" de todo el sistema, responsable de:
- Obtener periódicamente datos de estado del programa principal de AIGlasses
- Procesar y analizar los datos brutos
- Almacenar el estado procesado en la base de datos
- Detectar estados anómalos y activar alertas
Servicio de API de Datos (api_datos.py) Esta es la "boca" del sistema, responsable de:
- Proporcionar interfaces RESTful para que el Mini Programa las llame
- Soportar WebSocket para implementar empuje en tiempo real
- Manejar autenticación y control de permisos de usuario
- Devolver datos de estado formateados
Base de Datos (SQLite) Esta es la "memoria" del sistema, responsable de:
- Almacenar registros históricos de estado
- Almacenar información de configuración del dispositivo
- Almacenar datos de usuarios y permisos
- Soportar consultas y análisis rápidos
Mini Programa de WeChat Este es el "control remoto" del usuario, responsable de:
- Mostrar información de estado en tiempo real
- Mostrar registros históricos de estado
- Recibir y mostrar notificaciones de alertas
- Proporcionar funciones de control simples
- Implementación del Servicio de Recolección de Estado
El servicio de recolección de estado es el puente entre el sistema AIGlasses existente y el sistema de monitoreo. Primero, necesitamos entender qué información de estado proporciona AIGlasses.
4.1 Interfaz de Estado del Sistema Existente
Al revisar el código de AIGlasses para Navegación, descubrimos que el sistema ya muestra mucha información de estado a través de la interfaz web. Necesitamos "extraer" esta información.
Primero, creemos un script de recolección de estado:
# recolector_estado.py
import json
import time
import sqlite3
import requests
from datetime import datetime
import logging
from typing import Dict, Any, Optional
# Configuración de registro
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RecolectorEstado:
def __init__(self, url_base: str = "http://localhost:8081"):
"""
Inicializar el recolector de estado
Args:
url_base: Dirección del servicio web de AIGlasses
"""
self.url_base = url_base
self.ruta_db = "estado_aiglasses.db"
self.inicializar_base_datos()
def inicializar_base_datos(self):
"""Inicializar la estructura de tablas de la base de datos"""
conn = sqlite3.connect(self.ruta_db)
cursor = conn.cursor()
# Crear tabla de registros de estado
cursor.execute('''
CREATE TABLE IF NOT EXISTS estado_dispositivo (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
id_dispositivo TEXT,
estado_servicio TEXT,
api_configurada INTEGER,
modelos_cargados TEXT,
archivos_audio INTEGER,
camara_conectada INTEGER,
modo_actual TEXT,
fps REAL,
uso_cpu REAL,
uso_memoria REAL,
ultimo_obstaculo DATETIME,
ultima_navegacion TEXT,
nivel_bateria INTEGER
)
''')
# Crear tabla de alertas
cursor.execute('''
CREATE TABLE IF NOT EXISTS alertas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
id_dispositivo TEXT,
tipo_alerta TEXT,
nivel_alerta TEXT,
mensaje TEXT,
resuelto INTEGER DEFAULT 0
)
''')
# Crear tabla de información de dispositivos
cursor.execute('''
CREATE TABLE IF NOT EXISTS dispositivos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
id_dispositivo TEXT UNIQUE,
nombre_dispositivo TEXT,
nombre_propietario TEXT,
ubicacion TEXT,
ultima_actividad DATETIME,
activo INTEGER DEFAULT 1
)
''')
conn.commit()
conn.close()
logger.info("Inicialización de base de datos completada")
def recolectar_estado(self) -> Optional[Dict[str, Any]]:
"""
Recopilar información de estado del sistema AIGlasses
Returns:
Diccionario de datos de estado, si falla la recolección devuelve None
"""
try:
# Intentar obtener información de estado de la interfaz web
# Aquí se necesita ajustar según la estructura real de la página web
datos_estado = {
"timestamp": datetime.now().isoformat(),
"id_dispositivo": self.obtener_id_dispositivo(),
"estado_servicio": self.verificar_estado_servicio(),
"api_configurada": self.verificar_configuracion_api(),
"modelos_cargados": self.obtener_modelos_cargados(),
"archivos_audio": self.contar_archivos_audio(),
"camara_conectada": self.verificar_conexion_camara(),
"modo_actual": self.obtener_modo_actual(),
"fps": self.obtener_fps(),
"metricas_sistema": self.obtener_metricas_sistema(),
"estado_navegacion": self.obtener_estado_navegacion()
}
# Guardar en la base de datos
self.guardar_en_base_datos(datos_estado)
# Verificar estados anómalos
self.verificar_alertas(datos_estado)
logger.info(f"Recolección de estado exitosa: {datos_estado['id_dispositivo']}")
return datos_estado
except Exception as e:
logger.error(f"Fallo en la recolección de estado: {str(e)}")
return None
def verificar_estado_servicio(self) -> str:
"""Verificar el estado de ejecución del servicio principal"""
try:
# Verificar estado del servicio a través de Supervisor
import subprocess
resultado = subprocess.run(
["supervisorctl", "status", "aiglasses"],
capture_output=True,
text=True
)
if "RUNNING" in resultado.stdout:
return "ejecutando"
else:
return "detenido"
except:
return "desconocido"
def verificar_configuracion_api(self) -> bool:
"""Verificar el estado de configuración de la API"""
try:
# Leer archivo de configuración de API
import os
archivo_clave_api = "/root/AIGlasses_para_Navegacion/.api_key.json"
if os.path.exists(archivo_clave_api):
with open(archivo_clave_api, 'r') as f:
config = json.load(f)
return bool(config.get("api_key"))
return False
except:
return False
def obtener_modelos_cargados(self) -> str:
"""Obtener la lista de modelos cargados"""
# Aquí se necesita obtener según la lógica real de carga de modelos
# Versión simplificada: verificar si existen archivos de modelo
modelos = []
archivos_modelo = [
"yolo-seg.pt",
"yoloe-11l-seg.pt",
"shoppingbest5.pt",
"trafficlight.pt",
"hand_landmarker.task"
]
for modelo in archivos_modelo:
ruta_modelo = f"/root/AIGlasses_para_Navegacion/modelo/{modelo}"
if os.path.exists(ruta_modelo):
modelos.append(modelo.split('.')[0])
return ",".join(modelos) if modelos else "ninguno"
def guardar_en_base_datos(self, datos_estado: Dict[str, Any]):
"""Guardar datos de estado en la base de datos"""
conn = sqlite3.connect(self.ruta_db)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO estado_dispositivo
(id_dispositivo, estado_servicio, api_configurada, modelos_cargados,
archivos_audio, camara_conectada, modo_actual, fps,
uso_cpu, uso_memoria, ultimo_obstaculo, ultima_navegacion, nivel_bateria)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
datos_estado.get("id_dispositivo", "predeterminado"),
datos_estado.get("estado_servicio", "desconocido"),
1 if datos_estado.get("api_configurada") else 0,
datos_estado.get("modelos_cargados", ""),
datos_estado.get("archivos_audio", 0),
1 if datos_estado.get("camara_conectada") else 0,
datos_estado.get("modo_actual", "inactivo"),
datos_estado.get("fps", 0.0),
datos_estado.get("metricas_sistema", {}).get("uso_cpu", 0.0),
datos_estado.get("metricas_sistema", {}).get("uso_memoria", 0.0),
datos_estado.get("estado_navegacion", {}).get("ultimo_obstaculo"),
datos_estado.get("estado_navegacion", {}).get("ultima_comando"),
datos_estado.get("nivel_bateria", 100)
))
conn.commit()
conn.close()
def verificar_alertas(self, datos_estado: Dict[str, Any]):
"""Verificar estados anómalos y registrar alertas"""
alertas = []
# Verificar estado del servicio
if datos_estado.get("estado_servicio") != "ejecutando":
alertas.append({
"tipo": "servicio_detenido",
"nivel": "critico",
"mensaje": "El servicio principal se ha detenido"
})
# Verificar configuración de API
if not datos_estado.get("api_configurada"):
alertas.append({
"tipo": "api_no_configurada",
"nivel": "advertencia",
"mensaje": "La clave API no está configurada, la función de voz no está disponible"
})
# Verificar carga de modelos
modelos = datos_estado.get("modelos_cargados", "")
if modelos == "ninguno" or not modelos:
alertas.append({
"tipo": "sin_modelos_cargados",
"nivel": "critico",
"mensaje": "No se ha cargado ningún modelo de IA"
})
# Verificar conexión de cámara
if not datos_estado.get("camara_conectada"):
alertas.append({
"tipo": "camara_desconectada",
"nivel": "advertencia",
"mensaje": "La cámara no está conectada, las funciones visuales están limitadas"
})
# Guardar alertas en la base de datos
if alertas:
self.guardar_alertas(alertas, datos_estado.get("id_dispositivo", "predeterminado"))
def guardar_alertas(self, alertas: list, id_dispositivo: str):
"""Guardar alertas en la base de datos"""
conn = sqlite3.connect(self.ruta_db)
cursor = conn.cursor()
for alerta in alertas:
cursor.execute('''
INSERT INTO alertas (id_dispositivo, tipo_alerta, nivel_alerta, mensaje)
VALUES (?, ?, ?, ?)
''', (id_dispositivo, alerta["tipo"], alerta["nivel"], alerta["mensaje"]))
conn.commit()
conn.close()
logger.warning(f"Registro de alertas: {len(alertas)} alertas")
def iniciar_recoleccion(self, intervalo: int = 30):
"""
Comenzar a recolectar estado periódicamente
Args:
intervalo: Intervalo de recolección (segundos)
"""
logger.info(f"Iniciando recolección de estado, intervalo: {intervalo} segundos")
while True:
try:
self.recolectar_estado()
time.sleep(intervalo)
except KeyboardInterrupt:
logger.info("Señal de detención recibida, saliendo del bucle de recolección")
break
except Exception as e:
logger.error(f"Error en el bucle de recolección: {str(e)}")
time.sleep(intervalo)
if __name__ == "__main__":
# Ejemplo de uso
recolector = RecolectorEstado()
# Prueba de recolección única
estado = recolector.recolectar_estado()
if estado:
print("Estado actual:")
print(json.dumps(estado, indent=2, ensure_ascii=False))
# Comenzar recolección periódica (en implementación real, esto debería ejecutarse como servicio)
# recolector.iniciar_recoleccion(intervalo=30)
Este servicio de recolección de estado verificará periódicamente los indicadores clave del sistema y guardará los datos en una base de datos SQLite. También detectará estados anómalos y generará alertas.
4.2 Integración con el Sistema Existente
Para que el servicio de recolección de estado pueda obtener información más detallada, necesitamos hacer algunas modificaciones menores al sistema AIGlasses existente.
Primero, agreguemos una interfaz de exportación de estado en app_principal.py:
# Agregar el siguiente código en app_principal.py
@app.route('/api/estado', methods=['GET'])
def obtener_estado_sistema():
"""Interfaz para obtener el estado del sistema"""
estado = {
"servicio": "ejecutando",
"modelos": {
"acera_invidente": cargador_modelos.obtener_estado_modelo("acera_invidente"),
"semaforo": cargador_modelos.obtener_estado_modelo("semaforo"),
"deteccion_objetos": cargador_modelos.obtener_estado_modelo("deteccion_objetos")
},
"hardware": {
"camara": gestor_camara.obtener_estado_camara(),
"audio_entrada": gestor_audio.obtener_estado_entrada(),
"audio_salida": gestor_audio.obtener_estado_salida()
},
"navegacion": {
"modo_actual": controlador_navegacion.obtener_modo_actual(),
"ultima_comando": controlador_navegacion.obtener_ultimo_comando(),
"ultimo_obstaculo": controlador_navegacion.obtener_ultimo_obstaculo()
},
"rendimiento": {
"fps": monitor_rendimiento.obtener_fps_actual(),
"uso_cpu": monitor_rendimiento.obtener_uso_cpu(),
"uso_memoria": monitor_rendimiento.obtener_uso_memoria()
},
"timestamp": datetime.now().isoformat()
}
return jsonify(estado)
Luego, necesitamos crear un simple módulo de monitoreo de rendimiento:
# monitor_rendimiento.py
import psutil
import time
from collections import deque
class MonitorRendimiento:
def __init__(self, tamano_ventana=60):
self.historial_fps = deque(maxlen=tamano_ventana)
self.ultimo_tiempo_fotograma = time.time()
def actualizar_fotograma(self):
"""Actualizar cálculo de FPS"""
tiempo_actual = time.time()
intervalo_fotograma = tiempo_actual - self.ultimo_tiempo_fotograma
if intervalo_fotograma > 0:
fps = 1.0 / intervalo_fotograma
self.historial_fps.append(fps)
self.ultimo_tiempo_fotograma = tiempo_actual
def obtener_fps_actual(self):
"""Obtener FPS actual"""
if len(self.historial_fps) == 0:
return 0.0
return sum(self.historial_fps) / len(self.historial_fps)
def obtener_uso_cpu(self):
"""Obtener uso de CPU"""
return psutil.cpu_percent(interval=0.1)
def obtener_uso_memoria(self):
"""Obtener uso de memoria"""
memoria = psutil.virtual_memory()
return memoria.percent
- Implementación del Servicio de API de Datos
Con los datos de estado, ahora necesitamos crear un servicio API para que el Mini Programa de WeChat pueda acceder a estos datos.
5.1 Diseño de RESTful API
Usamos Flask para crear un servicio API simple:
# api_datos.py
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_socketio import SocketIO, emit
import sqlite3
from datetime import datetime, timedelta
import json
import logging
app = Flask(__name__)
CORS(app) # Permitir solicitudes跨域
socketio = SocketIO(app, cors_allowed_origins="*")
# Configuración de registro
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def obtener_conexion_db():
"""Obtener conexión a la base de datos"""
conn = sqlite3.connect('estado_aiglasses.db')
conn.row_factory = sqlite3.Row # Devolver resultados en formato de diccionario
return conn
@app.route('/api/estado/actual', methods=['GET'])
def obtener_estado_actual():
"""Obtener el estado actual del dispositivo"""
id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
conn = obtener_conexion_db()
cursor = conn.cursor()
# Obtener estado más reciente
cursor.execute('''
SELECT * FROM estado_dispositivo
WHERE id_dispositivo = ?
ORDER BY timestamp DESC
LIMIT 1
''', (id_dispositivo,))
fila = cursor.fetchone()
conn.close()
if fila:
estado = dict(fila)
# Convertir tipos de datos
estado['api_configurada'] = bool(estado['api_configurada'])
estado['camara_conectada'] = bool(estado['camara_conectada'])
return jsonify({"exito": True, "datos": estado})
else:
return jsonify({"exito": False, "mensaje": "No se encontró el estado del dispositivo"}), 404
@app.route('/api/estado/historial', methods=['GET'])
def obtener_historial_estado():
"""Obtener historial de estado del dispositivo"""
id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
horas = int(request.args.get('horas', 24)) # Consultar por defecto las últimas 24 horas
conn = obtener_conexion_db()
cursor = conn.cursor()
# Calcular rango de tiempo
tiempo_inicio = datetime.now() - timedelta(hours=horas)
cursor.execute('''
SELECT
timestamp,
estado_servicio,
api_configurada,
camara_conectada,
modo_actual,
fps,
uso_cpu,
uso_memoria
FROM estado_dispositivo
WHERE id_dispositivo = ? AND timestamp >= ?
ORDER BY timestamp ASC
''', (id_dispositivo, tiempo_inicio.isoformat()))
filas = cursor.fetchall()
conn.close()
historial = [dict(fila) for fila in filas]
return jsonify({"exito": True, "datos": historial})
@app.route('/api/alertas', methods=['GET'])
def obtener_alertas():
"""Obtener información de alertas"""
id_dispositivo = request.args.get('id_dispositivo', 'predeterminado')
solo_no_resueltas = request.args.get('solo_no_resueltas', 'true').lower() == 'true'
conn = obtener_conexion_db()
cursor = conn.cursor()
if solo_no_resueltas:
cursor.execute('''
SELECT * FROM alertas
WHERE id_dispositivo = ? AND resuelto = 0
ORDER BY timestamp DESC
''', (id_dispositivo,))
else:
cursor.execute('''
SELECT * FROM alertas
WHERE id_dispositivo = ?
ORDER BY timestamp DESC
LIMIT 50
''', (id_dispositivo,))
filas = cursor.fetchall()
conn.close()
alertas = [dict(fila) for fila in filas]
return jsonify({"exito": True, "datos": alertas})
@app.route('/api/alertas/<int:id_alerta>/resolver', methods=['POST'])
def resolver_alerta(id_alerta):
"""Marcar alerta como resuelta"""
conn = obtener_conexion_db()
cursor = conn.cursor()
cursor.execute('''
UPDATE alertas SET resuelto = 1
WHERE id = ?
''', (id_alerta,))
conn.commit()
afectadas = cursor.rowcount
conn.close()
if afectadas > 0:
# Notificar a todos los clientes conectados a través de WebSocket
socketio.emit('alerta_resuelta', {'id_alerta': id_alerta})
return jsonify({"exito": True, "mensaje": "La alerta se ha marcado como resuelta"})
else:
return jsonify({"exito": False, "mensaje": "No se encontró la alerta especificada"}), 404
@app.route('/api/dispositivos', methods=['GET'])
def obtener_dispositivos():
"""Obtener lista de todos los dispositivos"""
conn = obtener_conexion_db()
cursor = conn.cursor()
cursor.execute('''
SELECT
d.*,
s.estado_servicio as ultimo_estado,
s.timestamp as ultima_actividad
FROM dispositivos d
LEFT JOIN estado_dispositivo s ON d.id_dispositivo = s.id_dispositivo
WHERE s.timestamp = (
SELECT MAX(timestamp)
FROM estado_dispositivo
WHERE id_dispositivo = d.id_dispositivo
)
ORDER BY d.nombre_dispositivo
''')
filas = cursor.fetchall()
conn.close()
dispositivos = []
for fila in filas:
dispositivo = dict(fila)
# Calcular si el dispositivo está en línea (datos en los últimos 5 minutos)
if dispositivo['ultima_actividad']:
ultima_actividad = datetime.fromisoformat(dispositivo['ultima_actividad'])
esta_en_linea = (datetime.now() - ultima_actividad).total_seconds() < 300
dispositivo['esta_en_linea'] = esta_en_linea
else:
dispositivo['esta_en_linea'] = False
dispositivos.append(dispositivo)
return jsonify({"exito": True, "datos": dispositivos})
@app.route('/api/dispositivos', methods=['POST'])
def registrar_dispositivo():
"""Registrar nuevo dispositivo"""
datos = request.json
if not datos or 'id_dispositivo' not in datos:
return jsonify({"exito": False, "mensaje": "Faltan parámetros requeridos"}), 400
conn = obtener_conexion_db()
cursor = conn.cursor()
try:
cursor.execute('''
INSERT OR REPLACE INTO dispositivos
(id_dispositivo, nombre_dispositivo, nombre_propietario, ubicacion, ultima_actividad, activo)
VALUES (?, ?, ?, ?, ?, ?)
''', (
datos['id_dispositivo'],
datos.get('nombre_dispositivo', 'Dispositivo sin nombre'),
datos.get('nombre_propietario', ''),
datos.get('ubicacion', ''),
datetime.now().isoformat(),
1
))
conn.commit()
id_dispositivo = datos['id_dispositivo']
# Notificar registro de nuevo dispositivo a través de WebSocket
socketio.emit('dispositivo_registrado', {
'id_dispositivo': id_dispositivo,
'nombre_dispositivo': datos.get('nombre_dispositivo', 'Dispositivo sin nombre')
})
return jsonify({
"exito": True,
"mensaje": "Dispositivo registrado exitosamente",
"id_dispositivo": id_dispositivo
})
except Exception as e:
conn.rollback()
logger.error(f"Fallo en el registro del dispositivo: {str(e)}")
return jsonify({"exito": False, "mensaje": f"Registro fallido: {str(e)}"}), 500
finally:
conn.close()
# Manejadores de eventos WebSocket
@socketio.on('conectar')
def manejar_conexion():
"""Evento de conexión del cliente"""
logger.info(f"Cliente conectado: {request.sid}")
emit('conectado', {'mensaje': 'Conexión exitosa'})
@socketio.on('suscribir_dispositivo')
def manejar_suscripcion_dispositivo(datos):
"""Suscripción a actualizaciones de estado del dispositivo"""
id_dispositivo = datos.get('id_dispositivo')
if id_dispositivo:
logger.info(f"Cliente suscrito al dispositivo: {id_dispositivo}")
# Agregar el cliente a la sala correspondiente al dispositivo
socketio.server.enter_room(request.sid, f"dispositivo_{id_dispositivo}")
emit('suscrito', {'id_dispositivo': id_dispositivo})
@socketio.on('desconectar')
def manejar_desconexion():
"""Evento de desconexión del cliente"""
logger.info(f"Cliente desconectado: {request.sid}")
# Funciones de empuje de estado (llamadas por el servicio de recolección de estado)
def empujar_actualizacion_estado(id_dispositivo, datos_estado):
"""Empujar actualización de estado a todos los clientes suscritos al dispositivo"""
socketio.emit('actualizacion_estado', {
'id_dispositivo': id_dispositivo,
'estado': datos_estado,
'timestamp': datetime.now().isoformat()
}, room=f"dispositivo_{id_dispositivo}")
def empujar_nueva_alerta(id_dispositivo, datos_alerta):
"""Empujar nueva alerta a todos los clientes suscritos al dispositivo"""
socketio.emit('nueva_alerta', {
'id_dispositivo': id_dispositivo,
'alerta': datos_alerta,
'timestamp': datetime.now().isoformat()
}, room=f"dispositivo_{id_dispositivo}")
if __name__ == '__main__':
# Iniciar servicio API
logger.info("Iniciando servicio de datos API...")
socketio.run(app, host='0.0.0.0', port=8082, debug=True)
Este servicio API proporciona las siguientes interfaces clave:
- Obtener estado actual (
/api/estado/actual) - Obtener el estado más reciente del dispositivo - Obtener historial (
/api/estado/historial) - Obtener el historial de estado en un período de tiempo específico - Obtener alertas (
/api/alertas) - Obtener registros de alertas del dispositivo - Gestión de dispositivos (
/api/dispositivos) - Registrar y consultar información de dispositivos - Interfaz WebSocket - Empujar actualizaciones de estado y alertas en tiempo real
5.2 Integración de Empuje de Estado
Necesitamos modificar el servicio de recolección de estado para llamar a las funciones de empuje del servicio API cuando se recopile un nuevo estado:
# En recolector_estado.py, agregar
class RecolectorEstado:
# ... código anterior sin cambios ...
def __init__(self, url_base: str = "http://localhost:8081",
url_api: str = "http://localhost:8082"):
self.url_base = url_base
self.url_api = url_api # Nuevo: dirección del servicio API
self.ruta_db = "estado_aiglasses.db"
self.inicializar_base_datos()
def recolectar_estado(self) -> Optional[Dict[str, Any]]:
"""Recopilar información de estado del sistema AIGlasses"""
try:
# ... código de recolección de estado sin cambios ...
# Guardar en la base de datos
self.guardar_en_base_datos(datos_estado)
# Verificar estados anómalos
nuevas_alertas = self.verificar_alertas(datos_estado)
# Empujar al servicio API (si está configurado)
if self.url_api:
self.empujar_a_api(datos_estado, nuevas_alertas)
logger.info(f"Recolección de estado exitosa: {datos_estado['id_dispositivo']}")
return datos_estado
except Exception as e:
logger.error(f"Fallo en la recolección de estado: {str(e)}")
return None
def empujar_a_api(self, datos_estado: Dict[str, Any], nuevas_alertas: list):
"""Empujar estado al servicio API"""
try:
# Empujar actualización de estado
url_empuje = f"{self.url_api}/empujar/estado"
respuesta = requests.post(url_empuje, json=datos_estado, timeout=5)
if respuesta.status_code == 200:
logger.debug("Empuje de estado exitoso")
else:
logger.warning(f"Fallo en el empuje de estado: {respuesta.status_code}")
# Empujar nuevas alertas
if nuevas_alertas:
for alerta in nuevas_alertas:
url_alerta = f"{self.url_api}/empujar/alerta"
datos_alerta = {
"id_dispositivo": datos_estado.get("id_dispositivo"),
**alerta
}
requests.post(url_alerta, json=datos_alerta, timeout=5)
except requests.exceptions.RequestException as e:
logger.warning(f"Fallo en el empuje de API: {str(e)}")
- Desarrollo del Mini Programa de WeChat
Ahora desarrollemos la parte frontal del Mini Programa de WeChat. El Mini Programa proporcionará las siguientes funciones:
- Visualización en tiempo real del estado del dispositivo
- Gráficos de historial de estado
- Centro de notificaciones de alertas
- Interfaz de gestión de dispositivos
6.1 Estructura de Páginas del Mini Programa
Estructura del directorio del Mini Programa:
pages/
├── index/ # Página de inicio - Resumen de dispositivos
│ ├── index.js
│ ├── index.json
│ ├── index.wxml
│ └── index.wxss
├── dispositivo/ # Página de detalles del dispositivo
│ ├── dispositivo.js
│ ├── dispositivo.json
│ ├── dispositivo.wxml
│ └── dispositivo.wxss
├── alertas/ # Centro de alertas
│ ├── alertas.js
│ ├── alertas.json
│ ├── alertas.wxml
│ └── alertas.wxss
└── configuracion/ # Página de configuración
├── configuracion.js
├── configuracion.json
├── configuracion.wxml
└── configuracion.wxss
6.2 Implementación de la Página de Inicio (Resumen de Dispositivos)
<!-- pages/index/index.wxml -->
<view class="container">
<!-- Barra de estado superior -->
<view class="barra-estado">
<view class="item-estado">
<text class="etiqueta-estado">Dispositivos en línea</text>
<text class="valor-estado">{{conteoEnLinea}}/{{totalDispositivos}}</text>
</view>
<view class="item-estado">
<text class="etiqueta-estado">Alertas activas</text>
<text class="valor-estado alerta">{{alertasActivas}}</text>
</view>
</view>
<!-- Lista de dispositivos -->
<scroll-view class="lista-dispositivos" scroll-y>
<block wx:for="{{dispositivos}}" wx:key="id_dispositivo">
<view class="tarjeta-dispositivo" bindtap="navegarADetalle" data-id="{{item.id_dispositivo}}">
<view class="encabezado-dispositivo">
<view class="info-dispositivo">
<text class="nombre-dispositivo">{{item.nombre_dispositivo}}</text>
<text class="propietario-dispositivo">{{item.nombre_propietario}}</text>
</view>
<view class="estado-dispositivo">
<view class="indicador-estado {{item.esta_en_linea ? 'en-linea' : 'fuera-linea'}}"></view>
<text class="texto-estado">{{item.esta_en_linea ? 'En línea' : 'Fuera de línea'}}</text>
</view>
</view>
<view class="detalles-dispositivo">
<view class="item-detalle">
<text class="etiqueta-detalle">Última actividad</text>
<text class="valor-detalle">{{formatearTiempo(item.ultima_actividad)}}</text>
</view>
<view class="item-detalle">
<text class="etiqueta-detalle">Modo de operación</text>
<text class="valor-detalle">{{item.ultimo_estado || 'Desconocido'}}</text>
</view>
<view class="item-detalle">
<text class="etiqueta-detalle">Ubicación</text>
<text class="valor-detalle">{{item.ubicacion || 'No configurada'}}</text>
</view>
</view>
<!-- Mostrar indicador de alerta si hay alertas -->
<block wx:if="{{item.conteo_alertas > 0}}">
<view class="indicador-alerta">
<text>{{item.conteo_alertas}} alertas</text>
</view>
</block>
</view>
</block>
</scroll-view>
<!-- Botón para agregar dispositivo -->
<view class="btn-agregar-dispositivo" bindtap="mostrarModalAgregarDispositivo">
<text>+ Agregar dispositivo</text>
</view>
</view>
// pages/index/index.js
Page({
data: {
dispositivos: [],
conteoEnLinea: 0,
totalDispositivos: 0,
alertasActivas: 0,
socketConectado: false
},
onLoad() {
this.cargarDispositivos();
this.conectarWebSocket();
},
onShow() {
// Refrescar datos al mostrar la página
this.cargarDispositivos();
},
cargarDispositivos() {
const that = this;
// Llamar a la API para obtener la lista de dispositivos
wx.request({
url: 'http://tu-servidor-ip:8082/api/dispositivos',
method: 'GET',
success(res) {
if (res.data.exito) {
const dispositivos = res.data.datos;
const conteoEnLinea = dispositivos.filter(d => d.esta_en_linea).length;
const alertasActivas = dispositivos.reduce((sum, dispositivo) => sum + (dispositivo.conteo_alertas || 0), 0);
that.setData({
dispositivos: dispositivos,
conteoEnLinea: conteoEnLinea,
totalDispositivos: dispositivos.length,
alertasActivas: alertasActivas
});
}
},
fail(err) {
wx.showToast({
title: 'Carga fallida',
icon: 'error'
});
}
});
},
conectarWebSocket() {
const that = this;
// Conectar WebSocket
wx.connectSocket({
url: 'ws://tu-servidor-ip:8082/socket.io/',
success() {
that.setData({ socketConectado: true });
}
});
// Escuchar mensajes WebSocket
wx.onSocketMessage((res) => {
const datos = JSON.parse(res.data);
switch(datos.tipo) {
case 'actualizacion_estado':
that.manejarActualizacionEstado(datos);
break;
case 'nueva_alerta':
that.manejarNuevaAlerta(datos);
break;
case 'dispositivo_registrado':
that.manejarDispositivoRegistrado(datos);
break;
}
});
// Escuchar cierre de conexión
wx.onSocketClose(() => {
that.setData({ socketConectado: false });
// Intentar reconectar
setTimeout(() => {
that.conectarWebSocket();
}, 3000);
});
},
manejarActualizacionEstado(datos) {
// Actualizar estado del dispositivo
const dispositivos = this.data.dispositivos.map(dispositivo => {
if (dispositivo.id_dispositivo === datos.id_dispositivo) {
return {
...dispositivo,
ultimo_estado: datos.estado.modo_actual,
ultima_actividad: datos.timestamp,
esta_en_linea: true
};
}
return dispositivo;
});
this.setData({
dispositivos: dispositivos,
conteoEnLinea: dispositivos.filter(d => d.esta_en_linea).length
});
},
manejarNuevaAlerta(datos) {
// Mostrar notificación de nueva alerta
wx.showModal({
title: 'Nueva alerta',
content: `Dispositivo ${datos.id_dispositivo}: ${datos.alerta.mensaje}`,
showCancel: false
});
// Actualizar conteo de alertas
this.setData({
alertasActivas: this.data.alertasActivas + 1
});
},
manejarDispositivoRegistrado(datos) {
// Refrescar lista de dispositivos
this.cargarDispositivos();
wx.showToast({
title: `Nuevo dispositivo ${datos.nombre_dispositivo} registrado`,
icon: 'success'
});
},
navegarADetalle(e) {
const idDispositivo = e.currentTarget.dataset.id;
wx.navigateTo({
url: `/pages/dispositivo/dispositivo?id_dispositivo=${idDispositivo}`
});
},
mostrarModalAgregarDispositivo() {
wx.navigateTo({
url: '/pages/configuracion/configuracion'
});
},
formatearTiempo(timestamp) {
if (!timestamp) return 'Nunca en línea';
const fecha = new Date(timestamp);
const ahora = new Date();
const diff = ahora - fecha;
if (diff < 60000) { // 1 minuto
return 'Ahora mismo';
} else if (diff < 3600000) { // 1 hora
return `${Math.floor(diff / 60000)} minutos atrás`;
} else if (diff < 86400000) { // 1 día
return `${Math.floor(diff / 3600000)} horas atrás`;
} else {
return `${Math.floor(diff / 86400000)} días atrás`;
}
}
});
6.3 Implementación de la Página de Detalles del Dispositivo
<!-- pages/dispositivo/dispositivo.wxml -->
<view class="container">
<!-- Información del encabezado del dispositivo -->
<view class="encabezado-dispositivo">
<view class="titulo-dispositivo">
<text class="nombre-dispositivo">{{infoDispositivo.nombre_dispositivo}}</text>
<view class="estado-dispositivo {{infoDispositivo.esta_en_linea ? 'en-linea' : 'fuera-linea'}}">
<text>{{infoDispositivo.esta_en_linea ? 'En línea' : 'Fuera de línea'}}</text>
</view>
</view>
<text class="id-dispositivo">ID: {{infoDispositivo.id_dispositivo}}</text>
</view>
<!-- Tarjetas de estado en tiempo real -->
<view class="tarjetas-estado">
<view class="tarjeta-estado">
<text class="titulo-tarjeta">Estado del servicio</text>
<text class="valor-tarjeta {{estadoActual.estado_servicio}}">
{{estadoActual.estado_servicio === 'ejecutando' ? 'Ejecutando' : 'Detenido'}}
</text>
</view>
<view class="tarjeta-estado">
<text class="titulo-tarjeta">API configurada</text>
<text class="valor-tarjeta {{estadoActual.api_configurada ? 'configurada' : 'no-configurada'}}">
{{estadoActual.api_configurada ? 'Configurada' : 'No configurada'}}
</text>
</view>
<view class="tarjeta-estado">
<text class="titulo-tarjeta">Cámara</text>
<text class="valor-tarjeta {{estadoActual.camara_conectada ? 'conectada' : 'desconectada'}}">
{{estadoActual.camara_conectada ? 'Conectada' : 'Desconectada'}}
</text>
</view>
<view class="tarjeta-estado">
<text class="titulo-tarjeta">Modo actual</text>
<text class="valor-tarjeta">{{estadoActual.modo_actual || 'Espera'}}</text>
</view>
</view>
<!-- Métricas de rendimiento -->
<view class="seccion-rendimiento">
<text class="titulo-seccion">Métricas de rendimiento</text>
<view class="cuadricula-rendimiento">
<view class="item-metrica">
<text class="etiqueta-metrica">FPS</text>
<text class="valor-metrica">{{estadoActual.fps.toFixed(1)}}</text>
</view>
<view class="item-metrica">
<text class="etiqueta-metrica">Uso de CPU</text>
<text class="valor-metrica">{{estadoActual.uso_cpu.toFixed(1)}}%</text>
</view>
<view class="item-metrica">
<text class="etiqueta-metrica">Uso de memoria</text>
<text class="valor-metrica">{{estadoActual.uso_memoria.toFixed(1)}}%</text>
</view>
<view class="item-metrica">
<text class="etiqueta-metrica">Nivel de batería</text>
<text class="valor-metrica">{{estadoActual.nivel_bateria}}%</text>
</view>
</view>
</view>
<!-- Estado de modelos de IA -->
<view class="seccion-modelos">
<text class="titulo-seccion">Estado de modelos de IA</text>
<view class="lista-modelos">
<block wx:for="{{modelosCargados}}" wx:key="index">
<view class="item-modelo">
<text class="nombre-modelo">{{item}}</text>
<view class="estado-modelo cargado"></view>
</view>
</block>
</view>
</view>
<!-- Gráfico histórico -->
<view class="seccion-grafico">
<text class="titulo-seccion">Estado histórico (últimas 24 horas)</text>
<canvas canvas-id="graficoEstado" class="canvas-grafico"></canvas>
</view>
<!-- Botones de acción -->
<view class="botones-accion">
<button class="btn btn-actualizar" bindtap="actualizarEstado">Actualizar estado</button>
<button class="btn btn-historial" bindtap="verHistorial">Ver historial</button>
<button class="btn btn-alerta" bindtap="verAlertas">Ver alertas</button>
</view>
</view>
// pages/dispositivo/dispositivo.js
import * as echarts from '../../ec-canvas/echarts';
Page({
data: {
idDispositivo: '',
infoDispositivo: {},
estadoActual: {
estado_servicio: 'desconocido',
api_configurada: false,
camara_conectada: false,
modo_actual: 'inactivo',
fps: 0,
uso_cpu: 0,
uso_memoria: 0,
nivel_bateria: 100
},
modelosCargados: [],
grafico: null
},
onLoad(options) {
this.setData({
idDispositivo: options.id_dispositivo || 'predeterminado'
});
this.cargarInfoDispositivo();
this.cargarEstadoActual();
this.cargarGraficoHistorial();
this.suscribirADispositivo();
},
cargarInfoDispositivo() {
const that = this;
wx.request({
url: `http://tu-servidor-ip:8082/api/dispositivos`,
method: 'GET',
success(res) {
if (res.data.exito) {
const dispositivo = res.data.datos.find(d => d.id_dispositivo === that.data.idDispositivo);
if (dispositivo) {
that.setData({ infoDispositivo: dispositivo });
}
}
}
});
},
cargarEstadoActual() {
const that = this;
wx.request({
url: `http://tu-servidor-ip:8082/api/estado/actual?id_dispositivo=${this.data.idDispositivo}`,
method: 'GET',
success(res) {
if (res.data.exito) {
const estado = res.data.datos;
const modelos = estado.modelos_cargados ? estado.modelos_cargados.split(',') : [];
that.setData({
estadoActual: estado,
modelosCargados: modelos
});
}
}
});
},
cargarGraficoHistorial() {
const that = this;
wx.request({
url: `http://tu-servidor-ip:8082/api/estado/historial?id_dispositivo=${this.data.idDispositivo}&horas=24`,
method: 'GET',
success(res) {
if (res.data.exito) {
that.renderizarGrafico(res.data.datos);
}
}
});
},
renderizarGrafico(datosHistorial) {
// Usar echarts para renderizar el gráfico
const componenteGrafico = this.selectComponent('#graficoEstado');
if (!componenteGrafico) {
setTimeout(() => this.renderizarGrafico(datosHistorial), 100);
return;
}
const timestamps = datosHistorial.map(item => {
const fecha = new Date(item.timestamp);
return `${fecha.getHours()}:${fecha.getMinutes().toString().padStart(2, '0')}`;
});
const fpsData = datosHistorial.map(item => item.fps || 0);
const cpuData = datosHistorial.map(item => item.uso_cpu || 0);
const memoryData = datosHistorial.map(item => item.uso_memoria || 0);
const option = {
tooltip: {
trigger: 'axis',
axisPointer: {
type: 'cross'
}
},
legend: {
data: ['FPS', 'Uso de CPU%', 'Uso de memoria%']
},
xAxis: {
type: 'category',
data: timestamps,
axisLabel: {
rotate: 45
}
},
yAxis: [
{
type: 'value',
name: 'FPS',
position: 'left'
},
{
type: 'value',
name: 'Uso%',
position: 'right',
max: 100
}
],
series: [
{
name: 'FPS',
type: 'line',
yAxisIndex: 0,
data: fpsData,
smooth: true,
lineStyle: {
color: '#5470c6'
}
},
{
name: 'Uso de CPU%',
type: 'line',
yAxisIndex: 1,
data: cpuData,
smooth: true,
lineStyle: {
color: '#91cc75'
}
},
{
name: 'Uso de memoria%',
type: 'line',
yAxisIndex: 1,
data: memoryData,
smooth: true,
lineStyle: {
color: '#fac858'
}
}
]
};
componenteGrafico.init((canvas, width, height) => {
const grafico = echarts.init(canvas, null, {
width: width,
height: height
});
grafico.setOption(option);
this.setData({ grafico: grafico });
return grafico;
});
},
suscribirADispositivo() {
// Suscribir a actualizaciones de estado del dispositivo a través de WebSocket
if (wx.onSocketMessage) {
wx.onSocketMessage((res) => {
const datos = JSON.parse(res.data);
if (datos.tipo === 'actualizacion_estado' && datos.id_dispositivo === this.data.idDispositivo) {
this.manejarActualizacionEstado(datos.estado);
}
});
}
},
manejarActualizacionEstado(nuevoEstado) {
// Actualizar estado actual
this.setData({
estadoActual: {
...this.data.estadoActual,
...nuevoEstado
}
});
// Mostrar indicador de actualización
wx.showToast({
title: 'Estado actualizado',
icon: 'success',
duration: 1000
});
},
actualizarEstado() {
this.cargarEstadoActual();
wx.showToast({
title: 'Actualizando...',
icon: 'loading'
});
},
verHistorial() {
wx.navigateTo({
url: `/pages/historial/historial?id_dispositivo=${this.data.idDispositivo}`
});
},
verAlertas() {
wx.navigateTo({
url: `/pages/alertas/alertas?id_dispositivo=${this.data.idDispositivo}`
});
}
});
6.4 Implementación del Centro de Alertas
// pages/alertas/alertas.js
Page({
data: {
idDispositivo: '',
alertas: [],
filtro: 'todos' // todos, no-resueltas, resueltas
},
onLoad(options) {
this.setData({
idDispositivo: options.id_dispositivo || 'todos'
});
this.cargarAlertas();
this.suscribirAlertas();
},
cargarAlertas() {
const that = this;
let url = `http://tu-servidor-ip:8082/api/alertas`;
if (this.data.idDispositivo !== 'todos') {
url += `?id_dispositivo=${this.data.idDispositivo}`;
}
if (this.data.filtro === 'no-resueltas') {
url += `${this.data.idDispositivo !== 'todos' ? '&' : '?'}solo_no_resueltas=true`;
}
wx.request({
url: url,
method: 'GET',
success(res) {
if (res.data.exito) {
that.setData({ alertas: res.data.datos });
}
}
});
},
suscribirAlertas() {
// Suscribir a nuevas alertas
if (wx.onSocketMessage) {
wx.onSocketMessage((res) => {
const datos = JSON.parse(res.data);
if (datos.tipo === 'nueva_alerta') {
// Si se muestran todos los dispositivos o el dispositivo actual, agregar la nueva alerta
if (this.data.idDispositivo === 'todos' || datos.id_dispositivo === this.data.idDispositivo) {
const alertas = [datos.alerta, ...this.data.alertas];
this.setData({ alertas: alertas.slice(0, 50) }); // Mantener solo las 50 más recientes
// Mostrar notificación
wx.showModal({
title: 'Nueva alerta',
content: `Dispositivo ${datos.id_dispositivo}: ${datos.alerta.mensaje}`,
showCancel: false
});
}
}
});
}
},
resolverAlerta(e) {
const idAlerta = e.currentTarget.dataset.id;
const that = this;
wx.request({
url: `http://tu-servidor-ip:8082/api/alertas/${idAlerta}/resolver`,
method: 'POST',
success(res) {
if (res.data.exito) {
wx.showToast({
title: 'Marcada como resuelta',
icon: 'success'
});
// Actualizar datos locales
const alertas = that.data.alertas.map(alerta => {
if (alerta.id === idAlerta) {
return { ...alerta, resuelto: 1 };
}
return alerta;
});
that.setData({ alertas });
}
}
});
},
cambiarFiltro(e) {
const filtro = e.currentTarget.dataset.filtro;
this.setData({ filtro }, () => {
this.cargarAlertas();
});
}
});
- Despliegue y Configuración
7.1 Despliegue del Servidor
Despleguemos los servicios de recolección de estado y API de datos en el servidor:
# 1. Crear directorio de despliegue
mkdir -p /opt/monitor_aiglasses
cd /opt/monitor_aiglasses
# 2. Copiar archivos de código
cp /ruta/a/recolector_estado.py .
cp /ruta/a/api_datos.py .
cp /ruta/a/monitor_rendimiento.py .
# 3. Instalar dependencias
pip install flask flask-cors flask-socketio psutil
# 4. Crear configuración de Supervisor
sudo nano /etc/supervisor/conf.d/monitor_aiglasses.conf
Contenido de la configuración de Supervisor:
[program:recolector_estado]
command=python3 /opt/monitor_aiglasses/recolector_estado.py
directory=/opt/monitor_aiglasses
autostart=true
autorestart=true
stderr_logfile=/var/log/recolector_estado.err.log
stdout_logfile=/var/log/recolector_estado.out.log
[program:api_datos]
command=python3 /opt/monitor_aiglasses/api_datos.py
directory=/opt/monitor_aiglasses
autostart=true
autorestart=true
stderr_logfile=/var/log/api_datos.err.log
stdout_logfile=/var/log/api_datos.out.log
# 5. Recargar configuración de Supervisor
sudo supervisorctl reread
sudo supervisorctl update
# 6. Iniciar servicios
sudo supervisorctl start recolector_estado
sudo supervisorctl start api_datos
# 7. Verificar estado de los servicios
sudo supervisorctl status
7.2 Configuración del Mini Programa de WeChat
Configuración en la consola de administración del Mini Programa:
- Configuración de dominios del servidor:
- En «Desarrollo» -> «Configuración de desarrollo» -> «Dominios del servidor», agregar el dominio de tu servidor API
- Dominio legal para solicitud:
https://tu-dominio - Dominio legal para socket:
wss://tu-dominio
- Obtener AppID del Mini Programa:
- En «Configuración» -> «Configuración básica», obtener el AppID
- Se usará para autenticación y vinculación de dispositivos posteriormente
- Configuración de dominios de negocio (si se necesita WebView):
- En «Desarrollo» -> «Configuración de desarrollo» -> «Dominios de negocio», agregar dominios necesarios
7.3 Configuración de Seguridad
Para garantizar la seguridad del sistema, necesitamos agregar algunas medidas de seguridad:
# middleware_seguridad.py
import hashlib
import hmac
import time
from functools import wraps
from flask import request, jsonify
class MiddlewareSeguridad:
def __init__(self, app, clave_secreta):
self.app = app
self.clave_secreta = clave_secreta.encode('utf-8')
def generar_firma(self, datos: dict, timestamp: int) -> str:
"""Generar firma de solicitud"""
# Ordenar por nombre de parámetro
params_ordenados = sorted(datos.items())
param_str = '&'.join([f'{k}={v}' for k, v in params_ordenados])
# Agregar timestamp
sign_str = f'{param_str}×tamp={timestamp}'
# Generar firma usando HMAC-SHA256
firma = hmac.new(
self.clave_secreta,
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
return firma
def verificar_solicitud(self):
"""Verificar legitimidad de la solicitud"""
# Obtener firma y timestamp
firma = request.headers.get('X-Firma')
timestamp = request.headers.get('X-Timestamp')
if not firma or not timestamp:
return False, "Falta firma o timestamp"
# Verificar si el timestamp está en el período válido (5 minutos)
try:
ts = int(timestamp)
if abs(time.time() - ts) > 300: # 5 minutos
return False, "Solicitud expirada"
except ValueError:
return False, "Formato de timestamp inválido"
# Obtener parámetros de solicitud
if request.method == 'GET':
datos = request.args.to_dict()
else:
datos = request.get_json() or {}
# Generar firma
firma_esperada = self.generar_firma(datos, ts)
# Comparar firmas
if not hmac.compare_digest(firma, firma_esperada):
return False, "Verificación de firma fallida"
return True, "Verificación exitosa"
def requerir_autenticacion(self, f):
"""Decorador de autenticación"""
@wraps(f)
def funcion_decorada(*args, **kwargs):
# Saltar ciertas interfaces que no requieren autenticación
if request.path in ['/api/iniciar_sesion', '/api/registrarse']:
return f(*args, **kwargs)
# Verificar solicitud
valida, mensaje = self.verificar_solicitud()
if not valida:
return jsonify({
"exito": False,
"mensaje": mensaje,
"codigo": 401
}), 401
return f(*args, **kwargs)
return funcion_decorada
# Usar en api_datos.py
from middleware_seguridad import MiddlewareSeguridad
app = Flask(__name__)
seguridad = MiddlewareSeguridad(app, "tu-clave-secreta-aqui")
# Proteger interfaces que requieren autenticación
@app.route('/api/estado/actual', methods=['GET'])
@seguridad.requerir_autenticacion
def obtener_estado_actual():
# ... código original ...
- Efecto de Aplicación Práctica
8.1 Visualización de la Interfaz de Monitoreo
Una vez completado el despliegue, el Mini Programa de WeChat proporcionará las siguientes interfaces de monitoreo:
Página de Inicio - Resumen de Dispositivos
- Muestra el estado en línea de todos los dispositivos registrados
- Estadísticas en tiempo real del número de dispositivos en línea y alertas activas
- Vista rápida de la información básica y última actividad de cada dispositivo
- Hacer clic en la tarjeta del dispositivo para ir a la página de detalles
Página de Detalles del Dispositivo
- Muestra en tiempo real varios indicadores de estado del dispositivo
- Visualización de datos de rendimiento (FPS, CPU, memoria)
- Ver los modelos de IA cargados
- Gráfico de datos históricos de 24 horas
- Botones de acción para actualizar estado y ver historial/alertas
Centro de Alertas
- Recibir y mostrar alertas del dispositivo en tiempo real
- Filtrar alertas por dispositivo, tipo o estado
- Marcar alertas como resueltas
- Consultar registros históricos de alertas
8.2 Casos de Uso Práctico
Caso uno: Centro de Servicios Comunitarios Un centro comunitario que sirve a personas con discapacidad visual gestiona 20 dispositivos AIGlasses. El administrador, a través del Mini Programa:
- Verifica en tiempo real el estado en línea de todos los dispositivos
- Recibe notificaciones inmediatas cuando los dispositivos presentan anomalías
- Estadísticas de uso de dispositivos para optimizar horarios de servicio
- Asistencia remota a usuarios para resolver problemas con dispositivos
Caso dos: Cuidado Familiar La Sra. Li proporcionó AIGlasses a su padre con problemas de visión. A través del Mini Programa:
- Verifica en cualquier momento si su padre está utilizando el dispositivo
- Conoce el entorno de ubicación de su padre (a través de los tipos de escena detectados por las gafas)
- Recibe recordatorios cuando la batería del dispositivo es baja
- Ve los registros de actividad reciente de su padre
Caso tres: Mantenimiento por Desarrolladores Los desarrolladores, a través del Mini Programa:
- Monitorean el estado de ejecución de todos los dispositivos desplegados
- Localizan rápidamente dispositivos con fallos
- Recopilan datos de uso para optimizar algoritmos
- Depuración y actualización remota de configuraciones
- Resumen y Perspectivas
9.1 Resumen de la Implementación
A través de los pasos de este artículo, hemos logrado exitosamente agregar capacidades de monitoreo remoto al sistema AIGlasses para Navegación:
- Servicio de Recolección de Estado: Recopila periódicamente el estado del dispositivo, detecta anomalías y genera alertas
- Servicio de API de Datos: Proporciona interfaces RESTful y WebSocket, soporta empuje de datos en tiempo real
- Mini Programa de WeChat: Proporciona una interfaz de usuario amigable para mostrar en tiempo real el estado del dispositivo y datos históricos
- Mecanismos de Seguridad: Agrega verificación de firma de solicitud para garantizar la seguridad de la transmisión de datos
Las ventajas de esta solución son:
- No intrusiva: no necesita modificar el código principal de AIGlasses
- Tiempo real: implementa actualizaciones de estado a nivel de segundo a través de WebSocket
- Escalable: soporta la monitorización de múltiples dispositivos, facilitando la implementación a gran escala
- Facil de usar: el Mini Programa de WeChat no requiere instalación, escanear es usar
9.2 Direcciones de Expansión Futura
Basado en la implementación actual, se pueden expandir aún más las funciones:
1. Más Indicadores de Monitoreo
- Agregar monitoreo de calidad de conexión de red
- Monitoreo del tiempo de duración de la batería
- Estadísticas de frecuencia de uso de cada función
- Registro de rutas y trayectorias de navegación
2. Funciones Avanzadas de Alerta
- Soporte para reglas de alerta personalizadas
- Clasificación de alertas (urgente, importante, general)
- Múltiples métodos de notificación (plantillas de mensaje de WeChat, SMS, llamada telefónica)
- Reglas de procesamiento automático de alertas
3. Informes y Análisis de Datos
- Generación de informes de uso de dispositivos
- Análisis de patrones de comportamiento del usuario
- Predicción de tiempos de mantenimiento de dispositivos
- Sugerencias de optimización del rendimiento de algoritmos
4. Funciones de Control Remoto
- Reinicio remoto de servicios del dispositivo
- Actualización remota de configuraciones del dispositivo
- Ejecución de comandos de diagnóstico remoto
- Asistencia remota para depuración
5. Gestión de Permisos de Múltiples Usuarios
- Control de permisos por rol (administrador, operaciones, usuario)
- Gestión de agrupación de dispositivos
- Auditoría de registros de operación
- Control de acceso a datos
9.3 Recomendaciones para Desarrolladores
Al implementar proyectos similares, se recomienda prestar atención a los siguientes puntos:
- Seguridad de datos primero: asegurar que todos los datos transmitidos estén encriptados y que las interfaces API tengan mecanismos de autenticación adecuados
- Considerar condiciones de red: las redes móviles pueden ser inestables, es necesario implementar reconexión automática y caché local
- Experiencia de usuario prioritaria: la interfaz de monitoreo debe ser simple e intuitiva, la información importante debe ser visible a simple vista
- Optimización de rendimiento: diseñar razonablemente la frecuencia de recolección de datos para evitar afectar el rendimiento del sistema original
- Mantenibilidad: el código debe tener buena documentación y comentarios para facilitar el mantenimiento y la expansión posteriores
A través de este sistema de monitoreo remoto, AIGlasses para Navegación evoluciona de una aplicación local independiente a un sistema de dispositivo inteligente que se puede gestionar y monitorear de forma remota. Esto no solo mejora la experiencia del usuario, sino que también proporciona conveniencia para el despliegue y mantenimiento a gran escala de dispositivos.
Obtener más imágenes de IA
¿Quieres explorar más imágenes de IA y escenarios de aplicación? Visita la Plaza de Imágenes de CSDN, que ofrece una rica variedad de imágenes preconfiguradas, que cubren múltiples campos como razonamiento de modelos grandes, generación de imágenes, generación de video, ajuste fino de modelos, etc., con implementación de un solo clic.