1 change: 0 additions & 1 deletion app/controllers/health_controller.py
@@ -1,6 +1,5 @@
from flask import Blueprint, jsonify
import logging
#from ..config.db_connection import DBConnection
from ..services.fetch_data_bigQuery import BigQueryService

# Initialize logger
515 changes: 260 additions & 255 deletions app/controllers/report_controller.py

Large diffs are not rendered by default.

130 changes: 73 additions & 57 deletions app/controllers/report_controller_v2.py
@@ -7,7 +7,8 @@
import ctypes
import time as time_module
from app.authentication.AccessTokenValidator import AccessTokenValidator
from constants import X_AUTHENTICATED_USER_TOKEN, IS_VALIDATION_ENABLED, X_ORG_ID
from constants import X_AUTHENTICATED_USER_TOKEN, IS_VALIDATION_ENABLED, X_ORG_ID, TEXT_CSV_HOLDER, MALLOC_TRIM_HOLDER_MSG, LIBC_SO_6
from errormsg import REQUEST_BODY_MISSING_ERROR, UNEXPECTED_ERROR_OCCURRED, MALLOC_TRIM_HOLDER_ERROR_MSG

# Configure logger
logging.basicConfig(
@@ -27,7 +28,7 @@ def _validate_request_common(org_id):
logger.error("Missing 'x_org_id' in headers.")
return {'error': 'Organization ID is required.'}, 400

if not ReportService.isValidOrg(x_org_id, org_id):
if not ReportService.is_valid_org(x_org_id, org_id):
logger.error(f"Invalid organization ID: {org_id}")
return {'error': f'Not authorized to view the report for : {org_id}'}, 401

@@ -83,9 +84,9 @@ def get_report(org_id):
# Parse request data
data = request.get_json()
if not data:
logger.error("Request body is missing")
return jsonify({'error': 'Request body is missing'}), 400
logger.error(REQUEST_BODY_MISSING_ERROR)
return jsonify({'error': REQUEST_BODY_MISSING_ERROR}), 400

# Parse and validate date range
try:
start_date, end_date, error = _parse_date_range(data)
@@ -135,7 +136,7 @@ def get_report(org_id):

response = Response(
stream_with_context(csv_data),
mimetype="text/csv",
mimetype=TEXT_CSV_HOLDER,
headers={
"Content-Disposition": f'attachment; filename="report_v2_{org_id}.csv"'
}
@@ -155,67 +156,40 @@ def get_report(org_id):
except Exception as e:
error_message = str(e)
logger.exception(f"Unexpected error occurred: {error_message}")
return jsonify({'error': 'An unexpected error occurred. Please try again later.', 'details': error_message}), 500
return jsonify({'error': UNEXPECTED_ERROR_OCCURRED, 'details': error_message}), 500
finally:
gc.collect()
try:
logger.info("inside malloc_trim:")
ctypes.CDLL("libc.so.6").malloc_trim(0)
logger.info(MALLOC_TRIM_HOLDER_MSG)
ctypes.CDLL(LIBC_SO_6).malloc_trim(0)
except Exception as e:
logger.exception("malloc_trim failed: %s", str(e))
logger.exception(MALLOC_TRIM_HOLDER_ERROR_MSG, str(e))

@report_controller_v2.route('/report/v2/user/sync/<orgId>', methods=['POST'])
def get_user_report(orgId):
@report_controller_v2.route('/report/v2/user/sync/<org_id>', methods=['POST'])
def get_user_report(org_id):
"""V2 endpoint for user report with advanced filtering"""
start_timer = time_module.time()
try:
logger.info(f"Received request to generate v2 user report for orgId={orgId}")
logger.info(f"Received request to generate v2 user report for org_id={org_id}")

# Validate request
validation_result = _validate_request_common(orgId)
validation_result = _validate_request_common(org_id)
if validation_result:
return jsonify(validation_result[0]), validation_result[1]

# Parse and validate input parameters
try:
data = request.get_json()
if not data:
logger.error("Request body is missing")
return jsonify({'error': 'Request body is missing'}), 400
except Exception as e:
logger.error(f"Request body is missing: {str(e)}")
return jsonify({'error': 'Request body is missing'}), 400

user_email = data.get('userEmail')
user_phone = data.get('userPhone')
ehrms_id = data.get('ehrmsId')

# Trim whitespace if present
user_email = user_email.strip() if user_email else None
user_phone = user_phone.strip() if user_phone else None
ehrms_id = ehrms_id.strip() if ehrms_id else None
data = _get_request_data()
if isinstance(data, Response):
return data

user_email, user_phone, ehrms_id = _extract_user_identifiers(data)
if not (user_email or user_phone or ehrms_id):
logger.error("At least one of 'userEmail', 'userPhone', or 'ehrmsId' must be provided.")
return jsonify({'error': "At least one of 'userEmail', 'userPhone', or 'ehrmsId' must be provided."}), 400

# New date filter and orgId parameter
start_date = data.get('start_date')
end_date = data.get('end_date')

# Validate date range if provided
if start_date and end_date:
try:
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
start_date = datetime.combine(start_date.date(), time.min) # 00:00:00
end_date = datetime.combine(end_date.date(), time.max) # 23:59:59.999999
except ValueError:
return jsonify({'error': 'Invalid date format. Use YYYY-MM-DD.'}), 400
try:
    start_date, end_date = _validate_date_range(data)
except ValueError as e:
    return jsonify({'error': str(e)}), 400

required_columns = data.get('required_columns', [])

# Get additional filters
additional_filters = data.get('additionalFilter', {})

logger.info(f"Generating v2 user report for userEmail={user_email}, userPhone={user_phone}, ehrmsId={ehrms_id}")
@@ -228,7 +202,7 @@ def get_user_report(orgId):
ehrms_id=ehrms_id,
start_date=start_date,
end_date=end_date,
orgId=orgId,
org_id=org_id,
required_columns=required_columns,
additional_filters=additional_filters
)
@@ -249,7 +223,7 @@ def get_user_report(orgId):
stream_with_context(csv_data),
mimetype="text/csv",
headers={
"Content-Disposition": f'attachment; filename="user-report-v2.csv"'
"Content-Disposition": 'attachment; filename="user-report-v2.csv"'
}
)

@@ -276,15 +250,57 @@ def get_user_report(orgId):
except Exception as e:
logger.exception("malloc_trim failed: %s", str(e))

@report_controller_v2.route('/report/v2/org/user/<orgId>', methods=['POST'])
def get_org_user_report(orgId):

def _get_request_data():
    """Extract and validate request data; returns a 400 Response when the body is missing or unparseable."""
    try:
        data = request.get_json()
    except Exception as e:
        logger.error(f"Failed to parse request body: {str(e)}")
        data = None
    if not data:
        logger.error(REQUEST_BODY_MISSING_ERROR)
        # Return a Response (not a (body, status) tuple) so the caller's
        # isinstance(data, Response) check actually catches the error path.
        response = jsonify({'error': REQUEST_BODY_MISSING_ERROR})
        response.status_code = 400
        return response
    return data


def _extract_user_identifiers(data):
"""Extract and trim user identifiers."""
user_email = data.get('userEmail')
user_phone = data.get('userPhone')
ehrms_id = data.get('ehrmsId')

user_email = user_email.strip() if user_email else None
user_phone = user_phone.strip() if user_phone else None
ehrms_id = ehrms_id.strip() if ehrms_id else None

return user_email, user_phone, ehrms_id


def _validate_date_range(data):
"""Validate and parse date range."""
start_date = data.get('start_date')
end_date = data.get('end_date')

if start_date and end_date:
try:
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
start_date = datetime.combine(start_date.date(), time.min) # 00:00:00
end_date = datetime.combine(end_date.date(), time.max) # 23:59:59.999999
except ValueError:
raise ValueError('Invalid date format. Use YYYY-MM-DD.')
return start_date, end_date

@report_controller_v2.route('/report/v2/org/user/<org_id>', methods=['POST'])
def get_org_user_report(org_id):
"""V2 endpoint for organization user report with advanced filtering"""
start_timer = time_module.time()
try:
logger.info(f"Received request to generate v2 organization user report for orgId={orgId}")
logger.info(f"Received request to generate v2 organization user report for org_id={org_id}")

# Validate request
validation_result = _validate_request_common(orgId)
validation_result = _validate_request_common(org_id)
if validation_result:
return jsonify(validation_result[0]), validation_result[1]

@@ -318,12 +334,12 @@ def get_org_user_report(orgId):
# Get additional filters
additional_filters = data.get('additionalFilter', {})

logger.info(f"Generating v2 organization user report for orgId={orgId}")
logger.info(f"Generating v2 organization user report for orgId={org_id}")
logger.info(f"Additional filters: {additional_filters}")

try:
csv_data = ReportServiceV2.generate_org_user_report(
mdo_id=orgId,
mdo_id=org_id,
is_full_report_required=is_full_report_required,
required_columns=required_columns,
user_creation_start_date=user_creation_start_date,
@@ -332,7 +348,7 @@ def get_org_user_report(orgId):
)

if not csv_data:
logger.warning(f"No data found for orgId={orgId}")
logger.warning(f"No data found for org_id={org_id}")
return jsonify({'error': 'No data found for the given org details and filters.'}), 404

except Exception as e:
@@ -341,13 +357,13 @@ def get_org_user_report(orgId):
return jsonify({'error': 'Failed to generate the report due to an internal error.', 'details': error_message}), 500

time_taken = round(time_module.time() - start_timer, 2)
logger.info(f"V2 Org User Report generated successfully in {time_taken} seconds for orgId={orgId}")
logger.info(f"V2 Org User Report generated successfully in {time_taken} seconds for org_id={org_id}")

response = Response(
stream_with_context(csv_data),
mimetype="text/csv",
headers={
"Content-Disposition": f'attachment; filename="user-org-report-v2.csv"'
"Content-Disposition": 'attachment; filename="user-org-report-v2.csv"'
}
)

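A minimal client-side sketch of the v2 user sync endpoint, to show the request shape the refactored controller expects. The host, port, and exact header spelling behind X_AUTHENTICATED_USER_TOKEN are assumptions; the body keys mirror the ones the controller reads.

import requests

org_id = "0123456789"  # placeholder org id
url = f"http://localhost:5000/report/v2/user/sync/{org_id}"  # assumed host/port

headers = {
    "x-authenticated-user-token": "<jwt>",  # assumed spelling of X_AUTHENTICATED_USER_TOKEN
    "x_org_id": org_id,                     # header name taken from the controller's log message
}
payload = {
    "userEmail": "user@example.com",  # at least one of userEmail/userPhone/ehrmsId is required
    "start_date": "2024-01-01",       # optional; YYYY-MM-DD, expanded to 00:00:00
    "end_date": "2024-01-31",         # optional; expanded to 23:59:59.999999
    "required_columns": [],
    "additionalFilter": {},
}

resp = requests.post(url, json=payload, headers=headers, stream=True)
resp.raise_for_status()
with open("user-report-v2.csv", "wb") as fh:
    for chunk in resp.iter_content(chunk_size=8192):
        fh.write(chunk)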
5 changes: 2 additions & 3 deletions app/services/GcsToBigQuerySyncService.py
@@ -28,8 +28,7 @@ def sync_all_tables(self):
self.merge_parquet_to_bq(
table_config["gcs_uri"],
table_config["dataset"],
table_config["table"],
table_config["merge_keys"]
table_config["table"]
)
except Exception as e:
logger.exception(f"Error during sync: {e}")
@@ -62,7 +61,7 @@ def get_sync_config(self):

return sync_config

def merge_parquet_to_bq(self, gcs_uri, dataset, target_table, merge_keys):
def merge_parquet_to_bq(self, gcs_uri, dataset, target_table):
full_target_table = f"{dataset}.{target_table}"
try:
# Delete the target table if it exists
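With merge_keys dropped from the signature, merge_parquet_to_bq now appears to do a plain replace of the target table. A minimal sketch of that truncate-and-load pattern with the standard BigQuery client, assuming the service simply reloads the Parquet files (placeholder names, not the service's actual code):

from google.cloud import bigquery

def load_parquet_truncate(client: bigquery.Client, gcs_uri: str, dataset: str, table: str) -> None:
    """Replace dataset.table with the Parquet files at gcs_uri."""
    table_id = f"{client.project}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # drop-and-reload semantics
    )
    client.load_table_from_uri(gcs_uri, table_id, job_config=job_config).result()  # block until done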
69 changes: 69 additions & 0 deletions app/services/data_helper.py
@@ -0,0 +1,69 @@
"""Helper functions for data masking and stream generation"""

import logging
import gc
from typing import Dict, List, Generator, Any
import pandas as pd

logger = logging.getLogger(__name__)

def mask_email(email: str) -> str:
"""Mask email address while preserving username"""
if not email:
return email

parts = email.split('@')
if len(parts) == 2:
domain_parts = parts[1].split('.')
masked_domain = '.'.join(['*' * len(part) for part in domain_parts])
return f"{parts[0]}@{masked_domain}"
return parts[0]

def mask_phone(phone: str) -> str:
"""Mask phone number while preserving last 4 digits"""
if not phone:
return phone

phone = str(phone)
if len(phone) >= 4:
return '*' * (len(phone) - 4) + phone[-4:]
return '*' * len(phone)

def mask_sensitive_data(row_dict: Dict[str, Any], masking_enabled: bool) -> Dict[str, Any]:
"""Apply masking to sensitive data fields"""
if not masking_enabled:
return row_dict

masked = row_dict.copy()
if 'email' in masked and masked['email']:
masked['email'] = mask_email(masked['email'])
if masked.get('phone_number') or masked.get('phone'):
phone_key = 'phone_number' if 'phone_number' in masked else 'phone'
masked[phone_key] = mask_phone(masked[phone_key])
return masked

def filter_required_columns(df: 'pd.DataFrame', required_columns: List[str]) -> 'pd.DataFrame':
"""Filter DataFrame to include only required columns"""
if not required_columns:
return df

existing_columns = [col for col in required_columns if col in df.columns]
missing_columns = list(set(required_columns) - set(existing_columns))
if missing_columns:
logger.info(f"Warning: Missing columns skipped: {missing_columns}")
return df[existing_columns]

def generate_csv_stream(df: 'pd.DataFrame', cols: List[str], masking_enabled: bool = False, separator: str = '|') -> Generator[str, None, None]:
"""Generate a CSV stream from a DataFrame with optional masking"""
try:
yield separator.join(cols) + '\n'
for row in df.itertuples(index=False, name=None):
row_dict = dict(zip(cols, row))
if masking_enabled:
row_dict = mask_sensitive_data(row_dict, masking_enabled)
yield separator.join(str(row_dict.get(col, '')) for col in cols) + '\n'
finally:
df.drop(df.index, inplace=True)
del df
gc.collect()
logger.info("Cleaned up DataFrame after streaming.")
59 changes: 59 additions & 0 deletions app/services/query_helper.py
@@ -0,0 +1,59 @@
"""Helper functions for SQL query generation"""

from typing import List, Optional, Dict, Any
from google.cloud import bigquery

def build_date_filter(start_date: Optional[str], end_date: Optional[str], column: str = "enrolled_on") -> str:
"""Build date range filter"""
if start_date and end_date:
return f" AND {column} BETWEEN '{start_date}' AND '{end_date}'"
return ""

def build_mdo_id_filter(mdo_ids: List[str]) -> str:
"""Build MDO ID filter"""
mdo_id_list = [f"'{mid}'" for mid in mdo_ids]
return f"mdo_id IN ({', '.join(mdo_id_list)})"

def build_user_filter(email: Optional[str], phone: Optional[str], ehrms_id: Optional[str]) -> List[str]:
"""Build user filter conditions"""
filters = []
if email:
filters.append(f"email = '{email}'")
if phone:
filters.append(f"phone_number = '{phone}'")
if ehrms_id:
filters.append(f"external_system_id = '{ehrms_id}'")
return filters

def build_apar_query(table: str, filters: Dict[str, Any], filter_map: Dict[str, str],
start_date: Optional[str] = None, end_date: Optional[str] = None) -> tuple[str, List[bigquery.ScalarQueryParameter]]:
"""Build APAR query with parameters"""
filter_clauses = []
params = []

# Add date filters
if start_date and end_date:
filter_clauses.insert(0, "enrolled_on >= @start_date AND enrolled_on <= @end_date")
params.extend([
bigquery.ScalarQueryParameter("start_date", "TIMESTAMP", start_date),
bigquery.ScalarQueryParameter("end_date", "TIMESTAMP", end_date)
])

# Add other filters
for key, value in filters.items():
if value and key in filter_map:
bq_col = filter_map[key]
filter_clauses.append(f"{bq_col} = @{bq_col}")
params.append(bigquery.ScalarQueryParameter(bq_col, "STRING", value.strip()))

# Build final query
if filter_clauses:
query = f"""
SELECT *
FROM `{table}`
WHERE {" AND ".join(filter_clauses)}
"""
else:
query = f"SELECT * FROM `{table}`"

return query, params
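An illustrative call to build_apar_query; the table name and the filter_map contents are assumptions, since the real values live in the report service.

from google.cloud import bigquery

from app.services.query_helper import build_apar_query

query, params = build_apar_query(
    table="my-project.reports.apar_enrollments",  # placeholder table id
    filters={"userEmail": "alice@example.com"},
    filter_map={"userEmail": "email"},  # maps request keys to BigQuery columns
    start_date="2024-01-01",
    end_date="2024-01-31",
)

client = bigquery.Client()
rows = client.query(query, job_config=bigquery.QueryJobConfig(query_parameters=params)).result()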