From 98069a5d35b9a1e31f4e8995c90f20271ecd7123 Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Tue, 25 Jun 2019 09:50:20 -0700 Subject: [PATCH 1/7] Remove a bunch of old stuff --- .travis.yml | 2 +- README.md | 8 - message.go | 144 --------- message_test.go | 222 ------------- timequeue.go | 452 -------------------------- timequeue_acceptance_test.go | 57 ---- timequeue_example_test.go | 82 ----- timequeue_test.go | 598 ----------------------------------- 8 files changed, 1 insertion(+), 1564 deletions(-) delete mode 100644 message.go delete mode 100644 message_test.go delete mode 100644 timequeue.go delete mode 100644 timequeue_acceptance_test.go delete mode 100644 timequeue_example_test.go delete mode 100644 timequeue_test.go diff --git a/.travis.yml b/.travis.yml index 6f14ad5..f2cecbd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.5.1 + - 1.x notifications: email: diff --git a/README.md b/README.md index 22fe385..8dc2424 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,3 @@ # timequeue timequeue provides a TimeQueue type that releases arbitrary messages at given time.Times. - -#### Status -[![Build Status](https://travis-ci.org/gogolfing/timequeue.svg?branch=master)](https://travis-ci.org/gogolfing/timequeue) -[![Coverage Status](https://coveralls.io/repos/github/gogolfing/timequeue/badge.svg?branch=master)](https://coveralls.io/github/gogolfing/timequeue?branch=master) -[![Go Report Card](https://goreportcard.com/badge/github.com/gogolfing/timequeue)](https://goreportcard.com/report/github.com/gogolfing/timequeue) - -### Documentation and Usage -Full documentation and examples can be found at [![GoDoc](https://godoc.org/github.com/gogolfing/timequeue?status.svg)](https://godoc.org/github.com/gogolfing/timequeue) diff --git a/message.go b/message.go deleted file mode 100644 index 70f5eb5..0000000 --- a/message.go +++ /dev/null @@ -1,144 +0,0 @@ -package timequeue - -import ( - "container/heap" - "fmt" - "time" -) - -//sentinel value that says a Message is not in a messageHeap. -const notInIndex = -1 - -//Message is a simple holder struct for a time.Time (the time the Message -//will be released from the queue) and a Data payload of type interface{}. -// -//A Message is not safe for modification from multiple go-routines. -//The Time field is used to calculate when the Message should be released from -//a TimeQueue, and thus changing its value while the Message is still referenced -//by a TimeQueue could have unknown side-effects. -//The Data field is never modified by a TimeQueue. -// -//It is up to client code to ensure that Data is always of the same underlying -//type if that is desired. -type Message struct { - time.Time - Data interface{} - - //reference to the messageHeap that this Message is in. used for removal safety. - mh *messageHeap - //the index of this Message in mh. used to remove a Message from a messageHeap. - index int -} - -//String returns the standard string representation of a struct. -func (m *Message) String() string { - return fmt.Sprintf("&timequeue.Message{%v %v}", m.Time, m.Data) -} - -//messageHeap is a heap.Interface implementation for Messages. -//The peekMessage(), pushMessage(), popMessage(), and removeMessage() methods -//should be used over Push() and Pop() because they provide logic for emprty heaps, -//nil Messages, and removal. -//A messageHeap has no guarantees for correctness if they are not used. -//messageHeap is not safe for use by multiple go-routines. -type messageHeap struct { - messages []*Message -} - -//newMessageHeap creates a messageHeap with messages added to the heap. -//heap.Init() is called before the value is returned. -func newMessageHeap() *messageHeap { - mh := &messageHeap{ - messages: []*Message{}, - } - heap.Init(mh) - return mh -} - -//Len returns the number of Messages in the heap. -func (mh *messageHeap) Len() int { - return len(mh.messages) -} - -//Less determines whether or not the Message at index i is less than that at index -//j. -//This is determined by the (message at i.Time).Before(message at j.Time). -func (mh *messageHeap) Less(i, j int) bool { - return mh.messages[i].Time.Before(mh.messages[j].Time) -} - -//Swap swaps the messages at indices i and j. -func (mh *messageHeap) Swap(i, j int) { - mh.messages[i], mh.messages[j] = mh.messages[j], mh.messages[i] - mh.messages[i].index = i - mh.messages[j].index = j -} - -//Push is the heap.Interface Push method that adds value to the heap. -//Appends value to the internal slice. -func (mh *messageHeap) Push(value interface{}) { - mh.messages = append(mh.messages, value.(*Message)) -} - -//Pop is the heap.Interface Pop method that removes the "smallest" Message from the heap. -func (mh *messageHeap) Pop() interface{} { - n := len(mh.messages) - result := (mh.messages)[n-1] - mh.messages = (mh.messages)[0 : n-1] - return result -} - -//peekMessage returns the "smallest" Message in the heap (without removal) or -//nil if the heap is empty. -func (mh *messageHeap) peekMessage() *Message { - if mh.Len() > 0 { - return mh.messages[0] - } - return nil -} - -//pushMessageValues creates and adds a Message with values t and data in the -//appropriate index to mh. -//The created message is returned. -func (mh *messageHeap) pushMessageValues(t time.Time, data interface{}) *Message { - message := &Message{ - Time: t, - Data: data, - index: mh.Len(), - mh: mh, - } - heap.Push(mh, message) - return message -} - -//popMessage returns the "smallest" Message in the heap (after removal) or nil -//if the heap is empty. -func (mh *messageHeap) popMessage() *Message { - if mh.Len() == 0 { - return nil - } - result := heap.Pop(mh).(*Message) - beforeRemoval(result) - return result -} - -//removeMessage removes the message from mh. -//If mh is empty, message is nil, or message is not in mh, then this is a nop -//and returns false. -//Returns true or false indicating whether or not message was actually removed -//from mh. -func (mh *messageHeap) removeMessage(message *Message) bool { - if mh.Len() == 0 || message == nil || message.index == notInIndex || message.mh != mh { - return false - } - result := heap.Remove(mh, message.index).(*Message) - beforeRemoval(result) - return true -} - -//beforeRemoval sets the index and mh fields of message to indicate that it is -//no longer in a messageHeap. -func beforeRemoval(message *Message) { - message.index = notInIndex - message.mh = nil -} diff --git a/message_test.go b/message_test.go deleted file mode 100644 index 6b88017..0000000 --- a/message_test.go +++ /dev/null @@ -1,222 +0,0 @@ -package timequeue - -import ( - "fmt" - "testing" - "time" -) - -func TestMessage_String(t *testing.T) { - now := time.Now() - message := &Message{now, "test_data", nil, notInIndex} - want := "&timequeue.Message{" + now.String() + " test_data}" - if result := message.String(); result != want { - t.Errorf("message.String() = %v WANT %v", result, want) - } -} - -func TestNewMessageHeap(t *testing.T) { - mh := newMessageHeap() - if mh.messages == nil { - t.Errorf("mh.messages = nil WANT non-nil") - } - if size := len(mh.messages); size != 0 { - t.Errorf("len(mh.messages) = %v WANT %v", size, 0) - } -} - -func TestMessageHeap_Len(t *testing.T) { - tests := []struct { - messages []*Message - result int - }{ - {nil, 0}, - {[]*Message{}, 0}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}, 2}, - } - for _, test := range tests { - if result := (&messageHeap{test.messages}).Len(); result != test.result { - t.Errorf("messageHeap.Len() = %v WANT %v", result, test.result) - } - } -} - -func TestMessageHeap_Less(t *testing.T) { - now := time.Now() - tests := []struct { - a *Message - b *Message - result bool - }{ - {&Message{now.Add(-1), 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, true}, - {&Message{now, 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, false}, - {&Message{now.Add(1), 0, nil, notInIndex}, &Message{now, 0, nil, notInIndex}, false}, - } - for _, test := range tests { - //do this so the heap.Init() is not called and messes with the ordering we want. - mh := &messageHeap{ - messages: []*Message{test.a, test.b}, - } - if result := mh.Less(0, 1); result != test.result { - t.Errorf("mh.Less(%v, %v) = %v WANT %v", mh.messages[0], mh.messages[1], result, test.result) - } - } -} - -func TestMessageHeap_Swap(t *testing.T) { - mh := newMessageHeap() - a := mh.pushMessageValues(time.Now(), 0) - b := mh.pushMessageValues(time.Now(), 0) - mh.Swap(0, 1) - if mh.messages[0] != b || mh.messages[1] != a { - t.Errorf("mh.Swap(0, 1) should equal b, a") - } - if a.index != 1 { - t.Errorf("mh.Swap() a.index = %v WANT %v", a.index, 1) - } - if b.index != 0 { - t.Errorf("mh.Swap() b.index = %v WANT %v", b.index, 0) - } -} - -func TestMessageHeap_Push(t *testing.T) { - mh := newMessageHeap() - message := &Message{time.Now(), 0, nil, notInIndex} - mh.Push(message) - if mh.Len() != 1 || mh.messages[0] != message { - t.Errorf("mh.Len(), mh[0] = %v, %v WANT %v, %v", mh.Len(), 1, mh.messages[0], message) - } -} - -func TestMessageHeap_Pop(t *testing.T) { - mh := newMessageHeap() - message := mh.pushMessageValues(time.Now(), 0) - if result := mh.Pop(); result != message { - t.Errorf("mh.Pop() = %v WANT %v", result, message) - } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) - } -} - -func TestMessageHeap_peekMessage_empty(t *testing.T) { - mh := newMessageHeap() - if message := mh.peekMessage(); message != nil { - t.Errorf("mh.peekMessage() = non-nil WANT nil") - } -} - -func TestMessageHeap_peekMessage_nonEmpty(t *testing.T) { - mh := newMessageHeap() - want := mh.pushMessageValues(time.Now(), nil) - mh.pushMessageValues(time.Now(), nil) - if actual := mh.peekMessage(); actual != want { - t.Errorf("mh.peekMessage() = %v WANT %v", actual, want) - } - if size := mh.Len(); size != 2 { - t.Errorf("mh.Len() = %v WANT %v", size, 2) - } -} - -func TestMessageHeap_pushMessageValues(t *testing.T) { - mh := newMessageHeap() - for i := 0; i < 10; i++ { - data := fmt.Sprintf("data_%v", i) - now := time.Now() - message := mh.pushMessageValues(now, data) - if message.Time != now { - t.Errorf("message.Time = %v WANT %v", message.Time, now) - } - if message.Data != data { - t.Errorf("message.Data = %v WANT %v", message.Data, data) - } - if message.mh != mh { - t.Errorf("message.mh = %v WANT %v", message.mh, mh) - } - if message.index != i { - t.Errorf("message.index = %v WANT %v", message.index, i) - } - } -} - -func TestMessageHeap_popMessage_empty(t *testing.T) { - mh := newMessageHeap() - if message := mh.popMessage(); message != nil { - t.Errorf("mh.popMessage() = non-nil WANT nil") - } -} - -func TestMessageHeap_popMessage_nonEmpty(t *testing.T) { - mh := newMessageHeap() - want := mh.pushMessageValues(time.Now(), 0) - actual := mh.popMessage() - if actual != want { - t.Errorf("mh.popMessage() = %v WANT %v", actual, want) - } - if actual.mh != nil || actual.index != notInIndex { - t.Errorf("popped message mh, index = %v, %v WANT %v, %v", actual.mh, actual.index, nil, notInIndex) - } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) - } -} - -func TestMessageHeap_removeMessage_empty(t *testing.T) { - mh := newMessageHeap() - if result := mh.removeMessage(nil); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) - } -} - -func TestMessageHeap_removeMessage_messageNil(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(nil); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) - } -} - -func TestMessageHeap_removeMessage_notInIndex(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - mh.pushMessageValues(time.Now(), nil) - message := mh.popMessage() - if result := mh.removeMessage(message); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) - } -} - -func TestMessageHeap_removeMessage_notInMh(t *testing.T) { - mh := newMessageHeap() - mh.pushMessageValues(time.Now(), nil) - other := newMessageHeap().pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(other); result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, false) - } -} - -func TestMessageHeap_removeMessage_success(t *testing.T) { - mh := newMessageHeap() - message := mh.pushMessageValues(time.Now(), nil) - if result := mh.removeMessage(message); !result { - t.Errorf("mh.removeMessage() = %v WANT %v", result, true) - } - if size := mh.Len(); size != 0 { - t.Errorf("mh.Len() = %v WANT %v", size, 0) - } - if message.mh != nil || message.index != notInIndex { - t.Errorf("popped message mh, index = %v, %v WANT %v, %v", message.mh, message.index, nil, notInIndex) - } -} - -func TestBeforeRemoval(t *testing.T) { - mh := newMessageHeap() - message := &Message{time.Now(), nil, mh, 1} - beforeRemoval(message) - if message.mh != nil { - t.Errorf("message.mh = non-nil WANT nil") - } - if message.index != notInIndex { - t.Errorf("message.index = %v WANT %v", message.index, notInIndex) - } -} diff --git a/timequeue.go b/timequeue.go deleted file mode 100644 index 4405179..0000000 --- a/timequeue.go +++ /dev/null @@ -1,452 +0,0 @@ -//Package timequeue provides the TimeQueue type that is a queue of Messages. -//Each Message contains a time.Time that determines the time at which the Message -//should be released from the queue. -//Message types also have a Data field of type interface{} that should be used -//as the payload of the Message. -//TimeQueue is safe for use by multiple go-routines. -// -//Messages need only be pushed to the queue, and then when their time passes, -//they will be sent on the channel returned by Messages(). -//See below for examples. -// -//TimeQueue uses a single go-routine, spawned from Start() that returns from Stop(), -//that processes the Messages as their times pass. -//When a Message is pushed to the queue, the earliest Message in the queue is -//used to determine the next time the running go-routine should wake. -//The running go-routine knows when to wake because the earliest time is used -//to make a channel via time.After(). Receiving on that channel wakes the -//running go-routine if a call to Stop() has not happened prior. -//Upon waking, that Message is removed from the queue and released on the channel -//returned from Messages(). -//Then the newest remaining Message is used to determine when to wake, etc. -//If a Message with a time before any other in the queue is inserted, then that -//Message is pushed to the front of the queue and released appropriately. -// -//Messages that are "released", i.e. sent on the Messages() channel, are always -//released from a newly spawned go-routine so that other go-routines are not -//paused waiting for a receive from Messages(). -// -//Messages with the same Time value will be "flood-released" from the same -//separately spawned go-routine. -//Additionally, Messages that are pushed with times before time.Now() will -//immediately be released from the queue. -package timequeue - -import ( - "sync" - "time" -) - -const ( - //DefaultCapacity is the default capacity used for Messages() channels in New(). - DefaultCapacity = 1 -) - -//TimeQueue is a queue of Messages that releases its Messages when their -//Time fields pass. -// -//When Messages are pushed to a TimeQueue, the earliest Message is used to -//determine when the TimeQueue should wake. Upon waking, that earliest Message -//is "released" from the TimeQueue by being sent on the channel returned by -//Messages(). -// -//Messages may be pushed and popped from a TimeQueue whether or not the TimeQueue -//is running or not. Start() and Stop() may be called as many times as desired, -//but Messsages will be released only between calls to Start() and Stop(), i.e. -//while the TimeQueue is running and IsRunning() returns true. -// -//Calls to Pop(), PopAll(), and PopAllUntil() may be called to remove Messages -//from a TimeQueue, but this is required for normal use. -// -//One of the New*() functions should be used to create a TimeQueue. A zero-value -//TimeQueue is not in a valid or working state. -type TimeQueue struct { - //the goal is to have only one go-routine "inside" a TimeQueue at once. - //this is achieved by locking on lock in all exported methods and - //requiring the TimeQueue be locked in all unexported methods and - //before all use of unexported fields. - - //protects all other members of a TimeQueue. - lock *sync.Mutex - - //the heap of Messages in the TimeQueue. - messages *messageHeap - - //flag determining if the TimeQueue is running. - //should be true between calls to Start() and Stop() and false otherwise. - running bool - //signal that sends to stopChan or wakeChan to wake or stop the running go-routine. - wakeSignal *wakeSignal - - //the channel to send released Messages on. should be receive only in client code. - messageChan chan *Message - //send to this channel to wake the running go-routine and release Messages. - wakeChan chan time.Time - //send to this channel to stop the running go-routine. - stopChan chan struct{} -} - -//New creates a new *TimeQueue with a call to New(DefaultCapacity). -func New() *TimeQueue { - return NewCapacity(DefaultCapacity) -} - -//NewCapacity creates a new *TimeQueue where the channel returned from Messages() -//has the capacity given by capacity. -//The new TimeQueue is in the stopped state and has no Messages in it. -func NewCapacity(capacity int) *TimeQueue { - return &TimeQueue{ - lock: &sync.Mutex{}, - messages: newMessageHeap(), - running: false, - wakeSignal: nil, - messageChan: make(chan *Message, capacity), - wakeChan: make(chan time.Time), - stopChan: make(chan struct{}), - } -} - -//Push creates and adds a Message to q with t and data. The created Message is returned. -func (q *TimeQueue) Push(t time.Time, data interface{}) *Message { - q.lock.Lock() - defer q.lock.Unlock() - message := q.messages.pushMessageValues(t, data) - q.afterHeapUpdate() - return message -} - -//Peek returns (without removing) the Time and Data fields from the earliest -//Message in q. -//If q is empty, then the zero Time and nil are returned. -func (q *TimeQueue) Peek() (time.Time, interface{}) { - message := q.PeekMessage() - if message == nil { - return time.Time{}, nil - } - return message.Time, message.Data -} - -//PeekMessage returns (without removing) the earliest Message in q or nil if q -//is empty. -func (q *TimeQueue) PeekMessage() *Message { - q.lock.Lock() - defer q.lock.Unlock() - return q.peekMessage() -} - -//peekMessage is the unexported version of PeekMessage(). -//It should only be called when q is locked. -func (q *TimeQueue) peekMessage() *Message { - return q.messages.peekMessage() -} - -//Pop removes and returns the earliest Message in q or nil if q is empty. -//If release is true, then the Message (if not nil) will also be sent on the -//channel returned from Messages(). -func (q *TimeQueue) Pop(release bool) *Message { - q.lock.Lock() - defer q.lock.Unlock() - message := q.messages.popMessage() - if message == nil { - return nil - } - if release { - q.releaseMessage(message) - } - q.afterHeapUpdate() - return message -} - -//PopAll removes and returns a slice of all Messages in q. -//The returned slice will be non-nil but empty if q is itseld empty. -//If release is true, then all returned Messages will also be sent on the channel -//returned from Messages(). -func (q *TimeQueue) PopAll(release bool) []*Message { - q.lock.Lock() - defer q.lock.Unlock() - result := make([]*Message, 0, q.messages.Len()) - for message := q.messages.popMessage(); message != nil; message = q.messages.popMessage() { - result = append(result, message) - } - if release { - q.releaseCopyToChan(result) - } - q.afterHeapUpdate() - return result -} - -//PopAllUntil removes and returns a slice of Messages in q with Time fields before, -//but not equal to, until. -//If release is true, then all returned Messages will also be sent on the channel -//returned from Messages(). -func (q *TimeQueue) PopAllUntil(until time.Time, release bool) []*Message { - q.lock.Lock() - defer q.lock.Unlock() - return q.popAllUntil(until, release) -} - -//popAllUntil is the unexported verson of PopAllUntil. -//It should only be called when q is locked. -func (q *TimeQueue) popAllUntil(until time.Time, release bool) []*Message { - result := make([]*Message, 0, q.messages.Len()) - for message := q.messages.peekMessage(); message != nil && message.Before(until); message = q.messages.peekMessage() { - result = append(result, q.messages.popMessage()) - } - if release { - q.releaseCopyToChan(result) - } - q.afterHeapUpdate() - return result -} - -//Remove removes message from q. -//If q is empty, message is nil, or message is not in q, then Remove is a nop -//and returns false. -//Returns true or false indicating whether or not message was actually removed from q. -//If release is true and message was actually removed, then message will also be -//sent on the channel returned by Messages(). -func (q *TimeQueue) Remove(message *Message, release bool) bool { - q.lock.Lock() - defer q.lock.Unlock() - removed := q.messages.removeMessage(message) - if removed && release { - q.releaseMessage(message) - } - q.afterHeapUpdate() - return removed -} - -//afterHeapUpdate ensures the earliest time is in the next wake signal, if q is running. -//It should only be called when q is locked. -func (q *TimeQueue) afterHeapUpdate() { - if q.isRunning() { - q.updateAndSpawnWakeSignal() - } -} - -//Messages returns the receive only channel that all Messages are released on. -//The returned channel will be the same instance on every call, and this value -//will never be closed. -// -//In order to receive Messages when they are earliest available a go-routine should -//be spawned to drain the channel of all Messages. -// q := timequeue.New() -// q.Start() -// go func() { -// message := <-q.Messages() -// }() -// //push Messages to q. -func (q *TimeQueue) Messages() <-chan *Message { - return q.messageChan -} - -//Size returns the number of Messages in q. This is the number of Messages that -//have yet to be released (or waiting to be sent on Messages()) in q. -//Therefore, there could still be Messages that q has reference to that are waiting -//to be released or in the Messages() channel buffer. -// -//To obtain the number of total Messages that q still has references to add this value -//and the length of Messages(): -// q.Size() + len(q.Messages()) -func (q *TimeQueue) Size() int { - q.lock.Lock() - defer q.lock.Unlock() - return q.messages.Len() -} - -//Start spawns a new go-routine to listen for wake times of Messages and sets the -//state to running. -//If q is already running, then Start is a nop. -func (q *TimeQueue) Start() { - q.lock.Lock() - defer q.lock.Unlock() - if q.isRunning() { - return - } - q.setRunning(true) - go q.run() - q.updateAndSpawnWakeSignal() -} - -//IsRunning returns whether or not q is running. E.g. in between calls to Start() -//and Stop(). -//If IsRunning returns true, then it is possible that Messages are being released. -func (q *TimeQueue) IsRunning() bool { - q.lock.Lock() - defer q.lock.Unlock() - return q.isRunning() -} - -//isRunning is the unexported version of IsRunning. -//It should only be called when q is locked. -func (q *TimeQueue) isRunning() bool { - return q.running -} - -//run is the run loop of a TimeQueue. -//It is an infinite loop that selects between q.wakeChan and q.stopChan. -//If q.wakeChan is selected, then q.onWake() is called. -//If q.wakeStop is selected, then this method returns. -//Note that this method does not spawn a new go-routine. -//That should be done outside of run. -//run does not have the precondition that q must be locked. -//This is a function that should execute in its own go-routine and thus cannot -//lock any other parts of q. -func (q *TimeQueue) run() { - for { - select { - case wakeTime := <-q.wakeChan: - q.onWake(wakeTime) - case <-q.stopChan: - return - } - } -} - -//onWake should be called when q receives a value on q.wakeChan. -//Because onWake will be called from a go-routine that we spawned, we lock and -//defer unlock on q since this acts like an exported method of sorts in that -//it starts execution of unexported code from an outside go-routine. -func (q *TimeQueue) onWake(wakeTime time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - q.popAllUntil(wakeTime, true) - q.updateAndSpawnWakeSignal() -} - -//releaseMessage is a utility method that spawns a go-routine to send message on -//q.messageChan so that that calling go-routine does not have to wait. -func (q *TimeQueue) releaseMessage(message *Message) { - go func() { - q.messageChan <- message - }() -} - -//releaseCopyToChan is a utility method that copies messages to a new, buffered -//channel, and empties that new channel by sending every messsage on q.messageChan. -func (q *TimeQueue) releaseCopyToChan(messages []*Message) { - copyChan := make(chan *Message, len(messages)) - for _, message := range messages { - copyChan <- message - } - q.releaseChan(copyChan) - close(copyChan) -} - -//releaseChan is a utility method that spawns a go-routine to send every message -//in messages on q.messageChan. -//Note that releaseChan reads from messages until it is closed, thus messages must -//be closed by the calling function. -func (q *TimeQueue) releaseChan(messages <-chan *Message) { - go func() { - for message := range messages { - q.messageChan <- message - } - }() -} - -//updateAndSpawnSignal kills the current wake signal if it exists -//and creates and spawns the next wake signal if there are any messages left in q. -//Returns true if a new wakeSignal was spawned, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) updateAndSpawnWakeSignal() bool { - q.killWakeSignal() - message := q.peekMessage() - if message == nil { - return false - } - q.setWakeSignal(newWakeSignal(q.wakeChan, message.Time)) - return q.spawnWakeSignal() -} - -//setWakeSignal sets q.wakeSignal to wakeSignal. -//It should only be called when q is locked. -func (q *TimeQueue) setWakeSignal(wakeSignal *wakeSignal) { - q.wakeSignal = wakeSignal -} - -//spawnWakeSignal calls spawn() on q.wakeSignal if it is not nil. -//Returns true if spawn was called, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) spawnWakeSignal() bool { - if q.wakeSignal != nil { - q.wakeSignal.spawn() - return true - } - return false -} - -//killWakeSignal call kill() and sets q.wakeSignal to nil if it is not nil. -//Returns true if the old wakeSignal is not nil, false otherwise. -//It should only be called when q is locked. -func (q *TimeQueue) killWakeSignal() bool { - if q.wakeSignal != nil { - q.wakeSignal.kill() - q.wakeSignal = nil - return true - } - return false -} - -//Stop tells the running go-routine to stop running. -//This results in no more Messages being released (until a subsequent call to Start()) -//and the state to be set to not running. -//If q is already stopped, then Stop is a nop. -func (q *TimeQueue) Stop() { - q.lock.Lock() - defer q.lock.Unlock() - if !q.isRunning() { - return - } - q.killWakeSignal() - q.setRunning(false) - go func() { - q.stopChan <- struct{}{} - }() -} - -//setRunning is the unexported version of SetRunning. Sets q.running to running. -//It should only be called when q is locked. -func (q *TimeQueue) setRunning(running bool) { - q.running = running -} - -//wakeSignal represents a signal that sends a time.Time value after that time has passed. -//wakeSignals can be killed, which will prevent the signal from sending its value. -type wakeSignal struct { - dst chan time.Time - src <-chan time.Time - stop chan struct{} -} - -//newWakeSignal create a wakeSignal that sends wakeTime on dst when wakeTime passes. -//this function should be used to create wakeSignals. -//the zero value wakeSignal is not valid. -func newWakeSignal(dst chan time.Time, wakeTime time.Time) *wakeSignal { - return &wakeSignal{ - dst: dst, - src: time.After(wakeTime.Sub(time.Now())), - stop: make(chan struct{}), - } -} - -//spawn starts a new go-routine that selects on w.src and w.stop. -//If w.src is selected, the received value is sent on w.dst. -//If w.stop is selected, then the function stops selecting. -//In both cases, w.src is set to nil and the function returns. -func (w *wakeSignal) spawn() { - go func() { - select { - case wakeTime := <-w.src: - w.dst <- wakeTime - case <-w.stop: - } - w.src = nil - }() -} - -//kill closes the w.stop channel. -//This is NOT idempotent. I.e. kill should only be called once a single wakeSignal. -func (w *wakeSignal) kill() { - close(w.stop) -} diff --git a/timequeue_acceptance_test.go b/timequeue_acceptance_test.go deleted file mode 100644 index 8ebc170..0000000 --- a/timequeue_acceptance_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package timequeue_test - -import ( - "testing" - "time" - - "github.com/gogolfing/timequeue" -) - -func TestTimeQueue_acceptance_messageAddedBeforeStart(t *testing.T) { - tq := timequeue.New() - tq.Push(time.Now(), "now") - tq.Start() - defer tq.Stop() - if message := <-tq.Messages(); message.Data != "now" { - t.Errorf("message was not released") - } -} - -func TestTimeQueue_acceptance_startAndStopStress(t *testing.T) { - const count = 100000 - tq := timequeue.NewCapacity(100) - tq.Start() - defer tq.Stop() - for i := 0; i < count; i++ { - tq.Push(time.Now().Add(time.Duration(i)*time.Nanosecond), i) - } - go func() { - for i := 0; i < count; i++ { - tq.Stop() - tq.Start() - } - }() - for i := 0; i < count; i++ { - <-tq.Messages() - } - if size := tq.Size(); size != 0 { - t.Errorf("size = %v WANT %v", size, 0) - } -} - -func TestTimeQueue_acceptance_millionMessagesSameTime(t *testing.T) { - const count = 1000000 - tq := timequeue.NewCapacity(100) - tq.Start() - defer tq.Stop() - now := time.Now() - for i := 0; i < count; i++ { - tq.Push(now, i) - } - for i := 0; i < count; i++ { - <-tq.Messages() - } - if size := tq.Size(); size != 0 { - t.Errorf("size = %v WANT %v", size, 0) - } -} diff --git a/timequeue_example_test.go b/timequeue_example_test.go deleted file mode 100644 index 56bc7b6..0000000 --- a/timequeue_example_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package timequeue_test - -import ( - "fmt" - "time" - - "github.com/gogolfing/timequeue" -) - -func Example() { - tq := timequeue.New() - tq.Start() - //this would normally be a long-running process, - //and not stop at the return of a function call. - defer tq.Stop() - - startTime := time.Now() - - tq.Push(startTime, "this will be released immediately") - - //adding Messages in chronological order. - for i := 1; i <= 4; i++ { - tq.Push( - startTime.Add(time.Duration(i)*time.Second), - fmt.Sprintf("message at second %v", i), - ) - } - //adding Messages in reverse chronological order. - for i := 8; i >= 5; i-- { - tq.Push( - startTime.Add(time.Duration(i)*time.Second), - fmt.Sprintf("message at second %v", i), - ) - } - - //receive all 9 Messages that were pushed. - for i := 0; i < 9; i++ { - message := <-tq.Messages() - fmt.Println(message.Data) - } - - fmt.Printf("there are %v messages left in the queue\n", tq.Size()) - - endTime := time.Now() - if endTime.Sub(startTime) > time.Duration(8)*time.Second { - fmt.Println("releasing all messages took more than 8 seconds") - } else { - fmt.Println("releasing all messages took less than 8 seconds") - } - - //Output: - //this will be released immediately - //message at second 1 - //message at second 2 - //message at second 3 - //message at second 4 - //message at second 5 - //message at second 6 - //message at second 7 - //message at second 8 - //there are 0 messages left in the queue - //releasing all messages took more than 8 seconds -} - -func ExampleTimeQueue_PopAllUntil() { - tq := timequeue.New() - now := time.Now() - for i := 0; i < 4; i++ { - tq.Push(now.Add(time.Duration(i)*time.Second), i) - } - tq.PopAllUntil(now.Add(time.Duration(2)*time.Second), true) - for i := 0; i < 2; i++ { - message := <-tq.Messages() - fmt.Println(message.Data) - } - fmt.Println("messages left:", tq.Size()) - - //Output: - //0 - //1 - //messages left: 2 -} diff --git a/timequeue_test.go b/timequeue_test.go deleted file mode 100644 index 3aa4809..0000000 --- a/timequeue_test.go +++ /dev/null @@ -1,598 +0,0 @@ -package timequeue - -import ( - "reflect" - "sort" - "testing" - "time" -) - -func TestNew(t *testing.T) { - q := New() - if cap(q.messageChan) != DefaultCapacity { - t.Errorf("cap(messageChan) = %v WANT %v", cap(q.messageChan), DefaultCapacity) - } -} - -func TestNewCapacity(t *testing.T) { - q := NewCapacity(2) - if size := q.messages.Len(); size != 0 { - t.Errorf("NewSize() q.messges.Len() = %v WANT %v", size, 0) - } - if q.lock == nil { - t.Errorf("NewSize() lock should be non-nil") - } - if q.running == true { - t.Errorf("NewSize() running = %v WANT %v", q.running, false) - } - if q.wakeSignal != nil { - t.Errorf("NewSize() wakeSignal should be nil") - } - if cap(q.messageChan) != 2 { - t.Errorf("NewSize() cap(messageChan) = %v WANT %v", cap(q.messageChan), 2) - } - if q.wakeChan == nil || q.stopChan == nil { - t.Errorf("NewSize() wakeChan and stopChan should be non-nil") - } -} - -func TestTimeQueue_Push(t *testing.T) { - q := New() - message := q.Push(time.Time{}, "test_data") - size := q.messages.Len() - if size != 1 { - t.Errorf("q.messages.Len() = %v WANT %v", size, 1) - } - if message == nil { - t.Errorf("message = nil WANT non-nil") - } - if message != q.messages.peekMessage() { - t.Errorf("return message should equal peek message") - } - if !message.Time.Equal(time.Time{}) { - t.Errorf("message.Time = %v WANT %v", message.Time, time.Time{}) - } - if message.Data != "test_data" { - t.Errorf("message.Data = %v WANT %v", message.Data, "test_data") - } -} - -func TestTimeQueue_Peek_nil(t *testing.T) { - q := New() - peekTime, data := q.Peek() - if !peekTime.IsZero() || data != nil { - t.Errorf("q.Peek() = %v, %v WANT %v, %v", peekTime, data, time.Time{}, nil) - } -} - -func TestTimeQueue_Peek_nonNil(t *testing.T) { - q := New() - now := time.Now() - q.Push(now, "test_data") - peekTime, data := q.Peek() - if !peekTime.Equal(now) || data != "test_data" { - t.Errorf("q.Peek() = %v, %v WANT %v, %v", peekTime, data, now, "test_data") - } -} - -func TestTimeQueue_PeekMessage_nil(t *testing.T) { - q := New() - message := q.PeekMessage() - if message != nil { - t.Errorf("q.PeekMessage() = non-nil WANT nil") - } -} - -func TestTimeQueue_PeekMessage_nonNil(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.PeekMessage() - if actual == nil || actual != want { - t.Errorf("q.PeekMessage() = %v WANT %v", actual, want) - } -} - -func TestTimeQueue_Pop_empty(t *testing.T) { - q := New() - message := q.Pop(false) - if message != nil { - t.Errorf("q.Pop() is non-nil WANT nil") - } -} - -func TestTimeQueue_Pop_nonEmptyRelease(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.Pop(true) - if actual != want { - t.Errorf("q.Pop() return = %v WANT %v", actual, want) - } - actual = <-q.Messages() - if actual != want { - t.Errorf("q.Pop() Messages() = %v WANT %v", actual, want) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", len(q.Messages()), 0) - } -} - -func TestTimeQueue_Pop_nonEmptyNonRelease(t *testing.T) { - q := New() - want := q.Push(time.Now(), "test_data") - actual := q.Pop(true) - if actual != want { - t.Errorf("q.Pop() return = %v WANT %v", actual, want) - } -} - -func TestTimeQueue_PopAll(t *testing.T) { - now := time.Now() - tests := []struct { - messageValues []*testMessageValue - release bool - }{ - {[]*testMessageValue{}, false}, - {[]*testMessageValue{}, true}, - {[]*testMessageValue{{now, 0}}, false}, - {[]*testMessageValue{{now, 0}}, true}, - {[]*testMessageValue{{now, 0}, {now.Add(1), 1}, {now.Add(2), 2}}, true}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true}, - } - for _, test := range tests { - q := New() - want := []*Message{} - for _, mv := range test.messageValues { - message := q.Push(mv.Time, mv.Data) - want = append(want, message) - } - sort.Sort(&messageHeap{want}) - result := q.PopAll(test.release) - if !areMessagesEqual(result, want) { - t.Errorf("q.PopAll() messages sorted = %v WANT %v", result, want) - } - if test.release && !areChannelMessagesEqual(q.Messages(), want) { - t.Errorf("q.PopAll() Messages() sorted WANT %v", want) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages() = %v WANT %v", len(q.Messages()), 0) - } - } -} - -func TestTimeQueue_PopAllUntil(t *testing.T) { - now := time.Now() - tests := []struct { - messageValues []*testMessageValue - release bool - untilTime time.Time - untilCount int - }{ - {[]*testMessageValue{}, false, now.Add(10), 0}, - {[]*testMessageValue{}, true, now.Add(-10), 0}, - {[]*testMessageValue{{now, 0}}, true, now, 0}, - {[]*testMessageValue{{now, 0}, {now.Add(1), 1}, {now.Add(2), 2}}, true, now.Add(2), 2}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true, now.Add(3), 3}, - {[]*testMessageValue{{now.Add(4), 4}, {now.Add(-1), -1}, {now.Add(2), 2}, {now.Add(1), 1}, {now, 0}}, true, now.Add(3), 4}, - } - for _, test := range tests { - q := New() - want := []*Message{} - for _, mv := range test.messageValues { - message := q.Push(mv.Time, mv.Data) - want = append(want, message) - } - sort.Sort(&messageHeap{want}) - want = want[:test.untilCount] - result := q.PopAllUntil(test.untilTime, test.release) - if !areMessagesEqual(result, want) { - t.Errorf("q.PopAllUntil() messages sorted = %v WANT %v", result, want) - } - if test.release && !areChannelMessagesEqual(q.Messages(), want) { - t.Errorf("q.PopAllUntil() Messages() sorted WANT %v", want) - } - if q.messages.Len() != len(test.messageValues)-test.untilCount { - t.Errorf("len(q.messages) = %v WANT %v", q.messages.Len(), len(test.messageValues)-test.untilCount) - } - if len(q.Messages()) != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", len(q.Messages()), 0) - } - } -} - -func TestTimeQueue_Remove_empty(t *testing.T) { - q := New() - if result := q.Remove(nil, true); result { - t.Errorf("q.Remove() = %v WANT %v", result, false) - } - if size := len(q.Messages()); size != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", size, 0) - } -} - -func TestTimeQueue_Remove_nonEmpty(t *testing.T) { - tests := []struct { - release bool - }{ - {true}, - {false}, - } - for _, test := range tests { - q := New() - want := q.Push(time.Now(), nil) - if result := q.Remove(want, test.release); !result { - t.Errorf("q.Remove() = %v WANT %v", result, true) - } - if test.release { - if actual := <-q.Messages(); actual != want { - t.Errorf("<-q.Messages() = %v WANT %v", actual, want) - } - } - if size := q.Size(); size != 0 { - t.Errorf("t.Size() = %v WANT %v", size, 0) - } - if size := len(q.Messages()); size != 0 { - t.Errorf("len(q.Messages()) = %v WANT %v", size, 0) - } - } -} - -func TestTimeQueue_Remove_notIn(t *testing.T) { - q := New() - q.Push(time.Now(), nil) - other := New().Push(time.Now(), nil) - if result := q.Remove(other, true); result { - t.Errorf("q.Remove(other) = %v WANT %v", result, false) - } -} - -func TestTimeQueue_afterHeapUpdate_notRunning(t *testing.T) { - q := New() - q.afterHeapUpdate() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} - -func TestTimeQueue_afterHeapUpdate_running(t *testing.T) { - q := New() - q.setRunning(true) - q.afterHeapUpdate() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} - -func TestTimeQueue_Messages(t *testing.T) { - q := New() - if q.Messages() != q.messageChan { - t.Errorf("q.Messages() != q.messageChan") - } -} - -func TestTimeQueue_Size(t *testing.T) { - q := New() - q.Push(time.Now(), 0) - if q.Size() != 1 { - t.Errorf("q.Size() = %v WANT %v", q.Size(), 1) - } -} - -func TestTimeQueue_Start_notRunning(t *testing.T) { - q := New() - q.setRunning(true) - q.Start() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} - -func TestTimeQueue_Start_running(t *testing.T) { - q := New() - message := q.Push(time.Now().Add(time.Duration(200)*time.Millisecond), "test_data") - q.Start() - defer q.Stop() - if q.wakeSignal == nil { - t.Errorf("q.wakeSignal = nil WANT non-nil") - } - if running := q.IsRunning(); !running { - t.Errorf("running = %v WANT %v", running, true) - } - if result := <-q.Messages(); result != message { - t.Errorf("message = %v WANT %v", result, message) - } -} - -func TestTimeQueue_run(t *testing.T) { - q := New() - go func() { - q.wakeChan <- time.Now() - q.stopChan <- struct{}{} - }() - q.run() - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } - if count := len(q.messageChan); count != 0 { - t.Errorf("len(q.messageChan) = %v WANT %v", count, 0) - } -} - -func TestTimeQueue_onWake(t *testing.T) { - q := New() - now := time.Now() - for i := 0; i < 4; i++ { - q.Push(now.Add(time.Duration(i)), i) - } - q.onWake(now.Add(4)) - for i := 0; i < 4; i++ { - message := <-q.Messages() - if message.Data != i { - t.Errorf("message.Data = %v WANT %v", message.Data, i) - } - } - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} - -func TestTimeQueue_popAllUntil(t *testing.T) { - q := New() - now := time.Now() - for i := 4; i >= 0; i-- { - q.Push(now.Add(time.Duration(i)), i) - } - q.popAllUntil(now.Add(5), true) - for i := 0; i <= 4; i++ { - message := <-q.Messages() - if message.Data != i { - t.Errorf("message.Data = %v WANT %v", message.Data, i) - } - } - if size := q.Size(); size != 0 { - t.Errorf("q.Size() = %v WANT %v", size, 0) - } - if q.wakeSignal != nil { - t.Errorf("q.wakeSignal = non-nil WANT nil") - } -} - -func TestTimeQueue_releaseMessage(t *testing.T) { - q := New() - q.releaseMessage(&Message{time.Now(), 0, nil, notInIndex}) - if message := <-q.Messages(); message.Data != 0 { - t.Errorf("message.Data = %v WANT %v", message.Data, 0) - } -} - -func TestTimeQueue_releaseCopyToChan(t *testing.T) { - tests := []struct { - messages []*Message - }{ - {nil}, - {[]*Message{}}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}}, - } - for _, test := range tests { - q := New() - q.releaseCopyToChan(test.messages) - for _, wantMessage := range test.messages { - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) - } - } - } -} - -func TestTimeQueue_releaseChan(t *testing.T) { - tests := []struct { - messages []*Message - }{ - {nil}, - {[]*Message{}}, - {[]*Message{{time.Now(), 0, nil, notInIndex}, {time.Now(), 1, nil, notInIndex}}}, - } - for _, test := range tests { - q := New() - out := make(chan *Message) - go func() { - for _, message := range test.messages { - out <- message - } - close(out) - }() - q.releaseChan(out) - for _, wantMessage := range test.messages { - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) - } - } - } -} - -func TestTimeQueue_updateAndSpawnWakeSignal_empty(t *testing.T) { - q := New() - if result := q.updateAndSpawnWakeSignal(); result != false { - t.Errorf("q.updateAndSpawnWakeSignal() = %v WANT %v", result, false) - } -} - -func TestTimeQueue_updateAndSpawnWakeSignal_nonEmpty(t *testing.T) { - q := New() - wantMessage := q.Push(time.Now().Add(time.Duration(250)*time.Millisecond), 0) - if result := q.updateAndSpawnWakeSignal(); result != true { - t.Fatalf("q.updateAndSpawnWakeSignal() = %v WANT %v", result, true) - } - if q.wakeSignal == nil { - t.Errorf("q.wakeSignal = nil WANT non-nil") - } - go q.run() - if message := <-q.Messages(); message != wantMessage { - t.Errorf("q.Messages() = %v WANT %v", message, wantMessage) - } -} - -func TestTimeQueue_setWakeSignal(t *testing.T) { - q := New() - ws := newWakeSignal(q.wakeChan, time.Now()) - q.setWakeSignal(ws) - if q.wakeSignal != ws { - t.Errorf("q.wakeSignal = %v WANT %v", q.wakeSignal, ws) - } -} - -func TestTimeQueue_spawnWakeSignal_nil(t *testing.T) { - q := New() - if result := q.spawnWakeSignal(); result != false { - t.Errorf("q.spawnWakeSignal() = %v WANT %v", result, false) - } -} - -func TestTimeQueue_spawnWakeSignal_nonNil(t *testing.T) { - q := New() - ws := newWakeSignal(q.wakeChan, time.Now().Add(time.Duration(1)*time.Second)) - ws.kill() - q.setWakeSignal(ws) - if result := q.spawnWakeSignal(); result != true { - t.Errorf("q.spawnWakeSignal() = %v WANT %v", result, true) - } -} - -func TestTimeQueue_killWakeSignal_nil(t *testing.T) { - q := New() - if result := q.killWakeSignal(); result != false { - t.Errorf("q.killWakeSignal() = %v WANT %v", result, false) - } -} - -func TestTimeQueue_killWakeSignal_nonNil(t *testing.T) { - q := New() - q.setWakeSignal(newWakeSignal(q.wakeChan, time.Now().Add(time.Duration(1)*time.Second))) - if result := q.killWakeSignal(); result != true { - t.Errorf("q.killWakeSignal() = %v WANT %v", result, true) - } -} - -func TestTimeQueue_Stop_notRunning(t *testing.T) { - q := New() - q.Stop() -} - -func TestTimeQueue_Stop_running(t *testing.T) { - q := New() - q.setRunning(true) - q.Stop() - q.run() - if result := q.IsRunning(); result != false { - t.Errorf("q.IsRunning() = %v WANT %v", result, false) - } -} - -func TestTimeQueue_IsRunning(t *testing.T) { - tests := []struct { - value bool - }{ - {true}, - {false}, - } - for _, test := range tests { - q := New() - q.running = test.value - if result := q.IsRunning(); result != test.value { - t.Errorf("q.IsRunning() = %v WANT %v", result, test.value) - } - } -} - -func TestTimeQueue_setRunning(t *testing.T) { - tests := []struct { - value bool - }{ - {false}, - {true}, - } - for _, test := range tests { - q := New() - q.setRunning(test.value) - if result := q.running; result != test.value { - t.Errorf("q.running = %v WANT %v", result, test.value) - } - } -} - -func TestNewWakeSignal(t *testing.T) { - dst := make(chan time.Time) - wakeTime := time.Now() - ws := newWakeSignal(dst, wakeTime) - if ws.dst != dst { - t.Errorf("ws.dst = %v WANT %v", ws.dst, dst) - } - if ws.src == nil { - t.Errorf("ws.src = nil WANT non-nil") - } - if cap(ws.src) != 1 { - t.Errorf("cap(ws.src) = %v WANT %v", cap(ws.src), 1) - } - if ws.stop == nil { - t.Errorf("ws.stop = nil WANT non-nil") - } - if cap(ws.stop) != 0 { - t.Errorf("cap(ws.stop) = %v WANT %v", cap(ws.stop), 0) - } -} - -func TestWakeSignal_spawn_wake(t *testing.T) { - dst := make(chan time.Time) - now := time.Now() - ws := newWakeSignal(dst, now) - ws.spawn() - result := <-dst - time.Sleep(time.Duration(250) * time.Millisecond) - diff := result.Sub(now) - if diff < 0 { - diff = -diff - } - if diff > time.Duration(1)*time.Millisecond { - t.Errorf("<-ws.dst too far away from desired : %v WANT %v", result, now) - } - if ws.src != nil { - t.Errorf("ws.src = nil WANT non-nil") - } -} - -func TestWakeSignal_spawn_stop(t *testing.T) { - ws := newWakeSignal(nil, time.Now().Add(time.Duration(1)*time.Second)) - ws.spawn() - ws.stop <- struct{}{} - time.Sleep(time.Duration(250) * time.Millisecond) - if ws.src != nil { - t.Errorf("ws.src = nil WANT non-nil") - } -} - -func TestWakeSignal_kill(t *testing.T) { - ws := newWakeSignal(nil, time.Now()) - ws.kill() - defer func() { - if result := recover(); result == nil { - t.Errorf("kill() kill() recover() = nil WANT non-nil") - } - }() - ws.kill() -} - -type testMessageValue struct { - time.Time - Data interface{} -} - -func areChannelMessagesEqual(actualChan <-chan *Message, want []*Message) bool { - actual := []*Message{} - for i := 0; i < len(want); i++ { - actual = append(actual, <-actualChan) - } - return areMessagesEqual(actual, want) -} - -func areMessagesEqual(actual, want []*Message) bool { - return (len(actual) == 0 && len(want) == 0) || reflect.DeepEqual(actual, want) -} From cb230a256a3cdcfdc84fb61dd4055e9bfc2581ff Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Tue, 2 Jul 2019 17:41:37 -0700 Subject: [PATCH 2/7] Update bin and travis files --- .travis.yml | 11 ++++-- bin/bump_version.sh | 37 ------------------- bin/coveralls/{push.sh => push} | 0 bin/travis/{install.sh => install} | 3 +- .../{test_coverage.sh => test-coverage} | 0 5 files changed, 10 insertions(+), 41 deletions(-) delete mode 100755 bin/bump_version.sh rename bin/coveralls/{push.sh => push} (100%) rename bin/travis/{install.sh => install} (84%) rename bin/travis/{test_coverage.sh => test-coverage} (100%) diff --git a/.travis.yml b/.travis.yml index f2cecbd..9f1e0d9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,11 +10,16 @@ notifications: on_success: change on_failure: change +env: + global: + #COVERALLS_TOKEN + - secure: "Sem13/2A1jUUMrhgC5w8oPaVVKU0iPs7LZ1QqlHjSpCX03WcEvhwUloXoFtKqMjoe1hHRplUE5rO/gquQa+t34i9PSVp00qiLO5eec4M0rJJ/Uz1PG8BEfAlEaiuSZTQ+JM4pNvldc2gbmYQ+dz7Ans5lA0Hv5j5L7A9///O+IJ/3Ob6yhG9YGXx/ohrF0US6PkHwl6+8Vu9iMOLBpnxeV1pRPny5yjVbtvTmAEnQc6ZMLMX0LgwcIuTJIvS8JTJgqyO6vygh0ZxEzmCB1gqf4h4ZkxTn7wxlY9UaH2P7F0agInZLP+Pt6Opg/YvIVreh9gOGy1u6utN8CGkUstkz9W8zuPpxV0PGdhhR7nyWoj1072qPYTk/m/ED5flKsmlW8tfq0HbgcpumWoZov/UoECgWltT9nQb8XVHXMQApaNnhh/EThlC0/noNfehtJsT/zNEK6fSZK+ZHOqyxalViZsEznTbo7RzpP1qD59+dBWx544PCAghSemLg+XYoEJALn0SocSVpYiW+F1cHhxL4c0imMhHGzzk/id5ZAJ3RLFzOzH8Ker1Ssp8TkCqlHoksv/IkkEW0IQ3O7vfrNPjBCcI2foAPfdv3EivfuI3L5JPgiteQ8LyZfzCO2xTGnCqUoBBcIiM6471aaQtADdnZMcwW3tAzdgrSgXrTWGcobI=" + install: - - ./bin/travis/install.sh + - ./bin/travis/install script: - - ./bin/travis/test_coverage.sh + - ./bin/travis/test-coverage after_success: - - ./bin/coveralls/push.sh + - ./bin/coveralls/push diff --git a/bin/bump_version.sh b/bin/bump_version.sh deleted file mode 100755 index 835696e..0000000 --- a/bin/bump_version.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/sh - -#ensure we are on the master branch. -branch=$(git branch | grep "*" | cut -d " " -f 2) -if [ "$branch" != "master" ]; then - >&2 echo "you must be on master branch to run this command" - exit 1 -fi - -#grab the version argument. -version="$1" - -if [ -z "$version" ]; then - >&2 echo "usage: first argument supplied must be a non-empty version" - exit 2 -fi - -#set tag on new version commit. -git tag -a "$version" -did_tag=$? - -if [ $did_tag -ne 0 ]; then - exit 3 -fi - -#push master branch and new tag to origin. -git push origin master tag "$version" -did_push=$? - -echo "\n" - -if [ $did_push -eq 0 ]; then - echo "Version bump, commit, tag, and push to origin/master successful!!" -else - >&2 echo "failed to push new tag to origin/master" - exit 4 -fi diff --git a/bin/coveralls/push.sh b/bin/coveralls/push similarity index 100% rename from bin/coveralls/push.sh rename to bin/coveralls/push diff --git a/bin/travis/install.sh b/bin/travis/install similarity index 84% rename from bin/travis/install.sh rename to bin/travis/install index 6a2a7de..a6a4829 100755 --- a/bin/travis/install.sh +++ b/bin/travis/install @@ -2,4 +2,5 @@ go get github.com/axw/gocov/gocov go get github.com/ericelsken/goveralls -go get -t ./... + +go get ./... diff --git a/bin/travis/test_coverage.sh b/bin/travis/test-coverage similarity index 100% rename from bin/travis/test_coverage.sh rename to bin/travis/test-coverage From 70eb085e557a45f3a6728c699dd8e93b7ddbf9bc Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Thu, 4 Jul 2019 15:07:09 -0700 Subject: [PATCH 3/7] Flesh out implementation for TimeQueue --- .travis.yml | 1 + README.md | 2 +- message.go | 161 +++++++++++++++++++++++++++++++ message_test.go | 252 ++++++++++++++++++++++++++++++++++++++++++++++++ timequeue.go | 235 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 650 insertions(+), 1 deletion(-) create mode 100644 message.go create mode 100644 message_test.go create mode 100644 timequeue.go diff --git a/.travis.yml b/.travis.yml index 9f1e0d9..6b8d8b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ notifications: env: global: + - GO111MODULE=on #COVERALLS_TOKEN - secure: "Sem13/2A1jUUMrhgC5w8oPaVVKU0iPs7LZ1QqlHjSpCX03WcEvhwUloXoFtKqMjoe1hHRplUE5rO/gquQa+t34i9PSVp00qiLO5eec4M0rJJ/Uz1PG8BEfAlEaiuSZTQ+JM4pNvldc2gbmYQ+dz7Ans5lA0Hv5j5L7A9///O+IJ/3Ob6yhG9YGXx/ohrF0US6PkHwl6+8Vu9iMOLBpnxeV1pRPny5yjVbtvTmAEnQc6ZMLMX0LgwcIuTJIvS8JTJgqyO6vygh0ZxEzmCB1gqf4h4ZkxTn7wxlY9UaH2P7F0agInZLP+Pt6Opg/YvIVreh9gOGy1u6utN8CGkUstkz9W8zuPpxV0PGdhhR7nyWoj1072qPYTk/m/ED5flKsmlW8tfq0HbgcpumWoZov/UoECgWltT9nQb8XVHXMQApaNnhh/EThlC0/noNfehtJsT/zNEK6fSZK+ZHOqyxalViZsEznTbo7RzpP1qD59+dBWx544PCAghSemLg+XYoEJALn0SocSVpYiW+F1cHhxL4c0imMhHGzzk/id5ZAJ3RLFzOzH8Ker1Ssp8TkCqlHoksv/IkkEW0IQ3O7vfrNPjBCcI2foAPfdv3EivfuI3L5JPgiteQ8LyZfzCO2xTGnCqUoBBcIiM6471aaQtADdnZMcwW3tAzdgrSgXrTWGcobI=" diff --git a/README.md b/README.md index 8dc2424..f6da8e5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # timequeue -timequeue provides a TimeQueue type that releases arbitrary messages at given +timequeue provides a TimeQueue type that releases arbitrary Messages at given time.Times. diff --git a/message.go b/message.go new file mode 100644 index 0000000..aca38a2 --- /dev/null +++ b/message.go @@ -0,0 +1,161 @@ +package timequeue + +import ( + "container/heap" + "time" +) + +const ( + //indexNotInHeap is a sentinel for a Message.index that indicates the Message + //is not a in a messageHeap. + indexNotInHeap = -1 +) + +//Priority is the priority of a Message. +//Smaller values indicate a higher priority, with 0 being the highest priority. +type Priority uint32 + +//Message is a container type that associates a Time and Priority with some +//arbitrary data. +//A Message is "released" from a TimeQueue as close to Time At as possible. +// +//Message zero values are not in a valid state. You should use NewMessage to create +//Message instances. +type Message struct { + //At is the Time at which to release this Message. + At time.Time + + //Priority is the Priority of this Message. + //If a Message has an equal At value with another Message in the same TimeQueue, + //then Priority is consulted to "order" the Messages to determine which should + //be "released" first. + Priority + + //Data is any arbitrary data that you can put in a Message and retrieve when + //the Message is released. + Data interface{} + + //messageHeap is the messageHeap that this Message is in. + //A nil value means that Message is not in a messageHeap. + *messageHeap + + //index is the index at which this Message resides in messageHeap. + index int +} + +//NewMessage returns a Message with at, p, and data set on their corresponding fields. +// +//You should use this function to create Messages instead of using a struct initializer. +func NewMessage(at time.Time, p Priority, data interface{}) Message { + return Message{ + At: at, + Priority: p, + Data: data, + messageHeap: nil, + index: indexNotInHeap, + } +} + +//less returns whether or not m is "less than" other. +//This is used to determined the order in which Messages are released from a TimeQueue. +// +//It returns true if m.At is before other.At, regardless of Priorities. +//If m and other have an equal At field, then true is returned if m has a lower +//Priority than other. +func (m *Message) less(other *Message) bool { + diff := m.At.Sub(other.At) + if diff != 0 { + return diff < 0 + } + return m.Priority < other.Priority +} + +//isHead returns whether or not m is at the head of a messageHeap, i.e. the next +//one to be released. +func (m *Message) isHead() bool { + return m.messageHeap != nil && m.index == 0 +} + +func (m *Message) withoutHeap() Message { + m.messageHeap = nil + m.index = indexNotInHeap + return *m +} + +//messageHeap is a slice of Messages with methods that satisfy the heap.Interface. +// +//messageHeaps can be used with the heap package to push and pop Messages ordered +//by Message.less. +// +//messageHeaps are not safe for use by multiple goroutines. +// +//We let Go manage how increasing size and capacity works when appending to a +//messageHeap. +type messageHeap []*Message + +//Len is the heap.Interface implementation. +//It returns len(mh). +func (mh messageHeap) Len() int { + return len(mh) +} + +//Less is the heap.Interface implementation. +func (mh messageHeap) Less(i, j int) bool { + return mh[i].less(mh[j]) +} + +//Swap is the heap.Interface implementation. +func (mh messageHeap) Swap(i, j int) { + mh[i], mh[j] = mh[j], mh[i] + mh[i].index = i + mh[j].index = j +} + +//pushMessage is a helper that calls the heap.Push package function with mh and m. +func pushMessage(mh *messageHeap, m *Message) { + heap.Push(mh, m) +} + +//Push is the heap.Interface implementation. +func (mh *messageHeap) Push(x interface{}) { + n := len(*mh) + m := x.(*Message) + m.messageHeap, m.index = mh, n + *mh = append(*mh, m) +} + +//popMessage is a helper that calls the heap.Pop package function with mh. +func popMessage(mh *messageHeap) *Message { + return heap.Pop(mh).(*Message) +} + +//Pop is the heap.Interface implementation. +func (mh *messageHeap) Pop() interface{} { + old := *mh + n := len(old) + m := old[n-1] + m.messageHeap, m.index = nil, indexNotInHeap + *mh = old[0 : n-1] + return m +} + +//peek returns the next Message to be released, or nil if mh is empty. +func (mh *messageHeap) peek() *Message { + if mh.Len() > 0 { + return (*mh)[0] + } + return nil +} + +//remove attemps to remove m from mh. +// +//It returns true if m is actually stored in mh and was actually removed, false +//if m is not in mh. +func (mh *messageHeap) remove(m *Message) bool { + if m.messageHeap != mh { + return false + } + + heap.Remove(mh, m.index) + return true +} diff --git a/message_test.go b/message_test.go new file mode 100644 index 0000000..c05717e --- /dev/null +++ b/message_test.go @@ -0,0 +1,252 @@ +package timequeue + +import ( + "container/heap" + "math/rand" + "reflect" + "sort" + "time" + "testing" +) + +var ( + _ heap.Interface = new(messageHeap) +) + +func TestNewMessage(t *testing.T) { + now := time.Now() + p := Priority(1234) + var data interface{} = t.Name() + + m := NewMessage(now, p, data) + + if !m.At.Equal(now) { + t.Fatal("At") + } + if m.Priority != p { + t.Fatal("Priority") + } + if !reflect.DeepEqual(m.Data, data) { + t.Fatal("Data") + } + + if m.messageHeap != nil { + t.Fatal("messageHeap") + } + if m.index != indexNotInHeap { + t.Fatal("index") + } +} + +func TestMessage_less(t *testing.T) { + now := time.Now() + + cases := []struct{ + a Message + b Message + result bool + }{ + { + Message{At: now}, + Message{At: now.Add(-1)}, + false, + }, + { + Message{At: now.Add(-1)}, + Message{At: now}, + true, + }, + { + Message{At: now}, + Message{At: now}, + false, + }, + { + Message{At: now, Priority: 1}, + Message{At: now.Add(-1), Priority: 2}, + false, + }, + { + Message{At: now.Add(-1), Priority: 2}, + Message{At: now, Priority: 1}, + true, + }, + { + Message{At: now, Priority: 1}, + Message{At: now, Priority: 2}, + true, + }, + { + Message{At: now, Priority: 2}, + Message{At: now, Priority: 2}, + false, + }, + { + Message{At: now, Priority: 3}, + Message{At: now, Priority: 2}, + false, + }, + } + + for i, tc := range cases { + result := tc.a.less(&tc.b) + + if result != tc.result { + t.Errorf("%d: %v less %v = %v WANT %v", i, tc.a, tc.b, result, tc.result) + } + } +} + +func TestMessage_isHead_NewMessagesShouldNotBeHeads(t *testing.T) { + m := NewMessage(time.Now(), 0, nil) + if m.isHead() { + t.Fatal() + } +} + +func TestMessage_isHead_MessagesInLenOneHeapsAreHeads(t *testing.T) { + mh := messageHeap([]*Message{}) + m := NewMessage(time.Now(), 0, nil) + + pushMessage(&mh, &m) + + if !m.isHead() { + t.Fatal() + } +} + +func TestMessageHeap_Len(t *testing.T) { + mh := messageHeap([]*Message{}) + if mh.Len() != 0 { + t.Fatal() + } + + mh = messageHeap(make([]*Message, 1234)) + if mh.Len() != 1234 { + t.Fatal() + } +} + +func TestMessageHeap_Less_DefersToTheMessageLessMethod(t *testing.T) { + now := time.Now() + m1 := NewMessage(now, 0, nil) + m2 := NewMessage(now, 1, nil) + + mh := messageHeap([]*Message{&m1, &m2}) + + if !mh.Less(0, 1) { + t.Fatal() + } + if mh.Less(1, 0) { + t.Fatal() + } +} + +func TestMessageHeap_Swap_UpdatesReferencesAndIndices(t *testing.T) { + now := time.Now() + m1 := NewMessage(now, 0, nil) + m2 := NewMessage(now, 1, nil) + + mh := messageHeap([]*Message{&m1, &m2}) + + mh.Swap(0, 1) + + //Messages weren't pushed, so there isn't information on them. + //We can check to make sure the index is updated. + + if mh[0] != &m2 || m2.index != 0 { + t.Fatal() + } + if mh[1] != &m1 || m1.index != 1 { + t.Fatal() + } +} + +func TestMessageHeap_Push_SetsTheMessageHeapFieldOnMessage(t *testing.T) { + m := NewMessage(time.Now(), 0, nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, &m) + + if m.messageHeap != &mh { + t.Fatal() + } +} + +func TestMessageHeap_PushAndPopResultInTheCorrectOrdering(t *testing.T) { + now := time.Now() + + mh := messageHeap([]*Message{}) + + want := []*Message{} + for i := 0; i < 100; i++ { + m := NewMessage(now, Priority(rand.Int31()), nil) + want = append(want, &m) + + pushMessage(&mh, &m) + } + sort.Sort(messageHeap(want)) + + result := []*Message{} + for mh.Len() > 0 { + result = append(result, popMessage(&mh)) + } + + //Do a loop here to check equality of pointer values. + for i, m := range result { + if m != want[i] { + t.Fatal() + } + } +} + +func TestMessageHeap_peek_EmptyReturnsNil(t *testing.T) { + mh := messageHeap([]*Message{}) + + if r := mh.peek(); r != nil { + t.Fatal() + } +} + +func TestMessageHeap_peek_ReturnsMessageAtIndexZero(t *testing.T) { + m := NewMessage(time.Now(), 0, nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, &m) + + if peeked := mh.peek(); peeked != mh[0] { + t.Fatal() + } +} + +func TestMessageHeap_remove_ReturnsFalseWithoutAssociation(t *testing.T) { + m := NewMessage(time.Now(), 0, nil) + + mh := messageHeap([]*Message{}) + + if ok := mh.remove(&m); ok { + t.Fatal() + } +} + +func TestMessageHeap_remove_ReturnsTrueAndModifiesMessage(t *testing.T) { + m := NewMessage(time.Now(), 0, nil) + + mh := messageHeap([]*Message{}) + + pushMessage(&mh, &m) + + if m.index < 0 { + t.Fatal() + } + + if ok := mh.remove(&m); !ok { + t.Fatal() + } + + if m.index >= 0 { + t.Fatal() + } +} diff --git a/timequeue.go b/timequeue.go new file mode 100644 index 0000000..bedbe9e --- /dev/null +++ b/timequeue.go @@ -0,0 +1,235 @@ +package timequeue + +import ( + "sync" + "time" +) + +const ( + DefaultCapacity = 0 +) + +type TimeQueue struct { + timer *time.Timer + + out chan Message + + lock *sync.Mutex + messageHeap messageHeap + stopChan chan chan struct{} + pauseChan chan chan struct{} +} + +func New() *TimeQueue { + return NewCapacity(DefaultCapacity) +} + +func NewCapacity(c int) *TimeQueue { + tq := &TimeQueue{ + timer: newExpiredTimer(), + out: make(chan Message, c), + lock: &sync.Mutex{}, + messageHeap: messageHeap([]*Message{}), + stopChan: nil, + pauseChan: make(chan chan struct{}), //Must not have capacity to ensure only only goroutine is able to pause the run loop. + } + + tq.Start() + + return tq +} + +func newExpiredTimer() *time.Timer { + timer := time.NewTimer(0) + <-timer.C + + return timer +} + +func (tq *TimeQueue) Messages() <-chan Message { + return tq.out +} + +func (tq *TimeQueue) Start() bool { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.start() +} + +func (tq *TimeQueue) start() bool { + if !tq.isStopped() { + return false + } + + tq.stopChan = make(chan chan struct{}) + tq.run() + return true +} + +func (tq *TimeQueue) run() { + //TODO need to test that the timer chan keeps values across stops and starts. + // + + go func() { + for { + select { + case <-tq.timer.C: + tq.releaseNextMessage() + + case resultChan := <-tq.pauseChan: + resultChan <- struct{}{} + <-resultChan + + case resultChan := <-tq.stopChan: + resultChan <- struct{}{} + return + } + + select { + case resultChan := <-tq.stopChan: + resultChan <- struct{}{} + return + + default: + } + } + }() +} + +func (tq *TimeQueue) releaseNextMessage() { + //TODO document how we are the only goroutine with access to the messageHeap. + + m := popMessage(&tq.messageHeap) + tq.dispatch(m) + + peeked := tq.messageHeap.peek() + if peeked != nil { + tq.resetTimerTo(peeked.At) + } +} + +func (tq *TimeQueue) dispatch(m *Message) { + //We don't need to call m.withoutHeap becuase the prior pop operation already does that. + tq.out <- *m +} + +func (tq *TimeQueue) Stop() bool { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.stop() +} + +func (tq *TimeQueue) stop() bool { + if tq.isStopped() { + return false + } + + resultChan := make(chan struct{}) + tq.stopChan <- resultChan + <-resultChan + + tq.stopChan = nil + return true +} + +func (tq *TimeQueue) Drain() []Message { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.drain() +} + +func (tq *TimeQueue) drain() []Message { + unpause := tq.pause() + defer unpause() + + if tq.messageHeap.Len() > 0 { + defer tq.stopTimer() + } + + result := make([]Message, tq.messageHeap.Len()) + for i, m := range tq.messageHeap { + result[i] = m.withoutHeap() + } + + for len(tq.out) > 0 { + result = append(result, <-tq.out) + } + + return result +} + +func (tq *TimeQueue) isStopped() bool { + return tq.stopChan == nil +} + +func (tq *TimeQueue) Remove(m Message) bool { + tq.lock.Lock() + defer tq.lock.Unlock() + + return tq.remove(m) +} + +func (tq *TimeQueue) remove(m Message) bool { + unpause := tq.pause() + defer unpause() + + return tq.messageHeap.remove(&m) +} + +func (tq *TimeQueue) Push(at time.Time, p Priority, data interface{}) Message { + m := NewMessage(at, p, data) + tq.PushAll(m) + return m +} + +func (tq *TimeQueue) PushAll(messages ...Message) { + tq.lock.Lock() + defer tq.lock.Unlock() + + unpause := tq.pause() + defer unpause() + + var newHead *Message + + for _, m := range messages { + pushMessage(&tq.messageHeap, &m) + + if m.isHead() { + newHead = &m + } + } + + if newHead != nil { + if tq.messageHeap.Len() == 1 { + //We are the new head, but the only Message, so just set timer. + tq.resetTimerTo(newHead.At) + } else { + //We bumped out a prior head Message, so stop then reset. + tq.stopTimer() + tq.resetTimerTo(newHead.At) + } + } +} + +func (tq *TimeQueue) pause() func() { + resultChan := make(chan struct{}) + tq.pauseChan <- resultChan + <-tq.pauseChan + + return func() { + resultChan <- struct{}{} + } +} + +func (tq *TimeQueue) stopTimer() { + if !tq.timer.Stop() { + <-tq.timer.C + } +} + +func (tq *TimeQueue) resetTimerTo(t time.Time) { + tq.timer.Reset(time.Until(t)) +} From f0740498746196aa418339af81fd83a3c6c13570 Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Sat, 6 Jul 2019 19:18:27 -0700 Subject: [PATCH 4/7] Initialize Go module --- go.mod | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 go.mod diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1e598c8 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/gogolfing/timequeue + +go 1.12 From 845575445d03d0646e2a604ee8b75e5f8a2eaef4 Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Sat, 6 Jul 2019 19:18:40 -0700 Subject: [PATCH 5/7] Add implementations and tests --- message.go | 17 +++++- message_test.go | 62 +++++++++++++++------- timequeue.go | 74 +++++++++++++++++++++------ timequeue_accept_test.go | 64 +++++++++++++++++++++++ timequeue_test.go | 108 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 288 insertions(+), 37 deletions(-) create mode 100644 timequeue_accept_test.go create mode 100644 timequeue_test.go diff --git a/message.go b/message.go index aca38a2..6c96583 100644 --- a/message.go +++ b/message.go @@ -46,8 +46,8 @@ type Message struct { //NewMessage returns a Message with at, p, and data set on their corresponding fields. // //You should use this function to create Messages instead of using a struct initializer. -func NewMessage(at time.Time, p Priority, data interface{}) Message { - return Message{ +func NewMessage(at time.Time, p Priority, data interface{}) *Message { + return &Message{ At: at, Priority: p, Data: data, @@ -159,3 +159,16 @@ func (mh *messageHeap) remove(m *Message) bool { heap.Remove(mh, m.index) return true } + +func (mh *messageHeap) drain() []Message { + old := *mh + + result := make([]Message, len(old)) + for i, m := range old { + result[i] = m.withoutHeap() + } + + *mh = old[0:0] + + return result +} diff --git a/message_test.go b/message_test.go index c05717e..99088d3 100644 --- a/message_test.go +++ b/message_test.go @@ -5,8 +5,8 @@ import ( "math/rand" "reflect" "sort" - "time" "testing" + "time" ) var ( @@ -41,9 +41,9 @@ func TestNewMessage(t *testing.T) { func TestMessage_less(t *testing.T) { now := time.Now() - cases := []struct{ - a Message - b Message + cases := []struct { + a Message + b Message result bool }{ { @@ -108,7 +108,7 @@ func TestMessage_isHead_MessagesInLenOneHeapsAreHeads(t *testing.T) { mh := messageHeap([]*Message{}) m := NewMessage(time.Now(), 0, nil) - pushMessage(&mh, &m) + pushMessage(&mh, m) if !m.isHead() { t.Fatal() @@ -132,7 +132,7 @@ func TestMessageHeap_Less_DefersToTheMessageLessMethod(t *testing.T) { m1 := NewMessage(now, 0, nil) m2 := NewMessage(now, 1, nil) - mh := messageHeap([]*Message{&m1, &m2}) + mh := messageHeap([]*Message{m1, m2}) if !mh.Less(0, 1) { t.Fatal() @@ -147,17 +147,17 @@ func TestMessageHeap_Swap_UpdatesReferencesAndIndices(t *testing.T) { m1 := NewMessage(now, 0, nil) m2 := NewMessage(now, 1, nil) - mh := messageHeap([]*Message{&m1, &m2}) + mh := messageHeap([]*Message{m1, m2}) mh.Swap(0, 1) //Messages weren't pushed, so there isn't information on them. //We can check to make sure the index is updated. - if mh[0] != &m2 || m2.index != 0 { + if mh[0] != m2 || m2.index != 0 { t.Fatal() } - if mh[1] != &m1 || m1.index != 1 { + if mh[1] != m1 || m1.index != 1 { t.Fatal() } } @@ -167,7 +167,7 @@ func TestMessageHeap_Push_SetsTheMessageHeapFieldOnMessage(t *testing.T) { mh := messageHeap([]*Message{}) - pushMessage(&mh, &m) + pushMessage(&mh, m) if m.messageHeap != &mh { t.Fatal() @@ -182,9 +182,9 @@ func TestMessageHeap_PushAndPopResultInTheCorrectOrdering(t *testing.T) { want := []*Message{} for i := 0; i < 100; i++ { m := NewMessage(now, Priority(rand.Int31()), nil) - want = append(want, &m) + want = append(want, m) - pushMessage(&mh, &m) + pushMessage(&mh, m) } sort.Sort(messageHeap(want)) @@ -214,7 +214,7 @@ func TestMessageHeap_peek_ReturnsMessageAtIndexZero(t *testing.T) { mh := messageHeap([]*Message{}) - pushMessage(&mh, &m) + pushMessage(&mh, m) if peeked := mh.peek(); peeked != mh[0] { t.Fatal() @@ -226,7 +226,7 @@ func TestMessageHeap_remove_ReturnsFalseWithoutAssociation(t *testing.T) { mh := messageHeap([]*Message{}) - if ok := mh.remove(&m); ok { + if ok := mh.remove(m); ok { t.Fatal() } } @@ -236,17 +236,43 @@ func TestMessageHeap_remove_ReturnsTrueAndModifiesMessage(t *testing.T) { mh := messageHeap([]*Message{}) - pushMessage(&mh, &m) + pushMessage(&mh, m) - if m.index < 0 { + if m.messageHeap == nil || m.index < 0 { t.Fatal() } - if ok := mh.remove(&m); !ok { + if ok := mh.remove(m); !ok { t.Fatal() } - if m.index >= 0 { + assertDisassociated(t, *m) +} + +func TestMessageHeap_drain_ReturnsEqualLengthSliceOfMessagesNotInAHeapAndSetsLengthToZero(t *testing.T) { + mh := messageHeap([]*Message{}) + + for i := 0; i < 100; i++ { + m := NewMessage(time.Now(), 0, i) + pushMessage(&mh, m) + } + + drained := mh.drain() + + assertDisassociated(t, drained...) + + if len(drained) != 100 { + t.Fatal() + } + if mh.Len() != 0 { t.Fatal() } } + +func assertDisassociated(t *testing.T, messages ...Message) { + for _, m := range messages { + if m.messageHeap != nil || m.index >= 0 { + t.Error("Message is not disassociated", m) + } + } +} diff --git a/timequeue.go b/timequeue.go index bedbe9e..de4a428 100644 --- a/timequeue.go +++ b/timequeue.go @@ -1,6 +1,7 @@ package timequeue import ( + "log" "sync" "time" ) @@ -75,20 +76,27 @@ func (tq *TimeQueue) run() { for { select { case <-tq.timer.C: + log.Println("got timer") tq.releaseNextMessage() case resultChan := <-tq.pauseChan: + log.Println("got pause request") resultChan <- struct{}{} <-resultChan + log.Println("ended pause request") case resultChan := <-tq.stopChan: + log.Println("got stop request") resultChan <- struct{}{} + log.Println("ended stop request") return } select { case resultChan := <-tq.stopChan: + log.Println("got stop request") resultChan <- struct{}{} + log.Println("ended stop request") return default: @@ -103,10 +111,7 @@ func (tq *TimeQueue) releaseNextMessage() { m := popMessage(&tq.messageHeap) tq.dispatch(m) - peeked := tq.messageHeap.peek() - if peeked != nil { - tq.resetTimerTo(peeked.At) - } + tq.maybeResetTimerToHead() } func (tq *TimeQueue) dispatch(m *Message) { @@ -149,11 +154,11 @@ func (tq *TimeQueue) drain() []Message { defer tq.stopTimer() } - result := make([]Message, tq.messageHeap.Len()) - for i, m := range tq.messageHeap { - result[i] = m.withoutHeap() - } + //We start with the drained Messages from our heap. + result := tq.messageHeap.drain() + //If there are Messages on our output channel, then drain the channel. + //Messages on this channel are already disassociated with a heap. for len(tq.out) > 0 { result = append(result, <-tq.out) } @@ -165,44 +170,66 @@ func (tq *TimeQueue) isStopped() bool { return tq.stopChan == nil } -func (tq *TimeQueue) Remove(m Message) bool { +func (tq *TimeQueue) Remove(m *Message) bool { tq.lock.Lock() defer tq.lock.Unlock() return tq.remove(m) } -func (tq *TimeQueue) remove(m Message) bool { +func (tq *TimeQueue) remove(m *Message) bool { unpause := tq.pause() defer unpause() - return tq.messageHeap.remove(&m) + //TODO something with checking timer if removed message is head. + //TODO make sure the calling code gets understands that m is removed. + + isHead := m.isHead() + ok := tq.messageHeap.remove(m) + + if ok && isHead { + tq.stopTimer() + tq.maybeResetTimerToHead() + } + + return ok } func (tq *TimeQueue) Push(at time.Time, p Priority, data interface{}) Message { m := NewMessage(at, p, data) tq.PushAll(m) - return m + return *m } -func (tq *TimeQueue) PushAll(messages ...Message) { +func (tq *TimeQueue) PushAll(messages ...*Message) { tq.lock.Lock() defer tq.lock.Unlock() + log.Println("pushing messages", messages) + unpause := tq.pause() defer unpause() + log.Println("paused and defered unpause") + var newHead *Message for _, m := range messages { - pushMessage(&tq.messageHeap, &m) + pushMessage(&tq.messageHeap, m) + + log.Println("pushed message") if m.isHead() { - newHead = &m + log.Println("new message is head") + newHead = m + } else { + log.Println("new message is NOT head") } } if newHead != nil { + log.Println("doing something with timer because of new head") + if tq.messageHeap.Len() == 1 { //We are the new head, but the only Message, so just set timer. tq.resetTimerTo(newHead.At) @@ -212,13 +239,18 @@ func (tq *TimeQueue) PushAll(messages ...Message) { tq.resetTimerTo(newHead.At) } } + + log.Println("end of PushAll") } func (tq *TimeQueue) pause() func() { + if tq.isStopped() { + return func() {} + } + resultChan := make(chan struct{}) tq.pauseChan <- resultChan - <-tq.pauseChan - + <-resultChan return func() { resultChan <- struct{}{} } @@ -230,6 +262,14 @@ func (tq *TimeQueue) stopTimer() { } } +func (tq *TimeQueue) maybeResetTimerToHead() { + peeked := tq.messageHeap.peek() + + if peeked != nil { + tq.resetTimerTo(peeked.At) + } +} + func (tq *TimeQueue) resetTimerTo(t time.Time) { tq.timer.Reset(time.Until(t)) } diff --git a/timequeue_accept_test.go b/timequeue_accept_test.go new file mode 100644 index 0000000..4548729 --- /dev/null +++ b/timequeue_accept_test.go @@ -0,0 +1,64 @@ +package timequeue_test + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/gogolfing/timequeue" +) + +func ExampleTimeQueue() { + return + + tq := timequeue.New() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + log.Println("have context") + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + + <-ctx.Done() + + tq.Stop() + + log.Println("ctx is Done") + }() + + doneProducing := make(chan struct{}) + go func() { + defer close(doneProducing) + + for i := 0; i < 10; i++ { + m := tq.Push(time.Now().Add(time.Duration(i)*time.Second), 0, i) + log.Println("pushed", m) + } + + log.Println("done producing") + }() + + doneConsuming := make(chan struct{}) + go func() { + defer close(doneConsuming) + + for { + select { + case <-ctx.Done(): + return + + case m := <-tq.Messages(): + log.Println("received", m) + fmt.Println(m.Data.(int)) + } + } + }() + + <-doneProducing + <-doneConsuming + <-stopped +} diff --git a/timequeue_test.go b/timequeue_test.go new file mode 100644 index 0000000..a5ad0e2 --- /dev/null +++ b/timequeue_test.go @@ -0,0 +1,108 @@ +package timequeue + +import ( + "testing" + "time" +) + +func TestTimeQueue_New_CreatesAMessageChannelWithDefaultCapacity(t *testing.T) { + tq := New() + defer tq.Stop() + + if cap(tq.Messages()) != DefaultCapacity { + t.Fatal() + } +} + +func TestTimeQueue_NewCapacity_CreatesAMessagesChannelWithDesiredCapacity(t *testing.T) { + tq := NewCapacity(1234) + defer tq.Stop() + + if cap(tq.Messages()) != 1234 { + t.Fatal() + } +} + +func TestTimeQueue_Start_ReturnsFalseOnARunningTimeQueue(t *testing.T) { + tq := New() + defer tq.Stop() + + for i := 0; i < 10; i++ { + if tq.Start() { + t.Fatal() + } + } +} + +func TestTimeQueue_Stop_ReturnsTrueWhileRunningAndFalseWhileStopped(t *testing.T) { + tq := New() + + if !tq.Stop() { + t.Fatal("first Stop") + } + + for i := 0; i < 10; i++ { + if tq.Stop() { + t.Fatal() + } + } +} + +func TestTimeQueue_Start_ReturnsTrueWhileStopped(t *testing.T) { + tq := New() + tq.Stop() + + if !tq.Start() { + t.Fatal() + } + + defer tq.Stop() +} + +func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileRunning(t *testing.T) { + tq := NewCapacity(10) + defer tq.Stop() + + now := time.Now() + for i := 0; i < 10; i++ { + tq.Push(now, 0, i) + } + + drained := tq.Drain() + if len(drained) != 10 { + t.Fatal() + } + assertDisassociated(t, drained...) +} + +func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileStopped(t *testing.T) { + tq := NewCapacity(10) + + now := time.Now() + for i := 0; i < 10; i++ { + tq.Push(now, 0, i) + } + + tq.Stop() + + drained := tq.Drain() + if len(drained) != 10 { + t.Fatal() + } + assertDisassociated(t, drained...) +} + +func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t *testing.T) { + tq := New() + defer tq.Stop() + + m := tq.Push(time.Now().Add(time.Hour), 0, "whoami") + + if ok := tq.Remove(&m); !ok { + t.Fatal(ok) + } + + if len(tq.Drain()) != 0 { + t.Fatal() + } +} From 53c48b5d9a69fecbb7df157c146c27f417b02cce Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Sun, 7 Jul 2019 14:04:10 -0700 Subject: [PATCH 6/7] Update tests for the TimeQueue type --- message.go | 48 ++++--- message_test.go | 46 +++---- timequeue.go | 54 ++------ timequeue_accept_test.go | 263 ++++++++++++++++++++++++++++++++++---- timequeue_example_test.go | 72 +++++++++++ timequeue_test.go | 125 ++++++++++++++++-- 6 files changed, 492 insertions(+), 116 deletions(-) create mode 100644 timequeue_example_test.go diff --git a/message.go b/message.go index 6c96583..04de5f3 100644 --- a/message.go +++ b/message.go @@ -22,18 +22,18 @@ type Priority uint32 //Message zero values are not in a valid state. You should use NewMessage to create //Message instances. type Message struct { - //At is the Time at which to release this Message. - At time.Time + //at is the Time at which to release this Message. + at time.Time - //Priority is the Priority of this Message. + //priority is the Priority of this Message. //If a Message has an equal At value with another Message in the same TimeQueue, - //then Priority is consulted to "order" the Messages to determine which should + //then priority is consulted to "order" the Messages to determine which should //be "released" first. - Priority + priority Priority - //Data is any arbitrary data that you can put in a Message and retrieve when + //data is any arbitrary data that you can put in a Message and retrieve when //the Message is released. - Data interface{} + data interface{} //messageHeap is the messageHeap that this Message is in. //A nil value means that Message is not in a messageHeap. @@ -48,26 +48,44 @@ type Message struct { //You should use this function to create Messages instead of using a struct initializer. func NewMessage(at time.Time, p Priority, data interface{}) *Message { return &Message{ - At: at, - Priority: p, - Data: data, + at: at, + priority: p, + data: data, messageHeap: nil, index: indexNotInHeap, } } +//At returns the Time at which m is scheduled to be released. +func (m *Message) At() time.Time { + return m.at +} + +//Priority returns m's Priority. +func (m *Message) Priority() Priority { + return m.priority +} + +//Data returns the data associated with m. +// +//This will usually be used after receiving a Message from a TimeQueue in order +//to process the Message appropriately. +func (m *Message) Data() interface{} { + return m.data +} + //less returns whether or not m is "less than" other. //This is used to determined the order in which Messages are released from a TimeQueue. // -//It returns true if m.At is before other.At, regardless of Priorities. -//If m and other have an equal At field, then true is returned if m has a lower -//Priority than other. +//It returns true if m.at is before other.at, regardless of Priorities. +//If m and other have an equal at field, then true is returned if m has a lower +//priority than other. func (m *Message) less(other *Message) bool { - diff := m.At.Sub(other.At) + diff := m.at.Sub(other.at) if diff != 0 { return diff < 0 } - return m.Priority < other.Priority + return m.priority < other.priority } //isHead returns whether or not m is at the head of a messageHeap, i.e. the next diff --git a/message_test.go b/message_test.go index 99088d3..ffe1d87 100644 --- a/message_test.go +++ b/message_test.go @@ -20,14 +20,14 @@ func TestNewMessage(t *testing.T) { m := NewMessage(now, p, data) - if !m.At.Equal(now) { - t.Fatal("At") + if !m.at.Equal(now) { + t.Fatal("at") } - if m.Priority != p { - t.Fatal("Priority") + if m.priority != p { + t.Fatal("priority") } - if !reflect.DeepEqual(m.Data, data) { - t.Fatal("Data") + if !reflect.DeepEqual(m.data, data) { + t.Fatal("data") } if m.messageHeap != nil { @@ -47,43 +47,43 @@ func TestMessage_less(t *testing.T) { result bool }{ { - Message{At: now}, - Message{At: now.Add(-1)}, + Message{at: now}, + Message{at: now.Add(-1)}, false, }, { - Message{At: now.Add(-1)}, - Message{At: now}, + Message{at: now.Add(-1)}, + Message{at: now}, true, }, { - Message{At: now}, - Message{At: now}, + Message{at: now}, + Message{at: now}, false, }, { - Message{At: now, Priority: 1}, - Message{At: now.Add(-1), Priority: 2}, + Message{at: now, priority: 1}, + Message{at: now.Add(-1), priority: 2}, false, }, { - Message{At: now.Add(-1), Priority: 2}, - Message{At: now, Priority: 1}, + Message{at: now.Add(-1), priority: 2}, + Message{at: now, priority: 1}, true, }, { - Message{At: now, Priority: 1}, - Message{At: now, Priority: 2}, + Message{at: now, priority: 1}, + Message{at: now, priority: 2}, true, }, { - Message{At: now, Priority: 2}, - Message{At: now, Priority: 2}, + Message{at: now, priority: 2}, + Message{at: now, priority: 2}, false, }, { - Message{At: now, Priority: 3}, - Message{At: now, Priority: 2}, + Message{at: now, priority: 3}, + Message{at: now, priority: 2}, false, }, } @@ -270,6 +270,8 @@ func TestMessageHeap_drain_ReturnsEqualLengthSliceOfMessagesNotInAHeapAndSetsLen } func assertDisassociated(t *testing.T, messages ...Message) { + t.Helper() + for _, m := range messages { if m.messageHeap != nil || m.index >= 0 { t.Error("Message is not disassociated", m) diff --git a/timequeue.go b/timequeue.go index de4a428..6a19120 100644 --- a/timequeue.go +++ b/timequeue.go @@ -1,7 +1,6 @@ package timequeue import ( - "log" "sync" "time" ) @@ -21,11 +20,11 @@ type TimeQueue struct { pauseChan chan chan struct{} } -func New() *TimeQueue { - return NewCapacity(DefaultCapacity) +func NewTimeQueue() *TimeQueue { + return NewTimeQueueCapacity(DefaultCapacity) } -func NewCapacity(c int) *TimeQueue { +func NewTimeQueueCapacity(c int) *TimeQueue { tq := &TimeQueue{ timer: newExpiredTimer(), out: make(chan Message, c), @@ -69,34 +68,24 @@ func (tq *TimeQueue) start() bool { } func (tq *TimeQueue) run() { - //TODO need to test that the timer chan keeps values across stops and starts. - // - go func() { for { select { case <-tq.timer.C: - log.Println("got timer") tq.releaseNextMessage() case resultChan := <-tq.pauseChan: - log.Println("got pause request") resultChan <- struct{}{} <-resultChan - log.Println("ended pause request") case resultChan := <-tq.stopChan: - log.Println("got stop request") resultChan <- struct{}{} - log.Println("ended stop request") return } select { case resultChan := <-tq.stopChan: - log.Println("got stop request") resultChan <- struct{}{} - log.Println("ended stop request") return default: @@ -181,9 +170,6 @@ func (tq *TimeQueue) remove(m *Message) bool { unpause := tq.pause() defer unpause() - //TODO something with checking timer if removed message is head. - //TODO make sure the calling code gets understands that m is removed. - isHead := m.isHead() ok := tq.messageHeap.remove(m) @@ -195,52 +181,36 @@ func (tq *TimeQueue) remove(m *Message) bool { return ok } -func (tq *TimeQueue) Push(at time.Time, p Priority, data interface{}) Message { +func (tq *TimeQueue) Push(at time.Time, p Priority, data interface{}) *Message { m := NewMessage(at, p, data) tq.PushAll(m) - return *m + return m } func (tq *TimeQueue) PushAll(messages ...*Message) { tq.lock.Lock() defer tq.lock.Unlock() - log.Println("pushing messages", messages) - unpause := tq.pause() defer unpause() - log.Println("paused and defered unpause") - - var newHead *Message + hasNewHead := false for _, m := range messages { pushMessage(&tq.messageHeap, m) - log.Println("pushed message") - if m.isHead() { - log.Println("new message is head") - newHead = m - } else { - log.Println("new message is NOT head") + hasNewHead = true } } - if newHead != nil { - log.Println("doing something with timer because of new head") - - if tq.messageHeap.Len() == 1 { - //We are the new head, but the only Message, so just set timer. - tq.resetTimerTo(newHead.At) - } else { - //We bumped out a prior head Message, so stop then reset. + if hasNewHead { + if len(messages) < tq.messageHeap.Len() { + //TODO displaced docs tq.stopTimer() - tq.resetTimerTo(newHead.At) } + tq.maybeResetTimerToHead() } - - log.Println("end of PushAll") } func (tq *TimeQueue) pause() func() { @@ -266,7 +236,7 @@ func (tq *TimeQueue) maybeResetTimerToHead() { peeked := tq.messageHeap.peek() if peeked != nil { - tq.resetTimerTo(peeked.At) + tq.resetTimerTo(peeked.At()) } } diff --git a/timequeue_accept_test.go b/timequeue_accept_test.go index 4548729..76fae35 100644 --- a/timequeue_accept_test.go +++ b/timequeue_accept_test.go @@ -2,63 +2,272 @@ package timequeue_test import ( "context" - "fmt" - "log" + "math/rand" + "sort" + "testing" "time" "github.com/gogolfing/timequeue" ) -func ExampleTimeQueue() { - return +func messagesLessFunc(messages []timequeue.Message) func(i, j int) bool { + return func(i, j int) bool { + return messages[i].At().Before(messages[j].At()) + } +} - tq := timequeue.New() +func timeWithinDurationFunc(t time.Time, d time.Duration) func() time.Time { + return func() time.Time { + return t.Add(time.Duration(rand.Int63n(int64(d)))) + } +} - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() +func TestTimeQueue_SinglePublisherAndConsumerRetrievesMessagesInOrder(t *testing.T) { + tq := timequeue.NewTimeQueue() - log.Println("have context") + const count = 10000 - stopped := make(chan struct{}) go func() { - defer close(stopped) + now := time.Now() + atFunc := timeWithinDurationFunc(now, time.Second) + + toPush := make([]*timequeue.Message, count) + for i := 0; i < count; i++ { + toPush[i] = timequeue.NewMessage(atFunc(), 0, i) + } + + tq.PushAll(toPush...) + }() + + messages := make([]timequeue.Message, 0, count) + + for i := 0; i < count; i++ { + messages = append(messages, <-tq.Messages()) + } + + sorted := sort.SliceIsSorted(messages, messagesLessFunc(messages)) + if !sorted { + t.Fatal(sorted) + } +} + +func TestTimeQueue_FullOnUsage(t *testing.T) { + if testing.Short() { + t.Skip("skipping for time") + } + + //now is used a reference for the start time of the test. + now := time.Now() + goroutineCount := 10 + messagesPerGoroutine := 10000 + duration := time.Duration(10) * time.Second + atFunc := timeWithinDurationFunc(now, duration) + pauseCount := 100 + + tq := timequeue.NewTimeQueueCapacity(messagesPerGoroutine) + + consumingCtx, consumingCtxCancel := context.WithCancel(context.Background()) + consumedCountChan := consumeMessages(consumingCtx, tq, goroutineCount) + + producingCtx, producingCtxCancel := context.WithCancel(context.Background()) + producedCountChan, messagesToRemove := produceMessages( + producingCtx, + tq, + goroutineCount, + messagesPerGoroutine, + atFunc, + 0.15, + ) + + removingCtx := producingCtx + removedCountChan := removeMessages(removingCtx, tq, 1, messagesToRemove) + + //Wait to get through a portion of the allotted duration. + deadline := now.Add(duration * 3 / 4) + + stopCtx, stop := context.WithDeadline(context.Background(), deadline) + defer stop() + + //Send intermittent Stop then Start signals. + pauseThroughoutDeadline(stopCtx, tq, deadline, pauseCount) + + <-stopCtx.Done() //Actually wait until the deadline hits. + producingCtxCancel() //Stop producing. We may already be done producing depending on the number of messages sent. + + producedCount := <-producedCountChan + //Getting here means that we are done producing. All producers have returned. + + tq.Stop() //We call Stop while there are still consumers running. This is a requirement. + consumingCtxCancel() //Now we stop consuming. + + removedCount := <-removedCountChan //All removers have returned. + consumedCount := <-consumedCountChan //All consumers have returned. We know there are no more receive attempts from tq.Messages(). + + //From above, we know we can call Drain(). + + drainedCount := len(tq.Drain()) + + totalReceived := removedCount + consumedCount + drainedCount + + if producedCount != totalReceived { + t.Fatalf( + "produced %v ; removed + consumed + drained = %v + %v + %v = %v", + producedCount, + removedCount, + consumedCount, + drainedCount, + totalReceived, + ) + } +} + +func consumeMessages(ctx context.Context, tq *timequeue.TimeQueue, grc int) <-chan int { + aggChan := make(chan int) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + for { + select { + case <-ctx.Done(): + aggChan <- count + return + + case <-tq.Messages(): + count++ + } + } + }() + } - <-ctx.Done() + result := make(chan int, 1) - tq.Stop() + go func() { + defer close(aggChan) + defer close(result) - log.Println("ctx is Done") + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan + } + result <- sum }() - doneProducing := make(chan struct{}) + return result +} + +func produceMessages(ctx context.Context, tq *timequeue.TimeQueue, grc, mpg int, atFunc func() time.Time, removeRate float64) (<-chan int, <-chan *timequeue.Message) { + aggChan := make(chan int) + removeChan := make(chan *timequeue.Message, mpg) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + + loop: + for count < mpg { + select { + case <-ctx.Done(): + break loop + + default: + m := tq.Push(atFunc(), 0, i) + count++ + + if rand.Float64() < removeRate { + removeChan <- m + } + } + } + + aggChan <- count + }() + } + + result := make(chan int, 1) + go func() { - defer close(doneProducing) + defer close(aggChan) + defer close(removeChan) + defer close(result) - for i := 0; i < 10; i++ { - m := tq.Push(time.Now().Add(time.Duration(i)*time.Second), 0, i) - log.Println("pushed", m) + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan } + result <- sum + }() + + return result, removeChan +} - log.Println("done producing") +func removeMessages(ctx context.Context, tq *timequeue.TimeQueue, grc int, toRemove <-chan *timequeue.Message) <-chan int { + aggChan := make(chan int) + + for i := 0; i < grc; i++ { + go func() { + count := 0 + + loop: + for m := range toRemove { + select { + case <-ctx.Done(): + break loop + + default: + if ok := tq.Remove(m); ok { + count++ + } + } + } + + aggChan <- count + }() + } + + result := make(chan int, 1) + + go func() { + defer close(aggChan) + defer close(result) + + sum := 0 + for i := 0; i < grc; i++ { + sum += <-aggChan + } + result <- sum }() - doneConsuming := make(chan struct{}) + return result +} + +func pauseThroughoutDeadline(ctx context.Context, tq *timequeue.TimeQueue, deadline time.Time, count int) { + defer tq.Start() + + done := make(chan struct{}) + go func() { - defer close(doneConsuming) + ticker := time.NewTicker(time.Until(deadline) / time.Duration(count)) + defer ticker.Stop() + + start := false for { select { case <-ctx.Done(): + close(done) return - case m := <-tq.Messages(): - log.Println("received", m) - fmt.Println(m.Data.(int)) + case <-ticker.C: + if start { + tq.Start() + } else { + tq.Stop() + } + start = !start } } }() - <-doneProducing - <-doneConsuming - <-stopped + <-done } diff --git a/timequeue_example_test.go b/timequeue_example_test.go new file mode 100644 index 0000000..579fde6 --- /dev/null +++ b/timequeue_example_test.go @@ -0,0 +1,72 @@ +package timequeue_test + +import ( + "context" + "fmt" + "time" + + "github.com/gogolfing/timequeue" +) + +func ExampleTimeQueue() { + now := time.Now() + tq := timequeue.NewTimeQueue() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + + <-ctx.Done() + + tq.Stop() + }() + + doneProducing := make(chan struct{}) + go func() { + defer close(doneProducing) + + const count = 10 + + toPush := make([]*timequeue.Message, count) + for i := 0; i < count; i++ { + m := timequeue.NewMessage(now.Add(time.Duration(i)), 0, i+1) + toPush[i] = m + } + + tq.PushAll(toPush...) + }() + + doneConsuming := make(chan struct{}) + go func() { + defer close(doneConsuming) + + for { + select { + case <-stopped: + return + + case m := <-tq.Messages(): + fmt.Println(m.Data().(int)) + } + } + }() + + <-doneProducing + <-stopped + <-doneConsuming + + //Output: + //1 + //2 + //3 + //4 + //5 + //6 + //7 + //8 + //9 + //10 +} diff --git a/timequeue_test.go b/timequeue_test.go index a5ad0e2..1ef2c20 100644 --- a/timequeue_test.go +++ b/timequeue_test.go @@ -6,7 +6,7 @@ import ( ) func TestTimeQueue_New_CreatesAMessageChannelWithDefaultCapacity(t *testing.T) { - tq := New() + tq := NewTimeQueue() defer tq.Stop() if cap(tq.Messages()) != DefaultCapacity { @@ -15,7 +15,7 @@ func TestTimeQueue_New_CreatesAMessageChannelWithDefaultCapacity(t *testing.T) { } func TestTimeQueue_NewCapacity_CreatesAMessagesChannelWithDesiredCapacity(t *testing.T) { - tq := NewCapacity(1234) + tq := NewTimeQueueCapacity(1234) defer tq.Stop() if cap(tq.Messages()) != 1234 { @@ -24,7 +24,7 @@ func TestTimeQueue_NewCapacity_CreatesAMessagesChannelWithDesiredCapacity(t *tes } func TestTimeQueue_Start_ReturnsFalseOnARunningTimeQueue(t *testing.T) { - tq := New() + tq := NewTimeQueue() defer tq.Stop() for i := 0; i < 10; i++ { @@ -35,7 +35,7 @@ func TestTimeQueue_Start_ReturnsFalseOnARunningTimeQueue(t *testing.T) { } func TestTimeQueue_Stop_ReturnsTrueWhileRunningAndFalseWhileStopped(t *testing.T) { - tq := New() + tq := NewTimeQueue() if !tq.Stop() { t.Fatal("first Stop") @@ -49,7 +49,7 @@ func TestTimeQueue_Stop_ReturnsTrueWhileRunningAndFalseWhileStopped(t *testing.T } func TestTimeQueue_Start_ReturnsTrueWhileStopped(t *testing.T) { - tq := New() + tq := NewTimeQueue() tq.Stop() if !tq.Start() { @@ -60,7 +60,7 @@ func TestTimeQueue_Start_ReturnsTrueWhileStopped(t *testing.T) { } func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileRunning(t *testing.T) { - tq := NewCapacity(10) + tq := NewTimeQueueCapacity(10) defer tq.Stop() now := time.Now() @@ -76,7 +76,7 @@ func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileR } func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileStopped(t *testing.T) { - tq := NewCapacity(10) + tq := NewTimeQueueCapacity(10) now := time.Now() for i := 0; i < 10; i++ { @@ -93,12 +93,12 @@ func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileS } func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t *testing.T) { - tq := New() + tq := NewTimeQueue() defer tq.Stop() - m := tq.Push(time.Now().Add(time.Hour), 0, "whoami") + m := tq.Push(time.Now().Add(time.Hour), 0, nil) - if ok := tq.Remove(&m); !ok { + if ok := tq.Remove(m); !ok { t.Fatal(ok) } @@ -106,3 +106,108 @@ func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t * t.Fatal() } } + +func TestTimeQueue_Remove_CanRemoveAMessageWhileStoppedAndNothingGetsDrained(t *testing.T) { + tq := NewTimeQueue() + + m := tq.Push(time.Now().Add(time.Hour), 0, nil) + + tq.Stop() + + if ok := tq.Remove(m); !ok { + t.Fatal(ok) + } + + if len(tq.Drain()) != 0 { + t.Fatal() + } +} + +func TestTimeQueue_Remove_CallingRemoveWithAMessageFromAnotherQueueReturnsFalse(t *testing.T) { + tq1 := NewTimeQueueCapacity(1) //We need capacity so we don't block. + defer tq1.Stop() + + tq2 := NewTimeQueueCapacity(1) //We need capacity so we don't block. + defer tq2.Stop() + + tq1.Push(time.Now(), 0, nil) + m2 := tq2.Push(time.Now(), 0, nil) + + if tq1.Remove(m2) { + t.Fatal() + } + + if len(tq1.Drain()) != 1 { + t.Fatal() + } + if len(tq2.Drain()) != 1 { + t.Fatal() + } +} + +func TestTimeQueue_Push_WeCanPushAsManyMessagesAsWeWantAtNowWhileStoppedWithoutCapacity(t *testing.T) { + tq := NewTimeQueue() + tq.Stop() + + now := time.Now() + for i := 0; i < 100; i++ { + tq.Push(now, 0, i) + } +} + +func TestTimeQueue_PushAll_WeCanPushAllWithAsManyMessagesAsWeWantAtNowWhileRunningWithoutCapacity(t *testing.T) { + done := make(chan struct{}) + + tq := NewTimeQueue() + defer close(done) + defer tq.Stop() + + now := time.Now() + messages := []*Message{} + for i := 0; i < 100; i++ { + messages = append(messages, NewMessage(now, 0, i)) + } + + tq.PushAll(messages...) + + go func() { + for { + select { + case <-tq.Messages(): + + case <-done: + return + } + } + }() +} + +func TestTimeQueue_PushAll_CorrectlyStopsThenResetsTheTimerWhenWeAddANewHeadWithMessagesAlreadyInTheQueue(t *testing.T) { + tq := NewTimeQueueCapacity(2) //We need capacity so we don't block. + + defer tq.Stop() //Need to be running to check the timer stop. So defer. + + now := time.Now() + tq.PushAll( + NewMessage(now.Add(time.Hour), 0, nil), + ) + + tq.PushAll( + NewMessage(now, 0, nil), + ) +} + +func TestTimeQueue_CanSuccessfullyReceiveMessageFromTimerAfterResumingFromStopped(t *testing.T) { + tq := NewTimeQueueCapacity(1) + + tq.Stop() + + tq.Push(time.Now().Add(time.Second/2), 0, "my message") + + tq.Start() + + m := <-tq.Messages() + if m.Data().(string) != "my message" { + t.Fatal() + } +} From fcc1a57d51c2a5d844e20b9048a0685bba19fa7a Mon Sep 17 00:00:00 2001 From: Eric Elsken Date: Sun, 7 Jul 2019 14:26:29 -0700 Subject: [PATCH 7/7] Remove Priority from Message type --- message.go | 32 ++++------------------ message_test.go | 56 +++++++++------------------------------ timequeue.go | 4 +-- timequeue_accept_test.go | 4 +-- timequeue_example_test.go | 2 +- timequeue_test.go | 22 +++++++-------- 6 files changed, 34 insertions(+), 86 deletions(-) diff --git a/message.go b/message.go index 04de5f3..95a0fa9 100644 --- a/message.go +++ b/message.go @@ -11,11 +11,7 @@ const ( indexNotInHeap = -1 ) -//Priority is the priority of a Message. -//Smaller values indicate a higher priority, with 0 being the highest priority. -type Priority uint32 - -//Message is a container type that associates a Time and Priority with some +//Message is a container type that associates a Time with some //arbitrary data. //A Message is "released" from a TimeQueue as close to Time At as possible. // @@ -25,12 +21,6 @@ type Message struct { //at is the Time at which to release this Message. at time.Time - //priority is the Priority of this Message. - //If a Message has an equal At value with another Message in the same TimeQueue, - //then priority is consulted to "order" the Messages to determine which should - //be "released" first. - priority Priority - //data is any arbitrary data that you can put in a Message and retrieve when //the Message is released. data interface{} @@ -43,13 +33,12 @@ type Message struct { index int } -//NewMessage returns a Message with at, p, and data set on their corresponding fields. +//NewMessage returns a Message with at and data set on their corresponding fields. // //You should use this function to create Messages instead of using a struct initializer. -func NewMessage(at time.Time, p Priority, data interface{}) *Message { +func NewMessage(at time.Time, data interface{}) *Message { return &Message{ at: at, - priority: p, data: data, messageHeap: nil, index: indexNotInHeap, @@ -61,11 +50,6 @@ func (m *Message) At() time.Time { return m.at } -//Priority returns m's Priority. -func (m *Message) Priority() Priority { - return m.priority -} - //Data returns the data associated with m. // //This will usually be used after receiving a Message from a TimeQueue in order @@ -77,15 +61,9 @@ func (m *Message) Data() interface{} { //less returns whether or not m is "less than" other. //This is used to determined the order in which Messages are released from a TimeQueue. // -//It returns true if m.at is before other.at, regardless of Priorities. -//If m and other have an equal at field, then true is returned if m has a lower -//priority than other. +//It returns true if m.at is before other.at. func (m *Message) less(other *Message) bool { - diff := m.at.Sub(other.at) - if diff != 0 { - return diff < 0 - } - return m.priority < other.priority + return m.at.Before(other.at) } //isHead returns whether or not m is at the head of a messageHeap, i.e. the next diff --git a/message_test.go b/message_test.go index ffe1d87..89698b8 100644 --- a/message_test.go +++ b/message_test.go @@ -2,7 +2,6 @@ package timequeue import ( "container/heap" - "math/rand" "reflect" "sort" "testing" @@ -15,17 +14,13 @@ var ( func TestNewMessage(t *testing.T) { now := time.Now() - p := Priority(1234) var data interface{} = t.Name() - m := NewMessage(now, p, data) + m := NewMessage(now, data) if !m.at.Equal(now) { t.Fatal("at") } - if m.priority != p { - t.Fatal("priority") - } if !reflect.DeepEqual(m.data, data) { t.Fatal("data") } @@ -61,31 +56,6 @@ func TestMessage_less(t *testing.T) { Message{at: now}, false, }, - { - Message{at: now, priority: 1}, - Message{at: now.Add(-1), priority: 2}, - false, - }, - { - Message{at: now.Add(-1), priority: 2}, - Message{at: now, priority: 1}, - true, - }, - { - Message{at: now, priority: 1}, - Message{at: now, priority: 2}, - true, - }, - { - Message{at: now, priority: 2}, - Message{at: now, priority: 2}, - false, - }, - { - Message{at: now, priority: 3}, - Message{at: now, priority: 2}, - false, - }, } for i, tc := range cases { @@ -98,7 +68,7 @@ func TestMessage_less(t *testing.T) { } func TestMessage_isHead_NewMessagesShouldNotBeHeads(t *testing.T) { - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) if m.isHead() { t.Fatal() } @@ -106,7 +76,7 @@ func TestMessage_isHead_NewMessagesShouldNotBeHeads(t *testing.T) { func TestMessage_isHead_MessagesInLenOneHeapsAreHeads(t *testing.T) { mh := messageHeap([]*Message{}) - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) pushMessage(&mh, m) @@ -129,8 +99,8 @@ func TestMessageHeap_Len(t *testing.T) { func TestMessageHeap_Less_DefersToTheMessageLessMethod(t *testing.T) { now := time.Now() - m1 := NewMessage(now, 0, nil) - m2 := NewMessage(now, 1, nil) + m1 := NewMessage(now, nil) + m2 := NewMessage(now.Add(1), nil) mh := messageHeap([]*Message{m1, m2}) @@ -144,8 +114,8 @@ func TestMessageHeap_Less_DefersToTheMessageLessMethod(t *testing.T) { func TestMessageHeap_Swap_UpdatesReferencesAndIndices(t *testing.T) { now := time.Now() - m1 := NewMessage(now, 0, nil) - m2 := NewMessage(now, 1, nil) + m1 := NewMessage(now, nil) + m2 := NewMessage(now, nil) mh := messageHeap([]*Message{m1, m2}) @@ -163,7 +133,7 @@ func TestMessageHeap_Swap_UpdatesReferencesAndIndices(t *testing.T) { } func TestMessageHeap_Push_SetsTheMessageHeapFieldOnMessage(t *testing.T) { - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) mh := messageHeap([]*Message{}) @@ -181,7 +151,7 @@ func TestMessageHeap_PushAndPopResultInTheCorrectOrdering(t *testing.T) { want := []*Message{} for i := 0; i < 100; i++ { - m := NewMessage(now, Priority(rand.Int31()), nil) + m := NewMessage(now.Add(time.Duration(i)), nil) want = append(want, m) pushMessage(&mh, m) @@ -210,7 +180,7 @@ func TestMessageHeap_peek_EmptyReturnsNil(t *testing.T) { } func TestMessageHeap_peek_ReturnsMessageAtIndexZero(t *testing.T) { - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) mh := messageHeap([]*Message{}) @@ -222,7 +192,7 @@ func TestMessageHeap_peek_ReturnsMessageAtIndexZero(t *testing.T) { } func TestMessageHeap_remove_ReturnsFalseWithoutAssociation(t *testing.T) { - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) mh := messageHeap([]*Message{}) @@ -232,7 +202,7 @@ func TestMessageHeap_remove_ReturnsFalseWithoutAssociation(t *testing.T) { } func TestMessageHeap_remove_ReturnsTrueAndModifiesMessage(t *testing.T) { - m := NewMessage(time.Now(), 0, nil) + m := NewMessage(time.Now(), nil) mh := messageHeap([]*Message{}) @@ -253,7 +223,7 @@ func TestMessageHeap_drain_ReturnsEqualLengthSliceOfMessagesNotInAHeapAndSetsLen mh := messageHeap([]*Message{}) for i := 0; i < 100; i++ { - m := NewMessage(time.Now(), 0, i) + m := NewMessage(time.Now(), i) pushMessage(&mh, m) } diff --git a/timequeue.go b/timequeue.go index 6a19120..067c13b 100644 --- a/timequeue.go +++ b/timequeue.go @@ -181,8 +181,8 @@ func (tq *TimeQueue) remove(m *Message) bool { return ok } -func (tq *TimeQueue) Push(at time.Time, p Priority, data interface{}) *Message { - m := NewMessage(at, p, data) +func (tq *TimeQueue) Push(at time.Time, data interface{}) *Message { + m := NewMessage(at, data) tq.PushAll(m) return m } diff --git a/timequeue_accept_test.go b/timequeue_accept_test.go index 76fae35..d804a28 100644 --- a/timequeue_accept_test.go +++ b/timequeue_accept_test.go @@ -33,7 +33,7 @@ func TestTimeQueue_SinglePublisherAndConsumerRetrievesMessagesInOrder(t *testing toPush := make([]*timequeue.Message, count) for i := 0; i < count; i++ { - toPush[i] = timequeue.NewMessage(atFunc(), 0, i) + toPush[i] = timequeue.NewMessage(atFunc(), i) } tq.PushAll(toPush...) @@ -171,7 +171,7 @@ func produceMessages(ctx context.Context, tq *timequeue.TimeQueue, grc, mpg int, break loop default: - m := tq.Push(atFunc(), 0, i) + m := tq.Push(atFunc(), i) count++ if rand.Float64() < removeRate { diff --git a/timequeue_example_test.go b/timequeue_example_test.go index 579fde6..579e859 100644 --- a/timequeue_example_test.go +++ b/timequeue_example_test.go @@ -32,7 +32,7 @@ func ExampleTimeQueue() { toPush := make([]*timequeue.Message, count) for i := 0; i < count; i++ { - m := timequeue.NewMessage(now.Add(time.Duration(i)), 0, i+1) + m := timequeue.NewMessage(now.Add(time.Duration(i)), i+1) toPush[i] = m } diff --git a/timequeue_test.go b/timequeue_test.go index 1ef2c20..464ff96 100644 --- a/timequeue_test.go +++ b/timequeue_test.go @@ -65,7 +65,7 @@ func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileR now := time.Now() for i := 0; i < 10; i++ { - tq.Push(now, 0, i) + tq.Push(now, i) } drained := tq.Drain() @@ -80,7 +80,7 @@ func TestTimeQueue_ANewTimeQueueCanHoldCapacityWithoutBlocking_DrainCalledWhileS now := time.Now() for i := 0; i < 10; i++ { - tq.Push(now, 0, i) + tq.Push(now, i) } tq.Stop() @@ -96,7 +96,7 @@ func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t * tq := NewTimeQueue() defer tq.Stop() - m := tq.Push(time.Now().Add(time.Hour), 0, nil) + m := tq.Push(time.Now().Add(time.Hour), nil) if ok := tq.Remove(m); !ok { t.Fatal(ok) @@ -110,7 +110,7 @@ func TestTimeQueue_Remove_CanRemoveAMessageWhileRunningAndNothingGetsDrained(t * func TestTimeQueue_Remove_CanRemoveAMessageWhileStoppedAndNothingGetsDrained(t *testing.T) { tq := NewTimeQueue() - m := tq.Push(time.Now().Add(time.Hour), 0, nil) + m := tq.Push(time.Now().Add(time.Hour), nil) tq.Stop() @@ -130,8 +130,8 @@ func TestTimeQueue_Remove_CallingRemoveWithAMessageFromAnotherQueueReturnsFalse( tq2 := NewTimeQueueCapacity(1) //We need capacity so we don't block. defer tq2.Stop() - tq1.Push(time.Now(), 0, nil) - m2 := tq2.Push(time.Now(), 0, nil) + tq1.Push(time.Now(), nil) + m2 := tq2.Push(time.Now(), nil) if tq1.Remove(m2) { t.Fatal() @@ -151,7 +151,7 @@ func TestTimeQueue_Push_WeCanPushAsManyMessagesAsWeWantAtNowWhileStoppedWithoutC now := time.Now() for i := 0; i < 100; i++ { - tq.Push(now, 0, i) + tq.Push(now, i) } } @@ -165,7 +165,7 @@ func TestTimeQueue_PushAll_WeCanPushAllWithAsManyMessagesAsWeWantAtNowWhileRunni now := time.Now() messages := []*Message{} for i := 0; i < 100; i++ { - messages = append(messages, NewMessage(now, 0, i)) + messages = append(messages, NewMessage(now, i)) } tq.PushAll(messages...) @@ -189,11 +189,11 @@ func TestTimeQueue_PushAll_CorrectlyStopsThenResetsTheTimerWhenWeAddANewHeadWith now := time.Now() tq.PushAll( - NewMessage(now.Add(time.Hour), 0, nil), + NewMessage(now.Add(time.Hour), nil), ) tq.PushAll( - NewMessage(now, 0, nil), + NewMessage(now, nil), ) } @@ -202,7 +202,7 @@ func TestTimeQueue_CanSuccessfullyReceiveMessageFromTimerAfterResumingFromStoppe tq.Stop() - tq.Push(time.Now().Add(time.Second/2), 0, "my message") + tq.Push(time.Now().Add(time.Second/2), "my message") tq.Start()