From ecb86b136b8756cdfd33d438f2125b8ce8db5352 Mon Sep 17 00:00:00 2001 From: Mykhailo Alipa Date: Fri, 10 May 2024 16:18:43 +0200 Subject: [PATCH 1/3] WIP --- composer_local_dev/cli.py | 30 ++++++++++- composer_local_dev/docker_files/entrypoint.sh | 12 +++++ composer_local_dev/environment.py | 53 ++++++++++++++++--- composer_local_dev/errors.py | 4 +- tests/unit/test_environment.py | 17 +++++- 5 files changed, 104 insertions(+), 12 deletions(-) diff --git a/composer_local_dev/cli.py b/composer_local_dev/cli.py index cf6ec4f5..f565e00e 100644 --- a/composer_local_dev/cli.py +++ b/composer_local_dev/cli.py @@ -47,7 +47,12 @@ }, { "name": "Environment options", - "options": ["--web-server-port", "--dags-path"], + "options": [ + "--web-server-port", + "--dags-path", + "--enable-ssh", + "--ssh-port", + ], }, ], "composer-dev start": [COMMON_OPTIONS], @@ -173,6 +178,21 @@ def cli(): metavar="PORT", ) +option_enable_ssh = click.option( + "--enable-ssh", + is_flag=True, + default=False, + help="Enable SSH daemon in the environment.", + metavar="ENABLE_SSH", +) + +option_ssh_port = click.option( + "--ssh-port", + type=click.IntRange(min=0, max=65535), + help="Port used by SSH daemon", + show_default="read from the configuration file", + metavar="SSHD_PORT", +) required_environment = click.argument( "environment", @@ -218,6 +238,8 @@ def cli(): ) @option_location @option_port +@option_enable_ssh +@option_ssh_port @click.option( "--dags-path", help="Path to DAGs folder. If it does not exist, it will be created.", @@ -235,6 +257,8 @@ def create( project: Optional[str], location: str, web_server_port: Optional[int], + enable_ssh: Optional[bool], + ssh_port: Optional[int], environment: str, verbose: bool, debug: bool, @@ -291,6 +315,8 @@ def create( location=location, env_dir_path=env_dir, web_server_port=web_server_port, + enable_ssh=enable_ssh, + ssh_port=ssh_port, dags_path=dags_path, ) else: @@ -300,6 +326,8 @@ def create( location=location, env_dir_path=env_dir, port=web_server_port, + enable_ssh=enable_ssh, + ssh_port=ssh_port, dags_path=dags_path, ) env.create() diff --git a/composer_local_dev/docker_files/entrypoint.sh b/composer_local_dev/docker_files/entrypoint.sh index 71163587..a4f9d185 100755 --- a/composer_local_dev/docker_files/entrypoint.sh +++ b/composer_local_dev/docker_files/entrypoint.sh @@ -74,9 +74,21 @@ run_airflow_as_airflow_user() { exec airflow webserver } +install_and_run_sshd() { + echo "Installing and running sshd" + sudo apt-get -qq update && sudo DEBIAN_FRONTEND=noninteractive apt-get -qqy install openssh-server > /dev/null 2>&1 + sudo mkdir /run/sshd + echo "airflow:${COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD}" | sudo chpasswd + sudo /usr/sbin/sshd +} + main() { init_airflow + if [ "${COMPOSER_CONTAINER_ENABLE_SSHD}" = "True" ]; then + install_and_run_sshd + fi + if [ "${COMPOSER_CONTAINER_RUN_AS_HOST_USER}" = "True" ]; then run_airflow_as_host_user else diff --git a/composer_local_dev/environment.py b/composer_local_dev/environment.py index 6e65231b..58633013 100644 --- a/composer_local_dev/environment.py +++ b/composer_local_dev/environment.py @@ -76,7 +76,7 @@ def get_image_mounts( def get_default_environment_variables( - dag_dir_list_interval: int, project_id: str + dag_dir_list_interval: int, project_id: str, enable_ssh: bool = False ) -> Dict: """Return environment variables that will be set inside container.""" return { @@ -92,6 +92,8 @@ def get_default_environment_variables( # By default, the container runs as the user `airflow` with UID 999. Set # this env variable to "True" to make it run as the current host user. "COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False", + "COMPOSER_CONTAINER_ENABLE_SSHD": str(enable_ssh), + "COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow", "COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}", "COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}", "AIRFLOW_HOME": "/home/airflow/airflow", @@ -348,7 +350,12 @@ def get_environments_status( class EnvironmentConfig: - def __init__(self, env_dir_path: pathlib.Path, port: Optional[int]): + def __init__( + self, + env_dir_path: pathlib.Path, + port: Optional[int], + ssh_port: Optional[int] = None, + ): self.env_dir_path = env_dir_path self.config = self.load_configuration_from_file() self.project_id = self.get_str_param("composer_project_id") @@ -363,6 +370,12 @@ def __init__(self, env_dir_path: pathlib.Path, port: Optional[int]): if port is not None else self.parse_int_param("port", allowed_range=(0, 65536)) ) + self.enable_ssh = self.get_str_param("enable_ssh") + self.ssh_port = ( + ssh_port + if ssh_port is not None + else self.parse_int_param("ssh_port", allowed_range=(0, 65536)) + ) def load_configuration_from_file(self) -> Dict: """ @@ -439,6 +452,8 @@ def __init__( dags_path: Optional[str], dag_dir_list_interval: int = 10, port: Optional[int] = None, + enable_ssh: Optional[bool] = False, + ssh_port: Optional[int] = None, pypi_packages: Optional[Dict] = None, environment_vars: Optional[Dict] = None, ): @@ -455,6 +470,8 @@ def __init__( self.dags_path = files.resolve_dags_path(dags_path, env_dir_path) self.dag_dir_list_interval = dag_dir_list_interval self.port: int = port if port is not None else 8080 + self.enable_ssh: bool = enable_ssh + self.ssh_port: int = ssh_port if ssh_port is not None else 10022 self.pypi_packages = ( pypi_packages if pypi_packages is not None else dict() ) @@ -493,9 +510,14 @@ def get_container( raise errors.EnvironmentNotFoundError() from None @classmethod - def load_from_config(cls, env_dir_path: pathlib.Path, port: Optional[int]): + def load_from_config( + cls, + env_dir_path: pathlib.Path, + port: Optional[int], + ssh_port: Optional[int] = None, + ): """Create local environment using 'config.json' configuration file.""" - config = EnvironmentConfig(env_dir_path, port) + config = EnvironmentConfig(env_dir_path, port, ssh_port) environment_vars = load_environment_variables(env_dir_path) return cls( @@ -506,6 +528,8 @@ def load_from_config(cls, env_dir_path: pathlib.Path, port: Optional[int]): dags_path=config.dags_path, dag_dir_list_interval=config.dag_dir_list_interval, port=config.port, + enable_ssh=config.enable_ssh, + ssh_port=config.ssh_port, environment_vars=environment_vars, ) @@ -518,6 +542,8 @@ def from_source_environment( env_dir_path: pathlib.Path, web_server_port: Optional[int], dags_path: Optional[str], + enable_ssh: Optional[bool] = False, + ssh_port: Optional[int] = None, ): """ Create Environment using configuration retrieved from Composer @@ -541,6 +567,8 @@ def from_source_environment( dags_path=dags_path, dag_dir_list_interval=10, port=web_server_port, + enable_ssh=enable_ssh, + ssh_port=ssh_port, pypi_packages=pypi_packages, environment_vars=env_variables, ) @@ -580,6 +608,8 @@ def write_environment_config_to_config_file(self): "dags_path": self.dags_path, "dag_dir_list_interval": int(self.dag_dir_list_interval), "port": int(self.port), + "enable_ssh": bool(self.enable_ssh), + "ssh_port": int(self.ssh_port), } with open(self.env_dir_path / "config.json", "w") as fp: json.dump(config, fp, indent=4) @@ -597,16 +627,25 @@ def create_docker_container(self): self.requirements_file, ) default_vars = get_default_environment_variables( - self.dag_dir_list_interval, self.project_id + self.dag_dir_list_interval, self.project_id, self.enable_ssh ) env_vars = {**default_vars, **self.environment_vars} - if platform.system() == "Windows" and env_vars["COMPOSER_CONTAINER_RUN_AS_HOST_USER"] == "True": - raise Exception("COMPOSER_CONTAINER_RUN_AS_HOST_USER must be set to `False` on Windows") + if ( + platform.system() == "Windows" + and env_vars["COMPOSER_CONTAINER_RUN_AS_HOST_USER"] == "True" + ): + raise Exception( + "COMPOSER_CONTAINER_RUN_AS_HOST_USER must be set to `False` on Windows" + ) ports = { f"8080/tcp": self.port, } + + if env_vars["COMPOSER_CONTAINER_ENABLE_SSHD"] == "True": + ports[f"22/tcp"] = self.ssh_port + entrypoint = f"sh {constants.ENTRYPOINT_PATH}" memory_limit = constants.DOCKER_CONTAINER_MEMORY_LIMIT diff --git a/composer_local_dev/errors.py b/composer_local_dev/errors.py index ef737f03..ca023a9c 100644 --- a/composer_local_dev/errors.py +++ b/composer_local_dev/errors.py @@ -117,9 +117,7 @@ def __init__( self, param_name: str, value: int, - int_range: Tuple[ - int, - ], + int_range: Tuple[int,], ): if len(int_range) == 1: allowed_range = f"x>={int_range[0]}" diff --git a/tests/unit/test_environment.py b/tests/unit/test_environment.py index 94d26fc0..71a7e720 100644 --- a/tests/unit/test_environment.py +++ b/tests/unit/test_environment.py @@ -383,10 +383,19 @@ def test_from_image( ], ) @pytest.mark.parametrize("port", [None, 8090]) + @pytest.mark.parametrize("ssh_port", [None, 2222]) + @pytest.mark.parametrize("enable_ssh", [False, True]) @mock.patch("composer_local_dev.environment.docker.from_env") @mock.patch("composer_local_dev.environment.assert_image_exists") def test_create_and_load_from_config( - self, mocked_docker, mocked_assert, pypi_packages, port, tmp_path + self, + mocked_docker, + mocked_assert, + pypi_packages, + port, + enable_ssh, + ssh_port, + tmp_path, ): env_dir_path = tmp_path / ".compose" / "my_env" image_version = "composer-2.0.8-airflow-2.2.3" @@ -398,6 +407,8 @@ def test_create_and_load_from_config( dags_path=str(pathlib.Path(tmp_path)), dag_dir_list_interval=10, port=port, + enable_ssh=enable_ssh, + ssh_port=ssh_port, pypi_packages=pypi_packages, ) expected_env.create() @@ -565,6 +576,8 @@ def test_create_docker_container(self, mocked_mounts, default_env): "AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE": "True", "COMPOSER_PYTHON_VERSION": "3", "COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False", + "COMPOSER_CONTAINER_ENABLE_SSHD": "False", + "COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow", "COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}", "COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}", "AIRFLOW_HOME": "/home/airflow/airflow", @@ -928,6 +941,8 @@ def test_get_environment_variables(): "COMPOSER_PYTHON_VERSION": "3", "AIRFLOW_HOME": "/home/airflow/airflow", "COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False", + "COMPOSER_CONTAINER_ENABLE_SSHD": "False", + "COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow", "COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}", "COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}", "AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT": "google-cloud-platform://?" From 40bed9f193e00ece65c46a4c1c7fbcc9c5fd8d6b Mon Sep 17 00:00:00 2001 From: Mykhailo Alipa Date: Fri, 10 May 2024 21:00:30 +0200 Subject: [PATCH 2/3] added Remote debugging section to README --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 7fd9d93a..f54ebc48 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,13 @@ composer-dev create example-local-environment \ --dags-path example_directory/dags ``` +## Remote debugging +To facilitate remote debugging in the local Airflow environment, use the `--enable-ssh` flag. +This flag allows SSH access into the container, enabling you to debug your DAGs directly. +For SSH connection, use the `airflow` user. The default password for this user is `airflow`. +If you need to change this default password, set the `COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD` environment variable +in the variables.env file. + ## Enable the container user to access mounted files and directories from the host By default, the Composer container runs as the user `airflow` with UID 999. The user needs to have access the files and From 0356c55279328276176057af2987e92a8ea883ec Mon Sep 17 00:00:00 2001 From: Mykhailo Alipa Date: Fri, 17 May 2024 10:38:19 +0200 Subject: [PATCH 3/3] fix: check if sshd is already installed in container --- composer_local_dev/docker_files/entrypoint.sh | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/composer_local_dev/docker_files/entrypoint.sh b/composer_local_dev/docker_files/entrypoint.sh index a4f9d185..9eeac1a0 100755 --- a/composer_local_dev/docker_files/entrypoint.sh +++ b/composer_local_dev/docker_files/entrypoint.sh @@ -75,10 +75,13 @@ run_airflow_as_airflow_user() { } install_and_run_sshd() { - echo "Installing and running sshd" - sudo apt-get -qq update && sudo DEBIAN_FRONTEND=noninteractive apt-get -qqy install openssh-server > /dev/null 2>&1 - sudo mkdir /run/sshd - echo "airflow:${COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD}" | sudo chpasswd + echo "Installing sshd" + if ! command -v /usr/sbin/sshd &> /dev/null + then + sudo apt-get -qq update && sudo DEBIAN_FRONTEND=noninteractive apt-get -qqy install openssh-server > /dev/null 2>&1 + sudo mkdir /run/sshd + echo "airflow:${COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD}" | sudo chpasswd + fi sudo /usr/sbin/sshd }