theboringhumane/cleo

Cleo 🚀

Cleo Logo

Why did the task queue go to therapy? It had too many unresolved promises! 😅

A distributed task queue system that's seriously powerful (but doesn't take itself too seriously 🎭).

Docs

Features ✨

  • Enhanced Task Grouping 🎯 - Smart task coordination with multiple processing strategies
  • Intelligent Task Decorators 🎀 - Auto-filtering events and lifecycle management
  • Distributed Locking 🔐 - No queue jumping allowed!
  • Advanced Retry with Backoff 🔄 - Smart retries with configurable strategies
  • Redis-Backed 📦 - Because memory is fleeting, but Redis is forever
  • TypeScript Support 💪 - For when `any` just won't cut it
  • Real-time Event System 📡 - Keep track of your tasks with detailed events
  • Task History & Analytics 📊 - Complete visibility into task lifecycles
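
To make the distributed locking bullet concrete, here is a minimal sketch of the usual Redis-style lock pattern (set-if-absent with a TTL, then release only by the owner). It is an in-memory stand-in, not Cleo's implementation: `LockStore`, `acquire`, and `release` are hypothetical names, and the `now` parameter exists only to keep the example deterministic.

```typescript
// In-memory model of a Redis-style lock. NOT Cleo's code; all names are hypothetical.
class LockStore {
  private locks = new Map<string, { owner: string; expiresAt: number }>();

  // Mirrors the idea of `SET key owner NX PX ttlMs`: succeeds only if no live lock exists.
  acquire(key: string, owner: string, ttlMs: number, now: number = Date.now()): boolean {
    const held = this.locks.get(key);
    if (held !== undefined && held.expiresAt > now) {
      return false; // a lock that has not expired is still held
    }
    this.locks.set(key, { owner, expiresAt: now + ttlMs });
    return true;
  }

  // Compare-and-delete: only the current owner may release the lock.
  release(key: string, owner: string): boolean {
    const held = this.locks.get(key);
    if (held === undefined || held.owner !== owner) {
      return false;
    }
    this.locks.delete(key);
    return true;
  }
}

const store = new LockStore();
console.log(store.acquire("task:42", "worker-1", 1000, 0));    // true
console.log(store.acquire("task:42", "worker-2", 1000, 500));  // false: still held
console.log(store.release("task:42", "worker-2"));             // false: not the owner
console.log(store.acquire("task:42", "worker-2", 1000, 1500)); // true: earlier lock expired
```

The TTL matters because a crashed worker would otherwise hold its lock forever; the owner check keeps a slow worker from releasing a lock that has since passed to someone else.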

Core Superpowers 💫

Task Processing 🎯

  • 🚀 Distributed processing with auto load balancing
  • 🎭 Smart group task management with multiple strategies
  • 📊 Enhanced real-time monitoring with filtered events
  • ⭐ Dynamic priority-based processing
  • ⚡ Event-driven architecture with detailed task history
  • 🛡️ Robust error handling and retry mechanisms
  • 📈 Comprehensive performance metrics and analytics

Group Processing Strategies 🎲

  • 🔄 Round Robin: Fair distribution with last-processed time tracking
  • 📝 FIFO: Strict order processing with complete task history
  • ⭐ Priority: Dynamic priority adjustment with group statistics
  • 🎯 Smart Processing: Adapts to task patterns and system load
Advanced Features 🔬

  • 🎯 Smart Batching

    • Groups tasks like a pro party planner
    • Optimizes performance like a caffeine-powered compiler
    • Handles bursts better than your morning coffee machine
  • 📊 Real-time Analytics

    • Success/failure tracking (keeping score)
    • Processing time stats (for the speed demons)
    • Resource usage metrics (watching the diet)
    • Performance insights (big brain time)
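
As a rough illustration of the two ideas above, the sketch below splits a burst of tasks into fixed-size batches and summarizes success/failure counts and average processing time. It assumes nothing about how Cleo batches or measures internally; `toBatches`, `TaskOutcome`, and `summarize` are hypothetical helpers.

```typescript
// Hypothetical helpers illustrating batching and basic task analytics.
function toBatches<T>(tasks: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < tasks.length; i += batchSize) {
    batches.push(tasks.slice(i, i + batchSize)); // last batch may be smaller
  }
  return batches;
}

interface TaskOutcome {
  ok: boolean;
  durationMs: number;
}

function summarize(outcomes: TaskOutcome[]) {
  const succeeded = outcomes.filter((o) => o.ok).length;
  const totalMs = outcomes.reduce((sum, o) => sum + o.durationMs, 0);
  return {
    succeeded,
    failed: outcomes.length - succeeded,
    avgDurationMs: outcomes.length === 0 ? 0 : totalMs / outcomes.length,
  };
}

console.log(toBatches([1, 2, 3, 4, 5], 2)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
console.log(summarize([{ ok: true, durationMs: 100 }, { ok: false, durationMs: 300 }]));
```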

Security & Protection 🛡️

  • 🔐 Redis ACL support (because sharing isn't always caring)
  • 🎯 Task-level permissions (not everyone gets a backstage pass)
  • 📝 Audit logging (tracking who did what)
  • 🔑 Role-based access (VIP list management)

System Architecture 🏗️

(Where all the magic happens ✨)

graph TB
    Client[🖥️ Client] --> QM[📊 Queue Manager]
    QM --> Redis[(💾 Redis)]
    QM --> Worker[👷 Worker Pool]
    QM --> Groups[👥 Task Groups]
    Worker --> Redis
    Groups --> Redis

    subgraph "🎭 Task Party"
        Groups --> Strategy{🎯 Strategy}
        Strategy --> RR[🔄 Round Robin]
        Strategy --> FIFO[📝 FIFO]
        Strategy --> Priority[⭐ Priority]
    end

    subgraph "💪 Worker Squad"
        Worker --> W1[🏃 Worker 1]
        Worker --> W2[🏃‍♀️ Worker 2]
        Worker --> W3[🏃‍♂️ Worker 3]
    end

Task Flow 🌊

(AKA: The Epic Journey of a Task)

sequenceDiagram
    participant C as 🖥️ Client
    participant QM as 📊 Queue
    participant G as 👥 Group
    participant W as 👷 Worker
    participant R as 💾 Redis
    participant E as 📡 Events

    C->>QM: Submit Task 📬
    QM->>G: Group Check 🔍
    G->>R: Store State 💾
    G->>R: Update Processing Order 🔄
    QM->>R: Queue Task ➡️
    W->>R: Poll Tasks 🎣
    W->>G: Check Strategy 📋
    G-->>E: Emit Status Change 📻
    W->>QM: Process ⚙️
    QM-->>E: Emit Task Events 📡
    QM->>C: Done! 🎉

    Note over G,R: Group maintains processing order
    Note over W,QM: Worker respects group strategy
    Note over E: Event system provides real-time updates

Task Group Processing 🎭

(How tasks play nice together)

graph TB
    Task[📦 Task] --> Group[👥 Group]
    Group --> Strategy{🎯 Strategy}

    Strategy --> RR[🔄 Round Robin]
    Strategy --> FIFO[📝 FIFO]
    Strategy --> Priority[⭐ Priority]

    RR --> Redis[(💾 Redis)]
    FIFO --> Redis
    Priority --> Redis

    Redis --> History[📊 Task History]
    Redis --> Stats[📈 Group Stats]

    subgraph "📡 Event System"
        History --> Events[🔔 Events]
        Stats --> Events
    end
    
    style Task fill:#f96,stroke:#333
    style Group fill:#9cf,stroke:#333
    style Strategy fill:#f9f,stroke:#333
    style Redis fill:#9f9,stroke:#333
    style Events fill:#ff9,stroke:#333

Real-World Examples 🌍

(Because who doesn't love practical examples?)

Video Processing 🎥

graph TB
    Upload[📤 Upload] --> Process[⚙️ Process]
    Process --> Encode[🎬 Encode]
    Encode --> Deliver[🚀 Deliver]
    
    style Upload fill:#f9f,stroke:#333
    style Process fill:#bbf,stroke:#333
    style Encode fill:#bfb,stroke:#333
    style Deliver fill:#fbf,stroke:#333

Installation 🛠️

npm install @cleo/core
# or if you're yarn-core'd
yarn add @cleo/core

Quick Start 🏃‍♂️

Examples 🎮

(Because the best way to learn is by doing!)

Quick Start 🚀

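// TaskPriority and ObserverEvent are used below; assumed to be exported by @cleo/core
import { Cleo, TaskPriority, ObserverEvent } from '@cleo/core';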

// Get your Cleo instance (it's like a task-managing pet)
const cleo = Cleo.getInstance();

// Configure it (give it treats and training)
cleo.configure({
  redis: {
    host: "localhost",
    port: 6379,
    password: "cleosecret",
  },
  worker: {
    concurrency: 4,
    queues: [
      {
        name: "send-email",
        priority: TaskPriority.HIGH,
      },
    ],
  },
});

// Monitor your tasks (helicopter parenting, but for code)
const queueManager = cleo.getQueueManager();
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`Task ${taskId} status changed to ${status}`, data);
});

Task Decorators 🎀

// TaskPriority is used in the decorator options; assumed to be exported by @cleo/core
import { task, TaskPriority } from "@cleo/core";

class EmailService {
  @task({
    id: "send-email",
    priority: TaskPriority.HIGH,
    queue: 'send-email',
    group: 'notifications',
    timeout: 30000,
    maxRetries: 3,
    retryDelay: 3000,
  })
  async sendEmail(input: { email: string }): Promise<string> {
    // Your email sending logic here
    return `Sent to ${input.email}`;
  }
}

// Task decorator automatically:
// - Filters task events by taskId
// - Manages task lifecycle within groups
// - Handles cancellation through AbortSignal
// - Provides automatic cleanup of event listeners

Advanced Group Processing 🎭

import { QueueClass, GroupProcessingStrategy } from "@cleo/core";

// Define a service with group settings
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    backoff: {
      type: "fixed",
      delay: 2000,
    },
    group: "notifications",
    timeout: 300000,
  },
  queue: "notifications",
})
class NotificationService {
  async sendPushNotification(data: { message: string }) {
    console.log(`📱 Sending push: ${data.message}`);
    return `Notification sent: ${data.message}`;
  }

  async sendSMS(data: { message: string }) {
    console.log(`📲 Sending SMS: ${data.message}`);
    return `SMS sent: ${data.message}`;
  }
}

// Enhanced Group Processing Features
const queueManager = cleo.getQueueManager();

// Round Robin - Fair distribution with last-processed time tracking
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);

// FIFO - Strict order processing with task history
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);

// Priority - Dynamic priority adjustment with group stats
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
await queueManager.setGroupPriority("notifications", 10);

// New: Group Task Event Handling
queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  // Enhanced group event data including:
  // - Task history
  // - Group processing stats
  // - Task completion/failure details
  console.log(`👥 Group operation for ${taskId}:`, {
    operation: data.operation,
    group: data.group,
    history: data.history
  });
});

Error Handling & Retries 🛟

// Built-in retry configuration
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    backoff: {
      type: "fixed",
      delay: 2000,
    },
    retryDelay: 1000,
  }
})
class ReliableService {
  async mightFail() {
    // Will retry 3 times with backoff
    throw new Error("Oops!");
  }
}

// Manual retry with backoff
// Import name matched to the call below (the original imported `RetryWithBackoff`)
import { retryWithBackoff } from "@cleo/core";

const result = await retryWithBackoff(
  async () => {
    return await unreliableOperation();
  },
  3,    // max retries
  1000  // base delay in ms
);
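
For readers curious what a helper with this shape might do internally, here is a minimal, self-contained sketch. It assumes exponential backoff (the delay doubles after each failed attempt), which the README does not confirm for Cleo's own helper; the name `retryWithExponentialBackoff` and the injectable `sleep` parameter are hypothetical.

```typescript
// Hypothetical retry helper; NOT Cleo's retryWithBackoff implementation.
async function retryWithExponentialBackoff<T>(
  operation: () => Promise<T>,
  maxRetries: number,
  baseDelayMs: number,
  // Injectable so callers (and tests) can avoid real waiting.
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  // One initial attempt plus up to maxRetries retries.
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt === maxRetries) break; // out of retries
      await sleep(baseDelayMs * 2 ** attempt); // base, 2x base, 4x base, ...
    }
  }
  throw lastError;
}
```

With `maxRetries = 3` and `baseDelayMs = 1000`, this sketch waits 1000 ms, 2000 ms, and 4000 ms between attempts before rethrowing the last error.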

Event Monitoring 📊

const queueManager = cleo.getQueueManager();

// Monitor all the things!
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`💬 Task ${taskId} status: ${status}`);
});

queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`👥 Group operation: ${data.operation}`);
});

queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED, (taskId, status, result) => {
  console.log(`✅ Task ${taskId} completed:`, result);
});

queueManager.onTaskEvent(ObserverEvent.TASK_FAILED, (taskId, status, error) => {
  console.log(`❌ Task ${taskId} failed:`, error);
});
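
The listeners above receive events for every task. The sketch below shows one way per-task filtering and listener cleanup (both mentioned for the task decorator) could work in principle. It is a stand-alone model, not Cleo's event system: `TaskEventBus`, its `on`/`emit` methods, and the string event names are hypothetical.

```typescript
// Hypothetical event bus; NOT Cleo's QueueManager or ObserverEvent API.
type TaskEvent = "STATUS_CHANGE" | "TASK_COMPLETED" | "TASK_FAILED";
type Listener = (taskId: string, status: string, data?: unknown) => void;

class TaskEventBus {
  private listeners = new Map<TaskEvent, Set<Listener>>();

  // Registers a listener, optionally limited to one task.
  // Returns an unsubscribe function for cleanup.
  on(event: TaskEvent, listener: Listener, onlyTaskId?: string): () => void {
    const wrapped: Listener = (taskId, status, data) => {
      if (onlyTaskId === undefined || taskId === onlyTaskId) {
        listener(taskId, status, data);
      }
    };
    const set = this.listeners.get(event) ?? new Set<Listener>();
    this.listeners.set(event, set);
    set.add(wrapped);
    return () => {
      set.delete(wrapped);
    };
  }

  emit(event: TaskEvent, taskId: string, status: string, data?: unknown): void {
    this.listeners.get(event)?.forEach((listener) => listener(taskId, status, data));
  }
}

const bus = new TaskEventBus();
const stop = bus.on("STATUS_CHANGE", (id, status) => console.log(`${id} -> ${status}`), "task-1");
bus.emit("STATUS_CHANGE", "task-1", "running"); // logged
bus.emit("STATUS_CHANGE", "task-2", "running"); // filtered out
stop();
bus.emit("STATUS_CHANGE", "task-1", "done");    // not logged: listener removed
```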

Complete Examples 📚

Check out our example files for full implementations:

Each example comes with:

  • 🎯 Complete setup and configuration
  • 📊 Event monitoring setup
  • 🎭 Different processing strategies
  • 🛠️ Error handling patterns
  • 📈 Performance monitoring

Contributing 🤝

We welcome contributions! Whether you're fixing bugs 🐛, adding features ✨, or improving docs 📚, we'd love your help!

Q: How many developers does it take to review a PR? A: None, they're all stuck in an infinite loop of bikeshedding! 😄

Check out our Contributing Guidelines for:

  • Code style and standards 📝
  • Development workflow 🔄
  • Project structure 🏗️
  • Pull request process 🔍
  • Bug reporting guidelines 🐞

Key Components 🔧

Our project is like a well-oiled machine (that occasionally needs coffee):

  • QueueManager 📊 - The traffic controller of your tasks
  • TaskGroup 👥 - Because tasks work better in teams
  • Worker 🏃 - The real MVP doing all the heavy lifting
  • Utilities 🛠️ - Our Swiss Army knife of helper functions

Performance Features ⚡

(Because speed matters!)

graph LR
    A[📊 Smart Batching] --> B[⚡ Fast Processing]
    B --> C[🎯 Optimal Results]
    C --> D[🎉 Happy Users]
    
    style A fill:#f96,stroke:#333
    style B fill:#9cf,stroke:#333
    style C fill:#9f9,stroke:#333
    style D fill:#f9f,stroke:#333

License 📜

MIT License - see LICENSE file for details

Remember: In a world of callbacks, promises, and async/await, we're all just trying our best to avoid race conditions! 🏁

Made with ❤️ and probably too much caffeine ☕
