Skip to content

Commit fb1a6ff

Browse files
authored
Merge pull request #1 from Adori/initial_setup
Initial setup
2 parents 64e64d3 + e624417 commit fb1a6ff

File tree

16 files changed

+691
-2
lines changed

16 files changed

+691
-2
lines changed

README.md

Lines changed: 249 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,249 @@
1-
# gelery
2-
Celery-like task manager built on top of GCP pub/sub and Cloud Run
1+
# FastAPI Cloud Tasks
2+
3+
GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.
4+
5+
FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
6+
7+
## Concept
8+
9+
[`Cloud Tasks`](https://cloud.google.com/tasks) allows us to schedule a HTTP request in the future.
10+
11+
[FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint.
12+
13+
FastAPI Cloud Tasks works by putting the two together:
14+
15+
- It adds a `.delay` method to existing routes on FastAPI.
16+
- When this method is called, it schedules a request with Cloud Tasks.
17+
- The task worker is a regular FastAPI server which gets called by Cloud Tasks.
18+
19+
If we host the task worker on Cloud Run, we get free autoscaling.
20+
21+
## Pseudocode
22+
23+
In practice, this is what it looks like:
24+
25+
```python
26+
router = APIRouter(route_class=TaskRouteBuilder(...))
27+
28+
class Recipe(BaseModel):
29+
ingredients: List[str]
30+
31+
@router.post("/{restaurant}/make_dinner")
32+
async def make_dinner(restaurant: str, recipe: Recipe,):
33+
# Do a ton of work here.
34+
35+
app.include_router(router)
36+
```
37+
38+
Now we can trigger the task with
39+
40+
```python
41+
make_dinner.delay(restaurant="Taj", recipe=Recipe(ingredients=["Pav","Bhaji"]))
42+
```
43+
44+
If we want to trigger the task 30 minutes later
45+
46+
```python
47+
make_dinner.options(countdown=1800).delay(...)
48+
```
49+
50+
## Running
51+
52+
### Local
53+
54+
Pre-requisites:
55+
56+
- Create a task queue and copy the project id, location and queue name.
57+
- Install and ensure that ngrok works.
58+
59+
We will need a an API endpoint to give to cloud tasks, so let us fire up ngrok on local
60+
61+
```sh
62+
ngrok http 8000
63+
```
64+
65+
You'll see something like this
66+
67+
```
68+
Forwarding http://feda-49-207-221-153.ngrok.io -> http://localhost:8000
69+
```
70+
71+
```python
72+
# complete file: examples/simple/main.py
73+
74+
# First we construct our TaskRoute class with all relevant settings
75+
# This can be done once across the entire project
76+
TaskRoute = TaskRouteBuilder(
77+
base_url="http://feda-49-207-221-153.ngrok.io",
78+
queue_path=queue_path(
79+
project="gcp-project-id",
80+
location="asia-south1",
81+
queue="test-queue",
82+
),
83+
)
84+
85+
# Wherever we use
86+
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
87+
88+
class Payload(BaseModel):
89+
message: str
90+
91+
@task_router.post("/hello")
92+
async def hello(p: Payload = Payload(message="Default")):
93+
logger.warning(f"Hello task ran with payload: {p.message}")
94+
95+
96+
# Define our app and add trigger to it.
97+
app = FastAPI()
98+
99+
@app.get("/trigger")
100+
async def trigger():
101+
# Trigger the task
102+
hello.delay(p=Payload(message="Triggered task"))
103+
return {"message": "Hello task triggered"}
104+
105+
app.include_router(task_router)
106+
107+
```
108+
109+
Start running the task runner on port 8000 so that it is accessible from cloud tasks.
110+
111+
```sh
112+
uvicorn main:app --reload --port 8000
113+
```
114+
115+
In another terminal, trigger the task with curl
116+
117+
```
118+
curl http://localhost:8000/trigger
119+
```
120+
121+
Check the logs on the server, you should see
122+
123+
```
124+
WARNING: Hello task ran with payload: Triggered task
125+
```
126+
127+
Note: You can read complete working source code of the above example in [`examples/simple/main.py`](examples/simple/main.py)
128+
129+
In the real world you'd have a separate process for task runner and actual task.
130+
131+
### Cloud Run
132+
133+
Running on Cloud Run with authentication needs us to supply an OIDC token. To do that we can use a `hook`.
134+
135+
Pre-requisites:
136+
137+
- Create a task queue. Copy the project id, location and queue name.
138+
- Deploy the worker as a service on Cloud Run and copy it's URL.
139+
- Create a service account in cloud IAM and add `Cloud Run Invoker` role to it.
140+
141+
We'll only edit the parts from above that we need changed from above example.
142+
143+
```python
144+
# URL of the Cloud Run service
145+
base_url = "https://hello-randomchars-el.a.run.app"
146+
147+
TaskRoute = TaskRouteBuilder(
148+
base_url=base_url,
149+
# Task queue, same as above.
150+
queue_path=queue_path(...),
151+
pre_create_hook=oidc_hook(
152+
token=tasks_v2.OidcToken(
153+
# Service account that you created
154+
service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com",
155+
audience=base_url,
156+
),
157+
),
158+
)
159+
```
160+
161+
Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.py)
162+
163+
## Configuration
164+
165+
### TaskRouteBuilder
166+
167+
Usage:
168+
169+
```python
170+
TaskRoute = TaskRouteBuilder(...)
171+
task_router = APIRouter(route_class=TaskRoute)
172+
173+
@task_router.get("/simple_task")
174+
def mySimpleTask():
175+
return {}
176+
```
177+
178+
- `base_url` - The URL of your worker FastAPI service.
179+
180+
- `queue_path` - Full path of the Cloud Tasks queue. (Hint: use the util function `queue_path`)
181+
182+
- `task_create_timeout` - How long should we wait before giving up on creating cloud task.
183+
184+
- `pre_create_hook` - If you need to edit the `CreateTaskRequest` before sending it to Cloud Tasks (eg: Auth for Cloud Run), you can do that with this hook. See hooks section below for more.
185+
186+
- `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc)
187+
188+
### Task level default options
189+
190+
Usage:
191+
192+
```python
193+
@task_router.get("/simple_task")
194+
@task_default_options(...)
195+
def mySimpleTask():
196+
return {}
197+
```
198+
199+
All options from above can be passed as `kwargs` to the decorator.
200+
201+
Additional options:
202+
203+
- `countdown` - Seconds in the future to schedule the task.
204+
- `task_id` - named task id for deduplication. (One task id will only be queued once.)
205+
206+
Eg:
207+
208+
```python
209+
# Trigger after 5 minutes
210+
@task_router.get("/simple_task")
211+
@task_default_options(countdown=300)
212+
def mySimpleTask():
213+
return {}
214+
```
215+
216+
### Delayer Options
217+
218+
Usage:
219+
220+
```python
221+
mySimpleTask.options(...).delay()
222+
```
223+
224+
All options from above can be overriden per call (including TaskRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
225+
226+
Example:
227+
228+
```python
229+
# Trigger after 2 minutes
230+
mySimpleTask.options(countdown=120).delay()
231+
```
232+
233+
## Hooks
234+
235+
We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that.
236+
237+
Some hooks are included in the library.
238+
239+
- `oidc_hook` - Used to work with Cloud Run.
240+
- `deadline_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
241+
- `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)`
242+
243+
## Future work
244+
245+
- Ensure queue exists.
246+
- Integrate with [Cloud Scheduler](https://cloud.google.com/scheduler/) to replace celery beat.
247+
- Make helper features for worker's side. Eg:
248+
- Easier access to current retry count.
249+
- API Exceptions to make GCP back-off.

examples/full/main.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from uuid import uuid4
2+
3+
from examples.full.serializer import Payload
4+
from examples.full.tasks import hello
5+
from fastapi import FastAPI, Response, status
6+
from google.api_core.exceptions import AlreadyExists
7+
8+
app = FastAPI()
9+
10+
task_id = str(uuid4())
11+
12+
13+
@app.get("/basic")
14+
async def basic():
15+
hello.delay(p=Payload(message="Basic task"))
16+
return {"message": "Basic hello task scheduled"}
17+
18+
19+
@app.get("/delayed")
20+
async def delayed():
21+
hello.options(countdown=5).delay(p=Payload(message="Delayed task"))
22+
return {"message": "Delayed hello task scheduled"}
23+
24+
25+
@app.get("/deduped")
26+
async def deduped(response: Response):
27+
28+
try:
29+
hello.options(task_id=task_id).delay(p=Payload(message="Deduped task"))
30+
return {"message": "Deduped hello task scheduled"}
31+
except AlreadyExists as e:
32+
response.status_code = status.HTTP_409_CONFLICT
33+
return {"error": "Could not schedule task.", "reason": str(e)}

examples/full/serializer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from pydantic import BaseModel
2+
3+
4+
class Payload(BaseModel):
5+
message: str

examples/full/settings.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
3+
from fastapi_cloud_tasks.utils import queue_path
4+
from google.cloud import tasks_v2
5+
6+
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="http://example.com")
7+
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
8+
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
9+
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")
10+
11+
TASK_SERVICE_ACCOUNT = os.getenv("TASK_SERVICE_ACCOUNT", default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com")
12+
13+
TASK_QUEUE_PATH = queue_path(
14+
project=TASK_PROJECT_ID,
15+
location=TASK_LOCATION,
16+
queue=TASK_QUEUE,
17+
)
18+
19+
TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)

examples/full/tasks.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import logging
2+
3+
from examples.full.serializer import Payload
4+
from examples.full.settings import TASK_LISTENER_BASE_URL, TASK_OIDC_TOKEN, TASK_QUEUE_PATH
5+
from fastapi import FastAPI
6+
from fastapi.routing import APIRouter
7+
from fastapi_cloud_tasks.hooks import chained_hook, deadline_hook, oidc_hook
8+
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
9+
from google.protobuf import duration_pb2
10+
11+
app = FastAPI()
12+
13+
14+
logger = logging.getLogger("uvicorn")
15+
16+
TaskRoute = TaskRouteBuilder(
17+
base_url=TASK_LISTENER_BASE_URL,
18+
queue_path=TASK_QUEUE_PATH,
19+
# Chain multiple hooks together
20+
pre_create_hook=chained_hook(
21+
# Add service account for cloud run
22+
oidc_hook(
23+
token=TASK_OIDC_TOKEN,
24+
),
25+
# Wait for half an hour
26+
deadline_hook(duration=duration_pb2.Duration(seconds=1800)),
27+
),
28+
)
29+
30+
router = APIRouter(route_class=TaskRoute, prefix="/tasks")
31+
32+
33+
@router.post("/hello")
34+
async def hello(p: Payload = Payload(message="Default")):
35+
message = f"Hello task ran with payload: {p.message}"
36+
logger.warning(message)
37+
return {"message": message}
38+
39+
40+
app.include_router(router)

examples/simple/main.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import logging
2+
import os
3+
4+
from fastapi import FastAPI
5+
from fastapi.routing import APIRouter
6+
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
7+
from fastapi_cloud_tasks.utils import queue_path
8+
from pydantic import BaseModel
9+
10+
TaskRoute = TaskRouteBuilder(
11+
# Base URL where the task server will get hosted
12+
base_url=os.getenv("TASK_LISTENER_BASE_URL", default="https://6045-49-207-221-153.ngrok.io"),
13+
# Full queue path to which we'll send tasks.
14+
# Edit values below to match your project
15+
queue_path=queue_path(
16+
project=os.getenv("TASK_PROJECT_ID", default="gcp-project-id"),
17+
location=os.getenv("TASK_LOCATION", default="asia-south1"),
18+
queue=os.getenv("TASK_QUEUE", default="test-queue"),
19+
),
20+
)
21+
22+
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
23+
24+
logger = logging.getLogger("uvicorn")
25+
26+
27+
class Payload(BaseModel):
28+
message: str
29+
30+
31+
@task_router.post("/hello")
32+
async def hello(p: Payload = Payload(message="Default")):
33+
logger.warning(f"Hello task ran with payload: {p.message}")
34+
35+
36+
app = FastAPI()
37+
38+
39+
@app.get("/trigger")
40+
async def trigger():
41+
hello.delay(p=Payload(message="Triggered task"))
42+
return {"message": "Basic hello task triggered"}
43+
44+
45+
app.include_router(task_router)

fastapi_cloud_tasks/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)