Skip to content

Commit 23371d0

Browse files
committed
[core] Await expressions, optional with trigger
1 parent 4df1afb commit 23371d0

File tree

5 files changed

+89
-7
lines changed

5 files changed

+89
-7
lines changed

core/environment/environment.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/AliceO2Group/Control/core/task"
4242
"github.com/AliceO2Group/Control/core/the"
4343
"github.com/AliceO2Group/Control/core/workflow"
44+
"github.com/AliceO2Group/Control/core/workflow/callable"
4445
"github.com/gobwas/glob"
4546
"github.com/looplab/fsm"
4647
"github.com/pborman/uuid"
@@ -69,6 +70,8 @@ type Environment struct {
6970
stateChangedCh chan *event.TasksStateChangedEvent
7071
unsubscribe chan struct{}
7172
eventStream Subscription
73+
74+
callsPendingAwait map[string /*await expression*/]callable.Calls
7275
}
7376

7477
func (env *Environment) NotifyEvent(e event.DeviceEvent) {
@@ -93,6 +96,8 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
9396
GlobalVars: gera.MakeStringMapWithMap(the.ConfSvc().GetVars()),
9497
UserVars: gera.MakeStringMapWithMap(userVars),
9598
stateChangedCh: make(chan *event.TasksStateChangedEvent),
99+
100+
callsPendingAwait: make(map[string]callable.Calls),
96101
}
97102

98103
// Make the KVs accessible to the workflow via ParentAdapter
@@ -160,10 +165,35 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
160165
}
161166

162167
func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err error) {
168+
// First we start any tasks
163169
allHooks := workflow.GetHooksForTrigger(trigger)
164-
allHooks.FilterCalls().CallAll()
170+
callsToStart := allHooks.FilterCalls()
171+
if len(callsToStart) != 0 {
172+
// Before we run anything asynchronously we must associate each call we're about
173+
// to start with its corresponding await expression
174+
for _, call := range callsToStart {
175+
awaitExpr := call.GetTraits().Await
176+
if _, ok := env.callsPendingAwait[awaitExpr]; !ok || len(env.callsPendingAwait[awaitExpr]) == 0 {
177+
env.callsPendingAwait[awaitExpr] = make(callable.Calls, 0)
178+
}
179+
env.callsPendingAwait[awaitExpr] = append(env.callsPendingAwait[awaitExpr], call)
180+
}
181+
callsToStart.StartAll()
182+
}
165183

184+
// Then we take care of any pending hooks, including from the current trigger
185+
// TODO: this should be further refined by adding priority/weight
186+
pendingCalls, ok := env.callsPendingAwait[trigger]
187+
if ok && len(pendingCalls) != 0 { // there are hooks to take care of
188+
pendingCalls.AwaitAll()
189+
}
190+
191+
// Tasks are handled separately for now, and they cannot have trigger!=await
166192
hooksToTrigger := allHooks.FilterTasks()
193+
return env.runTasksAsHooks(hooksToTrigger)
194+
}
195+
196+
func (env *Environment) runTasksAsHooks(hooksToTrigger task.Tasks) (err error) {
167197
if len(hooksToTrigger) == 0 {
168198
return nil
169199
}
@@ -190,7 +220,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
190220

191221
for {
192222
select {
193-
case e := <- env.incomingEvents:
223+
case e := <-env.incomingEvents:
194224
switch evt := e.(type) {
195225
case *event.BasicTaskTerminated:
196226
tid := evt.GetOrigin().TaskId
@@ -205,10 +235,10 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
205235
failedHooksById[thisHook.GetTaskId()] = thisHook
206236
log.WithField("task", thisHook.GetName()).
207237
WithFields(logrus.Fields{
208-
"exitCode": evt.ExitCode,
209-
"stdout": evt.Stdout,
210-
"stderr": evt.Stderr,
211-
"partition": env.Id().String(),
238+
"exitCode": evt.ExitCode,
239+
"stdout": evt.Stdout,
240+
"stderr": evt.Stderr,
241+
"partition": env.Id().String(),
212242
"finalMesosState": evt.FinalMesosState.String(),
213243
}).
214244
Warn("hook failed")
@@ -221,7 +251,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
221251
default:
222252
continue
223253
}
224-
case thisHook := <- timeoutCh:
254+
case thisHook := <-timeoutCh:
225255
log.WithField("partition", env.Id().String()).
226256
WithField("task", thisHook.GetName()).Warn("hook response timed out")
227257
delete(hookTimers, thisHook)

core/task/task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type parentRole interface {
7676

7777
type Traits struct {
7878
Trigger string
79+
Await string
7980
Timeout string
8081
Critical bool
8182
}

core/workflow/callable/call.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type Call struct {
5252
VarStack map[string]string
5353
Traits task.Traits
5454
parentRole ParentRole
55+
56+
await chan error
5557
}
5658

5759
func (s Hooks) FilterCalls() (calls Calls) {
@@ -85,6 +87,23 @@ func (s Calls) CallAll() map[*Call]error {
8587
return errors
8688
}
8789

90+
func (s Calls) StartAll() {
91+
for _, v := range s {
92+
v.Start()
93+
}
94+
}
95+
96+
func (s Calls) AwaitAll() map[*Call]error {
97+
errors := make(map[*Call]error)
98+
for _, v := range s {
99+
err := v.Await()
100+
if err != nil {
101+
errors[v] = err
102+
}
103+
}
104+
return errors
105+
}
106+
88107

89108
func NewCall(funcCall string, returnVar string, varStack map[string]string, parent ParentRole) (call *Call) {
90109
return &Call{
@@ -117,6 +136,18 @@ func (c *Call) Call() error {
117136
return nil
118137
}
119138

139+
func (c *Call) Start() {
140+
c.await = make(chan error)
141+
go func() {
142+
c.await <- c.Call()
143+
}()
144+
}
145+
146+
func (c *Call) Await() error {
147+
defer close(c.await)
148+
return <-c.await
149+
}
150+
120151
func (c *Call) GetParentRole() interface{} {
121152
return c.parentRole
122153
}

core/workflow/callrole.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func (t *callRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
5252
Func string
5353
Return string
5454
Trigger *string
55+
Await *string
5556
Timeout *string
5657
Critical *bool
5758
}
@@ -81,6 +82,12 @@ func (t *callRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
8182
} else {
8283
role.Timeout = (30 * time.Second).String()
8384
}
85+
if aux.Call.Await != nil && len(*aux.Call.Await) > 0 {
86+
role.Await = *aux.Call.Await
87+
} else {
88+
// if no await is specified, await := trigger
89+
role.Await = *aux.Call.Trigger
90+
}
8491
} else { // basic task
8592
if aux.Call.Timeout != nil && len(*aux.Call.Timeout) > 0 {
8693
role.Timeout = *aux.Call.Timeout
@@ -103,6 +110,7 @@ func (t *callRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
103110
func (t *callRole) MarshalYAML() (interface{}, error) {
104111
callRole := make(map[string]interface{})
105112
if t.Traits.Trigger != "" { callRole["trigger"] = t.Traits.Trigger }
113+
if t.Traits.Await != "" { callRole["await"] = t.Traits.Await }
106114
if t.Traits.Timeout != "" { callRole["timeout"] = t.Traits.Timeout }
107115
callRole["critical"] = t.Traits.Critical
108116
callRole["func"] = t.FuncCall
@@ -137,6 +145,7 @@ func (t *callRole) ProcessTemplates(workflowRepo *repos.Repo) (err error) {
137145
template.WrapPointer(&t.ReturnVar),
138146
template.WrapPointer(&t.Timeout),
139147
template.WrapPointer(&t.Trigger),
148+
template.WrapPointer(&t.Await),
140149
},
141150
template.STAGE4: append(append(
142151
WrapConstraints(t.Constraints),
@@ -239,6 +248,7 @@ func (t* callRole) GetTaskTraits() task.Traits {
239248
if t == nil {
240249
return task.Traits{
241250
Trigger: "",
251+
Await: "",
242252
Timeout: "0s",
243253
Critical: false,
244254
}

core/workflow/taskrole.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func (t *taskRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
5151
Task struct{
5252
Load string
5353
Trigger *string
54+
Await *string
5455
Timeout *string
5556
Critical *bool
5657
}
@@ -79,6 +80,12 @@ func (t *taskRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
7980
} else {
8081
role.Timeout = (30 * time.Second).String()
8182
}
83+
if aux.Task.Await != nil && len(*aux.Task.Await) > 0 {
84+
role.Await = *aux.Task.Await
85+
} else {
86+
// if no await is specified, await := trigger
87+
role.Await = *aux.Task.Trigger
88+
}
8289
} else { // basic task
8390
if aux.Task.Timeout != nil && len(*aux.Task.Timeout) > 0 {
8491
role.Timeout = *aux.Task.Timeout
@@ -100,6 +107,7 @@ func (t *taskRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
100107
func (t *taskRole) MarshalYAML() (interface{}, error) {
101108
taskRole := make(map[string]interface{})
102109
if t.Traits.Trigger != "" { taskRole["trigger"] = t.Traits.Trigger }
110+
if t.Traits.Await != "" { taskRole["await"] = t.Traits.Await }
103111
if t.Traits.Timeout != "" { taskRole["timeout"] = t.Traits.Timeout }
104112
taskRole["critical"] = t.Traits.Critical
105113
taskRole["load"] = t.LoadTaskClass
@@ -132,6 +140,7 @@ func (t *taskRole) ProcessTemplates(workflowRepo *repos.Repo) (err error) {
132140
template.WrapPointer(&t.LoadTaskClass),
133141
template.WrapPointer(&t.Timeout),
134142
template.WrapPointer(&t.Trigger),
143+
template.WrapPointer(&t.Await),
135144
},
136145
template.STAGE4: append(append(
137146
WrapConstraints(t.Constraints),
@@ -268,6 +277,7 @@ func (t* taskRole) GetTaskTraits() task.Traits {
268277
if t == nil {
269278
return task.Traits{
270279
Trigger: "",
280+
Await: "",
271281
Timeout: "0s",
272282
Critical: false,
273283
}

0 commit comments

Comments
 (0)