diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index dc6469419db37..7f22cc9c2cd96 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -96,6 +96,7 @@ airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/ @Lee-W @jason810496 @guan /providers/fab/ @vincbeck /providers/google/ @shahar1 /providers/hashicorp/ @hussein-awala +/providers/ibm/mq/ @dabla /providers/informatica/ @RNHTTR # + @cetingokhan @sertaykabuk @umutozel /providers/keycloak/ @vincbeck @bugraoz93 /providers/microsoft/azure/ @dabla diff --git a/.github/ISSUE_TEMPLATE/3-airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/3-airflow_providers_bug_report.yml index f7c6642519d3a..8249e5bcb84c5 100644 --- a/.github/ISSUE_TEMPLATE/3-airflow_providers_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/3-airflow_providers_bug_report.yml @@ -77,6 +77,7 @@ body: - grpc - hashicorp - http + - ibm-mq - imap - influxdb - informatica diff --git a/dev/breeze/doc/images/output_workflow-run_publish-docs.txt b/dev/breeze/doc/images/output_workflow-run_publish-docs.txt index 4d64c05aefdbd..8640ca78ad92d 100644 --- a/dev/breeze/doc/images/output_workflow-run_publish-docs.txt +++ b/dev/breeze/doc/images/output_workflow-run_publish-docs.txt @@ -1 +1 @@ -46e32fc6adb71c667231a207e063e291 +7cc9afce1e4ae2311f163402d807ad45 diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index aa0bd36de11ad..cdda668048414 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -761,6 +761,8 @@ hyperparameter hyperparameters IaC iam +IBM +ibm ibmcloudant ideation idempotence @@ -1017,6 +1019,8 @@ mongo mongodb monospace moto +MQ +mq msfabric msg msgraph diff --git a/providers/ibm/mq/LICENSE b/providers/ibm/mq/LICENSE new file mode 100644 index 0000000000000..11069edd79019 --- /dev/null +++ b/providers/ibm/mq/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/providers/ibm/mq/NOTICE b/providers/ibm/mq/NOTICE new file mode 100644 index 0000000000000..e02aab0589f0d --- /dev/null +++ b/providers/ibm/mq/NOTICE @@ -0,0 +1,5 @@ +Apache Airflow +Copyright 2016-2025 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). diff --git a/providers/ibm/mq/README.rst b/providers/ibm/mq/README.rst new file mode 100644 index 0000000000000..188044fe7fc6f --- /dev/null +++ b/providers/ibm/mq/README.rst @@ -0,0 +1,92 @@ + +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + +.. IF YOU WANT TO MODIFY TEMPLATE FOR THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + ``PROVIDER_README_TEMPLATE.rst.jinja2`` IN the ``dev/breeze/src/airflow_breeze/templates`` DIRECTORY + +Package ``apache-airflow-providers-ibm-mq`` + +Release: ``0.1.0`` + + +`IBM MQ `__ + + +Provider package +---------------- + +This is a provider package for ``ibm.mq`` provider. All classes for this provider package +are in ``airflow.providers.ibm.mq`` python package. + +You can find package information and changelog for the provider +in the `documentation `_. + +Installation +------------ + +You can install this package on top of an existing Airflow 2 installation +(see ``Requirements`` below for the minimum Airflow version supported) via + +``pip install apache-airflow-providers-ibm-mq`` + +This installs only the provider package. To use the IBM MQ operators at +runtime you also need the ``ibmmq`` dependency, which can be installed via +the provider extra: + +``pip install apache-airflow-providers-ibm-mq[ibmmq]`` + +The ``ibmmq`` extra installs the Python wrapper for the IBM MQ client that is required by the provider hooks and operators. + +Note that the `ibmmq `_ Python package requires the native `IBM MQ Redistributable Client `_ libraries to be installed on the system. + +Refer to the IBM MQ documentation for installation instructions for your platform. + +The package supports the following python versions: 3.9,3.10,3.11,3.12 + +Requirements +------------ + +============================================= ===================================== +PIP package Version required +============================================= ===================================== +``apache-airflow`` ``>=2.11.0`` +``apache-airflow-providers-common-messaging`` ``>=2.0.0`` +``importlib-resources`` ``>=1.3`` +============================================= ===================================== + + +======================================================================================================================== ==================== +Dependent package Extra +======================================================================================================================== ==================== +`apache-airflow-providers-common-compat `_ ``common.compat`` +`apache-airflow-providers-common-messaging `_ ``common.messaging`` +======================================================================================================================== ==================== + +Optional dependencies +---------------------- + +========== ================ +Extra Dependencies +========== ================ +``ibmmq`` ``ibmmq>=2.0.4`` +========== ================ + +The changelog for the provider package can be found in the +`changelog `_. diff --git a/providers/ibm/mq/docs/.latest-doc-only-change.txt b/providers/ibm/mq/docs/.latest-doc-only-change.txt new file mode 100644 index 0000000000000..f41e3226a6f43 --- /dev/null +++ b/providers/ibm/mq/docs/.latest-doc-only-change.txt @@ -0,0 +1 @@ +7b2ec33c7ad4998d9c9735b79593fcdcd3b9dd1f diff --git a/providers/ibm/mq/docs/changelog.rst b/providers/ibm/mq/docs/changelog.rst new file mode 100644 index 0000000000000..099b5b4ddded4 --- /dev/null +++ b/providers/ibm/mq/docs/changelog.rst @@ -0,0 +1,49 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. NOTE TO CONTRIBUTORS: + Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes + and you want to add an explanation to the users on how they are supposed to deal with them. + The changelog is updated and maintained semi-automatically by release manager. + +``apache-airflow-providers-ibm-mq`` + +Changelog +--------- + +0.1.0 +..... + +.. note:: + This release of provider is only available for Airflow 2.10+ as explained in the + Apache Airflow providers support policy _. + +Misc +~~~~ + +* ``Bump min Airflow version in providers to 2.10 (#49843)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Update description of provider.yaml dependencies (#50231)`` + * ``Avoid committing history for providers (#49907)`` + +0.1.0 +..... + +Initial version of the provider. diff --git a/providers/ibm/mq/docs/commits.rst b/providers/ibm/mq/docs/commits.rst new file mode 100644 index 0000000000000..1b08b59cb402c --- /dev/null +++ b/providers/ibm/mq/docs/commits.rst @@ -0,0 +1,35 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + .. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + + .. IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + `PROVIDER_COMMITS_TEMPLATE.rst.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + + .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN! + +Package apache-airflow-providers-ibm-mq +------------------------------------------------------ + +`IBM MQ `__ + + +This is detailed commit list of changes for versions provider package: ``mq``. +For high-level changelog, see :doc:`package information including changelog `. + +.. airflow-providers-commits:: diff --git a/providers/ibm/mq/docs/conf.py b/providers/ibm/mq/docs/conf.py new file mode 100644 index 0000000000000..425d9e512683d --- /dev/null +++ b/providers/ibm/mq/docs/conf.py @@ -0,0 +1,27 @@ +# Disable Flake8 because of all the sphinx imports +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Configuration of Providers docs building.""" + +from __future__ import annotations + +import os + +os.environ["AIRFLOW_PACKAGE_NAME"] = "apache-airflow-providers-ibm-mq" + +from docs.provider_conf import * # noqa: F403 diff --git a/providers/ibm/mq/docs/connections/mq.rst b/providers/ibm/mq/docs/connections/mq.rst new file mode 100644 index 0000000000000..fbd8f9cad0ad0 --- /dev/null +++ b/providers/ibm/mq/docs/connections/mq.rst @@ -0,0 +1,27 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:mq: + +MQ connection +============= + +The MQ connection type enables connection to an IBM MQ. + +.. |Kafka Connection| image:: mq_connection.png + :width: 400 + :alt: IBM MQ Connection Screenshot diff --git a/providers/ibm/mq/docs/connections/mq_connection.png b/providers/ibm/mq/docs/connections/mq_connection.png new file mode 100644 index 0000000000000..37a2a6a55443c Binary files /dev/null and b/providers/ibm/mq/docs/connections/mq_connection.png differ diff --git a/providers/ibm/mq/docs/index.rst b/providers/ibm/mq/docs/index.rst new file mode 100644 index 0000000000000..c20caabf78321 --- /dev/null +++ b/providers/ibm/mq/docs/index.rst @@ -0,0 +1,133 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-ibm-mq`` +=================================== + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Basics + + Home + Changelog + Security + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/ibm/mq/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/ibm/mq/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Resources + + PyPI Repository + Example Dags + Installing from sources + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Connection types + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits + + +apache-airflow-providers-ibm-mq package +------------------------------------------------------ + +`IBM MQ `__ + + +Release: 0.1.0 + +Provider package +---------------- + +This package is for the ``ibm.mq`` provider. +All classes for this package are included in the ``airflow.providers.ibm.mq`` python package. + +Installation +------------ + +You can install this package on top of an existing Airflow installation via +``pip install apache-airflow-providers-ibm-mq``. +For the minimum Airflow version supported, see ``Requirements`` below. + +Requirements +------------ + +The minimum Apache Airflow version supported by this provider distribution is ``2.11.0``. + +============================================= ================== +PIP package Version required +============================================= ================== +``apache-airflow`` ``>=2.11.0`` +``apache-airflow-providers-common-messaging`` ``>=2.0.0`` +``importlib-resources`` ``>=1.3`` +============================================= ================== + +Cross provider package dependencies +----------------------------------- + +Those are dependencies that might be needed in order to use all the features of the package. +You need to install the specified provider distributions in order to use them. + +You can install such cross-provider dependencies when installing from PyPI. For example: + +.. code-block:: bash + + pip install apache-airflow-providers-ibm-mq[common.compat] + + +======================================================================================================================== ==================== +Dependent package Extra +======================================================================================================================== ==================== +`apache-airflow-providers-common-compat `_ ``common.compat`` +`apache-airflow-providers-common-messaging `_ ``common.messaging`` +======================================================================================================================== ==================== + +Downloading official packages +----------------------------- + +You can download officially released packages and verify their checksums and signatures from the +`Official Apache Download site `_ + +* `The apache-airflow-providers-ibm-mq 0.1.0 sdist package `_ (`asc `__, `sha512 `__) +* `The apache-airflow-providers-ibm-mq 0.1.0 wheel package `_ (`asc `__, `sha512 `__) diff --git a/providers/ibm/mq/docs/installing-providers-from-sources.rst b/providers/ibm/mq/docs/installing-providers-from-sources.rst new file mode 100644 index 0000000000000..fdbb17d017579 --- /dev/null +++ b/providers/ibm/mq/docs/installing-providers-from-sources.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: /../../../../devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst diff --git a/providers/ibm/mq/docs/integration-logos/ibm-mq.png b/providers/ibm/mq/docs/integration-logos/ibm-mq.png new file mode 100644 index 0000000000000..e5c1a0820df2d Binary files /dev/null and b/providers/ibm/mq/docs/integration-logos/ibm-mq.png differ diff --git a/providers/ibm/mq/docs/message-queues.rst b/providers/ibm/mq/docs/message-queues.rst new file mode 100644 index 0000000000000..314413f5d9966 --- /dev/null +++ b/providers/ibm/mq/docs/message-queues.rst @@ -0,0 +1,90 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. NOTE TO CONTRIBUTORS: + Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes + and you want to add an explanation to the users on how they are supposed to deal with them. + The changelog is updated and maintained semi-automatically by release manager. + + +IBM MQ Message Queue +==================== + +.. contents:: + :local: + :depth: 2 + + +IBM MQ Queue Provider +--------------------- + +Implemented by :class:`~airflow.providers.ibm.mq.queues.mq.IBMMQMessageQueueProvider` + +The IBM MQ Queue Provider is a +:class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider` +that uses IBM MQ as the underlying message queue system. + +It allows you to send and receive messages using IBM MQ queues in your Airflow workflows +via the common message queue interface +:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`. + + +.. include:: /../src/airflow/providers/ibm/mq/queues/mq.py + :start-after: [START ibmmq_message_queue_provider_description] + :end-before: [END ibmmq_message_queue_provider_description] + + +.. _howto/triggers:IBMMQMessageQueueTrigger: + + +IBM MQ Message Queue Trigger +---------------------------- + +Implemented by :class:`~airflow.providers.ibm.mq.triggers.mq.AwaitMessageTrigger` + +Inherited from +:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger` + +Wait for a message in a queue +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Below is an example of how you can configure an Airflow DAG to be triggered +by a message arriving in an IBM MQ queue. + +.. exampleinclude:: /../tests/system/ibm/mq/example_dag_message_queue_trigger.py + :language: python + :start-after: [START howto_trigger_message_queue] + :end-before: [END howto_trigger_message_queue] + + +How it works +------------ + +1. **IBM MQ Message Queue Trigger** + The ``AwaitMessageTrigger`` listens for messages from an IBM MQ queue. + +2. **Asset and Watcher** + The ``Asset`` abstracts the external entity, the IBM MQ queue in this example. + The ``AssetWatcher`` associates a trigger with a name. This name helps you + identify which trigger is associated with which asset. + +3. **Event-Driven DAG** + Instead of running on a fixed schedule, the DAG executes when the asset receives + an update (for example, when a new message arrives in the queue). + +For how to use the trigger, refer to the documentation of the +:ref:`Messaging Trigger `. diff --git a/providers/ibm/mq/docs/redirects.txt b/providers/ibm/mq/docs/redirects.txt new file mode 100644 index 0000000000000..e37560e5ca516 --- /dev/null +++ b/providers/ibm/mq/docs/redirects.txt @@ -0,0 +1 @@ +connections/index.rst connections/mq.rst diff --git a/providers/ibm/mq/docs/security.rst b/providers/ibm/mq/docs/security.rst new file mode 100644 index 0000000000000..351ff007ebf2f --- /dev/null +++ b/providers/ibm/mq/docs/security.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: /../../../../devel-common/src/sphinx_exts/includes/security.rst diff --git a/providers/ibm/mq/provider.yaml b/providers/ibm/mq/provider.yaml new file mode 100644 index 0000000000000..b2e2d1b9cdc32 --- /dev/null +++ b/providers/ibm/mq/provider.yaml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-ibm-mq +name: IBM MQ + +state: ready +lifecycle: incubation +source-date-epoch: 1758787200 +description: | + `IBM MQ `__ +# Note that those versions are maintained by release manager - do not update them manually +# with the exception of case where other provider in sources has >= new provider version. +# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have +# to be done in the same PR +versions: + - 0.1.0 + +integrations: + - integration-name: IBM MQ + external-doc-url: https://www.ibm.com/products/mq + logo: /docs/integration-logos/ibm-mq.png + tags: [apache] + +hooks: + - integration-name: IBM MQ + python-modules: + - airflow.providers.ibm.mq.hooks.mq + +connection-types: + - hook-class-name: airflow.providers.ibm.mq.hooks.mq.IBMMQHook + connection-type: mq + +triggers: + - integration-name: IBM MQ + python-modules: + - airflow.providers.ibm.mq.triggers.mq + +queues: + - airflow.providers.ibm.mq.queues.mq.IBMMQMessageQueueProvider diff --git a/providers/ibm/mq/pyproject.toml b/providers/ibm/mq/pyproject.toml new file mode 100644 index 0000000000000..c761e1fb15263 --- /dev/null +++ b/providers/ibm/mq/pyproject.toml @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + +# IF YOU WANT TO MODIFY THIS FILE EXCEPT DEPENDENCIES, YOU SHOULD MODIFY THE TEMPLATE +# `pyproject_TEMPLATE.toml.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY +[build-system] +requires = ["flit_core==3.12.0"] +build-backend = "flit_core.buildapi" + +[project] +name = "apache-airflow-providers-ibm-mq" +version = "0.1.0" +description = "Provider package apache-airflow-providers-ibm-mq for Apache Airflow" +readme = "README.rst" +license = "Apache-2.0" +license-files = ['LICENSE', 'NOTICE'] +authors = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +maintainers = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +keywords = [ "airflow-provider", "ibm.mq", "airflow", "integration" ] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Framework :: Apache Airflow", + "Framework :: Apache Airflow :: Provider", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: System :: Monitoring", +] +requires-python = ">=3.10" + +# The dependencies should be modified in place in the generated file. +# Any change in the dependencies is preserved when the file is regenerated +# Make sure to run ``prek update-providers-dependencies --all-files`` +# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` +dependencies = [ + "apache-airflow>=2.11.0", + "apache-airflow-providers-common-messaging>=2.0.0", + "importlib-resources>=1.3", +] + +# The optional dependencies should be modified in place in the generated file +# Any change in the dependencies is preserved when the file is regenerated +[project.optional-dependencies] +"ibmmq" = [ + # Required at Runtime + "ibmmq>=2.0.4", +] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] + +[dependency-groups] +dev = [ + "apache-airflow", + "apache-airflow-task-sdk", + "apache-airflow-devel-common", + "apache-airflow-providers-common-compat", + "apache-airflow-providers-common-messaging", + # Additional devel dependencies (do not remove this line and add extra development dependencies) +] + +# To build docs: +# +# uv run --group docs build-docs +# +# To enable auto-refreshing build with server: +# +# uv run --group docs build-docs --autobuild +# +# To see more options: +# +# uv run --group docs build-docs --help +# +docs = [ + "apache-airflow-devel-common[docs]" +] + +[tool.uv.sources] +# These names must match the names as defined in the pyproject.toml of the workspace items, +# *not* the workspace folder paths +apache-airflow = {workspace = true} +apache-airflow-devel-common = {workspace = true} +apache-airflow-task-sdk = {workspace = true} +apache-airflow-providers-common-sql = {workspace = true} +apache-airflow-providers-standard = {workspace = true} + +[project.urls] +"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-ibm-mq/0.1.0" +"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-ibm-mq/0.1.0/changelog.html" +"Bug Tracker" = "https://github.com/apache/airflow/issues" +"Source Code" = "https://github.com/apache/airflow" +"Slack Chat" = "https://s.apache.org/airflow-slack" +"Mastodon" = "https://fosstodon.org/@airflow" +"YouTube" = "https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/" + +[project.entry-points."apache_airflow_provider"] +provider_info = "airflow.providers.ibm.mq.get_provider_info:get_provider_info" + +[tool.flit.module] +name = "airflow.providers.ibm.mq" diff --git a/providers/ibm/mq/src/airflow/__init__.py b/providers/ibm/mq/src/airflow/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/src/airflow/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/src/airflow/providers/__init__.py b/providers/ibm/mq/src/airflow/providers/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/src/airflow/providers/ibm/__init__.py b/providers/ibm/mq/src/airflow/providers/ibm/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/LICENSE b/providers/ibm/mq/src/airflow/providers/ibm/mq/LICENSE new file mode 100644 index 0000000000000..11069edd79019 --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/__init__.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/__init__.py new file mode 100644 index 0000000000000..f809790763f0c --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/__init__.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE +# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES. +# +# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE +# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY +# +from __future__ import annotations + +import packaging.version + +from airflow import __version__ as airflow_version + +__all__ = ["__version__"] + +__version__ = "0.1.0" + +if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( + "2.11.0" +): + raise RuntimeError( + f"The package `apache-airflow-providers-ibm-mq:{__version__}` needs Apache Airflow 2.11.0+" + ) diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/get_provider_info.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/get_provider_info.py new file mode 100644 index 0000000000000..b5cf84bc43329 --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/get_provider_info.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! +# +# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE +# `get_provider_info_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + + +def get_provider_info(): + return { + "package-name": "apache-airflow-providers-ibm-mq", + "name": "IBM MQ", + "description": "`IBM MQ `__\n", + "integrations": [ + { + "integration-name": "IBM MQ", + "external-doc-url": "https://www.ibm.com/products/mq", + "logo": "/docs/integration-logos/ibm-mq.png", + "tags": ["apache"], + } + ], + "hooks": [{"integration-name": "IBM MQ", "python-modules": ["airflow.providers.ibm.mq.hooks.mq"]}], + "connection-types": [ + {"hook-class-name": "airflow.providers.ibm.mq.hooks.mq.IBMMQHook", "connection-type": "mq"} + ], + "triggers": [ + {"integration-name": "IBM MQ", "python-modules": ["airflow.providers.ibm.mq.triggers.mq"]} + ], + "queues": ["airflow.providers.ibm.mq.queues.mq.IBMMQMessageQueueProvider"], + } diff --git a/airflow-core/src/airflow/_shared/__init__.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/__init__.py similarity index 100% rename from airflow-core/src/airflow/_shared/__init__.py rename to providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/__init__.py diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py new file mode 100644 index 0000000000000..e709ca5861d0e --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +import json +from contextlib import asynccontextmanager, suppress +from typing import TYPE_CHECKING, Any + +from asgiref.sync import sync_to_async + +from airflow.providers.common.compat.connection import get_async_connection +from airflow.sdk.bases.hook import BaseHook + +if TYPE_CHECKING: + from airflow.providers.common.compat.sdk import Connection + + +class IBMMQHook(BaseHook): + conn_name_attr = "conn_id" + default_conn_name = "mq_default" + conn_type = "mq" + hook_name = "IBM MQ" + + def __init__(self, conn_id: str = default_conn_name): + super().__init__() + self.conn_id = conn_id + self._conn = None + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom UI field behaviour for IBM MQ Connection.""" + return { + "hidden_fields": ["schema"], + "placeholders": { + "host": "mq.example.com", + "port": "1414", + "login": "app_user", + "extra": json.dumps( + { + "queue_manager": "QM1", + "channel": "DEV.APP.SVRCONN", + }, + indent=2, + ), + }, + } + + @classmethod + def _connect(cls, conn: Connection): + import ibmmq + + csp = ibmmq.CSP() + csp.CSPUserId = conn.login + csp.CSPPassword = conn.password + + config = conn.extra_dejson + + return ibmmq.connect( + config["queue_manager"], + config["channel"], + f"{conn.host}({conn.port})", + csp=csp, + ) + + @asynccontextmanager + async def get_conn(self): + connection = await get_async_connection(conn_id=self.conn_id) + conn = None + try: + conn = self._connect(connection) + yield conn + finally: + if conn: + with suppress(Exception): + conn.disconnect() + + @classmethod + def _process_message(cls, message): + import ibmmq + + try: + rfh2 = ibmmq.RFH2() + rfh2.unpack(message) + payload_offset = rfh2.get_length() + payload = message[payload_offset:] + return payload.decode("utf-8", errors="ignore") + except Exception: + return message + + async def consume(self, queue_name: str, poll_interval: float = 5) -> str | None: + """ + Wait for a single message from the specified IBM MQ queue and return its decoded payload. + + If the MQ connection is lost or another recoverable error occurs, the method logs the + issue and exits so that the next trigger instance can attempt to reconnect. + + :param queue_name: Name of the IBM MQ queue to consume messages from. + :param poll_interval: Interval in seconds used to wait for messages and to control + how long the underlying MQ get operation blocks before checking again. + :return: The decoded message payload if a message is received, otherwise ``None``. + """ + import ibmmq + + od = ibmmq.OD() + od.ObjectName = queue_name + + md = ibmmq.MD() + md.Format = ibmmq.CMQC.MQFMT_STRING + md.CodedCharSetId = 1208 + md.Encoding = ibmmq.CMQC.MQENC_NATIVE + + gmo = ibmmq.GMO() + gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT + gmo.WaitInterval = int(poll_interval * 1000) + + try: + async with self.get_conn() as qmgr: + q = ibmmq.Queue( + qmgr, + od, + ibmmq.CMQC.MQOO_INPUT_AS_Q_DEF, + ) + + async_get = sync_to_async(q.get) + + try: + while True: + try: + message = await asyncio.wait_for( + async_get(None, md, gmo), + timeout=poll_interval + 1, + ) + + if message: + return self._process_message(message) + + except ibmmq.MQMIError as e: + if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE: + await asyncio.sleep(poll_interval) + continue + if e.reason == ibmmq.CMQC.MQRC_CONNECTION_BROKEN: + self.log.warning( + "MQ connection broken, will exit consume; next trigger instance will reconnect" + ) + return None + raise + + finally: + with suppress(Exception): + q.close() + + except asyncio.CancelledError: + raise + except Exception: + self.log.exception("MQ consume failed, exiting; next trigger instance will retry") + return None + + async def produce(self, queue_name: str, payload: str) -> None: + """ + Put a message onto the specified IBM MQ queue. + + This method connects to the configured MQ queue manager and sends the + provided payload as a UTF-8 encoded message to the given queue. + + :param queue_name: Name of the IBM MQ queue to which the message should be sent. + :param payload: Message payload to send. The payload will be encoded as UTF-8 + before being placed on the queue. + :return: None + """ + import ibmmq + + od = ibmmq.OD() + od.ObjectName = queue_name + + md = ibmmq.MD() + md.Format = ibmmq.CMQC.MQFMT_STRING + md.CodedCharSetId = 1208 + md.Encoding = ibmmq.CMQC.MQENC_NATIVE + + async with self.get_conn() as qmgr: + q = ibmmq.Queue( + qmgr, + od, + ibmmq.CMQC.MQOO_OUTPUT, + ) + + async_put = sync_to_async(q.put) + + try: + await async_put(payload.encode("utf-8"), md) + finally: + with suppress(Exception): + q.close() diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/__init__.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/mq.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/mq.py new file mode 100644 index 0000000000000..117f2ea52fd2d --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/queues/mq.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import re +from typing import Any +from urllib.parse import urlparse + +from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider +from airflow.providers.ibm.mq.triggers.mq import AwaitMessageTrigger +from airflow.providers.ibm.mq.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.triggers.base import BaseEventTrigger +else: + from airflow.triggers.base import BaseTrigger as BaseEventTrigger # type: ignore + + +# [START queue_regexp] +QUEUE_REGEXP = r"^mq://" +# [END queue_regexp] + + +class IBMMQMessageQueueProvider(BaseMessageQueueProvider): + """ + Configuration for IBM MQ integration with common-messaging. + + [START ibmmq_message_queue_provider_description] + + * It uses ``mq`` as scheme for identifying IBM MQ queues. + * For parameter definitions take a look at + :class:`~airflow.providers.ibm.mq.triggers.mq.AwaitMessageTrigger`. + + .. code-block:: python + + from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger + from airflow.sdk import Asset, AssetWatcher + + trigger = MessageQueueTrigger( + queue="mq://mq_default/MY.QUEUE.NAME", + ) + + asset = Asset("mq_topic_asset", watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)]) + + [END ibmmq_message_queue_provider_description] + """ + + scheme = "mq" + + def queue_matches(self, queue: str) -> bool: + return bool(re.match(QUEUE_REGEXP, queue)) + + def trigger_class(self) -> type[BaseEventTrigger]: + return AwaitMessageTrigger + + def trigger_kwargs(self, queue: str, **kwargs) -> dict[str, Any]: + """ + Parse URI of format: + mq:/// + """ + parsed = urlparse(queue) + + if not parsed.netloc: + raise ValueError( + "MQ URI must contain connection id. Expected format: mq:///" + ) + + conn_id = parsed.netloc + + queue_name = parsed.path.lstrip("/") + if not queue_name: + raise ValueError("MQ URI must contain queue name. Expected format: mq:///") + + return { + "mq_conn_id": conn_id, + "queue_name": queue_name, + "poll_interval": kwargs.get("poll_interval", 5), + } diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/__init__.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/mq.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/mq.py new file mode 100644 index 0000000000000..bc6ecd37efe31 --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/mq.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +from typing import Any + +from airflow.providers.ibm.mq.hooks.mq import IBMMQHook +from airflow.providers.ibm.mq.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.triggers.base import TriggerEvent + +if AIRFLOW_V_3_0_PLUS: + from airflow.triggers.base import BaseEventTrigger +else: + from airflow.triggers.base import BaseTrigger as BaseEventTrigger # type: ignore + + +class AwaitMessageTrigger(BaseEventTrigger): + def __init__( + self, + mq_conn_id: str, + queue_name: str, + poll_interval: float = 5, + ) -> None: + super().__init__() + self.mq_conn_id = mq_conn_id + self.queue_name = queue_name + self.poll_interval = poll_interval + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + f"{self.__class__.__module__}.{self.__class__.__name__}", + { + "mq_conn_id": self.mq_conn_id, + "queue_name": self.queue_name, + "poll_interval": self.poll_interval, + }, + ) + + async def run(self): + try: + event = await IBMMQHook(self.mq_conn_id).consume( + queue_name=self.queue_name, + poll_interval=self.poll_interval, + ) + yield TriggerEvent(event) + except asyncio.CancelledError: + return diff --git a/providers/ibm/mq/src/airflow/providers/ibm/mq/version_compat.py b/providers/ibm/mq/src/airflow/providers/ibm/mq/version_compat.py new file mode 100644 index 0000000000000..0956edd21112f --- /dev/null +++ b/providers/ibm/mq/src/airflow/providers/ibm/mq/version_compat.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", +] diff --git a/providers/ibm/mq/tests/conftest.py b/providers/ibm/mq/tests/conftest.py new file mode 100644 index 0000000000000..f56ccce0a3f69 --- /dev/null +++ b/providers/ibm/mq/tests/conftest.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +pytest_plugins = "tests_common.pytest_plugin" diff --git a/providers/ibm/mq/tests/system/__init__.py b/providers/ibm/mq/tests/system/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/tests/system/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/tests/system/ibm/__init__.py b/providers/ibm/mq/tests/system/ibm/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/tests/system/ibm/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/tests/system/ibm/mq/__init__.py b/providers/ibm/mq/tests/system/ibm/mq/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/ibm/mq/tests/system/ibm/mq/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/ibm/mq/tests/system/ibm/mq/example_dag_message_queue_trigger.py b/providers/ibm/mq/tests/system/ibm/mq/example_dag_message_queue_trigger.py new file mode 100644 index 0000000000000..c24fcba2052ce --- /dev/null +++ b/providers/ibm/mq/tests/system/ibm/mq/example_dag_message_queue_trigger.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +# [START howto_trigger_message_queue] +from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger +from airflow.sdk import DAG, Asset, AssetWatcher, task + +# Define a trigger that listens to an external message queue (IBM MQ in this case) +trigger = MessageQueueTrigger( + queue="mq://mq_default/MY.QUEUE.NAME", +) + +mq_topic_asset = Asset( + "mq_topic_asset", + watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)], +) + +with DAG(dag_id="example_ibm_mq_watcher", schedule=[mq_topic_asset]) as dag: + + @task + def process_message(**context): + for event in context["triggering_asset_events"][mq_topic_asset]: + # Get the message from the TriggerEvent payload + print("Processing event: ", event) + payload = event["payload"] + print("Actual payload: ", payload) +# [END howto_trigger_message_queue] + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/providers/ibm/mq/tests/unit/__init__.py b/providers/ibm/mq/tests/unit/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/tests/unit/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/tests/unit/ibm/__init__.py b/providers/ibm/mq/tests/unit/ibm/__init__.py new file mode 100644 index 0000000000000..e8fd22856438c --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/providers/ibm/mq/tests/unit/ibm/mq/__init__.py b/providers/ibm/mq/tests/unit/ibm/mq/__init__.py new file mode 100644 index 0000000000000..289afa96fdc1c --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/__init__.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +try: + import ibmmq # real one might exist +except ModuleNotFoundError: + import sys + from unittest.mock import MagicMock + + class MQMIError(Exception): + def __init__(self, msg="", reason=None): + super().__init__(msg) + self.reason = reason + + fake_ibmmq = MagicMock() + fake_ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE = 2033 + fake_ibmmq.CMQC.MQRC_CONNECTION_BROKEN = 2009 + fake_ibmmq.CMQC.MQGMO_WAIT = 1 + fake_ibmmq.CMQC.MQGMO_NO_SYNCPOINT = 2 + fake_ibmmq.CMQC.MQGMO_CONVERT = 4 + fake_ibmmq.MQMIError = MQMIError + fake_ibmmq.OD = MagicMock() + fake_ibmmq.MD = MagicMock() + fake_ibmmq.GMO = MagicMock() + fake_ibmmq.CSP = MagicMock() + fake_ibmmq.Queue = MagicMock() + fake_ibmmq.connect = MagicMock() + fake_ibmmq.RFH2 = MagicMock() + + sys.modules["ibmmq"] = fake_ibmmq diff --git a/providers/ibm/mq/tests/unit/ibm/mq/hooks/__init__.py b/providers/ibm/mq/tests/unit/ibm/mq/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/ibm/mq/tests/unit/ibm/mq/hooks/test_mq.py b/providers/ibm/mq/tests/unit/ibm/mq/hooks/test_mq.py new file mode 100644 index 0000000000000..77e013d319980 --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/hooks/test_mq.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from airflow.models import Connection +from airflow.providers.ibm.mq.hooks.mq import IBMMQHook + +MQ_PAYLOAD = """RFH x"MQSTR jms_map topic://localhost/topic17721219474762414D5143514D4941303054202020202069774D7092F81057Llocal26.01.00 4topic {}""" + + +async def fake_get(*args, **kwargs): + import ibmmq + + raise ibmmq.MQMIError("connection broken", reason=ibmmq.CMQC.MQRC_CONNECTION_BROKEN) + + +@pytest.mark.asyncio +class TestIBMMQHook: + """Tests for the IBM MQ hook.""" + + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + # Add a valid MQ connection + create_connection_without_db( + Connection( + conn_id="mq_conn", + conn_type="mq", + host="mq.example.com", + login="user", + password="pass", + port=1414, + extra='{"queue_manager": "QM1", "channel": "DEV.APP.SVRCONN"}', + ) + ) + self.hook = IBMMQHook("mq_conn") + + @patch("airflow.providers.ibm.mq.hooks.mq.get_async_connection", new_callable=AsyncMock) + @patch("ibmmq.connect") + @patch("ibmmq.Queue") + @patch("airflow.providers.ibm.mq.hooks.mq.sync_to_async") + async def test_consume_message( + self, mock_sync_to_async, mock_queue_class, mock_connect, mock_get_async_conn + ): + """Test consuming a single message.""" + + # Mock connection and queue + mock_qmgr = MagicMock() + mock_get_async_conn.return_value = MagicMock() # Connection object for _connect + mock_connect.return_value = mock_qmgr + + # Mock queue instance + mock_queue = MagicMock() + mock_queue_class.return_value = mock_queue + + # Simulate async get returning a message once, then None + async def fake_get(*args, **kwargs): + return MQ_PAYLOAD.format("test message").encode() + + mock_sync_to_async.return_value = fake_get + + result = await self.hook.consume(queue_name="QUEUE1", poll_interval=0.1) + assert isinstance(result, str) + assert "test message" in result + + mock_connect.assert_called_once() # connection established + mock_queue_class.assert_called_once_with( + mock_qmgr, + mock.ANY, + mock.ANY, + ) + + @patch("airflow.providers.ibm.mq.hooks.mq.get_async_connection", new_callable=AsyncMock) + @patch("ibmmq.connect") + @patch("ibmmq.Queue") + @patch("airflow.providers.ibm.mq.hooks.mq.sync_to_async") + async def test_produce_message( + self, mock_sync_to_async, mock_queue_class, mock_connect, mock_get_async_conn + ): + """Test producing a message to the queue.""" + + mock_qmgr = MagicMock() + mock_get_async_conn.return_value = MagicMock() + mock_connect.return_value = mock_qmgr + + mock_queue = MagicMock() + mock_queue_class.return_value = mock_queue + + async def fake_put(msg, md): + assert isinstance(msg, bytes) + assert b"payload" in msg + + mock_sync_to_async.return_value = fake_put + + await self.hook.produce(queue_name="QUEUE1", payload="payload") + + mock_connect.assert_called_once() + mock_queue_class.assert_called_once_with( + mock_qmgr, + mock.ANY, + mock.ANY, + ) + mock_sync_to_async.assert_called_once() # ensure async put is wrapped + + @patch("airflow.providers.ibm.mq.hooks.mq.get_async_connection", new_callable=AsyncMock) + @patch("ibmmq.connect") + @patch("ibmmq.Queue") + @patch("airflow.providers.ibm.mq.hooks.mq.sync_to_async") + async def test_consume_connection_broken( + self, mock_sync_to_async, mock_queue_class, mock_connect, mock_get_async_conn, caplog + ): + """Test that consume exits gracefully on connection broken.""" + + mock_get_async_conn.return_value = MagicMock() + mock_qmgr = MagicMock() + mock_connect.return_value = mock_qmgr + mock_queue = MagicMock() + mock_queue_class.return_value = mock_queue + mock_sync_to_async.return_value = fake_get + + result = await self.hook.consume(queue_name="QUEUE1", poll_interval=0.1) + assert result is None + assert "MQ connection broken, will exit consume; next trigger instance will reconnect" in caplog.text diff --git a/providers/ibm/mq/tests/unit/ibm/mq/queues/__init__.py b/providers/ibm/mq/tests/unit/ibm/mq/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/ibm/mq/tests/unit/ibm/mq/queues/test_mq.py b/providers/ibm/mq/tests/unit/ibm/mq/queues/test_mq.py new file mode 100644 index 0000000000000..4f43a10d99b37 --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/queues/test_mq.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.providers.ibm.mq.triggers.mq import AwaitMessageTrigger + +pytest.importorskip("airflow.providers.common.messaging.providers.base_provider") + + +class TestIBMMQMessageQueueProvider: + """Tests for IBMMQMessageQueueProvider.""" + + def setup_method(self): + """Set up the test environment.""" + from airflow.providers.ibm.mq.queues.mq import IBMMQMessageQueueProvider + + self.provider = IBMMQMessageQueueProvider() + + def test_queue_create(self): + """Test the creation of the provider.""" + from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider + + assert isinstance(self.provider, BaseMessageQueueProvider) + + @pytest.mark.parametrize( + ("queue_uri", "expected_result"), + [ + pytest.param("mq://mq_conn/QUEUE1", True, id="valid_mq_uri"), + pytest.param("http://example.com", False, id="http_url"), + pytest.param("not-a-url", False, id="invalid_url"), + ], + ) + def test_queue_matches(self, queue_uri, expected_result): + """Test the queue_matches method with various URLs.""" + assert self.provider.queue_matches(queue_uri) == expected_result + + @pytest.mark.parametrize( + ("scheme", "expected_result"), + [ + pytest.param("kafka", False, id="kafka_scheme"), + pytest.param("mq", True, id="mq_scheme"), + pytest.param("redis+pubsub", False, id="redis_scheme"), + pytest.param("sqs", False, id="sqs_scheme"), + pytest.param("unknown", False, id="unknown_scheme"), + ], + ) + def test_scheme_matches(self, scheme, expected_result): + """Test the scheme_matches method with various schemes.""" + assert self.provider.scheme_matches(scheme) == expected_result + + def test_trigger_class(self): + """Test the trigger_class method.""" + assert self.provider.trigger_class() == AwaitMessageTrigger + + @pytest.mark.parametrize( + ("queue_uri", "extra_kwargs", "expected_result"), + [ + pytest.param( + "mq://my_conn/QUEUE1", + {}, + { + "mq_conn_id": "my_conn", + "queue_name": "QUEUE1", + "poll_interval": 5, + }, + id="default_poll_interval", + ), + pytest.param( + "mq://my_conn/QUEUE1", + {"poll_interval": 60}, + { + "mq_conn_id": "my_conn", + "queue_name": "QUEUE1", + "poll_interval": 60, + }, + id="override_poll_interval", + ), + ], + ) + def test_trigger_kwargs_valid_cases(self, queue_uri, extra_kwargs, expected_result): + """Test the trigger_kwargs method with valid parameters.""" + kwargs = self.provider.trigger_kwargs(queue_uri, **extra_kwargs) + assert kwargs == expected_result + + @pytest.mark.parametrize( + ("queue_uri", "expected_error", "error_match"), + [ + pytest.param( + "mq:///QUEUE1", + ValueError, + "MQ URI must contain connection id", + id="missing_conn_id", + ), + pytest.param( + "mq://my_conn/", + ValueError, + "MQ URI must contain queue name", + id="missing_queue_name", + ), + ], + ) + def test_trigger_kwargs_error_cases(self, queue_uri, expected_error, error_match): + """Test that trigger_kwargs raises appropriate errors with invalid parameters.""" + with pytest.raises(expected_error, match=error_match): + self.provider.trigger_kwargs(queue_uri) diff --git a/providers/ibm/mq/tests/unit/ibm/mq/triggers/__init__.py b/providers/ibm/mq/tests/unit/ibm/mq/triggers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/triggers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/ibm/mq/tests/unit/ibm/mq/triggers/test_mq.py b/providers/ibm/mq/tests/unit/ibm/mq/triggers/test_mq.py new file mode 100644 index 0000000000000..f6ce0e02c30fd --- /dev/null +++ b/providers/ibm/mq/tests/unit/ibm/mq/triggers/test_mq.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +import pytest + +from airflow.providers.ibm.mq.hooks.mq import IBMMQHook +from airflow.providers.ibm.mq.triggers.mq import AwaitMessageTrigger +from airflow.triggers.base import TriggerEvent + + +async def fake_get(*args, **kwargs): + import ibmmq + + raise ibmmq.MQMIError("connection broken", reason=ibmmq.CMQC.MQRC_CONNECTION_BROKEN) + + +class TestMQTrigger: + @pytest.mark.asyncio + async def test_trigger_serialization(self): + trigger = AwaitMessageTrigger( + mq_conn_id="mq_default", + queue_name="QUEUE1", + poll_interval=2, + ) + assert isinstance(trigger, AwaitMessageTrigger) + + classpath, kwargs = trigger.serialize() + assert classpath == "airflow.providers.ibm.mq.triggers.mq.AwaitMessageTrigger" + assert kwargs == { + "mq_conn_id": "mq_default", + "queue_name": "QUEUE1", + "poll_interval": 2, + } + + @pytest.mark.asyncio + @patch.object(IBMMQHook, "consume", return_value="test message") + async def test_trigger_run_message_yielded(self, mock_consume): + trigger = AwaitMessageTrigger( + mq_conn_id="mq_default", + queue_name="QUEUE1", + poll_interval=0.1, + ) + + event = await anext(trigger.run()) + assert isinstance(event, TriggerEvent) + assert event.payload == "test message" + mock_consume.assert_called_once_with(queue_name="QUEUE1", poll_interval=0.1) + + @pytest.mark.asyncio + @patch("airflow.providers.ibm.mq.hooks.mq.sync_to_async", return_value=fake_get) + @patch("airflow.providers.ibm.mq.hooks.mq.get_async_connection", new_callable=AsyncMock) + @patch("ibmmq.Queue") + async def test_trigger_run_none_on_connection_error( + self, mock_queue, mock_get_async_conn, mock_sync_to_async, caplog + ): + """Test that the trigger yields None when consume encounters a connection problem.""" + + trigger = AwaitMessageTrigger( + mq_conn_id="mq_default", + queue_name="QUEUE1", + poll_interval=0.1, + ) + + with caplog.at_level("WARNING"): + event = await anext(trigger.run()) + + # The trigger yields a TriggerEvent with payload None because consume handles the exception + assert isinstance(event, TriggerEvent) + assert event.payload is None + + # The consume warning log should be present + assert "MQ connection broken" in caplog.text diff --git a/pyproject.toml b/pyproject.toml index b83c2fcd07d23..9daade9b1a582 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -240,6 +240,9 @@ packages = [] "http" = [ "apache-airflow-providers-http>=4.13.2" ] +"ibm.mq" = [ + "apache-airflow-providers-ibm-mq>=0.1.0" # Set from local provider pyproject.toml +] "imap" = [ "apache-airflow-providers-imap>=3.8.0" ] @@ -439,6 +442,7 @@ packages = [] "apache-airflow-providers-grpc>=3.7.0", "apache-airflow-providers-hashicorp>=4.0.0", "apache-airflow-providers-http>=4.13.2", + "apache-airflow-providers-ibm-mq>=0.1.0", # Set from local provider pyproject.toml "apache-airflow-providers-imap>=3.8.0", "apache-airflow-providers-influxdb>=2.8.0", "apache-airflow-providers-informatica>=0.1.1", @@ -1142,6 +1146,8 @@ mypy_path = [ "$MYPY_CONFIG_FILE_DIR/providers/hashicorp/tests", "$MYPY_CONFIG_FILE_DIR/providers/http/src", "$MYPY_CONFIG_FILE_DIR/providers/http/tests", + "$MYPY_CONFIG_FILE_DIR/providers/ibm/mq/src", + "$MYPY_CONFIG_FILE_DIR/providers/ibm/mq/tests", "$MYPY_CONFIG_FILE_DIR/providers/imap/src", "$MYPY_CONFIG_FILE_DIR/providers/imap/tests", "$MYPY_CONFIG_FILE_DIR/providers/influxdb/src", @@ -1416,6 +1422,7 @@ apache-airflow-providers-google = { workspace = true } apache-airflow-providers-grpc = { workspace = true } apache-airflow-providers-hashicorp = { workspace = true } apache-airflow-providers-http = { workspace = true } +apache-airflow-providers-ibm-mq = { workspace = true } apache-airflow-providers-imap = { workspace = true } apache-airflow-providers-influxdb = { workspace = true } apache-airflow-providers-informatica = { workspace = true } @@ -1545,6 +1552,7 @@ members = [ "providers/grpc", "providers/hashicorp", "providers/http", + "providers/ibm/mq", "providers/imap", "providers/influxdb", "providers/informatica", diff --git a/scripts/ci/docker-compose/tests-sources.yml b/scripts/ci/docker-compose/tests-sources.yml index eb9da3cacebf5..caab1b2a7a19c 100644 --- a/scripts/ci/docker-compose/tests-sources.yml +++ b/scripts/ci/docker-compose/tests-sources.yml @@ -89,6 +89,7 @@ services: - ../../../providers/grpc/tests:/opt/airflow/providers/grpc/tests - ../../../providers/hashicorp/tests:/opt/airflow/providers/hashicorp/tests - ../../../providers/http/tests:/opt/airflow/providers/http/tests + - ../../../providers/ibm/mq/tests:/opt/airflow/providers/ibm/mq/tests - ../../../providers/imap/tests:/opt/airflow/providers/imap/tests - ../../../providers/influxdb/tests:/opt/airflow/providers/influxdb/tests - ../../../providers/informatica/tests:/opt/airflow/providers/informatica/tests