Continuación del artículo anterior: Comunicación entre procesos (II): Implementación de un pool de procesos de alta disponibilidad.
- Fenómeno observado: dos problemas clave
- Bloqueo de procesos: el proceso hijo se queda detenido en
read, y el proceso padre también se bloquea enwaitpid, impidiendo la finalización normal del programa. - Procesos hijos no recolectados: al ejecutar
ps ajx | grep ProcessPoolse ven 6 procesos (1 padre + 5 hijos). Esto indica que los hijos terminaron pero se convirtieron en procesos zombies, porque el padre no logró recolectarlos.
- Causa principal: fuga de descriptores de archivo (extremo de lectura del pipe no cerrado correctamente)
2.1. Regla fundamental del pipe
El comportamiento de read en un pipe depende de que todos los extremos de escritura estén cerrados:
- Si al menos un descriptor de escritura sigue abierto,
readse bloquea indefinidamente y nunca retorna 0 (EOF). - Solo cuando todos los extremos de escrituar están cerrados,
readretorna 0 y el proceso hijo puede salir del bucle.
En ProcessPool::Start() se crean procesos hijos y pipes:
for (int i = 0; i < _process_num; i++)
{
int pipefd[2] = {0};
pipe(pipefd);
pid_t subid = fork();
if (subid == 0)
{
close(pipefd[1]); // hijo cierra escritura
Work(pipefd[0]); // hijo bloqueado en read(pipefd[0])
close(pipefd[0]);
exit(0);
}
else
{
close(pipefd[0]); // padre cierra lectura
_cm.Insert(pipefd[1], subid); // padre conserva escritura
}
}
El problema es que cada hijo hereda los descriptores de archivo de los pipes creados anteriormente por el padre.
- En la primera iteración: padre crea pipe1 → fork hijo1 → hijo1 hereda ambos extremos del pipe1.
- En la segunda iteración: padre crea pipe2 → fork hijo2 → hijo2 hereda el extremo de escritura del pipe1 + ambos extremos del pipe2.
- En la tercera iteración: padre crea pipe3 → fork hijo3 → hijo3 hereda escritura de pipe1, escritura de pipe2 y ambos extremos de pipe3.
- … y así sucesivamente; el quinto hijo hereda todos los extremos de escritura de los 4 pipes anteriores.
2.2. Consecuencia: los extremos de escritura nunca se cierran todos
Al llamar a Stop() para cerrar todos los extremos de escritura que posee el padre:
- Los extremos del padre se cierran, pero los hijos aún mantienen abiertos extremos de escritura de pipes anteriores.
- Para el pipe1: los hijos 2 a 5 todavía tienen su extremo de escritura → el contador de escrituras del pipe1 sigue siendo > 0.
- El hijo1 llama a
read(pipefd[0])y descubre que aún hay extremos de escritura abiertos → se bloquea permanentemente, nunca sale. - El padre llama a
waitpidy el hijo sigue vivo → el padre también se bloquea. - Finalmente: todos los hijos quedan atascados en
read, el padre atascado enwaitpid, el programa se cuelga.
- Diagrama: por qué los hijos heredan descriptores del padre
En Unix, tras un fork(), el proceso hijo obtiene una copia de la tabla de descriptores del padre. Cualquier descriptor abierto antes del fork permanece abierto en el hijo a menos que se cierre explícitamente. Esto incluye los extremos de lectura y escritura de todos los pipes creados previamente.
- ¿Por qué los hijos no se recolectan?
Porque los hijos nunca terminan realmente:
- El hijo permanece bloqueado en
ready no ejecutaexit(0). - El padre llama a
waitpidmientras el hijo aún se ejecuta →waitpidse bloquea esperando. - Un proceso hijo solo se convierte en zombie si termina pero el padre no lo recolecta. Aquí el hijo no ha terminado; el padre está esperándolo. Es un deadlock.
- Solución
5.1. Cerrar en orden inverso
Una opción es cerrar los pipes y recolectar hijos en orden inverso al de creación, de modo que el último hijo creado no herede escrituras de pipes posteriores. Sin embargo, esto no resuelve completamente la herencia de escrituras de pipes anteriores.
5.2. Cerrar todos los extremos de ecsritura heredados dentro de cada hijo
En el código del hijo, antes de comenzar a leer, se debe cerrar cualquier extremo de escritura que haya sido heredado. Para ello, el ChannelManager ofrece el método CloseAll(), que cierra todos los extremos de escritura gestionados hasta ese momento. Invocándolo en el hijo justo después del fork se eliminan las escrituras heredadas.
// Versión corregida con cierre de escrituras heredadas
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"
class Channel {
public:
Channel(int fd, pid_t id) : _wfd(fd), _subid(id) {
_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
~Channel() {}
void Send(int code) {
int n = write(_wfd, &code, sizeof(code));
(void)n;
}
void Close() {
close(_wfd);
}
void Wait() {
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid;
}
int Fd() { return _wfd; }
pid_t SubId() { return _subid; }
std::string Name() { return _name; }
private:
int _wfd;
pid_t _subid;
std::string _name;
};
class ChannelManager {
public:
ChannelManager() : _next(0) {}
void Insert(int wfd, pid_t subid) {
_channels.emplace_back(wfd, subid);
}
Channel& Select() {
auto& c = _channels[_next];
_next = (_next + 1) % _channels.size();
return c;
}
void PrintfChannel() {
for (auto& ch : _channels)
std::cout << ch.Name() << std::endl;
}
void CloseAll() {
for (auto& ch : _channels)
ch.Close();
}
void StopSubProcess() {
for (auto& ch : _channels) {
ch.Close();
std::cout << "Cerrando: " << ch.Name() << std::endl;
}
}
void WaitSubProcess() {
for (auto& ch : _channels) {
ch.Wait();
std::cout << "Recolectado: " << ch.Name() << std::endl;
}
}
void CloseAndWait() {
// Opción 2: cerrar y esperar en orden directo (funciona tras corrección en hijo)
for (auto& ch : _channels) {
ch.Close();
std::cout << "Cerrando: " << ch.Name() << std::endl;
ch.Wait();
std::cout << "Recolectado: " << ch.Name() << std::endl;
}
// Opción alternativa: cerrar en orden inverso
// for (int i = _channels.size() - 1; i >= 0; i--) {
// _channels[i].Close();
// _channels[i].Wait();
// }
}
~ChannelManager() {}
private:
std::vector<Channel> _channels;
int _next;
};
const int gdefaultnum = 5;
class ProcessPool {
public:
ProcessPool(int num) : _process_num(num) {
_tm.Register(PrintLog);
_tm.Register(Download);
_tm.Register(Upload);
}
void Work(int rfd) {
while (true) {
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0) {
if (n != sizeof(code)) continue;
std::cout << "Hijo[" << getpid() << "] recibe código de tarea: " << code << std::endl;
_tm.Execute(code);
} else if (n == 0) {
std::cout << "Hijo sale" << std::endl;
break;
} else {
std::cout << "Error de lectura" << std::endl;
break;
}
}
}
bool Start() {
for (int i = 0; i < _process_num; i++) {
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0) return false;
pid_t subid = fork();
if (subid < 0) return false;
else if (subid == 0) {
// Hijo: cierra todos los extremos de escritura heredados
_cm.CloseAll();
close(pipefd[1]); // cierra escritura de su propio pipe
Work(pipefd[0]);
close(pipefd[0]);
exit(0);
} else {
// Padre
close(pipefd[0]); // cierra lectura
_cm.Insert(pipefd[1], subid);
}
}
return true;
}
void Debug() { _cm.PrintfChannel(); }
void Run() {
int taskcode = _tm.Code();
auto& c = _cm.Select();
std::cout << "Seleccionado hijo: " << c.Name() << std::endl;
c.Send(taskcode);
std::cout << "Enviado código de tarea: " << c.Name() << std::endl;
}
void Stop() {
_cm.CloseAndWait();
}
~ProcessPool() {}
private:
ChannelManager _cm;
int _process_num;
TaskManager _tm;
};
#endif
Con esta corrección, cada hijo, justo después de bifurcarse, cierra todos los extremos de escritura de los pipes de sus hermanos que han sido almacenados en _cm. De esta forma, cuando el padre cierre sus propios extremos de escritura, todos los extremos de escritura de cada pipe quedarán efectivamente cerrados, permitiendo que los hijos salgan del read y el padre los recolecte sin bloqueo.