Skip to content

Commit fab549e

Browse files
committed
[core] New DEPLOY transition in env state machine
1 parent 220c2f6 commit fab549e

File tree

8 files changed

+1244
-1173
lines changed

8 files changed

+1244
-1173
lines changed

coconut/protos/o2control.pb.go

Lines changed: 501 additions & 497 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/environment/environment.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,14 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
114114
env.Sm = fsm.NewFSM(
115115
"STANDBY",
116116
fsm.Events{
117-
{Name: "CONFIGURE", Src: []string{"STANDBY"}, Dst: "CONFIGURED"},
118-
{Name: "RESET", Src: []string{"CONFIGURED"}, Dst: "STANDBY"},
117+
{Name: "DEPLOY", Src: []string{"STANDBY"}, Dst: "DEPLOYED"},
118+
{Name: "CONFIGURE", Src: []string{"DEPLOYED"}, Dst: "CONFIGURED"},
119+
{Name: "RESET", Src: []string{"CONFIGURED"}, Dst: "DEPLOYED"},
119120
{Name: "START_ACTIVITY", Src: []string{"CONFIGURED"}, Dst: "RUNNING"},
120121
{Name: "STOP_ACTIVITY", Src: []string{"RUNNING"}, Dst: "CONFIGURED"},
121-
{Name: "EXIT", Src: []string{"CONFIGURED", "STANDBY"}, Dst: "DONE"},
122-
{Name: "GO_ERROR", Src: []string{"CONFIGURED", "RUNNING"}, Dst: "ERROR"},
123-
{Name: "RECOVER", Src: []string{"ERROR"}, Dst: "STANDBY"},
122+
{Name: "EXIT", Src: []string{"CONFIGURED", "DEPLOYED", "STANDBY"}, Dst: "DONE"},
123+
{Name: "GO_ERROR", Src: []string{"CONFIGURED", "DEPLOYED", "RUNNING"}, Dst: "ERROR"},
124+
{Name: "RECOVER", Src: []string{"ERROR"}, Dst: "DEPLOYED"},
124125
},
125126
fsm.Callbacks{
126127
"before_event": func(e *fsm.Event) {

core/environment/manager.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,13 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
133133
envs.m[env.id] = env
134134
envs.pendingStateChangeCh[env.id] = env.stateChangedCh
135135

136-
err = env.TryTransition(NewConfigureTransition(
136+
err = env.TryTransition(NewDeployTransition(
137137
envs.taskman,
138138
nil, //roles,
139-
nil,
140-
true),
139+
nil),
140+
)
141+
err = env.TryTransition(NewConfigureTransition(
142+
envs.taskman),
141143
)
142144
envs.mu.Unlock()
143145

@@ -457,11 +459,13 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
457459
envs.pendingStateChangeCh[env.id] = env.stateChangedCh
458460
envs.mu.Unlock()
459461

460-
err = env.TryTransition(NewConfigureTransition(
462+
err = env.TryTransition(NewDeployTransition(
461463
envs.taskman,
462464
nil, //roles,
463-
nil,
464-
true ))
465+
nil))
466+
err = env.TryTransition(NewConfigureTransition(
467+
envs.taskman))
468+
465469
if err != nil {
466470
envState := env.CurrentState()
467471
env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), Error: err})

core/environment/transition.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package environment
2727

2828
import (
2929
"errors"
30+
3031
"github.com/AliceO2Group/Control/core/protos"
3132
"github.com/AliceO2Group/Control/core/task"
3233
)
@@ -39,8 +40,10 @@ type Transition interface {
3940

4041
func MakeTransition(taskman *task.Manager, optype pb.ControlEnvironmentRequest_Optype) Transition {
4142
switch optype {
43+
case pb.ControlEnvironmentRequest_DEPLOY:
44+
return NewDeployTransition(taskman, nil, nil)
4245
case pb.ControlEnvironmentRequest_CONFIGURE:
43-
return NewConfigureTransition(taskman, nil, nil, true)
46+
return NewConfigureTransition(taskman)
4447
case pb.ControlEnvironmentRequest_START_ACTIVITY:
4548
return NewStartActivityTransition(taskman)
4649
case pb.ControlEnvironmentRequest_STOP_ACTIVITY:

core/environment/transition_configure.go

Lines changed: 1 addition & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,23 @@ package environment
2626

2727
import (
2828
"errors"
29-
"fmt"
30-
"time"
3129

3230
"github.com/AliceO2Group/Control/common/event"
3331
"github.com/AliceO2Group/Control/core/task"
3432
"github.com/AliceO2Group/Control/core/task/taskop"
35-
"github.com/AliceO2Group/Control/core/workflow"
36-
"github.com/pborman/uuid"
37-
"github.com/sirupsen/logrus"
3833
)
3934

40-
func NewConfigureTransition(taskman *task.Manager, addRoles []string, removeRoles []string, reconfigureAll bool) Transition {
35+
func NewConfigureTransition(taskman *task.Manager) Transition {
4136
return &ConfigureTransition{
4237
baseTransition: baseTransition{
4338
name: "CONFIGURE",
4439
taskman: taskman,
4540
},
46-
addRoles: addRoles,
47-
removeRoles: removeRoles,
48-
reconfigureAll: reconfigureAll,
4941
}
5042
}
5143

5244
type ConfigureTransition struct {
5345
baseTransition
54-
addRoles []string
55-
removeRoles []string
56-
reconfigureAll bool
5746
}
5847

5948
func (t ConfigureTransition) do(env *Environment) (err error) {
@@ -63,161 +52,6 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
6352

6453
wf := env.Workflow()
6554

66-
67-
// Role tree operations go here, and afterwards we'll generally get a role tree which
68-
// has
69-
// - some TaskRoles already deployed with Tasks
70-
// - some TaskRoles with no Tasks but with matching Tasks in the roster
71-
// - some TaskRoles with no Tasks and no matching running Tasks in the roster
72-
73-
/*
74-
// First we free the relevant roles, if any
75-
if len(t.removeRoles) != 0 {
76-
rolesThatStay := env.roles[:0]
77-
rolesToRelease := make([]string, 0)
78-
79-
for _, role := range env.roles {
80-
for _, removeRole := range t.removeRoles {
81-
if role == removeRole {
82-
rolesToRelease = append(rolesToRelease, role)
83-
break
84-
}
85-
rolesThatStay = append(rolesThatStay, role)
86-
}
87-
}
88-
err = t.roleman.ReleaseRoles(env.Id().Array(), rolesToRelease)
89-
if err != nil {
90-
return
91-
}
92-
env.roles = rolesThatStay
93-
}
94-
// IDEA: instead of passing around m.state or roleman, pass around one or more of
95-
// roleman's channels. This way roleman could potentially be lockless, and we just pipe
96-
// him a list of rolenames to remove/add, or even a function or a struct that does so.
97-
// This struct would implement an interface of the type of his channel, and he could
98-
// use type assertion to check whether he needs to add, remove or do something else.
99-
100-
// Alright, so now we have freed some roles (if required).
101-
// We proceed by deduplicating and attempting an acquire.
102-
if len(t.addRoles) != 0 {
103-
rolesToAcquire := make([]string, 0)
104-
105-
for _, addRole := range t.addRoles {
106-
alreadyInEnv := false
107-
for _, role := range env.roles {
108-
if role == addRole {
109-
alreadyInEnv = true
110-
break
111-
}
112-
}
113-
if !alreadyInEnv {
114-
rolesToAcquire = append(rolesToAcquire, addRole)
115-
}
116-
}
117-
err = t.roleman.AcquireRoles(env.Id().Array(), rolesToAcquire)
118-
if err != nil {
119-
return
120-
}
121-
122-
// We complete a move to CONFIGURED for all roles and we're done.
123-
err = t.roleman.ConfigureRoles(env.Id().Array(), rolesToAcquire)
124-
if err != nil {
125-
return
126-
}
127-
128-
env.roles = append(env.roles, rolesToAcquire...)
129-
}
130-
131-
// Finally, we configure.
132-
if t.reconfigureAll {
133-
err = t.roleman.ConfigureRoles(env.Id().Array(), env.roles)
134-
if err != nil {
135-
return
136-
}
137-
}
138-
139-
return*/
140-
141-
notifyStatus := make(chan task.Status)
142-
subscriptionId := uuid.NewUUID().String()
143-
env.wfAdapter.SubscribeToStatusChange(subscriptionId, notifyStatus)
144-
defer env.wfAdapter.UnsubscribeFromStatusChange(subscriptionId)
145-
146-
// listen to workflow State changes
147-
notifyState := make(chan task.State)
148-
env.wfAdapter.SubscribeToStateChange(subscriptionId, notifyState)
149-
defer env.wfAdapter.UnsubscribeFromStateChange(subscriptionId)
150-
151-
taskDescriptors := wf.GenerateTaskDescriptors()
152-
if len(taskDescriptors) != 0 {
153-
// err = t.taskman.AcquireTasks(env.Id().Array(), taskDescriptors)
154-
taskmanMessage := task.NewEnvironmentMessage(taskop.AcquireTasks, env.Id(), nil, taskDescriptors)
155-
t.taskman.MessageChannel <- taskmanMessage
156-
}
157-
if err != nil {
158-
return
159-
}
160-
161-
// We set all callRoles to ACTIVE right now, because there's no task activation for them.
162-
// This is the callRole equivalent of AcquireTasks, which only pushes updates to taskRoles.
163-
allHooks := wf.GetHooksForTrigger("") // no trigger = all hooks
164-
callHooks := allHooks.FilterCalls() // get the calls
165-
if len(callHooks) > 0 {
166-
for _, h := range callHooks {
167-
pr, ok := h.GetParentRole().(workflow.PublicUpdatable)
168-
if !ok {
169-
continue
170-
}
171-
go pr.UpdateStatus(task.ACTIVE)
172-
}
173-
}
174-
175-
deploymentTimeout := 90 * time.Second
176-
wfStatus := wf.GetStatus()
177-
if wfStatus != task.ACTIVE {
178-
WORKFLOW_ACTIVE_LOOP:
179-
for {
180-
log.Debug("waiting for workflow to become active")
181-
select {
182-
case wfStatus = <-notifyStatus:
183-
log.WithField("status", wfStatus.String()).
184-
Debug("workflow status change")
185-
if wfStatus == task.ACTIVE {
186-
break WORKFLOW_ACTIVE_LOOP
187-
}
188-
continue
189-
case <-time.After(deploymentTimeout):
190-
err = errors.New(fmt.Sprintf("workflow deployment timed out. timeout: %s",deploymentTimeout.String()))
191-
break WORKFLOW_ACTIVE_LOOP
192-
// This is needed for when the workflow fails during the STAGING state(mesos status),mesos responds with the `REASON_COMMAND_EXECUTOR_FAILED`,
193-
// By listening to workflow state ERROR we can break the loop before reaching the timeout (1m30s), we can trigger the cleanup faster
194-
// in the CreateEnvironment (environment/manager.go) and the lock in the `envman` is reserved for a sorter period, which allows operations like
195-
// `environment list` to be done almost immediatelly after mesos informs with TASK_FAILED.
196-
case wfState := <-notifyState:
197-
if wfState == task.ERROR {
198-
workflow.LeafWalk(wf, func(role workflow.Role) {
199-
if st := role.GetState(); st == task.ERROR {
200-
log.WithField("state", st).
201-
WithField("role", role.GetPath()).
202-
WithField("environment", role.GetEnvironmentId().String()).
203-
Error("environment reached invalid state")
204-
}
205-
})
206-
log.WithField("state", wfState.String()).
207-
Debug("workflow state change")
208-
err = errors.New("workflow deployment failed, aborting and cleaning up")
209-
break WORKFLOW_ACTIVE_LOOP
210-
}
211-
}
212-
}
213-
}
214-
215-
if err != nil {
216-
log.WithFields(logrus.Fields{"error": err.Error()}).
217-
Error("workflow deployment error")
218-
return
219-
}
220-
22155
tasks := wf.GetTasks()
22256

22357
if len(tasks) != 0 {

0 commit comments

Comments
 (0)