@@ -27,6 +27,11 @@ package environment
2727import (
2828 "errors"
2929 "fmt"
30+ "strconv"
31+ "strings"
32+ "sync"
33+ "time"
34+
3035 "github.com/AliceO2Group/Control/common"
3136 "github.com/AliceO2Group/Control/common/controlmode"
3237 "github.com/AliceO2Group/Control/common/event"
@@ -40,10 +45,6 @@ import (
4045 "github.com/AliceO2Group/Control/core/workflow"
4146 pb "github.com/AliceO2Group/Control/executor/protos"
4247 "github.com/sirupsen/logrus"
43- "strconv"
44- "strings"
45- "sync"
46- "time"
4748)
4849
4950type Manager struct {
@@ -177,7 +178,31 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
177178 }
178179 }
179180
180- env , err := newEnvironment (envUserVars )
181+ cleanedUpTasks , runningTasks , err := envs .taskman .Cleanup ()
182+ if err != nil {
183+ log .WithError (err ).
184+ Warnf ("pre-deployment cleanup failed, continuing anyway" )
185+ err = nil
186+ } else {
187+ cleanedUpTaskIds , runningTaskIds := make ([]string , 0 ), make ([]string , 0 )
188+ for _ , t := range cleanedUpTasks {
189+ cleanedUpTaskIds = append (cleanedUpTaskIds , fmt .Sprintf ("%s.%s#%s" , t .GetHostname (), t .GetClassName (), t .GetTaskId ()))
190+ }
191+ for _ , t := range runningTasks {
192+ runningTaskIds = append (runningTaskIds , fmt .Sprintf ("%s.%s#%s" , t .GetHostname (), t .GetClassName (), t .GetTaskId ()))
193+ }
194+ log .WithField ("tasksCleanedUp" , strings .Join (cleanedUpTaskIds , ", " )).
195+ WithField ("level" , infologger .IL_Devel ).
196+ Debug ("tasks cleaned up during pre-deployment cleanup" )
197+ log .WithField ("tasksStillRunning" , strings .Join (runningTaskIds , ", " )).
198+ WithField ("level" , infologger .IL_Devel ).
199+ Debug ("tasks still running after pre-deployment cleanup" )
200+ log .WithField ("level" , infologger .IL_Ops ).
201+ Infof ("pre-deployment cleanup completed (%d tasks cleaned up, %d tasks still running)" , len (cleanedUpTasks ), len (runningTasks ))
202+ }
203+
204+ var env * Environment
205+ env , err = newEnvironment (envUserVars )
181206 newEnvId := uid .NilID ()
182207 if err == nil {
183208 if env != nil {
@@ -635,16 +660,23 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
635660 if t == nil {
636661 log .WithPrefix ("scheduler" ).
637662 WithField ("partition" , envId .String ()).
663+ WithField ("taskId" , taskId .Value ).
638664 Debug ("cannot find task for DeviceEvent END_OF_STREAM" )
639665 return
640666 }
641667 env , err := envs .environment (t .GetEnvironmentId ())
642668 if err != nil {
643669 log .WithPrefix ("scheduler" ).
644670 WithField ("partition" , envId .String ()).
671+ WithField ("taskId" , taskId .Value ).
645672 WithError (err ).
646673 Error ("cannot find environment for DeviceEvent" )
647674 }
675+ log .WithPrefix ("scheduler" ).
676+ WithField ("partition" , envId .String ()).
677+ WithField ("taskId" , taskId .Value ).
678+ WithField ("envState" , env .CurrentState ()).
679+ Debug ("received END_OF_STREAM event from task, trying to stop the run" )
648680 if env .CurrentState () == "RUNNING" {
649681 t .SetSafeToStop (true ) // we mark this specific task as ok to STOP
650682 go func () {
0 commit comments