From 088a01227942fae8dec7c62301e1c8640656e4ec Mon Sep 17 00:00:00 2001 From: wushengyu Date: Wed, 25 Sep 2024 12:31:00 +0800 Subject: [PATCH] fix beat --- internal/client.go | 9 +++++---- internal/trace.go | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/client.go b/internal/client.go index 6f27f5e8..208c2c79 100644 --- a/internal/client.go +++ b/internal/client.go @@ -411,8 +411,8 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R } func (c *rmqClient) Start() { - //ctx, cancel := context.WithCancel(context.Background()) - //c.cancel = cancel + // ctx, cancel := context.WithCancel(context.Background()) + // c.cancel = cancel atomic.AddInt32(&c.instanceCount, 1) c.once.Do(func() { if !c.option.Credentials.IsEmpty() { @@ -467,9 +467,10 @@ func (c *rmqClient) Start() { go primitive.WithRecover(func() { op := func() { c.GetNameSrv().cleanOfflineBroker() - c.SendHeartbeatToAllBrokerWithLock() + if c.option.InstanceName != TraceInstanceName { + c.SendHeartbeatToAllBrokerWithLock() + } } - time.Sleep(time.Second) op() diff --git a/internal/trace.go b/internal/trace.go index 24e075ce..44c0fbbe 100644 --- a/internal/trace.go +++ b/internal/trace.go @@ -103,7 +103,7 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean { } else { buffer.WriteString(bean.Topic) } - //buffer.WriteString(bean.Topic) + // buffer.WriteString(bean.Topic) buffer.WriteRune(contentSplitter) buffer.WriteString(bean.MsgId) buffer.WriteRune(contentSplitter) @@ -208,8 +208,9 @@ const ( maxMsgSize = 128000 - 10*1000 batchSize = 100 - TraceTopicPrefix = SystemTopicPrefix + "TRACE_DATA_" - TraceGroupName = "_INNER_TRACE_PRODUCER" + TraceTopicPrefix = SystemTopicPrefix + "TRACE_DATA_" + TraceGroupName = "_INNER_TRACE_PRODUCER" + TraceInstanceName = "INNER_TRACE_CLIENT_DEFAULT" ) type TraceDispatcher interface { @@ -276,7 +277,7 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher { cliOp := DefaultClientOptions() cliOp.GroupName = traceCfg.GroupName cliOp.NameServerAddrs = traceCfg.NamesrvAddrs - cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT" + cliOp.InstanceName = TraceInstanceName cliOp.RetryTimes = 0 cliOp.Namesrv = srvs cliOp.Credentials = traceCfg.Credentials