Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ class SampleHandle:
def push_task_name(self, task_name: StringType) -> None: ...
def push_threadinfo(self, thread_id: int, thread_native_id: int, thread_name: StringType) -> None: ...
def push_walltime(self, value: int, count: int) -> None: ...

def get_process_role() -> Optional[str]: ...
5 changes: 5 additions & 0 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
from ddtrace.internal.datadog.profiling._types import StringType
from ddtrace.internal.datadog.profiling.code_provenance import get_code_provenance_file
from ddtrace.internal.datadog.profiling.util import sanitize_string
from ddtrace.internal.runtime import get_process_role
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.settings._agent import config as agent_config

Expand Down Expand Up @@ -415,6 +416,10 @@ def upload(tracer: Optional[Tracer] = ddtrace.tracer, enable_code_provenance: Op
call_func_with_str(ddup_set_runtime_id, get_runtime_id())
ddup_set_process_id()

role = get_process_role()
if role is not None:
call_ddup_config_user_tag("process_type", role)

processor = tracer._endpoint_call_counter_span_processor
endpoint_counts, endpoint_to_span_ids = processor.reset()

Expand Down
15 changes: 15 additions & 0 deletions ddtrace/internal/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

__all__ = [
"get_ancestor_runtime_id",
"get_process_role",
"get_runtime_id",
"get_parent_runtime_id",
"get_runtime_propagation_envs",
Expand Down Expand Up @@ -82,6 +83,20 @@ def get_parent_runtime_id() -> t.Optional[str]:
return _PARENT_RUNTIME_ID


def get_process_role() -> t.Optional[str]:
"""Return the role of this process in a forking framework.

Returns ``'worker'`` if this process was forked from a parent (or spawned
as a child via multiprocessing), ``'main'`` if this process has forked
worker children, or ``None`` for a single-process application.
"""
if _PARENT_RUNTIME_ID is not None:
return "worker"
if forksafe.has_forked():
return "main"
return None


def get_runtime_propagation_envs() -> dict[str, str]:
"""Return session lineage env vars to inject into child process environments.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
features:
- |
profiling: Profiles generated from fork-based servers now include a ``process_type``
tag with the value ``main`` or ``worker``.
9 changes: 6 additions & 3 deletions tests/profiling/test_gunicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def debug_print(*args: Any) -> None:

RunGunicornFunc: TypeAlias = Callable[..., subprocess.Popen[bytes]]

# Use /dev/shm for Linux; fall back to /tmp on macOS and other platforms.
_WORKER_TMP_DIR = "/dev/shm" if os.path.isdir("/dev/shm") else "/tmp"


def _run_gunicorn(*args: str, app: str = "tests.profiling.gunicorn-app:app") -> subprocess.Popen[bytes]:
cmd = (
Expand All @@ -47,7 +50,7 @@ def _run_gunicorn(*args: str, app: str = "tests.profiling.gunicorn-app:app") ->
"--bind",
"127.0.0.1:7644",
"--worker-tmp-dir",
"/dev/shm",
_WORKER_TMP_DIR,
"-c",
os.path.dirname(__file__) + "/gunicorn.conf.py",
"--chdir",
Expand Down Expand Up @@ -270,7 +273,7 @@ def test_gunicorn_gevent_sigterm_graceful_shutdown(monkeypatch: pytest.MonkeyPat
"--bind",
bind_addr,
"--worker-tmp-dir",
"/dev/shm",
_WORKER_TMP_DIR,
"-k",
"gevent",
"-w",
Expand Down Expand Up @@ -350,7 +353,7 @@ def test_gunicorn_profile_export_count_two_workers_flush_false(
"--bind",
"127.0.0.1:7644",
"--worker-tmp-dir",
"/dev/shm",
_WORKER_TMP_DIR,
"-c",
os.path.dirname(__file__) + "/gunicorn.conf.py",
"--chdir",
Expand Down
184 changes: 184 additions & 0 deletions tests/profiling/test_process_role.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""Tests for process_type tag in profiling fork scenarios.

Verifies that the profiler's ``get_process_role()`` helper returns the correct
value in Gunicorn-style pre-fork, uWSGI postfork, and multiprocessing spawn
scenarios, so that ``ddup.upload()`` can emit the right ``process_type`` tag
(``'main'`` or ``'worker'``) to the Datadog intake.
"""

from __future__ import annotations

from typing import Generator

import pytest

from ddtrace.profiling import profiler


@pytest.fixture(autouse=True)
def _reset_profiler_active_instance() -> Generator[None, None, None]:
yield
profiler.Profiler._active_instance = None


# ---------------------------------------------------------------------------
# Gunicorn-style pre-fork: main forks workers
# ---------------------------------------------------------------------------


@pytest.mark.subprocess(env={"PYTHONWARNINGS": "ignore::DeprecationWarning"})
def test_gunicorn_style_fork_parent_is_main() -> None:
"""After forking Gunicorn workers the main process reports role 'main'."""
import os

from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None

child = os.fork()
if child == 0:
os._exit(0)

os.waitpid(child, 0)
assert get_process_role() == "main", get_process_role()


@pytest.mark.subprocess(env={"PYTHONWARNINGS": "ignore::DeprecationWarning"})
def test_gunicorn_style_fork_child_is_worker() -> None:
"""Gunicorn worker process (forked child) reports role 'worker'."""
import os

from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None

child = os.fork()
if child == 0:
assert get_process_role() == "worker", get_process_role()
os._exit(0)

_, status = os.waitpid(child, 0)
assert os.WEXITSTATUS(status) == 0


@pytest.mark.subprocess(env={"PYTHONWARNINGS": "ignore::DeprecationWarning"})
def test_gunicorn_style_multiple_workers_all_report_worker() -> None:
"""Each of N Gunicorn workers reports 'worker'; main reports 'main' after all forks."""
import os

from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None

pids = []
for _ in range(2):
pid = os.fork()
if pid == 0:
assert get_process_role() == "worker", get_process_role()
os._exit(0)
pids.append(pid)

for pid in pids:
_, status = os.waitpid(pid, 0)
assert os.WEXITSTATUS(status) == 0

assert get_process_role() == "main", get_process_role()


# ---------------------------------------------------------------------------
# uWSGI postfork simulation: profiler started in worker via _start_on_fork()
# ---------------------------------------------------------------------------


def test_uwsgi_postfork_worker_role_via_mock(monkeypatch: pytest.MonkeyPatch) -> None:
"""uWSGI postfork worker: get_process_role() returns 'worker' when parent runtime ID is set.

Uses the same monkeypatching pattern as test_uwsgi.py: simulate the main
process by raising uWSGIMasterProcess, then call _start_on_fork() as the
postfork callback would. The role check verifies the detection primitive
under mock.
"""
import ddtrace.internal.runtime as _runtime_mod

def _raise_main(*args: object, **kwargs: object) -> None:
raise profiler.uwsgi.uWSGIMasterProcess()

monkeypatch.setattr(profiler.uwsgi, "check_uwsgi", _raise_main)

# Simulate what happens to _PARENT_RUNTIME_ID after a real fork in a worker.
monkeypatch.setattr(_runtime_mod, "_PARENT_RUNTIME_ID", "fake-parent-id")

from ddtrace.internal.runtime import get_process_role

assert get_process_role() == "worker"

p = profiler.Profiler()
p.start()
p._start_on_fork()
assert profiler.Profiler._active_instance is p
p.stop(flush=False)


def test_uwsgi_main_role_via_mock(monkeypatch: pytest.MonkeyPatch) -> None:
"""uWSGI main process: get_process_role() returns None before any fork."""
from ddtrace.internal.runtime import get_process_role

# Main process has no parent runtime ID and has not yet forked (no workers yet).
assert get_process_role() is None


# ---------------------------------------------------------------------------
# multiprocessing spawn / forkserver (env-var seeded _DD_PARENT_PY_SESSION_ID)
# ---------------------------------------------------------------------------


@pytest.mark.subprocess(
env={
"_DD_PARENT_PY_SESSION_ID": "parent-session-id-spawn",
"DD_TRACE_SUBPROCESS_ENABLED": "false",
}
)
def test_spawn_child_is_worker() -> None:
"""Multiprocessing spawn/forkserver child (env-var parent session id) reports 'worker'."""
from ddtrace.internal.runtime import get_process_role

assert get_process_role() == "worker", get_process_role()


# ---------------------------------------------------------------------------
# Single process: no role (no tag emitted)
# ---------------------------------------------------------------------------


def test_single_process_no_role() -> None:
"""Standalone single-process application: get_process_role() returns None."""
from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None


# ---------------------------------------------------------------------------
# ddup module wiring: get_process_role is imported into the _ddup namespace
# so that upload() can call it.
# ---------------------------------------------------------------------------


def test_get_process_role_available_in_ddup_namespace() -> None:
"""get_process_role must be importable from the _ddup extension module.

upload() looks it up by name from its module globals at call time. This
test confirms the symbol is present in the Cython module's namespace so that
a subsequent monkeypatch in integration tests can override it.
"""
try:
import ddtrace.internal.datadog.profiling.ddup._ddup as _ddup_mod
except ImportError:
pytest.skip("ddup native extension not available")

assert hasattr(_ddup_mod, "get_process_role"), (
"get_process_role not found in _ddup module namespace; "
"upload() would call the wrong function after a monkeypatch"
)
from ddtrace.internal.runtime import get_process_role

assert _ddup_mod.get_process_role is get_process_role
55 changes: 55 additions & 0 deletions tests/tracer/runtime/test_runtime_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,58 @@ def test_parent_runtime_id():

_, status = os.waitpid(child, 0)
assert os.WEXITSTATUS(status) == 42


def test_get_process_role_single_process() -> None:
"""Single-process application: get_process_role() returns None."""
from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None


@pytest.mark.subprocess(env={"PYTHONWARNINGS": "ignore::DeprecationWarning"})
def test_get_process_role_fork_child() -> None:
"""Forked child process: get_process_role() returns 'worker'."""
import os

from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None

child = os.fork()
if child == 0:
assert get_process_role() == "worker", get_process_role()
os._exit(0)

_, status = os.waitpid(child, 0)
assert os.WEXITSTATUS(status) == 0


@pytest.mark.subprocess(env={"PYTHONWARNINGS": "ignore::DeprecationWarning"})
def test_get_process_role_fork_parent() -> None:
"""Parent process after forking a child: get_process_role() returns 'main'."""
import os

from ddtrace.internal.runtime import get_process_role

assert get_process_role() is None

child = os.fork()
if child == 0:
os._exit(0)

os.waitpid(child, 0)
assert get_process_role() == "main", get_process_role()


@pytest.mark.subprocess(
env={
"_DD_PARENT_PY_SESSION_ID": "some-parent-session-id",
"DD_TRACE_SUBPROCESS_ENABLED": "false",
}
)
def test_get_process_role_spawn_child() -> None:
"""Multiprocessing spawn/forkserver child (env-var seeded): returns 'worker'."""
from ddtrace.internal.runtime import get_process_role

assert get_process_role() == "worker", get_process_role()
Loading