Replace Submitter + ErrorHandler with Dispatcher #20

Merged
mikluko merged 20 commits into main from mikluko/refactor-package-structure on Feb 9, 2026

@mikluko (Owner) commented Feb 9, 2026

Summary

  • Replace Submitter + ErrorHandler interfaces with a single Dispatcher interface (Dispatch(func() error) + Wait(context.Context) error)
  • Default implementation: goroutine per task, sync.WaitGroup tracking, mutex-guarded error collection via errors.Join
  • Default error handling no longer panics — errors are collected and returned from Wait()
  • contrib/pond updated with pool-backed Dispatcher / DispatcherPool
  • All call sites updated: transport, subscriber, consumer, bucket, raft
  • Line delta: -168 / +263 overall; production code is 97 lines shorter net, with the remaining additions in tests and docs

Test plan

  • go build ./... passes
  • go test ./... passes (all existing tests green)
  • New dispatcher_test.go covers: no errors, multiple error collection, empty wait, context timeout
  • Verify CI passes

- Extract codec/ package: Codec interface, ContentType enum, all codec
  implementations, marshal/unmarshal helpers
- Extract transport/ package: Conn interface wrapping *nats.Conn, all
  subscribe option types, connection adapter implementation
- Fix queue subscription bug: inverted conditions in Subscribe() and
  SubscribeHandler() called the queue methods when queue == ""
- Remove Publisher, Requester, Subscriber, Connection, Drainer, Closer
  interfaces from root package
- Rename symbols: CodecContentType → codec.ForContentType,
  WrapConnection → transport.Wrap, ContentTypeJson → codec.JSON, etc.
- Update all tests, examples, mocks, and documentation
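The queue-subscription fix comes down to one branch. The transport code itself isn't shown here, so this sketch uses a hypothetical `subscriber` interface shaped loosely like nats.go's (*nats.Conn).Subscribe(subj, cb) and QueueSubscribe(subj, queue, cb). Handler and return types are simplified, and `subscribe` and `recorder` are illustrative names only.

```go
package main

// subscriber is a hypothetical, simplified stand-in for the two nats.go
// subscription methods (the real ones return (*nats.Subscription, error)).
type subscriber interface {
	Subscribe(subject string, handler func([]byte)) error
	QueueSubscribe(subject, queue string, handler func([]byte)) error
}

// subscribe uses a plain subscription when no queue group is given and a
// queue subscription otherwise. The bug had this condition inverted.
func subscribe(c subscriber, subject, queue string, handler func([]byte)) error {
	if queue == "" {
		return c.Subscribe(subject, handler)
	}
	return c.QueueSubscribe(subject, queue, handler)
}

// recorder is a fake subscriber that records which method was called.
type recorder struct{ calls []string }

func (r *recorder) Subscribe(subject string, _ func([]byte)) error {
	r.calls = append(r.calls, "Subscribe "+subject)
	return nil
}

func (r *recorder) QueueSubscribe(subject, queue string, _ func([]byte)) error {
	r.calls = append(r.calls, "QueueSubscribe "+subject+" "+queue)
	return nil
}
```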
@mikluko (Owner, Author) commented Feb 9, 2026

Self-Review

Three independent review passes (code review, silent-failure hunting, type design analysis) converged on the same core issues.


CRITICAL: No call site ever calls Wait() — errors silently vanish

Every internal consumer of Dispatcher (transport, subscriber, consumer, bucket, raft) calls Dispatch() but none call Wait(). The old DefaultErrorHandler panicked immediately — loud and disruptive but impossible to miss. The new design collects errors in a []error that nobody reads.

The UPGRADING.md says "Call Wait() during shutdown," but:

  • No example demonstrates it
  • No API returns or exposes the dispatcher for the caller to drain
  • raft.Raft interface doesn't expose Wait() at all
  • consumer.Consume returns before dispatched tasks complete

This effectively makes the entire error path dead code.


CRITICAL: Wait() discards collected errors on context cancellation

case <-ctx.Done():
    return ctx.Err()  // errors already collected in d.errs are not returned

During graceful shutdown (the most common Wait() scenario), if the context expires before all tasks finish, only context.DeadlineExceeded is returned. Any already-collected errors are lost. Fix:

case <-ctx.Done():
    d.mu.Lock()
    err := errors.Join(append(d.errs, ctx.Err())...)
    d.errs = nil
    d.mu.Unlock()
    return err

The same bug exists in both dispatcher.go and contrib/pond/dispatcher.go.


CRITICAL: Pond dispatcher WaitGroup leak when pool is stopped

func (d *dispatcherImpl) Dispatch(f func() error) {
    d.wg.Add(1)
    d.pool.Submit(func() {   // Submit silently drops task if pool is stopped
        defer d.wg.Done()    // Never runs → wg never decrements
        ...
    })
}

When pond's pool is stopped, Submit returns a pre-resolved future with ErrPoolStopped and never executes the task function. The wg.Done() inside the closure never fires, so Wait() hangs forever. The old code used pool.Go(), which returns an error in that case. Fix: use pool.Go() and handle the error, calling wg.Done() on failure.


HIGH: Goroutine leak in Wait() on context cancellation

go func() {
    d.wg.Wait()   // blocks forever if tasks are stuck
    close(done)
}()

Each Wait() call spawns a goroutine. If the context cancels before tasks finish, the goroutine leaks until all dispatched tasks eventually complete. Repeated Wait() calls with timeouts accumulate goroutines.


HIGH: DefaultDispatcher is a shared singleton accumulating errors unboundedly

All subsystems default to the same DefaultDispatcher. Errors from transport, subscriber, consumer, bucket, and raft all accumulate in one []error. In a long-running service, this is a memory leak proportional to handler error rate. Calling Wait() on it drains errors from all subsystems indiscriminately.


MEDIUM: Dispatch() after Wait() semantics undefined

No state tracking prevents Dispatch() after Wait() returns. NATS may deliver messages after shutdown begins, launching goroutines whose errors are never collected. Consider a closed flag or documenting the contract.


MEDIUM: No nil-function guard in Dispatch

Dispatch(nil) panics inside the spawned goroutine, where the caller cannot recover it, so it crashes the process. Low-cost fix: check f == nil at the top of Dispatch.


MEDIUM: No tests for pond dispatcher

The pond dispatcher has materially different behavior (pool submission vs goroutine spawn) and the pool-stopped bug, but zero test coverage.
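Here is a sketch of the missing coverage. It uses a hypothetical in-memory `fakePool` in place of pond, so both the running path and the stopped path can be exercised deterministically. The fake's stopped behavior follows the review's description of pool.Go(). `poolDispatcher` uses the error handling suggested above, and `checkPoolDispatcher` is an illustrative name.

```go
package main

import (
	"context"
	"errors"
	"sync"
	"time"
)

var errPoolStopped = errors.New("fake pool: stopped")

// fakePool runs each task on its own goroutine until Stop is called; after
// that, Go rejects tasks with errPoolStopped and never runs them.
type fakePool struct {
	mu      sync.Mutex
	stopped bool
}

func (p *fakePool) Go(task func()) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stopped {
		return errPoolStopped
	}
	go task()
	return nil
}

func (p *fakePool) Stop() {
	p.mu.Lock()
	p.stopped = true
	p.mu.Unlock()
}

type poolDispatcher struct {
	pool *fakePool
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error
}

func (d *poolDispatcher) record(err error) {
	d.mu.Lock()
	d.errs = append(d.errs, err)
	d.mu.Unlock()
}

func (d *poolDispatcher) Dispatch(f func() error) {
	d.wg.Add(1)
	if err := d.pool.Go(func() {
		defer d.wg.Done()
		if taskErr := f(); taskErr != nil {
			d.record(taskErr)
		}
	}); err != nil {
		d.wg.Done() // the task will never run
		d.record(err)
	}
}

func (d *poolDispatcher) Wait(ctx context.Context) error {
	done := make(chan struct{})
	go func() { d.wg.Wait(); close(done) }()
	var ctxErr error
	select {
	case <-done:
	case <-ctx.Done():
		ctxErr = ctx.Err()
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	err := errors.Join(append(d.errs, ctxErr)...)
	d.errs = nil
	return err
}

// checkPoolDispatcher covers a running pool and a stopped pool, returning
// a description of the first failure, or nil.
func checkPoolDispatcher() error {
	pool := &fakePool{}
	d := &poolDispatcher{pool: pool}
	boom := errors.New("boom")
	d.Dispatch(func() error { return boom })
	d.Dispatch(func() error { return nil })
	if err := d.Wait(context.Background()); !errors.Is(err, boom) {
		return errors.New("running pool: task error not collected")
	}

	pool.Stop()
	d.Dispatch(func() error { return nil })
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := d.Wait(ctx)
	if errors.Is(err, context.DeadlineExceeded) {
		return errors.New("stopped pool: Wait hung on a rejected task")
	}
	if !errors.Is(err, errPoolStopped) {
		return errors.New("stopped pool: rejection not surfaced")
	}
	return nil
}
```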


Summary

| Severity | Count | Key issues |
|---|---|---|
| CRITICAL | 3 | No call site drains errors; Wait discards errors on ctx cancel; Pond WaitGroup leak |
| HIGH | 2 | Goroutine leak in Wait; Shared singleton memory leak |
| MEDIUM | 3 | Post-Wait dispatch; nil guard; Missing pond tests |

The core design (Dispatch + Wait replacing Submitter + ErrorHandler) is sound. The issues are all in lifecycle management and error surfacing — the framework dispatches errors into a collector that nothing ever reads.

@mikluko merged commit 1e4f20a into main on Feb 9, 2026