Skip to content

Commit debaae9

Browse files
committed
OCTRL-678 Pass the error message to BKP in case of GO_ERROR
An error message is now propagated to Kafka events just before scheduling a GO_ERROR transition.
1 parent 5a36d3c commit debaae9

File tree

5 files changed

+80
-0
lines changed

5 files changed

+80
-0
lines changed

core/environment/environment.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,6 +1176,15 @@ func (env *Environment) QueryRoles(pathSpec string) (rs []workflow.Role) {
11761176
return
11771177
}
11781178

1179+
func (env *Environment) GetId() uid.ID {
1180+
if env == nil {
1181+
return ""
1182+
}
1183+
env.Mu.RLock()
1184+
defer env.Mu.RUnlock()
1185+
return env.id
1186+
}
1187+
11791188
// GetPath always returns the empty string for an Environment.
// NOTE(review): presumably present to satisfy an interface shared with
// workflow roles, with the environment acting as the root — confirm.
func (env *Environment) GetPath() string {
11801189
return ""
11811190
}
@@ -1222,7 +1231,12 @@ func (env *Environment) subscribeToWfState(taskman *task.Manager) {
12221231
log.WithField("partition", env.id).
12231232
WithField("level", infologger.IL_Ops).
12241233
Error("one of the critical tasks went into ERROR state, transitioning the environment into ERROR")
1234+
1235+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1236+
NewEnvGoErrorEvent(env, newCriticalTasksErrorMessage(env)),
1237+
)
12251238
err := env.TryTransition(NewGoErrorTransition(taskman))
1239+
12261240
if err != nil {
12271241
if env.Sm.Current() == "ERROR" {
12281242
log.WithField("partition", env.id).
@@ -1471,6 +1485,11 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
14711485
log.WithField("partition", env.id).
14721486
WithField("run", env.currentRunNumber).
14731487
Errorf("Scheduled auto stop transition failed: %s, Transitioning into ERROR", err.Error())
1488+
1489+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1490+
NewEnvGoErrorEvent(env, fmt.Sprintf("scheduled auto stop transition failed: %s", err.Error())),
1491+
)
1492+
14741493
err = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman))
14751494
if err != nil {
14761495
log.WithField("partition", env.id).

core/environment/manager.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
488488
WithError(err).
489489
Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
490490

491+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
492+
NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)),
493+
)
491494
err := env.TryTransition(NewGoErrorTransition(
492495
envs.taskman),
493496
)
@@ -592,6 +595,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
592595
WithField("level", infologger.IL_Devel).
593596
Error("environment deployment and configuration error, cleanup in progress")
594597

598+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
599+
NewEnvGoErrorEvent(env, fmt.Sprintf("deployment or configuration failed: %v", err)),
600+
)
595601
errTxErr := env.TryTransition(NewGoErrorTransition(
596602
envs.taskman),
597603
)
@@ -1052,6 +1058,9 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve
10521058
}
10531059

10541060
if env.CurrentState() != "ERROR" {
1061+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1062+
NewEnvGoErrorEvent(env, "ODC partition went to ERROR during RUNNING"),
1063+
)
10551064
err = env.TryTransition(NewGoErrorTransition(envs.taskman))
10561065
if err != nil {
10571066
log.WithPrefix("scheduler").
@@ -1376,6 +1385,9 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
13761385
WithError(err).
13771386
Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
13781387

1388+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1389+
NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)),
1390+
)
13791391
err := env.TryTransition(NewGoErrorTransition(
13801392
envs.taskman),
13811393
)

core/environment/utils.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"encoding/json"
3030
"fmt"
3131
"github.com/AliceO2Group/Control/common/logger/infologger"
32+
pb "github.com/AliceO2Group/Control/common/protos"
33+
"github.com/AliceO2Group/Control/core/task"
34+
"github.com/AliceO2Group/Control/core/task/sm"
35+
"github.com/AliceO2Group/Control/core/workflow"
3236
"os"
3337
"sort"
3438

@@ -101,3 +105,37 @@ func sortMapToString(m map[string]string) string {
101105
}
102106
return b.String()
103107
}
108+
109+
func NewEnvGoErrorEvent(env *Environment, err string) *pb.Ev_EnvironmentEvent {
110+
return &pb.Ev_EnvironmentEvent{
111+
EnvironmentId: env.GetId().String(),
112+
State: env.Sm.Current(),
113+
RunNumber: env.GetCurrentRunNumber(),
114+
Error: err,
115+
Message: "a critical error occurred, GO_ERROR transition imminent",
116+
LastRequestUser: env.GetLastRequestUser(),
117+
WorkflowTemplateInfo: env.GetWorkflowInfo(),
118+
}
119+
}
120+
121+
func newCriticalTasksErrorMessage(env *Environment) string {
122+
criticalTasksInError := env.workflow.GetTasks().Filtered(func(t *task.Task) bool {
123+
return t.GetTraits().Critical && t.GetState() == sm.ERROR
124+
})
125+
126+
if len(criticalTasksInError) == 0 {
127+
return "no critical tasks in ERROR"
128+
} else if len(criticalTasksInError) == 1 {
129+
t := criticalTasksInError[0]
130+
name := t.GetName()
131+
132+
// if available, we prefer role name, because it does not have a long hash for JIT-generated DPL tasks
133+
role, ok := t.GetParentRole().(workflow.Role)
134+
if ok {
135+
name = role.GetName()
136+
}
137+
return fmt.Sprintf("critical task '%s' on host '%s' transitioned to ERROR", name, t.GetHostname())
138+
} else {
139+
return fmt.Sprintf("%d critical tasks transitioned to ERROR, could not determine the first one to fail", len(criticalTasksInError))
140+
}
141+
}

core/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package core
2828

2929
import (
3030
"encoding/json"
31+
"fmt"
3132
"maps"
3233
"runtime"
3334
"sort"
@@ -646,6 +647,9 @@ func (m *RpcServer) ControlEnvironment(cxt context.Context, req *pb.ControlEnvir
646647
WithField("level", infologger.IL_Ops).
647648
WithError(err).
648649
Errorf("transition '%s' failed, transitioning into ERROR.", req.GetType().String())
650+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
651+
environment.NewEnvGoErrorEvent(env, fmt.Sprintf("transition %s failed: %v", req.GetType().String(), err)),
652+
)
649653
err = env.TryTransition(environment.NewGoErrorTransition(m.state.taskman))
650654
if err != nil {
651655
log.WithField("partition", env.Id()).Warnf("could not complete requested GO_ERROR transition, forcing move to ERROR: %s", err.Error())

core/task/task.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ func (t *Task) SetSafeToStop(done bool) {
151151
t.safeToStop = done
152152
}
153153

154+
func (t *Task) GetState() sm.State {
155+
t.mu.Lock()
156+
defer t.mu.Unlock()
157+
158+
return t.state
159+
}
160+
154161
func (t *Task) GetParentRole() interface{} {
155162
t.mu.RLock()
156163
defer t.mu.RUnlock()

0 commit comments

Comments
 (0)