Skip to content

feat: Watermill integration, eventbus GoChannel migration, TLS fix#62

Open
markusnissl wants to merge 1 commit intomainfrom
feat/watermill-integration
Open

feat: Watermill integration, eventbus GoChannel migration, TLS fix#62
markusnissl wants to merge 1 commit intomainfrom
feat/watermill-integration

Conversation

@markusnissl
Copy link
Copy Markdown
Contributor

Replaces #58 (was on wrong branch).

Summary

  • New infra/messaging/wmconvert package — Watermill adapter layer enabling any Watermill provider (Kafka, NATS, Google Pub/Sub, SQL) behind rho-kit's messaging interfaces
  • Migrated runtime/eventbus to GoChannel — async handlers via Watermill, sync handlers direct dispatch. All features preserved: sync/async split, bounded pool, metrics, context propagation
  • Upgraded AMQP publisher to watermill-amqp — channel pooling, confirm mode. Custom Marshaler handles x-death headers
  • Fixed TLS shared pointer mutation.Clone() before passing to amqp.DialTLS and http.Transport
  • Fixed release scriptgo mod tidy after dependency version bumps

Test plan

  • 56 NX projects pass build, test, lint
  • EventBus: sync errors, async dispatch, context propagation, bounded pool, panic recovery
  • wmconvert: round-trip conversion, adapters, backend wiring
  • AMQP: marshaler handles non-string headers, PublishRaw works
  • TLS clone verified

Introduce `infra/messaging/wmconvert` — a Watermill adapter package that
enables any Watermill-supported broker (Kafka, NATS, Google Pub/Sub, SQL)
to be used behind rho-kit's messaging interfaces. Includes bidirectional
message conversion, PublisherAdapter, ConsumerAdapter, ConnectorAdapter,
and a convenience Backend type.

Migrate `runtime/eventbus` from custom handler map + worker pool to
Watermill GoChannel backend while preserving all original semantics:
sync/async handler distinction, sync error collection via errors.Join,
bounded worker pool with drop semantics, Prometheus metrics, panic
recovery, and context propagation (via GoChannel PreserveContext).

Upgrade AMQP publisher to use watermill-amqp with channel pooling and
confirm mode. Add custom Marshaler that handles non-string AMQP headers
(x-death tables) by serializing to JSON metadata. Keep PublishRaw for
dead-letter byte forwarding via direct amqp091-go.

Fix TLS *tls.Config shared pointer mutation — clone before passing to
amqp.DialTLS and http.Transport to prevent Go's TLS handshake from
writing ServerName into a shared config.

Fix release script: run `go mod tidy` after dependency version bumps in
updateProjectDependencies to recompute go.sum checksums.

Breaking changes:
- eventbus async handlers now use GoChannel (JSON serialization overhead)
- AMQP publisher uses watermill-amqp internally (separate connection)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant