diff --git a/auto_backup/README.rst b/auto_backup/README.rst index 60c77d98a0a..6ec83d05dae 100644 --- a/auto_backup/README.rst +++ b/auto_backup/README.rst @@ -1,7 +1,3 @@ -.. image:: https://odoo-community.org/readme-banner-image - :target: https://odoo-community.org/get-involved?utm_source=readme - :alt: Odoo Community Association - ==================== Database Auto-Backup ==================== @@ -17,7 +13,7 @@ Database Auto-Backup .. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png :target: https://odoo-community.org/page/development-status :alt: Beta -.. |badge2| image:: https://img.shields.io/badge/license-AGPL--3-blue.png +.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html :alt: License: AGPL-3 .. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--tools-lightgray.png?logo=github @@ -73,6 +69,18 @@ a path and everything will be backed up automatically. This is done through an SSH (encrypted) tunnel, thanks to pysftp, so your data is safe! +Secure SFTP connection with host key verification +------------------------------------------------- + +A new optional field **Host Public Key** has been added. + +- Paste the exact public key of your SFTP server (you can obtain it with + `ssh-keyscan -t rsa,ecdsa,ed25519 your.sftp.server`). +- When filled, Odoo will **verify the server identity** and refuse the connection if the key does not match → protects against man-in-the-middle attacks. +- Leave empty → old behaviour (no host key checking, backward compatible). + +The "Test SFTP Connection" button now also validates the host key. + Test connection ~~~~~~~~~~~~~~~ diff --git a/auto_backup/models/db_backup.py b/auto_backup/models/db_backup.py index b8517bae55a..6b688654378 100644 --- a/auto_backup/models/db_backup.py +++ b/auto_backup/models/db_backup.py @@ -7,11 +7,13 @@ import os import shutil import traceback +from base64 import decodebytes from contextlib import contextmanager from datetime import datetime, timedelta from glob import iglob import pysftp +from paramiko import DSSKey, ECDSAKey, Ed25519Key, HostKeys, RSAKey from odoo import _, api, exceptions, fields, models, tools from odoo.exceptions import UserError @@ -94,6 +96,17 @@ class DbBackup(models.Model): help="Choose the format for this backup.", ) + host_public_key = fields.Text( + help=( + "Public key of the remote SFTP server " + "(SSH RSA/ED25519 key in OpenSSH format). " + "If set, verifies the server identity to prevent " + "man-in-the-middle attacks. " + "Leave empty for no verification (less secure). " + "Can be obtained with e.g. 'ssh-keyscan 202.54.1.5'" + ), + ) + @api.model def _default_folder(self): """Default to ``backups`` folder inside current server datadir.""" @@ -283,21 +296,52 @@ def filename(when, ext="zip"): ) def sftp_connection(self): - """Return a new SFTP connection with found parameters.""" - self.ensure_one() - params = { - "host": self.sftp_host, - "username": self.sftp_user, - "port": self.sftp_port, - } - _logger.debug( - "Trying to connect to sftp://%(username)s@%(host)s:%(port)d", extra=params - ) - if self.sftp_private_key: - params["private_key"] = self.sftp_private_key - if self.sftp_password: - params["private_key_pass"] = self.sftp_password + cnopts = pysftp.CnOpts() + if self.host_public_key: + # Strict mode: Load only the provided key (no default known_hosts) + try: + parts = self.host_public_key.strip().split(None, 2) + if len(parts) < 2: + raise UserError( + _( + "Invalid host public key format. " + "Expected: 'ssh-rsa AAAAB3Nza...'" + ) + ) + key_type = parts[0] + key_b64 = parts[1] + data = decodebytes(key_b64.encode("ascii")) + if key_type == "ssh-rsa": + key = RSAKey(data=data) + elif key_type == "ssh-dss": + key = DSSKey(data=data) + elif key_type.startswith("ecdsa-sha2-nistp"): + key = ECDSAKey(data=data) + elif key_type == "ssh-ed25519": + key = Ed25519Key(data=data) + else: + raise ValueError(_("Unsupported key type: %s") % key_type) + # Use Paramiko's HostKeys directly (pysftp-compatible) + cnopts.hostkeys = HostKeys() + cnopts.hostkeys.add(self.sftp_host, key_type, key) + except Exception as e: + raise UserError(_("Error loading host public key: %s") % str(e)) from e else: - params["password"] = self.sftp_password - - return pysftp.Connection(**params) + # Disable checking explicitly to avoid known_hosts warnings + cnopts.hostkeys = None + if self.sftp_private_key: + return pysftp.Connection( + host=self.sftp_host, + username=self.sftp_user, + port=self.sftp_port or 22, + private_key=self.sftp_private_key, + private_key_pass=self.sftp_password or None, + cnopts=cnopts, + ) + return pysftp.Connection( + host=self.sftp_host, + username=self.sftp_user, + port=self.sftp_port or 22, + password=self.sftp_password, + cnopts=cnopts, + ) diff --git a/auto_backup/readme/USAGE.rst b/auto_backup/readme/USAGE.rst index bc3607e22a4..56dda90b52c 100644 --- a/auto_backup/readme/USAGE.rst +++ b/auto_backup/readme/USAGE.rst @@ -15,6 +15,18 @@ a path and everything will be backed up automatically. This is done through an SSH (encrypted) tunnel, thanks to pysftp, so your data is safe! +Secure SFTP connection with host key verification +------------------------------------------------- + +A new optional field **Host Public Key** has been added. + +- Paste the exact public key of your SFTP server (you can obtain it with + `ssh-keyscan -t rsa,ecdsa,ed25519 your.sftp.server`). +- When filled, Odoo will **verify the server identity** and refuse the connection if the key does not match → protects against man-in-the-middle attacks. +- Leave empty → old behaviour (no host key checking, backward compatible). + +The "Test SFTP Connection" button now also validates the host key. + Test connection ~~~~~~~~~~~~~~~ diff --git a/auto_backup/static/description/index.html b/auto_backup/static/description/index.html index ff75e85f681..0801fec30c3 100644 --- a/auto_backup/static/description/index.html +++ b/auto_backup/static/description/index.html @@ -3,7 +3,7 @@ -README.rst +Database Auto-Backup -
+
+

Database Auto-Backup

- - -Odoo Community Association - -
-

Database Auto-Backup

-

Beta License: AGPL-3 OCA/server-tools Translate me on Weblate Try me on Runboat

+

Beta License: AGPL-3 OCA/server-tools Translate me on Weblate Try me on Runboat

A tool for all your back-ups, internal and external!

Table of contents

-

Installation

+

Installation

Before installing this module, you need to execute:

 pip3 install pysftp==0.2.9
 
-

Configuration

+

Configuration

Go to Settings -> Database Structure -> Automated Backup to create your configurations for each database that you needed to backups.

-

Usage

+

Usage

Keep your Odoo data safe with this module. Take automated back-ups, remove them automatically and even write them to an external server through an encrypted tunnel. You can even specify how long local backups and external backups should be kept, automatically!

-

Connect with an FTP Server

+

Connect with an FTP Server

-

Keep your data safe, through an SSH tunnel!

+

Keep your data safe, through an SSH tunnel!

Want to go even further and write your backups to an external server? You can with this module! Specify the credentials to the server, specify a path and everything will be backed up automatically. This is done through an SSH (encrypted) tunnel, thanks to pysftp, so your data is safe!

+
+

Secure SFTP connection with host key verification

+

A new optional field Host Public Key has been added.

+
    +
  • Paste the exact public key of your SFTP server (you can obtain it with +ssh-keyscan -t rsa,ecdsa,ed25519 your.sftp.server).
  • +
  • When filled, Odoo will verify the server identity and refuse the connection if the key does not match → protects against man-in-the-middle attacks.
  • +
  • Leave empty → old behaviour (no host key checking, backward compatible).
  • +
+

The “Test SFTP Connection” button now also validates the host key.

+
-

Test connection

+

Test connection

-

Checks your credentials in one click

+

Checks your credentials in one click

Want to make sure if the connection details are correct and if Odoo can automatically write them to the remote server? Simply click on the ‘Test SFTP Connection’ button and you will get message telling you if @@ -448,15 +455,15 @@

Checks your credentials in one cl

-

E-mail on backup failure

+

E-mail on backup failure

-

Stay informed of problems, automatically!

+

Stay informed of problems, automatically!

Do you want to know if the database backup succeeded or failed? Subscribe to the corresponding backup setting notification type.

-

Run backups when you want

+

Run backups when you want

From the backups configuration list, press More > Execute backup(s) to manually execute the selected processes.

@@ -465,7 +472,7 @@

Run backups when you want

-

Known issues / Roadmap

+

Known issues / Roadmap

  • On larger databases, it is possible that backups will die due to Odoo server settings. In order to circumvent this without frivolously changing settings, @@ -475,7 +482,7 @@

    Known issues / Roadmap

-

Bug Tracker

+

Bug Tracker

Bugs are tracked on GitHub Issues. In case of trouble, please check there if your issue has already been reported. If you spotted it first, help us to smash it by providing a detailed and welcomed @@ -483,9 +490,9 @@

Bug Tracker

Do not contact contributors directly about support or help with technical issues.

-

Credits

+

Credits

-

Authors

+

Authors

  • Yenthe Van Ginneken
  • Agile Business Group
  • @@ -495,7 +502,7 @@

    Authors

-

Contributors

+

Contributors

-

Maintainers

+

Maintainers

This module is maintained by the OCA.

Odoo Community Association @@ -522,6 +529,5 @@

Maintainers

-
diff --git a/auto_backup/tests/test_db_backup.py b/auto_backup/tests/test_db_backup.py index b7dc6582bc2..60abbd4dfd2 100644 --- a/auto_backup/tests/test_db_backup.py +++ b/auto_backup/tests/test_db_backup.py @@ -11,6 +11,7 @@ from unittest.mock import PropertyMock, patch import pysftp +from paramiko import HostKeys from odoo import tools from odoo.exceptions import UserError @@ -56,13 +57,23 @@ def patch_filtered_sftp(self, record): with patch("%s.sftp_connection" % class_name): yield filtered - def new_record(self, method="sftp"): - vals = {"name": "Têst backup", "method": method, "days_to_keep": 1} + def new_record(self, method="sftp", name_suffix=None): + name = "Test backup" + if name_suffix is not None: + name += f" {name_suffix}" + elif self.env.registry.in_test_mode(): # always true in tests + # make name unique per call + name += f" {id(self)}-{datetime.now().microsecond}" + vals = { + "name": name, + "method": method, + "days_to_keep": 1, + } if method == "sftp": vals.update( { "sftp_host": "test_host", - "sftp_port": "222", + "sftp_port": 222, "sftp_user": "tuser", "sftp_password": "password", "folder": "/folder/", @@ -73,7 +84,7 @@ def new_record(self, method="sftp"): def test_compute_name_sftp(self): """It should create proper SFTP URI""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="compute_name_sftp") self.assertEqual( "sftp://%(user)s@%(host)s:%(port)s%(folder)s" % { @@ -87,7 +98,7 @@ def test_compute_name_sftp(self): def test_check_folder(self): """It should not allow recursive backups""" - rec_id = self.new_record("local") + rec_id = self.new_record("local", name_suffix="check_folder") with self.assertRaises(UserError): rec_id.write( { @@ -100,7 +111,7 @@ def test_check_folder(self): def test_action_sftp_test_connection_success(self, _): """It should raise connection succeeded warning""" with patch("%s.sftp_connection" % class_name, new_callable=PropertyMock): - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_sftp_test_connection_success") with self.assertRaises(UserError): rec_id.action_sftp_test_connection() _.assert_called_once_with("Connection Test Succeeded!") @@ -111,7 +122,7 @@ def _test_action_sftp_test_connection_fail(self, _): with patch( "%s.sftp_connection" % class_name, new_callable=PropertyMock ) as conn: - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_sftp_test_connection_fail") conn().side_effect = TestConnectionException with self.assertRaises(UserError): rec_id.action_sftp_test_connection() @@ -119,7 +130,7 @@ def _test_action_sftp_test_connection_fail(self, _): def test_action_backup_local(self): """It should backup local database""" - rec_id = self.new_record("local") + rec_id = self.new_record("local", name_suffix="backup_local") filename = rec_id.filename(datetime.now()) rec_id.action_backup() generated_backup = [f for f in os.listdir(rec_id.folder) if f >= filename] @@ -127,7 +138,7 @@ def test_action_backup_local(self): def test_action_backup_local_cleanup(self): """Backup local database and cleanup old databases""" - rec_id = self.new_record("local") + rec_id = self.new_record("local", name_suffix="backup_local_cleanup") old_date = datetime.now() - timedelta(days=3) filename = rec_id.filename(old_date) with patch("%s.datetime" % model) as mock_date: @@ -143,21 +154,21 @@ def test_action_backup_local_cleanup(self): def _test_action_backup_sftp_mkdirs(self): """It should create remote dirs""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_backup_sftp_mkdirs") with self.mock_assets(): with self.patch_filtered_sftp(rec_id): with patch("%s.cleanup" % class_name, new_callable=PropertyMock): - conn = rec_id.sftp_connection().__enter__() + conn = rec_id.sftp_connection() rec_id.action_backup() conn.makedirs.assert_called_once_with(rec_id.folder) def _test_action_backup_sftp_mkdirs_conn_exception(self): """It should guard from ConnectionException on remote.mkdirs""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_backup_sftp_mkdirs_conn_exception") with self.mock_assets(): with self.patch_filtered_sftp(rec_id): with patch("%s.cleanup" % class_name, new_callable=PropertyMock): - conn = rec_id.sftp_connection().__enter__() + conn = rec_id.sftp_connection() conn.makedirs.side_effect = TestConnectionException rec_id.action_backup() # No error was raised, test pass @@ -165,58 +176,64 @@ def _test_action_backup_sftp_mkdirs_conn_exception(self): def test_action_backup_sftp_remote_open(self): """It should open remote file w/ proper args""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="backup_sftp_remote_open") with self.mock_assets() as assets: - with self.patch_filtered_sftp(rec_id): - with patch("%s.cleanup" % class_name, new_callable=PropertyMock): - conn = rec_id.sftp_connection().__enter__() + with patch("%s.cleanup" % class_name, new_callable=PropertyMock): + with patch("%s.sftp_connection" % class_name) as mock_sftp_conn: + # Create a proper mock for the SFTP connection context manager + mock_remote = mock_sftp_conn.return_value.__enter__.return_value rec_id.action_backup() - conn.open.assert_called_once_with(assets["os"].path.join(), "wb") + mock_remote.open.assert_called_once_with( + assets["os"].path.join(), "wb" + ) def test_action_backup_all_search(self): """It should search all records""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_backup_all_search") with patch("%s.search" % class_name, new_callable=PropertyMock): rec_id.action_backup_all() rec_id.search.assert_called_once_with([]) def test_action_backup_all_return(self): """It should return result of backup operation""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="action_backup_all_return") with patch("%s.search" % class_name, new_callable=PropertyMock): res = rec_id.action_backup_all() self.assertEqual(rec_id.search().action_backup(), res) @patch("%s.pysftp" % model) - def test_sftp_connection_init_passwd(self, pysftp): - """It should initiate SFTP connection w/ proper args and pass""" - rec_id = self.new_record() - rec_id.sftp_connection() - pysftp.Connection.assert_called_once_with( - host=rec_id.sftp_host, - username=rec_id.sftp_user, - port=rec_id.sftp_port, - password=rec_id.sftp_password, + def test_sftp_connection(self, pysftp): + """It should create the SFTP connection with correct arguments and cnopts""" + + # 1. Password authentication + rec_pwd = self.new_record(name_suffix="pwd") # <-- unique name + rec_pwd.sftp_connection() + pysftp.Connection.assert_called_with( + host=rec_pwd.sftp_host, + username=rec_pwd.sftp_user, + port=rec_pwd.sftp_port, + password=rec_pwd.sftp_password, + cnopts=pysftp.CnOpts.return_value, ) - @patch("%s.pysftp" % model) - def test_sftp_connection_init_key(self, pysftp): - """It should initiate SFTP connection w/ proper args and key""" - rec_id = self.new_record() - rec_id.write({"sftp_private_key": "pkey", "sftp_password": "pkeypass"}) - rec_id.sftp_connection() - pysftp.Connection.assert_called_once_with( - host=rec_id.sftp_host, - username=rec_id.sftp_user, - port=rec_id.sftp_port, - private_key=rec_id.sftp_private_key, - private_key_pass=rec_id.sftp_password, + # 2. Private key authentication + pysftp.reset_mock() + rec_key = self.new_record(name_suffix="key") # <-- unique name + rec_key.write({"sftp_private_key": "/fake/key", "sftp_password": "keypass"}) + rec_key.sftp_connection() + pysftp.Connection.assert_called_with( + host=rec_key.sftp_host, + username=rec_key.sftp_user, + port=rec_key.sftp_port, + private_key=rec_key.sftp_private_key, + private_key_pass=rec_key.sftp_password, + cnopts=pysftp.CnOpts.return_value, ) @patch("%s.pysftp" % model) def test_sftp_connection_return(self, pysftp): """It should return new sftp connection""" - rec_id = self.new_record() + rec_id = self.new_record(name_suffix="sftp_connection_return") res = rec_id.sftp_connection() self.assertEqual( pysftp.Connection(), @@ -240,3 +257,47 @@ def test_filename_dump(self): now = datetime.now() res = self.Model.filename(now, ext="dump") self.assertTrue(res.endswith(".dump")) + + def test_sftp_host_key_verification(self): + """Test all host public key scenarios: valid, invalid, and disabled""" + backup = self.new_record(name_suffix="hostkey") + + # ------------------------------------------------------------------ + # 1. Valid key → HostKeys populated correctly, connection created + # ------------------------------------------------------------------ + with patch("pysftp.Connection") as mock_conn: + valid_ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCqGKukO1De7zhZj6+H0qtjTkVxwTCpvKe4eCZ0FPqri0cb2JZfXJ/DgYSF6vUpwmJG8wVQZKjeGcjDOL5UlsuusFncCzWBQ7RKNUSesmQRMSGkVb1/3j+skZ6UtW+5u09lHNsj6tQ51s1SPrCBkedbNf0Tp0GbMJDyR4e9T04ZZwIDAQAB" # noqa: B950 + backup.host_public_key = valid_ssh_key + backup.sftp_connection() # Parses real key, no exception + + # Verify cnopts.hostkeys has the entry for our host + cnopts = mock_conn.call_args.kwargs["cnopts"] + self.assertIsInstance(cnopts.hostkeys, HostKeys) + self.assertIn(backup.sftp_host, cnopts.hostkeys) + keys = cnopts.hostkeys.lookup(backup.sftp_host) + self.assertIsNotNone(keys) + self.assertEqual(len(keys), 1) + + # ------------------------------------------------------------------ + # 2. Invalid format (too few parts) → "Invalid host public key" + # ------------------------------------------------------------------ + backup.host_public_key = "invalid" # len(parts) < 2 → ValueError + with self.assertRaises(UserError) as cm: + backup.sftp_connection() + self.assertIn("Invalid host public key", str(cm.exception)) + + # ------------------------------------------------------------------ + # 3. Valid format but bad base64 (padding error) → + # "Error loading host public key" + # ------------------------------------------------------------------ + backup.host_public_key = "ssh-rsa ABC" # decodebytes() fails → binascii.Error + with self.assertRaises(UserError) as cm: + backup.sftp_connection() + self.assertIn("Error loading host public key", str(cm.exception)) + + # ------------------------------------------------------------------ + # 4. Empty key → cnopts.hostkeys = None (backward compatible, no warning) + # ------------------------------------------------------------------ + with patch("pysftp.Connection") as mock_conn: + backup.host_public_key = None + backup.sftp_connection() diff --git a/auto_backup/view/db_backup_view.xml b/auto_backup/view/db_backup_view.xml index d4dc9a95168..a370d1d9253 100644 --- a/auto_backup/view/db_backup_view.xml +++ b/auto_backup/view/db_backup_view.xml @@ -37,6 +37,10 @@ name="sftp_private_key" placeholder="/home/odoo/.ssh/id_rsa" /> +