diff --git a/README.MD b/README.MD
index 150c3ab..feb6751 100644
--- a/README.MD
+++ b/README.MD
@@ -291,6 +291,26 @@ func (es *EventStore) Metrics() (published, processed, errors uint64)
 ```
 Returns the total count of published, processed, and errored events.
 
+### `Schedule`
+
+```go
+func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer
+```
+Schedules an Event to be subscribed and published at the specified time `t`.
+
+- [x] If `t` is in the future, the function returns a `*time.Timer` that can be used to cancel the event before it fires by calling `timer.Stop()`.
+- [x] If `t` is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`.
+
+### `ScheduleAfter`
+
+```go
+func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer
+```
+A convenience wrapper around `Schedule` that fires an event after the specified duration `d`.
+
+- [x] If the duration `d` is greater than zero, it returns a cancellable `*time.Timer`.
+- [x] If `d` is zero or negative, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`.
+
 ## Back-pressure and Overrun Policies
 
 GoEventBus provides three strategies for handling a saturated ring buffer:
diff --git a/eventstore.go b/eventstore.go
index 97547c8..ef35567 100644
--- a/eventstore.go
+++ b/eventstore.go
@@ -276,3 +276,35 @@ func (es *EventStore) Metrics() (published, processed, errors uint64) {
 		atomic.LoadUint64(&es.processedCount),
 		atomic.LoadUint64(&es.errorCount)
 }
+
+func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer {
+	delay := time.Until(t)
+	if delay <= 0 {
+		// immediate, synchronous execution
+		e.Ctx = ctx
+		// look up and run the handler directly
+		disp := *es.dispatcher
+		if handler, ok := disp[e.Projection]; ok {
+			es.execute(handler, e)
+		}
+		return nil
+	}
+
+	// schedule for the future via time.AfterFunc
+	return time.AfterFunc(delay, func() {
+		_ = es.Subscribe(ctx, e)
+		es.Publish()
+	})
+}
+
+// ScheduleAfter fires e after the given duration d.
+// If d <= 0 it falls back to Schedule(now).
+func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer {
+	if d <= 0 {
+		return es.Schedule(ctx, time.Now(), e)
+	}
+	return time.AfterFunc(d, func() {
+		_ = es.Subscribe(ctx, e)
+		es.Publish()
+	})
+}
diff --git a/eventstore_test.go b/eventstore_test.go
index 0f7cabc..036d253 100644
--- a/eventstore_test.go
+++ b/eventstore_test.go
@@ -772,3 +772,100 @@ func BenchmarkDrainAsync(b *testing.B) {
 		es.Drain(context.Background())
 	}
 }
+
+func TestScheduleAfter(t *testing.T) {
+	var called uint32
+	dispatcher := Dispatcher{
+		"foo": func(ctx context.Context, args map[string]any) (Result, error) {
+			atomic.StoreUint32(&called, 1)
+			return Result{}, nil
+		},
+	}
+	es := NewEventStore(&dispatcher, 8, DropOldest)
+	// schedule 50ms in future
+	es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"})
+	time.Sleep(100 * time.Millisecond)
+	if atomic.LoadUint32(&called) != 1 {
+		t.Fatal("expected handler to fire once")
+	}
+}
+
+// TestScheduleAfter_FiresOnce verifies ScheduleAfter enqueues and publishes the event exactly once.
+func TestScheduleAfter_FiresOnce(t *testing.T) {
+	var called uint32
+	dispatcher := Dispatcher{
+		"foo": func(ctx context.Context, args map[string]any) (Result, error) {
+			atomic.StoreUint32(&called, 1)
+			return Result{}, nil
+		},
+	}
+	es := NewEventStore(&dispatcher, 8, DropOldest)
+	es.Async = true
+	es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"})
+	time.Sleep(100 * time.Millisecond)
+	if atomic.LoadUint32(&called) != 1 {
+		t.Fatal("expected handler to fire once")
+	}
+}
+
+// TestSchedule_FiresImmediatelyIfPast checks that Schedule executes immediately when given a past time.
+func TestSchedule_FiresImmediatelyIfPast(t *testing.T) {
+	var called uint32
+	dispatcher := Dispatcher{
+		"bar": func(ctx context.Context, args map[string]any) (Result, error) {
+			atomic.StoreUint32(&called, 1)
+			return Result{}, nil
+		},
+	}
+	es := NewEventStore(&dispatcher, 8, DropOldest)
+	es.Async = true
+	past := time.Now().Add(-time.Second)
+	es.Schedule(context.Background(), past, Event{Projection: "bar"})
+	if atomic.LoadUint32(&called) != 1 {
+		t.Fatal("expected handler to fire immediately for past time")
+	}
+}
+
+// TestSchedule_FiresAtFutureTime ensures the handler is not called before the scheduled time and fires shortly after.
+func TestSchedule_FiresAtFutureTime(t *testing.T) {
+	var called uint32
+	dispatcher := Dispatcher{
+		"baz": func(ctx context.Context, args map[string]any) (Result, error) {
+			atomic.StoreUint32(&called, 1)
+			return Result{}, nil
+		},
+	}
+	es := NewEventStore(&dispatcher, 8, DropOldest)
+	es.Async = true
+	start := time.Now().Add(50 * time.Millisecond)
+	es.Schedule(context.Background(), start, Event{Projection: "baz"})
+	// Should not fire immediately
+	if atomic.LoadUint32(&called) != 0 {
+		t.Fatal("handler fired too early")
+	}
+	time.Sleep(100 * time.Millisecond)
+	if atomic.LoadUint32(&called) != 1 {
+		t.Fatal("expected handler to fire at scheduled time")
+	}
+}
+
+// TestSchedule_Cancel verifies that stopping the timer prevents the event from firing.
+func TestSchedule_Cancel(t *testing.T) {
+	var called uint32
+	dispatcher := Dispatcher{
+		"qux": func(ctx context.Context, args map[string]any) (Result, error) {
+			atomic.StoreUint32(&called, 1)
+			return Result{}, nil
+		},
+	}
+	es := NewEventStore(&dispatcher, 8, DropOldest)
+	es.Async = true
+	timer := es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "qux"})
+	if stopped := timer.Stop(); !stopped {
+		t.Fatal("expected timer to stop successfully")
+	}
+	time.Sleep(100 * time.Millisecond)
+	if atomic.LoadUint32(&called) != 0 {
+		t.Fatal("handler fired despite cancellation")
+	}
+}
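
A minimal usage sketch of the scheduling API introduced above (not part of the patch itself). It reuses only identifiers that appear in the diff (`Dispatcher`, `Result`, `Event`, `NewEventStore`, `DropOldest`, `Schedule`, `ScheduleAfter`); the package name `goeventbus`, the file name, the function name `scheduleSketch`, and the `"greet"` projection are placeholders, and the code is assumed to live alongside the library since the module import path is not shown in this patch.

```go
// scheduling_sketch.go (hypothetical file, assumed to sit in the same
// package as EventStore so the identifiers below resolve without an
// import path, which this patch does not reveal).
package goeventbus // assumed package name

import (
	"context"
	"fmt"
	"time"
)

func scheduleSketch() {
	// Register a handler for the "greet" projection, mirroring the test setup.
	dispatcher := Dispatcher{
		"greet": func(ctx context.Context, args map[string]any) (Result, error) {
			fmt.Println("hello from a scheduled event")
			return Result{}, nil
		},
	}
	es := NewEventStore(&dispatcher, 8, DropOldest)

	// Future time: Schedule returns a *time.Timer that can cancel the event.
	timer := es.Schedule(context.Background(), time.Now().Add(2*time.Second), Event{Projection: "greet"})

	// Past time: the handler runs immediately and synchronously, and nil is returned.
	_ = es.Schedule(context.Background(), time.Now().Add(-time.Second), Event{Projection: "greet"})

	// Duration-based variant: enqueues and publishes the event ~50ms from now.
	_ = es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "greet"})

	// Cancel the first, still-pending event before it fires.
	if timer != nil && timer.Stop() {
		fmt.Println("pending 2s event cancelled")
	}

	// Give the 50ms event time to be subscribed and published.
	time.Sleep(100 * time.Millisecond)
}
```

Because the past-time branch of `Schedule` runs the handler directly instead of enqueuing it, the second `Schedule` call above completes before it returns; only the `ScheduleAfter` event travels through the ring buffer via `Subscribe` and `Publish`.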