Skip to content

Commit bfd3d83

Browse files
committed
fix mem leak
Change-Id: Iae6c4757a928da0cde6d5c1115b4a745620d7d81
1 parent d750461 commit bfd3d83

File tree

1 file changed

+18
-16
lines changed

1 file changed

+18
-16
lines changed

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.netty.handler.timeout.IdleStateEvent;
4141
import io.netty.handler.timeout.IdleStateHandler;
4242
import io.netty.resolver.NoopAddressResolverGroup;
43+
import io.netty.util.AttributeKey;
4344
import io.netty.util.HashedWheelTimer;
4445
import io.netty.util.Timeout;
4546
import io.netty.util.TimerTask;
@@ -96,6 +97,9 @@
9697
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
9798
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
9899

100+
private static final AttributeKey<ChannelWrapper> CHANNEL_WRAPPER_ATTRIBUTE_KEY = AttributeKey.valueOf(
101+
"channelWrapper");
102+
99103
private static final long LOCK_TIMEOUT_MILLIS = 3000;
100104
private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
101105

@@ -106,7 +110,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
106110
private final Map<String /* cidr */, SocksProxyConfig /* proxy */> proxyMap = new HashMap<>();
107111
private final ConcurrentHashMap<String /* cidr */, Bootstrap> bootstrapMap = new ConcurrentHashMap<>();
108112
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
109-
private final ConcurrentMap<Channel, ChannelWrapper> channelWrapperTables = new ConcurrentHashMap<>();
110113

111114
private final HashedWheelTimer timer = new HashedWheelTimer(r -> new Thread(r, "ClientHouseKeepingService"));
112115

@@ -381,7 +384,6 @@ public void shutdown() {
381384
channel.getValue().close();
382385
}
383386

384-
this.channelWrapperTables.clear();
385387
this.channelTables.clear();
386388

387389
this.eventLoopGroupWorker.shutdownGracefully();
@@ -439,7 +441,8 @@ public void closeChannel(final String addr, final Channel channel) {
439441
}
440442

441443
if (removeItemFromTable) {
442-
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
444+
ChannelWrapper channelWrapper =
445+
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
443446
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
444447
this.channelTables.remove(addrRemote);
445448
}
@@ -487,7 +490,8 @@ public void closeChannel(final Channel channel) {
487490
}
488491

489492
if (removeItemFromTable) {
490-
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
493+
ChannelWrapper channelWrapper =
494+
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
491495
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
492496
this.channelTables.remove(addrRemote);
493497
}
@@ -724,7 +728,6 @@ private ChannelWrapper createChannel(String addr) {
724728
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
725729
ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
726730
this.channelTables.put(addr, cw);
727-
this.channelWrapperTables.put(channelFuture.channel(), cw);
728731
return cw;
729732
}
730733

@@ -831,17 +834,12 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
831834
if (response.getCode() == ResponseCode.GO_AWAY) {
832835
if (nettyClientConfig.isEnableReconnectForGoAway()) {
833836
LOGGER.info("Receive go away from channelId={}, channel={}", channel.id(), channel);
834-
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
835-
try {
836-
if (channelWrapper0.reconnect(channel0)) {
837-
LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, channelWrapper0.getChannel().id());
838-
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
839-
}
840-
} catch (Throwable t) {
841-
LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
842-
}
843-
return channelWrapper0;
844-
});
837+
ChannelWrapper channelWrapper = RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY,
838+
channel);
839+
if (channelWrapper != null && channelWrapper.reconnect(channel)) {
840+
LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}",
841+
channel.id(), channel, channelWrapper.getChannel().id());
842+
}
845843
if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) {
846844
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
847845
retryRequest.setBody(request.getBody());
@@ -1006,6 +1004,7 @@ public ChannelWrapper(String address, ChannelFuture channelFuture) {
10061004
this.channelFuture = channelFuture;
10071005
this.lastResponseTime = System.currentTimeMillis();
10081006
this.channelAddress = address;
1007+
RemotingHelper.setPropertyToAttr(channelFuture.channel(), CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
10091008
}
10101009

10111010
public boolean isOK() {
@@ -1055,10 +1054,13 @@ public boolean reconnect(Channel channel) {
10551054
if (isWrapperOf(channel)) {
10561055
channelToClose = channelFuture;
10571056
channelFuture = doConnect(channelAddress);
1057+
RemotingHelper.setPropertyToAttr(channelFuture.channel(), CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
10581058
return true;
10591059
} else {
10601060
LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
10611061
}
1062+
} catch (Throwable t) {
1063+
LOGGER.error("ChannelWrapper {} reconnect error", this, t);
10621064
} finally {
10631065
lock.writeLock().unlock();
10641066
}

0 commit comments

Comments
 (0)