Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2669,6 +2669,8 @@ typedef struct {
int64_t syncCommitIndex;
int64_t bufferSegmentUsed;
int64_t bufferSegmentSize;
int32_t snapSeq;
int64_t syncTotalIndex;
} SVnodeLoad;

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions include/libs/sync/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ typedef struct SSyncState {
SyncTerm term;
int64_t roleTimeMs;
int64_t startTimeMs;
int32_t snapSeq;
int64_t totalIndex;
} SSyncState;

typedef struct SSyncMetrics {
Expand Down
4 changes: 3 additions & 1 deletion include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_REPLICA 5
#define TSDB_MAX_LEARNER_REPLICA 10
#define TSDB_SYNC_RESTORE_lEN 20
#define TSDB_SYNC_APPLY_COMMIT_LEN 41
#define TSDB_SYNC_APPLY_COMMIT_LEN 100
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
Expand Down Expand Up @@ -702,6 +702,8 @@ typedef enum ELogicConditionType {
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 // 50MB
#define TFS_MIN_DISK_FREE_SIZE_MAX (2ULL * 1024 * 1024 * 1024 * 1024) // 2TB

#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF

enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };
enum {
Expand Down
20 changes: 20 additions & 0 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,13 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->auditDB));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->auditToken));

vlen = (int32_t)taosArrayGetSize(pReq->pVloads);
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pload->snapSeq));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncTotalIndex));
}

tEndEncode(&encoder);

_exit:
Expand Down Expand Up @@ -2202,6 +2209,19 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->clusterCfg.statusIntervalMs));
}

if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *vload = taosArrayGet(pReq->pVloads, i);
if (vload == NULL) {
TAOS_CHECK_EXIT(terrno);
}

TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vload->snapSeq));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload->syncTotalIndex));
}
}


if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeWhiteVer));
}
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ typedef struct {
int32_t learnerProgress;
int64_t bufferSegmentUsed;
int64_t bufferSegmentSize;
int32_t snapSeq;
int64_t syncTotalIndex;
} SVnodeGid;

typedef struct {
Expand Down
10 changes: 10 additions & 0 deletions source/dnode/mnode/impl/src/mndDnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,16 @@ static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVloa
pGid->syncCommitIndex = pVload->syncCommitIndex;
pGid->bufferSegmentUsed = pVload->bufferSegmentUsed;
pGid->bufferSegmentSize = pVload->bufferSegmentSize;
pGid->learnerProgress = pVload->learnerProgress;
pGid->snapSeq = pVload->snapSeq;
pGid->syncTotalIndex = pVload->syncTotalIndex;
if (pVload->snapSeq >= 0 && pVload->snapSeq < SYNC_SNAPSHOT_SEQ_END || pVload->syncState == TAOS_SYNC_STATE_LEARNER) {
mInfo("vgId:%d, update vnode state:%s from dnode:%d, syncAppliedIndex:%" PRId64 " , syncCommitIndex:%" PRId64
" , syncTotalIndex:%" PRId64 " ,learnerProgress:%d, snapSeq:%d",
vgId, syncStr(pVload->syncState), pGid->dnodeId, pVload->syncAppliedIndex, pVload->syncCommitIndex,
pVload->syncTotalIndex, pVload->learnerProgress, pVload->snapSeq);
}

if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
pGid->startTimeMs != pVload->startTimeMs) {
mInfo(
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/src/mndMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pGid->syncRestore = 0;
pGid->syncCanRead = 0;
pGid->startTimeMs = 0;
pGid->learnerProgress = 0;
pGid->snapSeq = -1;
stateChanged = true;
}
break;
Expand Down
53 changes: 28 additions & 25 deletions source/dnode/mnode/impl/src/mndVgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
if (pVgroup->replica == 1) {
pVgid->syncState = TAOS_SYNC_STATE_LEADER;
}
pVgid->snapSeq = -1;
}
if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
Expand Down Expand Up @@ -283,6 +284,9 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
pNewGid->learnerProgress = pOldGid->learnerProgress;
pNewGid->snapSeq = pOldGid->snapSeq;
pNewGid->syncTotalIndex = pOldGid->syncTotalIndex;
}
}
}
Expand Down Expand Up @@ -1262,29 +1266,6 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER)
leaderState = pVgroup->vnodeGid[i].syncState;
snprintf(role, sizeof(role), "%s", syncStr(pVgroup->vnodeGid[i].syncState));
/*
mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);

if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
if(pVgroup->vnodeGid[i].learnerProgress < 0){
snprintf(role, sizeof(role), "%s-",
syncStr(pVgroup->vnodeGid[i].syncState));

}
else if(pVgroup->vnodeGid[i].learnerProgress >= 100){
snprintf(role, sizeof(role), "%s--",
syncStr(pVgroup->vnodeGid[i].syncState));
}
else{
snprintf(role, sizeof(role), "%s%d",
syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress);
}
}
else{
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
}
*/
} else {
}
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

Expand All @@ -1293,8 +1274,30 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p

char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
pVgroup->vnodeGid[i].syncCommitIndex);

if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER &&
(pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END)) {
mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);

snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].snapSeq,
pVgroup->vnodeGid[i].learnerProgress);
} else if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);

snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].learnerProgress);
} else if (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END) {
snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)",
pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
pVgroup->vnodeGid[i].snapSeq);
} else {
snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
pVgroup->vnodeGid[i].syncCommitIndex);
}

STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
Expand Down
6 changes: 5 additions & 1 deletion source/dnode/vnode/src/vnd/vnodeQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->roleTimeMs = state.roleTimeMs;
pLoad->startTimeMs = state.startTimeMs;
pLoad->syncCanRead = state.canRead;
pLoad->learnerProgress = state.progress;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta));
Expand All @@ -983,6 +982,11 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
vnodeGetBufferInfo(pVnode, &pLoad->bufferSegmentUsed, &pLoad->bufferSegmentSize);
vDebug("vgId:%d, get vnode load, state:%s snapSeq:%d, learnerProgress:%d, totalIndex:%" PRId64, TD_VID(pVnode),
syncStr(state.state), state.snapSeq, state.progress, state.totalIndex);
pLoad->learnerProgress = state.progress;
pLoad->snapSeq = state.snapSeq;
pLoad->syncTotalIndex = state.totalIndex;
return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion source/libs/sync/inc/syncInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ struct SSyncNode {
// metrics
int64_t wal_write_bytes;
int64_t wal_write_time;

int32_t snapSeq;
};

// open/close --------------
Expand Down Expand Up @@ -320,7 +322,7 @@ SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId, char* reason);

SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
Expand Down
1 change: 0 additions & 1 deletion source/libs/sync/inc/syncSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ extern "C" {
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_PREP -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF

#define SYNC_SNAPSHOT_RETRY_MS 5000

Expand Down
16 changes: 11 additions & 5 deletions source/libs/sync/src/syncMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ SSyncState syncGetState(int64_t rid) {
} else {
state.canRead = state.restored;
}
/*
state.totalIndex = pSyncNode->pLogBuf->totalIndex;

double progress = 0;
if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
Expand All @@ -695,12 +696,15 @@ SSyncState syncGetState(int64_t rid) {
else{
state.progress = -1;
}
sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
if (pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
sInfo("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64
", "
"progress:%lf, progress:%d",
pSyncNode->vgId,
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
*/
pSyncNode->vgId, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
}

state.term = raftStoreGetTerm(pSyncNode);
state.snapSeq = pSyncNode->snapSeq;
syncNodeRelease(pSyncNode);
}

Expand Down Expand Up @@ -1514,6 +1518,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t elec
pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0;

pSyncNode->snapSeq = -1;

sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode,
pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout);
return pSyncNode;
Expand Down
26 changes: 19 additions & 7 deletions source/libs/sync/src/syncPipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -1119,11 +1119,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
char* msg1 = " rollback match index failure";
char* msg2 = " incomplete fsm state";
sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s, match index:%" PRId64
", last sent:%" PRId64,
pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex,
pMsg->lastSendIndex);
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, reason:%s, match index:%" PRId64
", last sent:%" PRId64,
pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex,
pMsg->lastSendIndex);
pNode->snapSeq = -1;
if ((code = syncNodeStartSnapshot(pNode, &destId, (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1))) <
0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
TAOS_RETURN(code);
}
Expand Down Expand Up @@ -1157,11 +1159,21 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, index:%" PRId64 ", firstVer:%" PRId64
", term:%" PRId64 ", lastMatchTerm:%" PRId64,
pNode->vgId, DID(&destId), index, firstVer, term, pMsg->lastMatchTerm);
char reason[100] = {0};
if (index + 1 < firstVer)
tsnprintf(reason, 100, "matched entry not in log range, index:%" PRId64 ", firstVer:%" PRId64, index, firstVer);
else if (term < 0)
tsnprintf(reason, 100, "failed to get prev log term");
else
tsnprintf(reason, 100, "log term mismatch");
pNode->snapSeq = -1;
if ((code = syncNodeStartSnapshot(pNode, &destId, reason)) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
TAOS_RETURN(code);
}
sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
return 0;
}

Expand Down
12 changes: 11 additions & 1 deletion source/libs/sync/src/syncSnapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ _out:;
TAOS_RETURN(code);
}

int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId, char *reason) {
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
sNError(pSyncNode, "snapshot sender start error since get failed");
Expand All @@ -402,6 +402,9 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {

taosMsleep(1);

sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s", pSyncNode->vgId, DID(pDestId),
reason);

int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
Expand Down Expand Up @@ -1089,6 +1092,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
"vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg "
"signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime);
pSyncNode->snapSeq = pMsg->seq;
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
sDebug(
"vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg "
Expand All @@ -1102,6 +1106,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64
", %" PRId64 "), snapshot msg seq:%d",
pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
pSyncNode->snapSeq = pMsg->seq;
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64
", %" PRId64 ")",
Expand All @@ -1122,6 +1127,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
", %" PRId64 "), snapshot msg seq:%d",
pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
}
pSyncNode->snapSeq = pMsg->seq;
lastRecvPrintLog = currentTimestamp;
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId);
Expand All @@ -1133,6 +1139,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64
"), snapshot msg seq:%d",
pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
pSyncNode->snapSeq = pMsg->seq;
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (code != 0) {
sRError(pReceiver, "failed to end snapshot.");
Expand Down Expand Up @@ -1364,6 +1371,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ",
TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
pSyncNode->snapSeq = pMsg->ack;
if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
goto _ERROR;
}
Expand All @@ -1380,6 +1388,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
}
lastSendPrintLog = currentTimestamp;
pSyncNode->snapSeq = pMsg->ack;
if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
pSender->seq, pSender->pReader, pSender->finish);
Expand All @@ -1390,6 +1399,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// end
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp");
pSyncNode->snapSeq = pMsg->ack;
snapshotSenderStop(pSender, true);
TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
}
Expand Down
Loading