-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
50 lines (39 loc) · 1.54 KB
/
server.py
File metadata and controls
50 lines (39 loc) · 1.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from client.rq_client import queue
from queues.graph import EmployeeRecruiterAgent, run_agent_workflow
class JobSpec(BaseModel):
    # Request model describing the job a candidate is evaluated against.
    # Documented with comments rather than a docstring so the generated
    # schema for this model stays exactly as it was.
    # Of these fields, the handlers in this file only read `description`.
    title: str
    description: str
    required_skills: list[str]
class CandidateInfo(BaseModel):
    # Request model carrying a candidate's contact details and resume link.
    # Documented with comments rather than a docstring so the generated
    # schema for this model stays exactly as it was.
    # Of these fields, the handlers in this file only read `resume_url`.
    name: str
    email: str
    phone: str
    resume_url: str
# Process-wide agent instance. It is created by the application lifespan
# handler on startup; binding it to None here means the name exists (and can
# be checked) even before startup has run, instead of raising NameError.
agent: EmployeeRecruiterAgent | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared agent on startup and release it on shutdown.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. The application serves requests while suspended at the yield.
    """
    global agent
    agent = EmployeeRecruiterAgent()
    try:
        yield
    finally:
        # Run cleanup even if the application leaves the context with an
        # exception; otherwise the agent would never be closed.
        # `close` is optional on the agent, hence the hasattr check.
        if agent is not None and hasattr(agent, "close"):
            agent.close()
        # Drop the reference so a closed agent cannot be used by mistake.
        agent = None
# The ASGI application; `lifespan` manages the shared agent's lifetime.
app = FastAPI(
    title="Automated Recruitment Pipeline",
    lifespan=lifespan,
)
@app.post("/execute_workflow")
async def execute_workflow(candidate_info: CandidateInfo, thread_id: str, job_spec: JobSpec):
    """Run the recruitment agent for one candidate and return its result."""
    # The agent only exists once the lifespan startup has run; answer with
    # 503 instead of failing with an AttributeError on None.
    if agent is None:
        raise HTTPException(status_code=503, detail="Agent is not initialised")
    # invoke_agent is called without await, so it is treated as a blocking
    # call. Running it in a worker thread keeps the event loop free to serve
    # other requests (for example /health) while the workflow runs.
    # NOTE(review): assumes invoke_agent is synchronous and safe to call from
    # a worker thread - confirm against EmployeeRecruiterAgent.
    return await asyncio.to_thread(
        agent.invoke_agent,
        thread_id=thread_id,
        resume_url=candidate_info.resume_url,
        job_desc=job_spec.description,
    )
@app.post("/rq/workflow")
async def rq_workflow(candidate_info: CandidateInfo, thread_id: str, job_spec: JobSpec):
    # Enqueue the agent workflow on the RQ queue and hand back the job id so
    # the caller can look the job up later.
    # Documented with comments rather than a docstring so the published
    # operation description is unchanged.
    workflow_kwargs = {
        "thread_id": thread_id,
        "resume_url": candidate_info.resume_url,
        "job_desc": job_spec.description,
    }
    enqueued = queue.enqueue(run_agent_workflow, **workflow_kwargs)
    return {"status": "ok", "job_id": enqueued.id}
@app.get("/rq")
async def rq_get_status(job_id: str):
    # Look up a queued job by id and report its status and return value.
    # Responds with 404 when the queue has no job for that id.
    # Documented with comments rather than a docstring so the published
    # operation description is unchanged.
    found = queue.fetch_job(job_id)
    if found is not None:
        current_status = found.get_status()
        outcome = found.return_value()
        return {"status": current_status, "result": outcome}
    raise HTTPException(status_code=404, detail="Job not found")
@app.get("/health")
def health_check():
    # Liveness probe: always answers with a fixed "ok" payload.
    # Documented with comments rather than a docstring so the published
    # operation description is unchanged.
    payload = {"status": "ok"}
    return payload