Core Netty decoders for frame handling

When building network applications on Netty, two problems of the TCP byte stream come up routinely: fragmentation (half packets), where one message arrives split across several reads, and concatenation (sticky packets), where several messages arrive in a single read. To deal with both, Netty provides several frame decoders that extend the abstract class ByteToMessageDecoder. The sections below walk through the most commonly used ones and how they work internally, using simplified listings.
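
To make the two problems concrete, the following JDK-only sketch cuts a byte stream into arbitrary pieces, the way successive socket reads might. The class name StreamChunks and the chunk sizes are illustrative assumptions; real read sizes depend on the network and the operating system.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamChunks {
    /** Cuts the stream into consecutive pieces of the given sizes, the way successive reads might. */
    static List<String> chunk(String stream, int... sizes) {
        List<String> reads = new ArrayList<>();
        int pos = 0;
        for (int size : sizes) {
            int end = Math.min(pos + size, stream.length());
            reads.add(stream.substring(pos, end));
            pos = end;
        }
        return reads;
    }

    public static void main(String[] args) {
        // Two application messages, written back to back by the sender.
        String stream = "HELLO\n" + "WORLD\n";
        for (String read : chunk(stream, 3, 6, 3)) {
            // Print newlines visibly so message boundaries are easy to spot.
            System.out.println("read: " + read.replace("\n", "\\n"));
        }
    }
}
```

The first read ("HEL") is a half packet, and the second ("LO\nWOR") contains the end of one message stuck to the start of the next. A frame decoder exists to turn such reads back into whole messages.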

Fixed-length decoder

FixedLengthFrameDecoder determines frame boundaries by assuming that every message has the same constant length in bytes. Its core logic is straightforward: it only has to check whether enough bytes are available to form a complete frame.

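import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int fixedLen;

    public FixedLengthFrameDecoder(int len) {
        if (len <= 0) throw new IllegalArgumentException("len must be positive");
        this.fixedLen = len;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> output) {
        // Emit one frame per complete run of fixedLen bytes; leftover bytes stay buffered until more data arrives.
        while (buffer.readableBytes() >= fixedLen) {
            // readRetainedSlice shares the underlying memory and increments the reference count.
            ByteBuf frame = buffer.readRetainedSlice(fixedLen);
            output.add(frame);
        }
    }
}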
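
The same idea can be tried without Netty. The sketch below is a minimal JDK-only illustration, not the library's implementation: the class FixedLengthSplitter and its feed method are hypothetical names, and it copies byte arrays where Netty would slice a ByteBuf.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FixedLengthSplitter {
    private final int frameLen;
    private byte[] pending = new byte[0]; // bytes received but not yet part of a complete frame

    FixedLengthSplitter(int frameLen) {
        if (frameLen <= 0) throw new IllegalArgumentException("frameLen must be positive");
        this.frameLen = frameLen;
    }

    /** Feeds one received chunk and returns every frame it completes. */
    List<String> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier reads.
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLen) {
            frames.add(new String(all, pos, frameLen, StandardCharsets.US_ASCII));
            pos += frameLen;
        }
        pending = Arrays.copyOfRange(all, pos, all.length);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthSplitter splitter = new FixedLengthSplitter(4);
        System.out.println(splitter.feed("AB".getBytes(StandardCharsets.US_ASCII)));     // []
        System.out.println(splitter.feed("CDEFGH".getBytes(StandardCharsets.US_ASCII))); // [ABCD, EFGH]
    }
}
```

The first call yields nothing because only half a frame has arrived; the second completes that frame and delivers a further one from the same read.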

Line-based decoder

LineBasedFrameDecoder uses end-of-line characters (\n or \r\n) to separate messages. It also handles lines that exceed the configured maximum length, either failing as soon as the limit is exceeded (fail-fast) or only after the rest of the oversized line has been discarded (deferred failure).

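import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;

import java.util.List;

public class LineBasedFrameDecoder extends ByteToMessageDecoder {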
    private final int maxLen;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discarding;
    private int discardedBytes;
    private int nextSearchIndex;

    public LineBasedFrameDecoder(int maxLength) {
        this(maxLength, true, false);
    }

    public LineBasedFrameDecoder(int maxLength, boolean strip, boolean fail) {
        this.maxLen = maxLength;
        this.stripDelimiter = strip;
        this.failFast = fail;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> output) {
        int eolPos = findLineEnd(buffer);
        if (!discarding) {
            if (eolPos >= 0) {
                int lineLen = eolPos - buffer.readerIndex();
                int delimLen = (buffer.getByte(eolPos) == '\r') ? 2 : 1;
                if (lineLen > maxLen) {
                    // The delimiter is already buffered: drop the whole oversized line and fail immediately.
                    buffer.readerIndex(eolPos + delimLen);
                    throw new TooLongFrameException("Line exceeds maxLen");
                }
                ByteBuf frame;
                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(lineLen);
                    buffer.skipBytes(delimLen);
                } else {
                    frame = buffer.readRetainedSlice(lineLen + delimLen);
                }
                output.add(frame);
            } else {
                if (buffer.readableBytes() > maxLen) {
                    discardedBytes = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discarding = true;
                    nextSearchIndex = 0;
                    if (failFast) {
                        // Fail-fast: report as soon as the limit is exceeded, before the delimiter arrives.
                        throw new TooLongFrameException("Line exceeds maxLen");
                    }
                }
            }
        } else {
            if (eolPos >= 0) {
                int totalDiscarded = discardedBytes + (eolPos - buffer.readerIndex());
                int delimLen = (buffer.getByte(eolPos) == '\r') ? 2 : 1;
                buffer.readerIndex(eolPos + delimLen);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    // Deferred failure: report only once the oversized line has been fully discarded.
                    throw new TooLongFrameException("Line exceeds maxLen: " + totalDiscarded);
                }
            } else {
                discardedBytes += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
                nextSearchIndex = 0;
            }
        }
    }

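    // Returns the absolute index of the delimiter (the '\r' of "\r\n", or a lone '\n'), or -1 if none is buffered yet.
    private int findLineEnd(ByteBuf buf) {
        // Resume where the previous scan stopped so already-inspected bytes are not scanned again.
        int start = buf.readerIndex() + nextSearchIndex;
        int end = buf.writerIndex();
        for (int i = start; i < end; i++) {
            byte b = buf.getByte(i);
            if (b == '\n') {
                // Compare against readerIndex rather than start: the '\r' may have arrived in an earlier read.
                int pos = (i > buf.readerIndex() && buf.getByte(i - 1) == '\r') ? i - 1 : i;
                nextSearchIndex = 0;
                return pos;
            }
        }
        nextSearchIndex = end - buf.readerIndex();
        return -1;
    }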
}
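
The delimiter handling can be exercised in isolation with plain strings. The sketch below is a simplified illustration under stated assumptions: LineSplitter is a hypothetical class, it works on characters rather than bytes, and it leaves out the maximum-length and discard logic of the listing above.

```java
import java.util.ArrayList;
import java.util.List;

final class LineSplitter {
    private final StringBuilder pending = new StringBuilder(); // text received so far without a line end

    /** Feeds one received chunk and returns every complete line, with the delimiter stripped. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int nl;
        while ((nl = pending.indexOf("\n")) >= 0) {
            // Treat "\r\n" as a single delimiter, even when '\r' and '\n' arrived in different chunks.
            int end = (nl > 0 && pending.charAt(nl - 1) == '\r') ? nl - 1 : nl;
            lines.add(pending.substring(0, end));
            pending.delete(0, nl + 1);
        }
        return lines;
    }

    public static void main(String[] args) {
        LineSplitter splitter = new LineSplitter();
        System.out.println(splitter.feed("foo\r"));      // []
        System.out.println(splitter.feed("\nbar\nba"));  // [foo, bar]
        System.out.println(splitter.feed("z\r\n"));      // [baz]
    }
}
```

The first two calls show why the check for a preceding '\r' has to look at everything still buffered and not only at the newest chunk.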

Custom-delimiter decoder

DelimiterBasedFrameDecoder extends the previous idea to allow any byte sequence as a delimiter. Internally, when the standard line delimiters are used, it delegates to LineBasedFrameDecoder.

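import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;

import java.util.List;

public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {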
    private final ByteBuf[] delimiters;
    private final int maxFrameLen;
    private final boolean stripDelim;
    private final boolean failFast;
    private boolean discardingFrame;
    private long tooLongFrameLen;
    private LineBasedFrameDecoder lineDecoder;

    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean strip, boolean fail, ByteBuf... delims) {
        validateMaxLength(maxFrameLength);
        if (delims.length == 0) throw new IllegalArgumentException("At least one delimiter must be specified");
        if (isLineBased(delims)) {
            lineDecoder = new LineBasedFrameDecoder(maxFrameLength, strip, fail);
            this.delimiters = null;
        } else {
            this.delimiters = new ByteBuf[delims.length];
            for (int i = 0; i < delims.length; i++) {
                validateDelimiter(delims[i]);
                this.delimiters[i] = delims[i].slice(delims[i].readerIndex(), delims[i].readableBytes());
            }
            lineDecoder = null;
        }
        this.maxFrameLen = maxFrameLength;
        this.stripDelim = strip;
        this.failFast = fail;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> output) {
        if (lineDecoder != null) {
            lineDecoder.decode(ctx, buffer, output);
            return;
        }
        int minLen = Integer.MAX_VALUE;
        ByteBuf minDelim = null;
        for (ByteBuf delim : delimiters) {
            int pos = indexOf(buffer, delim);
            if (pos >= 0 && pos < minLen) {
                minLen = pos;
                minDelim = delim;
            }
        }
        if (minDelim != null) {
            int delimLen = minDelim.capacity();
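            if (discardingFrame) {
                // The delimiter that ends an oversized frame was found: drop the tail and leave discard mode.
                discardingFrame = false;
                tooLongFrameLen = 0;
                buffer.skipBytes(minLen + delimLen);
                if (!failFast) {
                    throw new TooLongFrameException("Frame exceeds maxLen");
                }
                return;
            }
            if (minLen > maxFrameLen) {
                // Complete but oversized frame: skip it together with its delimiter.
                buffer.skipBytes(minLen + delimLen);
                throw new TooLongFrameException("Frame exceeds maxLen");
            }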
            ByteBuf frame;
            if (stripDelim) {
                frame = buffer.readRetainedSlice(minLen);
                buffer.skipBytes(delimLen);
            } else {
                frame = buffer.readRetainedSlice(minLen + delimLen);
            }
            output.add(frame);
        } else {
            if (!discardingFrame) {
                if (buffer.readableBytes() > maxFrameLen) {
                    tooLongFrameLen = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingFrame = true;
                    if (failFast) {
                        throw new TooLongFrameException("Frame exceeds maxLen");
                    }
                }
            } else {
                tooLongFrameLen += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
        }
    }

    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
            int haystackIdx = i;
            int needleIdx;
            for (needleIdx = 0; needleIdx < needle.capacity(); needleIdx++) {
                if (haystack.getByte(haystackIdx) != needle.getByte(needleIdx)) {
                    break;
                }
                haystackIdx++;
                if (haystackIdx == haystack.writerIndex() && needleIdx != needle.capacity() - 1) {
                    return -1;
                }
            }
            if (needleIdx == needle.capacity()) {
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }

    // The helpers below are called by the constructor but were missing from the listing.
    // They are minimal sketches of the checks their names imply, not Netty's exact code.
    private static void validateMaxLength(int maxFrameLength) {
        if (maxFrameLength <= 0) throw new IllegalArgumentException("maxFrameLength must be positive");
    }

    private static void validateDelimiter(ByteBuf delim) {
        if (delim == null) throw new NullPointerException("delimiter");
        if (!delim.isReadable()) throw new IllegalArgumentException("empty delimiter");
    }

    // True only for the delimiter pair { "\r\n", "\n" }, given in either order.
    private static boolean isLineBased(ByteBuf[] delims) {
        if (delims.length != 2) return false;
        ByteBuf a = delims[0];
        ByteBuf b = delims[1];
        if (a.readableBytes() < b.readableBytes()) {
            a = delims[1];
            b = delims[0];
        }
        return a.readableBytes() == 2 && b.readableBytes() == 1
                && a.getByte(a.readerIndex()) == '\r' && a.getByte(a.readerIndex() + 1) == '\n'
                && b.getByte(b.readerIndex()) == '\n';
    }
}
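
When several delimiters are configured, the listing above picks the one that produces the shortest frame. The following JDK-only sketch isolates that selection step; DelimiterSearch, indexOf and firstFrame are hypothetical names, and the sketch works on byte arrays where the decoder works on ByteBuf.

```java
import java.nio.charset.StandardCharsets;

final class DelimiterSearch {
    /** Returns the offset of the first full occurrence of needle in haystack, or -1. */
    static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) continue outer;
            }
            return i;
        }
        return -1; // also covers a delimiter that is only partially buffered at the end
    }

    /** Returns the first frame, using whichever delimiter occurs earliest, or null if none is present. */
    static String firstFrame(String data, String... delimiters) {
        byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);
        int minLen = Integer.MAX_VALUE;
        for (String delimiter : delimiters) {
            int pos = indexOf(bytes, delimiter.getBytes(StandardCharsets.ISO_8859_1));
            if (pos >= 0 && pos < minLen) {
                minLen = pos;
            }
        }
        return minLen == Integer.MAX_VALUE ? null : new String(bytes, 0, minLen, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        System.out.println(firstFrame("a;b$$c", "$$", ";"));  // a
        System.out.println(firstFrame("ab$$c;d", "$$", ";")); // ab
        System.out.println(firstFrame("abc$", "$$", ";"));    // null
    }
}
```

The last call returns null because only half of the "$$" delimiter has arrived, so a decoder would wait for more data instead of emitting a frame.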

Length-field-based decoder

LengthFieldBasedFrameDecoder is the most flexible of the four. It extracts the frame length from a field inside the protocol data itself. The key parameters are:

  • lengthFieldOffset: offset at which the length field starts.
  • lengthFieldLength: size of the length field in bytes (1, 2, 3, 4 or 8).
  • lengthAdjustment: compensation value added to the value read from the length field to obtain the total frame size.
  • initialBytesToStrip: number of bytes to drop from the start of the resulting frame.
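
How the four parameters combine is easiest to see with numbers. The sketch below mirrors the arithmetic used in the decode method of the listing that follows (length value plus adjustment plus the end offset of the length field); FrameLengthMath and both protocol layouts are hypothetical, chosen only for illustration.

```java
final class FrameLengthMath {
    /** Total frame size in bytes, computed from the value stored in the length field. */
    static long totalFrameLength(long lengthFieldValue, int lengthFieldOffset,
                                 int lengthFieldLength, int lengthAdjustment) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        return lengthFieldValue + lengthAdjustment + lengthFieldEndOffset;
    }

    /** Size of the frame handed to the next handler after the leading bytes are stripped. */
    static long emittedLength(long totalFrameLength, int initialBytesToStrip) {
        return totalFrameLength - initialBytesToStrip;
    }

    public static void main(String[] args) {
        // Layout A (assumed): 2-byte magic, then a 2-byte length that counts only the 12-byte payload.
        long totalA = totalFrameLength(12, 2, 2, 0);
        System.out.println(totalA + " -> " + emittedLength(totalA, 4)); // 16 -> 12

        // Layout B (assumed): a 2-byte length that counts itself plus the 12-byte payload (value 14).
        long totalB = totalFrameLength(14, 0, 2, -2);
        System.out.println(totalB + " -> " + emittedLength(totalB, 0)); // 14 -> 14
    }
}
```

In layout A the header is stripped, so only the payload reaches the next handler. In layout B the negative adjustment cancels out the bytes that the length field already counts.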
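
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;

import java.nio.ByteOrder;
import java.util.List;

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {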
    private final ByteOrder byteOrder;
    private final int maxFrameLen;
    private final int lengthFieldOffset;
    private final int lengthFieldLen;
    private final int lengthFieldEndOffset;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;
    private final boolean failFast;
    private boolean discardingFrame;
    private long tooLongFrameLen;
    private long bytesToDiscard;
    private int currentFrameLen = -1;

    public LengthFieldBasedFrameDecoder(ByteOrder order, int maxLen, int fieldOffset, int fieldLen, int adjustment, int strip, boolean fail) {
        this.byteOrder = (order != null) ? order : ByteOrder.BIG_ENDIAN;
        this.maxFrameLen = maxLen;
        this.lengthFieldOffset = fieldOffset;
        this.lengthFieldLen = fieldLen;
        this.lengthFieldEndOffset = fieldOffset + fieldLen;
        this.lengthAdjustment = adjustment;
        this.initialBytesToStrip = strip;
        this.failFast = fail;
        validateParameters();
    }

    private void validateParameters() {
        if (maxFrameLen <= 0) throw new IllegalArgumentException("maxLen must be positive");
        if (lengthFieldOffset < 0) throw new IllegalArgumentException("fieldOffset must not be negative");
        if (initialBytesToStrip < 0) throw new IllegalArgumentException("strip must not be negative");
        // The length field itself has to fit inside the largest allowed frame.
        if (lengthFieldOffset > maxFrameLen - lengthFieldLen) {
            throw new IllegalArgumentException("maxLen must be >= fieldOffset + fieldLen");
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> output) throws Exception {
        if (currentFrameLen == -1) {
            if (discardingFrame) {
                discardFrame(buffer);
            }
            if (buffer.readableBytes() < lengthFieldEndOffset) {
                return;
            }
            int offset = buffer.readerIndex() + lengthFieldOffset;
            long lenValue = readLength(buffer, offset, lengthFieldLen, byteOrder);
            if (lenValue < 0) {
                buffer.skipBytes(lengthFieldEndOffset);
                throw new CorruptedFrameException("Negative length: " + lenValue);
            }
            // Total frame size = value of the length field + adjustment + bytes up to the end of the length field.
            lenValue += lengthAdjustment + lengthFieldEndOffset;
            if (lenValue < lengthFieldEndOffset) {
                buffer.skipBytes(lengthFieldEndOffset);
                throw new CorruptedFrameException("Adjusted length is too small");
            }
            if (lenValue > maxFrameLen) {
                handleExceededFrame(buffer, lenValue);
                return;
            }
            currentFrameLen = (int) lenValue;
        }
        if (buffer.readableBytes() < currentFrameLen) {
            return;
        }
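        if (initialBytesToStrip > currentFrameLen) {
            // Consume the bad frame and reset the state so the next call does not fail on the same bytes again.
            buffer.skipBytes(currentFrameLen);
            currentFrameLen = -1;
            throw new CorruptedFrameException("strip exceeds frame length");
        }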
        buffer.skipBytes(initialBytesToStrip);
        int frameStart = buffer.readerIndex();
        int actualLen = currentFrameLen - initialBytesToStrip;
        ByteBuf frame = buffer.retainedSlice(frameStart, actualLen);
        buffer.readerIndex(frameStart + actualLen);
        currentFrameLen = -1;
        output.add(frame);
    }

    private long readLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        ByteBuf orderedBuf = buf.order(order);
        switch (length) {
            case 1: return orderedBuf.getUnsignedByte(offset);
            case 2: return orderedBuf.getUnsignedShort(offset);
            case 3: return orderedBuf.getUnsignedMedium(offset);
            case 4: return orderedBuf.getUnsignedInt(offset);
            case 8: return orderedBuf.getLong(offset);
            default: throw new DecoderException("Unsupported lengthFieldLen: " + length);
        }
    }

    private void handleExceededFrame(ByteBuf buffer, long frameLength) {
        long discard = frameLength - buffer.readableBytes();
        tooLongFrameLen = frameLength;
        if (discard <= 0) {
            buffer.skipBytes((int) frameLength);
        } else {
            discardingFrame = true;
            bytesToDiscard = discard;
            buffer.skipBytes(buffer.readableBytes());
        }
        failIfNecessary(true);
    }

    private void discardFrame(ByteBuf buffer) {
        long toDiscard = Math.min(bytesToDiscard, buffer.readableBytes());
        buffer.skipBytes((int) toDiscard);
        bytesToDiscard -= toDiscard;
        failIfNecessary(false);
    }

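    private void failIfNecessary(boolean firstDetection) {
        if (bytesToDiscard == 0) {
            // The oversized frame has been fully discarded: return to the normal state.
            long frameLen = tooLongFrameLen;
            tooLongFrameLen = 0;
            discardingFrame = false;
            // Deferred mode reports here; fail-fast reports here only if the frame was discarded in one go.
            if (!failFast || firstDetection) {
                throw new TooLongFrameException("Frame exceeds maxLen: " + frameLen);
            }
        } else {
            // Still discarding: only fail-fast reports, and only on first detection.
            if (failFast && firstDetection) {
                throw new TooLongFrameException("Frame exceeds maxLen: " + tooLongFrameLen);
            }
        }
    }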
}
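
A stripped-down version of the same state machine can be written with java.nio.ByteBuffer. The sketch below is only an illustration: LengthPrefixedSplitter is a hypothetical class, and it hard-codes one configuration (4-byte big-endian length at offset 0, no adjustment, header stripped) with no maximum frame length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LengthPrefixedSplitter {
    private ByteBuffer pending = ByteBuffer.allocate(0); // unread bytes from earlier chunks

    /** Feeds one received chunk and returns the payload of every frame it completes. */
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();

        List<String> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            // Absolute read: peeks at the length without consuming it (big-endian by default).
            int payloadLen = merged.getInt(merged.position());
            if (payloadLen < 0) throw new IllegalStateException("negative length: " + payloadLen);
            if (merged.remaining() - 4 < payloadLen) break; // frame not complete yet
            merged.position(merged.position() + 4);          // strip the length header
            byte[] payload = new byte[payloadLen];
            merged.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // whatever is still unread waits for the next chunk
        return frames;
    }

    public static void main(String[] args) {
        byte[] stream = ByteBuffer.allocate(15)
                .putInt(2).put("hi".getBytes(StandardCharsets.UTF_8))
                .putInt(5).put("netty".getBytes(StandardCharsets.UTF_8))
                .array();
        LengthPrefixedSplitter splitter = new LengthPrefixedSplitter();
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 0, 3)));   // []
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 3, 10)));  // [hi]
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 10, 15))); // [netty]
    }
}
```

The first chunk ends in the middle of the length field and the second ends in the middle of a payload, yet each frame is delivered exactly once and only when it is complete.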

Example server with a line-based decoder

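import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class ServidorNetty {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles I/O on accepted channels
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             // Split the byte stream into lines of at most 2048 bytes, then decode each line to a String.
                             ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                 @Override
                                 protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                     System.out.println("Message received: " + msg);
                                 }
                             });
                         }
                     });
            ChannelFuture future = bootstrap.bind(8080).sync();
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}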

Example client with a length-prefix encoder

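import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringEncoder;

public class ClienteNetty {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             // Outbound handlers run from last added to first: StringEncoder turns the
                             // String into bytes, then LengthFieldPrepender adds a 4-byte length prefix.
                             ch.pipeline().addLast(new LengthFieldPrepender(4, 0, false));
                             ch.pipeline().addLast(new StringEncoder());
                             ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                 @Override
                                 public void channelActive(ChannelHandlerContext ctx) {
                                     for (int i = 1; i <= 3; i++) {
                                         ctx.writeAndFlush("Message #" + i);
                                     }
                                 }
                             });
                         }
                     });
            // Note: the peer must decode length-prefixed frames, for example with
            // LengthFieldBasedFrameDecoder(maxLen, 0, 4, 0, 4); a line-based server will not frame them correctly.
            ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}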
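
To see what such a client puts on the wire, the JDK-only sketch below builds the same kind of frame by hand. It assumes the configuration used above (a 4-byte big-endian length that counts only the payload); LengthPrefixWire, encode and hex are hypothetical names used for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixWire {
    /** Prepends a 4-byte big-endian length that counts only the payload bytes. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    /** Renders bytes as lowercase hexadecimal, two digits per byte. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints 0000000a4d657373616765202331: four length bytes, then the ten payload bytes.
        System.out.println(hex(encode("Message #1")));
    }
}
```

The length 10 is encoded as the byte 0x0a, which is also the newline character. That is one concrete reason a length-prefixed stream should be paired with a length-field decoder and not with a line-based one.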

Tags: Netty ByteToMessageDecoder FixedLengthFrameDecoder LineBasedFrameDecoder DelimiterBasedFrameDecoder

Published on 6-8 04:05