Skip to content

Commit f26d477

Browse files
committed
[core] Fix handling of await and failures
1 parent 404e66d commit f26d477

File tree

1 file changed

+43
-34
lines changed

1 file changed

+43
-34
lines changed

core/environment/environment.go

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ package environment
2929
import (
3030
"errors"
3131
"fmt"
32-
"github.com/AliceO2Group/Control/apricot"
3332
"strconv"
3433
"strings"
3534
"sync"
3635
"time"
3736

37+
"github.com/AliceO2Group/Control/apricot"
38+
3839
"github.com/AliceO2Group/Control/common/event"
3940
"github.com/AliceO2Group/Control/common/gera"
4041
"github.com/AliceO2Group/Control/common/logger"
@@ -251,10 +252,11 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
251252
// FOR EACH weight within the current state machine trigger moment
252253
// 4 phases: start calls, await calls, execute task hooks, error handling
253254
for _, weight := range allWeights {
254-
hooksForWeight, ok := hooksMapForTrigger[weight]
255-
if ok {
256-
// PHASE 1: start asynchronously any call hooks and add them to the pending await map
255+
hooksForWeight, thereAreHooksToStartForTheCurrentTriggerAndWeight := hooksMapForTrigger[weight]
256+
257+
// PHASE 1: start asynchronously any call hooks and add them to the pending await map
257258

259+
if thereAreHooksToStartForTheCurrentTriggerAndWeight {
258260
// Hooks can be call hooks or task hooks, we do the calls first
259261
callsToStart := hooksForWeight.FilterCalls()
260262
if len(callsToStart) != 0 {
@@ -277,45 +279,49 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
277279
env.callsPendingAwait[awaitName][awaitWeight] = append(
278280
env.callsPendingAwait[awaitName][awaitWeight], call)
279281
}
280-
callsToStart.StartAll() // returns immediately (async)
282+
callsToStart.StartAll() // returns immediately (async)
281283
}
284+
}
282285

283-
// PHASE 2: collect any calls awaiting termination
284-
285-
// We take care of any pending hooks whose await expression corresponds to the current trigger,
286-
// including any calls that have just been started (for which trigger == call.Trigger == call.Await).
287-
callErrors := make(map[*callable.Call]error)
288-
if _, ok := env.callsPendingAwait[trigger]; ok {
289-
pendingCalls, ok := env.callsPendingAwait[trigger][weight]
290-
if ok && len(pendingCalls) != 0 { // meaning there are hook calls to take care of
291-
// AwaitAll blocks with no global timeout - it is up to the specific called function to implement
292-
// a timeout internally.
293-
// The Call instance pushes to the call's varStack some special values including the timeout
294-
// (provided by the workflow template). At that point the integration plugin must acquire the
295-
// timeout value and use the Context mechanism or some other approach to ensure the timeouts are
296-
// respected.
297-
298-
callErrors = pendingCalls.AwaitAll()
299-
}
286+
// PHASE 2: collect any calls awaiting termination
287+
288+
// We take care of any pending hooks whose await expression corresponds to the current trigger,
289+
// including any calls that have just been started (for which trigger == call.Trigger == call.Await).
290+
callErrors := make(map[*callable.Call]error)
291+
if _, ok := env.callsPendingAwait[trigger]; ok {
292+
pendingCalls, ok := env.callsPendingAwait[trigger][weight]
293+
if ok && len(pendingCalls) != 0 { // meaning there are hook calls to take care of
294+
// AwaitAll blocks with no global timeout - it is up to the specific called function to implement
295+
// a timeout internally.
296+
// The Call instance pushes to the call's varStack some special values including the timeout
297+
// (provided by the workflow template). At that point the integration plugin must acquire the
298+
// timeout value and use the Context mechanism or some other approach to ensure the timeouts are
299+
// respected.
300+
301+
callErrors = pendingCalls.AwaitAll()
300302
}
303+
}
301304

302-
// PHASE 3: start and finish any task hooks (synchronous!)
305+
// PHASE 3: start and finish any task hooks (synchronous!)
306+
307+
taskErrors := make(map[*task.Task]error)
308+
if thereAreHooksToStartForTheCurrentTriggerAndWeight {
303309

304310
// Tasks are handled separately for now, and they must have trigger==await
305311
hookTasksToTrigger := hooksForWeight.FilterTasks()
306-
taskErrors := env.runTasksAsHooks(hookTasksToTrigger) // blocking call, timeouts in executor
312+
taskErrors = env.runTasksAsHooks(hookTasksToTrigger) // blocking call, timeouts in executor
313+
}
307314

308-
// PHASE 4: collect any errors
315+
// PHASE 4: collect any errors
309316

310-
// We merge hook call errors and hook task errors into a single map for
311-
// critical trait processing
312-
for hook, err := range callErrors {
313-
allErrors[hook] = err
314-
}
315-
for hook, err := range taskErrors {
316-
allErrors[hook] = err
317-
}
318-
} //validity of hooksForWeight
317+
// We merge hook call errors and hook task errors into a single map for
318+
// critical trait processing
319+
for hook, err := range callErrors {
320+
allErrors[hook] = err
321+
}
322+
for hook, err := range taskErrors {
323+
allErrors[hook] = err
324+
}
319325
}
320326

321327
for hook, err := range allErrors {
@@ -327,6 +333,9 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
327333
if hook.GetTraits().Critical {
328334
log.Errorf("critical hook failed: %s", err)
329335
criticalFailures = append(criticalFailures, err)
336+
} else {
337+
log.WithField("level", infologger.IL_Devel).
338+
Debugf("non-critical hook failed: %s", err)
330339
}
331340
}
332341

0 commit comments

Comments
 (0)