-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcoordinator_client.py
More file actions
56 lines (50 loc) · 2 KB
/
coordinator_client.py
File metadata and controls
56 lines (50 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import json
import requests
import boto3
import os
from uuid import uuid4
from botocore.exceptions import ClientError
from loguru import logger
class LocalCoordinatorClient:
    """Client a worker uses to exchange jobs with a coordinator.

    Job documents are read from and written to JSON files inside
    ``working_directory`` (``input_<job_id>.json`` and
    ``output_<job_id>.json``). Status updates and instructions go over HTTP
    to ``coordinator_url``. ``upload_file`` pushes a local file to S3.
    """

    def __init__(self,
                 working_directory: str,
                 coordinator_url: str
                 ) -> None:
        """Store the job directory and the coordinator base URL.

        Args:
            working_directory: Directory holding the ``input_*.json`` and
                ``output_*.json`` job files.
            coordinator_url: Base URL of the coordinator; endpoint paths are
                appended to it verbatim, so it should not end with ``/``.
        """
        self.working_directory = working_directory
        self.coordinator_url = coordinator_url

    def _job_path(self, prefix, job_id):
        """Return the path of ``<prefix>_<job_id>.json`` in the working directory."""
        return os.path.join(self.working_directory, f"{prefix}_{job_id}.json")

    def load_input_job_from_dfs(self, job_id):
        """Load the input document of a job.

        Args:
            job_id: Identifier used in the ``input_<job_id>.json`` filename.

        Returns:
            The parsed JSON document, or ``None`` when no input file exists
            for ``job_id``.
        """
        doc_path = self._job_path('input', job_id)
        # Open directly instead of checking existence first: the file may be
        # removed between the check and the open.
        try:
            with open(doc_path, 'r', encoding='utf-8') as infile:
                return json.load(infile)
        except FileNotFoundError:
            return None

    def save_output_job_to_dfs(self, result_doc):
        """Write a job's result document and remove its input file.

        Args:
            result_doc: JSON-serializable mapping; its ``'_id'`` entry names
                the job and therefore both the output and the input file.

        Raises:
            AssertionError: If the matching input file does not exist. The
                output file has already been written at that point.
        """
        job_id = result_doc['_id']
        output_path = self._job_path('output', job_id)
        with open(output_path, 'w', encoding='utf-8') as outfile:
            json.dump(result_doc, outfile)

        input_path = self._job_path('input', job_id)
        # Explicit raise rather than an `assert` statement so the check is
        # not stripped when Python runs with -O.
        if not os.path.exists(input_path):
            raise AssertionError(f"input file not found: {input_path}")
        os.remove(input_path)

    def update_status(self, job_id, new_status, returned_payload=None):
        """POST a status update for ``job_id`` to the coordinator.

        Args:
            job_id: Job identifier, placed in the URL path.
            new_status: Value sent as ``"status"`` in the JSON body.
            returned_payload: Optional value sent as ``"returned_payload"``.

        Returns:
            The ``requests`` response object. No timeout is passed and the
            HTTP status code is not checked here.
        """
        return requests.post(
            f"{self.coordinator_url}/update_status/{job_id}",
            json={
                "status": new_status,
                "returned_payload": returned_payload,
            },
        )

    def upload_file(self, filename, object_name=None, bucket='toma-all'):
        """Upload a local file to S3.

        Args:
            filename: Path of the local file to upload.
            object_name: S3 object key. When ``None``, a random
                ``<uuid4>.png`` key is generated.
            bucket: Destination bucket name.

        Returns:
            ``(True, object_name)`` on success, ``(False, None)`` when the
            upload fails (the error is logged).
        """
        # Imported here so the fix does not depend on a file-level import
        # change; boto3 is already a dependency of this module.
        from boto3.exceptions import S3UploadFailedError

        if object_name is None:
            object_name = str(uuid4()) + ".png"
        s3_client = boto3.client('s3')
        try:
            s3_client.upload_file(filename, bucket, object_name)
        except (ClientError, S3UploadFailedError) as e:
            # boto3's managed transfer reports failed uploads as
            # S3UploadFailedError, which is not a ClientError subclass.
            logger.error(e)
            return False, None
        return True, object_name

    def fetch_instructions(self, model_name, rank):
        """GET the instructions for ``model_name``/``rank`` from the coordinator.

        Returns:
            The response body decoded as JSON. No timeout is passed and the
            HTTP status code is not checked before decoding.
        """
        return requests.get(
            f"{self.coordinator_url}/instructions/{model_name}/{rank}"
        ).json()