Quick reference for example files demonstrating SDK features.
```bash
# Install
pip install conductor-python httpx

# Configure
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"

# Run end-to-end example
python examples/workers_e2e.py
```

| File | Description | Run |
|---|---|---|
| workers_e2e.py | ⭐ Start here - sync + async workers | python examples/workers_e2e.py |
| worker_example.py | Comprehensive patterns (None returns, TaskInProgress) | python examples/worker_example.py |
| fastapi_worker_service.py | FastAPI exposing a workflow as an API (+ workers) | uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1 |
| worker_configuration_example.py | Hierarchical configuration (env vars) | python examples/worker_configuration_example.py |
| task_context_example.py | Task context (logs, poll_count, task_id) | python examples/task_context_example.py |
| task_workers.py | Task worker patterns with dataclasses | python examples/task_workers.py |
| pythonic_usage.py | Pythonic API patterns and decorators | python examples/pythonic_usage.py |
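worker_configuration_example.py resolves settings hierarchically: a task-specific environment variable beats the `all` environment variable, which beats the value given in code. A rough standalone sketch of that precedence (the lookup function is hypothetical, not the SDK's actual resolver; only the env-var naming convention is taken from the examples):

```python
import os

def resolve_worker_setting(task_name, prop, code_default=None):
    """Resolve a worker setting: the task-specific env var wins over the
    'all' env var, which wins over the value passed in code."""
    for key in (f"conductor.worker.{task_name}.{prop}",
                f"conductor.worker.all.{prop}"):
        if key in os.environ:
            return os.environ[key]
    return code_default

# Example: a global poll interval with a per-task override
os.environ["conductor.worker.all.poll_interval_millis"] = "250"
os.environ["conductor.worker.batch_job.poll_interval_millis"] = "1000"

print(resolve_worker_setting("batch_job", "poll_interval_millis", "100"))   # task-specific wins
print(resolve_worker_setting("other_task", "poll_interval_millis", "100"))  # falls back to 'all'
```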
Key Concepts:
- `def` worker → TaskRunner (ThreadPoolExecutor)
- `async def` worker → AsyncTaskRunner (pure async/await, single event loop)
- One process per worker (automatic selection)
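The distinction matters for I/O-bound work: a single event loop can interleave many waits without dedicating one thread per in-flight task. A dependency-free sketch of that effect using plain asyncio (not SDK code):

```python
import asyncio
import time

async def fake_api_call(i: int) -> int:
    await asyncio.sleep(0.05)  # simulate I/O wait (network, DB)
    return i

async def main() -> float:
    start = time.perf_counter()
    # 20 concurrent "calls" share one event loop -- no thread pool needed
    results = await asyncio.gather(*(fake_api_call(i) for i in range(20)))
    assert results == list(range(20))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"20 overlapped calls in {elapsed:.2f}s")  # well under 20 * 0.05s
```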
```python
from typing import Union

from conductor.client.context.task_context import TaskInProgress, get_task_context
from conductor.client.worker.worker_task import worker_task


@worker_task(task_definition_name='batch_job')
def process_batch(batch_id: str) -> Union[dict, TaskInProgress]:
    ctx = get_task_context()
    if ctx.get_poll_count() < 5:
        # More work - extend the lease and get re-polled in 30s
        return TaskInProgress(callback_after_seconds=30)
    return {'status': 'completed'}
```

See: task_context_example.py, worker_example.py
| File | Description | Run |
|---|---|---|
| dynamic_workflow.py | Create workflows programmatically | python examples/dynamic_workflow.py |
| workflow_ops.py | Start, pause, resume, terminate workflows | python examples/workflow_ops.py |
| workflow_status_listner.py | Workflow event listeners | python examples/workflow_status_listner.py |
| test_workflows.py | Unit testing workflows | python -m unittest examples.test_workflows |
See agentic_workflows/ for the full set of AI agent examples.
| File | Description | Run |
|---|---|---|
| agentic_workflows/llm_chat.py | Automated multi-turn LLM chat | python examples/agentic_workflows/llm_chat.py |
| agentic_workflows/llm_chat_human_in_loop.py | Interactive chat with WAIT task pauses | python examples/agentic_workflows/llm_chat_human_in_loop.py |
| agentic_workflows/multiagent_chat.py | Multi-agent debate with moderator routing | python examples/agentic_workflows/multiagent_chat.py |
| agentic_workflows/function_calling_example.py | LLM picks Python functions to call | python examples/agentic_workflows/function_calling_example.py |
| agentic_workflows/mcp_weather_agent.py | AI agent with MCP tool calling | python examples/agentic_workflows/mcp_weather_agent.py "What's the weather?" |
| rag_workflow.py | RAG pipeline: markitdown, pgvector, search, answer | python examples/rag_workflow.py file.pdf "question" |
| File | Description | Run |
|---|---|---|
| metrics_example.py | Prometheus metrics (HTTP server on :8000) | python examples/metrics_example.py |
| event_listener_examples.py | Custom event listeners (SLA, logging) | python examples/event_listener_examples.py |
| task_listener_example.py | Task lifecycle listeners | python examples/task_listener_example.py |
Access metrics: `curl http://localhost:8000/metrics`
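What metrics_example.py serves at `/metrics` is the standard Prometheus text exposition format. A dependency-free sketch of that format (the metric name and labels here are illustrative, not the SDK's actual metric names):

```python
from typing import Dict, Optional

def render_counter(name: str, help_text: str, value: float,
                   labels: Optional[Dict[str, str]] = None) -> str:
    """Render one counter in the Prometheus text exposition format."""
    label_str = ""
    if labels:
        pairs = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        label_str = "{" + pairs + "}"
    return (f"# HELP {name} {help_text}\n"
            f"# TYPE {name} counter\n"
            f"{name}{label_str} {value}\n")

print(render_counter("task_poll_total", "Total task polls.", 42,
                     {"taskType": "batch_job"}))
```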
| File | Description | Notes |
|---|---|---|
| task_configure.py | Task definitions (retry, timeout, rate limits) | Programmatic task config |
| kitchensink.py | All task types (HTTP, JS, JQ, Switch) | Comprehensive |
| shell_worker.py | Execute shell commands | |
| untrusted_host.py | Self-signed SSL certificates | |
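For self-signed certificates, the usual Python-level knobs are either trusting the specific CA or disabling verification. A minimal sketch using the standard `ssl` module (illustrative setup, not the exact code in untrusted_host.py):

```python
import ssl

# Option 1 (preferred): trust a specific self-signed CA
# ctx = ssl.create_default_context(cafile="my-selfsigned-ca.pem")

# Option 2 (testing only!): disable verification entirely
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# httpx accepts an ssl.SSLContext via its `verify` parameter:
#   httpx.Client(verify=ctx)
print(ctx.verify_mode)
```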
Complete working examples demonstrating 100% API coverage for major SDK features.
| File | Description | APIs |
|---|---|---|
| authorization_journey.py | Complete RBAC implementation | 49 APIs |
Scenario: E-commerce platform with departments, teams, and role-based access control.
Features:
- User, group, and application management
- Custom roles with fine-grained permissions
- Resource access control and audit trails
- Automatic cleanup (use `--no-cleanup` to keep resources)
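The RBAC ideas the journey exercises reduce to roles mapping to permission sets, checked per user and resource. A toy model of that check (not the Conductor authorization API; names are illustrative):

```python
# Toy RBAC check: roles -> permissions, users -> roles
ROLE_PERMISSIONS = {
    "admin":    {"workflow:create", "workflow:execute", "workflow:delete"},
    "operator": {"workflow:execute"},
    "viewer":   {"workflow:read"},
}

USER_ROLES = {"alice": {"admin"}, "bob": {"operator", "viewer"}}

def is_allowed(user: str, permission: str) -> bool:
    """A user is allowed if any of their roles grants the permission."""
    return any(permission in ROLE_PERMISSIONS.get(role, set())
               for role in USER_ROLES.get(user, set()))

print(is_allowed("bob", "workflow:execute"))  # operator role grants this
print(is_allowed("bob", "workflow:delete"))   # no role of bob's grants this
```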
```bash
python examples/authorization_journey.py
```

| File | Description | APIs |
|---|---|---|
| schedule_journey.py | Complete scheduling system | 15 APIs |
Scenario: E-commerce order processing with scheduled batch workflows.
Features:
- Schedule CRUD operations
- Cron expressions with timezone support
- Pause/resume schedules
- Execution history and monitoring
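Each schedule is driven by a standard 5-field cron expression (minute, hour, day-of-month, month, day-of-week). A tiny matcher for the simple forms only (`*`, `*/n` steps, and plain numbers; hypothetical helper, not SDK code):

```python
def cron_field_matches(field: str, value: int) -> bool:
    """Match one cron field against a value: '*', '*/n', or a number."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    return int(field) == value

def cron_matches(expr: str, minute: int, hour: int,
                 dom: int, month: int, dow: int) -> bool:
    fields = expr.split()
    return all(cron_field_matches(f, v)
               for f, v in zip(fields, (minute, hour, dom, month, dow)))

# "0 9 * * 1" -> 09:00 every Monday (day-of-week 1)
print(cron_matches("0 9 * * 1", 0, 9, 15, 4, 1))      # matches
print(cron_matches("*/15 * * * *", 30, 12, 1, 1, 3))  # every 15 minutes
```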
```bash
python examples/schedule_journey.py
```

| File | Description | APIs |
|---|---|---|
| metadata_journey.py | Workflow & task definitions | 21 APIs |
Scenario: Online education platform with complex workflow orchestration.
Features:
- Task and workflow definition management
- Version control and tagging
- Rate limiting and monitoring
- Complex workflow patterns (SWITCH, FORK_JOIN, DECISION)
```bash
python examples/metadata_journey.py
```

| File | Description | APIs |
|---|---|---|
| prompt_journey.py | AI/LLM prompt templates | 8 APIs |
Scenario: AI-powered customer service with managed prompt templates.
Features:
- Prompt template CRUD operations
- Multi-language support
- Testing with AI models
- Version management and tagging
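Prompt templates substitute named variables at run time; Conductor's `${variable}` placeholder style maps neatly onto Python's `string.Template`. A sketch with a hypothetical template (not one from the example):

```python
from string import Template

# Hypothetical customer-service template with two variables
prompt = Template(
    "You are a support agent for ${product}. "
    "Answer the customer's question: ${question}"
)

rendered = prompt.substitute(product="Acme Router",
                             question="How do I reset it?")
print(rendered)
```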
```bash
python examples/prompt_journey.py
```

Complete RAG (Retrieval Augmented Generation) pipeline example:
```bash
# 1. Install dependencies
pip install conductor-python "markitdown[pdf]"

# 2. Configure (requires Orkes Conductor with AI/LLM support)
#    - Vector DB integration named "postgres-prod" (pgvector)
#    - LLM provider named "openai" with a valid API key
export CONDUCTOR_SERVER_URL="http://localhost:7001/api"

# 3. Run RAG workflow
python examples/rag_workflow.py examples/goog-20251231.pdf "What were Google's total revenues?"
```

Pipeline: convert_to_markdown → LLM_INDEX_TEXT → WAIT → LLM_SEARCH_INDEX → LLM_CHAT_COMPLETE
Features:
- Document conversion (PDF, Word, Excel → Markdown via markitdown)
- Vector database ingestion into pgvector with OpenAI `text-embedding-3-small` embeddings
- Semantic search with configurable result count
- Context-aware answer generation with `gpt-4o-mini`
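The semantic-search step comes down to cosine similarity between the query embedding and the indexed chunk embeddings. A dependency-free sketch with toy 3-dimensional "embeddings" (real `text-embedding-3-small` vectors have 1536 dimensions; the chunks and vectors here are made up):

```python
import math

def cosine(a, b) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

# Toy index: chunk text -> embedding vector
index = {
    "Total revenues were $350B.": [0.9, 0.1, 0.0],
    "The company hired 5,000 people.": [0.1, 0.9, 0.1],
    "Revenue grew 12% year over year.": [0.8, 0.2, 0.1],
}

query_vec = [0.85, 0.15, 0.05]  # pretend embedding of "What were total revenues?"

# Rank chunks by similarity and keep the top 2 as LLM context
top = sorted(index, key=lambda c: cosine(index[c], query_vec), reverse=True)[:2]
print(top)  # the two revenue-related chunks rank first
```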
MCP (Model Context Protocol) agent example:
```bash
# 1. Install MCP weather server
pip install mcp-weather-server

# 2. Start MCP server
python3 -m mcp_weather_server \
    --mode streamable-http \
    --host localhost \
    --port 3001 \
    --stateless

# 3. Run AI agent
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"
python examples/agentic_workflows/mcp_weather_agent.py "What's the weather in Tokyo?"

# Or simple mode (direct tool call):
python examples/agentic_workflows/mcp_weather_agent.py "Temperature in New York" --simple
```

Features:
- MCP tool discovery
- LLM-based planning (agent decides which tool to use)
- Tool execution via HTTP/Streamable transport
- Natural language response generation
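The agent loop behind those features is: discover tools, let the LLM pick one, execute it, then phrase the answer. With the LLM planner and the MCP transport stubbed out, the control flow looks roughly like this (all names and the keyword-based "planner" are hypothetical stand-ins):

```python
# Stub "MCP" tool registry -- real discovery happens over the MCP transport
TOOLS = {
    "get_weather":  lambda city: {"city": city, "temp_c": 21, "sky": "clear"},
    "get_forecast": lambda city: {"city": city, "days": 3, "summary": "mild"},
}

def plan(question: str):
    """Stand-in for the LLM planning step: pick a tool and its argument."""
    tool = "get_forecast" if "forecast" in question.lower() else "get_weather"
    city = question.rstrip("?").split()[-1]  # naive argument extraction
    return tool, city

def answer(question: str) -> str:
    tool, city = plan(question)
    result = TOOLS[tool](city)          # tool-execution step
    return f"{tool} says: {result}"     # response-generation step

print(answer("What's the weather in Tokyo?"))
```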
```bash
# 1. Basic workers (5 min)
python examples/workers_e2e.py

# 2. Long-running tasks (5 min)
python examples/task_context_example.py

# 3. Configuration (5 min)
python examples/worker_configuration_example.py

# 4. Workflows (10 min)
python examples/dynamic_workflow.py

# 5. AI/LLM workflows (15 min)
python examples/agentic_workflows/llm_chat.py
python examples/rag_workflow.py examples/goog-20251231.pdf "What were Google's total revenues?"

# 6. Monitoring (5 min)
python examples/metrics_example.py
curl http://localhost:8000/metrics
```

```text
examples/
├── Core Workers
│   ├── workers_e2e.py                  # ⭐ Start here
│   ├── worker_example.py               # Comprehensive patterns
│   ├── worker_configuration_example.py # Env var configuration
│   ├── task_context_example.py         # Long-running tasks
│   ├── task_workers.py                 # Dataclass patterns
│   └── pythonic_usage.py               # Pythonic decorators
│
├── Workflows
│   ├── dynamic_workflow.py             # Workflow creation
│   ├── workflow_ops.py                 # Workflow management
│   ├── workflow_status_listner.py      # Workflow events
│   └── test_workflows.py               # Unit tests
│
├── AI/LLM Workflows
│   ├── rag_workflow.py                 # RAG pipeline (markitdown + pgvector)
│   └── agentic_workflows/              # Agentic AI examples
│       ├── llm_chat.py                 # Multi-turn LLM chat
│       ├── llm_chat_human_in_loop.py   # Interactive chat with WAIT
│       ├── multiagent_chat.py          # Multi-agent debate
│       ├── function_calling_example.py # LLM function calling
│       └── mcp_weather_agent.py        # MCP tool calling agent
│
├── Monitoring
│   ├── metrics_example.py              # Prometheus metrics
│   ├── event_listener_examples.py      # Custom listeners
│   └── task_listener_example.py        # Task events
│
├── Advanced
│   ├── task_configure.py               # Task definitions
│   ├── kitchensink.py                  # All features
│   ├── shell_worker.py                 # Shell commands
│   └── untrusted_host.py               # SSL handling
│
├── API Journeys
│   ├── authorization_journey.py        # ✅ All 49 authorization APIs
│   ├── schedule_journey.py             # ✅ All 15 schedule APIs
│   ├── metadata_journey.py             # ✅ All 21 metadata APIs
│   └── prompt_journey.py               # ✅ All 8 prompt APIs
│
├── helloworld/                         # Simple examples
│   ├── greetings_worker.py
│   ├── greetings_workflow.py
│   └── helloworld.py
│
├── user_example/                       # HTTP + dataclass
│   ├── models.py
│   └── user_workers.py
│
├── worker_discovery/                   # Auto-discovery
│   ├── my_workers/
│   └── other_workers/
│
└── orkes/                              # Orkes-specific features
    ├── vector_db_helloworld.py         # Vector DB operations
    ├── agentic_workflow.py             # AI agent (AIOrchestrator)
    ├── http_poll.py
    ├── sync_updates.py
    └── wait_for_webhook.py
```
Multiprocess - one process per worker with automatic runner selection:

```python
# Sync worker → TaskRunner (ThreadPoolExecutor)
@worker_task(task_definition_name='cpu_task', thread_count=4)
def cpu_task(data: dict):
    return expensive_computation(data)

# Async worker → AsyncTaskRunner (event loop, 67% less memory)
@worker_task(task_definition_name='api_task', thread_count=50)
async def api_task(url: str):
    async with httpx.AsyncClient() as client:
        return await client.get(url)
```

```bash
# Required
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"

# Optional - Orkes Cloud
export CONDUCTOR_AUTH_KEY="your-key"
export CONDUCTOR_AUTH_SECRET="your-secret"

# Optional - Worker config
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=250
export conductor.worker.all.thread_count=20
```

**Workers not polling?**
- Check task names match between workflow and `@worker_task`
- Verify `CONDUCTOR_SERVER_URL` is correct
- Check auth credentials

**Async workers using threads?**
- Use `async def` (not `def`)
- Check logs for "Created AsyncTaskRunner"

**High memory?**
- Use `async def` for I/O tasks (lower memory)
- Reduce worker count or `thread_count`
- Authorization API - Complete RBAC system (49 APIs)
- Metadata API - Task & workflow definitions (21 APIs)
- Prompt API - AI/LLM prompt templates (8 APIs)
- Schedule API - Workflow scheduling (15 APIs)
- Task Management API - Task operations (11 APIs)
- Workflow API - Workflow operations
- Integration API - AI/LLM provider integrations
- Worker Design - Complete architecture guide
- Worker Configuration - Hierarchical config system
- Python SDK README - SDK overview and installation
Repository: https://github.com/conductor-oss/conductor-python License: Apache 2.0