11# FastAPI Cloud Tasks
22
3- Strongly typed background tasks with FastAPI and CloudTasks!
4-
5- GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.
6-
7- GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
8-
9- FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
3+ Strongly typed background tasks with FastAPI and Google CloudTasks.
104
115## Installation
126
137```
148pip install fastapi-cloud-tasks
159```
1610
17- ## Concept
11+ ## Key features
12+
13+ - Strongly typed tasks.
14+ - Fail at invocation site to make it easier to develop and debug.
15+ - Breaking schema changes between versions will fail at task runner with Pydantic.
16+ - Familiar and simple public API
17+ - ` .delay ` method that takes same arguments as the task.
18+ - ` .scheduler ` method to create recurring job.
19+ - Tasks are regular FastAPI endpoints on plain old HTTP.
20+ - ` Depends ` just works!
21+ - All middlewares, telemetry, auth, debugging etc solutions for FastAPI work as is.
22+ - Host task runners it independent of GCP. If CloudTasks can reach the URL, it can invoke the task.
23+ - Save money.
24+ - Task invocation with GCP is [ free for first million, then costs $0.4/million] ( https://cloud.google.com/tasks/pricing ) .
25+ That's almost always cheaper than running a RabbitMQ/Redis/SQL backend for celery.
26+ - Jobs cost [ $0.1 per job per month irrespective of invocations. 3 jobs are free.] ( https://cloud.google.com/scheduler#pricing )
27+ Either free or almost always cheaper than always running beat worker.
28+ - If somehow, this cost ever becomes a concern, the ` client ` can be overriden to call any gRPC server with a compatible API.
29+ [ Here's a trivial emulator implementation that we will use locally] ( https://github.com/aertje/cloud-tasks-emulator )
30+ - Autoscale.
31+ - With a FaaS setup, your task workers can autoscale based on load.
32+ - Most FaaS services have free tiers making it much cheaper than running a celery worker.
33+
34+ ## How it works
35+
36+ ### Delayed job
1837
19- [ ` Cloud Tasks ` ] ( https://cloud.google.com/tasks ) allows us to schedule a HTTP request in the future.
38+ ``` python
39+ from fastapi_cloud_tasks import DelayedRouteBuilder
2040
21- [ FastAPI ] ( https://fastapi.tiangolo.com/tutorial/body/ ) makes us define complete schema and params for an HTTP endpoint.
41+ delayed_router = APIRouter( route_class = DelayedRouteBuilder( ... ))
2242
43+ class Recipe (BaseModel ):
44+ ingredients: List[str ]
2345
24- [ ` Cloud Scheduler ` ] ( https://cloud.google.com/scheduler ) allows us to schedule recurring HTTP requests in the future.
46+ @delayed_router.post (" /{restaurant} /make_dinner" )
47+ async def make_dinner (restaurant : str , recipe : Recipe):
48+ # Do a ton of work here.
2549
26- FastAPI Cloud Tasks works by putting the three together:
2750
28- - It adds a ` .delay ` method to existing routes on FastAPI.
29- - When this method is called, it schedules a request with Cloud Tasks.
30- - The task worker is a regular FastAPI server which gets called by Cloud Tasks.
31- - It adds a ` .scheduler ` method to existing routes on FastAPI.
32- - When this method is called, it schedules a recurring job with Cloud Scheduler.
51+ app.include_router(delayed_router)
52+ ```
3353
34- If we host the task worker on Cloud Run, we get autoscaling workers.
54+ Now we can trigger the task with
3555
36- ## Pseudocode
56+ ``` python
57+ make_dinner.delay(restaurant = " Taj" , recipe = Recipe(ingredients = [" Pav" ," Bhaji" ]))
58+ ```
3759
38- In practice, this is what it looks like:
60+ If we want to trigger the task 30 minutes later
3961
4062``` python
41- delayed_router = APIRouter(route_class = DelayedRouteBuilder(... ))
63+ make_dinner.options(countdown = 1800 ).delay(... )
64+ ```
65+
66+ ### Scheduled Task
67+ ``` python
68+ from fastapi_cloud_tasks import ScheduledRouteBuilder
69+
4270scheduled_router = APIRouter(route_class = ScheduledRouteBuilder(... ))
4371
4472class Recipe (BaseModel ):
4573 ingredients: List[str ]
4674
47- @delayed_router.post (" /{restaurant} /make_dinner" )
48- async def make_dinner (restaurant : str , recipe : Recipe):
49- # Do a ton of work here.
50-
5175@scheduled_router.post (" /home_cook" )
5276async def home_cook (recipe : Recipe):
5377 # Make my own food
5478
55- app.include_router(delayed_router)
5679app.include_router(scheduled_router)
5780
58- # If you wan to make your own breakfast every morning at 7AM IST.
81+ # If you want to make your own breakfast every morning at 7AM IST.
5982home_cook.scheduler(name = " test-home-cook-at-7AM-IST" , schedule = " 0 7 * * *" , time_zone = " Asia/Kolkata" ).schedule(recipe = Recipe(ingredients = [" Milk" ," Cereal" ]))
6083```
6184
62- Now we can trigger the task with
85+ ## Concept
6386
64- ``` python
65- make_dinner.delay(restaurant = " Taj" , recipe = Recipe(ingredients = [" Pav" ," Bhaji" ]))
66- ```
87+ [ ` Cloud Tasks ` ] ( https://cloud.google.com/tasks ) allows us to schedule a HTTP request in the future.
88+
89+ [ FastAPI] ( https://fastapi.tiangolo.com/tutorial/body/ ) makes us define complete schema and params for an HTTP endpoint.
90+
91+ [ ` Cloud Scheduler ` ] ( https://cloud.google.com/scheduler ) allows us to schedule recurring HTTP requests in the future.
92+
93+ FastAPI Cloud Tasks works by putting the three together:
94+
95+ - GCP's Cloud Tasks + FastAPI = Partial replacement for celery's async delayed tasks.
96+ - GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
97+ - FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
6798
68- If we want to trigger the task 30 minutes later
6999
70- ``` python
71- make_dinner.options(countdown = 1800 ).delay(... )
72- ```
73100
74101## Running
75102
76103### Local
77104
78105Pre-requisites:
106+ - ` pip install local-requirements.txt `
107+ - Install [ cloud-tasks-emulator] ( https://github.com/aertje/cloud-tasks-emulator )
108+ - Alternatively install ngrok and forward the server's port
79109
80- - Create a task queue and copy the project id, location and queue name.
81- - Install and ensure that ngrok works.
110+ Start running the emulator in a terminal
111+ ``` sh
112+ cloud-tasks-emulator
113+ ```
82114
83- We will need a an API endpoint to give to cloud tasks, so let us fire up ngrok on local
115+ Start running the task runner on port 8000 so that it is accessible from cloud tasks.
84116
85117``` sh
86- ngrok http 8000
118+ uvicorn examples.simple.main:app --reload --port 8000
119+ ```
120+
121+ In another terminal, trigger the task with curl
122+
123+ ```
124+ curl http://localhost:8000/trigger
87125```
88126
89- You'll see something like this
127+ Check the logs on the server, you should see
90128
91129```
92- Forwarding http://feda-49-207-221-153.ngrok.io -> http://localhost:8000
130+ WARNING: Hello task ran with payload: Triggered task
93131```
94132
133+ Important bits of code:
134+
95135``` python
96136# complete file: examples/simple/main.py
97137
98- # First we construct our DelayedRoute class with all relevant settings
138+ # For local, we connect to the emulator client
139+ client = None
140+ if IS_LOCAL :
141+ client = emulator_client()
142+
143+ # Construct our DelayedRoute class with all relevant settings
99144# This can be done once across the entire project
100145DelayedRoute = DelayedRouteBuilder(
101- base_url = " http://feda-49-207-221-153.ngrok.io" ,
146+ client = client,
147+ base_url = " http://localhost:8000"
102148 queue_path = queue_path(
103149 project = " gcp-project-id" ,
104150 location = " asia-south1" ,
105151 queue = " test-queue" ,
106152 ),
107153)
108154
109- delayed_router = APIRouter(route_class = DelayedRoute, prefix = " /tasks" )
155+ # Override the route_class so that we can add .delay method to the endpoints and know their complete URL
156+ delayed_router = APIRouter(route_class = DelayedRoute, prefix = " /delayed" )
110157
111158class Payload (BaseModel ):
112159 message: str
@@ -129,29 +176,11 @@ app.include_router(delayed_router)
129176
130177```
131178
132- Start running the task runner on port 8000 so that it is accessible from cloud tasks.
133-
134- ``` sh
135- uvicorn main:app --reload --port 8000
136- ```
137-
138- In another terminal, trigger the task with curl
139-
140- ```
141- curl http://localhost:8000/trigger
142- ```
143-
144- Check the logs on the server, you should see
145-
146- ```
147- WARNING: Hello task ran with payload: Triggered task
148- ```
149-
150179Note: You can read complete working source code of the above example in [ ` examples/simple/main.py ` ] ( examples/simple/main.py )
151180
152181In the real world you'd have a separate process for task runner and actual task.
153182
154- ### Cloud Run
183+ ### Deployed environment / Cloud Run
155184
156185Running on Cloud Run with authentication needs us to supply an OIDC token. To do that we can use a ` hook ` .
157186
@@ -161,7 +190,6 @@ Pre-requisites:
161190- Deploy the worker as a service on Cloud Run and copy it's URL.
162191- Create a service account in cloud IAM and add ` Cloud Run Invoker ` role to it.
163192
164- We'll only edit the parts from above that we need changed from above example.
165193
166194``` python
167195# URL of the Cloud Run service
@@ -183,6 +211,10 @@ DelayedRoute = DelayedRouteBuilder(
183211
184212Check the fleshed out example at [ ` examples/full/tasks.py ` ] ( examples/full/tasks.py )
185213
214+ If you're not running on CloudRun and want to an OAuth Token instead, you can use the ` oauth_task_hook ` instead.
215+
216+ Check [ fastapi_cloud_tasks/hooks.py] ( fastapi_cloud_tasks/hooks.py ) to get the hang od hooks and how you can use them.
217+
186218## Configuration
187219
188220### DelayedRouteBuilder
@@ -194,7 +226,7 @@ DelayedRoute = DelayedRouteBuilder(...)
194226delayed_router = APIRouter(route_class = DelayedRoute)
195227
196228@delayed_router.get (" /simple_task" )
197- def mySimpleTask ():
229+ def simple_task ():
198230 return {}
199231```
200232
@@ -213,9 +245,9 @@ def mySimpleTask():
213245Usage:
214246
215247``` python
216- @task_router .get (" /simple_task" )
248+ @delayed_router .get (" /simple_task" )
217249@task_default_options (... )
218- def mySimpleTask ():
250+ def simple_task ():
219251 return {}
220252```
221253
@@ -226,13 +258,13 @@ Additional options:
226258- ` countdown ` - Seconds in the future to schedule the task.
227259- ` task_id ` - named task id for deduplication. (One task id will only be queued once.)
228260
229- Eg :
261+ Example :
230262
231263``` python
232264# Trigger after 5 minutes
233- @task_router .get (" /simple_task" )
265+ @delayed_router .get (" /simple_task" )
234266@task_default_options (countdown = 300 )
235- def mySimpleTask ():
267+ def simple_task ():
236268 return {}
237269```
238270
@@ -241,7 +273,7 @@ def mySimpleTask():
241273Usage:
242274
243275``` python
244- mySimpleTask .options(... ).delay()
276+ simple_task .options(... ).delay()
245277```
246278
247279All options from above can be overriden per call (including DelayedRouteBuilder options like ` base_url ` ) with kwargs to the ` options ` function before calling delay.
@@ -250,7 +282,7 @@ Example:
250282
251283``` python
252284# Trigger after 2 minutes
253- mySimpleTask .options(countdown = 120 ).delay()
285+ simple_task .options(countdown = 120 ).delay()
254286```
255287
256288### ScheduledRouteBuilder
@@ -262,11 +294,11 @@ ScheduledRoute = ScheduledRouteBuilder(...)
262294scheduled_router = APIRouter(route_class = ScheduledRoute)
263295
264296@scheduled_router.get (" /simple_scheduled_task" )
265- def mySimpleScheduledTask ():
297+ def simple_scheduled_task ():
266298 return {}
267299
268300
269- mySimpleScheduledTask .scheduler(name = " simple_scheduled_task" , schedule = " * * * * *" ).schedule()
301+ simple_scheduled_task .scheduler(name = " simple_scheduled_task" , schedule = " * * * * *" ).schedule()
270302```
271303
272304
@@ -280,9 +312,24 @@ Some hooks are included in the library.
280312- ` deadline_delayed_hook ` / ` deadline_scheduled_hook ` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
281313- ` chained_hook ` - If you need to chain multiple hooks together, you can do that with ` chained_hook(hook1, hook2) `
282314
283- ## Future work
315+ ## Helper dependencies
316+
317+ ### max_retries
318+
319+ ``` python
320+ @delayed_router.post (" /fail_twice" , dependencies = [Depends(max_retries(2 ))])
321+ async def fail_twice ():
322+ raise Exception (" nooo" )
323+ ```
324+
325+ ### CloudTasksHeaders
326+
327+ ``` python
328+ @delayed_router.get (" /my_task" )
329+ async def my_task (ct_headers : CloudTasksHeaders = Depends()):
330+ print (ct_headers.queue_name)
331+ ```
332+
333+ Check the file [ fastapi_cloud_tasks/dependencies.py] ( fastapi_cloud_tasks/dependencies.py ) for details.
284334
285- - Ensure queue exists.
286- - Make helper features for worker's side. Eg:
287- - Easier access to current retry count.
288- - API Exceptions to make GCP back-off.
335+ Note: This project is neither affiliated with, nor sponsored by Google.
0 commit comments