Protocolos Personalizados y Serialización en la Capa de Aplicación

Introducción a la Capa de Aplicación y Protocolos

Las aplicaciones de red que desarrollamos para resolver problemas específicos operan principalmente en la capa de aplicación. La comunicación efectiva entre estas aplicaciones requiere un conjunto de reglas o un "protocolo" que defina cómo se estructuran y transmiten los datos.

Entendiendo los Protocolos de Aplicación

Cuando enviamos datos a través de sockets, ya sea usando UDP o TCP, necesitamos un formato para los datos. Si solo enviamos cadenas de texto simples, la comunicación es directa. Sin embargo, para datos estructurados, como los requeridos para una calculadora de red donde el cliente envía operandos y el servidor calcula, necesitamos un enfoque más robusto. Existen dos estrategias principales:

  1. Diseñar un formato de cadena personalizado que ambas partes puedan interpretar.
  2. Definir estructuras de datos (structs) para los operandos, operadores y resultados, y luego convertirlas a un formato de cadena para la transmisión (serialización) y viceversa (deserialización).

Independientemente del método, el acuerdo mutuo sobre la estructura y el significado de los datos constituye el protocolo de aplicación. Este protocolo es, en esencia, un acuerdo sobre datos estructurados entre las partes comunicantes.

Aunque es posible transmitir datos binarios de estructuras directamente, no es recomendable debido a posibles problemas de alineación de memoria y la incompatibilidad entre lenguajes de programación en diferentes sistemas. La serialización y deserialización proporcionan una solución más universal y compatible para la comunicación en la capa de aplicación.

Comprensión de read, write y la Comunicación Full-Dúplex de TCP

TCP implementa internamente búferes de envío y recepción. Las operaciones read y write no interactúan directamente con la red, sino con estos búferes. La comunicación de red es fundamentalmente una copia de datos: desde la aplicación al búfer de envío de TCP, luego a la tarjeta de red, a través de la red, al búfer de recepción de TCP del destino, y finalmente a la aplicación receptora.

TCP soporta comunicación full-dúplex porque cada extremo de la conexión tiene búferes de envío y recepción independientes. Esto permite que ambos extremos envíen y reciban datos simultáneamente. Un sockfd (socket file descriptor) puede, por lo tanto, ser utilizado tanto para lectura como para escritura. El control de flujo y la sincronización son gestionados por el sistema operativo, funcionando como un modelo de productor-consumidor cuádruple entre los búferes y las aplicaciones. El sistema operativo maneja cuándo operar sobre los búferes, ya que TCP es un protocolo orientado a flujos de bytes, a diferencia de UDP, que maneja datagramas discretos.

Implementación de una Calculadora de Red

Servidor

La implementación del servidor se organiza en varias capas:

  • Socket.hpp: Proporciona clases base y derivadas (Socket y TcpSocket) para manejar las operaciones de socket (creación, enlace, escucha, aceptación, recepción, envío, conexión). El método BuildTcpSocketMethod simplifica la inicialización del socket del servidor.
  • TcpServer.hpp: Gestiona la aceptación de conexiones entrantes. Utiliza fork para crear un proceso hijo que maneja cada conexión, permitiendo que el proceso padre continúe escuchando nuevas conexiones. La lógica de manejo de datos de cada cliente se delega a una función de servicio proporcionada.
  • Protocol.hpp: Maneja la serialización y deserialización de datos (usando JSON como formato) y asegura la recepción de mensajes completos. Implementa un protocolo simple donde cada mensaje está prefijado por su longitud y delimitado por \r\n. La función GetRequest es el punto de entrada para el procesamiento de datos de un cliente, llamando a la lógica de cálculo de la capa superior.
  • NetCal.hpp: Contiene la lógica de cálculo real (Cal class) que realiza las operaciones matemáticas solicitadas por el cliente.

El flujo general es: inicialización del servidor, espera de conexiones, aceptación de un cliente, delegación del manejo de datos al protocolo, que a su vez llama a la lógica de cálculo, y finalmente, envío de la respuesta serializada de vuelta al cliente.

Cliente

El cliente realiza los siguientes pasos:

  1. Crea un socket TCP.
  2. Establece una conexión con el servidor usando la dirección IP y el puerto proporcionados.
  3. Lee la entrada del usuario (operandos y operador).
  4. Utiliza la capa de protocolo para construir una solicitud serializada y con el encabezado adecuado.
  5. Envía la solicitud al servidor.
  6. Recibe la respuesta del servidor, la deserializa y muestra el resultado.

Fragmentos de Código Relevantes

Socket.hpp


 #pragma once
 #include <iostream>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <string.h>
 #include <arpa/inet.h>
 #include <string>
 #include <functional>
 #include <memory>
 #include "InetAddr.hpp" // Asumiendo que InetAddr está definido en este archivo
 #include "Log.hpp"    // Asumiendo que Log está definido en este archivo

 namespace SocketModule
 {
     const int DEFAULT_BACKLOG = 16;
     // Asumiendo que LogModule y sus niveles están definidos
     // using namespace LogModule;

     class Socket
     {
     public:
         virtual ~Socket() = default;
         virtual void Create() = 0;
         virtual void Bind(uint16_t port) = 0;
         virtual void Listen(int backlog) = 0;
         virtual std::shared_ptr<Socket> Accept(InetAddr* clientAddr) = 0;
         virtual void Close() = 0;
         virtual int Receive(std::string *output) = 0;
         virtual int Send(const std::string& message) = 0;
         virtual void Connect(const std::string& serverIp, uint16_t serverPort) = 0;

         void InitializeServerSocket(uint16_t port, int backlog = DEFAULT_BACKLOG)
         {
             Create();
             Bind(port);
             Listen(backlog);
         }

         void InitializeClientSocket()
         {
             Create();
         }
     };

     const int INVALID_SOCKET_FD = -1;
     class TcpSocket : public Socket
     {
     private:
         int _socketFd; // Puede ser listenFd o readWriteFd

     public:
         TcpSocket() : _socketFd(INVALID_SOCKET_FD) {}
         explicit TcpSocket(int sockfd) : _socketFd(sockfd) {}

         void Create() override
         {
             _socketFd = ::socket(AF_INET, SOCK_STREAM, 0);
             if (_socketFd == -1)
             {
                 // logger(LogLevel::FATAL) << "Socket creation error!";
                 // exit(SOCKET_ERR); // Asumiendo que SOCKET_ERR es una macro definida
             }
             // logger(LogLevel::INFO) << "Socket created successfully. File descriptor: " << _socketFd;
         }

         void Bind(uint16_t port) override
         {
             InetAddr serverAddress(port); // Asumiendo que InetAddr(port) crea una dirección local
             int result = ::bind(_socketFd, serverAddress.GetSockAddr(), serverAddress.GetAddrLen());
             if (result != 0)
             {
                 // logger(LogLevel::FATAL) << "Bind error!";
                 // exit(BIND_ERR);
             }
             // logger(LogLevel::INFO) << "Bind successful on port " << port;
         }

         void Listen(int backlog) override
         {
             int result = ::listen(_socketFd, backlog);
             if (result != 0)
             {
                 // logger(LogLevel::FATAL) << "Listen error!";
                 // exit(LISTEN_ERR);
             }
             // logger(LogLevel::INFO) << "Listening for connections.";
         }

         std::shared_ptr<Socket> Accept(InetAddr* clientAddr) override
         {
             struct sockaddr_in peerAddr;
             socklen_t addrLen = sizeof(peerAddr);
             int clientFd = ::accept(_socketFd, CONV(peerAddr), &addrLen); // CONV es una macro para casting
             if (clientFd < 0)
             {
                 // logger(LogLevel::WARNING) << "Accept error";
                 return nullptr;
             }
             if (clientAddr) {
                 clientAddr->SetSockAddr(peerAddr); // Asumiendo que SetSockAddr existe
             }
             return std::make_shared<TcpSocket>(clientFd);
         }

         void Close() override
         {
             if (_socketFd >= 0)
                 ::close(_socketFd);
         }

         int Receive(std::string *output) override
         {
             char buffer[1024];
             int bytesReceived = ::recv(_socketFd, buffer, sizeof(buffer) - 1, 0);
             if (bytesReceived > 0)
             {
                 buffer[bytesReceived] = '\0';
                 *output += buffer; // Acumula datos en el buffer de salida
             }
             return bytesReceived;
         }

         int Send(const std::string& message) override
         {
             return ::send(_socketFd, message.c_str(), message.size(), 0);
         }

         void Connect(const std::string& serverIp, uint16_t serverPort) override
         {
             InetAddr serverAddress(serverIp, serverPort); // Asumiendo que InetAddr(ip, port) existe
             int result = ::connect(_socketFd, serverAddress.GetSockAddr(), serverAddress.GetAddrLen());
             if (result < 0)
             {
                 // logger(LogLevel::WARNING) << "Connect error";
                 // exit(CONNECT_ERR);
             }
             // logger(LogLevel::INFO) << "Connected to " << serverIp << ":" << serverPort;
         }
     };
 }
 

TcpServer.hpp


 #pragma once
 #include "Socket.hpp"
 #include <memory>
 #include <functional>
 #include <unistd.h> // Para fork()
 #include <sys> // Para waitpid()

 // Asumiendo que SocketModule y LogModule están incluidos y definidos
 // using namespace SocketModule;
 // using namespace LogModule;

 using ServiceHandler = std::function<void(std::shared_ptr<Socket>&, InetAddr&)>;

 class TcpServer
 {
 private:
     uint16_t _port;
     std::unique_ptr<Socket> _listenerSocket;
     bool _isRunning;
     ServiceHandler _serviceHandler;

 public:
     TcpServer(uint16_t port, ServiceHandler handler) :
         _port(port), _listenerSocket(std::make_unique<TcpSocket>()), // Asume TcpSocket es la implementación concreta
         _isRunning(false), _serviceHandler(std::move(handler))
     {
         _listenerSocket->InitializeServerSocket(_port);
     }

     void Start()
     {
         _isRunning = true;
         while (_isRunning)
         {
             InetAddr clientAddress;
             auto clientSocket = _listenerSocket->Accept(&clientAddress);
             if (clientSocket == nullptr)
             {
                 continue; // Error en accept o conexión rechazada
             }
             // logger(LogLevel::INFO) << "Connection accepted from " << clientAddress.ToString(); // Asumiendo ToString()

             pid_t pid = fork();
             if (pid < 0)
             {
                 // logger(LogLevel::FATAL) << "Fork error";
                 // exit(FORK_ERR);
             }
             else if (pid == 0)
             {
                 // Proceso hijo: maneja la conexión del cliente
                 _listenerSocket->Close(); // El hijo no necesita el socket del listener

                 // Crear un proceso nieto para manejar la tarea y que el hijo pueda salir
                 if (fork() > 0) {
                     exit(0); // El hijo sale, dejando al nieto como huérfano
                 }

                 // Proceso nieto: ejecuta el manejador de servicio
                 _serviceHandler(clientSocket, clientAddress);
                 clientSocket->Close(); // Cierra el socket del cliente al terminar
                 exit(0);
             }
             else
             {
                 // Proceso padre: cierra el socket del cliente y espera al hijo
                 clientSocket->Close();
                 ::waitpid(pid, nullptr, 0); // Espera a que el hijo (que ahora es padre del nieto) termine
             }
         }
         _isRunning = false;
     }
 };
 </sys></unistd.h></functional></memory>

Protocol.hpp


 #pragma once
 #include "Socket.hpp"
 #include <string>
 #include <memory>
 #include <json/json.h> // Biblioteca para JSON
 #include <functional>

 // Asumiendo que SocketModule y LogModule están incluidos y definidos
 // using namespace SocketModule;
 // using namespace LogModule;

 // Clase para representar una solicitud de cálculo
 class CalculationRequest
 {
 private:
     int _operand1;
     int _operand2;
     char _operation;

 public:
     CalculationRequest() = default;
     CalculationRequest(int op1, int op2, char oper) : _operand1(op1), _operand2(op2), _operation(oper) {}
     ~CalculationRequest() = default;

     int GetOperand1() const { return _operand1; }
     int GetOperand2() const { return _operand2; }
     char GetOperation() const { return _operation; }

     std::string Serialize() const
     {
         Json::Value root;
         root["operand1"] = _operand1;
         root["operand2"] = _operand2;
         root["operation"] = static_cast<int>(_operation); // Convertir char a int para JSON
         Json::FastWriter writer;
         return writer.write(root);
     }

     bool Deserialize(const std::string& input)
     {
         Json::Reader reader;
         Json::Value root;
         bool success = reader.parse(input, root);
         if (success)
         {
             _operand1 = root["operand1"].asInt();
             _operand2 = root["operand2"].asInt();
             _operation = static_cast<char>(root["operation"].asInt());
         }
         return success;
     }
 };

 // Clase para representar una respuesta de cálculo
 class CalculationResponse
 {
 private:
     int _result;
     int _statusCode; // 0: Éxito, otros códigos para diferentes errores

 public:
     CalculationResponse() : _result(0), _statusCode(0) {}
     CalculationResponse(int result, int code) : _result(result), _statusCode(code) {}
     ~CalculationResponse() = default;

     void SetResult(int result) { _result = result; }
     void SetStatusCode(int code) { _statusCode = code; }
     int GetResult() const { return _result; }
     int GetStatusCode() const { return _statusCode; }

     std::string Serialize() const
     {
         Json::Value root;
         root["result"] = _result;
         root["statusCode"] = _statusCode;
         Json::FastWriter writer;
         return writer.write(root);
     }

     bool Deserialize(const std::string& input)
     {
         Json::Reader reader;
         Json::Value root;
         bool success = reader.parse(input, root);
         if (success)
         {
             _result = root["result"].asInt();
             _statusCode = root["statusCode"].asInt();
         }
         return success;
     }

     void Display() const
     {
         std::cout << "Resultado: " << _result << " [Código: " << _statusCode << "]" << std::endl;
     }
 };

 const std::string DELIMITER = "\r\n";

 // Función de callback para la lógica de cálculo real
 using CalculationFunction = std::function<CalculationResponse(const CalculationRequest&)>;

 class ProtocolHandler
 {
 private:
     CalculationFunction _calculatorFunc;

 public:
     ProtocolHandler(CalculationFunction calcFunc) : _calculatorFunc(std::move(calcFunc)) {}
     ProtocolHandler() = default; // Constructor por defecto si no se proporciona función de cálculo
     ~ProtocolHandler() = default;

     // Codifica un mensaje JSON añadiendo longitud y delimitadores
     std::string EncodeMessage(const std::string& jsonData) const
     {
         std::string lengthStr = std::to_string(jsonData.length());
         return lengthStr + DELIMITER + jsonData + DELIMITER;
     }

     // Decodifica un mensaje del buffer, extrayendo un paquete completo si está disponible
     bool DecodeMessage(std::string& buffer, std::string* extractedPackage)
     {
         size_t delimiterPos = buffer.find(DELIMITER);
         if (delimiterPos == std::string::npos) {
             return false; // Delimitador no encontrado
         }

         std::string lengthStr = buffer.substr(0, delimiterPos);
         int messageLength = std::stoi(lengthStr);
         size_t totalMessageSize = lengthStr.length() + DELIMITER.length() + messageLength + DELIMITER.length();

         if (buffer.length() < totalMessageSize) {
             return false; // Buffer no contiene un mensaje completo
         }

         *extractedPackage = buffer.substr(delimiterPos + DELIMITER.length(), messageLength);
         buffer.erase(0, totalMessageSize); // Elimina el mensaje procesado del buffer
         return true;
     }

     // Maneja la solicitud entrante de un cliente
     void HandleClientRequest(std::shared_ptr<Socket>& clientSock, InetAddr& clientAddr)
     {
         std::string dataBuffer;
         while (true)
         {
             int bytesRead = clientSock->Receive(&dataBuffer);

             if (bytesRead > 0)
             {
                 std::string completeMessage;
                 while (DecodeMessage(dataBuffer, &completeMessage)) // Procesa todos los mensajes completos en el buffer
                 {
                     CalculationRequest request;
                     if (request.Deserialize(completeMessage))
                     {
                         CalculationResponse response = _calculatorFunc(request);
                         std::string jsonResponse = response.Serialize();
                         std::string encodedResponse = EncodeMessage(jsonResponse);
                         clientSock->Send(encodedResponse);
                     }
                     else
                     {
                         // logger(LogLevel::WARNING) << "Failed to deserialize request.";
                         // Enviar una respuesta de error si es necesario
                     }
                 }
             }
             else if (bytesRead == 0) // Conexión cerrada por el cliente
             {
                 // logger(LogLevel::INFO) << "Client disconnected: " << clientAddr.ToString();
                 break;
             }
             else // Error de lectura
             {
                 // logger(LogLevel::WARNING) << "Receive error from client: " << clientAddr.ToString();
                 break;
             }
         }
     }

     // Obtiene la respuesta del servidor (para el cliente)
     bool GetServerResponse(std::shared_ptr<Socket>& serverConn, std::string& responseBuffer, CalculationResponse* response)
     {
         while (true)
         {
             int bytesRead = serverConn->Receive(&responseBuffer);
             if (bytesRead > 0)
             {
                 std::string jsonPayload;
                 if (DecodeMessage(responseBuffer, &jsonPayload))
                 {
                     response->Deserialize(jsonPayload);
                     return true;
                 }
                 // Si DecodeMessage falla, continúa leyendo para completar el paquete
             }
             else if (bytesRead == 0)
             {
                 // logger(LogLevel::INFO) << "Server disconnected.";
                 break;
             }
             else
             {
                 // logger(LogLevel::WARNING) << "Receive error from server.";
                 break;
             }
         }
         return false;
     }

     // Construye la cadena de solicitud para enviar al servidor
     std::string BuildRequestString(int operand1, int operand2, char operation)
     {
         CalculationRequest request(operand1, operand2, operation);
         std::string jsonRequest = request.Serialize();
         return EncodeMessage(jsonRequest);
     }
 };
 

NetCal.hpp


 #pragma once
 #include <iostream>
 #include "Protocol.hpp" // Incluye las definiciones de Request y Response

 class CalculatorLogic
 {
 public:
     CalculatorLogic() = default;
     ~CalculatorLogic() = default;

     CalculationResponse Execute(const CalculationRequest& request)
     {
         CalculationResponse response;
         int op1 = request.GetOperand1();
         int op2 = request.GetOperand2();
         char operation = request.GetOperation();

         switch (operation)
         {
             case '+' :
                 response.SetResult(op1 + op2);
                 break;
             case '-' :
                 response.SetResult(op1 - op2);
                 break;
             case '*' :
                 response.SetResult(op1 * op2);
                 break;
             case '/' :
                 if (op2 == 0)
                 {
                     response.SetStatusCode(1); // Error: División por cero
                 }
                 else
                 {
                     response.SetResult(op1 / op2);
                 }
                 break;
             case '%' :
                 if (op2 == 0)
                 {
                     response.SetStatusCode(2); // Error: Módulo por cero
                 }
                 else
                 {
                     response.SetResult(op1 % op2);
                 }
                 break;
             default:
                 response.SetStatusCode(3); // Error: Operación inválida
                 break;
         }
         return response;
     }
 };
 

Resultado de la Prueba

Los resultados de las pruebas muestran el funcionamiento correcto de la comunicación cliente-servidor para las operaciones de cálculo.

Gestión de Procesos: Daemonización

Los servidores de red deben ejecutarse de forma continua e independiente de la sesión del terminal desde la que se iniciaron. Si el terminal se cierra, el servidor no debería detenerse. Esto se logra mediante la daemonización, convirtiendo el proceso del servidor en un demonio (o proceso daemon).

Conceptos Previos: Grupos de Procesos y Sesiones

  • Grupo de Procesos (Process Group): Un conjunto de uno o más procesos relacionados que comparten un ID de grupo de procesos (PGID). El PGID suele ser el PID del proceso líder del grupo.
  • Sesión (Session): Una colección de uno o más grupos de procesos. Una sesión tiene un ID de sesión (SID). Al iniciar sesión en un sistema Unix, se crea una nueva sesión y un grupo de procesos inicial (generalmente el shell).

Cuando un proceso se ejecuta en primer plano (foreground), tiene acceso al terminal de control para entrada/salida estándar. Los procesos en segundo plano (background) no tienen este acceso directo al terminal. Un terminal puede tener un solo grupo de procesos en primer plano y cero o más en segundo plano.

Daemonización: Creando un Proceso Demonio

Para que un proceso servidor funcione como demonio, debe:

  1. Ignorar Señales Relevantes: Ignorar señales como SIGPIPE (enviado si se escribe en un socket cerrado) y SIGCHLD (enviado cuando un proceso hijo termina).
  2. Desvincularse de la Sesión del Terminal: Crear un nuevo proceso hijo y hacer que el padre termine inmediatamente. Esto convierte al hijo en un proceso huérfano, adoptado por el proceso init (PID 1).
  3. Crear una Nueva Sesión: Utilizar la llamada al sistema setsid() para que el proceso huérfano se convierta en el líder de una nueva sesión y un nuevo grupo de procesos, completamente separado del terminal original.
  4. Cambiar el Directorio de Trabajo: Opcionalmente, cambiar el directorio de trabajo a la raíz (/) para evitar que el demonio mantenga un directorio bloqueado.
  5. Redirigir E/S Estándar: Redirigir stdin, stdout y stderr a /dev/null para que el demonio no intente interactuar con el terminal.

Implementación de la Daemonización

El siguiente código muestra cómo implementar estas etapas:


 #include <iostream>
 #include <string>
 #include <cstdio>
 #include <sys/types.h>
 #include <unistd.h>
 #include <signal.h>
 #include <sys/stat.h>
 #include <fcntl.h>

 // Asumiendo que LogModule y Common.hpp (con macros de error) están incluidos
 // #include "Log.hpp"
 // #include "Common.hpp"

 const std::string DEV_NULL = "/dev/null";

 void BecomeDaemon(bool changeDir = true, bool closeIO = true)
 {
     // 1. Ignorar señales importantes
     signal(SIGPIPE, SIG_IGN);
     signal(SIGCHLD, SIG_IGN);

     // 2. Crear un proceso hijo y que el padre termine
     if (fork() > 0) {
         exit(0); // Padre termina
     }

     // 3. El hijo se convierte en líder de una nueva sesión y grupo de procesos
     setsid();

     // 4. Cambiar directorio de trabajo si se especifica
     if (changeDir) {
         if (chdir("/") != 0) {
             // logger(LogLevel::ERROR) << "Failed to change directory to /";
             // Consider error handling
         }
     }

     // 5. Redirigir E/S estándar si se especifica
     if (closeIO)
     {
         int nullFd = open(DEV_NULL.c_str(), O_RDWR);
         if (nullFd < 0)
         {
             // logger(LogLevel::FATAL) << "Failed to open " << DEV_NULL;
             // exit(OPEN_ERR);
         }
         else
         {
             dup2(nullFd, STDIN_FILENO);  // Redirigir stdin
             dup2(nullFd, STDOUT_FILENO); // Redirigir stdout
             dup2(nullFd, STDERR_FILENO); // Redirigir stderr
             close(nullFd); // Cerrar el descriptor de archivo original
         }
     }
 }
 

Al ejecutar el servidor después de llamar a BecomeDaemon(), la salida estándar y los errores se redirigirán a /dev/null, y el proceso continuará ejecutándose incluso después de cerrar el terminal.

Etiquetas: tcp sockets protocolos Serialización daemon

Publicado el 7-3 19:32