Skip to content

simplegear/rate-envelope-queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

99 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

rate-envelope-queue

A lightweight, goroutine-safe wrapper around k8s.io/client-go/util/workqueue for managing tasks (envelopes) with a fixed worker pool, periodic scheduling, deadlines, hooks, and stamps (middleware). Adds a safe queue lifecycle (Start/Stop/Start), buffering before first start, and queue capacity limiting.

Under the hood it uses workqueue.TypedRateLimitingInterface. Deduplication happens by pointer to the element: repeated Add of the same pointer while it is in-flight is ignored.


Table of Contents


Features

Goroutine-safe in-memory queue for your service:

  • All public methods are safe to call from multiple goroutines. Send can be called concurrently. Multiple workers are supported via limit. Start/Stop are serialized internally.
  • This is not a distributed queue: there are no guarantees across processes/hosts. Ensure your hooks/invokers are thread-safe around shared state.

What you get:

  • A clear lifecycle FSM: init β†’ running β†’ stopping β†’ stopped
  • Both one-off and periodic tasks
  • Middleware chain via Stamp
  • Hooks: before/after/failure/success
  • Capacity accounting (quota)
  • Graceful or fast stop (Drain / Stop) and restartable queues (Start() after Stop())

Error/Hook semantics:

  • ErrStopEnvelope β€” intentional stop of a specific envelope:
    • the envelope is forgotten, not rescheduled;
    • if raised in beforeHook/invoke, the afterHook still runs (within a time-bounded context); successHook does not run.
  • context.Canceled / context.DeadlineExceeded β€” not a success:
    • envelope is forgotten; periodic ones are rescheduled, one-off ones are not.
  • Any other error:
    • periodic β†’ rescheduled (if queue is alive);
    • one-off β†’ defer to failureHook decision (RetryNow / RetryAfter / Drop).
  • Each hook runs with its own timeout: a fraction frac=0.5 of the envelope's deadline, but at least hardHookLimit (800ms). Hook timeouts are derived from the task context tctx, so hooks never outlive the envelope deadline.

Concurrency controls (brief):

  • stateMu guards the FSM state (RLock read / Lock write)
  • lifecycleMu serializes Start/Stop/queue swap
  • queueMu guards the inner workqueue pointer
  • pendingMu guards the pre-start buffer
  • run is an atomic fast flag for β€œqueue alive”
  • Capacity accounting is atomic via tryReserve/inc/dec/unreserve

Other highlights:

  • Worker pool via WithLimitOption(n)
  • Start/Stop/Start: tasks sent before first start are buffered and flushed on Start()
  • Periodic vs one-off: interval > 0 means periodic; interval == 0 means one-off
  • Deadlines: deadline > 0 bounds invoke time via context.WithTimeout in the worker
  • Stamps: both global (queue-level) and per-envelope (task-level), with predictable execution order
  • Panic safety: panics inside task are handled (Forget+Done) and logged with stack; worker keeps running
  • Prometheus metrics: use client-go workqueue metrics

Installation

go get github.com/PavelAgarkov/rate-envelope-queue

Recommended pins (compatible with this package):

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

Requires: Go 1.24+.


Quick Start

See full examples in examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Capacity scenarios (accounting correctness):

Drain + waiting=true β€” wait for all workers; all dec() happen; no remainder.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true β€” after wg.Wait() we subtract the β€œtail” (cur - pend), the counter converges.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

Unlimited capacity β€” WithAllowedCapacityOption(0) removes admission limits; the currentCapacity metric still reflects actual occupancy.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

Minimal API sketch:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // one-off
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // periodic
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // restart if needed

Concepts & Contracts

Envelope

e, err := NewEnvelope(
    WithId(123), // optional, for logs
    WithType("my_task"), // optional, for logs
    WithScheduleModeInterval(time.Second), // 0 = one-off
    WithDeadline(500*time.Millisecond),    // 0 = no deadline
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // required
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // Drop by default
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Validation:

  • invoke is required; interval >= 0; deadline >= 0
  • For periodic: deadline must not exceed interval β†’ ErrAdditionEnvelopeToQueueBadIntervals

Special error:

  • ErrStopEnvelope β€” gracefully stops this envelope only (no reschedule)

Queue

q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = unlimited
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded on overflow
q.Stop()

Pre-start buffer. In init, Send() pushes envelopes into an internal buffer; on Start() they are flushed into the workqueue.

Stamps (middleware)

type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

Order: global stamps (outer) wrap per-envelope stamps (inner), then Envelope.invoke.

A sample LoggingStamp() is provided for demonstration.


Worker Behavior

Result / condition Queue action
invoke returns nil Forget; if interval > 0 and alive β†’ AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; if periodic and alive β†’ AddAfter(interval)
ErrStopEnvelope Forget; no reschedule
Error on periodic Forget; if alive β†’ AddAfter(interval)
Error on one-off + failureHook Use decision: RetryNow / RetryAfter(d) / Drop
Panic in task Forget + Done + stack log; worker continues

β€œQueue is alive” = run == true, state is running, base context not done, and workqueue not shutting down.


Stop Modes

Waiting \ StopMode Drain (graceful) Stop (fast)
true Wait for workers; ShutDownWithDrain() Wait for workers; ShutDown()
false No wait; ShutDownWithDrain() Immediate stop; ShutDown()

After Stop() you can call Start() again: a fresh inner workqueue will be created.


Capacity Limiting

WithAllowedCapacityOption(cap) limits the total number of in-flight/queued/delayed items (including reschedules).
If the limit would be exceeded, Send() returns ErrAllowedQueueCapacityExceeded.
currentCapacity is updated on add, reschedule, and completion.

  • cap == 0 β†’ unlimited admission; the currentCapacity metric still tracks actual occupancy.
  • Stop + waiting=false + StopMode=Stop β€” documented tail leakage in accounting. Use Drain or waiting=true for accurate capacity convergence.

Benchmarks

Command examples:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Numbers provided by the author (your CPU/env will vary):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

Metrics (Prometheus)

Workqueue metrics are enabled via blank import:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Your queue name (QueueConfig.Name) is included in workqueue metric labels (workqueue_*: adds, depth, work_duration, retries, etc.).


Examples

See the examples/ folder for runnable snippets covering one-off jobs, periodic schedules, combined modes, and dynamic dispatch.


License

MIT β€” see LICENSE.



rate-envelope-queue (Русская вСрсия)

Лёгкая, потокобСзопасная ΠΎΠ±Ρ‘Ρ€Ρ‚ΠΊΠ° Π½Π°Π΄ k8s.io/client-go/util/workqueue для управлСния Π·Π°Π΄Π°Ρ‡Π°ΠΌΠΈ (envelopes) с фиксированным ΠΏΡƒΠ»ΠΎΠΌ Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ², пСриодичСским ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ΠΌ, Π΄Π΅Π΄Π»Π°ΠΉΠ½Π°ΠΌΠΈ, Ρ…ΡƒΠΊΠ°ΠΌΠΈ ΠΈ stamps (middleware). ДобавляСт бСзопасный ΠΆΠΈΠ·Π½Π΅Π½Π½Ρ‹ΠΉ Ρ†ΠΈΠΊΠ» ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ (Start/Stop/Start), Π±ΡƒΡ„Π΅Ρ€ΠΈΠ·Π°Ρ†ΠΈΡŽ Π·Π°Π΄Π°Ρ‡ Π΄ΠΎ старта ΠΈ ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΠ΅ ёмкости ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ.

Π’ основС β€” workqueue.TypedRateLimitingInterface. ДСдупликация происходит ΠΏΠΎ ΡƒΠΊΠ°Π·Π°Ρ‚Π΅Π»ΡŽ Π½Π° элСмСнт: ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹ΠΉ Add Ρ‚ΠΎΠ³ΠΎ ΠΆΠ΅ указатСля ΠΏΠΎΠΊΠ° ΠΎΠ½ Π² ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ΅ β€” игнорируСтся.


Π‘ΠΎΠ΄Π΅Ρ€ΠΆΠ°Π½ΠΈΠ΅


ΠšΠ»ΡŽΡ‡Π΅Π²Ρ‹Π΅ возмоТности

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ±Π΅Π·ΠΎΠΏΠ°ΡΠ½Π°Ρ локальная ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ Π² памяти прилоТСния:

  • ВсС ΠΏΡƒΠ±Π»ΠΈΡ‡Π½Ρ‹Π΅ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ бСзопасны ΠΏΡ€ΠΈ Π²Ρ‹Π·ΠΎΠ²Π°Ρ… ΠΈΠ· Π½Π΅ΡΠΊΠΎΠ»ΡŒΠΊΠΈΡ… Π³ΠΎΡ€ΡƒΡ‚ΠΈΠ½. Send ΠΌΠΎΠΆΠ½ΠΎ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ ΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΡŒΠ½ΠΎ. Π’ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ² ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ нСсколько (limit). Π’Ρ‹Π·ΠΎΠ²Ρ‹ Start/Stop ΡΠ΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΡƒΡŽΡ‚ΡΡ Π²Π½ΡƒΡ‚Ρ€ΠΈ.
  • Π­Ρ‚ΠΎ Π½Π΅ распрСдСлённая ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ: Π³Π°Ρ€Π°Π½Ρ‚ΠΈΠΉ ΠΌΠ΅ΠΆΠ΄Ρƒ Ρ€Π°Π·Π½Ρ‹ΠΌΠΈ процСссами/ΡƒΠ·Π»Π°ΠΌΠΈ Π½Π΅Ρ‚. Код Ρ…ΡƒΠΊΠΎΠ²/ΠΈΠ½Π²ΠΎΠΊΠ΅Ρ€ΠΎΠ² Π΄ΠΎΠ»ΠΆΠ΅Π½ сам ΠΎΠ±Π΅ΡΠΏΠ΅Ρ‡ΠΈΠ²Π°Ρ‚ΡŒ ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ±Π΅Π·ΠΎΠΏΠ°ΡΠ½ΠΎΡΡ‚ΡŒ ΠΏΡ€ΠΈ доступС ΠΊ ΠΎΠ±Ρ‰ΠΈΠΌ рСсурсам.

Π§Ρ‚ΠΎ Π²Π½ΡƒΡ‚Ρ€ΠΈ:

  • ΠŸΡ€ΠΎΠ·Ρ€Π°Ρ‡Π½Ρ‹ΠΉ Π°Π²Ρ‚ΠΎΠΌΠ°Ρ‚ состояний: init β†’ running β†’ stopping β†’ stopped
  • ΠžΠ΄Π½ΠΎΡ€Π°Π·ΠΎΠ²Ρ‹Π΅ ΠΈ пСриодичСскиС Π·Π°Π΄Π°Ρ‡ΠΈ
  • Π¦Π΅ΠΏΠΎΡ‡ΠΊΠ° middleware Ρ‡Π΅Ρ€Π΅Π· Stamp
  • Π₯ΡƒΠΊΠΈ: before/after/failure/success
  • Π£Ρ‡Ρ‘Ρ‚ ёмкости (quota)
  • Мягкий ΠΈΠ»ΠΈ быстрый останов (Drain / Stop) ΠΈ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹ΠΉ старт (Start() послС Stop())

Π‘Π΅ΠΌΠ°Π½Ρ‚ΠΈΠΊΠ° ошибок ΠΈ Ρ…ΡƒΠΊΠΎΠ²:

  • ErrStopEnvelope β€” намСрСнная остановка ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½ΠΎΠ³ΠΎ ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π°:
    • ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚ забываСтся, Π½Π΅ пСрСпланируСтся;
    • Ссли ошибка Π²ΠΎΠ·Π½ΠΈΠΊΠ»Π° Π² beforeHook/invoke, afterHook всё Ρ€Π°Π²Π½ΠΎ вызовСтся (с ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½Π½Ρ‹ΠΌ Π²Ρ€Π΅ΠΌΠ΅Π½Π΅ΠΌ); successHook Π½Π΅ вызываСтся.
  • context.Canceled / context.DeadlineExceeded β€” это Π½Π΅ успСх:
    • ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚ забываСтся; пСриодичСский β€” пСрСпланируСтся, ΠΎΠ΄Π½ΠΎΡ€Π°Π·ΠΎΠ²Ρ‹ΠΉ β€” Π½Π΅Ρ‚.
  • Π›ΡŽΠ±Π°Ρ другая ошибка:
    • пСриодичСская β†’ пСрСпланируСтся (Ссли ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ Β«ΠΆΠΈΠ²Π°Β»);
    • одноразовая β†’ Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅ Ρ‡Π΅Ρ€Π΅Π· failureHook (RetryNow / RetryAfter / Drop).
  • ΠšΠ°ΠΆΠ΄Ρ‹ΠΉ Ρ…ΡƒΠΊ выполняСтся с собствСнным Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚ΠΎΠΌ: доля frac=0.5 ΠΎΡ‚ deadline ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π°, Π½ΠΎ Π½Π΅ мСньшС hardHookLimit (800мс). Π’Π°ΠΉΠΌΠ°ΡƒΡ‚Ρ‹ «висят» Π½Π° tctx, Ρ‚.Π΅. Ρ…ΡƒΠΊΠΈ Π½ΠΈΠΊΠΎΠ³Π΄Π° Π½Π΅ ΠΏΠ΅Ρ€Π΅ΠΆΠΈΠ²ΡƒΡ‚ Π΄Π΅Π΄Π»Π°ΠΉΠ½ ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π°.

ΠŸΠΎΡ‚ΠΎΠΊΠΎΠ±Π΅Π·ΠΎΠΏΠ°ΡΠ½ΠΎΡΡ‚ΡŒ (ΠΊΠΎΡ€ΠΎΡ‚ΠΊΠΎ):

  • stateMu β€” Ρ‡Ρ‚Π΅Π½ΠΈΠ΅/запись состояния
  • lifecycleMu β€” сСриализация Start/Stop/смСны ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ
  • queueMu β€” доступ ΠΊ Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½Π΅ΠΉ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ
  • pendingMu β€” Π±ΡƒΡ„Π΅Ρ€ Π·Π°Π΄Π°Ρ‡ Π΄ΠΎ старта
  • run β€” Π°Ρ‚ΠΎΠΌΠ°Ρ€Π½Ρ‹ΠΉ Ρ„Π»Π°Π³ Β«ΠΆΠΈΠ²Π° Π»ΠΈ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒΒ»
  • Π£Ρ‡Ρ‘Ρ‚ ёмкости β€” Π°Ρ‚ΠΎΠΌΠ°Ρ€Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ tryReserve/inc/dec/unreserve

ΠŸΡ€ΠΎΡ‡Π΅Π΅:

  • ΠŸΡƒΠ» Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ²: WithLimitOption(n)
  • Start/Stop/Start: Π·Π°Π΄Π°Ρ‡ΠΈ, Π΄ΠΎΠ±Π°Π²Π»Π΅Π½Π½Ρ‹Π΅ Π΄ΠΎ ΠΏΠ΅Ρ€Π²ΠΎΠ³ΠΎ Start(), Π±ΡƒΡ„Π΅Ρ€ΠΈΠ·ΡƒΡŽΡ‚ΡΡ ΠΈ ΠΏΠ΅Ρ€Π΅Π»ΠΈΠ²Π°ΡŽΡ‚ΡΡ Π² ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ ΠΏΡ€ΠΈ стартС
  • ΠŸΠ΅Ρ€ΠΈΠΎΠ΄ΠΈΡ‡Π΅ΡΠΊΠΈΠ΅ / ΠΎΠ΄Π½ΠΎΡ€Π°Π·ΠΎΠ²Ρ‹Π΅: interval > 0 β€” пСриодичСская; interval == 0 β€” одноразовая
  • Π”Π΅Π΄Π»Π°ΠΉΠ½Ρ‹: deadline > 0 ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡ΠΈΠ²Π°Π΅Ρ‚ врСмя invoke Ρ‡Π΅Ρ€Π΅Π· context.WithTimeout
  • Stamps: Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΈ Π½Π° ΡƒΡ€ΠΎΠ²Π½Π΅ ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π°, порядок выполнСния прСдсказуСм
  • Π—Π°Ρ‰ΠΈΡ‚Π° ΠΎΡ‚ ΠΏΠ°Π½ΠΈΠΊ: ΠΏΠ°Π½ΠΈΠΊΠ° Π² Π·Π°Π΄Π°Ρ‡Π΅ β†’ Forget+Done ΠΈ Π»ΠΎΠ³ стСка; Π²ΠΎΡ€ΠΊΠ΅Ρ€ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°Π΅Ρ‚ Ρ€Π°Π±ΠΎΡ‚Ρƒ
  • ΠœΠ΅Ρ‚Ρ€ΠΈΠΊΠΈ Prometheus: ΠΈΠ· client-go workqueue

Установка

go get github.com/PavelAgarkov/rate-envelope-queue

Π Π΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡƒΠ΅ΠΌΡ‹Π΅ вСрсии:

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

ВрСбования: Go 1.24+.


Быстрый старт

Π‘ΠΌΠΎΡ‚Ρ€ΠΈΡ‚Π΅ ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³ examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Π‘Ρ†Π΅Π½Π°Ρ€ΠΈΠΈ ёмкости (ΠΊΠΎΡ€Ρ€Π΅ΠΊΡ‚Π½ΠΎΡΡ‚ΡŒ ΡƒΡ‡Ρ‘Ρ‚Π°):

Drain + waiting=true β€” доТидаСмся всСх Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ²; всС dec() ΠΏΡ€ΠΎΡˆΠ»ΠΈ; остатка Π½Π΅Ρ‚.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true β€” послС wg.Wait() снимаСтся «хвост» (cur - pend), счётчик сходится.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

БСзлимитная Ρ‘ΠΌΠΊΠΎΡΡ‚ΡŒ β€” WithAllowedCapacityOption(0) ΡƒΠ±ΠΈΡ€Π°Π΅Ρ‚ ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΠ΅ ΠΏΡ€ΠΈΡ‘ΠΌΠ°; ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊΠ° currentCapacity ΠΎΡ‚Ρ€Π°ΠΆΠ°Π΅Ρ‚ Ρ„Π°ΠΊΡ‚ΠΈΡ‡Π΅ΡΠΊΡƒΡŽ Π·Π°Π½ΡΡ‚ΠΎΡΡ‚ΡŒ.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

ΠœΠΈΠ½ΠΈβ€‘ΠΏΡ€ΠΈΠΌΠ΅Ρ€:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // одноразовая
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // пСриодичСская
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // ΠΏΡ€ΠΈ нСобходимости ΠΌΠΎΠΆΠ½ΠΎ снова ΡΡ‚Π°Ρ€Ρ‚ΠΎΠ²Π°Ρ‚ΡŒ

ΠšΠΎΠ½Ρ†Π΅ΠΏΡ†ΠΈΠΈ ΠΈ ΠΊΠΎΠ½Ρ‚Ρ€Π°ΠΊΡ‚Ρ‹

Envelope

e, err := NewEnvelope(
    WithId(123),                   // ΠΎΠΏΡ†ΠΈΠΎΠ½Π°Π»ΡŒΠ½ΠΎ, для Π»ΠΎΠ³ΠΎΠ²
    WithType("my_task"),           // ΠΎΠΏΡ†ΠΈΠΎΠ½Π°Π»ΡŒΠ½ΠΎ, для Π»ΠΎΠ³ΠΎΠ²
    WithScheduleModeInterval(time.Second), // 0 = одноразовая
    WithDeadline(500*time.Millisecond),    // 0 = Π±Π΅Π· Π΄Π΅Π΄Π»Π°ΠΉΠ½Π°
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // ΠΎΠ±ΡΠ·Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎ
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Drop
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Валидация:

  • invoke обязатСлСн; interval >= 0; deadline >= 0
  • Для пСриодичСских: deadline Π½Π΅ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΏΡ€Π΅Π²Ρ‹ΡˆΠ°Ρ‚ΡŒ interval β†’ ErrAdditionEnvelopeToQueueBadIntervals

Π‘ΠΏΠ΅Ρ†β€‘ΠΎΡˆΠΈΠ±ΠΊΠ°:

  • ErrStopEnvelope β€” ΠΊΠΎΡ€Ρ€Π΅ΠΊΡ‚Π½ΠΎ ΠΏΡ€Π΅ΠΊΡ€Π°Ρ‰Π°Π΅Ρ‚ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ этот ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚ (Π±Π΅Π· пСрСпланирования)

ΠžΡ‡Π΅Ρ€Π΅Π΄ΡŒ

q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = Π±Π΅Π· Π»ΠΈΠΌΠΈΡ‚Π°
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded ΠΏΡ€ΠΈ ΠΏΠ΅Ρ€Π΅ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
q.Stop()

Π‘ΡƒΡ„Π΅Ρ€ Π΄ΠΎ старта. Π’ состоянии init Send() складываСт Π·Π°Π΄Π°Ρ‡ΠΈ Π²ΠΎ Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½ΠΈΠΉ Π±ΡƒΡ„Π΅Ρ€; ΠΏΡ€ΠΈ Start() β€” ΠΎΠ½ΠΈ ΠΏΠ΅Ρ€Π΅Π»ΠΈΠ²Π°ΡŽΡ‚ΡΡ Π² workqueue.

Stamps (middleware)

type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

ΠŸΠΎΡ€ΡΠ΄ΠΎΠΊ: сначала Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½Ρ‹Π΅ stamps (внСшниС), Π·Π°Ρ‚Π΅ΠΌ per‑envelope (Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½ΠΈΠ΅), послС β€” Envelope.invoke.

LoggingStamp() β€” ΠΏΡ€ΠΈΠΌΠ΅Ρ€ для ΠΈΠ»Π»ΡŽΡΡ‚Ρ€Π°Ρ†ΠΈΠΈ (Π½Π΅ «сСрСбряная пуля» для ΠΏΡ€ΠΎΠ΄Π°ΠΊΡˆΠ΅Π½Π°).


ПовСдСниС Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°

Π‘ΠΎΠ±Ρ‹Ρ‚ΠΈΠ΅ / Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ ДСйствиС ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ
invoke Π²Π΅Ρ€Π½ΡƒΠ» nil Forget; Ссли interval > 0 ΠΈ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ Β«ΠΆΠΈΠ²Π°Β» β†’ AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; Ссли пСриодичСская ΠΈ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ Β«ΠΆΠΈΠ²Π°Β» β†’ AddAfter(interval)
ErrStopEnvelope Forget; Π½Π΅ ΠΏΠ΅Ρ€Π΅ΠΏΠ»Π°Π½ΠΈΡ€ΡƒΠ΅ΠΌ
Ошибка Ρƒ пСриодичСской Forget; Ссли ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ Β«ΠΆΠΈΠ²Π°Β» β†’ AddAfter(interval)
Ошибка Ρƒ ΠΎΠ΄Π½ΠΎΡ€Π°Π·ΠΎΠ²ΠΎΠΉ + failureHook РСшСниС ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Ρ: RetryNow / RetryAfter(d) / Drop
Паника Π² Π·Π°Π΄Π°Ρ‡Π΅ Forget + Done + Π»ΠΎΠ³ стСка; Π²ΠΎΡ€ΠΊΠ΅Ρ€ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°Π΅Ρ‚ Ρ€Π°Π±ΠΎΡ‚Ρƒ

Β«ΠžΡ‡Π΅Ρ€Π΅Π΄ΡŒ ΠΆΠΈΠ²Π°Β» = run == true, state == running, Π±Π°Π·ΠΎΠ²Ρ‹ΠΉ контСкст Π½Π΅ Π·Π°Π²Π΅Ρ€ΡˆΡ‘Π½ ΠΈ workqueue Π½Π΅ Π² shutdown.


Π Π΅ΠΆΠΈΠΌΡ‹ остановки

Waiting \ StopMode Drain (мягкая) Stop (Тёсткая)
true Π–Π΄Π°Ρ‚ΡŒ Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ²; ShutDownWithDrain() Π–Π΄Π°Ρ‚ΡŒ Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ²; ShutDown()
false Π‘Π΅Π· оТидания Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ²; ShutDownWithDrain() ΠœΠ³Π½ΠΎΠ²Π΅Π½Π½Ρ‹ΠΉ останов; ShutDown()

ПослС Stop() ΠΌΠΎΠΆΠ½ΠΎ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ Start() ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎ: создаётся Π½ΠΎΠ²Ρ‹ΠΉ Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½ΠΈΠΉ workqueue.


ΠžΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΠ΅ ёмкости

WithAllowedCapacityOption(cap) ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡ΠΈΠ²Π°Π΅Ρ‚ суммарноС число элСмСнтов Π² систСмС (Π²ΠΊΠ»ΡŽΡ‡Π°Ρ ΠΏΠ΅Ρ€Π΅ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Π΅).
ΠŸΡ€ΠΈ ΠΏΠΎΠΏΡ‹Ρ‚ΠΊΠ΅ ΠΏΡ€Π΅Π²Ρ‹ΡˆΠ΅Π½ΠΈΡ Π»ΠΈΠΌΠΈΡ‚Π° Send() Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ErrAllowedQueueCapacityExceeded.
currentCapacity обновляСтся ΠΏΡ€ΠΈ Π΄ΠΎΠ±Π°Π²Π»Π΅Π½ΠΈΠΈ, ΠΏΠ΅Ρ€Π΅ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠΈ ΠΈ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ.

  • cap == 0 β†’ Π±Π΅Π·Π»ΠΈΠΌΠΈΡ‚ ΠΏΠΎ ΠΏΡ€ΠΈΡ‘ΠΌΡƒ; ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊΠ° currentCapacity ΠΎΡ‚Ρ€Π°ΠΆΠ°Π΅Ρ‚ Ρ„Π°ΠΊΡ‚ΠΈΡ‡Π΅ΡΠΊΡƒΡŽ Π·Π°Π½ΡΡ‚ΠΎΡΡ‚ΡŒ.
  • Stop + waiting=false + StopMode=Stop β€” докумСнтированная ΡƒΡ‚Π΅Ρ‡ΠΊΠ° «хвоста» Π² ΡƒΡ‡Ρ‘Ρ‚Π΅. Для Ρ‚ΠΎΡ‡Π½ΠΎΠΉ сходимости ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠΉΡ‚Π΅ Drain ΠΈΠ»ΠΈ waiting=true.

Π‘Π΅Π½Ρ‡ΠΌΠ°Ρ€ΠΊΠΈ

Как Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Π¦ΠΈΡ„Ρ€Ρ‹ Π°Π²Ρ‚ΠΎΡ€Π° (зависят ΠΎΡ‚ CPU/окруТСния):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

ΠœΠ΅Ρ‚Ρ€ΠΈΠΊΠΈ (Prometheus)

ΠœΠ΅Ρ‚Ρ€ΠΈΠΊΠΈ workqueue Π°ΠΊΡ‚ΠΈΠ²ΠΈΡ€ΡƒΡŽΡ‚ΡΡ бланк‑импортом:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Имя ΠΎΡ‡Π΅Ρ€Π΅Π΄ΠΈ (QueueConfig.Name) ΠΏΠΎΠΏΠ°Π΄Π°Π΅Ρ‚ Π² Π»Π΅ΠΉΠ±Π»Ρ‹ ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊ (workqueue_*: adds, depth, work_duration, retries ΠΈ Ρ‚.Π΄.).


ΠŸΡ€ΠΈΠΌΠ΅Ρ€Ρ‹

Π‘ΠΌΠΎΡ‚Ρ€ΠΈΡ‚Π΅ ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³ examples/ β€” Ρ‚Π°ΠΌ Π΅ΡΡ‚ΡŒ Π³ΠΎΡ‚ΠΎΠ²Ρ‹Π΅ Π²Π°Ρ€ΠΈΠ°Π½Ρ‚Ρ‹ для ΠΎΠ΄Π½ΠΎΡ€Π°Π·ΠΎΠ²Ρ‹Ρ… Π·Π°Π΄Π°Ρ‡, пСриодичСских расписаний, ΠΊΠΎΠΌΠ±ΠΈΠ½ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Ρ… сцСнариСв ΠΈ динамичСского диспатча.


ЛицСнзия

MIT β€” см. LICENSE.