Skip to content
Open
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.venv
/venv
__pycache__
**/__pycache__
*.egg-info
.pytest_cache
Expand Down
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: patch
changes:
added:
- Added GCP logs in Household and Metadata services to assist further investigation of the 502 errors.
130 changes: 124 additions & 6 deletions policyengine_api/endpoints/household.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
)
from policyengine_api.data import database, local_database
import json
import uuid
from flask import Response, request
from policyengine_api.utils import hash_object
from policyengine_api.constants import COUNTRY_PACKAGE_VERSIONS
Expand All @@ -11,6 +12,9 @@
import json
import logging
from datetime import date
from policyengine_api.structured_logger import get_logger, log_struct

logger = get_logger()
from policyengine_api.utils.payload_validators import validate_country


Expand Down Expand Up @@ -88,14 +92,59 @@ def get_household_under_policy(

api_version = COUNTRY_PACKAGE_VERSIONS.get(country_id)

# Look in computed_households to see if already computed
# Generate a unique request ID for tracing
request_id = uuid.uuid4().hex

# Common context for all logs
log_context = {
"request_id": request_id,
"country_id": country_id,
"household_id": household_id,
"policy_id": policy_id,
"api_version": api_version,
"request_path": request.path,
}

# Log start of request
log_struct(
event="get_household_under_policy_start",
input_data=log_context,
message="Started processing household under policy request.",
severity="INFO",
logger=logger,
)

row = local_database.query(
f"SELECT * FROM computed_household WHERE household_id = ? AND policy_id = ? AND api_version = ?",
(household_id, policy_id, api_version),
).fetchone()
# Look in computed_household cache table
try:
row = local_database.query(
f"SELECT * FROM computed_household WHERE household_id = ? AND policy_id = ? AND api_version = ?",
(household_id, policy_id, api_version),
).fetchone()
except Exception as e:
log_struct(
event="computed_household_query_failed",
input_data=log_context,
message=f"Database query failed: {e}",
severity="ERROR",
)
return Response(
json.dumps(
{
"status": "error",
"message": "Internal server error while querying computed_household.",
}
),
status=500,
mimetype="application/json",
)

if row is not None:
log_struct(
event="cached_computed_household_found",
input_data=log_context,
message="Found precomputed household result in cache.",
severity="INFO",
)
result = dict(
policy_id=row["policy_id"],
household_id=row["household_id"],
Expand All @@ -122,7 +171,21 @@ def get_household_under_policy(
if row is not None:
household = dict(row)
household["household_json"] = json.loads(household["household_json"])
log_struct(
event="household_data_loaded",
input_data=log_context,
message="Loaded household data from DB.",
severity="INFO",
)

else:
log_struct(
event="household_not_found",
input_data=log_context,
message=f"Household #{household_id} not found.",
severity="WARNING",
)

Comment on lines 171 to +188
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question, not blocking: Why not add the try/catch on this and the policy table below?

I'd rather get this over the line, so please don't block on this, just curious on thinking.

response_body = dict(
status="error",
message=f"Household #{household_id} not found.",
Expand Down Expand Up @@ -168,7 +231,22 @@ def get_household_under_policy(
household_id,
policy_id,
)

log_struct(
event="household_calculation_db_success",
input_data=log_context,
message="Household calculation succeeded.",
severity="INFO",
)

except Exception as e:
log_struct(
event="household_calculation_db_failed",
input_data=log_context,
message=f"Calculation failed: {e}",
severity="ERROR",
)

logging.exception(e)
response_body = dict(
status="error",
Expand All @@ -193,7 +271,21 @@ def get_household_under_policy(
api_version,
),
)
except Exception:
log_struct(
event="computed_household_inserted",
input_data=log_context,
message="Inserted new computed_household record.",
severity="INFO",
)

except Exception as e:
log_struct(
event="computed_household_insert_failed",
input_data=log_context,
message=f"Insert operation failed. Error: {e}",
severity="ERROR",
)

# Update the result if it already exists
local_database.query(
f"UPDATE computed_household SET computed_household_json = ? WHERE country_id = ? AND household_id = ? AND policy_id = ?",
Expand All @@ -215,10 +307,22 @@ def get_calculate(country_id: str, add_missing: bool = False) -> dict:
country_id (str): The country ID.
"""

# Generate a unique request ID for tracing
request_id = uuid.uuid4().hex

payload = request.json
household_json = payload.get("household", {})
policy_json = payload.get("policy", {})

# Log context shared across all logs
log_context = {
"request_id": request_id,
"country_id": country_id,
"request_path": request.path,
"household_json": household_json,
"policy_json": policy_json,
}

if add_missing:
# Add in any missing yearly variables to household_json
household_json = add_yearly_variables(household_json, country_id)
Expand All @@ -227,7 +331,21 @@ def get_calculate(country_id: str, add_missing: bool = False) -> dict:

try:
result = country.calculate(household_json, policy_json)
log_struct(
event="calculation_success_lightweight",
input_data=log_context,
message="Calculation completed successfully without DB storage.",
severity="INFO",
)

except Exception as e:
log_struct(
event="calculation_failed_lightweight",
input_data=log_context,
message=f"Error calculating household under policy without DB storage: {e}",
severity="ERROR",
)

logging.exception(e)
response_body = dict(
status="error",
Expand Down
41 changes: 41 additions & 0 deletions policyengine_api/structured_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import logging
import json
import sys


class JsonFormatter(logging.Formatter):
    """Formatter that renders each log record as a single-line JSON object.

    GCP Cloud Logging parses stdout line by line: a line that is one valid
    JSON object becomes one structured entry, with recognized fields such
    as ``severity`` and ``message`` promoted. Multi-line (indented) output
    would be split into several unparsed text entries, so the payload must
    stay on exactly one line.
    """

    def format(self, record):
        """Return the record serialized as one line of JSON.

        ``event`` and ``input`` are optional attributes attached via the
        ``extra`` mechanism (see ``log_struct``); they default to None.
        """
        log_record = {
            "severity": record.levelname,
            "event": getattr(record, "event", None),
            "input": getattr(record, "input", None),
            "message": record.getMessage(),
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        # No indent: GCP needs one JSON object per line. default=str keeps
        # non-serializable payload values (dates, sets, ...) from raising
        # inside the logging pipeline.
        return json.dumps(log_record, ensure_ascii=False, default=str)


def get_logger(name="policyengine-api", level=logging.INFO):
    """Return the shared structured logger, creating its handler once.

    Args:
        name: Logger name; all callers share one logger per name.
        level: Minimum severity to emit (defaults to INFO).

    Returns:
        A ``logging.Logger`` writing single-line JSON records to stdout.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Attach the JSON stdout handler only once; logging.getLogger returns
    # the same instance on every call, so an unconditional addHandler
    # would duplicate every log line.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logger


def log_struct(event, input_data, message, severity="INFO", logger=None):
    """Emit one structured log entry.

    Args:
        event: Short machine-readable event name (e.g. "household_not_found").
        input_data: Context payload attached to the record as ``input``.
        message: Human-readable log message.
        severity: Level name such as "INFO", "WARNING" or "ERROR";
            anything unrecognized falls back to INFO.
        logger: Logger to emit on; the shared structured logger when None.
    """
    target = logger if logger is not None else get_logger()
    # Map the severity name onto the matching logger method, defaulting
    # to .info for unknown names.
    emit = getattr(target, severity.lower(), target.info)
    emit(message, extra={"event": event, "input": input_data})
Loading