From e97c7500f58c735a6dd2e4848ea3204451d793fc Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Thu, 9 Jul 2020 16:39:52 -0700 Subject: [PATCH 01/17] Use a unique context for each call of Session.Run --- exec/bigmachine.go | 9 ++++----- exec/eval.go | 7 ++----- exec/local.go | 4 +--- exec/session.go | 8 ++++---- exec/slicemachine.go | 7 +++---- 5 files changed, 14 insertions(+), 21 deletions(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 5c593b8..5aa89a4 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -154,7 +154,7 @@ func (b *bigmachineExecutor) Start(sess *Session) (shutdown func()) { return b.b.Shutdown } -func (b *bigmachineExecutor) manager(i int) *machineManager { +func (b *bigmachineExecutor) manager(i int, ctx context.Context) *machineManager { b.mu.Lock() defer b.mu.Unlock() for i >= len(b.managers) { @@ -168,7 +168,7 @@ func (b *bigmachineExecutor) manager(i int) *machineManager { maxLoad = 0 } b.managers[i] = newMachineManager(b.b, b.params, b.status, b.sess.Parallelism(), maxLoad, b.worker) - go b.managers[i].Do(backgroundcontext.Get()) + go b.managers[i].Do(ctx) } return b.managers[i] } @@ -264,7 +264,7 @@ func (b *bigmachineExecutor) commit(ctx context.Context, m *sliceMachine, key st }) } -func (b *bigmachineExecutor) Run(task *Task) { +func (b *bigmachineExecutor) Run(task *Task, ctx context.Context) { task.Status.Print("waiting for a machine") // Use the default/shared cluster unless the func is exclusive. @@ -272,13 +272,12 @@ func (b *bigmachineExecutor) Run(task *Task) { if task.Invocation.Exclusive { cluster = int(task.Invocation.Index) } - mgr := b.manager(cluster) + mgr := b.manager(cluster, ctx) procs := task.Pragma.Procs() if task.Pragma.Exclusive() || procs > mgr.machprocs { procs = mgr.machprocs } var ( - ctx = backgroundcontext.Get() offerc, cancel = mgr.Offer(int(task.Invocation.Index), procs) m *sliceMachine ) diff --git a/exec/eval.go b/exec/eval.go index 99c9a6e..2d12490 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -51,7 +51,7 @@ type Executor interface { // Run runs a task. The executor sets the state of the task as it // progresses. The task should enter in state TaskWaiting; by the // time Run returns the task state is >= TaskOk. - Run(*Task) + Run(*Task, context.Context) // Reader returns a locally accessible ReadCloser for the requested task. Reader(*Task, int) sliceio.ReadCloser @@ -77,9 +77,6 @@ type Executor interface { // TODO(marius): we can often stream across shuffle boundaries. This would // complicate scheduling, but may be worth doing. 
func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.Group) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - state := newState() for _, task := range roots { state.Enqueue(task) @@ -120,7 +117,7 @@ func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.G task.state = TaskWaiting task.Status = status startRunTime = time.Now() - go executor.Run(task) + go executor.Run(task, ctx) } else { status.Print("running in another invocation") } diff --git a/exec/local.go b/exec/local.go index a170c66..323ad86 100644 --- a/exec/local.go +++ b/exec/local.go @@ -11,7 +11,6 @@ import ( "runtime/debug" "sync" - "github.com/grailbio/base/backgroundcontext" "github.com/grailbio/base/errors" "github.com/grailbio/base/eventlog" "github.com/grailbio/base/limiter" @@ -49,8 +48,7 @@ func (l *localExecutor) Start(sess *Session) (shutdown func()) { return } -func (l *localExecutor) Run(task *Task) { - ctx := backgroundcontext.Get() +func (l *localExecutor) Run(task *Task, ctx context.Context) { n := 1 if task.Pragma.Exclusive() { n = l.sess.p diff --git a/exec/session.go b/exec/session.go index 1632109..111f9c3 100644 --- a/exec/session.go +++ b/exec/session.go @@ -287,6 +287,8 @@ var statusMu sync.Mutex func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error) { location := "" + runContext, runContextCancel := context.WithCancel(ctx) + defer runContextCancel() if _, file, line, ok := runtime.Caller(calldepth + 1); ok { location = fmt.Sprintf("%s:%d", file, line) defer typecheck.Location(file, line) @@ -333,9 +335,7 @@ func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncVa return nil, err } if sliceGroup != nil { - maintainCtx, cancel := context.WithCancel(ctx) - defer cancel() - go maintainSliceGroup(maintainCtx, tasks, sliceGroup) + go maintainSliceGroup(runContext, tasks, sliceGroup) } // Register all the tasks so they may be used in visualization. s.mu.Lock() @@ -348,7 +348,7 @@ func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncVa sess: s, invIndex: inv.Index, tasks: tasks, - }, Eval(ctx, s.executor, tasks, taskGroup) + }, Eval(runContext, s.executor, tasks, taskGroup) } // Parallelism returns the desired amount of evaluation parallelism. diff --git a/exec/slicemachine.go b/exec/slicemachine.go index be29ce7..8bb70d6 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/grailbio/base/backgroundcontext" "github.com/grailbio/base/data" "github.com/grailbio/base/errors" "github.com/grailbio/base/log" @@ -189,6 +188,8 @@ loop: select { case <-time.After(statsPollInterval): case <-ctx.Done(): + // Job has finished. Don't need to track this machine anymore. + return case <-stopped: break loop } @@ -636,9 +637,7 @@ func startMachines(ctx context.Context, b *bigmachine.B, group *status.Group, ma Status: status, maxTaskProcs: maxTaskProcs, } - // TODO(marius): pass a context that's tied to the evaluation - // lifetime, or lifetime of the machine. 
- go sm.Go(backgroundcontext.Get()) + go sm.Go(ctx) slicemachines[i] = sm }() } From 8d262d71e40f09dbe1162ae7fd652daf245cdbfc Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Fri, 10 Jul 2020 17:14:15 -0700 Subject: [PATCH 02/17] Use shutdown channel instead --- cmd/slicer/main.go | 2 +- exec/bigmachine.go | 6 +++++- exec/eval.go | 3 +++ exec/local.go | 4 ++++ exec/session.go | 17 +++++++++++------ exec/slicemachine.go | 12 ++++++------ 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/cmd/slicer/main.go b/cmd/slicer/main.go index 6f4ed5b..0dd0441 100644 --- a/cmd/slicer/main.go +++ b/cmd/slicer/main.go @@ -40,7 +40,6 @@ Available test are: wait := flag.Bool("wait", false, "don't exit after completion") sess := sliceconfig.Parse() - defer sess.Shutdown() if flag.NArg() == 0 { flag.Usage() @@ -61,6 +60,7 @@ Available test are: case "oom": err = oomer(sess, args) } + sess.Shutdown() if *wait { if err != nil { log.Printf("finished with error %v: waiting", err) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 5aa89a4..ddd8621 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -168,7 +168,7 @@ func (b *bigmachineExecutor) manager(i int, ctx context.Context) *machineManager maxLoad = 0 } b.managers[i] = newMachineManager(b.b, b.params, b.status, b.sess.Parallelism(), maxLoad, b.worker) - go b.managers[i].Do(ctx) + go b.managers[i].Do(ctx, b.isShutdown()) } return b.managers[i] } @@ -476,6 +476,10 @@ func (b *bigmachineExecutor) HandleDebug(handler *http.ServeMux) { b.b.HandleDebug(handler) } +func (b *bigmachineExecutor) isShutdown() chan struct{} { + return b.sess.shutdownc +} + // Location returns the machine on which the results of the provided // task resides. func (b *bigmachineExecutor) location(task *Task) *sliceMachine { diff --git a/exec/eval.go b/exec/eval.go index 2d12490..50c3e38 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -67,6 +67,9 @@ type Executor interface { // http.ServeMux. This is used to serve diagnostic information relating // to the executor. HandleDebug(handler *http.ServeMux) + + // Returns a channel which is closed when the session is shutting down + isShutdown() chan struct{} } // Eval simultaneously evaluates a set of task graphs from the diff --git a/exec/local.go b/exec/local.go index 323ad86..1c076ef 100644 --- a/exec/local.go +++ b/exec/local.go @@ -238,6 +238,10 @@ func bufferOutput(ctx context.Context, task *Task, out sliceio.Reader) (buf task return buf, nil } +func (l *localExecutor) isShutdown() chan struct{} { + return l.sess.shutdownc +} + type multiReader struct { q []sliceio.Reader err error diff --git a/exec/session.go b/exec/session.go index 111f9c3..f9f041f 100644 --- a/exec/session.go +++ b/exec/session.go @@ -83,14 +83,18 @@ type Session struct { // roots stores all task roots compiled by this session; // used for debugging. 
roots map[*Task]struct{} + + // Channel to indicate that the session is shutting down + shutdownc chan struct{} } func newSession() *Session { return &Session{ - Context: backgroundcontext.Get(), - index: atomic.AddInt32(&nextSessionIndex, 1) - 1, - roots: make(map[*Task]struct{}), - eventer: eventlog.Nop{}, + Context: backgroundcontext.Get(), + index: atomic.AddInt32(&nextSessionIndex, 1) - 1, + roots: make(map[*Task]struct{}), + eventer: eventlog.Nop{}, + shutdownc: make(chan struct{}), } } @@ -287,8 +291,8 @@ var statusMu sync.Mutex func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error) { location := "" - runContext, runContextCancel := context.WithCancel(ctx) - defer runContextCancel() + runContext, runCancel := context.WithCancel(ctx) + defer runCancel() if _, file, line, ok := runtime.Caller(calldepth + 1); ok { location = fmt.Sprintf("%s:%d", file, line) defer typecheck.Location(file, line) @@ -367,6 +371,7 @@ func (s *Session) Shutdown() { if s.shutdown != nil { s.shutdown() } + close(s.shutdownc) if s.tracePath != "" { writeTraceFile(s.tracer, s.tracePath) } diff --git a/exec/slicemachine.go b/exec/slicemachine.go index 8bb70d6..9c3b848 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -124,7 +124,7 @@ func (s *sliceMachine) Assign(task *Task) { // Go manages a sliceMachine: it polls stats at regular intervals and // marks tasks as lost when a machine fails. -func (s *sliceMachine) Go(ctx context.Context) { +func (s *sliceMachine) Go(ctx context.Context, shutdownc chan struct{}) { stopped := s.Wait(bigmachine.Stopped) loop: for ctx.Err() == nil { @@ -188,7 +188,7 @@ loop: select { case <-time.After(statsPollInterval): case <-ctx.Done(): - // Job has finished. Don't need to track this machine anymore. + case <-shutdownc: return case <-stopped: break loop @@ -419,7 +419,7 @@ func (m *machineManager) Offer(priority, procs int) (<-chan *sliceMachine, func( // needed (as indicated by client's calls to Need); thus when a // machine is lost, it may be replaced with another should it be // needed. -func (m *machineManager) Do(ctx context.Context) { +func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { var ( need, pending int startc = make(chan startResult) @@ -549,7 +549,7 @@ func (m *machineManager) Do(ctx context.Context) { log.Printf("slicemachine: %d machines (%d procs); %d machines pending (%d procs)", have/m.machprocs, have, pending/m.machprocs, pending) go func() { - machines := startMachines(ctx, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) + machines := startMachines(ctx, shutdownc, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) startc <- startResult{ machines: machines, nFailures: needMachines - len(machines), @@ -590,7 +590,7 @@ func removeMachine(ms []*sliceMachine, m *sliceMachine) []*sliceMachine { // on each of them. StartMachines returns a slice of successfully started // machines when all of them are in bigmachine.Running state. If a machine // fails to start, it is not included. -func startMachines(ctx context.Context, b *bigmachine.B, group *status.Group, maxTaskProcs int, n int, worker *worker, params ...bigmachine.Param) []*sliceMachine { +func startMachines(ctx context.Context, shutdownc chan struct{}, b *bigmachine.B, group *status.Group, maxTaskProcs int, n int, worker *worker, params ...bigmachine.Param) []*sliceMachine { params = append([]bigmachine.Param{bigmachine.Services{"Worker": worker}}, params...) 
machines, err := b.Start(ctx, n, params...) if err != nil { @@ -637,7 +637,7 @@ func startMachines(ctx context.Context, b *bigmachine.B, group *status.Group, ma Status: status, maxTaskProcs: maxTaskProcs, } - go sm.Go(ctx) + go sm.Go(ctx, shutdownc) slicemachines[i] = sm }() } From c8933b3cf05aa54e91ee6b15a5cf7cbceba6edcd Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Fri, 10 Jul 2020 17:17:44 -0700 Subject: [PATCH 03/17] Put context as first argument --- exec/bigmachine.go | 6 +++--- exec/eval.go | 4 ++-- exec/local.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index ddd8621..277c13f 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -154,7 +154,7 @@ func (b *bigmachineExecutor) Start(sess *Session) (shutdown func()) { return b.b.Shutdown } -func (b *bigmachineExecutor) manager(i int, ctx context.Context) *machineManager { +func (b *bigmachineExecutor) manager(ctx context.Context, i int) *machineManager { b.mu.Lock() defer b.mu.Unlock() for i >= len(b.managers) { @@ -264,7 +264,7 @@ func (b *bigmachineExecutor) commit(ctx context.Context, m *sliceMachine, key st }) } -func (b *bigmachineExecutor) Run(task *Task, ctx context.Context) { +func (b *bigmachineExecutor) Run(ctx context.Context, task *Task) { task.Status.Print("waiting for a machine") // Use the default/shared cluster unless the func is exclusive. @@ -272,7 +272,7 @@ func (b *bigmachineExecutor) Run(task *Task, ctx context.Context) { if task.Invocation.Exclusive { cluster = int(task.Invocation.Index) } - mgr := b.manager(cluster, ctx) + mgr := b.manager(ctx, cluster) procs := task.Pragma.Procs() if task.Pragma.Exclusive() || procs > mgr.machprocs { procs = mgr.machprocs diff --git a/exec/eval.go b/exec/eval.go index 50c3e38..1960a54 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -51,7 +51,7 @@ type Executor interface { // Run runs a task. The executor sets the state of the task as it // progresses. The task should enter in state TaskWaiting; by the // time Run returns the task state is >= TaskOk. - Run(*Task, context.Context) + Run(context.Context, *Task) // Reader returns a locally accessible ReadCloser for the requested task. 
Reader(*Task, int) sliceio.ReadCloser @@ -120,7 +120,7 @@ func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.G task.state = TaskWaiting task.Status = status startRunTime = time.Now() - go executor.Run(task, ctx) + go executor.Run(ctx, task) } else { status.Print("running in another invocation") } diff --git a/exec/local.go b/exec/local.go index 1c076ef..2d3c2d8 100644 --- a/exec/local.go +++ b/exec/local.go @@ -48,7 +48,7 @@ func (l *localExecutor) Start(sess *Session) (shutdown func()) { return } -func (l *localExecutor) Run(task *Task, ctx context.Context) { +func (l *localExecutor) Run(ctx context.Context, task *Task) { n := 1 if task.Pragma.Exclusive() { n = l.sess.p From 2b4529b8f8ea81a8267bfe33fddd3e57dc0c5395 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Fri, 10 Jul 2020 17:23:36 -0700 Subject: [PATCH 04/17] revert debug change to slicer main --- cmd/slicer/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/slicer/main.go b/cmd/slicer/main.go index 0dd0441..6f4ed5b 100644 --- a/cmd/slicer/main.go +++ b/cmd/slicer/main.go @@ -40,6 +40,7 @@ Available test are: wait := flag.Bool("wait", false, "don't exit after completion") sess := sliceconfig.Parse() + defer sess.Shutdown() if flag.NArg() == 0 { flag.Usage() @@ -60,7 +61,6 @@ Available test are: case "oom": err = oomer(sess, args) } - sess.Shutdown() if *wait { if err != nil { log.Printf("finished with error %v: waiting", err) From 9f1d8397bcb8f465556bf2f59652dc01c6b66fa9 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 01:10:45 -0700 Subject: [PATCH 05/17] Exit machineManager.Do based on shutdownc --- exec/slicemachine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exec/slicemachine.go b/exec/slicemachine.go index 9c3b848..fd5c336 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -533,7 +533,7 @@ func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { } mach.health = machineLost mach.Status.Done() - case <-ctx.Done(): + case <-shutdownc: return } From ac25547a036e22285d2761da02cfde189413b94c Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 09:19:20 -0700 Subject: [PATCH 06/17] Use background context again --- exec/bigmachine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 277c13f..a1d94b2 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -168,7 +168,7 @@ func (b *bigmachineExecutor) manager(ctx context.Context, i int) *machineManager maxLoad = 0 } b.managers[i] = newMachineManager(b.b, b.params, b.status, b.sess.Parallelism(), maxLoad, b.worker) - go b.managers[i].Do(ctx, b.isShutdown()) + go b.managers[i].Do(backgroundcontext.Get(), b.isShutdown()) } return b.managers[i] } From 0eea356b11a65ddb43b443a98a02ecc7bfc1d2cb Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 10:15:12 -0700 Subject: [PATCH 07/17] test fixes --- exec/bigmachine_test.go | 31 +++++++++++++++++-------------- exec/eval_test.go | 12 ++++++++++-- exec/evalstress_test.go | 6 +++++- exec/slicemachine_test.go | 4 +++- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/exec/bigmachine_test.go b/exec/bigmachine_test.go index 5423b33..93067a4 100644 --- a/exec/bigmachine_test.go +++ b/exec/bigmachine_test.go @@ -37,8 +37,8 @@ func TestBigmachineExecutor(t *testing.T) { } task := tasks[0] - go x.Run(task) ctx := context.Background() + go x.Run(ctx, task) task.Lock() gate <- struct{}{} for task.state <= TaskRunning 
{ @@ -53,7 +53,7 @@ func TestBigmachineExecutor(t *testing.T) { // If we run it again, it should first enter waiting/running state, and // then Ok again. There should not be a new invocation (p=1). - go x.Run(task) + go x.Run(ctx, task) task.Lock() for task.state <= TaskRunning { if err := task.Wait(ctx); err != nil { @@ -76,6 +76,7 @@ func TestBigmachineExecutorExclusive(t *testing.T) { }) fn = fn.Exclusive() + ctx := context.Background() const N = 5 var maxIndex int wg.Add(2 * N) //one for local invocation; one for remote @@ -89,7 +90,7 @@ func TestBigmachineExecutorExclusive(t *testing.T) { if err != nil { t.Fatal(err) } - go x.Run(tasks[0]) + go x.Run(ctx, tasks[0]) } wg.Wait() var n int @@ -136,8 +137,8 @@ func TestBigmachineExecutorTaskExclusive(t *testing.T) { } called.Add(2) replied.Add(1) - go x.Run(tasks[0]) - go x.Run(tasks[1]) + go x.Run(ctx, tasks[0]) + go x.Run(ctx, tasks[1]) called.Wait() if got, want := tasks[0].State(), TaskRunning; got != want { t.Fatalf("got %v, want %v", got, want) @@ -237,7 +238,7 @@ func TestBigmachineExecutorProcs(t *testing.T) { // Run three tasks (needing 6 procs), and verify that two machines have been // started on which to run them. for _, task := range tasks[:3] { - go x.Run(task) + go x.Run(ctx, task) state, err := task.WaitState(ctx, TaskRunning) if err != nil || state != TaskRunning { t.Fatal(state, err) @@ -253,7 +254,7 @@ func TestBigmachineExecutorProcs(t *testing.T) { // has blocked because it cannot acquire a machine on which to run a task. // If this is a problem, we'll need a better solution. for _, task := range tasks[3:] { - go x.Run(task) + go x.Run(ctx, task) func() { stateCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() @@ -352,7 +353,7 @@ func TestBigmachineExecutorLost(t *testing.T) { Slice: readerSlice, tasks: readerTasks, } - go x.Run(readerTask) + go x.Run(ctx, readerTask) system.Wait(1) readerTask.Lock() for readerTask.state != TaskOk { @@ -369,7 +370,7 @@ func TestBigmachineExecutorLost(t *testing.T) { return bigslice.Map(readerResult, func(v int) int { return v }) }) mapTask := mapTasks[0] - go x.Run(mapTask) + go x.Run(ctx, mapTask) if state, err := mapTask.WaitState(ctx, TaskOk); err != nil { t.Fatal(err) } else if state != TaskLost { @@ -385,7 +386,7 @@ func TestBigmachineExecutorLost(t *testing.T) { for readerTask.state != TaskOk { readerTask.state = TaskInit readerTask.Unlock() - go x.Run(readerTask) + go x.Run(ctx, readerTask) readerTask.Lock() if err := readerTask.Wait(ctx); err != nil { t.Fatal(err) @@ -397,7 +398,7 @@ func TestBigmachineExecutorLost(t *testing.T) { // it gets allocated on so no retries. This can take a few seconds as // we wait for machine probation to expire. 
mapTask.Set(TaskInit) - go x.Run(mapTask) + go x.Run(ctx, mapTask) if state, err := mapTask.WaitState(ctx, TaskOk); err != nil { t.Fatal(err) } else if state != TaskOk { @@ -548,8 +549,9 @@ func TestBigmachineMetrics(t *testing.T) { return }) task := tasks[0] - go x.Run(task) - if _, err := task.WaitState(context.Background(), TaskOk); err != nil { + ctx := context.Background() + go x.Run(ctx, task) + if _, err := task.WaitState(ctx, TaskOk); err != nil { t.Fatal(err) } if got, want := counter.Value(&task.Scope), int64(6); got != want { @@ -595,8 +597,9 @@ func newErrorReader(r io.ReadSeeker) *errorReader { func run(t *testing.T, x *bigmachineExecutor, tasks []*Task, expect TaskState) { t.Helper() + ctx := context.Background() for _, task := range tasks { - go x.Run(task) + go x.Run(ctx, task) } for _, task := range tasks { if _, err := task.WaitState(context.Background(), expect); err != nil { diff --git a/exec/eval_test.go b/exec/eval_test.go index 49ab371..7ef3fc1 100644 --- a/exec/eval_test.go +++ b/exec/eval_test.go @@ -31,7 +31,7 @@ func (testExecutor) Start(*Session) (shutdown func()) { return func() {} } -func (t testExecutor) Run(task *Task) { +func (t testExecutor) Run(ctx context.Context, task *Task) { task.Lock() task.state = TaskRunning task.Broadcast() @@ -52,6 +52,10 @@ func (testExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } +func (testExecutor) isShutdown() chan struct{} { + panic("not implemented") +} + // SimpleEvalTest sets up a simple, 2-node task graph. type simpleEvalTest struct { Tasks []*Task @@ -462,7 +466,7 @@ func (benchExecutor) Start(*Session) (shutdown func()) { return func() {} } -func (b benchExecutor) Run(task *Task) { +func (b benchExecutor) Run(ctx context.Context, task *Task) { task.Lock() task.state = TaskOk task.Broadcast() @@ -483,6 +487,10 @@ func (benchExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } +func (benchExecutor) isShutdown() chan struct{} { + panic("not implemented") +} + var evalStages = flag.Int("eval.bench.stages", 5, "number of stages for eval benchmark") func BenchmarkEval(b *testing.B) { diff --git a/exec/evalstress_test.go b/exec/evalstress_test.go index b1138ce..ca7034f 100644 --- a/exec/evalstress_test.go +++ b/exec/evalstress_test.go @@ -66,7 +66,7 @@ func delay() { <-time.After(delayMS * time.Millisecond) } -func (e *stressExecutor) Run(task *Task) { +func (e *stressExecutor) Run(ctx context.Context, task *Task) { e.wg.Add(1) go func() { defer e.wg.Done() @@ -100,6 +100,10 @@ func (*stressExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } +func (*stressExecutor) isShutdown() chan struct{} { + panic("not implemented") +} + func (e *stressExecutor) loop(ctx context.Context) { for { select { diff --git a/exec/slicemachine_test.go b/exec/slicemachine_test.go index 11381a6..0a6b3a9 100644 --- a/exec/slicemachine_test.go +++ b/exec/slicemachine_test.go @@ -237,16 +237,18 @@ func startTestSystem(machinep, maxp int, maxLoad float64) (system *testsystem.Sy system.KeepalivePeriod = time.Second system.KeepaliveTimeout = 5 * time.Second system.KeepaliveRpcTimeout = time.Second + shutdownc := make(chan struct{}) b = bigmachine.Start(system) ctx, ctxcancel := context.WithCancel(context.Background()) m = newMachineManager(b, nil, nil, maxp, maxLoad, &worker{MachineCombiners: false}) var wg sync.WaitGroup wg.Add(1) go func() { - m.Do(ctx) + m.Do(ctx, shutdownc) wg.Done() }() cancel = func() { + close(shutdownc) ctxcancel() wg.Wait() } From 
f90c0483b0eb96c9b23c6d72f5f0966a115833e2 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 11:04:33 -0700 Subject: [PATCH 08/17] add shutdown test --- exec/session_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/exec/session_test.go b/exec/session_test.go index e6626e2..1cd75a5 100644 --- a/exec/session_test.go +++ b/exec/session_test.go @@ -367,6 +367,23 @@ func TestDiscardStress(t *testing.T) { } } +// TestSessionShutdownChannel tests that the channel indicating session channel is only closed after a call to +// Session.Shutdown() +func TestSessionShutdownChannel(t *testing.T) { + testSession(t, func(t *testing.T, sess *Session) { + select { + case <-sess.shutdownc: + t.Error("shutdownc is closed too early") + default: + } + sess.Shutdown() + _, ok := <- sess.shutdownc + if ok { + t.Error("shutdownc should be closed") + } + }) +} + var executors = map[string]Option{ "Local": Local, "Bigmachine.Test": Bigmachine(testsystem.New()), From 9486b153dde9f44871356f991176e89984604661 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 11:14:26 -0700 Subject: [PATCH 09/17] Remove some context plumbing --- exec/bigmachine.go | 7 ++++--- exec/bigmachine_test.go | 31 ++++++++++++++----------------- exec/eval.go | 4 ++-- exec/eval_test.go | 4 ++-- exec/evalstress_test.go | 2 +- exec/local.go | 2 +- exec/slicemachine.go | 2 ++ 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index a1d94b2..5883f1b 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -154,7 +154,7 @@ func (b *bigmachineExecutor) Start(sess *Session) (shutdown func()) { return b.b.Shutdown } -func (b *bigmachineExecutor) manager(ctx context.Context, i int) *machineManager { +func (b *bigmachineExecutor) manager(i int) *machineManager { b.mu.Lock() defer b.mu.Unlock() for i >= len(b.managers) { @@ -264,7 +264,7 @@ func (b *bigmachineExecutor) commit(ctx context.Context, m *sliceMachine, key st }) } -func (b *bigmachineExecutor) Run(ctx context.Context, task *Task) { +func (b *bigmachineExecutor) Run(task *Task) { task.Status.Print("waiting for a machine") // Use the default/shared cluster unless the func is exclusive. @@ -272,12 +272,13 @@ func (b *bigmachineExecutor) Run(ctx context.Context, task *Task) { if task.Invocation.Exclusive { cluster = int(task.Invocation.Index) } - mgr := b.manager(ctx, cluster) + mgr := b.manager(cluster) procs := task.Pragma.Procs() if task.Pragma.Exclusive() || procs > mgr.machprocs { procs = mgr.machprocs } var ( + ctx = backgroundcontext.Get() offerc, cancel = mgr.Offer(int(task.Invocation.Index), procs) m *sliceMachine ) diff --git a/exec/bigmachine_test.go b/exec/bigmachine_test.go index 93067a4..5423b33 100644 --- a/exec/bigmachine_test.go +++ b/exec/bigmachine_test.go @@ -37,8 +37,8 @@ func TestBigmachineExecutor(t *testing.T) { } task := tasks[0] + go x.Run(task) ctx := context.Background() - go x.Run(ctx, task) task.Lock() gate <- struct{}{} for task.state <= TaskRunning { @@ -53,7 +53,7 @@ func TestBigmachineExecutor(t *testing.T) { // If we run it again, it should first enter waiting/running state, and // then Ok again. There should not be a new invocation (p=1). 
- go x.Run(ctx, task) + go x.Run(task) task.Lock() for task.state <= TaskRunning { if err := task.Wait(ctx); err != nil { @@ -76,7 +76,6 @@ func TestBigmachineExecutorExclusive(t *testing.T) { }) fn = fn.Exclusive() - ctx := context.Background() const N = 5 var maxIndex int wg.Add(2 * N) //one for local invocation; one for remote @@ -90,7 +89,7 @@ func TestBigmachineExecutorExclusive(t *testing.T) { if err != nil { t.Fatal(err) } - go x.Run(ctx, tasks[0]) + go x.Run(tasks[0]) } wg.Wait() var n int @@ -137,8 +136,8 @@ func TestBigmachineExecutorTaskExclusive(t *testing.T) { } called.Add(2) replied.Add(1) - go x.Run(ctx, tasks[0]) - go x.Run(ctx, tasks[1]) + go x.Run(tasks[0]) + go x.Run(tasks[1]) called.Wait() if got, want := tasks[0].State(), TaskRunning; got != want { t.Fatalf("got %v, want %v", got, want) @@ -238,7 +237,7 @@ func TestBigmachineExecutorProcs(t *testing.T) { // Run three tasks (needing 6 procs), and verify that two machines have been // started on which to run them. for _, task := range tasks[:3] { - go x.Run(ctx, task) + go x.Run(task) state, err := task.WaitState(ctx, TaskRunning) if err != nil || state != TaskRunning { t.Fatal(state, err) @@ -254,7 +253,7 @@ func TestBigmachineExecutorProcs(t *testing.T) { // has blocked because it cannot acquire a machine on which to run a task. // If this is a problem, we'll need a better solution. for _, task := range tasks[3:] { - go x.Run(ctx, task) + go x.Run(task) func() { stateCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() @@ -353,7 +352,7 @@ func TestBigmachineExecutorLost(t *testing.T) { Slice: readerSlice, tasks: readerTasks, } - go x.Run(ctx, readerTask) + go x.Run(readerTask) system.Wait(1) readerTask.Lock() for readerTask.state != TaskOk { @@ -370,7 +369,7 @@ func TestBigmachineExecutorLost(t *testing.T) { return bigslice.Map(readerResult, func(v int) int { return v }) }) mapTask := mapTasks[0] - go x.Run(ctx, mapTask) + go x.Run(mapTask) if state, err := mapTask.WaitState(ctx, TaskOk); err != nil { t.Fatal(err) } else if state != TaskLost { @@ -386,7 +385,7 @@ func TestBigmachineExecutorLost(t *testing.T) { for readerTask.state != TaskOk { readerTask.state = TaskInit readerTask.Unlock() - go x.Run(ctx, readerTask) + go x.Run(readerTask) readerTask.Lock() if err := readerTask.Wait(ctx); err != nil { t.Fatal(err) @@ -398,7 +397,7 @@ func TestBigmachineExecutorLost(t *testing.T) { // it gets allocated on so no retries. This can take a few seconds as // we wait for machine probation to expire. 
mapTask.Set(TaskInit) - go x.Run(ctx, mapTask) + go x.Run(mapTask) if state, err := mapTask.WaitState(ctx, TaskOk); err != nil { t.Fatal(err) } else if state != TaskOk { @@ -549,9 +548,8 @@ func TestBigmachineMetrics(t *testing.T) { return }) task := tasks[0] - ctx := context.Background() - go x.Run(ctx, task) - if _, err := task.WaitState(ctx, TaskOk); err != nil { + go x.Run(task) + if _, err := task.WaitState(context.Background(), TaskOk); err != nil { t.Fatal(err) } if got, want := counter.Value(&task.Scope), int64(6); got != want { @@ -597,9 +595,8 @@ func newErrorReader(r io.ReadSeeker) *errorReader { func run(t *testing.T, x *bigmachineExecutor, tasks []*Task, expect TaskState) { t.Helper() - ctx := context.Background() for _, task := range tasks { - go x.Run(ctx, task) + go x.Run(task) } for _, task := range tasks { if _, err := task.WaitState(context.Background(), expect); err != nil { diff --git a/exec/eval.go b/exec/eval.go index 1960a54..5a3903e 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -51,7 +51,7 @@ type Executor interface { // Run runs a task. The executor sets the state of the task as it // progresses. The task should enter in state TaskWaiting; by the // time Run returns the task state is >= TaskOk. - Run(context.Context, *Task) + Run(*Task) // Reader returns a locally accessible ReadCloser for the requested task. Reader(*Task, int) sliceio.ReadCloser @@ -120,7 +120,7 @@ func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.G task.state = TaskWaiting task.Status = status startRunTime = time.Now() - go executor.Run(ctx, task) + go executor.Run(task) } else { status.Print("running in another invocation") } diff --git a/exec/eval_test.go b/exec/eval_test.go index 7ef3fc1..baa518e 100644 --- a/exec/eval_test.go +++ b/exec/eval_test.go @@ -31,7 +31,7 @@ func (testExecutor) Start(*Session) (shutdown func()) { return func() {} } -func (t testExecutor) Run(ctx context.Context, task *Task) { +func (t testExecutor) Run(task *Task) { task.Lock() task.state = TaskRunning task.Broadcast() @@ -466,7 +466,7 @@ func (benchExecutor) Start(*Session) (shutdown func()) { return func() {} } -func (b benchExecutor) Run(ctx context.Context, task *Task) { +func (b benchExecutor) Run(task *Task) { task.Lock() task.state = TaskOk task.Broadcast() diff --git a/exec/evalstress_test.go b/exec/evalstress_test.go index ca7034f..a67da1d 100644 --- a/exec/evalstress_test.go +++ b/exec/evalstress_test.go @@ -66,7 +66,7 @@ func delay() { <-time.After(delayMS * time.Millisecond) } -func (e *stressExecutor) Run(ctx context.Context, task *Task) { +func (e *stressExecutor) Run(task *Task) { e.wg.Add(1) go func() { defer e.wg.Done() diff --git a/exec/local.go b/exec/local.go index 2d3c2d8..bd3b55e 100644 --- a/exec/local.go +++ b/exec/local.go @@ -48,7 +48,7 @@ func (l *localExecutor) Start(sess *Session) (shutdown func()) { return } -func (l *localExecutor) Run(ctx context.Context, task *Task) { +func (l *localExecutor) Run(task *Task) { n := 1 if task.Pragma.Exclusive() { n = l.sess.p diff --git a/exec/slicemachine.go b/exec/slicemachine.go index fd5c336..d4e2c87 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -535,6 +535,8 @@ func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { mach.Status.Done() case <-shutdownc: return + case <-ctx.Done(): + return } // TODO(marius): consider scaling down when we don't need as many From dad8604e22aecc108dccb9df7bae241936d64ea5 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 11:18:11 
-0700 Subject: [PATCH 10/17] goimports --- exec/local.go | 2 ++ exec/session_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/exec/local.go b/exec/local.go index bd3b55e..051ce82 100644 --- a/exec/local.go +++ b/exec/local.go @@ -11,6 +11,7 @@ import ( "runtime/debug" "sync" + "github.com/grailbio/base/backgroundcontext" "github.com/grailbio/base/errors" "github.com/grailbio/base/eventlog" "github.com/grailbio/base/limiter" @@ -49,6 +50,7 @@ func (l *localExecutor) Start(sess *Session) (shutdown func()) { } func (l *localExecutor) Run(task *Task) { + ctx := backgroundcontext.Get() n := 1 if task.Pragma.Exclusive() { n = l.sess.p diff --git a/exec/session_test.go b/exec/session_test.go index 1cd75a5..33c0d1d 100644 --- a/exec/session_test.go +++ b/exec/session_test.go @@ -372,12 +372,12 @@ func TestDiscardStress(t *testing.T) { func TestSessionShutdownChannel(t *testing.T) { testSession(t, func(t *testing.T, sess *Session) { select { - case <-sess.shutdownc: - t.Error("shutdownc is closed too early") - default: + case <-sess.shutdownc: + t.Error("shutdownc is closed too early") + default: } sess.Shutdown() - _, ok := <- sess.shutdownc + _, ok := <-sess.shutdownc if ok { t.Error("shutdownc should be closed") } From 30aabc075f30a9e549b215dc25085ed7378551aa Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Tue, 14 Jul 2020 12:58:05 -0700 Subject: [PATCH 11/17] remove final context change --- exec/eval.go | 3 +++ exec/session.go | 8 ++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/exec/eval.go b/exec/eval.go index 5a3903e..0b3143f 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -80,6 +80,9 @@ type Executor interface { // TODO(marius): we can often stream across shuffle boundaries. This would // complicate scheduling, but may be worth doing. func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.Group) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + state := newState() for _, task := range roots { state.Enqueue(task) diff --git a/exec/session.go b/exec/session.go index f9f041f..631405b 100644 --- a/exec/session.go +++ b/exec/session.go @@ -291,8 +291,6 @@ var statusMu sync.Mutex func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error) { location := "" - runContext, runCancel := context.WithCancel(ctx) - defer runCancel() if _, file, line, ok := runtime.Caller(calldepth + 1); ok { location = fmt.Sprintf("%s:%d", file, line) defer typecheck.Location(file, line) @@ -339,7 +337,9 @@ func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncVa return nil, err } if sliceGroup != nil { - go maintainSliceGroup(runContext, tasks, sliceGroup) + maintainCtx, cancel := context.WithCancel(ctx) + defer cancel() + go maintainSliceGroup(maintainCtx, tasks, sliceGroup) } // Register all the tasks so they may be used in visualization. s.mu.Lock() @@ -352,7 +352,7 @@ func (s *Session) run(ctx context.Context, calldepth int, funcv *bigslice.FuncVa sess: s, invIndex: inv.Index, tasks: tasks, - }, Eval(runContext, s.executor, tasks, taskGroup) + }, Eval(ctx, s.executor, tasks, taskGroup) } // Parallelism returns the desired amount of evaluation parallelism. 
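Note: at this point in the series (after patches 02-11) the design is the shutdown-channel idiom: the Session owns a chan struct{} that Shutdown closes, and long-running goroutines such as machineManager.Do select on it alongside their context. The following is a minimal, self-contained sketch of that idiom only; the session/poll names, the sync.Once guard, and the WaitGroup are illustrative stand-ins rather than the actual bigslice code (the WaitGroup only appears later in the series, on the executor).

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// session stands in for exec.Session: it owns a shutdown channel that is
// closed exactly once when Shutdown is called.
type session struct {
	shutdownc chan struct{}
	once      sync.Once
	wg        sync.WaitGroup
}

// Shutdown signals every goroutine watching shutdownc, then waits for them.
func (s *session) Shutdown() {
	s.once.Do(func() { close(s.shutdownc) })
	s.wg.Wait()
}

// poll stands in for loops like machineManager.Do: it runs until either the
// caller's context is canceled or the session is shut down.
func (s *session) poll(ctx context.Context, name string) {
	defer s.wg.Done()
	for {
		select {
		case <-time.After(50 * time.Millisecond):
			fmt.Println(name, "polling")
		case <-ctx.Done():
			return
		case <-s.shutdownc:
			return
		}
	}
}

func main() {
	s := &session{shutdownc: make(chan struct{})}
	s.wg.Add(1)
	go s.poll(context.Background(), "manager")
	time.Sleep(120 * time.Millisecond)
	s.Shutdown() // unblocks poll via shutdownc, then waits for it to return
}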
From ca309f341907c8badba10d4b83e333848a58401a Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Wed, 15 Jul 2020 16:53:11 -0700 Subject: [PATCH 12/17] jerry proposal --- exec/bigmachine.go | 28 ++++++++++++++++++++++------ exec/eval.go | 3 --- exec/eval_test.go | 8 -------- exec/evalstress_test.go | 4 ---- exec/local.go | 4 ---- exec/session.go | 13 ++++--------- exec/session_test.go | 17 ----------------- exec/slicemachine.go | 21 +++++++++++++-------- exec/slicemachine_test.go | 4 +--- 9 files changed, 40 insertions(+), 62 deletions(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 5883f1b..5da9fdd 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -122,6 +122,9 @@ type bigmachineExecutor struct { // If the task is marked as exclusive, then one is added to their // manager index. managers []*machineManager + + shutdownc chan struct{} + managersWG sync.WaitGroup } func newBigmachineExecutor(system bigmachine.System, params ...bigmachine.Param) *bigmachineExecutor { @@ -150,13 +153,24 @@ func (b *bigmachineExecutor) Start(sess *Session) (shutdown func()) { b.worker = &worker{ MachineCombiners: sess.machineCombiners, } + b.shutdownc = make(chan struct{}) + - return b.b.Shutdown + return func(){ + close(b.shutdownc) + b.managersWG.Wait() + b.b.Shutdown() + } } func (b *bigmachineExecutor) manager(i int) *machineManager { b.mu.Lock() defer b.mu.Unlock() + ctx, cancel := context.WithCancel(backgroundcontext.Get()) + go func() { + <-b.shutdownc + cancel() + }() for i >= len(b.managers) { b.managers = append(b.managers, nil) } @@ -168,7 +182,13 @@ func (b *bigmachineExecutor) manager(i int) *machineManager { maxLoad = 0 } b.managers[i] = newMachineManager(b.b, b.params, b.status, b.sess.Parallelism(), maxLoad, b.worker) - go b.managers[i].Do(backgroundcontext.Get(), b.isShutdown()) + + b.managersWG.Add(1) + go func() { + defer b.managersWG.Done() + b.managers[i].Do(ctx) + b.managers[i].machinesWG.Wait() + }() } return b.managers[i] } @@ -477,10 +497,6 @@ func (b *bigmachineExecutor) HandleDebug(handler *http.ServeMux) { b.b.HandleDebug(handler) } -func (b *bigmachineExecutor) isShutdown() chan struct{} { - return b.sess.shutdownc -} - // Location returns the machine on which the results of the provided // task resides. func (b *bigmachineExecutor) location(task *Task) *sliceMachine { diff --git a/exec/eval.go b/exec/eval.go index 0b3143f..99c9a6e 100644 --- a/exec/eval.go +++ b/exec/eval.go @@ -67,9 +67,6 @@ type Executor interface { // http.ServeMux. This is used to serve diagnostic information relating // to the executor. HandleDebug(handler *http.ServeMux) - - // Returns a channel which is closed when the session is shutting down - isShutdown() chan struct{} } // Eval simultaneously evaluates a set of task graphs from the diff --git a/exec/eval_test.go b/exec/eval_test.go index baa518e..49ab371 100644 --- a/exec/eval_test.go +++ b/exec/eval_test.go @@ -52,10 +52,6 @@ func (testExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } -func (testExecutor) isShutdown() chan struct{} { - panic("not implemented") -} - // SimpleEvalTest sets up a simple, 2-node task graph. 
type simpleEvalTest struct { Tasks []*Task @@ -487,10 +483,6 @@ func (benchExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } -func (benchExecutor) isShutdown() chan struct{} { - panic("not implemented") -} - var evalStages = flag.Int("eval.bench.stages", 5, "number of stages for eval benchmark") func BenchmarkEval(b *testing.B) { diff --git a/exec/evalstress_test.go b/exec/evalstress_test.go index a67da1d..b1138ce 100644 --- a/exec/evalstress_test.go +++ b/exec/evalstress_test.go @@ -100,10 +100,6 @@ func (*stressExecutor) HandleDebug(handler *http.ServeMux) { panic("not implemented") } -func (*stressExecutor) isShutdown() chan struct{} { - panic("not implemented") -} - func (e *stressExecutor) loop(ctx context.Context) { for { select { diff --git a/exec/local.go b/exec/local.go index 051ce82..a170c66 100644 --- a/exec/local.go +++ b/exec/local.go @@ -240,10 +240,6 @@ func bufferOutput(ctx context.Context, task *Task, out sliceio.Reader) (buf task return buf, nil } -func (l *localExecutor) isShutdown() chan struct{} { - return l.sess.shutdownc -} - type multiReader struct { q []sliceio.Reader err error diff --git a/exec/session.go b/exec/session.go index 631405b..1632109 100644 --- a/exec/session.go +++ b/exec/session.go @@ -83,18 +83,14 @@ type Session struct { // roots stores all task roots compiled by this session; // used for debugging. roots map[*Task]struct{} - - // Channel to indicate that the session is shutting down - shutdownc chan struct{} } func newSession() *Session { return &Session{ - Context: backgroundcontext.Get(), - index: atomic.AddInt32(&nextSessionIndex, 1) - 1, - roots: make(map[*Task]struct{}), - eventer: eventlog.Nop{}, - shutdownc: make(chan struct{}), + Context: backgroundcontext.Get(), + index: atomic.AddInt32(&nextSessionIndex, 1) - 1, + roots: make(map[*Task]struct{}), + eventer: eventlog.Nop{}, } } @@ -371,7 +367,6 @@ func (s *Session) Shutdown() { if s.shutdown != nil { s.shutdown() } - close(s.shutdownc) if s.tracePath != "" { writeTraceFile(s.tracer, s.tracePath) } diff --git a/exec/session_test.go b/exec/session_test.go index 33c0d1d..e6626e2 100644 --- a/exec/session_test.go +++ b/exec/session_test.go @@ -367,23 +367,6 @@ func TestDiscardStress(t *testing.T) { } } -// TestSessionShutdownChannel tests that the channel indicating session channel is only closed after a call to -// Session.Shutdown() -func TestSessionShutdownChannel(t *testing.T) { - testSession(t, func(t *testing.T, sess *Session) { - select { - case <-sess.shutdownc: - t.Error("shutdownc is closed too early") - default: - } - sess.Shutdown() - _, ok := <-sess.shutdownc - if ok { - t.Error("shutdownc should be closed") - } - }) -} - var executors = map[string]Option{ "Local": Local, "Bigmachine.Test": Bigmachine(testsystem.New()), diff --git a/exec/slicemachine.go b/exec/slicemachine.go index d4e2c87..126899f 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -124,7 +124,7 @@ func (s *sliceMachine) Assign(task *Task) { // Go manages a sliceMachine: it polls stats at regular intervals and // marks tasks as lost when a machine fails. 
-func (s *sliceMachine) Go(ctx context.Context, shutdownc chan struct{}) { +func (s *sliceMachine) Go(ctx context.Context) { stopped := s.Wait(bigmachine.Stopped) loop: for ctx.Err() == nil { @@ -188,7 +188,6 @@ loop: select { case <-time.After(statsPollInterval): case <-ctx.Done(): - case <-shutdownc: return case <-stopped: break loop @@ -351,6 +350,8 @@ type machineManager struct { schedQ scheduleRequestQ schedc chan scheduleRequest unschedc chan scheduleRequest + + machinesWG sync.WaitGroup } // NewMachineManager returns a new machineManager paramterized by the @@ -419,7 +420,7 @@ func (m *machineManager) Offer(priority, procs int) (<-chan *sliceMachine, func( // needed (as indicated by client's calls to Need); thus when a // machine is lost, it may be replaced with another should it be // needed. -func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { +func (m *machineManager) Do(ctx context.Context) { var ( need, pending int startc = make(chan startResult) @@ -533,8 +534,6 @@ func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { } mach.health = machineLost mach.Status.Done() - case <-shutdownc: - return case <-ctx.Done(): return } @@ -551,7 +550,14 @@ func (m *machineManager) Do(ctx context.Context, shutdownc chan struct{}) { log.Printf("slicemachine: %d machines (%d procs); %d machines pending (%d procs)", have/m.machprocs, have, pending/m.machprocs, pending) go func() { - machines := startMachines(ctx, shutdownc, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) + machines := startMachines(ctx, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) + for _, machine := range machines { + m.machinesWG.Add(1) + go func() { + defer m.machinesWG.Done() + machine.Go(ctx) + }() + } startc <- startResult{ machines: machines, nFailures: needMachines - len(machines), @@ -592,7 +598,7 @@ func removeMachine(ms []*sliceMachine, m *sliceMachine) []*sliceMachine { // on each of them. StartMachines returns a slice of successfully started // machines when all of them are in bigmachine.Running state. If a machine // fails to start, it is not included. -func startMachines(ctx context.Context, shutdownc chan struct{}, b *bigmachine.B, group *status.Group, maxTaskProcs int, n int, worker *worker, params ...bigmachine.Param) []*sliceMachine { +func startMachines(ctx context.Context, b *bigmachine.B, group *status.Group, maxTaskProcs int, n int, worker *worker, params ...bigmachine.Param) []*sliceMachine { params = append([]bigmachine.Param{bigmachine.Services{"Worker": worker}}, params...) machines, err := b.Start(ctx, n, params...) 
if err != nil { @@ -639,7 +645,6 @@ func startMachines(ctx context.Context, shutdownc chan struct{}, b *bigmachine.B Status: status, maxTaskProcs: maxTaskProcs, } - go sm.Go(ctx, shutdownc) slicemachines[i] = sm }() } diff --git a/exec/slicemachine_test.go b/exec/slicemachine_test.go index 0a6b3a9..11381a6 100644 --- a/exec/slicemachine_test.go +++ b/exec/slicemachine_test.go @@ -237,18 +237,16 @@ func startTestSystem(machinep, maxp int, maxLoad float64) (system *testsystem.Sy system.KeepalivePeriod = time.Second system.KeepaliveTimeout = 5 * time.Second system.KeepaliveRpcTimeout = time.Second - shutdownc := make(chan struct{}) b = bigmachine.Start(system) ctx, ctxcancel := context.WithCancel(context.Background()) m = newMachineManager(b, nil, nil, maxp, maxLoad, &worker{MachineCombiners: false}) var wg sync.WaitGroup wg.Add(1) go func() { - m.Do(ctx, shutdownc) + m.Do(ctx) wg.Done() }() cancel = func() { - close(shutdownc) ctxcancel() wg.Wait() } From 9483e48dde2e2b19cc54536848c3ece5e84daf93 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Mon, 20 Jul 2020 11:48:57 -0700 Subject: [PATCH 13/17] lint --- exec/bigmachine.go | 5 ++--- exec/slicemachine.go | 8 ++++---- go.sum | 3 +++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 5da9fdd..405686d 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -123,7 +123,7 @@ type bigmachineExecutor struct { // manager index. managers []*machineManager - shutdownc chan struct{} + shutdownc chan struct{} managersWG sync.WaitGroup } @@ -155,8 +155,7 @@ func (b *bigmachineExecutor) Start(sess *Session) (shutdown func()) { } b.shutdownc = make(chan struct{}) - - return func(){ + return func() { close(b.shutdownc) b.managersWG.Wait() b.b.Shutdown() diff --git a/exec/slicemachine.go b/exec/slicemachine.go index 126899f..25da6ae 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -551,12 +551,12 @@ func (m *machineManager) Do(ctx context.Context) { have/m.machprocs, have, pending/m.machprocs, pending) go func() { machines := startMachines(ctx, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) 
- for _, machine := range machines { + for _, mach := range machines { m.machinesWG.Add(1) - go func() { - defer m.machinesWG.Done() + go func(machine *sliceMachine) { machine.Go(ctx) - }() + m.machinesWG.Done() + }(mach) } startc <- startResult{ machines: machines, diff --git a/go.sum b/go.sum index 097cd41..8d53bd7 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,9 @@ github.com/grailbio/bigmachine v0.5.7 h1:RaYi4wa4el62yqrw1qTB+KVmmlcS8VhDy59/l+8 github.com/grailbio/bigmachine v0.5.7/go.mod h1:wvOUthoZPxKKJ829ClWaO/uRTxaW3DLm7LyUQHO8ed0= github.com/grailbio/bigmachine v0.5.8 h1:LNOiBTPjk6P8JODqqItcysRzftEPhE59B1wSlKnpGy4= github.com/grailbio/bigmachine v0.5.8/go.mod h1:O9UMGp6FPD6dRJhpIImjnTs8uFG8smEDYSqdIoadS+Y= +github.com/grailbio/testutil v0.0.1/go.mod h1:j7teGaXqRY1n6m7oM8oy954lxL37Myt7nEJZlif3nMA= +github.com/grailbio/testutil v0.0.3 h1:Um0OOTtYVvyxwQbO48K3t6lNmLPY4sL3Vn6Sw0srNy8= +github.com/grailbio/testutil v0.0.3/go.mod h1:f9+y7xMXeXwyNcdV5cmo6GzRiitSOubMmqcqEON7NQQ= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a h1:kAl1x1ErQgs55bcm/WdoKCPny/kIF7COmC+UGQ9GKcM= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a/go.mod h1:2g5HI42KHw+BDBdjLP3zs+WvTHlDK3RoE8crjCl26y4= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= From 3749a4d205d53c1a0cb86753625fd2790b1393d8 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Mon, 20 Jul 2020 12:49:55 -0700 Subject: [PATCH 14/17] remove defer --- exec/bigmachine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exec/bigmachine.go b/exec/bigmachine.go index 405686d..5234618 100644 --- a/exec/bigmachine.go +++ b/exec/bigmachine.go @@ -184,9 +184,9 @@ func (b *bigmachineExecutor) manager(i int) *machineManager { b.managersWG.Add(1) go func() { - defer b.managersWG.Done() b.managers[i].Do(ctx) b.managers[i].machinesWG.Wait() + b.managersWG.Done() }() } return b.managers[i] From 993f92c98c74376b23d13d972b67f4b921c31345 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Mon, 20 Jul 2020 13:05:55 -0700 Subject: [PATCH 15/17] remove go.sum changes --- go.sum | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.sum b/go.sum index 8d53bd7..097cd41 100644 --- a/go.sum +++ b/go.sum @@ -101,9 +101,6 @@ github.com/grailbio/bigmachine v0.5.7 h1:RaYi4wa4el62yqrw1qTB+KVmmlcS8VhDy59/l+8 github.com/grailbio/bigmachine v0.5.7/go.mod h1:wvOUthoZPxKKJ829ClWaO/uRTxaW3DLm7LyUQHO8ed0= github.com/grailbio/bigmachine v0.5.8 h1:LNOiBTPjk6P8JODqqItcysRzftEPhE59B1wSlKnpGy4= github.com/grailbio/bigmachine v0.5.8/go.mod h1:O9UMGp6FPD6dRJhpIImjnTs8uFG8smEDYSqdIoadS+Y= -github.com/grailbio/testutil v0.0.1/go.mod h1:j7teGaXqRY1n6m7oM8oy954lxL37Myt7nEJZlif3nMA= -github.com/grailbio/testutil v0.0.3 h1:Um0OOTtYVvyxwQbO48K3t6lNmLPY4sL3Vn6Sw0srNy8= -github.com/grailbio/testutil v0.0.3/go.mod h1:f9+y7xMXeXwyNcdV5cmo6GzRiitSOubMmqcqEON7NQQ= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a h1:kAl1x1ErQgs55bcm/WdoKCPny/kIF7COmC+UGQ9GKcM= github.com/grailbio/v23/factories/grail v0.0.0-20190904050408-8a555d238e9a/go.mod h1:2g5HI42KHw+BDBdjLP3zs+WvTHlDK3RoE8crjCl26y4= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= From 07f224174fe501d403082d8a478d90c3b8eca2f9 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Mon, 20 Jul 2020 13:33:18 -0700 Subject: [PATCH 16/17] Use more idiomatic go func in loop Co-authored-by: Jaran Charumilind --- exec/slicemachine.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) 
diff --git a/exec/slicemachine.go b/exec/slicemachine.go index 25da6ae..0551061 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -551,12 +551,13 @@ func (m *machineManager) Do(ctx context.Context) { have/m.machprocs, have, pending/m.machprocs, pending) go func() { machines := startMachines(ctx, m.b, m.group, m.machprocs, needMachines, m.worker, m.params...) - for _, mach := range machines { + for _, machine := range machines { + machine := machine m.machinesWG.Add(1) - go func(machine *sliceMachine) { + go func() { machine.Go(ctx) m.machinesWG.Done() - }(mach) + }() } startc <- startResult{ machines: machines, From 6dd809c21d62a9a9dc63e04e31741bae82d68c37 Mon Sep 17 00:00:00 2001 From: Alex Wissmann Date: Mon, 20 Jul 2020 14:13:13 -0700 Subject: [PATCH 17/17] Dont return on ctx.Done() Co-authored-by: Jaran Charumilind --- exec/slicemachine.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exec/slicemachine.go b/exec/slicemachine.go index 0551061..7ca1c1c 100644 --- a/exec/slicemachine.go +++ b/exec/slicemachine.go @@ -188,7 +188,6 @@ loop: select { case <-time.After(statsPollInterval): case <-ctx.Done(): - return case <-stopped: break loop }
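Taken together, patches 12-17 settle on a pattern worth spelling out: Start returns a shutdown func that closes a channel, a helper goroutine cancels a background-derived context when that channel closes, and nested WaitGroups guarantee that every machine and manager goroutine has returned before the cluster itself is shut down. The sketch below illustrates that ordering only; executor, runMachines, and the other names are invented for the example and are not the real bigslice API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// executor mirrors the shape of the final patches: a shutdown channel, a
// WaitGroup covering manager goroutines, and one covering machine goroutines.
type executor struct {
	shutdownc  chan struct{}
	managersWG sync.WaitGroup
	machinesWG sync.WaitGroup
}

// Start returns a shutdown func that closes shutdownc, waits for managers
// (and, transitively, machines) to exit, and only then releases the cluster.
func (e *executor) Start() (shutdown func()) {
	e.shutdownc = make(chan struct{})
	return func() {
		close(e.shutdownc)
		e.managersWG.Wait()
		fmt.Println("cluster shut down") // stands in for b.b.Shutdown()
	}
}

// manager derives a cancelable context and cancels it once shutdownc closes,
// so code that only understands contexts still observes shutdown.
func (e *executor) manager() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-e.shutdownc
		cancel()
	}()
	e.managersWG.Add(1)
	go func() {
		defer e.managersWG.Done()
		e.runMachines(ctx)  // stands in for machineManager.Do
		e.machinesWG.Wait() // all machine goroutines must exit first
	}()
}

// runMachines launches per-machine goroutines under machinesWG, re-capturing
// the loop variable as patch 16 does, and blocks until the context ends.
func (e *executor) runMachines(ctx context.Context) {
	for i := 0; i < 3; i++ {
		i := i
		e.machinesWG.Add(1)
		go func() {
			defer e.machinesWG.Done()
			<-ctx.Done() // stands in for sliceMachine.Go's polling loop
			fmt.Println("machine", i, "stopped")
		}()
	}
	<-ctx.Done()
}

func main() {
	e := &executor{}
	shutdown := e.Start()
	e.manager()
	time.Sleep(50 * time.Millisecond)
	shutdown() // close channel -> cancel ctx -> machines exit -> managers exit -> cluster shutdown
}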