Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/62773.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix JWT tokens appearing in task logs by redacting them in structlog and avoiding logging full workload objects.
28 changes: 25 additions & 3 deletions shared/logging/src/airflow_shared/logging/structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,33 @@ def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDi


# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is very likely a JWT
# token. Better safe than sorry. We also redact token-bearing objects (e.g. workload) by dumping
# and recursively redacting sensitive keys and JWT strings so they never appear in logs.
# Dict keys whose values are replaced wholesale with "***", regardless of what the value is.
_SENSITIVE_KEYS = frozenset({"token"})
# Maximum nesting depth that _redact_value will descend into containers.
_MAX_REDACT_DEPTH = 5


def _redact_value(val: Any, depth: int) -> Any:
    """
    Return ``val`` with JWT-like strings and sensitive dict values masked.

    Strings have every ``JWT_PATTERN`` match replaced with ``"eyJ***"``. Dicts and lists
    are rebuilt with their members redacted recursively, and any dict value stored under
    a key in ``_SENSITIVE_KEYS`` becomes ``"***"``. Objects exposing ``model_dump()``
    (Pydantic-like) are dumped and the dump is redacted; if dumping raises, the object is
    replaced by ``"<redacted>"``. Any other value is returned unchanged.

    :param val: value taken from a log event.
    :param depth: current nesting depth; top-level callers pass 0.
    :return: the redacted value (a new container for dicts/lists, never mutated in place).
    """
    # Strings are scrubbed before the depth check: it needs no recursion, and bailing
    # out first would let a token nested deeper than the limit through verbatim.
    if isinstance(val, str):
        return re.sub(JWT_PATTERN, "eyJ***", val)
    if depth > _MAX_REDACT_DEPTH:
        return val
    if isinstance(val, dict):
        return {k: "***" if k in _SENSITIVE_KEYS else _redact_value(v, depth + 1) for k, v in val.items()}
    if isinstance(val, list):
        return [_redact_value(x, depth + 1) for x in val]
    if hasattr(val, "model_dump"):
        try:
            # Same depth on purpose: the dump stands in for the object itself.
            return _redact_value(val.model_dump(), depth)
        except Exception:
            # Fail closed: an object we cannot inspect may still carry a token.
            return "<redacted>"
    return val


def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """
    Structlog processor that masks JWTs and token-bearing values in a log event.

    Every value in ``event_dict`` is replaced by the result of ``_redact_value``, which
    already covers plain strings, so no separate string branch is needed here.

    :param logger: unused; present to satisfy the processor call signature.
    :param method_name: unused; present to satisfy the processor call signature.
    :param event_dict: the event being logged; it is updated in place.
    :return: the same ``event_dict`` object, with its values redacted.
    """
    # Only existing keys are reassigned, so the dict never changes size mid-iteration.
    for key, value in event_dict.items():
        event_dict[key] = _redact_value(value, 0)
    return event_dict


Expand Down
46 changes: 45 additions & 1 deletion shared/logging/tests/logging/test_structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
from structlog.processors import CallsiteParameter

from airflow_shared.logging import structlog as structlog_module
from airflow_shared.logging.structlog import configure_logging
from airflow_shared.logging.structlog import (
configure_logging,
redact_jwt,
)

# We don't want to use the caplog fixture in this test, as the main purpose of this file is to capture the
# _rendered_ output of the tests to make sure it is correct.
Expand Down Expand Up @@ -380,3 +383,44 @@ def test_logger_respects_configured_level(structlog_config):

written = sio.getvalue()
assert "[my_logger] Debug message\n" in written


def test_redact_jwt_redacts_string_with_jwt():
    """A string value that looks like a JWT comes back as ``eyJ***``."""
    jwt_like = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.secret"
    redacted = redact_jwt(None, "info", {"event": "Auth", "token_str": jwt_like})
    assert redacted["token_str"] == "eyJ***"


def test_redact_jwt_redacts_token_in_object_with_model_dump():
    """An object exposing model_dump() is replaced by its dump with the 'token' value masked."""

    class WorkloadLike:
        def model_dump(self):
            return {"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.secret", "dag_id": "my_dag"}

    # Build the event in one go; the processor ignores the logger and method name.
    redacted = redact_jwt(None, "info", {"event": "Executing workload", "workload": WorkloadLike()})
    assert redacted["workload"] == {"token": "***", "dag_id": "my_dag"}


def test_redact_jwt_replaces_non_dumpable_object_with_redacted():
    """If model_dump() raises, the value is swapped for the ``<redacted>`` placeholder."""

    class BadDump:
        def model_dump(self):
            raise ValueError("nope")

    result = redact_jwt(None, "info", {"event": "x", "obj": BadDump()})
    assert result["obj"] == "<redacted>"


def test_redact_jwt_unchanged_when_no_jwt_or_sensitive():
    """An event with no JWT-like strings and no sensitive keys passes through unchanged."""
    event_dict = {"event": "Hello", "key1": "value1"}
    # redact_jwt mutates and returns the dict it was given, so compare against an
    # independent snapshot; comparing the result to ``event_dict`` itself could never fail.
    expected = dict(event_dict)
    out = redact_jwt(None, "info", event_dict)
    assert out == expected
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ def execute_workload(workload: ExecuteTask) -> None:
if not isinstance(workload, workloads.ExecuteTask):
raise ValueError(f"Executor does not know how to handle {type(workload)}")

log.info("Executing workload", workload=workload)
log.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could keep the same logging statement here and instead set the JWT-containing attribute as a pydantic.SecretStr, which will by default redact the field from logs.

"Executing workload",
dag_id=workload.ti.dag_id,
task_id=workload.ti.task_id,
run_id=workload.ti.run_id,
log_path=workload.log_path,
)

base_url = conf.get("api", "base_url", fallback="/")
# If it's a relative URL, use localhost:8080 as the default
Expand Down
Loading