From bb9bf94fb33ec0ea16d7e9801a7f707fb5f06fe3 Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 15 Mar 2024 16:41:41 +0000 Subject: [PATCH 1/6] Adding an option to suppress native publishing --- msc_wis2node/publisher.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 383b866..8e6e344 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -40,6 +40,10 @@ class WIS2FlowCB(FlowCB): + def __init__(self, options): + super().__init__(options,logger) + self.o.add_option('selfPublish', 'flag', True) + def after_accept(self, worklist) -> None: """ sarracenia dispatcher From ebd69300dde31e34aa0b76cdacf7a63665626230 Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 15 Mar 2024 22:27:23 +0000 Subject: [PATCH 2/6] Implemented non-selfPublish (almost works) --- msc_wis2node/publisher.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 8e6e344..3e1144e 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -41,7 +41,7 @@ class WIS2FlowCB(FlowCB): def __init__(self, options): - super().__init__(options,logger) + super().__init__(options,LOGGER) self.o.add_option('selfPublish', 'flag', True) def after_accept(self, worklist) -> None: @@ -60,15 +60,32 @@ def after_accept(self, worklist) -> None: LOGGER.debug('Processing message') wis2_publisher = WIS2Publisher() - - if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): - new_incoming.append(msg) + + if self.o.selfPublish: + if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): + new_incoming.append(msg) + else: + worklist.rejected.append(msg) else: - return + dataset = wis2_publisher.identify(msg['relPath']) + LOGGER.critical(f"Dataset: {dataset}") + if dataset: + + + # 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True} + msg["topic"] = "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] + + msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1] + + new_incoming.append(msg) + + LOGGER.critical(f"topic: {msg['topic']}") + else: + worklist.rejected.append(msg) + except Exception as err: LOGGER.error(f'Error publishing message: {err}', exc_info=True) worklist.failed.append(msg) - continue worklist.incoming = new_incoming @@ -78,7 +95,7 @@ class WIS2Publisher: def __init__(self): """initialize""" - + self.datasets = [] self.tls = None @@ -111,7 +128,7 @@ def publish(self, base_url: str, relative_path: str) -> bool: relative_path2 = '/' + relative_path.lstrip('/') url = f'{base_url}{relative_path2}' - + LOGGER.debug(f'Publishing dataset notification: {url}') self.publish_to_wis2(dataset, url) From 9f1990faa08f84860e3331bd6323d58e1dc76056 Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 22 Mar 2024 22:14:10 +0000 Subject: [PATCH 3/6] Using sr3 publisher seems to work. More verification needed. --- msc_wis2node/publisher.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 3e1144e..cfa3a57 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -43,7 +43,8 @@ class WIS2FlowCB(FlowCB): def __init__(self, options): super().__init__(options,LOGGER) self.o.add_option('selfPublish', 'flag', True) - + self.wis2_nonPublisher = WIS2Publisher() + def after_accept(self, worklist) -> None: """ sarracenia dispatcher @@ -59,27 +60,26 @@ def after_accept(self, worklist) -> None: try: LOGGER.debug('Processing message') - wis2_publisher = WIS2Publisher() - if self.o.selfPublish: + wis2_publisher = WIS2Publisher() if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): new_incoming.append(msg) else: worklist.rejected.append(msg) else: - dataset = wis2_publisher.identify(msg['relPath']) - LOGGER.critical(f"Dataset: {dataset}") + dataset = self.wis2_nonPublisher.identify(msg['relPath']) if dataset: # 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True} - msg["topic"] = "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] + msg["topic"] = self.o.post_exchange[0] + "/" + "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1] + # wis2/ca-eccc-msc - missing from start + msg["contentType"] = dataset["media-type"] new_incoming.append(msg) - LOGGER.critical(f"topic: {msg['topic']}") else: worklist.rejected.append(msg) @@ -145,7 +145,7 @@ def identify(self, path: str) -> Union[dict, None]: """ for dataset in self.datasets: - LOGGER.debug(f'Dataset: {dataset}') + LOGGER.debug(f'{dataset = }') match = False subtopic_dirpath = self._subtopic2dirpath(dataset['subtopic']) From fb40910f93d54605aa5895552e50c810acc7814a Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 15 Mar 2024 16:41:41 +0000 Subject: [PATCH 4/6] Adding an option to suppress native publishing --- msc_wis2node/publisher.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 1f6c0e5..0804eb0 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -41,6 +41,10 @@ class WIS2FlowCB(FlowCB): + def __init__(self, options): + super().__init__(options,logger) + self.o.add_option('selfPublish', 'flag', True) + def after_accept(self, worklist) -> None: """ sarracenia dispatcher From 734cd493998e2e162057305f9c35696296d2cbc2 Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 15 Mar 2024 22:27:23 +0000 Subject: [PATCH 5/6] Implemented non-selfPublish (almost works) --- msc_wis2node/publisher.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 0804eb0..fd4f049 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -42,7 +42,7 @@ class WIS2FlowCB(FlowCB): def __init__(self, options): - super().__init__(options,logger) + super().__init__(options,LOGGER) self.o.add_option('selfPublish', 'flag', True) def after_accept(self, worklist) -> None: @@ -61,15 +61,32 @@ def after_accept(self, worklist) -> None: LOGGER.debug('Processing message') wis2_publisher = WIS2Publisher() - - if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): - new_incoming.append(msg) + + if self.o.selfPublish: + if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): + new_incoming.append(msg) + else: + worklist.rejected.append(msg) else: - return + dataset = wis2_publisher.identify(msg['relPath']) + LOGGER.critical(f"Dataset: {dataset}") + if dataset: + + + # 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True} + msg["topic"] = "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] + + msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1] + + new_incoming.append(msg) + + LOGGER.critical(f"topic: {msg['topic']}") + else: + worklist.rejected.append(msg) + except Exception as err: LOGGER.error(f'Error publishing message: {err}', exc_info=True) worklist.failed.append(msg) - continue worklist.incoming = new_incoming @@ -79,7 +96,7 @@ class WIS2Publisher: def __init__(self): """initialize""" - + self.datasets = [] self.tls = None @@ -112,7 +129,7 @@ def publish(self, base_url: str, relative_path: str) -> bool: relative_path2 = '/' + relative_path.lstrip('/') url = f'{base_url}{relative_path2}' - + LOGGER.debug(f'Publishing dataset notification: {url}') self.publish_to_wis2(dataset, url) From b55ff202a1863b5332078dd06502f55622c706b4 Mon Sep 17 00:00:00 2001 From: Mathew Shaker Date: Fri, 22 Mar 2024 22:14:10 +0000 Subject: [PATCH 6/6] Using sr3 publisher seems to work. More verification needed. --- msc_wis2node/publisher.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index fd4f049..df9a5b4 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -44,7 +44,8 @@ class WIS2FlowCB(FlowCB): def __init__(self, options): super().__init__(options,LOGGER) self.o.add_option('selfPublish', 'flag', True) - + self.wis2_nonPublisher = WIS2Publisher() + def after_accept(self, worklist) -> None: """ sarracenia dispatcher @@ -60,27 +61,26 @@ def after_accept(self, worklist) -> None: try: LOGGER.debug('Processing message') - wis2_publisher = WIS2Publisher() - if self.o.selfPublish: + wis2_publisher = WIS2Publisher() if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): new_incoming.append(msg) else: worklist.rejected.append(msg) else: - dataset = wis2_publisher.identify(msg['relPath']) - LOGGER.critical(f"Dataset: {dataset}") + dataset = self.wis2_nonPublisher.identify(msg['relPath']) if dataset: # 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True} - msg["topic"] = "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] + msg["topic"] = self.o.post_exchange[0] + "/" + "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1] + # wis2/ca-eccc-msc - missing from start + msg["contentType"] = dataset["media-type"] new_incoming.append(msg) - LOGGER.critical(f"topic: {msg['topic']}") else: worklist.rejected.append(msg) @@ -146,7 +146,7 @@ def identify(self, path: str) -> Union[dict, None]: """ for dataset in self.datasets: - LOGGER.debug(f'Dataset: {dataset}') + LOGGER.debug(f'{dataset = }') match = False subtopic_dirpath = self._subtopic2dirpath(dataset['subtopic'])