-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrobot.py
More file actions
69 lines (50 loc) · 2.2 KB
/
robot.py
File metadata and controls
69 lines (50 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from typing import Optional, List
from .client import Client
class Robot:
    """Handle for a single robot, backed by a ``Client`` and a cached data dict.

    Every remote operation is delegated to ``self.client``, always passing
    this robot's id. Methods that modify the robot overwrite ``robot_data``
    with whatever the client call returns, so the cached copy follows the
    latest response.
    """

    def __init__(self, client: "Client", robot_data: dict):
        """Store the client and the robot's data dict.

        Args:
            client: Object used to perform all remote calls.
            robot_data: Robot description; ``id`` and ``name`` read it via
                ``robot_data["recording_meta"]``.
        """
        self.client = client
        self.robot_data = robot_data

    @property
    def id(self) -> str:
        """Return ``robot_data["recording_meta"]["id"]``."""
        return self.robot_data["recording_meta"]["id"]

    @property
    def name(self) -> str:
        """Return ``robot_data["recording_meta"]["name"]``."""
        return self.robot_data["recording_meta"]["name"]

    def get_data(self) -> dict:
        """Return the cached robot data dict (the same object, not a copy)."""
        return self.robot_data

    async def run(self, options: Optional[dict] = None):
        """Execute this robot via ``client.execute_robot`` and return its result.

        Args:
            options: Optional execution options, forwarded unchanged.
        """
        return await self.client.execute_robot(self.id, options)

    async def get_runs(self) -> list:
        """Return the result of ``client.get_runs`` for this robot."""
        return await self.client.get_runs(self.id)

    async def get_run(self, run_id: str) -> dict:
        """Return the result of ``client.get_run`` for ``run_id`` on this robot."""
        return await self.client.get_run(self.id, run_id)

    async def get_latest_run(self) -> Optional[dict]:
        """Return the run with the greatest ``startedAt`` value, or ``None``.

        Runs whose ``startedAt`` is missing or ``None`` are ranked as the
        empty string, i.e. they sort before any run with a non-empty
        ``startedAt`` string. When several runs share the greatest value,
        the first one in the list returned by ``get_runs`` is chosen.

        Returns:
            The selected run dict, or ``None`` when there are no runs.
        """
        runs = await self.get_runs()
        if not runs:
            return None
        # ``or ""`` also covers a key that is present but None; a plain
        # ``.get(key, "")`` default would let None reach the comparison and
        # raise TypeError against string timestamps.
        return max(runs, key=lambda run: run.get("startedAt") or "")

    async def abort(self, run_id: str) -> None:
        """Abort run ``run_id`` of this robot via ``client.abort_run``."""
        await self.client.abort_run(self.id, run_id)

    async def schedule(self, config: dict) -> None:
        """Call ``client.schedule_robot`` with ``config`` and cache its result."""
        self.robot_data = await self.client.schedule_robot(self.id, config)

    async def unschedule(self) -> None:
        """Call ``client.unschedule_robot`` and cache its result."""
        self.robot_data = await self.client.unschedule_robot(self.id)

    async def add_webhook(self, webhook: dict) -> None:
        """Call ``client.add_webhook`` with ``webhook`` and cache its result."""
        self.robot_data = await self.client.add_webhook(self.id, webhook)

    def get_webhooks(self) -> Optional[list]:
        """Return the cached ``"webhooks"`` entry, or ``None`` if absent or empty."""
        return self.robot_data.get("webhooks") or None

    async def remove_webhooks(self) -> None:
        """Send ``{"webhooks": None}`` through ``client.update_robot`` and cache the result."""
        self.robot_data = await self.client.update_robot(
            self.id, {"webhooks": None}
        )

    def get_schedule(self) -> Optional[dict]:
        """Return the cached ``"schedule"`` entry, or ``None`` if absent or empty."""
        return self.robot_data.get("schedule") or None

    async def update(self, updates: dict) -> None:
        """Call ``client.update_robot`` with ``updates`` and cache its result."""
        self.robot_data = await self.client.update_robot(self.id, updates)

    async def delete(self) -> None:
        """Delete this robot via ``client.delete_robot``.

        The cached ``robot_data`` is left untouched.
        """
        await self.client.delete_robot(self.id)

    async def refresh(self) -> None:
        """Replace the cached data with the result of ``client.get_robot``."""
        self.robot_data = await self.client.get_robot(self.id)