diff --git a/go.mod b/go.mod
index 2939099..28aa5b5 100644
--- a/go.mod
+++ b/go.mod
@@ -11,19 +11,22 @@ require (
 	github.com/joho/godotenv v1.5.1
 	github.com/onsi/ginkgo/v2 v2.15.0
 	github.com/onsi/gomega v1.30.0
+	github.com/stretchr/testify v1.8.4
 	go.uber.org/zap v1.26.0
 	golang.org/x/crypto v0.24.0
 	k8s.io/apimachinery v0.27.4
 )
 
 require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/go-logr/logr v1.3.0 // indirect
 	github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
 	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/rogpeppe/go-internal v1.13.1 // indirect
-	github.com/stretchr/testify v1.8.4 // indirect
+	github.com/stretchr/objx v0.5.0 // indirect
 	go.uber.org/goleak v1.3.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/net v0.26.0 // indirect
diff --git a/go.sum b/go.sum
index 3fab045..80d048d 100644
--- a/go.sum
+++ b/go.sum
@@ -44,8 +44,13 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go
index a5efb5b..52367f8 100644
--- a/pkg/github/runners/message-processor.go
+++ b/pkg/github/runners/message-processor.go
@@ -17,10 +17,14 @@ import (
 )
 
 const (
-	cancelledStatus = "canceled"
-	ignoredStatus   = "ignored"
-	abandonedStatus = "abandoned"
-	defaultJobId    = "missing-job-id"
+	cancelledStatus           = "canceled"
+	ignoredStatus             = "ignored"
+	abandonedStatus           = "abandoned"
+	defaultJobId              = "missing-job-id"
+	maxProvisioningRetries    = 3
+	provisioningRetryInterval = 15 * time.Second
+	acquiredJobTimeout        = 5 * time.Minute
+	provisioningTimeout       = 10 * time.Minute
 )
 
 func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerInterface, runnerProvisioner RunnerProvisionerInterface, runnerScaleSet *types.RunnerScaleSet) *RunnerMessageProcessor {
@@ -32,15 +36,20 @@ func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerI
 		runnerScaleSetName: runnerScaleSet.Name,
 		canceledJobs:       map[string]bool{},
 		canceledJobsMutex:  sync.RWMutex{},
+		acquiredJobs:       map[int64]*AcquiredJobInfo{},
+		acquiredJobsMutex:  sync.RWMutex{},
 	}
 }
 
 func (p *RunnerMessageProcessor) StartProcessingMessages() error {
+	go p.monitorStuckJobs()
+
 	for {
 		p.logger.Infof("waiting for message for runner %s...", p.runnerScaleSetName)
 		select {
 		case <-p.ctx.Done():
 			p.logger.Infof("message processing service is stopped for runner %s", p.runnerScaleSetName)
+			p.logStuckJobs()
 			return nil
 		default:
 			err := p.runnerManager.ProcessMessages(p.ctx, p.processRunnerMessage)
@@ -101,7 +110,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
 				return fmt.Errorf("could not decode job available message. %w", err)
 			}
 			p.logger.Infof("Job available message received for JobId: %s, RunnerRequestId: %d", jobAvailable.JobId, jobAvailable.RunnerRequestId)
-			availableJobs = append(availableJobs, jobAvailable.RunnerRequestId)
+
+			if !p.isJobAcquired(jobAvailable.RunnerRequestId) {
+				availableJobs = append(availableJobs, jobAvailable.RunnerRequestId)
+			} else {
+				p.logger.Warnf("Job %d already acquired, skipping", jobAvailable.RunnerRequestId)
+			}
 		case "JobAssigned":
 			var jobAssigned types.JobAssigned
 			if err := json.Unmarshal(message, &jobAssigned); err != nil {
@@ -110,28 +124,49 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
 			p.logger.Infof("Job assigned message received for JobId: %s, RunnerRequestId: %d",
 				jobAssigned.JobId, jobAssigned.RunnerRequestId)
 
+			p.updateAcquiredJobWithId(jobAssigned.RunnerRequestId, jobAssigned.JobId)
+
 			if provisionedRunners < requiredRunners {
 				provisionedRunners++
 				p.logger.Infof("number of runners provisioning started: %d. Max required runners: %d", provisionedRunners, requiredRunners)
-				go func() {
-					jobId := jobAssigned.JobId
+				go func(runnerRequestId int64, jobId string) {
 					if jobId == "" {
 						jobId = defaultJobId
 					}
 
-					for attempt := 1; !p.isCanceled(jobId); attempt++ {
-						err := p.runnerProvisioner.ProvisionRunner(p.ctx)
+					provisioned := false
+					for attempt := 1; attempt <= maxProvisioningRetries && !p.isCanceled(jobId); attempt++ {
+						p.logger.Infof("Provisioning runner for job %s (RunnerRequestId: %d), attempt %d/%d", jobId, runnerRequestId, attempt, maxProvisioningRetries)
+
+						// Create timeout context for this provisioning attempt
+						provisionCtx, cancel := context.WithTimeout(p.ctx, provisioningTimeout)
+						err := p.runnerProvisioner.ProvisionRunner(provisionCtx)
+						cancel() // Clean up context resources
+
 						if err == nil {
+							p.logger.Infof("Successfully provisioned runner for job %s (RunnerRequestId: %d)", jobId, runnerRequestId)
+							provisioned = true
 							break
 						}
 
-						p.logger.Errorf("unable to provision Orka runner for %s (attempt %d). More information: %s", p.runnerScaleSetName, attempt, err.Error())
+						if err == context.DeadlineExceeded {
+							p.logger.Errorf("Provisioning timeout for job %s (RunnerRequestId: %d) attempt %d/%d after %s", jobId, runnerRequestId, attempt, maxProvisioningRetries, provisioningTimeout)
+						} else {
+							p.logger.Errorf("Failed to provision runner for job %s (RunnerRequestId: %d) attempt %d/%d: %s", jobId, runnerRequestId, attempt, maxProvisioningRetries, err.Error())
+						}
+
+						if attempt < maxProvisioningRetries {
+							time.Sleep(provisioningRetryInterval)
+						}
+					}
 
-						time.Sleep(15 * time.Second)
+					if !provisioned && !p.isCanceled(jobId) {
+						p.logger.Errorf("Exhausted all %d provisioning attempts for job %s (RunnerRequestId: %d). Job may be stuck in queue.", maxProvisioningRetries, jobId, runnerRequestId)
 					}
 
 					p.removeCanceledJob(jobId)
-				}()
+					p.removeAcquiredJob(runnerRequestId)
+				}(jobAssigned.RunnerRequestId, jobAssigned.JobId)
 			}
 		case "JobStarted":
 			var jobStarted types.JobStarted
@@ -139,6 +174,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
 				return fmt.Errorf("could not decode job started message. %w", err)
 			}
 			p.logger.Infof("Job started message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d", jobStarted.JobId, jobStarted.RunnerRequestId, jobStarted.RunnerId)
+			p.removeAcquiredJob(jobStarted.RunnerRequestId)
 		case "JobCompleted":
 			var jobCompleted types.JobCompleted
 			if err := json.Unmarshal(message, &jobCompleted); err != nil {
@@ -147,6 +183,8 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
 			p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s",
 				jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result)
 
+			p.removeAcquiredJob(jobCompleted.RunnerRequestId)
+
 			if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) {
 				p.setCanceledJob(jobCompleted.JobId)
 			}
@@ -155,9 +193,15 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
 		}
 	}
 
-	err := p.runnerManager.AcquireJobs(p.ctx, availableJobs)
-	if err != nil {
-		return fmt.Errorf("could not acquire jobs. %w", err)
+	if len(availableJobs) > 0 {
+		err := p.runnerManager.AcquireJobs(p.ctx, availableJobs)
+		if err != nil {
+			return fmt.Errorf("could not acquire jobs. %w", err)
+		}
+
+		for _, requestId := range availableJobs {
+			p.trackAcquiredJob(requestId, "")
+		}
 	}
 	return nil
 }
@@ -180,3 +224,99 @@ func (p *RunnerMessageProcessor) removeCanceledJob(jobId string) {
 	defer p.canceledJobsMutex.Unlock()
 	delete(p.canceledJobs, jobId)
 }
+
+func (p *RunnerMessageProcessor) trackAcquiredJob(runnerRequestId int64, jobId string) {
+	p.acquiredJobsMutex.Lock()
+	defer p.acquiredJobsMutex.Unlock()
+	p.acquiredJobs[runnerRequestId] = &AcquiredJobInfo{
+		RunnerRequestId: runnerRequestId,
+		JobId:           jobId,
+		AcquiredAt:      time.Now(),
+		RetryCount:      0,
+	}
+	p.logger.Infof("Tracked acquired job: RunnerRequestId=%d, JobId=%s", runnerRequestId, jobId)
+}
+
+func (p *RunnerMessageProcessor) updateAcquiredJobWithId(runnerRequestId int64, jobId string) {
+	p.acquiredJobsMutex.Lock()
+	defer p.acquiredJobsMutex.Unlock()
+	if job, exists := p.acquiredJobs[runnerRequestId]; exists {
+		job.JobId = jobId
+		p.logger.Infof("Updated acquired job with JobId: RunnerRequestId=%d, JobId=%s", runnerRequestId, jobId)
+	} else {
+		p.acquiredJobs[runnerRequestId] = &AcquiredJobInfo{
+			RunnerRequestId: runnerRequestId,
+			JobId:           jobId,
+			AcquiredAt:      time.Now(),
+			RetryCount:      0,
+		}
+		p.logger.Infof("Tracked acquired job: RunnerRequestId=%d, JobId=%s", runnerRequestId, jobId)
+	}
+}
+
+func (p *RunnerMessageProcessor) removeAcquiredJob(runnerRequestId int64) {
+	p.acquiredJobsMutex.Lock()
+	defer p.acquiredJobsMutex.Unlock()
+	if _, exists := p.acquiredJobs[runnerRequestId]; exists {
+		p.logger.Infof("Removing tracked job: RunnerRequestId=%d", runnerRequestId)
+		delete(p.acquiredJobs, runnerRequestId)
+	}
+}
+
+func (p *RunnerMessageProcessor) isJobAcquired(runnerRequestId int64) bool {
+	p.acquiredJobsMutex.RLock()
+	defer p.acquiredJobsMutex.RUnlock()
+	_, exists := p.acquiredJobs[runnerRequestId]
+	return exists
+}
+
+func (p *RunnerMessageProcessor) getAcquiredJobs() []*AcquiredJobInfo {
+	p.acquiredJobsMutex.RLock()
+	defer p.acquiredJobsMutex.RUnlock()
+
+	jobs := make([]*AcquiredJobInfo, 0, len(p.acquiredJobs))
+	for _, job := range p.acquiredJobs {
+		jobs = append(jobs, job)
+	}
+	return jobs
+}
+
+func (p *RunnerMessageProcessor) monitorStuckJobs() {
+	ticker := time.NewTicker(2 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-p.ctx.Done():
+			return
+		case <-ticker.C:
+			p.logStuckJobs()
+		}
+	}
+}
+
+func (p *RunnerMessageProcessor) logStuckJobs() {
+	jobs := p.getAcquiredJobs()
+	if len(jobs) == 0 {
+		return
+	}
+
+	now := time.Now()
+	for _, job := range jobs {
+		elapsed := now.Sub(job.AcquiredAt)
+		if elapsed > acquiredJobTimeout {
+			p.logger.Warnf("Job stuck and will be cleaned up: RunnerRequestId=%d, JobId=%s, AcquiredAt=%s, Elapsed=%s",
+				job.RunnerRequestId, job.JobId, job.AcquiredAt.Format(time.RFC3339), elapsed.String())
+
+			// Mark job as canceled to stop any ongoing provisioning attempts
+			if job.JobId != "" && job.JobId != defaultJobId {
+				p.setCanceledJob(job.JobId)
+				p.logger.Infof("Marked stuck job as canceled: JobId=%s", job.JobId)
+			}
+
+			// Remove from tracking to allow cleanup
+			p.removeAcquiredJob(job.RunnerRequestId)
+			p.logger.Infof("Removed stuck job from tracking: RunnerRequestId=%d", job.RunnerRequestId)
+		}
+	}
+}
diff --git a/pkg/github/runners/message-processor_test.go b/pkg/github/runners/message-processor_test.go
new file mode 100644
index 0000000..c06fb5f
--- /dev/null
+++ b/pkg/github/runners/message-processor_test.go
@@ -0,0 +1,255 @@
+// Licensed under the Apache License, Version 2.0
+// Original work from the Actions Runner Controller (ARC) project
+// See https://github.com/actions/actions-runner-controller
+
+package runners
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/macstadium/orka-github-actions-integration/pkg/github/types"
+	"github.com/macstadium/orka-github-actions-integration/pkg/logging"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func init() {
+	logging.SetupLogger("info")
+}
+
+type MockRunnerManager struct {
+	mock.Mock
+}
+
+func (m *MockRunnerManager) ProcessMessages(ctx context.Context, handler func(msg *types.RunnerScaleSetMessage) error) error {
+	args := m.Called(ctx, handler)
+	return args.Error(0)
+}
+
+func (m *MockRunnerManager) AcquireJobs(ctx context.Context, requestIds []int64) error {
+	args := m.Called(ctx, requestIds)
+	return args.Error(0)
+}
+
+type MockRunnerProvisioner struct {
+	mock.Mock
+}
+
+func (m *MockRunnerProvisioner) ProvisionRunner(ctx context.Context) error {
+	args := m.Called(ctx)
+	return args.Error(0)
+}
+
+func TestTrackAcquiredJob(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	runnerRequestId := int64(12345)
+	jobId := "job-abc-123"
+
+	processor.trackAcquiredJob(runnerRequestId, jobId)
+
+	assert.True(t, processor.isJobAcquired(runnerRequestId))
+
+	jobs := processor.getAcquiredJobs()
+	assert.Len(t, jobs, 1)
+	assert.Equal(t, runnerRequestId, jobs[0].RunnerRequestId)
+	assert.Equal(t, jobId, jobs[0].JobId)
+	assert.Equal(t, 0, jobs[0].RetryCount)
+}
+
+func TestRemoveAcquiredJob(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	runnerRequestId := int64(12345)
+	jobId := "job-abc-123"
+
+	processor.trackAcquiredJob(runnerRequestId, jobId)
+	assert.True(t, processor.isJobAcquired(runnerRequestId))
+
+	processor.removeAcquiredJob(runnerRequestId)
+	assert.False(t, processor.isJobAcquired(runnerRequestId))
+
+	jobs := processor.getAcquiredJobs()
+	assert.Len(t, jobs, 0)
+}
+
+func TestIsJobAcquired(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	runnerRequestId := int64(12345)
+	jobId := "job-abc-123"
+
+	assert.False(t, processor.isJobAcquired(runnerRequestId))
+
+	processor.trackAcquiredJob(runnerRequestId, jobId)
+	assert.True(t, processor.isJobAcquired(runnerRequestId))
+}
+
+func TestGetAcquiredJobs(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	jobs := processor.getAcquiredJobs()
+	assert.Len(t, jobs, 0)
+
+	processor.trackAcquiredJob(int64(1), "job-1")
+	processor.trackAcquiredJob(int64(2), "job-2")
+	processor.trackAcquiredJob(int64(3), "job-3")
+
+	jobs = processor.getAcquiredJobs()
+	assert.Len(t, jobs, 3)
+
+	runnerRequestIds := make(map[int64]bool)
+	for _, job := range jobs {
+		runnerRequestIds[job.RunnerRequestId] = true
+	}
+	assert.True(t, runnerRequestIds[1])
+	assert.True(t, runnerRequestIds[2])
+	assert.True(t, runnerRequestIds[3])
+}
+
+func TestLogStuckJobs_NoStuckJobs(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	processor.trackAcquiredJob(int64(1), "job-1")
+
+	processor.logStuckJobs()
+}
+
+func TestLogStuckJobs_WithStuckJob(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	runnerRequestId := int64(12345)
+	jobId := "job-abc-123"
+	processor.trackAcquiredJob(runnerRequestId, jobId)
+
+	// Verify job is tracked before cleanup
+	assert.True(t, processor.isJobAcquired(runnerRequestId))
+	assert.False(t, processor.isCanceled(jobId))
+
+	// Manually set the acquired time to simulate a stuck job
+	job := processor.acquiredJobs[runnerRequestId]
+	job.AcquiredAt = time.Now().Add(-6 * time.Minute)
+
+	// Call cleanup function
+	processor.logStuckJobs()
+
+	// Verify job was removed from tracking
+	assert.False(t, processor.isJobAcquired(runnerRequestId))
+	// Verify job was marked as canceled
+	assert.True(t, processor.isCanceled(jobId))
+}
+
+func TestLogStuckJobs_WithStuckJobDefaultId(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	runnerRequestId := int64(12345)
+	// Track with default job ID (should not be marked as canceled)
+	processor.trackAcquiredJob(runnerRequestId, defaultJobId)
+
+	// Verify job is tracked before cleanup
+	assert.True(t, processor.isJobAcquired(runnerRequestId))
+
+	// Manually set the acquired time to simulate a stuck job
+	job := processor.acquiredJobs[runnerRequestId]
+	job.AcquiredAt = time.Now().Add(-6 * time.Minute)
+
+	// Call cleanup function
+	processor.logStuckJobs()
+
+	// Verify job was removed from tracking
+	assert.False(t, processor.isJobAcquired(runnerRequestId))
+	// Verify default job ID was NOT marked as canceled
+	assert.False(t, processor.isCanceled(defaultJobId))
+}
+
+func TestCanceledJobFunctions(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	jobId := "job-abc-123"
+
+	assert.False(t, processor.isCanceled(jobId))
+
+	processor.setCanceledJob(jobId)
+	assert.True(t, processor.isCanceled(jobId))
+
+	processor.removeCanceledJob(jobId)
+	assert.False(t, processor.isCanceled(jobId))
+}
+
+func TestConcurrentJobTracking(t *testing.T) {
+	ctx := context.Background()
+	mockManager := new(MockRunnerManager)
+	mockProvisioner := new(MockRunnerProvisioner)
+	runnerScaleSet := &types.RunnerScaleSet{Id: 1, Name: "test-runner"}
+
+	processor := NewRunnerMessageProcessor(ctx, mockManager, mockProvisioner, runnerScaleSet)
+
+	done := make(chan bool)
+
+	go func() {
+		for i := range 100 {
+			processor.trackAcquiredJob(int64(i), "job")
+		}
+		done <- true
+	}()
+
+	go func() {
+		for i := range 100 {
+			processor.isJobAcquired(int64(i))
+		}
+		done <- true
+	}()
+
+	go func() {
+		for i := range 100 {
+			processor.removeAcquiredJob(int64(i))
+		}
+		done <- true
+	}()
+
+	<-done
+	<-done
+	<-done
+}
diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go
index bfe1b8e..0c31ec9 100644
--- a/pkg/github/runners/types.go
+++ b/pkg/github/runners/types.go
@@ -7,6 +7,7 @@ package runners
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/macstadium/orka-github-actions-integration/pkg/github/actions"
 	"github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue"
@@ -42,4 +43,13 @@ type RunnerMessageProcessor struct {
 	runnerScaleSetName string
 	canceledJobs       map[string]bool
 	canceledJobsMutex  sync.RWMutex
+	acquiredJobs       map[int64]*AcquiredJobInfo
+	acquiredJobsMutex  sync.RWMutex
+}
+
+type AcquiredJobInfo struct {
+	RunnerRequestId int64
+	JobId           string
+	AcquiredAt      time.Time
+	RetryCount      int
 }