@@ -4,8 +4,10 @@ import (
44 "encoding/json"
55 "errors"
66 "fmt"
7+ "io/ioutil"
78 "math"
89 "os"
10+ "path/filepath"
911 "strconv"
1012 "strings"
1113 "sync"
@@ -36,6 +38,8 @@ const (
3638 KeyUnMarshalError = "Data unmarshal failed"
3739 // NumUnMarshalError
3840 NumUnMarshalError = 10
41+ // lag file
42+ LagFilename = "meta.lag"
3943)
4044
4145var _ SkipDeepCopySender = & FtSender {}
@@ -206,6 +210,9 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
206210 isBlock : opt .isBlock ,
207211 backoff : utils .NewBackoff (2 , 1 , 1 * time .Second , 5 * time .Minute ),
208212 }
213+ ftSender .statsMutex .Lock ()
214+ ftSender .stats .FtSendLag = ftSender .readLag ()
215+ ftSender .statsMutex .Unlock ()
209216
210217 if opt .innerSenderType == TypePandora {
211218 ftSender .pandoraKeyCache = make (map [string ]KeyInfo )
@@ -273,9 +280,17 @@ func (ft *FtSender) RawSend(datas []string) error {
273280 } else {
274281 // se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
275282 se .AddSuccessNum (len (datas ))
283+ ft .statsMutex .Lock ()
284+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
285+ ft .statsMutex .Unlock ()
276286 ft .backoff .Reset ()
277287 }
278288 se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
289+ if se .FtQueueLag == 0 {
290+ ft .statsMutex .Lock ()
291+ ft .stats .FtSendLag = 0
292+ ft .statsMutex .Unlock ()
293+ }
279294 }
280295 return se
281296}
@@ -354,9 +369,17 @@ func (ft *FtSender) Send(datas []Data) error {
354369 } else {
355370 // se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
356371 se .AddSuccessNum (len (datas ))
372+ ft .statsMutex .Lock ()
373+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
374+ ft .statsMutex .Unlock ()
357375 ft .backoff .Reset ()
358376 }
359377 se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
378+ if se .FtQueueLag == 0 {
379+ ft .statsMutex .Lock ()
380+ ft .stats .FtSendLag = 0
381+ ft .statsMutex .Unlock ()
382+ }
360383 return se
361384}
362385
@@ -395,6 +418,9 @@ func (ft *FtSender) Close() error {
395418 // persist queue's meta data
396419 ft .logQueue .Close ()
397420 ft .BackupQueue .Close ()
421+ ft .statsMutex .Lock ()
422+ ft .writeLag (ft .stats .FtSendLag )
423+ ft .statsMutex .Unlock ()
398424
399425 return ft .innerSender .Close ()
400426}
@@ -481,6 +507,9 @@ func (ft *FtSender) saveToFile(datas []Data) error {
481507}
482508
483509func (ft * FtSender ) asyncSendLogFromQueue () {
510+ // if not sleep, queue lag may be cleared
511+ time .Sleep (time .Second * 10 )
512+
484513 for i := 0 ; i < ft .procs ; i ++ {
485514 if ft .opt .sendRaw {
486515 readLinesChan := make (<- chan []string )
@@ -506,18 +535,32 @@ func (ft *FtSender) asyncSendLogFromQueue() {
506535}
507536
508537// trySend 从bytes反序列化数据后尝试发送数据
509- func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool ) (backDataContext []* datasContext , err error ) {
538+ func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool , isFromQueue bool ) (backDataContext []* datasContext , err error ) {
510539 if ft .opt .sendRaw {
511540 datas , err := ft .unmarshalRaws (dat )
512541 if err != nil {
513542 return nil , errors .New (KeyUnMarshalError + ":" + err .Error ())
514543 }
544+ ft .statsMutex .Lock ()
545+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
546+ if ft .stats .FtSendLag < 0 {
547+ ft .stats .FtSendLag = 0
548+ }
549+ ft .statsMutex .Unlock ()
550+
515551 return ft .backOffSendRawFromQueue (datas , failSleep , isRetry )
516552 }
517553 datas , err := ft .unmarshalData (dat )
518554 if err != nil {
519555 return nil , errors .New (KeyUnMarshalError + ":" + err .Error ())
556+
557+ }
558+ ft .statsMutex .Lock ()
559+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
560+ if ft .stats .FtSendLag < 0 {
561+ ft .stats .FtSendLag = 0
520562 }
563+ ft .statsMutex .Unlock ()
521564
522565 return ft .backOffSendFromQueue (datas , failSleep , isRetry )
523566}
@@ -566,6 +609,9 @@ func (ft *FtSender) trySendRaws(datas []string, failSleep int, isRetry bool) (ba
566609 log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
567610 return nil , nil
568611 }
612+ ft .statsMutex .Lock ()
613+ ft .stats .FtSendLag += int64 (len (v .Lines ))
614+ ft .statsMutex .Unlock ()
569615 }
570616
571617 time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -620,6 +666,9 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
620666 log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
621667 return nil , nil
622668 }
669+ ft .statsMutex .Lock ()
670+ ft .stats .FtSendLag += int64 (len (v .Datas ))
671+ ft .statsMutex .Unlock ()
623672 }
624673
625674 time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -896,8 +945,14 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
896945 } else {
897946 select {
898947 case bytes := <- readChan :
899- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
948+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
900949 case datas := <- readDatasChan :
950+ ft .statsMutex .Lock ()
951+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
952+ if ft .stats .FtSendLag < 0 {
953+ ft .stats .FtSendLag = 0
954+ }
955+ ft .statsMutex .Unlock ()
901956 backDataContext , err = ft .backOffSendRawFromQueue (datas , numWaits , isRetry )
902957 case <- timer .C :
903958 continue
@@ -939,7 +994,6 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
939994 defer timer .Stop ()
940995 numWaits := 1
941996 unmarshalDataError := 0
942-
943997 var curDataContext , otherDataContext []* datasContext
944998 var curIdx int
945999 var backDataContext []* datasContext
@@ -955,8 +1009,14 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
9551009 } else {
9561010 select {
9571011 case bytes := <- readChan :
958- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
1012+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
9591013 case datas := <- readDatasChan :
1014+ ft .statsMutex .Lock ()
1015+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
1016+ if ft .stats .FtSendLag < 0 {
1017+ ft .stats .FtSendLag = 0
1018+ }
1019+ ft .statsMutex .Unlock ()
9601020 backDataContext , err = ft .backOffSendFromQueue (datas , numWaits , isRetry )
9611021 case <- timer .C :
9621022 continue
@@ -1225,3 +1285,34 @@ func (ft *FtSender) backOffReTrySendRaw(lines []string, isRetry bool) (res []*da
12251285 time .Sleep (backoff .Duration ())
12261286 }
12271287}
1288+
1289+ // readLag read lag from file
1290+ func (ft * FtSender ) readLag () int64 {
1291+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1292+ f , err := ioutil .ReadFile (path )
1293+ if err != nil {
1294+ log .Errorf ("Runner[%v] Sender[%v] read file error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1295+ return 0
1296+ }
1297+ lag , err := strconv .ParseInt (string (f ), 10 , 64 )
1298+ if err != nil {
1299+ log .Errorf ("Runner[%v] Sender[%v] parse lag error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1300+ }
1301+ return lag
1302+ }
1303+
1304+ // writeLag write lag into file
1305+ func (ft * FtSender ) writeLag (lag int64 ) error {
1306+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1307+ file , err := os .OpenFile (path , os .O_WRONLY | os .O_TRUNC | os .O_CREATE , 0666 )
1308+ defer func () {
1309+ file .Sync ()
1310+ file .Close ()
1311+ }()
1312+ if err != nil {
1313+ return err
1314+ }
1315+ lagStr := strconv .FormatInt (lag , 10 )
1316+ _ , err = file .WriteString (lagStr )
1317+ return err
1318+ }
0 commit comments