-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
88 lines (66 loc) · 2.93 KB
/
utils.py
File metadata and controls
88 lines (66 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import json
from typing import Dict, List
import grpc.aio
from zepben.eas.client.eas_client import EasClient
from zepben.ewb import connect_with_token, Feeder, NetworkConsumerClient
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger()
def get_config_dir(argv):
return argv[1] if len(argv) > 1 else "."
def get_config(config_dir):
config = read_json_config(f"{config_dir}/config.json")
config["feeders"] = list({f for f in config["feeders"]})
config["forecast_years"] = list({f for f in config["forecast_years"]})
config["scenarios"] = list({f for f in config["scenarios"]})
return config
def get_client(config_dir, async_=True) -> EasClient:
auth_config = read_json_config(f"{config_dir}/auth_config.json")
return EasClient(
host=auth_config["eas_server"]["host"],
port=auth_config["eas_server"]["port"],
protocol=auth_config["eas_server"]["protocol"],
access_token=auth_config["eas_server"]["access_token"],
verify_certificate=auth_config["eas_server"].get("verify_certificate", True),
ca_filename=auth_config["eas_server"].get("ca_filename"),
asynchronous=async_,
)
def read_json_config(config_file_path: str) -> Dict:
file = open(config_file_path)
config_dict = json.load(file)
file.close()
return config_dict
def print_cancel(result):
if "data" in result:
print(f'work_package_id=\n{result["data"]["cancelWorkPackage"]}')
else:
if "404" in result["errors"][0]["message"]:
print("No work package running with provided ID")
else:
print("Errors:\n", "\n".join(err["message"] for err in result['errors']))
def print_run(result):
if "data" in result:
print(f'work_package_id=\n{result["data"]["runWorkPackage"]}')
else:
print("Errors:\n", "\n".join(err["message"] for err in result['errors']))
def print_progress(result):
logger.info("------------------------------")
if "data" in result:
logger.info(f'Progress: \n{str(json.dumps(result["data"]["getWorkPackageProgress"], indent=4))}')
else:
logger.error("Errors:\n".join(err["message"] for err in result['errors']))
logger.info("------------------------------")
def get_ewb_channel(config_dir) -> grpc.aio.Channel:
auth_config = read_json_config(f"{config_dir}/auth_config.json")
channel = connect_with_token(
host=auth_config["ewb_server"]["host"],
rpc_port=auth_config["ewb_server"]["rpc_port"],
access_token=auth_config["ewb_server"]["access_token"],
ca_filename=auth_config["ewb_server"].get("ca_path", None)
)
return channel
async def fetch_feeders(config_dir) -> List[Feeder]:
channel = get_ewb_channel(config_dir)
client = NetworkConsumerClient(channel)
(await client.get_network_hierarchy()).throw_on_error()
return list(client.service.objects(Feeder))