Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,26 @@ func (es *EventStore) Metrics() (published, processed, errors uint64)
```
Returns the total count of published, processed, and errored events.

### `Schedule`

```go
func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer
```
Schedules an Event to be subscribed and published at a specific time `t`.

- [x] If `t` is in the future, the function returns a `*time.Timer` that can be used to cancel the event before it fires by using `timer.Stop()`.
- [x] If `t` is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`.

### `ScheduleAfter`

```go
func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer
```
A convenience wrapper around `Schedule` that fires an event after a specified duration `d`.

- [x] If the duration `d` is greater than zero, it returns a cancellable `*time.Timer`.
- [x] If `d` is zero or negative, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`.

## Back-pressure and Overrun Policies

GoEventBus provides three strategies for handling a saturated ring buffer:
Expand Down
32 changes: 32 additions & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,35 @@ func (es *EventStore) Metrics() (published, processed, errors uint64) {
atomic.LoadUint64(&es.processedCount),
atomic.LoadUint64(&es.errorCount)
}

func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer {
delay := time.Until(t)
if delay <= 0 {
// immediate, synchronous execution
e.Ctx = ctx
// look up and run the handler directly
disp := *es.dispatcher
if handler, ok := disp[e.Projection]; ok {
es.execute(handler, e)
}
return nil
}

// schedule for the future via time.AfterFunc
return time.AfterFunc(delay, func() {
_ = es.Subscribe(ctx, e)
es.Publish()
})
}

// ScheduleAfter fires e after the given duration d.
// If d<=0 it falls back to Schedule(now).
func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer {
if d <= 0 {
return es.Schedule(ctx, time.Now(), e)
}
return time.AfterFunc(d, func() {
_ = es.Subscribe(ctx, e)
es.Publish()
})
}
97 changes: 97 additions & 0 deletions eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,100 @@ func BenchmarkDrainAsync(b *testing.B) {
es.Drain(context.Background())
}
}

func TestScheduleAfter(t *testing.T) {
var called uint32
dispatcher := Dispatcher{
"foo": func(ctx context.Context, args map[string]any) (Result, error) {
atomic.StoreUint32(&called, 1)
return Result{}, nil
},
}
es := NewEventStore(&dispatcher, 8, DropOldest)
// schedule 50ms in future
es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"})
time.Sleep(100 * time.Millisecond)
if atomic.LoadUint32(&called) != 1 {
t.Fatal("expected handler to fire once")
}
}

// TestScheduleAfter_FiresOnce verifies ScheduleAfter enqueues and publishes the event exactly once.
func TestScheduleAfter_FiresOnce(t *testing.T) {
var called uint32
dispatcher := Dispatcher{
"foo": func(ctx context.Context, args map[string]any) (Result, error) {
atomic.StoreUint32(&called, 1)
return Result{}, nil
},
}
es := NewEventStore(&dispatcher, 8, DropOldest)
es.Async = true
es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "foo"})
time.Sleep(100 * time.Millisecond)
if atomic.LoadUint32(&called) != 1 {
t.Fatal("expected handler to fire once")
}
}

// TestSchedule_FiresImmediatelyIfPast checks that Schedule executes immediately when given a past time.
func TestSchedule_FiresImmediatelyIfPast(t *testing.T) {
var called uint32
dispatcher := Dispatcher{
"bar": func(ctx context.Context, args map[string]any) (Result, error) {
atomic.StoreUint32(&called, 1)
return Result{}, nil
},
}
es := NewEventStore(&dispatcher, 8, DropOldest)
es.Async = true
past := time.Now().Add(-time.Second)
es.Schedule(context.Background(), past, Event{Projection: "bar"})
if atomic.LoadUint32(&called) != 1 {
t.Fatal("expected handler to fire immediately for past time")
}
}

// TestSchedule_FiresAtFutureTime ensures the handler is not called before the scheduled time and fires shortly after.
func TestSchedule_FiresAtFutureTime(t *testing.T) {
var called uint32
dispatcher := Dispatcher{
"baz": func(ctx context.Context, args map[string]any) (Result, error) {
atomic.StoreUint32(&called, 1)
return Result{}, nil
},
}
es := NewEventStore(&dispatcher, 8, DropOldest)
es.Async = true
start := time.Now().Add(50 * time.Millisecond)
es.Schedule(context.Background(), start, Event{Projection: "baz"})
// Should not fire immediately
if atomic.LoadUint32(&called) != 0 {
t.Fatal("handler fired too early")
}
time.Sleep(100 * time.Millisecond)
if atomic.LoadUint32(&called) != 1 {
t.Fatal("expected handler to fire at scheduled time")
}
}

// TestSchedule_Cancel verifies that stopping the timer prevents the event from firing.
func TestSchedule_Cancel(t *testing.T) {
var called uint32
dispatcher := Dispatcher{
"qux": func(ctx context.Context, args map[string]any) (Result, error) {
atomic.StoreUint32(&called, 1)
return Result{}, nil
},
}
es := NewEventStore(&dispatcher, 8, DropOldest)
es.Async = true
timer := es.ScheduleAfter(context.Background(), 50*time.Millisecond, Event{Projection: "qux"})
if stopped := timer.Stop(); !stopped {
t.Fatal("expected timer to stop successfully")
}
time.Sleep(100 * time.Millisecond)
if atomic.LoadUint32(&called) != 0 {
t.Fatal("handler fired despite cancellation")
}
}
Loading