Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,29 @@
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;

public class DataQueueCommand extends StreamQueueCommand {

private final byte[] data;
private final ByteBuf data;

private final int compressFlag;

private final boolean endStream;

private DataQueueCommand(
TripleStreamChannelFuture streamChannelFuture, byte[] data, int compressFlag, boolean endStream) {
TripleStreamChannelFuture streamChannelFuture, ByteBuf data, int compressFlag, boolean endStream) {
super(streamChannelFuture);
this.data = data;
this.compressFlag = compressFlag;
this.endStream = endStream;
}

public static DataQueueCommand create(
TripleStreamChannelFuture streamChannelFuture, byte[] data, boolean endStream, int compressFlag) {
TripleStreamChannelFuture streamChannelFuture, ByteBuf data, boolean endStream, int compressFlag) {
return new DataQueueCommand(streamChannelFuture, data, compressFlag, endStream);
}

Expand All @@ -49,16 +50,19 @@ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
if (data == null) {
ctx.write(new DefaultHttp2DataFrame(endStream), promise);
} else {
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(compressFlag);
buf.writeInt(data.length);
buf.writeBytes(data);
ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
ByteBuf header = ctx.alloc().buffer(5);
header.writeByte(compressFlag);
header.writeByte(data.readableBytes());

CompositeByteBuf composite = ctx.alloc().compositeBuffer();
composite.addComponents(true, header, this.data);

ctx.write(new DefaultHttp2DataFrame(composite, endStream), promise);
}
}

// for test
public byte[] getData() {
public ByteBuf getData() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public ChannelFuture sendMessage(byte[] message, int compressFlag) {
final int messageSize = message.length;
onSendingBytes(messageSize);

final DataQueueCommand cmd = DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
final DataQueueCommand cmd = DataQueueCommand.create(
streamChannelFuture, io.netty.buffer.Unpooled.wrappedBuffer(message), false, compressFlag);
return this.writeQueue.enqueueFuture(cmd, parent.eventLoop()).addListener(future -> {
if (!future.isSuccess()) {
rollbackSendingBytes(messageSize);
Expand Down
Loading