11from pika import BasicProperties
22import uuid
3+ import json
34from datetime import datetime
45from pathlib import Path
56from typing import Any , Dict , List , Optional , Set , Tuple , Union
@@ -326,13 +327,16 @@ def handle(self, message: bytes, properties: BasicProperties, **kwargs):
326327 # JSON normalize and transform to DataFrame
327328 df = self .normalize_cim_payload (data )
328329
330+ # TODO need to get CO and RA from object storage and merge
331+
329332 # Convert to dictionary
330333 data_to_send = df .to_dict (orient = 'records' )
331334
332- response = self .elastic_service .send_to_elastic (
335+ response = self .elastic_service .send_to_elastic_bulk (
333336 index = ELASTIC_METADATA_INDEX ,
334- json_message = data_to_send ,
335- id = metadata_object .get ('identifier' , None )
337+ json_message_list = data_to_send ,
338+ id_from_metadata = True ,
339+ id_metadata_list = ["@id" ],
336340 )
337341
338342 logger .info (f"Message sending to Elastic successful: { response } " )
@@ -341,23 +345,50 @@ def handle(self, message: bytes, properties: BasicProperties, **kwargs):
341345
342346
343347if __name__ == "__main__" :
344- rdf_xml = r"C:\Users\martynas.karobcikas\Downloads\ras-example.xml"
345- # rdf_xml = r"C:\Users\martynas.karobcikas\Documents\Python projects\RAO\test-data\TC1_assessed_elements.xml"
346- # rdf_xml = r"C:\Users\martynas.karobcikas\Documents\Python projects\RAO\test-data\TC1_contingencies.xml"
347- # rdf_xml = r"C:\Users\martynas.karobcikas\Documents\Python projects\RAO\test-data\TC1_remedial_actions.xml"
348- g = Graph ()
349- g .parse (rdf_xml , format = "xml" ) # your RDF/XML file
350-
351- result = convert_cim_rdf_to_json (rdf_xml , root_class = ["RemedialActionSchedule" ], key_mode = 'local' )
352- # result = convert_cim_rdf_to_json(rdf_xml, root_class=["RemedialActionSchedule"], key_mode='qualified')
353- # result = convert_cim_rdf_to_json(rdf_xml, root_class=["GridStateAlterationRemedialAction"], key_mode='local')
354- # result = convert_cim_rdf_to_json(rdf_xml, root_class=["OrdinaryContingency", "ExceptionalContingency"], key_mode='local')
355-
356- import json
357- print (json .dumps (result , indent = 2 ))
358-
359- with open ("test.json" , "w" ) as f :
360- json .dump (result , f , ensure_ascii = False , indent = 4 )
348+ # rdf_xml = r"C:\Users\martynas.karobcickas\Downloads\ras-example.xml"
349+ # rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_assessed_elements.xml"
350+ # rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_contingencies.xml"
351+ # rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_remedial_actions.xml"
352+ # g = Graph()
353+ # g.parse(rdf_xml, format="xml") # Put your RDF/XML file
354+
355+ # result = convert_cim_rdf_to_json(rdf_xml, root_class=["RemedialActionSchedule"], key_mode="local")
356+ # result = convert_cim_rdf_to_json(rdf_xml, root_class=["RemedialActionSchedule"], key_mode="qualified")
357+ # result = convert_cim_rdf_to_json(rdf_xml, root_class=["GridStateAlterationRemedialAction"], key_mode="local")
358+ # result = convert_cim_rdf_to_json(rdf_xml, root_class=["OrdinaryContingency", "ExceptionalContingency"], key_mode="local")
359+
360+ # print json
361+ # print(json.dumps(result, indent=2))
362+
363+ # with open("test.json", "w") as f:
364+ # json.dump(result, f, ensure_ascii=False, indent=4)
365+ # df = RemedialActionScheduleToElasticHandler.normalize_cim_payload(result)
366+ # print(df.head())
367+
368+ # Define RMQ test message
369+ headers = {
370+ "baCorrelationID" : f"{ uuid .uuid4 ()} " ,
371+ "baseMessageID" : f"{ uuid .uuid4 ()} " ,
372+ "businessType" : "CSA-INPUT" ,
373+ "messageID" : f"{ uuid .uuid4 ()} " ,
374+ "sendTimestamp" : datetime .utcnow ().isoformat (),
375+ "sender" : "TSOX" ,
376+ "senderApplication" : "APPX" ,
377+ "service" : "INPUT-DATA" ,
378+ }
361379
362- df = RemedialActionScheduleToElasticHandler .normalize_cim_payload (result )
363- print (df .head ())
380+ properties = BasicProperties (
381+ content_type = "application/octet-stream" ,
382+ delivery_mode = 2 ,
383+ priority = 4 ,
384+ message_id = f"{ uuid .uuid4 ()} " ,
385+ timestamp = 147728025 ,
386+ headers = headers ,
387+ )
388+
389+ with open (r"C:\Users\martynas.karobcickas\Downloads\ras-example.xml" , "rb" ) as file :
390+ file_bytes = file .read ()
391+
392+ # Create instance
393+ service = RemedialActionScheduleToElasticHandler ()
394+ result = service .handle (message = file_bytes , properties = properties )
0 commit comments