From 50e81cf33151698db8412700c1b780a9d00ef210 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B0=A2=E9=87=91=E8=99=8E?= <1050780355@qq.com>
Date: Fri, 6 Feb 2026 18:03:31 +0800
Subject: [PATCH] fix: fallback to replset primary when leader lease expired

---
 pkg/lorry/engines/mongodb/manager.go | 94 ++++++++++++++++++++++++++--
 1 file changed, 89 insertions(+), 5 deletions(-)

diff --git a/pkg/lorry/engines/mongodb/manager.go b/pkg/lorry/engines/mongodb/manager.go
index bea7e22d2b1..2238f01b872 100644
--- a/pkg/lorry/engines/mongodb/manager.go
+++ b/pkg/lorry/engines/mongodb/manager.go
@@ -366,13 +366,97 @@ func (mgr *Manager) GetReplSetClient(ctx context.Context, cluster *dcs.Cluster)
 }
 
+// GetLeaderClient returns a MongoDB client for the cluster leader. It first
+// tries the member named by cluster.Leader; if no leader is recorded, that
+// member cannot be found, or connecting to it fails, it falls back to the
+// member that findPrimaryMember reports as primary.
 func (mgr *Manager) GetLeaderClient(ctx context.Context, cluster *dcs.Cluster) (*mongo.Client, error) {
-	if cluster.Leader == nil || cluster.Leader.Name == "" {
-		return nil, fmt.Errorf("cluster has no leader")
+	if cluster == nil {
+		return nil, fmt.Errorf("cluster is nil")
 	}
 
-	leaderMember := cluster.GetMemberWithName(cluster.Leader.Name)
-	host := cluster.GetMemberAddrWithPort(*leaderMember)
-	return NewReplSetClient(context.TODO(), []string{host})
+	if cluster.Leader != nil && cluster.Leader.Name != "" {
+		leaderMember := cluster.GetMemberWithName(cluster.Leader.Name)
+		if leaderMember != nil {
+			host := cluster.GetMemberAddrWithPort(*leaderMember)
+			client, err := NewReplSetClient(ctx, []string{host})
+			if err == nil {
+				return client, nil
+			}
+			mgr.Logger.Info("Get leader client failed, fallback to discover primary", "error", err.Error())
+		}
+	}
+
+	// No usable leader record: ask the replica set which member is primary.
+	primaryMember, err := mgr.findPrimaryMember(ctx, cluster)
+	if err != nil {
+		return nil, err
+	}
+	host := cluster.GetMemberAddrWithPort(*primaryMember)
+	return NewReplSetClient(ctx, []string{host})
+}
+
+// findPrimaryMember returns the cluster member that the replica set status
+// reports as primary. The status is first fetched through the client from
+// GetReplSetClient; if that fails or names no known primary, every cluster
+// member is queried in turn through NewStandaloneClient. Probe failures are
+// logged and skipped. It returns an error when cluster is nil or no primary
+// is found.
+func (mgr *Manager) findPrimaryMember(ctx context.Context, cluster *dcs.Cluster) (*dcs.Member, error) {
+	if cluster == nil {
+		return nil, fmt.Errorf("cluster is nil")
+	}
+
+	status, err := func() (*ReplSetStatus, error) {
+		client, err := mgr.GetReplSetClient(ctx, cluster)
+		if err != nil {
+			return nil, err
+		}
+		defer client.Disconnect(ctx) //nolint:errcheck
+		return GetReplSetStatus(ctx, client)
+	}()
+	if err != nil {
+		mgr.Logger.Info("Get replset status failed, probe members one by one", "error", err.Error())
+	} else if member := primaryMemberFromStatus(cluster, status); member != nil {
+		return member, nil
+	}
+
+	for _, member := range cluster.Members {
+		host := cluster.GetMemberAddrWithPort(member)
+		client, err := NewStandaloneClient(ctx, host)
+		if err != nil {
+			mgr.Logger.Info("Connect to member failed, skip it", "host", host, "error", err.Error())
+			continue
+		}
+		status, err := GetReplSetStatus(ctx, client)
+		// Disconnect right away instead of deferring: a defer would keep every
+		// probe connection open until the function returns.
+		_ = client.Disconnect(ctx) //nolint:errcheck
+		if err != nil {
+			mgr.Logger.Info("Get replset status from member failed, skip it", "host", host, "error", err.Error())
+			continue
+		}
+		if primary := primaryMemberFromStatus(cluster, status); primary != nil {
+			return primary, nil
+		}
+	}
+
+	return nil, fmt.Errorf("cluster has no leader")
+}
+
+// primaryMemberFromStatus returns the result of cluster.GetMemberWithHost for
+// the first status member in the primary state. It returns nil when either
+// argument is nil or no status member is primary.
+func primaryMemberFromStatus(cluster *dcs.Cluster, status *ReplSetStatus) *dcs.Member {
+	if cluster == nil || status == nil {
+		return nil
+	}
+	for _, member := range status.Members {
+		if member == nil || member.State != MemberStatePrimary {
+			continue
+		}
+		return cluster.GetMemberWithHost(member.Name)
+	}
+	return nil
 }
 
 func (mgr *Manager) GetReplSetClientWithHosts(ctx context.Context, hosts []string) (*mongo.Client, error) {