Skip to content

Commit d7359b0

Browse files
authored
fix: ensure node and services are watching for the same shutdown signal from the context cancel (#1406)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> The nodes were creating a new context with cancel after passing the original context through to the services. This meant that when the node shut down and cancelled its context, its services did not receive a shutdown signal until the original caller cancelled its context. This was leading to panics in testing about writing to a logger test file that was already closed. This change was verified by reverting the changes in #1402 and testing in a loop 100 times with no panics. **EDIT 1** Since no good deed goes unpunished 🙃 some timeouts surfaced. This was due to `context.Background()` being used for subscribe and unsubscribe events. Under the hood those events are watching for `ctx.Done()` events, so when `context.Background()` is passed in, they can hang indefinitely if the other signals aren't triggered. The test that was most prone to this timeout was run in a loop 1000 times to verify the issue was fixed. Additionally, added some more checks for `ctx.Done()` for faster shutdowns. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced node shutdown process with context cancellation for improved service termination. - **Refactor** - Updated error handling and context management in node creation functions. - Improved context usage in block synchronization and event subscription methods. - **Tests** - Adjusted full node integration tests to reflect new context management. - **Chores** - Removed redundant node cancellation in test cleanup function. - **Documentation** - No visible changes to end-users. 
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 2f6ed32 commit d7359b0

8 files changed

Lines changed: 94 additions & 25 deletions

File tree

block/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,11 @@ func (m *Manager) sendNonBlockingSignalToRetrieveCh() {
386386
// If commit for block h+1 is available, we proceed with sync process, and remove synced block from sync cache.
387387
func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
388388
for {
389+
select {
390+
case <-ctx.Done():
391+
return ctx.Err()
392+
default:
393+
}
389394
currentHeight := m.store.Height()
390395
b, ok := m.blockCache.getBlock(currentHeight + 1)
391396
if !ok {
@@ -516,13 +521,24 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
516521
}
517522

518523
func (m *Manager) processNextDABlock(ctx context.Context) error {
524+
select {
525+
case <-ctx.Done():
526+
return ctx.Err()
527+
default:
528+
}
529+
519530
// TODO(tzdybal): extract configuration option
520531
maxRetries := 10
521532
daHeight := atomic.LoadUint64(&m.daHeight)
522533

523534
var err error
524535
m.logger.Debug("trying to retrieve block from DA", "daHeight", daHeight)
525536
for r := 0; r < maxRetries; r++ {
537+
select {
538+
case <-ctx.Done():
539+
return ctx.Err()
540+
default:
541+
}
526542
blockResp, fetchErr := m.fetchBlock(ctx, daHeight)
527543
if fetchErr == nil {
528544
if blockResp.Code == da.StatusNotFound {
@@ -616,6 +632,12 @@ func (m *Manager) IsProposer() (bool, error) {
616632
}
617633

618634
func (m *Manager) publishBlock(ctx context.Context) error {
635+
select {
636+
case <-ctx.Done():
637+
return ctx.Err()
638+
default:
639+
}
640+
619641
var lastCommit *types.Commit
620642
var lastHeaderHash types.Hash
621643
var err error

node/full.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,17 @@ func newFullNode(
9595
clientCreator proxy.ClientCreator,
9696
genesis *cmtypes.GenesisDoc,
9797
logger log.Logger,
98-
) (*FullNode, error) {
98+
) (fn *FullNode, err error) {
99+
// Create context with cancel so that all services using the context can
100+
// catch the cancel signal when the node shutdowns
101+
ctx, cancel := context.WithCancel(ctx)
102+
defer func() {
103+
// If there is an error, cancel the context
104+
if err != nil {
105+
cancel()
106+
}
107+
}()
108+
99109
proxyApp, err := initProxyApp(clientCreator, logger)
100110
if err != nil {
101111
return nil, err
@@ -147,8 +157,6 @@ func newFullNode(
147157
return nil, err
148158
}
149159

150-
ctx, cancel := context.WithCancel(ctx)
151-
152160
node := &FullNode{
153161
proxyApp: proxyApp,
154162
eventBus: eventBus,

node/full_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (c *FullClient) BroadcastTxCommit(ctx context.Context, tx cmtypes.Tx) (*cty
112112
return nil, err
113113
}
114114
defer func() {
115-
if err := c.EventBus.Unsubscribe(context.Background(), subscriber, q); err != nil {
115+
if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil {
116116
c.Logger.Error("Error unsubscribing from eventBus", "err", err)
117117
}
118118
}()
@@ -885,7 +885,7 @@ func (c *FullClient) resubscribe(subscriber string, q cmpubsub.Query) cmtypes.Su
885885
return nil
886886
}
887887

888-
sub, err := c.EventBus.Subscribe(context.Background(), subscriber, q)
888+
sub, err := c.EventBus.Subscribe(c.node.ctx, subscriber, q)
889889
if err == nil {
890890
return sub
891891
}

node/full_node_integration_test.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal
4343
func TestCentralizedSequencer(t *testing.T) {
4444
require := require.New(t)
4545
assert := assert.New(t)
46-
ctx, cancel := context.WithCancel(context.Background())
47-
defer cancel()
4846
genDoc, privkey := types.GetGenesisWithPrivkey()
4947
genDoc.AppHash = make([]byte, 32)
5048
nodeKey := &p2p.NodeKey{
@@ -70,7 +68,9 @@ func TestCentralizedSequencer(t *testing.T) {
7068
DAStartHeight: 1,
7169
DABlockTime: 1 * time.Second,
7270
}
73-
node, err := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr, Aggregator: false, BlockManagerConfig: blockManagerConfig}, signingKey, signingKey, proxy.NewLocalClientCreator(app), genDoc, log.TestingLogger())
71+
ctx, cancel := context.WithCancel(context.Background())
72+
defer cancel()
73+
node, err := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr, Aggregator: false, BlockManagerConfig: blockManagerConfig}, signingKey, signingKey, proxy.NewLocalClientCreator(app), genDoc, test.NewFileLogger(t))
7474
require.NoError(err)
7575
node.dalc = dalc
7676
node.blockManager.SetDALC(dalc)
@@ -100,8 +100,7 @@ func TestCentralizedSequencer(t *testing.T) {
100100
submitResp := dalc.SubmitBlocks(ctx, []*types.Block{validBlock, junkProposerBlock, sigInvalidBlock})
101101
fmt.Println(submitResp)
102102
require.Equal(submitResp.Code, da.StatusSuccess)
103-
104-
require.NoError(testutils.Retry(3000, 100*time.Millisecond, func() error {
103+
require.NoError(testutils.Retry(300, 100*time.Millisecond, func() error {
105104
block, err := node.Store.GetBlock(1)
106105
if err != nil {
107106
return err
@@ -482,16 +481,20 @@ func TestSubmitBlocksToDA(t *testing.T) {
482481
assert.NoError(seq.Stop())
483482
}()
484483

485-
timer := time.NewTimer(5 * seq.nodeConfig.DABlockTime)
486-
<-timer.C
487-
488484
numberOfBlocksToSyncTill := seq.Store.Height()
489485

490486
//Make sure all produced blocks made it to DA
491487
for i := uint64(1); i <= numberOfBlocksToSyncTill; i++ {
492-
block, err := seq.Store.GetBlock(i)
493-
require.NoError(err)
494-
require.True(seq.blockManager.IsDAIncluded(block.Hash()), block.Height())
488+
require.NoError(testutils.Retry(300, 100*time.Millisecond, func() error {
489+
block, err := seq.Store.GetBlock(i)
490+
if err != nil {
491+
return err
492+
}
493+
if !seq.blockManager.IsDAIncluded(block.Hash()) {
494+
return fmt.Errorf("block %d not DA included", block.Height())
495+
}
496+
return nil
497+
}))
495498
}
496499
}
497500

node/light.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,17 @@ func newLightNode(
4949
clientCreator proxy.ClientCreator,
5050
genesis *cmtypes.GenesisDoc,
5151
logger log.Logger,
52-
) (*LightNode, error) {
52+
) (ln *LightNode, err error) {
53+
// Create context with cancel so that all services using the context can
54+
// catch the cancel signal when the node shutdowns
55+
ctx, cancel := context.WithCancel(ctx)
56+
defer func() {
57+
// If there is an error, cancel the context
58+
if err != nil {
59+
cancel()
60+
}
61+
}()
62+
5363
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
5464
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
5565
proxyApp.SetLogger(logger.With("module", "proxy"))
@@ -71,8 +81,6 @@ func newLightNode(
7181
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
7282
}
7383

74-
ctx, cancel := context.WithCancel(ctx)
75-
7684
node := &LightNode{
7785
P2P: client,
7886
proxyApp: proxyApp,

node/node_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@ var MockServerAddr = ":7980"
1919

2020
// cleanUpNode stops the node and checks if it is running
2121
func cleanUpNode(node Node, t *testing.T) {
22-
defer func() {
23-
node.Cancel()
24-
}()
2522
assert.NoError(t, node.Stop())
2623
assert.False(t, node.IsRunning())
2724
}

state/txindex/indexer_service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ func (is *IndexerService) OnStart() error {
4747
// canceled due to not pulling messages fast enough. Cause this might
4848
// sometimes happen when there are no other subscribers.
4949
blockSub, err := is.eventBus.SubscribeUnbuffered(
50-
context.Background(),
50+
is.ctx,
5151
subscriber,
5252
types.EventQueryNewBlockEvents)
5353
if err != nil {
5454
return err
5555
}
5656

57-
txsSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryTx)
57+
txsSub, err := is.eventBus.SubscribeUnbuffered(is.ctx, subscriber, types.EventQueryTx)
5858
if err != nil {
5959
return err
6060
}
@@ -130,6 +130,6 @@ func (is *IndexerService) OnStart() error {
130130
// OnStop implements service.Service by unsubscribing from all transactions.
131131
func (is *IndexerService) OnStop() {
132132
if is.eventBus.IsRunning() {
133-
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
133+
_ = is.eventBus.UnsubscribeAll(is.ctx, subscriber)
134134
}
135135
}

test/context/context.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package context
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"runtime"
7+
"strings"
8+
)
9+
10+
// WithDebugCancelFunc returns a custom cancel function with debugging information.
11+
func WithDebugCancelFunc(ctx context.Context) (context.Context, context.CancelFunc) {
12+
childCtx, cancelFunc := context.WithCancel(ctx)
13+
debugCancelFunc := func() {
14+
var pcs [5]uintptr // Capture the last 5 frames
15+
n := runtime.Callers(1, pcs[:])
16+
frames := runtime.CallersFrames(pcs[:n])
17+
18+
var trace []string
19+
for {
20+
frame, more := frames.Next()
21+
trace = append(trace, fmt.Sprintf("%s:%d %s", frame.File, frame.Line, frame.Function))
22+
if !more {
23+
break
24+
}
25+
}
26+
27+
fmt.Printf("Context canceled by the following last 5 levels of the stack trace:\n%s\n", strings.Join(trace, "\n"))
28+
cancelFunc() // Call the standard context cancel function
29+
}
30+
return childCtx, debugCancelFunc
31+
}

0 commit comments

Comments
 (0)