Commit 36b6ee4

Merge pull request #66 from abhishekpnt/cbrelease-4.8.28
Issue #KB-11607: code fixes
2 parents 5af7b98 + 2aafc04 · commit 36b6ee4

File tree

1 file changed: +113 additions, -23 deletions


jobs/stage-2/userReport.py

Lines changed: 113 additions & 23 deletions
@@ -8,6 +8,7 @@
     col, when, coalesce, lit,
     current_timestamp, date_format, from_unixtime, concat_ws, from_json, explode, trim, length, first
 )
+from pyspark.sql.types import ArrayType
 import os
 import time
 from datetime import datetime
@@ -218,34 +219,70 @@ def processUserReport(config):
     print("🔍 Step 10: Processing User Extended Profile Data...")
     # Load user extended profile data
     user_extended_profile_df = (
-        spark.read.parquet(ParquetFileConstants.USER_EXTENDED_PROFILE_FILE)  # You'll need to define this constant
-        .filter(col("contexttype") == "orgAdditionalProperties")
-        .withColumnRenamed("userid", "userID")
-        .withColumn("contextData", from_json(col("contextdata"), schemas.context_data_schema))
-        .select(
-            col("userID"),
-            col("contexttype").alias("contextType"),
-            col("contextData"),
-            col("contextData.organisationId").alias("mdo_id")
-        )
+        spark.read.parquet(ParquetFileConstants.USER_EXTENDED_PROFILE)
+        .filter(col("contexttype") == "orgAdditionalProperties")
+        .withColumnRenamed("userid", "userID")
+        .withColumn("contextDataArray", from_json(col("contextdata"), ArrayType(schemas.context_data_schema)))
+        .withColumn("contextData", explode(col("contextDataArray")))
+        .select(
+            col("userID"),
+            col("contexttype").alias("contextType"),
+            col("contextData"),
+            col("contextData.organisationId").alias("mdo_id")
+        )
     )
 
-    # Step 1: Explode customFieldValues to get individual attribute-value pairs
-    exploded_df = (
+    # Step 1: Explode customFieldValues and handle based on type
+    exploded_df_base = (
         user_extended_profile_df
         .withColumn("customField", explode(col("contextData.customFieldValues")))
         .select(
             col("userID"),
             col("mdo_id"),
+            col("customField.type").alias("field_type"),
             col("customField.attributeName").alias("attribute_name"),
-            col("customField.value").alias("attribute_value")
+            col("customField.value").alias("direct_value"),
+            col("customField.values").alias("values_array")
+        )
+    )
+
+    # Handle direct values (where type is not "masterList" and direct_value is not null)
+    direct_values_df = (
+        exploded_df_base
+        .filter(
+            (col("field_type") != "masterList") &
+            col("direct_value").isNotNull()
+        )
+        .select(
+            col("userID"),
+            col("mdo_id"),
+            col("attribute_name"),  # Use main attributeName for text fields
+            col("direct_value").alias("attribute_value")
         )
+    )
+
+    # Handle masterList values (where type is "masterList" and values_array is not null)
+    master_list_values_df = (
+        exploded_df_base
         .filter(
-            col("attribute_name").isNotNull() &
-            col("attribute_value").isNotNull()
+            (col("field_type") == "masterList") &
+            col("values_array").isNotNull()
+        )
+        .withColumn("valueItem", explode(col("values_array")))
+        .select(
+            col("userID"),
+            col("mdo_id"),
+            col("valueItem.attributeName").alias("attribute_name"),  # Use nested attributeName for masterList
+            col("valueItem.value").alias("attribute_value")
         )
     )
 
+    # Combine both direct values and masterList values
+    exploded_df = direct_values_df.union(master_list_values_df).filter(
+        col("attribute_name").isNotNull() &
+        col("attribute_value").isNotNull()
+    )
+
     # Write to warehouse tables
     exploded_df.coalesce(1).write.mode("overwrite").option("compression", "snappy").parquet(
         f"{config.warehouseReportDir}/userCustomFields"
@@ -365,12 +402,68 @@ def processUserReport(config):
         "Report_Last_Generated_On", "mdoid"
     ]
 
-    # Combine fixed and dynamic columns
-    all_columns = fixed_cols + attribute_names
+    # Create case-insensitive conflict detection
+    fixed_cols_lower = [col_name.lower() for col_name in fixed_cols]
+    attribute_names_lower = [attr_name.lower() for attr_name in attribute_names]
+
+    # Find conflicts using case-insensitive comparison
+    conflicts = []
+    for i, attr_lower in enumerate(attribute_names_lower):
+        if attr_lower in fixed_cols_lower:
+            conflicts.append(attribute_names[i])  # Add original case version
+
+    print(f" Debug - Column conflicts found (case-insensitive): {conflicts}")
+
+    # Rename conflicting columns in pivoted DataFrame BEFORE join
+    renamed_pivoted = pivoted
+    custom_field_mapping = {}  # Track original -> renamed mappings
 
-    # Select columns that exist in the dataframe
-    existing_columns = joined.columns
-    final_columns = [c for c in all_columns if c in existing_columns]
+    for conflict_col in conflicts:
+        if conflict_col in pivoted.columns:
+            new_name = f"Custom_{conflict_col}"
+            renamed_pivoted = renamed_pivoted.withColumnRenamed(conflict_col, new_name)
+            custom_field_mapping[conflict_col] = new_name
+            print(f" Debug - Renamed {conflict_col} to {new_name}")
+
+    # print(f" Debug - Pivoted columns after rename: {renamed_pivoted.columns}")
+
+    # Join the cleaned DataFrames
+    joined = (
+        renamed_pivoted
+        .join(mdo_wise_slim, ["userID"], "left")
+        .withColumn("mdoid", lit(org_id))
+    )
+
+    # Create final column list using renamed mappings
+    final_custom_cols = []
+    for attr_name in attribute_names:
+        if attr_name in custom_field_mapping:
+            final_custom_cols.append(custom_field_mapping[attr_name])  # Use renamed version
+        else:
+            final_custom_cols.append(attr_name)  # Use original name
+
+    # Get existing columns from each category
+    available_columns = set(joined.columns)
+    existing_fixed_cols = [c for c in fixed_cols if c in available_columns]
+    existing_custom_cols = [c for c in final_custom_cols if c in available_columns]
+
+    # print(f" Debug - Final fixed cols: {existing_fixed_cols}")
+    # print(f" Debug - Final custom cols: {existing_custom_cols}")
+
+    # Combine all columns for final selection
+    final_columns = existing_fixed_cols + existing_custom_cols
+
+    # Check for any remaining duplicates
+    duplicate_check = {}
+    for col_name in final_columns:
+        duplicate_check[col_name] = duplicate_check.get(col_name, 0) + 1
+    duplicates = [k for k, v in duplicate_check.items() if v > 1]
+
+    if duplicates:
+        print(f" ❌ ERROR - Still have duplicate columns: {duplicates}")
+        # Remove duplicates by keeping only the first occurrence
+        final_columns = list(dict.fromkeys(final_columns))
+        print(f" Debug - Deduplicated final columns: {final_columns}")
 
     ordered = joined.select(*[col(c) for c in final_columns])
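The rename-before-join block above guards against pivoted custom-field names colliding, case-insensitively, with the report's fixed columns; since Spark resolves column names case-insensitively by default, such a collision would make the post-join select ambiguous. A condensed, runnable sketch of the same idea on toy DataFrames follows; the sample frames, data, and org id are invented, while the Custom_ prefix and mapping logic mirror the diff.

# Toy reproduction of the conflict-rename pattern; data is illustrative only.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[1]").appName("sketch").getOrCreate()

fixed_cols = ["Full_Name", "Designation", "mdoid"]   # fixed report columns
attribute_names = ["Designation"]                    # pivoted custom fields
pivoted = spark.createDataFrame([("u1", "Analyst")], ["userID", "Designation"])
mdo_wise_slim = spark.createDataFrame([("u1", "Jane Doe", "Manager")],
                                      ["userID", "Full_Name", "Designation"])

# Case-insensitive collision check between custom and fixed columns
fixed_lower = {c.lower() for c in fixed_cols}
conflicts = [a for a in attribute_names if a.lower() in fixed_lower]

# Rename clashes on the pivoted side BEFORE the join, so joined.columns
# never contains two columns with the same (case-insensitive) name
mapping = {}
renamed = pivoted
for c in conflicts:
    mapping[c] = f"Custom_{c}"
    renamed = renamed.withColumnRenamed(c, mapping[c])

joined = renamed.join(mdo_wise_slim, ["userID"], "left").withColumn("mdoid", lit("org1"))

# Resolve the final selection through the rename mapping, then dedupe defensively
final_custom = [mapping.get(a, a) for a in attribute_names]
available = set(joined.columns)
final_columns = [c for c in fixed_cols + final_custom if c in available]
final_columns = list(dict.fromkeys(final_columns))

joined.select(*[col(c) for c in final_columns]).show()
# Expected:
#   Full_Name | Designation | mdoid | Custom_Designation
#   Jane Doe  | Manager     | org1  | Analyst

Renaming before the join is the essential ordering choice here: renaming afterwards would fail, because withColumnRenamed cannot distinguish two already-ambiguous columns of the same name.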
@@ -390,9 +483,6 @@ def processUserReport(config):
     )
 
     exploded_cached.unpersist()
-
-    print("✅ Step 11 Complete")
-    print(f"Custom reports generated for {len(org_ids)} organizations")
 
     # Performance Summary
     total_duration = time.time() - start_time
