Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ composer-dev create example-local-environment \
--dags-path example_directory/dags
```

## Remote debugging
To facilitate remote debugging in the local Airflow environment, use the `--enable-ssh` flag.
This flag allows SSH access into the container, enabling you to debug your DAGs directly.
For SSH connection, use the `airflow` user. The default password for this user is `airflow`.
If you need to change this default password, set the `COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD` environment variable
in the variables.env file.

## Enable the container user to access mounted files and directories from the host

By default, the Composer container runs as the user `airflow` with UID 999. The user needs to have access the files and
Expand Down
30 changes: 29 additions & 1 deletion composer_local_dev/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@
},
{
"name": "Environment options",
"options": ["--web-server-port", "--dags-path"],
"options": [
"--web-server-port",
"--dags-path",
"--enable-ssh",
"--ssh-port",
],
},
],
"composer-dev start": [COMMON_OPTIONS],
Expand Down Expand Up @@ -173,6 +178,21 @@ def cli():
metavar="PORT",
)

option_enable_ssh = click.option(
"--enable-ssh",
is_flag=True,
default=False,
help="Enable SSH daemon in the environment.",
metavar="ENABLE_SSH",
)

option_ssh_port = click.option(
"--ssh-port",
type=click.IntRange(min=0, max=65535),
help="Port used by SSH daemon",
show_default="read from the configuration file",
metavar="SSHD_PORT",
)

required_environment = click.argument(
"environment",
Expand Down Expand Up @@ -218,6 +238,8 @@ def cli():
)
@option_location
@option_port
@option_enable_ssh
@option_ssh_port
@click.option(
"--dags-path",
help="Path to DAGs folder. If it does not exist, it will be created.",
Expand All @@ -235,6 +257,8 @@ def create(
project: Optional[str],
location: str,
web_server_port: Optional[int],
enable_ssh: Optional[bool],
ssh_port: Optional[int],
environment: str,
verbose: bool,
debug: bool,
Expand Down Expand Up @@ -291,6 +315,8 @@ def create(
location=location,
env_dir_path=env_dir,
web_server_port=web_server_port,
enable_ssh=enable_ssh,
ssh_port=ssh_port,
dags_path=dags_path,
)
else:
Expand All @@ -300,6 +326,8 @@ def create(
location=location,
env_dir_path=env_dir,
port=web_server_port,
enable_ssh=enable_ssh,
ssh_port=ssh_port,
dags_path=dags_path,
)
env.create()
Expand Down
15 changes: 15 additions & 0 deletions composer_local_dev/docker_files/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,24 @@ run_airflow_as_airflow_user() {
exec airflow webserver
}

install_and_run_sshd() {
echo "Installing sshd"
if ! command -v /usr/sbin/sshd &> /dev/null
then
sudo apt-get -qq update && sudo DEBIAN_FRONTEND=noninteractive apt-get -qqy install openssh-server > /dev/null 2>&1
sudo mkdir /run/sshd
echo "airflow:${COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD}" | sudo chpasswd
fi
sudo /usr/sbin/sshd
}

main() {
init_airflow

if [ "${COMPOSER_CONTAINER_ENABLE_SSHD}" = "True" ]; then
install_and_run_sshd
fi

if [ "${COMPOSER_CONTAINER_RUN_AS_HOST_USER}" = "True" ]; then
run_airflow_as_host_user
else
Expand Down
53 changes: 46 additions & 7 deletions composer_local_dev/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_image_mounts(


def get_default_environment_variables(
dag_dir_list_interval: int, project_id: str
dag_dir_list_interval: int, project_id: str, enable_ssh: bool = False
) -> Dict:
"""Return environment variables that will be set inside container."""
return {
Expand All @@ -92,6 +92,8 @@ def get_default_environment_variables(
# By default, the container runs as the user `airflow` with UID 999. Set
# this env variable to "True" to make it run as the current host user.
"COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False",
"COMPOSER_CONTAINER_ENABLE_SSHD": str(enable_ssh),
"COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow",
"COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}",
"COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}",
"AIRFLOW_HOME": "/home/airflow/airflow",
Expand Down Expand Up @@ -348,7 +350,12 @@ def get_environments_status(


class EnvironmentConfig:
def __init__(self, env_dir_path: pathlib.Path, port: Optional[int]):
def __init__(
self,
env_dir_path: pathlib.Path,
port: Optional[int],
ssh_port: Optional[int] = None,
):
self.env_dir_path = env_dir_path
self.config = self.load_configuration_from_file()
self.project_id = self.get_str_param("composer_project_id")
Expand All @@ -363,6 +370,12 @@ def __init__(self, env_dir_path: pathlib.Path, port: Optional[int]):
if port is not None
else self.parse_int_param("port", allowed_range=(0, 65536))
)
self.enable_ssh = self.get_str_param("enable_ssh")
self.ssh_port = (
ssh_port
if ssh_port is not None
else self.parse_int_param("ssh_port", allowed_range=(0, 65536))
)

def load_configuration_from_file(self) -> Dict:
"""
Expand Down Expand Up @@ -439,6 +452,8 @@ def __init__(
dags_path: Optional[str],
dag_dir_list_interval: int = 10,
port: Optional[int] = None,
enable_ssh: Optional[bool] = False,
ssh_port: Optional[int] = None,
pypi_packages: Optional[Dict] = None,
environment_vars: Optional[Dict] = None,
):
Expand All @@ -455,6 +470,8 @@ def __init__(
self.dags_path = files.resolve_dags_path(dags_path, env_dir_path)
self.dag_dir_list_interval = dag_dir_list_interval
self.port: int = port if port is not None else 8080
self.enable_ssh: bool = enable_ssh
self.ssh_port: int = ssh_port if ssh_port is not None else 10022
self.pypi_packages = (
pypi_packages if pypi_packages is not None else dict()
)
Expand Down Expand Up @@ -493,9 +510,14 @@ def get_container(
raise errors.EnvironmentNotFoundError() from None

@classmethod
def load_from_config(cls, env_dir_path: pathlib.Path, port: Optional[int]):
def load_from_config(
cls,
env_dir_path: pathlib.Path,
port: Optional[int],
ssh_port: Optional[int] = None,
):
"""Create local environment using 'config.json' configuration file."""
config = EnvironmentConfig(env_dir_path, port)
config = EnvironmentConfig(env_dir_path, port, ssh_port)
environment_vars = load_environment_variables(env_dir_path)

return cls(
Expand All @@ -506,6 +528,8 @@ def load_from_config(cls, env_dir_path: pathlib.Path, port: Optional[int]):
dags_path=config.dags_path,
dag_dir_list_interval=config.dag_dir_list_interval,
port=config.port,
enable_ssh=config.enable_ssh,
ssh_port=config.ssh_port,
environment_vars=environment_vars,
)

Expand All @@ -518,6 +542,8 @@ def from_source_environment(
env_dir_path: pathlib.Path,
web_server_port: Optional[int],
dags_path: Optional[str],
enable_ssh: Optional[bool] = False,
ssh_port: Optional[int] = None,
):
"""
Create Environment using configuration retrieved from Composer
Expand All @@ -541,6 +567,8 @@ def from_source_environment(
dags_path=dags_path,
dag_dir_list_interval=10,
port=web_server_port,
enable_ssh=enable_ssh,
ssh_port=ssh_port,
pypi_packages=pypi_packages,
environment_vars=env_variables,
)
Expand Down Expand Up @@ -580,6 +608,8 @@ def write_environment_config_to_config_file(self):
"dags_path": self.dags_path,
"dag_dir_list_interval": int(self.dag_dir_list_interval),
"port": int(self.port),
"enable_ssh": bool(self.enable_ssh),
"ssh_port": int(self.ssh_port),
}
with open(self.env_dir_path / "config.json", "w") as fp:
json.dump(config, fp, indent=4)
Expand All @@ -597,16 +627,25 @@ def create_docker_container(self):
self.requirements_file,
)
default_vars = get_default_environment_variables(
self.dag_dir_list_interval, self.project_id
self.dag_dir_list_interval, self.project_id, self.enable_ssh
)
env_vars = {**default_vars, **self.environment_vars}

if platform.system() == "Windows" and env_vars["COMPOSER_CONTAINER_RUN_AS_HOST_USER"] == "True":
raise Exception("COMPOSER_CONTAINER_RUN_AS_HOST_USER must be set to `False` on Windows")
if (
platform.system() == "Windows"
and env_vars["COMPOSER_CONTAINER_RUN_AS_HOST_USER"] == "True"
):
raise Exception(
"COMPOSER_CONTAINER_RUN_AS_HOST_USER must be set to `False` on Windows"
)

ports = {
f"8080/tcp": self.port,
}

if env_vars["COMPOSER_CONTAINER_ENABLE_SSHD"] == "True":
ports[f"22/tcp"] = self.ssh_port

entrypoint = f"sh {constants.ENTRYPOINT_PATH}"
memory_limit = constants.DOCKER_CONTAINER_MEMORY_LIMIT

Expand Down
4 changes: 1 addition & 3 deletions composer_local_dev/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ def __init__(
self,
param_name: str,
value: int,
int_range: Tuple[
int,
],
int_range: Tuple[int,],
):
if len(int_range) == 1:
allowed_range = f"x>={int_range[0]}"
Expand Down
17 changes: 16 additions & 1 deletion tests/unit/test_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,19 @@ def test_from_image(
],
)
@pytest.mark.parametrize("port", [None, 8090])
@pytest.mark.parametrize("ssh_port", [None, 2222])
@pytest.mark.parametrize("enable_ssh", [False, True])
@mock.patch("composer_local_dev.environment.docker.from_env")
@mock.patch("composer_local_dev.environment.assert_image_exists")
def test_create_and_load_from_config(
self, mocked_docker, mocked_assert, pypi_packages, port, tmp_path
self,
mocked_docker,
mocked_assert,
pypi_packages,
port,
enable_ssh,
ssh_port,
tmp_path,
):
env_dir_path = tmp_path / ".compose" / "my_env"
image_version = "composer-2.0.8-airflow-2.2.3"
Expand All @@ -398,6 +407,8 @@ def test_create_and_load_from_config(
dags_path=str(pathlib.Path(tmp_path)),
dag_dir_list_interval=10,
port=port,
enable_ssh=enable_ssh,
ssh_port=ssh_port,
pypi_packages=pypi_packages,
)
expected_env.create()
Expand Down Expand Up @@ -565,6 +576,8 @@ def test_create_docker_container(self, mocked_mounts, default_env):
"AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE": "True",
"COMPOSER_PYTHON_VERSION": "3",
"COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False",
"COMPOSER_CONTAINER_ENABLE_SSHD": "False",
"COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow",
"COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}",
"COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}",
"AIRFLOW_HOME": "/home/airflow/airflow",
Expand Down Expand Up @@ -928,6 +941,8 @@ def test_get_environment_variables():
"COMPOSER_PYTHON_VERSION": "3",
"AIRFLOW_HOME": "/home/airflow/airflow",
"COMPOSER_CONTAINER_RUN_AS_HOST_USER": "False",
"COMPOSER_CONTAINER_ENABLE_SSHD": "False",
"COMPOSER_CONTAINER_AIRFLOW_USER_PASSWORD": "airflow",
"COMPOSER_HOST_USER_NAME": f"{getpass.getuser()}",
"COMPOSER_HOST_USER_ID": f"{os.getuid() if platform.system() != 'Windows' else ''}",
"AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT": "google-cloud-platform://?"
Expand Down