Skip to content

Commit 42b5634

Browse files
authored
Merge pull request #319 from Baltic-RCC/dev
Task generator and local retrieving fixes
2 parents 376674a + d0cda62 commit 42b5634

File tree

8 files changed: +65 additions, −81 deletions
Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,25 @@
11
[MAIN]
2-
32
RMQ_EXCHANGE = emf-tasks
4-
RMQ_SERVER =
5-
63
TASK_WINDOW_DURATION = PT1M
74
TASK_WINDOW_REFERENCE = currentMinuteStart
8-
TIMETRAVEL =
9-
#TIMETRAVEL = 2024-05-24T23:05+0200
5+
TIMESTAMP =
6+
#TIMESTAMP = 2025-05-07T14:30:00+0200
107
TASK_HEADER_KEYS = @id,job_id,run_id,process_id,@type,task_properties.merge_type,task_properties.time_horizon,
11-
128
RUN_TYPE =
13-
INCLUDED_TSO =
9+
INCLUDED_TSO =
1410
EXCLUDED_TSO =
1511
LOCAL_IMPORT =
1612
PROCESS_TIME_SHIFT =
17-
TASK_PERIOD_DURATION =
1813
TASK_REFERENCE_TIME =
1914
TASK_VERSION =
2015
TASK_MERGING_ENTITY =
21-
RUN_REPLACEMENT =
22-
RUN_REPLACEMENT_LOCAL =
23-
RUN_SCALING =
24-
UPLOAD_TO_OPDM =
25-
UPLOAD_TO_MINIO =
26-
SEND_MERGE_REPORT =
27-
PRE_TEMP_FIXES =
28-
POST_TEMP_FIXES =
29-
FORCE_OUTAGE_FIX =
16+
RUN_REPLACEMENT =
17+
RUN_REPLACEMENT_LOCAL =
18+
RUN_SCALING =
19+
UPLOAD_TO_OPDM =
20+
UPLOAD_TO_MINIO =
21+
SEND_MERGE_REPORT =
22+
PRE_TEMP_FIXES =
23+
POST_TEMP_FIXES =
24+
FORCE_OUTAGE_FIX =
3025
FIX_NET_INTERCHANGE2 =
Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
[MAIN]
2-
32
RMQ_EXCHANGE = emf-tasks
4-
RMQ_SERVER =
5-
63
TASK_ELK_INDEX =
7-
84
TASK_WINDOW_DURATION = PT1M
95
TASK_WINDOW_REFERENCE = currentMinuteStart
10-
TIMETRAVEL =
11-
#TIMETRAVEL = 2024-05-24T23:05+0200
6+
TIMESTAMP =
7+
#TIMESTAMP = 2025-06-30T23:05+0200
128
TASK_HEADER_KEYS = @id,job_id,run_id,process_id,@type,task_properties.merge_type,task_properties.time_horizon,
13-
149
TASK_SCHEDULE_SHIFT = P0D
15-
TASK_SCHEDULE_TIMEHORIZON = AUTO
16-
10+
TASK_SCHEDULE_TIME_HORIZON = AUTO

config/task_generator/timeframe_conf.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@
106106
"description": "Process that runs in current month for next month",
107107
"reference_time_start": "currentMonthStart",
108108
"reference_time_end": "currentMonthStart",
109-
"period_start": "P1M",
110-
"period_end": "P2M"
109+
"period_start": "P1M1D",
110+
"period_end": "P2M1D"
111111
},
112112
{
113113
"@context": "https://example.com/timeHorizon_context.jsonld",
@@ -116,7 +116,7 @@
116116
"description": "Process that runs in current year for next year",
117117
"reference_time_start": "currentYearStart",
118118
"reference_time_end": "currentYearStart",
119-
"period_start": "P1Y",
120-
"period_end": "P2Y"
119+
"period_start": "P1Y1D",
120+
"period_end": "P2Y1D"
121121
}
122122
]

emf/common/integrations/object_storage/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ def get_latest_models_and_download(time_horizon: str,
155155

156156
meta = {'pmd:validFrom': f"{parse_datetime(scenario_date):%Y%m%dT%H%MZ}",
157157
'pmd:timeHorizon': time_horizon,
158-
'opde:Object-Type': object_type}
158+
'opde:Object-Type': object_type,
159+
"data-source": data_source}
159160

160161
if tso:
161162
meta['pmd:TSO'] = tso

emf/model_merger/model_merger.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def handle(self, task_object: dict, properties: dict, **kwargs):
124124
scenario_datetime = task_properties["timestamp_utc"]
125125
schedule_start = task_properties.get("reference_schedule_start_utc")
126126
schedule_end = task_properties.get("reference_schedule_end_utc")
127-
schedule_timehorizon = task_properties.get("reference_schedule_timehorizon")
127+
schedule_time_horizon = task_properties.get("reference_schedule_time_horizon")
128128
merging_area = task_properties["merge_type"]
129129
merging_entity = task_properties["merging_entity"]
130130
mas = task_properties["mas"]
@@ -149,19 +149,15 @@ def handle(self, task_object: dict, properties: dict, **kwargs):
149149
remove_non_generators_from_slack_participation = True
150150

151151
# Collect valid models from ObjectStorage
152-
downloaded_models = get_latest_models_and_download(time_horizon, scenario_datetime, valid=False)
152+
downloaded_models = get_latest_models_and_download(time_horizon, scenario_datetime, valid=False, data_source='OPDM')
153153
latest_boundary = get_latest_boundary()
154154

155155
# Filter out models that are not to be used in merge
156156
filtered_models = merge_functions.filter_models(downloaded_models, included_models, excluded_models, filter_on='pmd:TSO')
157157

158158
# Get additional models directly from Minio
159159
if local_import_models:
160-
additional_models_data = self.minio_service.get_latest_models_and_download(time_horizon=time_horizon,
161-
scenario_datetime=scenario_datetime,
162-
model_entity=local_import_models,
163-
bucket_name=INPUT_MINIO_BUCKET,
164-
prefix=INPUT_MINIO_FOLDER)
160+
additional_models_data = get_latest_models_and_download(time_horizon, scenario_datetime, valid=True, data_source='PDN')
165161

166162
missing_local_import = [tso for tso in local_import_models if tso not in [model['pmd:TSO'] for model in additional_models_data]]
167163
merged_model.excluded.extend([{'tso': tso, 'reason': 'Missing in PDN'} for tso in missing_local_import])
@@ -287,16 +283,16 @@ def handle(self, task_object: dict, properties: dict, **kwargs):
287283
# Perform scaling
288284
if model_scaling:
289285

290-
# Set default timehorizon and scnario timestamp if not provided
291-
if not schedule_timehorizon or schedule_timehorizon == "AUTO":
292-
schedule_timehorizon = time_horizon
286+
# Set default time horizon and scenario timestamp if not provided
287+
if not schedule_time_horizon or schedule_time_horizon == "AUTO":
288+
schedule_time_horizon = time_horizon
293289

294290
if not schedule_start:
295291
schedule_start = scenario_datetime
296292

297293
# Get aligned schedules
298-
ac_schedules = query_acnp_schedules(time_horizon=schedule_timehorizon, scenario_timestamp=schedule_start)
299-
dc_schedules = query_hvdc_schedules(time_horizon=schedule_timehorizon, scenario_timestamp=schedule_start)
294+
ac_schedules = query_acnp_schedules(time_horizon=schedule_time_horizon, scenario_timestamp=schedule_start)
295+
dc_schedules = query_hvdc_schedules(time_horizon=schedule_time_horizon, scenario_timestamp=schedule_start)
300296

301297
# Scale balance if all schedules were received
302298
if all([ac_schedules, dc_schedules]):
@@ -309,7 +305,7 @@ def handle(self, task_object: dict, properties: dict, **kwargs):
309305
logger.error(e)
310306
merged_model.scaled = False
311307
else:
312-
logger.warning(f"Schedule reference data not available: {schedule_timehorizon} for {schedule_start}, skipping model scaling")
308+
logger.warning(f"Schedule reference data not available: {schedule_time_horizon} for {schedule_start}, skipping model scaling")
313309
merged_model.scaled = False
314310

315311
# Record main merging process end

emf/task_generator/manual_worker.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,21 @@
5656
process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = POST_TEMP_FIXES
5757
process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX
5858

59-
# Apply process time shift and time travel if defined
60-
if PROCESS_TIME_SHIFT:
61-
timeframe_config_json[0]['period_start'] = f'{PROCESS_TIME_SHIFT}'
62-
if TIMETRAVEL:
63-
timeframe_config_json[0]['period_duration'] = TASK_PERIOD_DURATION
64-
timeframe_config_json[0]['reference_time'] = TASK_REFERENCE_TIME
65-
66-
# Exporting configuration
67-
with open(process_conf, 'w') as file:
68-
json.dump(process_config_json, file, indent=1)
69-
with open(timeframe_conf, 'w') as file:
70-
json.dump(timeframe_config_json, file, indent=4)
59+
# If single timestamp run is defined
60+
if TIMESTAMP:
61+
timeframe_config_json[0]['reference_time_start'] = TASK_REFERENCE_TIME
62+
timeframe_config_json[0]['reference_time_end'] = TASK_REFERENCE_TIME
63+
timeframe_config_json[0]['period_start'] = 'PT0M'
64+
timeframe_config_json[0]['period_end'] = 'PT1H'
7165

7266
# Generate tasks
73-
tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_conf, timeframe_conf, TIMETRAVEL))
67+
tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_config_json, timeframe_config_json,
68+
TIMESTAMP, PROCESS_TIME_SHIFT))
7469

7570
# Publish tasks
7671
if tasks:
7772
logger.info(f"Creating connection to RMQ")
78-
rabbit_service = rabbit.BlockingClient(host=RMQ_SERVER)
73+
rabbit_service = rabbit.BlockingClient()
7974
logger.info(f"Sending tasks to Rabbit exchange: {RMQ_EXCHANGE}")
8075
for task in tasks:
8176
rabbit_service.publish(payload=json.dumps(task),

emf/task_generator/task_generator.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717
parse_app_properties(globals(), config.paths.task_generator.task_generator)
1818

1919

20-
def generate_tasks(task_window_duration:str, task_window_reference:str, process_conf:str, timeframe_conf:str, timetravel_now:str=None):
20+
def generate_tasks(task_window_duration: str,
21+
task_window_reference: str,
22+
process_conf: dict,
23+
timeframe_conf: dict,
24+
timetravel_now: str | None = None,
25+
process_time_shift: str | None = None,
26+
):
2127
"""
2228
Generates a sequence of tasks based on the given process configuration and time frame definitions.
2329
@@ -26,9 +32,9 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
2632
within which the tasks are generated.
2733
task_window_reference (str): a string identifying the reference point in time for the task window,
2834
as defined in the `reference_times` dict imported from `time_helper.py`.
29-
process_conf (str): path to a JSON file specifying the process configuration, as described in the
35+
process_conf (dict): JSON file specifying the process configuration, as described in the
3036
documentation.
31-
timeframe_conf (str): path to a JSON file specifying the time frame definitions, as described in the
37+
timeframe_conf (dict): JSON file specifying the time frame definitions, as described in the
3238
documentation.
3339
3440
Yields:
@@ -41,16 +47,12 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
4147

4248
# TODO - add validation against schema
4349

44-
# Load the time frame configuration from the specified file.
45-
time_frames = json.loads(Path(timeframe_conf).read_text())
46-
4750
# Convert the list of time frames to a dictionary for easier access.
48-
time_frames = {time_frame["@id"].split("/")[-1]: time_frame for time_frame in time_frames}
51+
time_frames = {time_frame["@id"].split("/")[-1]: time_frame for time_frame in timeframe_conf}
4952

5053
# Load the process configuration from the specified file.
51-
processes = json.loads(Path(process_conf).read_text())
5254

53-
for process in processes:
55+
for process in process_conf:
5456

5557
# Loop through each run in the process configuration.
5658
for run in process["runs"]:
@@ -82,21 +84,24 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
8284
else:
8385
_ = runs.get_next(datetime)
8486

85-
86-
8787
logger.info(f"Next run of {run['@id']} at {run_timestamp}")
8888

8989
if not (run_timestamp >= run_window_start and run_timestamp <= run_window_end):
90-
logger.info(f"Run at {run_timestamp} not in window [{run_window_start}/{run_window_end}] -> {run['@id']} ")
90+
logger.info(f"Run at {run_timestamp} not in window [{run_window_start}/{run_window_end}] -> {run['@id']}")
9191

9292
# Loop through each timestamp in the current run.
9393
while run_timestamp <= run_window_end:
94-
logger.info(f"Run at {run_timestamp} in window [{run_window_start}/{run_window_end}] -> {run['@id']} ")
94+
logger.info(f"Run at {run_timestamp} in window [{run_window_start}/{run_window_end}] -> {run['@id']}")
9595

9696
# Get the reference time for the current timestamp in the time frame.
9797
reference_time_start = reference_times[time_frame["reference_time_start"]](run_timestamp)
9898
reference_time_end = reference_times[time_frame["reference_time_end"]](run_timestamp)
9999

100+
# Change reference time according to time shift config
101+
if process_time_shift:
102+
reference_time_start = reference_time_start + parse_duration(process_time_shift)
103+
reference_time_end = reference_time_end + parse_duration(process_time_shift)
104+
100105
# Calculate the start and end of the period for the current task.
101106
job_period_start = reference_time_start + parse_duration(time_frame["period_start"])
102107
job_period_end = reference_time_end + parse_duration(time_frame["period_end"])
@@ -128,8 +133,7 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
128133
task_id = str(uuid4())
129134
task_timestamp = utcnow().isoformat()
130135

131-
logger.info(f"Task {timestamp_utc} in window [{job_period_start_utc}/{job_period_end_utc}] -> Job: {job_id} ")
132-
136+
logger.info(f"Task {timestamp_utc} in window [{job_period_start_utc}/{job_period_end_utc}] -> Job: {job_id}")
133137

134138
task = {
135139
"@context": "https://example.com/task_context.jsonld",
@@ -157,7 +161,7 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
157161
"timestamp_utc": f"{timestamp_utc:%Y-%m-%dT%H:%M}",
158162
"reference_schedule_start_utc": f"{schedule_start_utc:%Y-%m-%dT%H:%M}",
159163
"reference_schedule_end_utc": f"{schedule_end_utc:%Y-%m-%dT%H:%M}",
160-
"reference_schedule_timehorizon": TASK_SCHEDULE_TIMEHORIZON
164+
"reference_schedule_time_horizon": TASK_SCHEDULE_TIME_HORIZON
161165
}
162166
}
163167

@@ -185,6 +189,7 @@ def generate_tasks(task_window_duration:str, task_window_reference:str, process_
185189

186190
# Next Task
187191
timestamp_utc = timestamps_utc.get_next(datetime)
192+
188193
# Next Run
189194
run_timestamp = runs.get_next(datetime)
190195

@@ -255,7 +260,7 @@ def update_task_status(task, status_text, publish=True):
255260
try:
256261
publish_tasks([task])
257262
except:
258-
logger.warning("Task Publication to ELK failed")
263+
logger.warning("Task publication to Elastic failed")
259264

260265

261266
def set_task_version(task, elk_index='emfos-tasks*'):
@@ -272,11 +277,8 @@ def set_task_version(task, elk_index='emfos-tasks*'):
272277
else:
273278
if task['task_properties']['version'] == 'AUTO':
274279
task['task_properties']['version'] = '001'
275-
276280
except:
277-
logger.warning("ELK query for Task versioning unsuccessful, version not updated")
278-
279-
281+
logger.warning("Elastic query for task versioning unsuccessful, version not updated")
280282

281283

282284
if __name__ == "__main__":

emf/task_generator/worker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
timeframe_conf = config.paths.task_generator.timeframe_conf
1515
process_conf = config.paths.task_generator.process_conf
1616

17+
# Load to json
1718
process_config_json = json.load(process_conf)
19+
timeframe_config_json = json.load(timeframe_conf)
1820

19-
20-
tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_conf, timeframe_conf, TIMETRAVEL))
21+
tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_config_json, timeframe_config_json))
2122

2223
if tasks:
2324
logger.info(f"Creating connection to RMQ")

0 commit comments

Comments (0)