From bfebc36c75ec240c5b329651c4c35e4ee0dccb87 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 7 Jan 2026 17:33:59 +0800 Subject: [PATCH 1/4] feat(sync): add snapshot sequence and total index tracking in sync structures --- include/common/tmsg.h | 2 + include/libs/sync/sync.h | 2 + include/util/tdef.h | 2 +- source/common/src/msg/tmsg.c | 20 ++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 2 + source/dnode/mnode/impl/src/mndDnode.c | 11 +++++ source/dnode/mnode/impl/src/mndMain.c | 2 + source/dnode/mnode/impl/src/mndVgroup.c | 53 +++++++++++++------------ source/dnode/vnode/src/vnd/vnodeQuery.c | 6 ++- source/libs/sync/inc/syncInt.h | 4 +- source/libs/sync/src/syncMain.c | 16 +++++--- source/libs/sync/src/syncPipeline.c | 25 ++++++++---- source/libs/sync/src/syncSnapshot.c | 12 +++++- 13 files changed, 116 insertions(+), 41 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 84410a23b47e..244c899fdea7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2669,6 +2669,8 @@ typedef struct { int64_t syncCommitIndex; int64_t bufferSegmentUsed; int64_t bufferSegmentSize; + int32_t snapSeq; + int64_t syncTotalIndex; } SVnodeLoad; typedef struct { diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2dd1d4288eb7..f932b09c037f 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -265,6 +265,8 @@ typedef struct SSyncState { SyncTerm term; int64_t roleTimeMs; int64_t startTimeMs; + int32_t snapSeq; + int64_t totalIndex; } SSyncState; typedef struct SSyncMetrics { diff --git a/include/util/tdef.h b/include/util/tdef.h index 7b98f5699ced..5ea654d52db4 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -463,7 +463,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_REPLICA 5 #define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_SYNC_RESTORE_lEN 20 -#define TSDB_SYNC_APPLY_COMMIT_LEN 41 +#define TSDB_SYNC_APPLY_COMMIT_LEN 100 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256 #define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 7c11afe7109b..4bcd048fcf69 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -2047,6 +2047,13 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->auditDB)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->auditToken)); + vlen = (int32_t)taosArrayGetSize(pReq->pVloads); + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pload->snapSeq)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncTotalIndex)); + } + tEndEncode(&encoder); _exit: @@ -2202,6 +2209,19 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->clusterCfg.statusIntervalMs)); } + if (!tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *vload = taosArrayGet(pReq->pVloads, i); + if (vload == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vload->snapSeq)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload->syncTotalIndex)); + } + } + + if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeWhiteVer)); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c33a0b1baea9..744a79be065a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -783,6 +783,8 @@ typedef struct { int32_t learnerProgress; int64_t bufferSegmentUsed; int64_t bufferSegmentSize; + int32_t snapSeq; + int64_t syncTotalIndex; } SVnodeGid; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e332fc7b7935..da8268a24b4f 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -39,6 +39,7 @@ #include "taoskInt.h" #endif +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define TSDB_DNODE_VER_NUMBER 2 #define TSDB_DNODE_RESERVE_SIZE 40 @@ -585,6 +586,16 @@ static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVloa pGid->syncCommitIndex = pVload->syncCommitIndex; pGid->bufferSegmentUsed = pVload->bufferSegmentUsed; pGid->bufferSegmentSize = pVload->bufferSegmentSize; + pGid->learnerProgress = pVload->learnerProgress; + pGid->snapSeq = pVload->snapSeq; + pGid->syncTotalIndex = pVload->syncTotalIndex; + if (pVload->snapSeq >= 0 && pVload->snapSeq < INT32_MAX || pVload->syncState == TAOS_SYNC_STATE_LEARNER) { + mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64 + " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d", + vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex, + pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq); + } + if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs) { mInfo( diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index be181ac4b94e..50180f33ac2c 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -325,6 +325,8 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) pGid->syncRestore = 0; pGid->syncCanRead = 0; pGid->startTimeMs = 0; + pGid->learnerProgress = 0; + pGid->snapSeq = -1; stateChanged = true; } break; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3faf1d16bfcd..ee65ba4bda57 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -30,6 +30,7 @@ #include "mndUser.h" #include "tmisce.h" +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2 #define VGROUP_VER_NUMBER VGROUP_VER_COMPAT_MOUNT_KEEP_VER #define VGROUP_RESERVE_SIZE 60 @@ -179,6 +180,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { if (pVgroup->replica == 1) { pVgid->syncState = TAOS_SYNC_STATE_LEADER; } + pVgid->snapSeq = -1; } if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER) @@ -283,6 +285,8 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pNewGid->syncCommitIndex = pOldGid->syncCommitIndex; pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed; pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize; + pNewGid->learnerProgress = pOldGid->learnerProgress; + pNewGid->snapSeq = pOldGid->snapSeq; } } } @@ -1262,29 +1266,6 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) leaderState = pVgroup->vnodeGid[i].syncState; snprintf(role, sizeof(role), "%s", syncStr(pVgroup->vnodeGid[i].syncState)); - /* - mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress); - - if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) { - if(pVgroup->vnodeGid[i].learnerProgress < 0){ - snprintf(role, sizeof(role), "%s-", - syncStr(pVgroup->vnodeGid[i].syncState)); - - } - else if(pVgroup->vnodeGid[i].learnerProgress >= 100){ - snprintf(role, sizeof(role), "%s--", - syncStr(pVgroup->vnodeGid[i].syncState)); - } - else{ - snprintf(role, sizeof(role), "%s%d", - syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress); - } - } - else{ - snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star); - } - */ - } else { } STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); @@ -1293,8 +1274,30 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0}; char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0}; - snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex, - pVgroup->vnodeGid[i].syncCommitIndex); + + if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER && + (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END)) { + mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress); + + snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)", + pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex, + pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].snapSeq, + pVgroup->vnodeGid[i].learnerProgress); + } else if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) { + mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress); + + snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)", + pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex, + pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].learnerProgress); + } else if (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END) { + snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)", + pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex, + pVgroup->vnodeGid[i].snapSeq); + } else { + snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex, + pVgroup->vnodeGid[i].syncCommitIndex); + } + STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3aef51edf079..d2f67e22a76f 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -969,7 +969,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->roleTimeMs = state.roleTimeMs; pLoad->startTimeMs = state.startTimeMs; pLoad->syncCanRead = state.canRead; - pLoad->learnerProgress = state.progress; pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta)); @@ -983,6 +982,11 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); vnodeGetBufferInfo(pVnode, &pLoad->bufferSegmentUsed, &pLoad->bufferSegmentSize); + vDebug("vgId:%d, get vnode load, state:%s snapSeq:%d, learnerProgress:%d, totalIndex:%" PRId64, TD_VID(pVnode), + syncStr(state.state), state.snapSeq, state.progress, state.totalIndex); + pLoad->learnerProgress = state.progress; + pLoad->snapSeq = state.snapSeq; + pLoad->syncTotalIndex = state.totalIndex; return 0; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index a4b8a97804da..fc43fe081032 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -247,6 +247,8 @@ struct SSyncNode { // metrics int64_t wal_write_bytes; int64_t wal_write_time; + + int32_t snapSeq; }; // open/close -------------- @@ -320,7 +322,7 @@ SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId); // snapshot -------------- bool syncNodeHasSnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); -int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId); +int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId, char* reason); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b9553f57c0f7..08614571fc08 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -686,7 +686,8 @@ SSyncState syncGetState(int64_t rid) { } else { state.canRead = state.restored; } - /* + state.totalIndex = pSyncNode->pLogBuf->totalIndex; + double progress = 0; if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){ progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex; @@ -695,12 +696,15 @@ SSyncState syncGetState(int64_t rid) { else{ state.progress = -1; } - sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", " + if (pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { + sInfo("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 + ", " "progress:%lf, progress:%d", - pSyncNode->vgId, - pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress); - */ + pSyncNode->vgId, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress); + } + state.term = raftStoreGetTerm(pSyncNode); + state.snapSeq = pSyncNode->snapSeq; syncNodeRelease(pSyncNode); } @@ -1514,6 +1518,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t elec pSyncNode->hbrSlowNum = 0; pSyncNode->tmrRoutineNum = 0; + pSyncNode->snapSeq = -1; + sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode, pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout); return pSyncNode; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index bb205bcba908..91b890ea3358 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -1119,11 +1119,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) { char* msg1 = " rollback match index failure"; char* msg2 = " incomplete fsm state"; - sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s, match index:%" PRId64 - ", last sent:%" PRId64, - pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex, - pMsg->lastSendIndex); - if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { + sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, reason:%s, match index:%" PRId64 + ", last sent:%" PRId64, + pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex, + pMsg->lastSendIndex); + pNode->snapSeq = -1; + if ((code = syncNodeStartSnapshot(pNode, &destId, (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1))) < + 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); TAOS_RETURN(code); } @@ -1157,11 +1159,20 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR; - if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { + sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, index:%" PRId64 ", firstVer:%" PRId64 + ", term:%" PRId64 ", lastMatchTerm:%" PRId64, + pNode->vgId, DID(&destId), index, firstVer, term, pMsg->lastMatchTerm); + char reason[100] = {0}; + if (index + 1 < firstVer) + tsnprintf(reason, 100, "matched entry not in log range, index:%" PRId64 ", firstVer:%" PRId64, index, firstVer); + else if (term < 0) + tsnprintf(reason, 100, "failed to get prev log term"); + else + tsnprintf(reason, 100, "log term mismatch"); + if ((code = syncNodeStartSnapshot(pNode, &destId, reason)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); TAOS_RETURN(code); } - sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId)); return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index b857fe51bdef..4de281a84ca3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -388,7 +388,7 @@ _out:; TAOS_RETURN(code); } -int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { +int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId, char *reason) { SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { sNError(pSyncNode, "snapshot sender start error since get failed"); @@ -402,6 +402,9 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { taosMsleep(1); + sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s", pSyncNode->vgId, DID(pDestId), + reason); + int32_t code = snapshotSenderStart(pSender); if (code != 0) { sSError(pSender, "snapshot sender start error since %s", tstrerror(code)); @@ -1089,6 +1092,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { "vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg " "signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime); + pSyncNode->snapSeq = pMsg->seq; code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); sDebug( "vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg " @@ -1102,6 +1106,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64 ", %" PRId64 "), snapshot msg seq:%d", pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq); + pSyncNode->snapSeq = pMsg->seq; code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64 ", %" PRId64 ")", @@ -1122,6 +1127,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { ", %" PRId64 "), snapshot msg seq:%d", pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq); } + pSyncNode->snapSeq = pMsg->seq; lastRecvPrintLog = currentTimestamp; code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId); @@ -1133,6 +1139,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64 "), snapshot msg seq:%d", pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq); + pSyncNode->snapSeq = pMsg->seq; code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); if (code != 0) { sRError(pReceiver, "failed to end snapshot."); @@ -1364,6 +1371,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) { sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ", TMSG_INFO(pRpcMsg->msgType), pMsg->ack); + pSyncNode->snapSeq = pMsg->ack; if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) { goto _ERROR; } @@ -1380,6 +1388,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { TMSG_INFO(pRpcMsg->msgType), pMsg->ack); } lastSendPrintLog = currentTimestamp; + pSyncNode->snapSeq = pMsg->ack; if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) { sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code), pSender->seq, pSender->pReader, pSender->finish); @@ -1390,6 +1399,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // end if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp"); + pSyncNode->snapSeq = pMsg->ack; snapshotSenderStop(pSender, true); TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR); } From c9127bdce2991285bbc69f8ad951ef9c58557b74 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 7 Jan 2026 17:53:36 +0800 Subject: [PATCH 2/4] feat(sync): define SYNC_SNAPSHOT_SEQ_END constant and update related structures --- include/util/tdef.h | 2 ++ source/dnode/mnode/impl/src/mndDnode.c | 1 - source/dnode/mnode/impl/src/mndVgroup.c | 2 +- source/libs/sync/inc/syncSnapshot.h | 1 - 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 5ea654d52db4..b82b4584e3ff 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -702,6 +702,8 @@ typedef enum ELogicConditionType { #define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 // 50MB #define TFS_MIN_DISK_FREE_SIZE_MAX (2ULL * 1024 * 1024 * 1024 * 1024) // 2TB +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF + enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED }; enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK }; enum { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index da8268a24b4f..08b4a1d91ded 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -39,7 +39,6 @@ #include "taoskInt.h" #endif -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define TSDB_DNODE_VER_NUMBER 2 #define TSDB_DNODE_RESERVE_SIZE 40 diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index ee65ba4bda57..48c781dcafc5 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -30,7 +30,6 @@ #include "mndUser.h" #include "tmisce.h" -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2 #define VGROUP_VER_NUMBER VGROUP_VER_COMPAT_MOUNT_KEEP_VER #define VGROUP_RESERVE_SIZE 60 @@ -287,6 +286,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize; pNewGid->learnerProgress = pOldGid->learnerProgress; pNewGid->snapSeq = pOldGid->snapSeq; + pNewGid->syncTotalIndex = pOldGid->syncTotalIndex; } } } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index b42cf1132f04..3f82b1da8b2d 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -26,7 +26,6 @@ extern "C" { #define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_PREP -1 #define SYNC_SNAPSHOT_SEQ_BEGIN 0 -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_RETRY_MS 5000 From d123427e477241a3efc5067b2be7ea627a9a3c75 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 7 Jan 2026 18:00:02 +0800 Subject: [PATCH 3/4] feat(sync): reset snapshot sequence on log term mismatch during recovery --- source/libs/sync/src/syncPipeline.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 91b890ea3358..f19fc80f2821 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -1169,6 +1169,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn tsnprintf(reason, 100, "failed to get prev log term"); else tsnprintf(reason, 100, "log term mismatch"); + pNode->snapSeq = -1; if ((code = syncNodeStartSnapshot(pNode, &destId, reason)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); TAOS_RETURN(code); From 6f3741eb12557f6b41171282c67ff1871e5f4cd9 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 7 Jan 2026 18:04:05 +0800 Subject: [PATCH 4/4] feat(sync): update snapshot sequence validation to use SYNC_SNAPSHOT_SEQ_END constant --- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 08b4a1d91ded..d1b0617e70aa 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -588,7 +588,7 @@ static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVloa pGid->learnerProgress = pVload->learnerProgress; pGid->snapSeq = pVload->snapSeq; pGid->syncTotalIndex = pVload->syncTotalIndex; - if (pVload->snapSeq >= 0 && pVload->snapSeq < INT32_MAX || pVload->syncState == TAOS_SYNC_STATE_LEARNER) { + if (pVload->snapSeq >= 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END || pVload->syncState == TAOS_SYNC_STATE_LEARNER) { mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64 " , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d", vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,