|
| 1 | +""" |
| 2 | +Mutual information between each feature and the target column. |
| 3 | +
|
| 4 | +Uses sklearn's mutual_info_classif (categorical target) or |
| 5 | +mutual_info_regression (numeric target). Categorical features are |
| 6 | +label-encoded before scoring. |
| 7 | +""" |
| 8 | + |
| 9 | +import pandas as pd |
| 10 | +from sklearn.feature_selection import mutual_info_classif, mutual_info_regression |
| 11 | +from sklearn.preprocessing import LabelEncoder |
| 12 | + |
| 13 | +from ..config import DEFAULT_CONFIG |
| 14 | +from ..utils.logging import get_logger |
| 15 | + |
# Module logger, registered under the name "summaries.mutual_info".
_log = get_logger("summaries.mutual_info")
# Mutual-information section of the default config; this module reads
# `min_samples_for_mi` and `max_categories_for_mi` from it.
_MI = DEFAULT_CONFIG.mutual_info
| 18 | + |
| 19 | + |
def summarize_mutual_information(
    df: pd.DataFrame,
    target_col: str,
    column_types: dict[str, str],
) -> dict:
    """
    Compute mutual information between every usable feature and the target column.

    Features are the columns typed "Numeric" in ``column_types`` plus the
    "Categorical" columns whose number of distinct values does not exceed
    ``_MI.max_categories_for_mi``. Rows with a missing target are dropped.
    Missing numeric values are imputed with the column median; missing
    categorical values become their own "__missing__" level before
    label-encoding. Numeric features with no observed value among the
    retained rows are skipped, because they cannot be imputed.

    A target typed "Numeric" is scored with ``mutual_info_regression``; any
    other target type is label-encoded and scored with ``mutual_info_classif``.

    Args:
        df: Input data; must contain ``target_col`` for a non-empty result.
        target_col: Name of the target column.
        column_types: Mapping of column name to type label. Columns absent
            from the mapping are treated as "Unsupported" and ignored as
            features.

    Returns:
        A dict:
            {
                "target": target_col,
                "task": "classification" | "regression",
                "scores": {col: mi_score, ...},  # nats, sorted descending
            }
        or an empty dict when MI cannot be computed (target column absent,
        too few rows with a target value, no usable feature, or the sklearn
        estimator raised).
    """
    if target_col not in df.columns:
        return {}

    # Rows without a target value cannot contribute; drop them once and reuse.
    labelled = df.dropna(subset=[target_col])
    if len(labelled) < _MI.min_samples_for_mi:
        return {}

    # Determine task type: only an explicitly numeric target is a regression.
    if column_types.get(target_col, "Unsupported") == "Numeric":
        task = "regression"
        mi_fn = mutual_info_regression
    else:
        task = "classification"
        mi_fn = mutual_info_classif

    # Encoded feature columns, kept in the original column order.
    encoded = {}
    discrete_mask = []

    for col in df.columns:
        if col == target_col:
            continue
        typ = column_types.get(col, "Unsupported")

        if typ == "Numeric":
            values = labelled[col]
            fill_value = values.median()
            if pd.isna(fill_value):
                # No observed value in the retained rows: imputation would
                # leave NaNs, and sklearn would then reject the whole matrix.
                _log.debug(
                    "Skipping numeric column %r for mutual information: "
                    "no non-missing values",
                    col,
                )
                continue
            encoded[col] = values.fillna(fill_value).to_numpy(dtype=float)
            discrete_mask.append(False)
        elif typ == "Categorical" and df[col].nunique() <= _MI.max_categories_for_mi:
            values = labelled[col]
            # Stringify first, then overwrite the missing slots. Calling
            # fillna() directly on a pandas `category` column raises TypeError
            # because "__missing__" is not one of its categories.
            as_text = values.astype(str).where(values.notna(), "__missing__")
            encoded[col] = LabelEncoder().fit_transform(as_text)
            discrete_mask.append(True)

    if not encoded:
        return {}

    feature_cols = list(encoded)
    X = pd.DataFrame(encoded).to_numpy()

    y_raw = labelled[target_col]
    if task == "classification":
        # Missing targets were dropped above, so no sentinel fill is needed.
        y = LabelEncoder().fit_transform(y_raw.astype(str))
    else:
        y = y_raw.to_numpy()

    try:
        # random_state is pinned so repeated runs give identical scores.
        mi_scores = mi_fn(X, y, discrete_features=discrete_mask, random_state=0)
    except Exception as e:
        # Deliberate best-effort: this summary is optional, so any estimator
        # failure degrades to "no result" instead of propagating.
        _log.debug("Mutual information computation failed: %s", e)
        return {}

    scores = {col: float(score) for col, score in zip(feature_cols, mi_scores)}
    # Sort descending by MI score (stable: ties keep column order).
    scores = dict(sorted(scores.items(), key=lambda kv: kv[1], reverse=True))

    return {
        "target": target_col,
        "task": task,
        "scores": scores,
    }
0 commit comments