@@ -4,11 +4,13 @@ package jobmanager
44
55import (
66 "bytes"
7+ "container/ring"
78 "context"
89 "encoding/json"
910 "runtime"
1011 "strings"
1112 "sync"
13+ "sync/atomic"
1214 "time"
1315
1416 "github.com/fullstorydev/grpcurl"
@@ -21,6 +23,7 @@ import (
2123 "go.viam.com/utils/rpc"
2224 "google.golang.org/grpc/metadata"
2325 reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
26+ "google.golang.org/protobuf/types/known/timestamppb"
2427
2528 "go.viam.com/rdk/config"
2629 "go.viam.com/rdk/grpc"
@@ -35,6 +38,7 @@ const (
3538 // has movementsensor as its second index in a dot separated slice, which is the resource
3639 // the job manager will be looking for.
3740 componentServiceIndex int = 2
41+ historyLength int = 10
3842)
3943
4044// JobManager keeps track of the currently scheduled jobs and updates the schedule with
@@ -48,6 +52,61 @@ type JobManager struct {
4852 conn rpc.ClientConn
4953 isClosed bool
5054 closeMutex sync.Mutex
55+
56+ NumJobHistories atomic.Int32
57+ JobHistories ssync.Map [string , * JobHistory ]
58+ }
59+
// JobHistory holds the recent run history of a single job as two fixed-size rings of
// timestamps: one for successful runs and one for failed runs. Each ring has its own
// mutex, so recording a success never contends with recording a failure.
type JobHistory struct {
	// successTimesMu guards successTimes.
	successTimesMu sync.Mutex
	// successTimes stores *timestamppb.Timestamp values of successful runs.
	successTimes *ring.Ring

	// failureTimesMu guards failureTimes.
	failureTimesMu sync.Mutex
	// failureTimes stores *timestamppb.Timestamp values of runs that failed.
	failureTimes *ring.Ring
}
67+
68+ // Successes returns timestamps of the last historyLength number successfully completed jobs.
69+ func (jh * JobHistory ) Successes () []* timestamppb.Timestamp {
70+ ts := make ([]* timestamppb.Timestamp , 0 , historyLength )
71+ jh .successTimesMu .Lock ()
72+ defer jh .successTimesMu .Unlock ()
73+ for i := 0 ; i < historyLength ; i ++ {
74+ if jh .successTimes .Value != nil {
75+ ts = append (ts , jh .successTimes .Value .(* timestamppb.Timestamp ))
76+ }
77+ jh .successTimes = jh .successTimes .Next ()
78+ }
79+ return ts
80+ }
81+
82+ // Failures returns timestamps of the last historyLength number jobs that returned with Error or panic.
83+ func (jh * JobHistory ) Failures () []* timestamppb.Timestamp {
84+ ts := make ([]* timestamppb.Timestamp , 0 , historyLength )
85+ jh .failureTimesMu .Lock ()
86+ defer jh .failureTimesMu .Unlock ()
87+ for i := 0 ; i < historyLength ; i ++ {
88+ if jh .failureTimes .Value != nil {
89+ ts = append (ts , jh .failureTimes .Value .(* timestamppb.Timestamp ))
90+ }
91+ jh .failureTimes = jh .failureTimes .Next ()
92+ }
93+ return ts
94+ }
95+
96+ // AddSuccess adds a timestamp to successTimes, overwriting the earliest entry if it is full.
97+ func (jh * JobHistory ) AddSuccess (time * timestamppb.Timestamp ) {
98+ jh .successTimesMu .Lock ()
99+ defer jh .successTimesMu .Unlock ()
100+ jh .successTimes .Value = time
101+ jh .successTimes = jh .successTimes .Next ()
102+ }
103+
104+ // AddFailure adds a timestamp to failureTimes, overwriting the earliest entry if it is full.
105+ func (jh * JobHistory ) AddFailure (time * timestamppb.Timestamp ) {
106+ jh .failureTimesMu .Lock ()
107+ defer jh .failureTimesMu .Unlock ()
108+ jh .failureTimes .Value = time
109+ jh .failureTimes = jh .failureTimes .Next ()
51110}
52111
53112// New sets up the context and grpcConn that is used in scheduled jobs. The actual
@@ -162,9 +221,8 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func() error {
162221 response , err := res .DoCommand (jm .ctx , jc .Command )
163222 if err != nil {
164223 jobLogger .CWarnw (jm .ctx , "Job failed" , "error" , err .Error ())
165- } else {
166- jobLogger .CInfow (jm .ctx , "Job succeeded" , "response" , response )
167224 }
225+ jobLogger .CInfow (jm .ctx , "Job succeeded" , "response" , response )
168226 return err
169227 }
170228
@@ -219,9 +277,8 @@ func (jm *JobManager) createJobFunction(jc config.JobConfig) func() error {
219277 if err != nil {
220278 jobLogger .CWarnw (jm .ctx , "Unmarshalling grpc response failed with error" , "error" , err .Error ())
221279 return err
222- } else {
223- jobLogger .CInfow (jm .ctx , "Job succeeded" , "response" , response )
224280 }
281+ jobLogger .CInfow (jm .ctx , "Job succeeded" , "response" , response )
225282 return nil
226283 }
227284}
@@ -259,6 +316,16 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
259316 jobLimitMode = gocron .LimitModeWait
260317 }
261318
319+ if _ , ok := jm .JobHistories .Load (jc .Name ); ! ok {
320+ jm .JobHistories .Store (jc .Name , & JobHistory {
321+ successTimes : ring .New (historyLength ),
322+ failureTimes : ring .New (historyLength ),
323+ })
324+ jm .NumJobHistories .Add (1 )
325+ }
326+
327+ jobLogger := jm .logger .Sublogger (jc .Name )
328+
262329 jobFunc := jm .createJobFunction (jc )
263330 j , err := jm .scheduler .NewJob (
264331 jobType ,
@@ -292,10 +359,29 @@ func (jm *JobManager) scheduleJob(jc config.JobConfig, verbose bool) {
292359 // It is also important to note that DURATION jobs start relative to when they were
293360 // queued on the job scheduler, while CRON jobs are tied to the physical clock.
294361 gocron .WithSingletonMode (jobLimitMode ),
362+ gocron .WithName (jc .Name ),
363+ gocron .WithEventListeners (
364+ // May be slightly more accurate to use j.LastRun(), but we don't have direct reference to it here, and we don't want the job to
365+ // complete before we can store the returned Job.
366+ gocron .AfterJobRuns (func (jobID uuid.UUID , jobName string ) {
367+ now := timestamppb .Now ()
368+ if jh , ok := jm .JobHistories .Load (jobName ); ok {
369+ jh .AddSuccess (now )
370+ }
371+ }),
372+ gocron .AfterJobRunsWithError (func (jobID uuid.UUID , jobName string , err error ) {
373+ now := timestamppb .Now ()
374+ if jh , ok := jm .JobHistories .Load (jobName ); ok {
375+ jh .AddFailure (now )
376+ }
377+ }),
378+ gocron .AfterJobRunsWithPanic (func (jobID uuid.UUID , jobName string , recoverData any ) {
379+ now := timestamppb .Now ()
380+ if jh , ok := jm .JobHistories .Load (jobName ); ok {
381+ jh .AddFailure (now )
382+ }
383+ })),
295384 )
296-
297- jobLogger := jm .logger .Sublogger (jc .Name )
298-
299385 if err != nil {
300386 jobLogger .CErrorw (jm .ctx , "Failed to create a new job" , "name" , jc .Name , "error" , err .Error ())
301387 return
0 commit comments