diff --git a/k8s/fairqueue/dedupe_queue.go b/k8s/fairqueue/dedupe_queue.go
new file mode 100644
index 00000000..c5cf76bd
--- /dev/null
+++ b/k8s/fairqueue/dedupe_queue.go
@@ -0,0 +1,94 @@
+package fairqueue
+
+// This is a non-thread-safe version of workqueue.Type. It has been liberally copied from that
+// implementation, with some functions eliminated. The thread-safe version should be built on top
+// of this implementation.
+type dedupingQueue struct {
+	// queue defines the order in which we will work on items. Every
+	// element of queue should be in the dirty set and not in the
+	// processing set.
+	queue []t
+
+	// dirty defines all of the items that need to be processed.
+	dirty set
+
+	// Things that are currently being processed are in the processing set.
+	// These things may be simultaneously in the dirty set. When we finish
+	// processing something and remove it from this set, we'll check if
+	// it's in the dirty set, and if so, add it to the queue.
+	processing set
+}
+
+type empty struct{}
+type t interface{}
+type set map[t]empty
+
+func (s set) has(item t) bool {
+	_, exists := s[item]
+	return exists
+}
+
+func (s set) insert(item t) {
+	s[item] = empty{}
+}
+
+func (s set) delete(item t) {
+	delete(s, item)
+}
+
+// Add marks item as needing processing.
+func (q *dedupingQueue) Add(item interface{}) {
+	if q.dirty.has(item) {
+		return
+	}
+
+	q.dirty.insert(item)
+	if q.processing.has(item) {
+		return
+	}
+
+	q.queue = append(q.queue, item)
+}
+
+// Len returns the current queue length, for informational purposes only. You
+// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
+// value; that can't be synchronized properly.
+func (q *dedupingQueue) Len() int {
+	return len(q.queue)
+}
+
+// Get returns the next item to be processed, or nil if the queue is empty.
+// Unlike workqueue.Type, this non-thread-safe version does not block. You must
+// call Done with item when you have finished processing it.
+func (q *dedupingQueue) Get() (item interface{}) {
+	if len(q.queue) == 0 {
+		return nil
+	}
+
+	item, q.queue = q.queue[0], q.queue[1:]
+
+	q.processing.insert(item)
+	q.dirty.delete(item)
+
+	return item
+}
+
+// Done marks item as done processing, and if it has been marked as dirty again
+// while it was being processed, it will be re-added to the queue for
+// re-processing. Returns true if the item was re-added for processing.
+func (q *dedupingQueue) Done(item interface{}) bool {
+	if q.processing.has(item) {
+		q.processing.delete(item)
+		if q.dirty.has(item) {
+			q.queue = append(q.queue, item)
+			return true
+		}
+	}
+	return false
+}
+
+func newDedupingQueue() *dedupingQueue {
+	return &dedupingQueue{
+		dirty:      set{},
+		processing: set{},
+	}
+}
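For reference, here is a minimal sketch of the Add/Get/Done contract implemented above. It is a hypothetical example test, not part of the diff; it assumes it lives in package fairqueue, since dedupingQueue is unexported:

package fairqueue

import "fmt"

func Example_dedupingQueue() {
	q := newDedupingQueue()
	q.Add("x")
	q.Add("x")                // deduped: "x" is already dirty, so the queue stays at length 1
	item := q.Get()           // "x" moves from the dirty set to the processing set
	q.Add(item)               // marked dirty again, but not queued while still processing
	fmt.Println(q.Done(item)) // true: "x" was dirty while processing, so it is re-queued
	fmt.Println(q.Len())
	// Output:
	// true
	// 1
}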
diff --git a/k8s/fairqueue/dedupe_queue_test.go b/k8s/fairqueue/dedupe_queue_test.go
new file mode 100644
index 00000000..41e7ee70
--- /dev/null
+++ b/k8s/fairqueue/dedupe_queue_test.go
@@ -0,0 +1,98 @@
+package fairqueue
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDedupingQueue(t *testing.T) {
+	t.Run("get-empty", func(t *testing.T) {
+		q := newDedupingQueue()
+		assert.Equal(t, nil, q.Get())
+	})
+
+	t.Run("add-normal", func(t *testing.T) {
+		q := newDedupingQueue()
+		assert.False(t, q.dirty.has("x"))
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+	})
+
+	t.Run("add-when-processing", func(t *testing.T) {
+		q := newDedupingQueue()
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+
+		assert.Equal(t, "x", q.Get())
+
+		assert.False(t, q.dirty.has("x"))
+		assert.True(t, q.processing.has("x"))
+		assert.Equal(t, 0, len(q.queue))
+		assert.Equal(t, 0, q.Len())
+
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.True(t, q.processing.has("x"))
+		assert.Equal(t, 0, len(q.queue))
+		assert.Equal(t, 0, q.Len())
+
+		q.Done("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+	})
+
+	t.Run("add-when-dirty", func(t *testing.T) {
+		q := newDedupingQueue()
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+
+		q.Done("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+	})
+
+	t.Run("done-processing-one", func(t *testing.T) {
+		q := newDedupingQueue()
+		q.Add("x")
+		assert.True(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 1, len(q.queue))
+		assert.Equal(t, 1, q.Len())
+
+		assert.Equal(t, "x", q.Get())
+
+		assert.False(t, q.dirty.has("x"))
+		assert.True(t, q.processing.has("x"))
+		assert.Equal(t, 0, len(q.queue))
+		assert.Equal(t, 0, q.Len())
+
+		q.Done("x")
+		assert.False(t, q.dirty.has("x"))
+		assert.False(t, q.processing.has("x"))
+		assert.Equal(t, 0, len(q.queue))
+		assert.Equal(t, 0, q.Len())
+	})
+}
diff --git a/k8s/fairqueue/doc.go b/k8s/fairqueue/doc.go
new file mode 100644
index 00000000..dec8603e
--- /dev/null
+++ b/k8s/fairqueue/doc.go
@@ -0,0 +1,4 @@
+// Package fairqueue provides a Queue implementation that ensures fairness of work execution across
+// namespaces. This is beneficial in guaranteeing that a resource in one namespace will not be
+// affected by other, noisier namespaces.
+package fairqueue
diff --git a/k8s/fairqueue/indexed_circular_buffer.go b/k8s/fairqueue/indexed_circular_buffer.go
new file mode 100644
index 00000000..40a730f7
--- /dev/null
+++ b/k8s/fairqueue/indexed_circular_buffer.go
@@ -0,0 +1,103 @@
+package fairqueue
+
+import "container/ring"
+
+// IndexedCircularBuffer is an implementation of a circular buffer using the container/ring package.
+// The buffer is specialized to allow accessing a specific entry by its key. Iterating through the
+// buffer using the Next method is stateful and always returns the sequentially next item in the
+// buffer.
+type IndexedCircularBuffer struct {
+	head    *ring.Ring
+	current *ring.Ring
+	index   map[string]*ring.Ring
+}
+
+// Next returns the sequentially next item in the circular buffer, skipping over the sentinel head.
+// This method is stateful.
+func (c *IndexedCircularBuffer) Next() interface{} {
+	if c.current.Next() == c.head {
+		c.current = c.head
+	}
+	v := c.current.Next()
+	c.current = v
+	return v.Value
+}
+
+// Get returns the value in the circular buffer that matches the given key. If the key is not
+// found, it returns false.
+func (c *IndexedCircularBuffer) Get(key string) (interface{}, bool) {
+	v, ok := c.index[key]
+	if !ok {
+		return nil, false
+	}
+	return v.Value, true
+}
+
+// GetOrDefault returns the value in the circular buffer that matches the given key, along with
+// true to indicate that it already existed. If the key is not found, it uses the provided function
+// to create a default item, adds it to the buffer and returns it with false to indicate that the
+// value was newly created.
+func (c *IndexedCircularBuffer) GetOrDefault(key string, defaultItemGetter func() interface{}) (interface{}, bool) {
+	v, ok := c.index[key]
+	if !ok {
+		i := defaultItemGetter()
+		c.Add(key, i)
+		return i, false
+	}
+	return v.Value, true
+}
+
+// Add inserts a new (key, item) pair at the last position in the buffer, preserving insertion
+// order. Returns false if the key already exists.
+func (c *IndexedCircularBuffer) Add(key string, item interface{}) bool {
+	if _, ok := c.index[key]; ok {
+		return false
+	}
+	r := ring.New(1)
+	r.Value = item
+	c.index[key] = r
+	last := c.head.Prev()
+	last.Link(r)
+	r.Link(c.head)
+	return true
+}
+
+// Len returns the number of elements in the circular buffer.
+func (c *IndexedCircularBuffer) Len() int {
+	return len(c.index)
+}
+
+// Range iterates over all the elements in the circular buffer in insertion order, stopping early
+// if the provided function returns false.
+func (c *IndexedCircularBuffer) Range(do func(v interface{}) bool) {
+	for ptr := c.head.Next(); ptr != c.head; ptr = ptr.Next() {
+		if !do(ptr.Value) {
+			return
+		}
+	}
+}
+
+// IsCurrentAtHead returns true if current points at the sentinel head, i.e. the iterator has not
+// yet advanced into the buffer.
+func (c *IndexedCircularBuffer) IsCurrentAtHead() bool {
+	return c.current == c.head
+}
+// RangeNext iterates through the buffer in insertion order, starting at the element after
+// current. Stops iterating when the provided function returns false or when a full cycle has been
+// completed. When stopped early, current is moved to the last visited element.
+func (c *IndexedCircularBuffer) RangeNext(do func(v interface{}) bool) {
+	ptr := c.current.Next()
+	for ; ptr != c.current; ptr = ptr.Next() {
+		if ptr != c.head {
+			if !do(ptr.Value) {
+				// Move current to the last iterated location
+				c.current = ptr
+				return
+			}
+		}
+	}
+	if ptr != c.head {
+		_ = do(ptr.Value)
+	}
+}
+
+func NewIndexedCircularBuffer() *IndexedCircularBuffer {
+	head := ring.New(1)
+	return &IndexedCircularBuffer{
+		head:    head,
+		current: head,
+		index:   map[string]*ring.Ring{},
+	}
+}
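A short sketch of the buffer's stateful iteration, illustrating the order preservation and wrap-around behavior exercised by the tests below (hypothetical example, same package):

package fairqueue

import "fmt"

func ExampleIndexedCircularBuffer() {
	b := NewIndexedCircularBuffer()
	b.Add("a", "a")
	b.Add("b", "b")
	fmt.Println(b.Next()) // "a": iteration starts after the sentinel head
	fmt.Println(b.Next()) // "b"
	b.Add("c", "c")       // appended at the tail, insertion order preserved
	fmt.Println(b.Next()) // "c"
	fmt.Println(b.Next()) // "a": wraps around, skipping the sentinel head
	// Output:
	// a
	// b
	// c
	// a
}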
diff --git a/k8s/fairqueue/indexed_circular_buffer_test.go b/k8s/fairqueue/indexed_circular_buffer_test.go
new file mode 100644
index 00000000..0e427e89
--- /dev/null
+++ b/k8s/fairqueue/indexed_circular_buffer_test.go
@@ -0,0 +1,210 @@
+package fairqueue
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIndexedCircularBuffer_RegularOperations(t *testing.T) {
+	b := NewIndexedCircularBuffer()
+	assert.Nil(t, b.Next())
+	v, ok := b.Get("x")
+	assert.False(t, ok)
+	assert.Nil(t, v)
+	assert.Equal(t, 0, b.Len())
+
+	v, ok = b.GetOrDefault("x", func() interface{} {
+		return "y"
+	})
+	assert.Equal(t, "y", v)
+	assert.False(t, ok)
+
+	v, ok = b.Get("x")
+	assert.True(t, ok)
+	assert.Equal(t, "y", v)
+
+	assert.Equal(t, 1, b.Len())
+
+	v, ok = b.GetOrDefault("m", func() interface{} {
+		return "n"
+	})
+	assert.Equal(t, "n", v)
+	assert.False(t, ok)
+
+	v, ok = b.Get("m")
+	assert.True(t, ok)
+	assert.Equal(t, "n", v)
+
+	assert.Equal(t, 2, b.Len())
+
+	v, ok = b.GetOrDefault("m", func() interface{} {
+		assert.FailNow(t, "should not be called")
+		return nil
+	})
+	assert.Equal(t, "n", v)
+	assert.True(t, ok)
+
+	assert.Equal(t, 2, b.Len())
+}
+
+func TestIndexedCircularBuffer_TestCircularBuffer(t *testing.T) {
+	b := NewIndexedCircularBuffer()
+	assert.Nil(t, b.Next())
+
+	_, ok := b.GetOrDefault("x", func() interface{} {
+		return "x"
+	})
+	assert.False(t, ok)
+
+	_, ok = b.GetOrDefault("y", func() interface{} {
+		return "y"
+	})
+	assert.False(t, ok)
+
+	_, ok = b.GetOrDefault("z", func() interface{} {
+		return "z"
+	})
+	assert.False(t, ok)
+
+	assert.Equal(t, "x", b.Next())
+	assert.Equal(t, "y", b.Next())
+	assert.Equal(t, "z", b.Next())
+	_, ok = b.GetOrDefault("m", func() interface{} {
+		return "m"
+	})
+	assert.False(t, ok)
+	assert.Equal(t, "m", b.Next())
+	_, ok = b.GetOrDefault("n", func() interface{} {
+		return "n"
+	})
+	assert.False(t, ok)
+	assert.Equal(t, "n", b.Next())
+	assert.Equal(t, "x", b.Next())
+	assert.Equal(t, "y", b.Next())
+	assert.Equal(t, "z", b.Next())
+}
+
+func TestIndexedCircularBuffer_Range(t *testing.T) {
+	b := NewIndexedCircularBuffer()
+	assert.Nil(t, b.Next())
+	b.Range(func(v interface{}) bool {
+		assert.FailNow(t, "should not be invoked for an empty buffer")
+		return false
+	})
+
+	assert.True(t, b.Add("x", "x"))
+	assert.False(t, b.Add("x", "m"))
+	assert.True(t, b.Add("y", "y"))
+	assert.True(t, b.Add("z", "z"))
+
+	t.Run("complete-iteration", func(t *testing.T) {
+		arr := make([]string, 0, 3)
+		b.Range(func(v interface{}) bool {
+			s, _ := v.(string)
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"x", "y", "z"}, arr)
+	})
+
+	t.Run("partial-iteration", func(t *testing.T) {
+		arr := make([]string, 0, 1)
+		b.Range(func(v interface{}) bool {
+			s, _ := v.(string)
+			if s == "y" {
+				return false
+			}
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"x"}, arr)
+	})
+}
+
+func TestIndexedCircularBuffer_RangeNext(t *testing.T) {
+	b := NewIndexedCircularBuffer()
+	assert.Nil(t, b.Next())
+	b.Range(func(v interface{}) bool {
+		assert.FailNow(t, "should not be invoked for an empty buffer")
+		return false
+	})
+
+	assert.True(t, b.Add("x", "x"))
+	assert.False(t, b.Add("x", "m"))
+	assert.True(t, b.Add("y", "y"))
+	assert.True(t, b.Add("z", "z"))
+
+	t.Run("complete-iteration", func(t *testing.T) {
+		arr := make([]string, 0, 3)
+		b.RangeNext(func(v interface{}) bool {
+			s, _ := v.(string)
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"x", "y", "z"}, arr)
+	})
+
+	t.Run("partial-iteration", func(t *testing.T) {
+		arr := make([]string, 0, 1)
+		b.RangeNext(func(v interface{}) bool {
+			s, _ := v.(string)
+			if s == "y" {
+				return false
+			}
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"x"}, arr)
+	})
+}
+
+func TestIndexedCircularBuffer_IsCurrentAtHead(t *testing.T) {
+	b := NewIndexedCircularBuffer()
+	assert.Nil(t, b.Next())
+	assert.True(t, b.IsCurrentAtHead())
+
+	assert.True(t, b.Add("x", "x"))
+	assert.Equal(t, "x", b.Next())
+	assert.False(t, b.IsCurrentAtHead())
+
+	assert.True(t, b.Add("y", "y"))
+	assert.Equal(t, "y", b.Next())
+	assert.False(t, b.IsCurrentAtHead())
+
+	assert.True(t, b.Add("z", "z"))
+	assert.Equal(t, "z", b.Next())
+	assert.False(t, b.IsCurrentAtHead())
+
+	{
+		arr := make([]string, 0, 3)
+		b.RangeNext(func(v interface{}) bool {
+			s, _ := v.(string)
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"x", "y", "z"}, arr)
+	}
+
+	{
+		b.Next()
+		arr := make([]string, 0, 3)
+		b.RangeNext(func(v interface{}) bool {
+			s, _ := v.(string)
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"y", "z", "x"}, arr)
+	}
+
+	{
+		b.Next()
+		arr := make([]string, 0, 3)
+		b.RangeNext(func(v interface{}) bool {
+			s, _ := v.(string)
+			arr = append(arr, s)
+			return true
+		})
+		assert.Equal(t, []string{"z", "x", "y"}, arr)
+	}
+}
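The queue implementation that follows keys its per-tenant queues on the namespace portion of each item, extracted with cache.SplitMetaNamespaceKey from k8s.io/client-go. A quick standalone illustration of how that function splits keys (hypothetical example, not part of the diff):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	ns, name, _ := cache.SplitMetaNamespaceKey("ns1/item1")
	fmt.Printf("%q %q\n", ns, name) // "ns1" "item1"

	// An unqualified key has no namespace; such items land in the
	// common queue keyed by the empty namespace.
	ns, name, _ = cache.SplitMetaNamespaceKey("item1")
	fmt.Printf("%q %q\n", ns, name) // "" "item1"
}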
diff --git a/k8s/fairqueue/queue.go b/k8s/fairqueue/queue.go
new file mode 100644
index 00000000..c229ef10
--- /dev/null
+++ b/k8s/fairqueue/queue.go
@@ -0,0 +1,160 @@
+package fairqueue
+
+import (
+	"context"
+	"sync"
+
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/lyft/flytestdlib/logger"
+)
+
+// PerNamespaceFairQueue is an implementation of a fair queue per namespace.
+// Under asymmetrical load across namespaces, it penalizes the namespace contributing the larger
+// load while remaining fair across namespaces.
+// If a system is modeled such that every tenant has a separate namespace, each tenant is promised
+// an execution guarantee that is proportional to the number of available workers and does not
+// depend on the queue length contributed by other tenants.
+// The queue also implements workqueue.Interface.
+type PerNamespaceFairQueue struct {
+	perNamespaceQueue *IndexedCircularBuffer
+	cond              *sync.Cond
+	shuttingDown      bool
+}
+
+// Add adds an item to the queue for the namespace the item belongs to. The method uses
+// cache.SplitMetaNamespaceKey to identify the namespace of the item. Items without a namespace
+// qualifier are added to a common queue signified by the empty namespace.
+func (m *PerNamespaceFairQueue) Add(item interface{}) {
+	key, ok := item.(string)
+	if !ok {
+		logger.Error(context.TODO(), "failed to add item to the workQueue, item type is not string.")
+		return
+	}
+	// Convert the namespace/name string into a distinct namespace and name
+	namespace, _, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		logger.Errorf(context.TODO(), "failed to add item to the workQueue, item incorrectly formatted. Error: %s", err.Error())
+		return
+	}
+	tenantID := namespace
+
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+	if m.shuttingDown {
+		// Ignore all new items once a shutdown has been initiated.
+		return
+	}
+	qObj, _ := m.perNamespaceQueue.GetOrDefault(tenantID, func() interface{} {
+		return newDedupingQueue()
+	})
+
+	if q, ok := qObj.(*dedupingQueue); !ok {
+		logger.Fatalf(context.TODO(), "incorrect queue type for tenant ID [%s]", tenantID)
+	} else {
+		q.Add(item)
+	}
+	m.cond.Signal()
+}
+
+// Len returns the total number of queued items across all namespace queues.
+func (m *PerNamespaceFairQueue) Len() int {
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+	length := 0
+	m.perNamespaceQueue.Range(func(v interface{}) bool {
+		q, ok := v.(*dedupingQueue)
+		if !ok {
+			return false
+		}
+		length += q.Len()
+		return true
+	})
+	return length
+}
+
+// Get retrieves an item from the next logical namespace in the circular buffer, if one is
+// available. It iterates through all the namespaces in this order until it arrives back at the
+// current position. Once back at the current position, Get blocks until a new item is inserted
+// into any one of the namespaces.
+func (m *PerNamespaceFairQueue) Get() (item interface{}, shutdown bool) {
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+	for {
+		if m.shuttingDown {
+			return nil, m.shuttingDown
+		}
+		var item interface{}
+		m.perNamespaceQueue.RangeNext(func(v interface{}) bool {
+			q, ok := v.(*dedupingQueue)
+			if !ok {
+				logger.Errorf(context.TODO(), "found a non conforming type in queue, skipping")
+				return true
+			}
+			if i := q.Get(); i != nil {
+				item = i
+				return false
+			}
+			return true
+		})
+		if item != nil {
+			return item, m.shuttingDown
+		}
+		m.cond.Wait()
+	}
+}
+
+// Done marks the item as done processing in its namespace queue. If the item was marked dirty
+// again while it was being processed, it is re-added to that queue for re-processing and a
+// waiting worker is signaled.
+func (m *PerNamespaceFairQueue) Done(item interface{}) {
+	key, ok := item.(string)
+	if !ok {
+		logger.Error(context.TODO(), "failed to mark item done in the workQueue, item type is not string.")
+		return
+	}
+	// Convert the namespace/name string into a distinct namespace and name
+	namespace, _, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		logger.Errorf(context.TODO(), "failed to mark item done in the workQueue, item incorrectly formatted. Error: %s", err.Error())
+		return
+	}
+	tenantID := namespace
+
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+	v, ok := m.perNamespaceQueue.Get(tenantID)
+	if !ok {
+		return
+	}
+	q, ok := v.(*dedupingQueue)
+	if !ok {
+		return
+	}
+	if q.Done(item) {
+		m.cond.Signal()
+	}
+}
+
+// ShutDown causes q to ignore all new items added to it and wakes up all waiting workers;
+// subsequent calls to Get return immediately with shutdown set to true, instructing the worker
+// goroutines to exit.
+func (m *PerNamespaceFairQueue) ShutDown() {
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+	m.shuttingDown = true
+	m.cond.Broadcast()
+}
+
+// ShuttingDown returns true if a shutdown has been initiated for the queue.
+func (m *PerNamespaceFairQueue) ShuttingDown() bool {
+	m.cond.L.Lock()
+	defer m.cond.L.Unlock()
+
+	return m.shuttingDown
+}
+
+func New() workqueue.Interface {
+	return &PerNamespaceFairQueue{
+		perNamespaceQueue: NewIndexedCircularBuffer(),
+		cond:              sync.NewCond(&sync.Mutex{}),
+	}
+}
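To make the fairness property concrete, here is a hedged sketch of the round-robin ordering one would expect from the queue above. The import path is assumed from this repository's layout and the item names are illustrative:

package main

import (
	"fmt"

	"github.com/lyft/flytestdlib/k8s/fairqueue" // assumed import path
)

func main() {
	q := fairqueue.New() // implements workqueue.Interface

	// A noisy namespace enqueues three items before a quiet one enqueues its first.
	q.Add("noisy/item1")
	q.Add("noisy/item2")
	q.Add("noisy/item3")
	q.Add("quiet/item1")

	for q.Len() > 0 {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		// Expected order: noisy/item1, quiet/item1, noisy/item2, noisy/item3.
		// The quiet namespace is served after one noisy item, not after three.
		fmt.Println(item)
		q.Done(item)
	}
}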
diff --git a/k8s/fairqueue/queue_test.go b/k8s/fairqueue/queue_test.go
new file mode 100644
index 00000000..62654d54
--- /dev/null
+++ b/k8s/fairqueue/queue_test.go
@@ -0,0 +1,183 @@
+package fairqueue
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+func TestBasic(t *testing.T) {
+	// If something is seriously wrong this test will never complete.
+	q := New()
+
+	// Start producers
+	const producers = 50
+	producerWG := sync.WaitGroup{}
+	producerWG.Add(producers)
+	for i := 0; i < producers; i++ {
+		go func(i int) {
+			defer producerWG.Done()
+			for j := 0; j < 50; j++ {
+				q.Add(fmt.Sprintf("%d/%d", i, i))
+				time.Sleep(time.Millisecond)
+			}
+		}(i)
+	}
+
+	// Start consumers
+	const consumers = 10
+	consumerWG := sync.WaitGroup{}
+	consumerWG.Add(consumers)
+	for i := 0; i < consumers; i++ {
+		go func(i int) {
+			defer consumerWG.Done()
+			for {
+				item, quit := q.Get()
+				if item == "added after shutdown!" {
+					t.Errorf("Got an item added after shutdown.")
+				}
+				if quit {
+					return
+				}
+				t.Logf("Worker %v: begin processing %v", i, item)
+				time.Sleep(3 * time.Millisecond)
+				t.Logf("Worker %v: done processing %v", i, item)
+				q.Done(item)
+			}
+		}(i)
+	}
+
+	producerWG.Wait()
+	q.ShutDown()
+	q.Add("added after shutdown!")
+	consumerWG.Wait()
+}
+
+func TestAddWhileProcessing(t *testing.T) {
+	q := New()
+
+	// Start producers
+	const producers = 10
+	producerWG := sync.WaitGroup{}
+	producerWG.Add(producers)
+	for i := 0; i < producers; i++ {
+		go func(i int) {
+			defer producerWG.Done()
+			q.Add(fmt.Sprintf("%d/%d", i, i))
+		}(i)
+	}
+
+	// Start consumers
+	const consumers = 2
+	consumerWG := sync.WaitGroup{}
+	consumerWG.Add(consumers)
+	for i := 0; i < consumers; i++ {
+		go func(i int) {
+			defer consumerWG.Done()
+			// Every worker will re-add every item up to two times.
+			// This tests the dirty-while-processing case.
+			counters := map[interface{}]int{}
+			for {
+				item, quit := q.Get()
+				if quit {
+					return
+				}
+				counters[item]++
+				if counters[item] < 2 {
+					q.Add(item)
+				}
+				q.Done(item)
+			}
+		}(i)
+	}
+
+	producerWG.Wait()
+	q.ShutDown()
+	consumerWG.Wait()
+}
+
+func TestLen(t *testing.T) {
+	q := New()
+	q.Add("foo")
+	if e, a := 1, q.Len(); e != a {
+		t.Errorf("Expected %v, got %v", e, a)
+	}
+	q.Add("bar")
+	if e, a := 2, q.Len(); e != a {
+		t.Errorf("Expected %v, got %v", e, a)
+	}
+	q.Add("foo") // should not increase the queue length.
+	if e, a := 2, q.Len(); e != a {
+		t.Errorf("Expected %v, got %v", e, a)
+	}
+}
+
+func TestReinsert(t *testing.T) {
+	q := New()
+	q.Add("foo")
+
+	// Start processing
+	i, _ := q.Get()
+	if i != "foo" {
+		t.Errorf("Expected %v, got %v", "foo", i)
+	}
+
+	// Add it back while processing
+	q.Add(i)
+
+	// Finish it up
+	q.Done(i)
+
+	// It should be back on the queue
+	i, _ = q.Get()
+	if i != "foo" {
+		t.Errorf("Expected %v, got %v", "foo", i)
+	}
+
+	// Finish that one up
+	q.Done(i)
+
+	if a := q.Len(); a != 0 {
+		t.Errorf("Expected queue to be empty. Has %v items", a)
+	}
+}
Has %v items", a) + } +} + +func benchmarkQueueAddRepeat(b *testing.B, q workqueue.Interface) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + q.Add("foo") + } +} + +func BenchmarkPerNamespaceFairQueue_Add_Repeat(b *testing.B) { benchmarkQueueAddRepeat(b, New()) } +func BenchmarkWorkQueue_Add_Repeat(b *testing.B) { benchmarkQueueAddRepeat(b, workqueue.New()) } + +func benchmarkQueueAddDistinct(b *testing.B, q workqueue.Interface) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + q.Add(strconv.Itoa(i)) + } +} + +func BenchmarkPerNamespaceFairQueue_Add_Distinct(b *testing.B) { benchmarkQueueAddDistinct(b, New()) } +func BenchmarkWorkQueue_Add_Distinct(b *testing.B) { benchmarkQueueAddDistinct(b, workqueue.New()) } + +func benchmarkQueueGet(b *testing.B, q workqueue.Interface) { + b.StopTimer() + for i := 0; i < b.N; i++ { + q.Add(strconv.Itoa(i)) + } + + b.ReportAllocs() + b.StartTimer() + for i := 0; i < b.N; i++ { + _, _ = q.Get() + } +} + +func BenchmarkPerNamespaceFairQueue_Get(b *testing.B) { benchmarkQueueGet(b, New()) } +func BenchmarkWorkQueue_Get(b *testing.B) { benchmarkQueueGet(b, workqueue.New()) } diff --git a/k8s/loadtest/main.go b/k8s/loadtest/main.go new file mode 100644 index 00000000..f0cf959e --- /dev/null +++ b/k8s/loadtest/main.go @@ -0,0 +1,125 @@ +package loadtest + +import ( + "fmt" + "sync" + "time" + + "github.com/lyft/flytepropeller/pkg/controller/fairqueue" + "k8s.io/client-go/util/workqueue" +) + +type workCounterMap struct { + *sync.Map + cycles int + + l sync.Mutex +} + +func (w *workCounterMap) allDone() bool { + w.l.Lock() + defer w.l.Unlock() + allDone := true + w.Map.Range(func(key, value interface{}) bool { + v := value.(int) + if v < w.cycles { + allDone = false + return false + } + return true + }) + return allDone +} + +func newWorkCounterMap(v map[string]time.Duration, cycles int) *workCounterMap { + w := &workCounterMap{ + cycles: cycles, + Map: &sync.Map{}, + } + for k := range v { + w.Store(k, 0) + } + return w +} + +func loadTest(tenantLoads map[string]time.Duration, q workqueue.Interface, cycles int) { + + w := newWorkCounterMap(tenantLoads, cycles) + + wg := sync.WaitGroup{} + + doWork := func() { + defer wg.Done() + for { + i, ok := q.Get() + if ok { + return + } + d, _ := tenantLoads[i.(string)] + time.Sleep(d) + q.Done(i) + v, _ := w.Load(i) + iv := v.(int) + w.Store(i, iv+1) + if w.allDone() { + q.ShutDown() + return + } + if iv < cycles { + q.Add(i) + } + } + } + + wg.Add(3) + for i := 0; i < 3; i++ { + go doWork() + } + + for k := range tenantLoads { + q.Add(k) + } + + wg.Wait() +} + +func main() { + + rounds := 10 + cycles := 10 + tenantLoads := map[string]time.Duration{ + "ns1/item1": time.Millisecond * 200, + "ns1/item2": time.Millisecond * 100, + "ns1/item3": time.Millisecond * 50, + "ns2/item1": time.Millisecond * 5, + "ns3/item1": time.Millisecond * 100, + "ns3/item2": time.Millisecond * 50, + "ns4/item1": time.Millisecond * 5, + "ns5/item1": time.Millisecond * 5, + } + + avgTime := time.Duration(0) + for i := 0; i < rounds; i++ { + s := time.Now() + loadTest(tenantLoads, fairqueue.New(), cycles) + d := time.Now().Sub(s) + fmt.Printf("FairQ, Asymmetric Load Round %d for %d cycles took: %f seconds\n", i, cycles, d.Seconds()) + avgTime += d + } + fairQTime := avgTime.Seconds() / float64(rounds) + fmt.Printf("======> FairQ, Asymmetric Load for %d cycles took: %f seconds\n", cycles, fairQTime) + + avgTime = time.Duration(0) + for i := 0; i < rounds; i++ { + s := time.Now() + loadTest(tenantLoads, workqueue.New(), cycles) + d := 
diff --git a/k8s/loadtest/main.go b/k8s/loadtest/main.go
new file mode 100644
index 00000000..f0cf959e
--- /dev/null
+++ b/k8s/loadtest/main.go
@@ -0,0 +1,125 @@
+package main
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/lyft/flytestdlib/k8s/fairqueue"
+	"k8s.io/client-go/util/workqueue"
+)
+
+type workCounterMap struct {
+	*sync.Map
+	cycles int
+
+	l sync.Mutex
+}
+
+func (w *workCounterMap) allDone() bool {
+	w.l.Lock()
+	defer w.l.Unlock()
+	allDone := true
+	w.Map.Range(func(key, value interface{}) bool {
+		v := value.(int)
+		if v < w.cycles {
+			allDone = false
+			return false
+		}
+		return true
+	})
+	return allDone
+}
+
+func newWorkCounterMap(v map[string]time.Duration, cycles int) *workCounterMap {
+	w := &workCounterMap{
+		cycles: cycles,
+		Map:    &sync.Map{},
+	}
+	for k := range v {
+		w.Store(k, 0)
+	}
+	return w
+}
+
+func loadTest(tenantLoads map[string]time.Duration, q workqueue.Interface, cycles int) {
+	w := newWorkCounterMap(tenantLoads, cycles)
+
+	wg := sync.WaitGroup{}
+
+	doWork := func() {
+		defer wg.Done()
+		for {
+			i, shutdown := q.Get()
+			if shutdown {
+				return
+			}
+			d := tenantLoads[i.(string)]
+			time.Sleep(d)
+			q.Done(i)
+			v, _ := w.Load(i)
+			iv := v.(int)
+			w.Store(i, iv+1)
+			if w.allDone() {
+				q.ShutDown()
+				return
+			}
+			if iv < cycles {
+				q.Add(i)
+			}
+		}
+	}
+
+	wg.Add(3)
+	for i := 0; i < 3; i++ {
+		go doWork()
+	}
+
+	for k := range tenantLoads {
+		q.Add(k)
+	}
+
+	wg.Wait()
+}
+
+func main() {
+	rounds := 10
+	cycles := 10
+	tenantLoads := map[string]time.Duration{
+		"ns1/item1": time.Millisecond * 200,
+		"ns1/item2": time.Millisecond * 100,
+		"ns1/item3": time.Millisecond * 50,
+		"ns2/item1": time.Millisecond * 5,
+		"ns3/item1": time.Millisecond * 100,
+		"ns3/item2": time.Millisecond * 50,
+		"ns4/item1": time.Millisecond * 5,
+		"ns5/item1": time.Millisecond * 5,
+	}
+
+	avgTime := time.Duration(0)
+	for i := 0; i < rounds; i++ {
+		s := time.Now()
+		loadTest(tenantLoads, fairqueue.New(), cycles)
+		d := time.Since(s)
+		fmt.Printf("FairQ, Asymmetric Load Round %d for %d cycles took: %f seconds\n", i, cycles, d.Seconds())
+		avgTime += d
+	}
+	fairQTime := avgTime.Seconds() / float64(rounds)
+	fmt.Printf("======> FairQ, Asymmetric Load for %d cycles took: %f seconds\n", cycles, fairQTime)
+
+	avgTime = time.Duration(0)
+	for i := 0; i < rounds; i++ {
+		s := time.Now()
+		loadTest(tenantLoads, workqueue.New(), cycles)
+		d := time.Since(s)
+		fmt.Printf("WorkQ, Asymmetric Load Round %d for %d cycles took: %f seconds\n", i, cycles, d.Seconds())
+		avgTime += d
+	}
+	workQTime := avgTime.Seconds() / float64(rounds)
+	fmt.Printf("======> WorkQ, Asymmetric Load for %d cycles took: %f seconds\n", cycles, workQTime)
+
+	fmt.Printf("Comparison: FairQ: %f WorkQ: %f. Speedup [%f%%]\n", fairQTime, workQTime, (workQTime-fairQTime)/workQTime*100.0)
+}
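Note on the final comparison line: the reported "Speedup" is a percentage computed as (workQTime - fairQTime) / workQTime * 100. With illustrative numbers only, if WorkQ averaged 12.0 seconds and FairQ averaged 9.0 seconds, the program would report (12.0 - 9.0) / 12.0 * 100 = 25%.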