From 308c0e9fd32eed795dada1d8786df095165df193 Mon Sep 17 00:00:00 2001 From: Raezil Date: Sun, 13 Jul 2025 20:33:42 +0200 Subject: [PATCH 1/5] feat: event scheduler --- eventstore.go | 26 ++++++++++++++++++++++++++ eventstore_test.go | 17 +++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/eventstore.go b/eventstore.go index 97547c8..586609c 100644 --- a/eventstore.go +++ b/eventstore.go @@ -276,3 +276,29 @@ func (es *EventStore) Metrics() (published, processed, errors uint64) { atomic.LoadUint64(&es.processedCount), atomic.LoadUint64(&es.errorCount) } + +// Schedule fires e at the absolute time t. +// If t is in the past or now, it enqueues & publishes immediately. +func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer { + delay := time.Until(t) + if delay <= 0 { + _ = es.Subscribe(ctx, e) + es.Publish() + return nil + } + return time.AfterFunc(delay, func() { + _ = es.Subscribe(ctx, e) + es.Publish() + }) +} + +// ScheduleAfter fires e after the given duration d. +func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer { + if d <= 0 { + return es.Schedule(ctx, time.Now(), e) + } + return time.AfterFunc(d, func() { + _ = es.Subscribe(ctx, e) + es.Publish() + }) +} diff --git a/eventstore_test.go b/eventstore_test.go index 0f7cabc..182a0f3 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -772,3 +772,20 @@ func BenchmarkDrainAsync(b *testing.B) { es.Drain(context.Background()) } } + +func TestScheduleAfter(t *testing.T) { + var called uint32 + dispatcher := Dispatcher{ + "foo": func(ctx context.Context, args map[string]any) (Result, error) { + atomic.StoreUint32(&called, 1) + return Result{}, nil + }, + } + es := NewEventStore(&dispatcher, 8, DropOldest) + // schedule 50ms in future + es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"}) + time.Sleep(100 * time.Millisecond) + if atomic.LoadUint32(&called) != 1 { + t.Fatal("expected handler to fire once") + } +} From d950a1bab5d9169c5a0138067b47fbf667b74781 Mon Sep 17 00:00:00 2001 From: Raezil Date: Sun, 13 Jul 2025 20:37:03 +0200 Subject: [PATCH 2/5] fix scheduler --- eventstore.go | 14 +++++--- eventstore_test.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/eventstore.go b/eventstore.go index 586609c..ef35567 100644 --- a/eventstore.go +++ b/eventstore.go @@ -277,15 +277,20 @@ func (es *EventStore) Metrics() (published, processed, errors uint64) { atomic.LoadUint64(&es.errorCount) } -// Schedule fires e at the absolute time t. -// If t is in the past or now, it enqueues & publishes immediately. func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer { delay := time.Until(t) if delay <= 0 { - _ = es.Subscribe(ctx, e) - es.Publish() + // immediate, synchronous execution + e.Ctx = ctx + // look up and run the handler directly + disp := *es.dispatcher + if handler, ok := disp[e.Projection]; ok { + es.execute(handler, e) + } return nil } + + // schedule for the future via time.AfterFunc return time.AfterFunc(delay, func() { _ = es.Subscribe(ctx, e) es.Publish() @@ -293,6 +298,7 @@ func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time. } // ScheduleAfter fires e after the given duration d. +// If d<=0 it falls back to Schedule(now). func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer { if d <= 0 { return es.Schedule(ctx, time.Now(), e) diff --git a/eventstore_test.go b/eventstore_test.go index 182a0f3..036d253 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -789,3 +789,83 @@ func TestScheduleAfter(t *testing.T) { t.Fatal("expected handler to fire once") } } + +// TestScheduleAfter_FiresOnce verifies ScheduleAfter enqueues and publishes the event exactly once. +func TestScheduleAfter_FiresOnce(t *testing.T) { + var called uint32 + dispatcher := Dispatcher{ + "foo": func(ctx context.Context, args map[string]any) (Result, error) { + atomic.StoreUint32(&called, 1) + return Result{}, nil + }, + } + es := NewEventStore(&dispatcher, 8, DropOldest) + es.Async = true + es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"}) + time.Sleep(100 * time.Millisecond) + if atomic.LoadUint32(&called) != 1 { + t.Fatal("expected handler to fire once") + } +} + +// TestSchedule_FiresImmediatelyIfPast checks that Schedule executes immediately when given a past time. +func TestSchedule_FiresImmediatelyIfPast(t *testing.T) { + var called uint32 + dispatcher := Dispatcher{ + "bar": func(ctx context.Context, args map[string]any) (Result, error) { + atomic.StoreUint32(&called, 1) + return Result{}, nil + }, + } + es := NewEventStore(&dispatcher, 8, DropOldest) + es.Async = true + past := time.Now().Add(-time.Second) + es.Schedule(context.Background(), past, Event{Projection: "bar"}) + if atomic.LoadUint32(&called) != 1 { + t.Fatal("expected handler to fire immediately for past time") + } +} + +// TestSchedule_FiresAtFutureTime ensures the handler is not called before the scheduled time and fires shortly after. +func TestSchedule_FiresAtFutureTime(t *testing.T) { + var called uint32 + dispatcher := Dispatcher{ + "baz": func(ctx context.Context, args map[string]any) (Result, error) { + atomic.StoreUint32(&called, 1) + return Result{}, nil + }, + } + es := NewEventStore(&dispatcher, 8, DropOldest) + es.Async = true + start := time.Now().Add(50 * time.Millisecond) + es.Schedule(context.Background(), start, Event{Projection: "baz"}) + // Should not fire immediately + if atomic.LoadUint32(&called) != 0 { + t.Fatal("handler fired too early") + } + time.Sleep(100 * time.Millisecond) + if atomic.LoadUint32(&called) != 1 { + t.Fatal("expected handler to fire at scheduled time") + } +} + +// TestSchedule_Cancel verifies that stopping the timer prevents the event from firing. +func TestSchedule_Cancel(t *testing.T) { + var called uint32 + dispatcher := Dispatcher{ + "qux": func(ctx context.Context, args map[string]any) (Result, error) { + atomic.StoreUint32(&called, 1) + return Result{}, nil + }, + } + es := NewEventStore(&dispatcher, 8, DropOldest) + es.Async = true + timer := es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "qux"}) + if stopped := timer.Stop(); !stopped { + t.Fatal("expected timer to stop successfully") + } + time.Sleep(100 * time.Millisecond) + if atomic.LoadUint32(&called) != 0 { + t.Fatal("handler fired despite cancellation") + } +} From 8ca0d357d2341ff3e2a12047aa1c8f667eea394a Mon Sep 17 00:00:00 2001 From: Raezil Date: Sun, 13 Jul 2025 20:44:14 +0200 Subject: [PATCH 3/5] feat: update README --- README.MD | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.MD b/README.MD index 150c3ab..4162773 100644 --- a/README.MD +++ b/README.MD @@ -291,6 +291,26 @@ func (es *EventStore) Metrics() (published, processed, errors uint64) ``` Returns the total count of published, processed, and errored events. +### `Schedule` + +```go +func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer +``` +Schedules an Event to be subscribed and published at a specific time t. + + - [x] If t is in the future, the function returns a *time.Timer that can be used to cancel the event before it fires by using timer.Stop(). + + - [x] If t is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns nil. + +### `ScheduleAfter` +```go +func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer +``` +A convenience wrapper around Schedule that fires an event after a specified duration d. + + - [x] If the duration d is greater than zero, it returns a cancellable *time.Timer. + + - [x] If d is zero or negative, the event is executed immediately and synchronously. ## Back-pressure and Overrun Policies GoEventBus provides three strategies for handling a saturated ring buffer: From d7b0c955a50010c91a86e49f7f09f197c1a08028 Mon Sep 17 00:00:00 2001 From: Kamil Mosciszko Date: Sun, 13 Jul 2025 20:45:23 +0200 Subject: [PATCH 4/5] Update README.MD Signed-off-by: Kamil Mosciszko --- README.MD | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.MD b/README.MD index 4162773..0d4ee84 100644 --- a/README.MD +++ b/README.MD @@ -296,6 +296,7 @@ Returns the total count of published, processed, and errored events. ```go func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer ``` + Schedules an Event to be subscribed and published at a specific time t. - [x] If t is in the future, the function returns a *time.Timer that can be used to cancel the event before it fires by using timer.Stop(). @@ -306,6 +307,7 @@ Schedules an Event to be subscribed and published at a specific time t. ```go func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer ``` + A convenience wrapper around Schedule that fires an event after a specified duration d. - [x] If the duration d is greater than zero, it returns a cancellable *time.Timer. From c5c7779ac5140141644cf650e2a3dc1e05fd4f2a Mon Sep 17 00:00:00 2001 From: Raezil Date: Sun, 13 Jul 2025 20:50:08 +0200 Subject: [PATCH 5/5] feat: update README --- README.MD | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/README.MD b/README.MD index 0d4ee84..feb6751 100644 --- a/README.MD +++ b/README.MD @@ -296,23 +296,21 @@ Returns the total count of published, processed, and errored events. ```go func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer ``` +Schedules an Event to be subscribed and published at a specific time `t`. -Schedules an Event to be subscribed and published at a specific time t. - - - [x] If t is in the future, the function returns a *time.Timer that can be used to cancel the event before it fires by using timer.Stop(). - - - [x] If t is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns nil. +- [x] If `t` is in the future, the function returns a `*time.Timer` that can be used to cancel the event before it fires by using `timer.Stop()`. +- [x] If `t` is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`. ### `ScheduleAfter` + ```go func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer ``` +A convenience wrapper around `Schedule` that fires an event after a specified duration `d`. -A convenience wrapper around Schedule that fires an event after a specified duration d. - - - [x] If the duration d is greater than zero, it returns a cancellable *time.Timer. +- [x] If the duration `d` is greater than zero, it returns a cancellable `*time.Timer`. +- [x] If `d` is zero or negative, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`. - - [x] If d is zero or negative, the event is executed immediately and synchronously. ## Back-pressure and Overrun Policies GoEventBus provides three strategies for handling a saturated ring buffer: