@@ -4,8 +4,10 @@ import (
44 "encoding/json"
55 "errors"
66 "fmt"
7+ "io/ioutil"
78 "math"
89 "os"
10+ "path/filepath"
911 "strconv"
1012 "strings"
1113 "sync"
@@ -32,6 +34,12 @@ const (
3234 defaultMaxProcs = 1 // 默认没有并发
3335 // TypeMarshalError 表示marshal出错
3436 TypeMarshalError = reqerr .SendErrorType ("Data Marshal failed" )
37+ // KeyUnMarshalError
38+ KeyUnMarshalError = "Data unmarshal failed"
39+ // NumUnMarshalError
40+ NumUnMarshalError = 10
41+ // lag file
42+ LagFilename = "meta.lag"
3543)
3644
3745var _ SkipDeepCopySender = & FtSender {}
@@ -202,6 +210,9 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
202210 isBlock : opt .isBlock ,
203211 backoff : utils .NewBackoff (2 , 1 , 1 * time .Second , 5 * time .Minute ),
204212 }
213+ ftSender .statsMutex .Lock ()
214+ ftSender .stats .FtSendLag = ftSender .readLag ()
215+ ftSender .statsMutex .Unlock ()
205216
206217 if opt .innerSenderType == TypePandora {
207218 ftSender .pandoraKeyCache = make (map [string ]KeyInfo )
@@ -269,9 +280,17 @@ func (ft *FtSender) RawSend(datas []string) error {
269280 } else {
270281 // se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
271282 se .AddSuccessNum (len (datas ))
283+ ft .statsMutex .Lock ()
284+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
285+ ft .statsMutex .Unlock ()
272286 ft .backoff .Reset ()
273287 }
274288 se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
289+ if se .FtQueueLag == 0 {
290+ ft .statsMutex .Lock ()
291+ ft .stats .FtSendLag = 0
292+ ft .statsMutex .Unlock ()
293+ }
275294 }
276295 return se
277296}
@@ -350,9 +369,17 @@ func (ft *FtSender) Send(datas []Data) error {
350369 } else {
351370 // se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
352371 se .AddSuccessNum (len (datas ))
372+ ft .statsMutex .Lock ()
373+ ft .stats .FtSendLag = ft .stats .FtSendLag + int64 (len (datas ))
374+ ft .statsMutex .Unlock ()
353375 ft .backoff .Reset ()
354376 }
355377 se .FtQueueLag = ft .BackupQueue .Depth () + ft .logQueue .Depth ()
378+ if se .FtQueueLag == 0 {
379+ ft .statsMutex .Lock ()
380+ ft .stats .FtSendLag = 0
381+ ft .statsMutex .Unlock ()
382+ }
356383 return se
357384}
358385
@@ -391,6 +418,9 @@ func (ft *FtSender) Close() error {
391418 // persist queue's meta data
392419 ft .logQueue .Close ()
393420 ft .BackupQueue .Close ()
421+ ft .statsMutex .Lock ()
422+ ft .writeLag (ft .stats .FtSendLag )
423+ ft .statsMutex .Unlock ()
394424
395425 return ft .innerSender .Close ()
396426}
@@ -477,6 +507,9 @@ func (ft *FtSender) saveToFile(datas []Data) error {
477507}
478508
479509func (ft * FtSender ) asyncSendLogFromQueue () {
510+ // if not sleep, queue lag may be cleared
511+ time .Sleep (time .Second * 10 )
512+
480513 for i := 0 ; i < ft .procs ; i ++ {
481514 if ft .opt .sendRaw {
482515 readLinesChan := make (<- chan []string )
@@ -502,18 +535,31 @@ func (ft *FtSender) asyncSendLogFromQueue() {
502535}
503536
504537// trySend 从bytes反序列化数据后尝试发送数据
505- func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool ) (backDataContext []* datasContext , err error ) {
538+ func (ft * FtSender ) trySendBytes (dat []byte , failSleep int , isRetry bool , isFromQueue bool ) (backDataContext []* datasContext , err error ) {
506539 if ft .opt .sendRaw {
507540 datas , err := ft .unmarshalRaws (dat )
508541 if err != nil {
509- return nil , err
542+ return nil , errors . New ( KeyUnMarshalError + ":" + err . Error ())
510543 }
544+ ft .statsMutex .Lock ()
545+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
546+ if ft .stats .FtSendLag < 0 {
547+ ft .stats .FtSendLag = 0
548+ }
549+ ft .statsMutex .Unlock ()
550+
511551 return ft .backOffSendRawFromQueue (datas , failSleep , isRetry )
512552 }
513553 datas , err := ft .unmarshalData (dat )
514554 if err != nil {
515- return nil , err
555+ return nil , errors .New (KeyUnMarshalError + ":" + err .Error ())
556+ }
557+ ft .statsMutex .Lock ()
558+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
559+ if ft .stats .FtSendLag < 0 {
560+ ft .stats .FtSendLag = 0
516561 }
562+ ft .statsMutex .Unlock ()
517563
518564 return ft .backOffSendFromQueue (datas , failSleep , isRetry )
519565}
@@ -562,6 +608,9 @@ func (ft *FtSender) trySendRaws(datas []string, failSleep int, isRetry bool) (ba
562608 log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
563609 return nil , nil
564610 }
611+ ft .statsMutex .Lock ()
612+ ft .stats .FtSendLag += int64 (len (v .Lines ))
613+ ft .statsMutex .Unlock ()
565614 }
566615
567616 time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -616,6 +665,9 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
616665 log .Errorf ("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d" , ft .runnerName , ft .innerSender .Name (), ft .BackupQueue .Name (), err , len (datas ))
617666 return nil , nil
618667 }
668+ ft .statsMutex .Lock ()
669+ ft .stats .FtSendLag += int64 (len (v .Datas ))
670+ ft .statsMutex .Unlock ()
619671 }
620672
621673 time .Sleep (time .Second * time .Duration (math .Pow (2 , float64 (failSleep ))))
@@ -876,6 +928,7 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
876928 timer := time .NewTicker (time .Second )
877929 defer timer .Stop ()
878930 numWaits := 1
931+ unmarshalDataError := 0
879932 var curDataContext , otherDataContext []* datasContext
880933 var curIdx int
881934 var backDataContext []* datasContext
@@ -891,8 +944,14 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
891944 } else {
892945 select {
893946 case bytes := <- readChan :
894- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
947+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
895948 case datas := <- readDatasChan :
949+ ft .statsMutex .Lock ()
950+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
951+ if ft .stats .FtSendLag < 0 {
952+ ft .stats .FtSendLag = 0
953+ }
954+ ft .statsMutex .Unlock ()
896955 backDataContext , err = ft .backOffSendRawFromQueue (datas , numWaits , isRetry )
897956 case <- timer .C :
898957 continue
@@ -908,6 +967,15 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
908967 if numWaits > 5 {
909968 numWaits = 5
910969 }
970+ if strings .HasPrefix (err .Error (), KeyUnMarshalError ) {
971+ unmarshalDataError ++
972+ if unmarshalDataError > NumUnMarshalError {
973+ time .Sleep (time .Second )
974+ log .Errorf ("Runner[%s] Sender[%s] sleep 1s due to unmarshal err" , ft .runnerName , ft .innerSender .Name (), queueName , err )
975+ }
976+ } else {
977+ unmarshalDataError = 0
978+ }
911979 }
912980 if backDataContext != nil {
913981 otherDataContext = append (otherDataContext , backDataContext ... )
@@ -924,6 +992,7 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
924992 timer := time .NewTicker (time .Second )
925993 defer timer .Stop ()
926994 numWaits := 1
995+ unmarshalDataError := 0
927996 var curDataContext , otherDataContext []* datasContext
928997 var curIdx int
929998 var backDataContext []* datasContext
@@ -939,8 +1008,14 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
9391008 } else {
9401009 select {
9411010 case bytes := <- readChan :
942- backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry )
1011+ backDataContext , err = ft .trySendBytes (bytes , numWaits , isRetry , true )
9431012 case datas := <- readDatasChan :
1013+ ft .statsMutex .Lock ()
1014+ ft .stats .FtSendLag = ft .stats .FtSendLag - int64 (len (datas ))
1015+ if ft .stats .FtSendLag < 0 {
1016+ ft .stats .FtSendLag = 0
1017+ }
1018+ ft .statsMutex .Unlock ()
9441019 backDataContext , err = ft .backOffSendFromQueue (datas , numWaits , isRetry )
9451020 case <- timer .C :
9461021 continue
@@ -956,6 +1031,15 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
9561031 if numWaits > 5 {
9571032 numWaits = 5
9581033 }
1034+ if strings .HasPrefix (err .Error (), KeyUnMarshalError ) {
1035+ unmarshalDataError ++
1036+ if unmarshalDataError > NumUnMarshalError {
1037+ time .Sleep (time .Second )
1038+ log .Errorf ("Runner[%s] Sender[%s] sleep 1s due to unmarshal err" , ft .runnerName , ft .innerSender .Name (), queueName , err )
1039+ }
1040+ } else {
1041+ unmarshalDataError = 0
1042+ }
9591043 }
9601044 if backDataContext != nil {
9611045 otherDataContext = append (otherDataContext , backDataContext ... )
@@ -993,8 +1077,8 @@ func SplitData(data string) (valArray []string) {
9931077 valArray = SplitDataWithSplitSize (valArray , data [start :offset ], DefaultSplitSize )
9941078 if len (valArray ) > 0 {
9951079 // 最后一个分片参与下次split
996- start = offset - len (valArray [len (valArray ) - 1 ])
997- valArray = valArray [:len (valArray ) - 1 ]
1080+ start = offset - len (valArray [len (valArray )- 1 ])
1081+ valArray = valArray [:len (valArray )- 1 ]
9981082 }
9991083 continue
10001084 }
@@ -1017,7 +1101,7 @@ func SplitDataWithSplitSize(originArray []string, data string, splitSize int64)
10171101 if len (originArray ) != 0 {
10181102 num := (DefaultMaxBatchSize - int64 (len (originArray [len (originArray )- 1 ]))) / splitSize
10191103 if num > 0 {
1020- end := num * splitSize
1104+ end := num * splitSize
10211105 if end > int64 (len (data )) {
10221106 end = int64 (len (data ))
10231107 }
@@ -1200,3 +1284,34 @@ func (ft *FtSender) backOffReTrySendRaw(lines []string, isRetry bool) (res []*da
12001284 time .Sleep (backoff .Duration ())
12011285 }
12021286}
1287+
1288+ // readLag read lag from file
1289+ func (ft * FtSender ) readLag () int64 {
1290+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1291+ f , err := ioutil .ReadFile (path )
1292+ if err != nil {
1293+ log .Errorf ("Runner[%v] Sender[%v] read file error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1294+ return 0
1295+ }
1296+ lag , err := strconv .ParseInt (string (f ), 10 , 64 )
1297+ if err != nil {
1298+ log .Errorf ("Runner[%v] Sender[%v] parse lag error : %v" , ft .runnerName , ft .innerSender .Name (), err )
1299+ }
1300+ return lag
1301+ }
1302+
1303+ // writeLag write lag into file
1304+ func (ft * FtSender ) writeLag (lag int64 ) error {
1305+ path := filepath .Join (ft .opt .saveLogPath , LagFilename )
1306+ file , err := os .OpenFile (path , os .O_WRONLY | os .O_TRUNC | os .O_CREATE , 0666 )
1307+ defer func () {
1308+ file .Sync ()
1309+ file .Close ()
1310+ }()
1311+ if err != nil {
1312+ return err
1313+ }
1314+ lagStr := strconv .FormatInt (lag , 10 )
1315+ _ , err = file .WriteString (lagStr )
1316+ return err
1317+ }
0 commit comments