From 4d002c0747431ff347451a63fbeceb04e6f30fff Mon Sep 17 00:00:00 2001 From: LI123456mo Date: Wed, 4 Feb 2026 18:40:24 +0300 Subject: [PATCH] refactor(tri): implement zero-copy in DataQueueCommand to avoid arraycopy #15597 --- .../tri/command/DataQueueCommand.java | 22 +++++++++++-------- .../stream/AbstractTripleClientStream.java | 3 ++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java index a9ef447abb4d..33c66972820e 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java @@ -19,20 +19,21 @@ import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2DataFrame; public class DataQueueCommand extends StreamQueueCommand { - private final byte[] data; + private final ByteBuf data; private final int compressFlag; private final boolean endStream; private DataQueueCommand( - TripleStreamChannelFuture streamChannelFuture, byte[] data, int compressFlag, boolean endStream) { + TripleStreamChannelFuture streamChannelFuture, ByteBuf data, int compressFlag, boolean endStream) { super(streamChannelFuture); this.data = data; this.compressFlag = compressFlag; @@ -40,7 +41,7 @@ private DataQueueCommand( } public static DataQueueCommand create( - TripleStreamChannelFuture streamChannelFuture, byte[] data, boolean endStream, int compressFlag) { + TripleStreamChannelFuture streamChannelFuture, ByteBuf data, boolean endStream, int compressFlag) { return new DataQueueCommand(streamChannelFuture, data, compressFlag, endStream); } @@ -49,16 +50,19 @@ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) { if (data == null) { ctx.write(new DefaultHttp2DataFrame(endStream), promise); } else { - ByteBuf buf = ctx.alloc().buffer(); - buf.writeByte(compressFlag); - buf.writeInt(data.length); - buf.writeBytes(data); - ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise); + ByteBuf header = ctx.alloc().buffer(5); + header.writeByte(compressFlag); + header.writeByte(data.readableBytes()); + + CompositeByteBuf composite = ctx.alloc().compositeBuffer(); + composite.addComponents(true, header, this.data); + + ctx.write(new DefaultHttp2DataFrame(composite, endStream), promise); } } // for test - public byte[] getData() { + public ByteBuf getData() { return data; } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java index 339517e71442..25a5311b6315 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java @@ -205,7 +205,8 @@ public ChannelFuture sendMessage(byte[] message, int compressFlag) { final int messageSize = message.length; onSendingBytes(messageSize); - final DataQueueCommand cmd = DataQueueCommand.create(streamChannelFuture, message, false, compressFlag); + final DataQueueCommand cmd = DataQueueCommand.create( + streamChannelFuture, io.netty.buffer.Unpooled.wrappedBuffer(message), false, compressFlag); return this.writeQueue.enqueueFuture(cmd, parent.eventLoop()).addListener(future -> { if (!future.isSuccess()) { rollbackSendingBytes(messageSize);