A lightweight, async background job runner framework for aiohttp applications. Execute long-running tasks in the background without blocking your HTTP responses.
- 🚀 Async-first: Built on asyncio for efficient concurrent task execution
- 🔄 Background Job Processing: Run time-consuming tasks without blocking HTTP responses
- 🎯 Queue Management: Built-in job queue with worker pool support
- ⏱️ Delayed Jobs: Schedule jobs to be re-executed after a delay
- 🔗 Job Chaining: Chain jobs across different services
- 🛡️ Error Handling: Comprehensive error handling with custom error handlers
- 🎛️ Configurable Workers: Support for multiple concurrent workers
- 🔒 Throttling: Built-in throttling to control job execution rate
- 🔄 Job Lifecycle Hooks: Monitor jobs with on_job_started, on_job_finished, and on_job_failed callbacks
- ⚙️ Flexible Configuration: Timeout controls, concurrent execution, and more
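
For readers new to the pattern, the queue, worker pool, and throttling features can be pictured with plain asyncio. The following is a minimal sketch using only the standard library. It is not this library's implementation, and the names `worker`, `run_pool`, and the `None` sentinel are assumptions made for illustration.

```python
import asyncio


async def worker(name, queue, results, throttle=None):
    """Pull jobs off the queue until a None sentinel arrives."""
    while True:
        job = await queue.get()
        if job is None:  # sentinel: stop this worker
            queue.task_done()
            break
        a, b = job
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        results.append((name, a + b))
        if throttle:
            await asyncio.sleep(throttle)  # pause before taking the next job
        queue.task_done()


async def run_pool(jobs, worker_count=3, throttle=None):
    """Run all jobs through a fixed number of concurrent workers."""
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, results, throttle))
        for i in range(worker_count)
    ]
    for job in jobs:
        queue.put_nowait(job)
    for _ in workers:
        queue.put_nowait(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pool([(1, 2), (3, 4), (5, 6)], worker_count=2)))
```

Because every worker reads from the same queue, raising `worker_count` increases concurrency without changing how jobs are submitted.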

Install with pip:

    pip install some-aiohttp-service

Or with Poetry:

    poetry add some-aiohttp-service

Create a service by inheriting from BaseService and implementing the required methods:

    import asyncio

    from some_aiohttp_service import BaseService

    async def some_long_calculation(a, b):
        # Simulate a slow operation
        await asyncio.sleep(5)
        return f"Done with {a}/{b}"

    class TestService(BaseService):
        name = "test"

        @staticmethod
        async def work(job):
            return await some_long_calculation(**job.data)

        async def error_handler(self, job, error):
            print(f"Error processing job {job.id}: {error}")

        async def result_handler(self, job, result):
            print(f"Job {job.id} completed: {result}")

Integrate your service with an aiohttp application:

    from aiohttp.web import Application, get, run_app
    from aiohttp.web_exceptions import HTTPOk, HTTPAccepted

    async def health(_):
        raise HTTPOk

    async def hello(request):
        a = request.match_info["a"]
        b = request.match_info["b"]
        # Submit work to the background service
        await request.app["test"].commit_work({"a": a, "b": b})
        raise HTTPAccepted

    app = Application()
    app.add_routes([get("/work/{a}/{b}", hello), get("/health", health)])
    # Register the service
    app.cleanup_ctx.append(TestService(app).init)
    run_app(app)

When initializing a service, you can configure several parameters:

    TestService(
        app,
        worker_count=3,        # Number of concurrent workers (default: 1)
        overall_timeout=3000,  # Job timeout in seconds (default: 3000)
        throttle=0.5,          # Delay between jobs in seconds (default: None)
    )

Configure your service behavior with class attributes:

    class MyService(BaseService):
        name = "my_service"
        # Enable concurrent job execution
        CONCURRENT_WORK = True
        # Maximum number of times a delayed job can be rescheduled
        MAX_JOB_REPETITIONS = 10
        # Delay before rescheduling periodic jobs (in seconds)
        DELAY_PERIODIC_JOB_RESCHEDULE = 3

Return a dictionary with a "delayed" key to reschedule a job:

    class RetryService(BaseService):
        name = "retry"

        @staticmethod
        async def work(job):
            # some_operation and TemporaryError are placeholders for your own code
            try:
                result = await some_operation()
                return result
            except TemporaryError:
                # Reschedule this job
                return {"delayed": True}

For infinite retries:

    return {"delayed": True, "repeat_forever": True}

Chain jobs across different services by returning a dictionary with "queue" and "data":

    class ProcessorService(BaseService):
        name = "processor"

        @staticmethod
        async def work(job):
            # process is a placeholder for your own function
            processed_data = process(job.data)
            # Send the result to another service
            return {
                "queue": "notifier",
                "data": {"message": processed_data},
            }

Override lifecycle methods to monitor job execution:

    import time

    class MonitoredService(BaseService):
        name = "monitored"

        async def on_job_started(self, job):
            """Called when a job starts execution"""
            print(f"Starting job {job.id}")

        async def on_job_finished(self, job, result):
            """Called when a job completes successfully"""
            print(f"Job {job.id} finished with result: {result}")

        async def on_job_failed(self, job, reason):
            """Called when a job fails"""
            print(f"Job {job.id} failed: {reason}")

        async def prepare_job(self, job):
            """Called before job execution (for setup)"""
            job.config["start_time"] = time.time()

Implement startup and cleanup hooks for resource management:

    class DatabaseService(BaseService):
        name = "database"

        async def startup(self, app):
            """Called during service initialization"""
            # create_db_pool is a placeholder for your own pool factory
            self.db_pool = await create_db_pool()
            print("Database pool initialized")

        async def cleanup(self, app):
            """Called during service shutdown"""
            await self.db_pool.close()
            print("Database pool closed")

        @staticmethod
        async def work(job):
            # Use the pool created in startup here. Note that a static
            # method has no self, so self.db_pool is not reachable as written.
            pass

The commit_work method returns a future that resolves when the job completes:

    async def my_handler(request):
        # Wait for job completion
        result = await request.app["test"].commit_work({"data": "value"})
        # Or submit without waiting
        future = await request.app["test"].commit_work({"data": "value"})
        # Do other work...
        result = await future

The main class to inherit from when creating a service.

Required methods:

- async def work(job): The main work function that processes each job
- async def error_handler(job, error): Handle errors that occur during job execution
- async def result_handler(job, result): Process successful job results
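
How the three methods relate is easiest to see in a small dispatch loop. The sketch below assumes that the result of `work` goes to `result_handler` and that any exception goes to `error_handler`. The README does not show the library's actual dispatch code, so `SketchService` and `process` are hypothetical stand-ins, and a plain dict is used in place of the library's job object.

```python
import asyncio


class SketchService:
    """Hypothetical stand-in, not the library's BaseService."""

    def __init__(self):
        self.results = []
        self.errors = []

    @staticmethod
    async def work(job):
        # Fail on negative input so both handler paths can be shown
        if job["value"] < 0:
            raise ValueError("negative input")
        return job["value"] * 2

    async def result_handler(self, job, result):
        self.results.append(result)

    async def error_handler(self, job, error):
        self.errors.append(str(error))


async def process(service, job):
    """Assumed dispatch order: run work, then call exactly one handler."""
    try:
        result = await service.work(job)
    except Exception as error:
        await service.error_handler(job, error)
    else:
        await service.result_handler(job, result)


async def main():
    service = SketchService()
    for value in (21, -1):
        await process(service, {"value": value})
    return service.results, service.errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```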

Class attributes:

- name (str): Unique identifier for the service
- CONCURRENT_WORK (bool): Enable/disable concurrent job execution (default: False)
- MAX_JOB_REPETITIONS (int): Maximum retries for delayed jobs (default: 10)
- DELAY_PERIODIC_JOB_RESCHEDULE (int): Delay for job rescheduling in seconds (default: 3)
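
The interplay between a repetition limit and a reschedule delay can be sketched with plain asyncio. This is an illustration of the general idea only: the loop, the `RuntimeError` raised at the limit, and the shortened demo values are assumptions, and the library may handle an exhausted job differently.

```python
import asyncio

# Names mirror the class attributes; values are shortened for the demo
MAX_JOB_REPETITIONS = 3
DELAY_PERIODIC_JOB_RESCHEDULE = 0.01  # seconds


async def flaky_work(job):
    """Ask to be rescheduled until the third attempt succeeds."""
    job["attempts"] += 1
    if job["attempts"] < 3:
        return {"delayed": True}
    return "done"


async def run_with_reschedule(work, job):
    """Re-run work while it returns a 'delayed' marker, up to the limit."""
    repetitions = 0
    while True:
        result = await work(job)
        delayed = isinstance(result, dict) and result.get("delayed")
        if not delayed:
            return result, repetitions
        if repetitions >= MAX_JOB_REPETITIONS and not result.get("repeat_forever"):
            # Assumed behavior once the limit is reached
            raise RuntimeError("repetition limit reached")
        repetitions += 1
        await asyncio.sleep(DELAY_PERIODIC_JOB_RESCHEDULE)


if __name__ == "__main__":
    print(asyncio.run(run_with_reschedule(flaky_work, {"attempts": 0})))
```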

Methods and hooks:

- async def commit_work(data: dict): Submit a job to the service queue
- async def on_job_started(job): Hook called when job starts (optional)
- async def on_job_finished(job, result): Hook called on successful completion (optional)
- async def on_job_failed(job, reason): Hook called on job failure (optional)
- async def prepare_job(job): Hook called before job execution (optional)
- async def startup(app): Hook called during service initialization (optional)
- async def cleanup(app): Hook called during service shutdown (optional)
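
A submit method that hands back a future, as described for commit_work, is commonly built by pairing each queued payload with an `asyncio.Future` that a worker resolves later. The sketch below shows that general technique with the standard library. `MiniService` and its summing worker are hypothetical and do not reflect the library's internals.

```python
import asyncio


class MiniService:
    """Hypothetical stand-in for a service; not the library's code."""

    def __init__(self):
        self.queue = asyncio.Queue()

    async def commit_work(self, data):
        # Create a future for the caller and enqueue it with the payload
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((data, future))
        return future

    async def worker(self):
        while True:
            data, future = await self.queue.get()
            try:
                result = sum(data["numbers"])  # stand-in for real work
            except Exception as error:
                future.set_exception(error)
            else:
                future.set_result(result)
            finally:
                self.queue.task_done()


async def main():
    service = MiniService()
    worker = asyncio.create_task(service.worker())
    future = await service.commit_work({"numbers": [1, 2, 3]})
    # A handler could respond here and await the future later
    result = await future
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Returning the future rather than the result lets the caller decide whether to wait for completion or to respond immediately.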
Represents a unit of work to be processed.

- id (int): Unique job identifier
- type (JobType): Job type (NORMAL or TERMINATE)
- data (dict): Job payload data
- config (dict): Job configuration
- repetitions (int): Number of times job has been rescheduled
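
The attributes listed above can be modeled as a small dataclass to make their types and plausible defaults concrete. This is a hypothetical model for illustration: the class name `SketchJob` and the default values are assumptions, and only the field names and the two `JobType` members come from the list.

```python
from dataclasses import dataclass, field
from enum import Enum, auto


class JobType(Enum):
    NORMAL = auto()
    TERMINATE = auto()


@dataclass
class SketchJob:
    """Hypothetical model of a job; not the library's class definition."""

    id: int
    type: JobType = JobType.NORMAL
    data: dict = field(default_factory=dict)    # payload passed to work
    config: dict = field(default_factory=dict)  # per-job settings
    repetitions: int = 0                        # times rescheduled so far


job = SketchJob(id=1, data={"a": "1", "b": "2"})
job.config["start_time"] = 0.0  # e.g. set from a prepare_job hook
job.repetitions += 1            # e.g. after one reschedule
```

Using `default_factory` gives each job its own `data` and `config` dictionaries instead of sharing one mutable default.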

    # Clone the repository
    git clone https://github.com/tommmlij/some-aiohttp-service.git
    cd some-aiohttp-service
    # Install dependencies
    poetry install

    # Run the tests
    poetry run pytest

    # Type checking
    poetry run mypy src/
    # Linting
    poetry run flake8 src/
    # Formatting
    poetry run black src/
    poetry run isort src/

MIT License - see LICENSE file for details.

Contributions are welcome! Please feel free to submit a Pull Request.