src/splinator/estimators.py (126 changes: 78 additions & 48 deletions)
@@ -24,8 +24,8 @@ class MinimizationMethod(Enum):


class LossGradHess:
def __init__(self, X, y, alpha, intercept):
# type: (np.ndarray, np.ndarray, float, bool) -> None
def __init__(self, X, y, alpha, intercept, sample_weight=None):
# type: (np.ndarray, np.ndarray, float, bool, Optional[np.ndarray]) -> None
"""
When the design matrix is generated with the intercept option set to True, its first column is all 1's,
which means that the first coefficient corresponds to the intercept term. This setup is a little different
@@ -37,13 +37,17 @@ def __init__(self, X, y, alpha, intercept):
self.X = X
self.alpha = alpha
self.intercept = intercept
if sample_weight is None:
self.sample_weight = np.ones(len(y))
else:
self.sample_weight = np.asarray(sample_weight)
self.weight_sum = np.sum(self.sample_weight)

def loss(self, coefs):
# type: (np.ndarray) -> np.ndarray
yz = self.y * np.dot(self.X, coefs)
# P(label = 1 or -1 | X) = 1 / (1 + exp(-yz))
# log-likelihood = sum of log(1 / (1 + exp(-yz)))
loss_val = -np.sum(log_expit(yz))
loss_val = -np.sum(self.sample_weight * log_expit(yz))
if self.intercept:
loss_val += 0.5 * self.alpha * np.dot(coefs[1:], coefs[1:])
else:
@@ -58,8 +62,7 @@ def grad(self, coefs):

# if y = 1, we want z to be close to 1; if y = -1, we want z to be close to 0.
z0 = (z - 1) * self.y

grad = np.dot(self.X.T, z0)
grad = np.dot(self.X.T, self.sample_weight * z0)

if self.intercept:
grad[1:] += self.alpha * coefs[1:]
@@ -74,19 +77,14 @@ def hess(self, coefs):
Compute the Hessian of the logistic loss.

The Hessian of logistic regression is: H = X.T @ diag(w) @ X + alpha * I
where w = p * (1 - p) and p = sigmoid(X @ coefs).
where w = sample_weight * p * (1 - p) and p = sigmoid(X @ coefs).

This is used by trust-constr for faster convergence (Newton-like steps).
"""
z = np.dot(self.X, coefs)
p = expit(z)
# Weights for the Hessian: p * (1 - p)
# This is always positive, making the Hessian positive semi-definite
weights = p * (1 - p)

# H = X.T @ diag(weights) @ X
# Efficient computation: (X.T * weights) @ X
H = np.dot(self.X.T * weights, self.X)
hess_weights = self.sample_weight * p * (1 - p)
H = np.dot(self.X.T * hess_weights, self.X)

# Add regularization term
if self.intercept:
@@ -259,8 +257,8 @@ def __init__(
self.random_state = random_state
self.verbose = verbose

def _fit(self, X, y, initial_guess=None):
# type: (pd.DataFrame, pd.Series, Optional[np.ndarray], bool) -> None
def _fit(self, X, y, sample_weight=None, initial_guess=None):
# type: (pd.DataFrame, pd.Series, Optional[np.ndarray], Optional[np.ndarray]) -> None
constraint = [] # type: Union[Dict[str, Any], List, LinearConstraint]
if self.monotonicity != Monotonicity.none.value:
# This function returns G and h such that G * beta <= 0 is the constraint we want:
Expand Down Expand Up @@ -298,7 +296,7 @@ def _fit(self, X, y, initial_guess=None):
else:
x0 = initial_guess

lgh = LossGradHess(design_X, y, 1 / self.C, self.intercept)
lgh = LossGradHess(design_X, y, 1 / self.C, self.intercept, sample_weight)

# Determine whether to use Hessian
# - 'auto': use Hessian only for trust-constr with no monotonicity constraints (3-4x faster)
@@ -344,8 +342,8 @@ def get_additional_columns(self, X):
additional_columns = np.delete(X, self.input_score_column_index, axis=1)
return additional_columns

def _stratified_subsample(self, X, y, n_samples, n_strata=10):
# type: (np.ndarray, np.ndarray, int, int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]
def _stratified_subsample(self, X, y, n_samples, sample_weight=None, n_strata=10):
# type: (np.ndarray, np.ndarray, int, Optional[np.ndarray], int) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], np.ndarray]
"""
Create a stratified subsample based on quantiles of the input scores.

@@ -358,13 +356,15 @@ def _stratified_subsample(self, X, y, n_samples, n_strata=10):
y : array-like of shape (n_samples,)
n_samples : int
Target number of samples in the subsample
sample_weight : array-like of shape (n_samples,), optional
n_strata : int
Number of strata (quantile bins) to use

Returns
-------
X_sub : array-like
y_sub : array-like
y_sub : array-like
weight_sub : array-like or None
indices : array-like
Indices of selected samples
"""
@@ -412,11 +412,12 @@ def _stratified_subsample(self, X, y, n_samples, n_strata=10):
else:
X_sub = X[selected_indices, :]
y_sub = y[selected_indices]
weight_sub = sample_weight[selected_indices] if sample_weight is not None else None

return X_sub, y_sub, selected_indices
return X_sub, y_sub, weight_sub, selected_indices

def _random_subsample(self, X, y, n_samples):
# type: (np.ndarray, np.ndarray, int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]
def _random_subsample(self, X, y, n_samples, sample_weight=None):
# type: (np.ndarray, np.ndarray, int, Optional[np.ndarray]) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], np.ndarray]
"""Simple random subsampling (original behavior)."""
indices = self.random_state_.choice(
np.arange(len(X)), n_samples, replace=False
@@ -425,7 +426,8 @@ def _random_subsample(self, X, y, n_samples):
X_sub, y_sub = X.iloc[indices], y[indices]
else:
X_sub, y_sub = X[indices, :], y[indices]
return X_sub, y_sub, indices
weight_sub = sample_weight[indices] if sample_weight is not None else None
return X_sub, y_sub, weight_sub, indices

def _check_convergence(self, coefs_old, coefs_new):
# type: (np.ndarray, np.ndarray) -> bool
@@ -443,11 +445,28 @@ def _check_convergence(self, coefs_old, coefs_new):

return change < self.early_stopping_tol

def fit(self, X, y):
def fit(self, X, y, sample_weight=None):
# type: (pd.DataFrame, Union[np.ndarray, pd.Series], Optional[np.ndarray]) -> None
"""
Fit the linear spline logistic regression model.

Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target values (binary: 0 or 1).
sample_weight : array-like of shape (n_samples,), optional
Individual weights for each sample. If not provided, all samples
are given equal weight.

Returns
-------
self : object
Fitted estimator.

Notes
-----
Supports three fitting modes:
1. Direct fitting (default): Fit on full data
2. Two-stage fitting (legacy): Uses `two_stage_fitting_initial_size`
@@ -478,10 +497,21 @@ def fit(self, X, y):
)
y = y[:, 0]

# Validate sample_weight
if sample_weight is not None:
sample_weight = np.asarray(sample_weight)
if sample_weight.shape[0] != X.shape[0]:
raise ValueError(
f"sample_weight has {sample_weight.shape[0]} samples, "
f"but X has {X.shape[0]} samples"
)
if np.any(sample_weight < 0):
raise ValueError("sample_weight must be non-negative")

if self.n_knots and self.knots is None:
# only n_knots given so we create knots
self.n_knots_ = min([self.n_knots, X.shape[0]])
self.knots_ = _fit_knots(self.get_input_scores(X), self.n_knots_)
self.knots_ = _fit_knots(self.get_input_scores(X), self.n_knots_, sample_weight)
elif self.knots is not None and self.n_knots is None:
# knots are given so we just take them
self.knots_ = np.array(self.knots)
@@ -496,13 +526,13 @@ def fit(self, X, y):
# Determine fitting mode
if self.progressive_fitting_fractions is not None:
# Mode 3: Progressive fitting (recommended)
self._progressive_fit(X, y)
self._progressive_fit(X, y, sample_weight)
elif self.two_stage_fitting_initial_size is not None:
# Mode 2: Legacy two-stage fitting
self._two_stage_fit(X, y)
self._two_stage_fit(X, y, sample_weight)
else:
# Mode 1: Direct fitting on full data
self._fit(X, y, initial_guess=None)
self._fit(X, y, sample_weight=sample_weight, initial_guess=None)
self.fitting_history_.append({
'stage': 0,
'n_samples': n_samples,
@@ -513,8 +543,8 @@

return self

def _two_stage_fit(self, X, y):
# type: (np.ndarray, np.ndarray) -> None
def _two_stage_fit(self, X, y, sample_weight=None):
# type: (np.ndarray, np.ndarray, Optional[np.ndarray]) -> None
"""Legacy two-stage fitting for backward compatibility."""
n_samples = X.shape[0]

@@ -532,15 +562,15 @@ def _two_stage_fit(self, X, y):

# Stage 1: Fit on subsample
if self.stratified_sampling:
X_sub, y_sub, _ = self._stratified_subsample(
X, y, self.two_stage_fitting_initial_size
X_sub, y_sub, weight_sub, _ = self._stratified_subsample(
X, y, self.two_stage_fitting_initial_size, sample_weight
)
else:
X_sub, y_sub, _ = self._random_subsample(
X, y, self.two_stage_fitting_initial_size
X_sub, y_sub, weight_sub, _ = self._random_subsample(
X, y, self.two_stage_fitting_initial_size, sample_weight
)

self._fit(X_sub, y_sub, initial_guess=None)
self._fit(X_sub, y_sub, sample_weight=weight_sub, initial_guess=None)
self.fitting_history_.append({
'stage': 0,
'n_samples': self.two_stage_fitting_initial_size,
@@ -555,7 +585,7 @@ def _two_stage_fit(self, X, y):

# Stage 2: Fit on full data with warm start
coefs_stage1 = self.coefficients_.copy()
self._fit(X, y, initial_guess=coefs_stage1)
self._fit(X, y, sample_weight=sample_weight, initial_guess=coefs_stage1)
self.fitting_history_.append({
'stage': 1,
'n_samples': n_samples,
@@ -567,8 +597,8 @@
if self.verbose:
print(f"Stage 2: {n_samples} samples, {self.result_.nit} iterations")

def _progressive_fit(self, X, y):
# type: (np.ndarray, np.ndarray) -> None
def _progressive_fit(self, X, y, sample_weight=None):
# type: (np.ndarray, np.ndarray, Optional[np.ndarray]) -> None
"""
Progressive fitting with gradual sample increase.

@@ -590,12 +620,12 @@ def _progressive_fit(self, X, y):

for stage, frac in enumerate(fractions):
n_stage_samples = int(n_samples * frac)
n_stage_samples = max(n_stage_samples, self.knots_.shape[0] + 10) # Ensure enough samples
n_stage_samples = max(n_stage_samples, self.knots_.shape[0] + 10)
n_stage_samples = min(n_stage_samples, n_samples)

if frac >= 1.0:
# Use full data for final stage
X_stage, y_stage = X, y
X_stage, y_stage, weight_stage = X, y, sample_weight
else:
# Warn if subsample too small for knots
samples_per_knot = n_stage_samples / (self.knots_.shape[0] + 1)
@@ -604,15 +634,15 @@ def _progressive_fit(self, X, y):

# Subsample for intermediate stages
if self.stratified_sampling:
X_stage, y_stage, _ = self._stratified_subsample(
X, y, n_stage_samples
X_stage, y_stage, weight_stage, _ = self._stratified_subsample(
X, y, n_stage_samples, sample_weight
)
else:
X_stage, y_stage, _ = self._random_subsample(
X, y, n_stage_samples
X_stage, y_stage, weight_stage, _ = self._random_subsample(
X, y, n_stage_samples, sample_weight
)

self._fit(X_stage, y_stage, initial_guess=prev_coefs)
self._fit(X_stage, y_stage, sample_weight=weight_stage, initial_guess=prev_coefs)

self.fitting_history_.append({
'stage': stage,
@@ -631,9 +661,9 @@ def _progressive_fit(self, X, y):
if frac < 1.0 and self._check_convergence(prev_coefs, self.coefficients_):
if self.verbose:
print(f"Early stopping: coefficients converged at stage {stage + 1}")
# Still fit on full data for final refinement, but with fewer iterations expected
# Still fit on full data for final refinement
prev_coefs = self.coefficients_.copy()
self._fit(X, y, initial_guess=prev_coefs)
self._fit(X, y, sample_weight=sample_weight, initial_guess=prev_coefs)
self.fitting_history_.append({
'stage': stage + 1,
'n_samples': n_samples,
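
The net effect of the estimators.py changes is that `LossGradHess` now minimizes a weighted objective, L(beta) = -sum_i w_i * log sigma(y_i * x_i.T beta) + (alpha/2) * ||beta||^2, with the intercept coefficient excluded from the penalty when `intercept=True`. One useful consequence is that an integer weight of k is equivalent to repeating that sample k times. A minimal sketch, not part of the PR, checking that equivalence; it assumes `LossGradHess` is importable from `splinator.estimators` (it is defined there in this diff, though whether it is public API is an assumption):

```python
# A minimal sketch: weighting a sample 2x should match repeating it.
import numpy as np
from splinator.estimators import LossGradHess

rng = np.random.RandomState(0)
X = np.column_stack([np.ones(4), rng.normal(size=4)])  # intercept column + one feature
y = np.array([1, -1, 1, -1])                           # labels in {-1, +1}
coefs = rng.normal(size=2)

# Weight the first sample 2x ...
weighted = LossGradHess(X, y, alpha=1.0, intercept=True,
                        sample_weight=np.array([2.0, 1.0, 1.0, 1.0]))
# ... which should match physically duplicating that sample.
repeated = LossGradHess(np.vstack([X[:1], X]), np.concatenate([y[:1], y]),
                        alpha=1.0, intercept=True)

assert np.isclose(weighted.loss(coefs), repeated.loss(coefs))
assert np.allclose(weighted.grad(coefs), repeated.grad(coefs))
assert np.allclose(weighted.hess(coefs), repeated.hess(coefs))
```

The same semantics carry through `fit(X, y, sample_weight=...)`, which threads the weights into knot placement, both subsampling strategies, and every optimizer stage.
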
src/splinator/monotonic_spline.py (64 changes: 60 additions & 4 deletions)
@@ -125,10 +125,65 @@ def _get_design_matrix(
).astype(float)


def _fit_knots(X, num_knots):
# type: (np.ndarray, int) -> np.ndarray
def _weighted_quantile(values, quantiles, sample_weight=None):
# type: (np.ndarray, np.ndarray, Optional[np.ndarray]) -> np.ndarray
"""
Generates knots by finding `num_knots` quantiles of the given input distribution
Compute weighted quantiles.

Parameters
----------
values : array-like
Values to compute quantiles for.
quantiles : array-like
Quantiles to compute, in [0, 1].
sample_weight : array-like, optional
Weights for each value.

Returns
-------
result : ndarray
Quantile values.
"""
values = np.asarray(values)
quantiles = np.asarray(quantiles)

if sample_weight is None:
return np.quantile(values, quantiles)

sample_weight = np.asarray(sample_weight)
sorted_indices = np.argsort(values)
sorted_values = values[sorted_indices]
sorted_weights = sample_weight[sorted_indices]
cumsum = np.cumsum(sorted_weights)
cumsum_normalized = cumsum / cumsum[-1]
result = np.zeros(len(quantiles))
for i, q in enumerate(quantiles):
idx = np.searchsorted(cumsum_normalized, q)
if idx >= len(sorted_values):
idx = len(sorted_values) - 1
result[i] = sorted_values[idx]

return result


def _fit_knots(X, num_knots, sample_weight=None):
# type: (np.ndarray, int, Optional[np.ndarray]) -> np.ndarray
"""
Generates knots by finding `num_knots` quantiles of the given input distribution.

Parameters
----------
X : ndarray
1-D array of input values.
num_knots : int
Number of knots to generate.
sample_weight : ndarray, optional
Sample weights for weighted quantile computation.

Returns
-------
knots : ndarray
Knot positions at evenly-spaced quantiles.
"""
if len(X.shape) != 1:
raise ValueError("X must be a vector; has shape {}".format(X.shape))
@@ -139,5 +194,6 @@ def _fit_knots(X, num_knots):
)

percentiles = np.linspace(0, 100, num_knots, endpoint=False)[1:]
quantiles = percentiles / 100.0

return np.percentile(X, percentiles)
return _weighted_quantile(X, quantiles, sample_weight)
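
The helper inverts the weighted empirical CDF: sort the values, normalize the cumulative weights, and return the first value whose cumulative weight reaches the requested quantile. A small sketch of the intended behavior, not part of the PR (importing the private helpers directly is assumed to work but is not a supported API):

```python
# A minimal sketch of _weighted_quantile and the weighted knot placement.
import numpy as np
from splinator.monotonic_spline import _fit_knots, _weighted_quantile

values = np.array([1.0, 2.0, 3.0, 4.0])

# With no weights the helper falls through to np.quantile.
print(_weighted_quantile(values, np.array([0.5])))     # [2.5]

# Piling weight onto 4.0 pulls the weighted median up to it: the
# normalized cumulative weights are [0.1, 0.2, 0.3, 1.0], and the first
# entry reaching 0.5 belongs to the last value.
w = np.array([1.0, 1.0, 1.0, 7.0])
print(_weighted_quantile(values, np.array([0.5]), w))  # [4.]

# Knots follow the weighted distribution the same way.
scores = np.linspace(0.0, 1.0, 1000)
weights = np.where(scores > 0.8, 10.0, 1.0)  # emphasize the right tail
print(_fit_knots(scores, 5, sample_weight=weights))    # knots shift right
```

Note the design choice: this is a discrete, step-function quantile (no interpolation between adjacent values), unlike `np.quantile`'s default linear interpolation, so weighted and unweighted results can differ slightly even with uniform weights.
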
tests/test_progressive_fitting.py (3 changes: 2 additions & 1 deletion)
@@ -128,12 +128,13 @@ def test_stratified_sampling_method(self):
model.random_state_ = np.random.RandomState(42)
model.input_score_column_index = 0

X_sub, y_sub, indices = model._stratified_subsample(
X_sub, y_sub, weight_sub, indices = model._stratified_subsample(
self.X, self.y, n_samples=100, n_strata=10
)

self.assertEqual(len(X_sub), 100)
self.assertEqual(len(y_sub), 100)
self.assertIsNone(weight_sub)

# Check coverage
original_range = np.ptp(self.X[:, 0])
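
The updated test pins down only the unweighted path (`weight_sub` is `None`). A weighted companion could assert that the returned weights stay aligned with the selected indices. A hypothetical sketch, not part of the PR; the estimator construction is an assumption mirroring what `test_stratified_sampling_method` presumably does outside this diff:

```python
# A hypothetical companion test; the constructor call is an assumption,
# and only _stratified_subsample's contract shown in this diff is exercised.
import numpy as np
from splinator.estimators import LinearSplineLogisticRegression  # assumed class name

def test_stratified_sampling_with_weights(self):
    model = LinearSplineLogisticRegression(n_knots=10)  # hypothetical setup
    model.random_state_ = np.random.RandomState(42)
    model.input_score_column_index = 0

    sample_weight = np.linspace(0.5, 2.0, len(self.y))
    X_sub, y_sub, weight_sub, indices = model._stratified_subsample(
        self.X, self.y, n_samples=100, sample_weight=sample_weight, n_strata=10
    )

    self.assertEqual(len(weight_sub), 100)
    # Weights must be carried through, aligned with the selected rows.
    np.testing.assert_array_equal(weight_sub, sample_weight[indices])
```
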