diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c322566 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,88 @@ +# Ambrosia + +A/B testing framework for experiment design, group splitting, and results evaluation. +Supports both pandas and Spark DataFrames. + +## Commands + +```bash +make install # create .venv via Poetry (poetry install --all-extras) +make test # run pytest with coverage +make lint # isort + black + pylint + flake8 (checks only) +make autoformat # isort + black (fix in place) +make clean # remove .venv, build artifacts, reports/ +``` + +Single test: `PYTHONPATH=. pytest tests/path/test_file.py::test_fn` + +Line length: **120**. + +## Architecture + +### Three-stage pipeline + +`Designer` → `Splitter` → `Tester` are independent, stateless-ish classes. +No shared state between stages; each takes a DataFrame and parameters. + +### Pandas/Spark dispatch + +Never subclass for pandas vs. Spark. Instead use `DataframeHandler` or the +free function `choose_on_table(alternatives, dataframe)` in +`ambrosia/tools/ab_abstract_component.py`: + +```python +choose_on_table([pandas_func, spark_func], dataframe) +``` + +`DataframeHandler._handle_cases` / `_handle_on_table` wrap this pattern for +method dispatch in handlers (e.g. `TheoryHandler`, `EmpiricHandler`). + +### ABMetaClass + +`ABMetaClass(ABCMeta, YAMLObjectMetaclass)` in `ab_abstract_component.py` +resolves the metaclass conflict between `ABCMeta` and PyYAML's +`YAMLObjectMetaclass`. Any class that inherits from `ABToolAbstract` **and** +needs YAML serialization must set `metaclass=ABMetaClass`. + +### ABToolAbstract._prepare_arguments() + +Constructor args are "saved" defaults; `run()` args can override them at +call time. `_prepare_arguments` resolves the priority: +run-time arg → constructor arg → `ValueError` if both are None. 
+ +```python +chosen = _prepare_arguments({"alpha": [self._alpha, given_alpha]}) +``` + +### Stat criteria strategy pattern + +Hierarchy: `StatCriterion` (abstract, just `calculate_pvalue`) → +`ABStatCriterion` (adds `calculate_effect`, `calculate_conf_interval`, +`get_results`). + +Concrete implementations in `ambrosia/tools/stat_criteria.py`: +`TtestIndCriterion`, `TtestRelCriterion`, `MannWhitneyCriterion`, +`WilcoxonCriterion`. + +`Tester` dispatches by string alias via `AVAILABLE_AB_CRITERIA` dict — duck +typing, not isinstance checks. To add a criterion: subclass `ABStatCriterion`, +set `alias` and `implemented_effect_types` class attributes, register in the +dict. + +### Preprocessor chain + +`Preprocessor` (pandas only) uses method chaining — each method returns +`self`. Each step appends a fitted `AbstractFittableTransformer` to +`self.transformers`. The transformer list supports serialization +(`store_transformations` / `load_transformations` → JSON) and replay +(`apply_transformations`) for consistent train/test preprocessing. + +### Theoretical vs empirical design + +Two design philosophies plug into the same `SimpleDesigner` interface: + +- **Theoretical** (`TheoryHandler`): closed-form power/sample-size formulas +- **Empirical** (`EmpiricHandler`): bootstrap/simulation-based estimates + +Both implement `size_design`, `effect_design`, `power_design` and dispatch +pandas vs. Spark internally via `DataframeHandler`. 
diff --git a/ambrosia/preprocessing/__init__.py b/ambrosia/preprocessing/__init__.py index 82561ab..e6d33e8 100644 --- a/ambrosia/preprocessing/__init__.py +++ b/ambrosia/preprocessing/__init__.py @@ -21,7 +21,7 @@ from .ml_var_reducer import MLVarianceReducer from .preprocessor import Preprocessor from .robust import IQRPreprocessor, RobustPreprocessor -from .transformers import BoxCoxTransformer, LogTransformer +from .transformers import BoxCoxTransformer, LinearizationTransformer, LogTransformer __all__ = [ "AggregatePreprocessor", @@ -32,5 +32,6 @@ "RobustPreprocessor", "IQRPreprocessor", "BoxCoxTransformer", + "LinearizationTransformer", "LogTransformer", ] diff --git a/ambrosia/preprocessing/preprocessor.py b/ambrosia/preprocessing/preprocessor.py index c865008..6d7b205 100644 --- a/ambrosia/preprocessing/preprocessor.py +++ b/ambrosia/preprocessing/preprocessor.py @@ -34,7 +34,7 @@ from ambrosia.preprocessing.aggregate import AggregatePreprocessor from ambrosia.preprocessing.cuped import Cuped, MultiCuped from ambrosia.preprocessing.robust import IQRPreprocessor, RobustPreprocessor -from ambrosia.preprocessing.transformers import BoxCoxTransformer, LogTransformer +from ambrosia.preprocessing.transformers import BoxCoxTransformer, LinearizationTransformer, LogTransformer class Preprocessor: @@ -378,6 +378,50 @@ def multicuped( self.transformers.append(transformer) return self + def linearize( + self, + numerator: types.ColumnNameType, + denominator: types.ColumnNameType, + transformed_name: Optional[types.ColumnNameType] = None, + load_path: Optional[Path] = None, + ) -> Preprocessor: + """ + Linearize a ratio metric for use in A/B testing. 
+ + Computes a per-unit linearized value so that a ratio metric can be + compared between groups with a standard t-test on per-unit values: + + linearized_i = numerator_i - ratio * denominator_i + + where ratio = mean(numerator) / mean(denominator) is estimated on + the data passed to this ``Preprocessor`` instance (reference / control data). + + Parameters + ---------- + numerator : ColumnNameType + Column name of the ratio numerator (e.g. ``"revenue"``). + denominator : ColumnNameType + Column name of the ratio denominator (e.g. ``"orders"``). + transformed_name : ColumnNameType, optional + Name for the new linearized column. Defaults to + ``"{numerator}_lin"``. + load_path : Path, optional + Path to a json file with pre-fitted parameters; if given, the other arguments are not used. + + Returns + ------- + self : Preprocessor + Instance object. + """ + transformer = LinearizationTransformer() + if load_path is None: + transformer.fit_transform(self.dataframe, numerator, denominator, transformed_name, inplace=True) + else: + transformer.load_params(load_path) + transformer.transform(self.dataframe, inplace=True) + self.transformers.append(transformer) + return self + def transformations(self) -> List: """ List of all transformations which were called. diff --git a/ambrosia/preprocessing/transformers.py b/ambrosia/preprocessing/transformers.py index 6397641..1649e86 100644 --- a/ambrosia/preprocessing/transformers.py +++ b/ambrosia/preprocessing/transformers.py @@ -16,7 +16,7 @@ Module contains tools for metrics transformations during a preprocessing task. 
""" -from typing import Dict, Union +from typing import Dict, Optional, Union import numpy as np import pandas as pd @@ -386,3 +386,134 @@ def inverse_transform(self, dataframe: pd.DataFrame, inplace: bool = False) -> U transformed: pd.DataFrame = dataframe if inplace else dataframe.copy() transformed[self.column_names] = np.exp(transformed[self.column_names].values) return None if inplace else transformed + + +class LinearizationTransformer(AbstractFittableTransformer): + """ + Linearization transformer for ratio metrics. + + Converts a ratio metric (numerator / denominator) into a per-unit linearized + metric whose group means can be compared with a standard t-test: + + linearized_i = numerator_i - ratio * denominator_i + + where ratio = mean(numerator) / mean(denominator), estimated on the reference + (control group / historical) data passed to fit(). + + Attributes + ---------- + numerator : str + Column name of the ratio numerator (e.g. "revenue"). + denominator : str + Column name of the ratio denominator (e.g. "orders"). + transformed_name : str, optional + Name for the new column. Defaults to ``"{numerator}_lin"``. 
+ + Examples + -------- + >>> transformer = LinearizationTransformer() + >>> transformer.fit(control_df, "revenue", "orders", "arpu_lin") + >>> transformer.transform(experiment_df, inplace=True) + """ + + def __str__(self) -> str: + return "Linearization transformation" + + def __init__(self) -> None: + self.numerator: Optional[str] = None + self.denominator: Optional[str] = None + self.transformed_name: Optional[str] = None + self.ratio: Optional[float] = None + super().__init__() + + def get_params_dict(self) -> Dict: + self._check_fitted() + return { + "numerator": self.numerator, + "denominator": self.denominator, + "transformed_name": self.transformed_name, + "ratio": self.ratio, + } + + def load_params_dict(self, params: Dict) -> None: + for key in ("numerator", "denominator", "transformed_name", "ratio"): + if key not in params: + raise TypeError(f"params argument must contain: {key}") + setattr(self, key, params[key]) + self.fitted = True + + def fit( + self, + dataframe: pd.DataFrame, + numerator: str, + denominator: str, + transformed_name: Optional[str] = None, + ): + """ + Estimate ratio = mean(numerator) / mean(denominator) on reference data. + + Parameters + ---------- + dataframe : pd.DataFrame + Reference dataframe (typically control group or historical data). + numerator : str + Column name of the ratio numerator. + denominator : str + Column name of the ratio denominator. + transformed_name : str, optional + Name for the linearized column. Defaults to ``"{numerator}_lin"``. 
+ """ + self._check_cols(dataframe, [numerator, denominator]) + denom_mean = dataframe[denominator].mean() + if denom_mean == 0: + raise ValueError(f"Mean of denominator column '{denominator}' is zero; cannot compute ratio.") + self.numerator = numerator + self.denominator = denominator + self.transformed_name = transformed_name if transformed_name is not None else f"{numerator}_lin" + self.ratio = dataframe[numerator].mean() / denom_mean + self.fitted = True + return self + + def transform(self, dataframe: pd.DataFrame, inplace: bool = False) -> Union[pd.DataFrame, None]: + """ + Apply linearization: transformed = numerator - ratio * denominator. + + Parameters + ---------- + dataframe : pd.DataFrame + Dataframe to transform. + inplace : bool, default: ``False`` + If ``True`` modifies dataframe in place, otherwise returns a copy. + """ + self._check_fitted() + self._check_cols(dataframe, [self.numerator, self.denominator]) + df = dataframe if inplace else dataframe.copy() + df[self.transformed_name] = df[self.numerator] - self.ratio * df[self.denominator] + return None if inplace else df + + def fit_transform( + self, + dataframe: pd.DataFrame, + numerator: str, + denominator: str, + transformed_name: Optional[str] = None, + inplace: bool = False, + ) -> Union[pd.DataFrame, None]: + """ + Fit and transform in one step. + + Parameters + ---------- + dataframe : pd.DataFrame + Reference dataframe for fitting and transformation. + numerator : str + Column name of the ratio numerator. + denominator : str + Column name of the ratio denominator. + transformed_name : str, optional + Name for the linearized column. + inplace : bool, default: ``False`` + If ``True`` modifies dataframe in place. 
+ """ + self.fit(dataframe, numerator, denominator, transformed_name) + return self.transform(dataframe, inplace) diff --git a/ambrosia/tester/handlers.py b/ambrosia/tester/handlers.py index 7d65c35..5c97779 100644 --- a/ambrosia/tester/handlers.py +++ b/ambrosia/tester/handlers.py @@ -51,7 +51,15 @@ class SparkCriteria(enum.Enum): class TheoreticalTesterHandler: def __init__( - self, group_a, group_b, column: str, alpha: np.ndarray, effect_type: str, criterion: StatCriterion, **kwargs + self, + group_a, + group_b, + column: str, + alpha: np.ndarray, + effect_type: str, + criterion: StatCriterion, + metric_func=None, + **kwargs, ): self.group_a = group_a self.group_b = group_b @@ -59,6 +67,7 @@ def __init__( self.alpha = alpha self.effect_type = effect_type self.criterion = criterion + self.metric_func = metric_func self.kwargs = kwargs def _correct_criterion(self, criterion: tp.Any) -> bool: @@ -79,8 +88,12 @@ def get_criterion(self, criterion: str, data_example: types.SparkOrPandas): def _set_kwargs(self): if isinstance(self.group_a, pd.DataFrame): - self.group_a = self.group_a[self.column].values - self.group_b = self.group_b[self.column].values + if self.metric_func is not None: + self.group_a = np.asarray(self.metric_func(self.group_a)) + self.group_b = np.asarray(self.metric_func(self.group_b)) + else: + self.group_a = self.group_a[self.column].values + self.group_b = self.group_b[self.column].values elif isinstance(self.group_a, types.SparkDataFrame): self.kwargs["column"] = self.column self.kwargs["alpha"] = self.alpha diff --git a/ambrosia/tester/tester.py b/ambrosia/tester/tester.py index 386d6a5..304cdbe 100644 --- a/ambrosia/tester/tester.py +++ b/ambrosia/tester/tester.py @@ -29,7 +29,7 @@ """ import itertools from copy import deepcopy -from typing import Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional, Union from warnings import warn import numpy as np @@ -241,6 +241,7 @@ def __init__( id_column: 
Optional[types.ColumnNameType] = None, first_type_errors: types.StatErrorType = 0.05, metrics: Optional[types.MetricNamesType] = None, + metric_funcs: Optional[Dict[str, Callable]] = None, ): """ Tester class constructor to initialize the object. @@ -257,6 +258,7 @@ def __init__( self.set_experiment_results(experiment_results=experiment_results) self.set_errors(first_type_errors) self.set_metrics(metrics) + self.__metric_funcs = metric_funcs or {} @staticmethod def __filter_data( @@ -372,9 +374,15 @@ def __pre_run(method: str, args: types._UsageArgumentsType, **kwargs) -> types.T if method not in accepted_methods: raise ValueError(f'Choose method from {", ".join(accepted_methods)}') result: types.TesterResult = {} + metric_funcs: Dict = args.get("metric_funcs", {}) for metric in args["metrics"]: - a_values: np.ndarray = args["data_a_group"][metric].values - b_values: np.ndarray = args["data_b_group"][metric].values + metric_func = metric_funcs.get(metric) + if metric_func is not None: + a_values: np.ndarray = np.asarray(metric_func(args["data_a_group"])) + b_values: np.ndarray = np.asarray(metric_func(args["data_b_group"])) + else: + a_values = args["data_a_group"][metric].values + b_values = args["data_b_group"][metric].values if method == "theory": # TODO: Make it SolverClass ~ method # solver = SolverClass(...) 
@@ -386,6 +394,7 @@ def __pre_run(method: str, args: types._UsageArgumentsType, **kwargs) -> types.T alpha=np.array(args["alpha"]), effect_type=args["effect_type"], criterion=args["criterion"], + metric_func=metric_func, **kwargs, ) sub_result = solver.solve() @@ -473,6 +482,7 @@ def run( criterion: Optional[ABStatCriterion] = None, correction_method: Union[str, None] = "bonferroni", as_table: bool = True, + metric_funcs: Optional[Dict[str, Callable]] = None, **kwargs, ) -> types.TesterResult: """ @@ -556,6 +566,8 @@ def run( chosen_args: types._UsageArgumentsType = Tester._prepare_arguments(arguments_choice) chosen_args["effect_type"] = effect_type chosen_args["criterion"] = criterion + effective_metric_funcs = {**self.__metric_funcs, **(metric_funcs or {})} + chosen_args["metric_funcs"] = effective_metric_funcs hypothesis_num: int = len(list(itertools.combinations(chosen_args["experiment_results"], 2))) * len( chosen_args["metrics"] @@ -602,6 +614,7 @@ def test( criterion: Optional[ABStatCriterion] = None, correction_method: Union[str, None] = "bonferroni", as_table: bool = True, + metric_funcs: Optional[Dict[str, Callable]] = None, **kwargs, ) -> types.TesterResult: """ @@ -673,5 +686,6 @@ def test( criterion=criterion, correction_method=correction_method, as_table=as_table, + metric_funcs=metric_funcs, **kwargs, ) diff --git a/tests/test_preprocessor.py b/tests/test_preprocessor.py index 9106336..e2954b9 100644 --- a/tests/test_preprocessor.py +++ b/tests/test_preprocessor.py @@ -1,5 +1,6 @@ import os +import numpy as np import pandas as pd import pytest @@ -117,3 +118,71 @@ def test_store_load_config(data_for_agg): transformed_by_config: pd.DataFrame = loaded_preprocessor.apply_transformations() os.remove(store_path) assert (transformed == transformed_by_config).all(None) + + +@pytest.mark.smoke() +def test_linearize_basic(data_nonlin_var): + """ + Test that linearize creates new column and returns self. 
+ """ + preprocessor = Preprocessor(data_nonlin_var, verbose=False) + result = preprocessor.linearize("target", "feature_1", transformed_name="target_lin") + assert result is preprocessor # method chaining + assert "target_lin" in preprocessor.data().columns + + +@pytest.mark.unit() +def test_linearize_formula(data_nonlin_var): + """ + Test that linearized values satisfy: linearized = num - ratio * denom. + """ + preprocessor = Preprocessor(data_nonlin_var, verbose=False) + preprocessor.linearize("target", "feature_1", transformed_name="target_lin") + df = preprocessor.data() + transformer = preprocessor.transformations()[-1] + ratio = transformer.ratio + expected = data_nonlin_var["target"] - ratio * data_nonlin_var["feature_1"] + np.testing.assert_allclose(df["target_lin"].values, expected.values, rtol=1e-10) + + +@pytest.mark.unit() +def test_linearize_in_chain(data_nonlin_var): + """ + Test linearize as part of a preprocessing chain. + """ + preprocessor = Preprocessor(data_nonlin_var, verbose=False) + result = ( + preprocessor.robust("feature_1", alpha=0.01) + .linearize("target", "feature_1", transformed_name="target_lin") + .data() + ) + assert "target_lin" in result.columns + + +@pytest.mark.unit() +def test_linearize_load_store(data_nonlin_var): + """ + Test that linearization transformer can be serialized and replayed. 
+ """ + store_path = "tests/configs/linearize_config.json" + preprocessor = Preprocessor(data_nonlin_var, verbose=False) + preprocessor.linearize("target", "feature_1", transformed_name="target_lin") + preprocessor.store_transformations(store_path) + + loaded_preprocessor = Preprocessor(data_nonlin_var, verbose=False) + loaded_preprocessor.load_transformations(store_path) + + os.remove(store_path) + + for t, lt in zip(preprocessor.transformations(), loaded_preprocessor.transformations()): + assert t.get_params_dict() == lt.get_params_dict() + + +@pytest.mark.unit() +def test_linearize_default_name(data_nonlin_var): + """ + Test that default transformed_name is '{numerator}_lin'. + """ + preprocessor = Preprocessor(data_nonlin_var, verbose=False) + preprocessor.linearize("target", "feature_1") + assert "target_lin" in preprocessor.data().columns diff --git a/tests/test_tester.py b/tests/test_tester.py index c4ba892..a1597e8 100644 --- a/tests/test_tester.py +++ b/tests/test_tester.py @@ -384,3 +384,61 @@ def test_paired_bootstrap(effect_type, alternative): ) assert test_results_dep[0]["pvalue"] < test_results_ind[0]["pvalue"] assert test_results_dep[0]["confidence_interval"][0] > test_results_ind[0]["confidence_interval"][0] + + +@pytest.mark.unit +def test_metric_func_constructor(results_ltv_retention_conversions): + """ + Test that metric_funcs passed to constructor are used when metric name matches. 
+ """ + # ratio metric: ltv / retention (arbitrary, just to test callable path) + ratio_func = lambda df: (df["ltv"] / (df["retention"] + 1e-6)).values + tester = Tester( + dataframe=results_ltv_retention_conversions, + column_groups="group", + metrics=["ratio_metric"], + metric_funcs={"ratio_metric": ratio_func}, + ) + result = tester.run(as_table=False) + assert len(result) == 1 + assert "pvalue" in result[0] + + +@pytest.mark.unit +@pytest.mark.parametrize("method", ["theory", "empiric"]) +def test_metric_func_run(method, results_ltv_retention_conversions): + """ + Test that metric_funcs passed to run() work for theory and empiric methods. + """ + double_ltv = lambda df: (df["ltv"] * 2).values + tester = Tester( + dataframe=results_ltv_retention_conversions, + column_groups="group", + metrics=["ltv"], + ) + result_normal = tester.run(method=method, metrics=["ltv"], as_table=False) + result_func = tester.run( + method=method, + metrics=["custom"], + metric_funcs={"custom": double_ltv}, + as_table=False, + ) + # Doubling values doesn't change pvalue for ttest (same scale), but effect should be doubled + assert abs(result_func[0]["effect"]) == pytest.approx(abs(result_normal[0]["effect"]) * 2, rel=1e-4) + + +@pytest.mark.unit +def test_metric_func_overrides_constructor(results_ltv_retention_conversions): + """ + Test that metric_funcs in run() override those set in constructor. + """ + func_a = lambda df: df["ltv"].values + func_b = lambda df: (df["ltv"] * 3).values + tester = Tester( + dataframe=results_ltv_retention_conversions, + column_groups="group", + metric_funcs={"my_metric": func_a}, + ) + result_a = tester.run(metrics=["my_metric"], as_table=False) + result_b = tester.run(metrics=["my_metric"], metric_funcs={"my_metric": func_b}, as_table=False) + assert abs(result_b[0]["effect"]) == pytest.approx(abs(result_a[0]["effect"]) * 3, rel=1e-4)