Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
# conftest.py

# solving the tcl issue
import os
os.environ.setdefault("MPLBACKEND", "Agg")
try:
import matplotlib
matplotlib.use("Agg", force=True)
except Exception:
pass

import numpy as np
import pytest
import pyvinecopulib as pvc
Expand Down Expand Up @@ -27,7 +37,7 @@
def bicop_pair(request):
"""
Returns a tuple:
( family, true_params, U_tensor, bicop_fastkde, bicop_tll )
( family, true_params, U_tensor, bicop_fastkde, bicop_tll, bicop_torchKDE )

notice the scope="module" so that the fixture is created only once and reused in all tests that use it.
"""
Expand All @@ -38,14 +48,17 @@ def bicop_pair(request):
U = true_bc.simulate(n=N_SIM, seeds=SEEDS) # shape (N_SIM, 2)
U_tensor = torch.tensor(U, device=DEVICE, dtype=torch.float64)

# 2) fit two torchvinecopulib instances (fast KDE and TLL)
# 2) fit two torchvinecopulib instances (torch KDE, fast KDE and TLL)
bc_fast = tvc.BiCop(num_step_grid=512).to(DEVICE)
bc_fast.fit(U_tensor, mtd_kde="fastKDE")

bc_torch = tvc.BiCop(num_step_grid=512).to(DEVICE)
bc_torch.fit(U_tensor, mtd_kde="torchKDE")

bc_tll = tvc.BiCop(num_step_grid=512).to(DEVICE)
bc_tll.fit(U_tensor, mtd_kde="tll")

return family, true_params, rotation, U_tensor, bc_fast, bc_tll
return family, true_params, rotation, U_tensor, bc_fast, bc_tll, bc_torch


@pytest.fixture(scope="module")
Expand Down
50 changes: 27 additions & 23 deletions tests/test_bicop.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ def test_device_and_dtype():


def test_monotonicity_and_range(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair

# pick one of the two implementations or loop both
for bicop in (bc_fast, bc_tll):
for bicop in (bc_fast, bc_tll, bc_torch):
# * simple diagonal check
grid = torch.linspace(EPS, 1.0 - EPS, 100, device=U.device, dtype=torch.float64).unsqueeze(
1
Expand All @@ -44,9 +44,9 @@ def test_monotonicity_and_range(bicop_pair):


def test_inversion(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair

for bicop in (bc_fast, bc_tll):
for bicop in (bc_fast, bc_tll, bc_torch):
grid = torch.linspace(0.1, 0.9, 50, device=U.device, dtype=torch.float64).unsqueeze(1)
for u in grid:
pts = torch.hstack([grid, u.repeat(grid.size(0), 1)])
Expand All @@ -68,8 +68,8 @@ def test_inversion(bicop_pair):


def test_pdf_integrates_to_one(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
for cop in (bc_fast, bc_tll):
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop in (bc_fast, bc_tll, bc_torch):
# our grid is uniform on [0,1]² with spacing Δ = 1/(N−1)
Δ = 1.0 / (cop.num_step_grid - 1)
# approximate ∫ pdf(u,v) du dv ≈ Σ_pdf_grid * Δ²
Expand All @@ -80,8 +80,8 @@ def test_pdf_integrates_to_one(bicop_pair):


def test_log_pdf_matches_log_of_pdf(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
for cop in (bc_fast, bc_tll):
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop in (bc_fast, bc_tll, bc_torch):
pts = torch.rand(500, 2, dtype=torch.float64, device=cop.device)
pdf = cop.pdf(pts)
logp = cop.log_pdf(pts)
Expand All @@ -102,8 +102,8 @@ def test_log_pdf_handles_zero():


def test_sample_marginals(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
for cop in (bc_fast, bc_tll):
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop in (bc_fast, bc_tll, bc_torch):
for is_sobol in (False, True):
samp = cop.sample(2000, seed=0, is_sobol=is_sobol)
# samples lie in [0,1]
Expand All @@ -117,8 +117,8 @@ def test_sample_marginals(bicop_pair):


def test_internal_buffers_and_flags(bicop_pair):
_, _, _, U, bc_fast, bc_tll = bicop_pair
for cop, mtd_kde in [(bc_fast, "fastKDE"), (bc_tll, "tll")]:
_, _, _, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop, mtd_kde in [(bc_fast, "fastKDE"), (bc_tll, "tll"), (bc_torch, "torchKDE")]:
print(cop)
assert not cop.is_indep
assert cop.mtd_kde == mtd_kde
Expand All @@ -131,7 +131,7 @@ def test_internal_buffers_and_flags(bicop_pair):


def test_tau_estimation(bicop_pair):
_, _, _, U, bc_fast, bc_mtd_kde = bicop_pair
_, _, _, U, bc_fast, bc_mtd_kde, bc_torch = bicop_pair
# re‐fit with tau estimation
bc = tvc.BiCop(num_step_grid=64)
bc.fit(U, mtd_kde="tll", is_tau_est=True)
Expand All @@ -141,17 +141,18 @@ def test_tau_estimation(bicop_pair):


def test_sample_shape_and_dtype_on_tll(bicop_pair):
_, _, _, U, bc_fast, bc_tll = bicop_pair
for cop in (bc_fast, bc_tll):
_, _, _, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop in (bc_fast, bc_tll, bc_torch):
s = cop.sample(123, seed=7, is_sobol=True)
assert s.shape == (123, 2)
assert s.dtype is cop.dtype
assert s.device == cop.device


def test_imshow_and_plot_api(bicop_pair):
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
cop = bc_fast
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair
# cop = bc_fast
cop = bc_torch
# imshow
fig, ax = cop.imshow(is_log_pdf=True)
assert isinstance(fig, matplotlib.figure.Figure)
Expand Down Expand Up @@ -186,16 +187,18 @@ def test_imshow_and_plot_api(bicop_pair):


def test_plot_accepts_unused_kwargs(bicop_pair):
_, _, _, U, bc_fast, _ = bicop_pair
_, _, _, U, bc_fast, _ , bc_torch = bicop_pair
# just ensure it doesn’t crash
bc_fast.plot(plot_type="contour", margin_type="norm", xylim=(0, 1), grid_size=50)
bc_fast.plot(plot_type="surface", margin_type="unif", xylim=(0, 1), grid_size=20)
# bc_fast.plot(plot_type="contour", margin_type="norm", xylim=(0, 1), grid_size=50)
# bc_fast.plot(plot_type="surface", margin_type="unif", xylim=(0, 1), grid_size=20)
bc_torch.plot(plot_type="contour", margin_type="norm", xylim=(0, 1), grid_size=50)
bc_torch.plot(plot_type="surface", margin_type="unif", xylim=(0, 1), grid_size=20)


def test_reset_and_str(bicop_pair):
# ! notice scope="module" so we put this test at the end
family, params, rotation, U, bc_fast, bc_tll = bicop_pair
for cop in (bc_fast, bc_tll):
family, params, rotation, U, bc_fast, bc_tll, bc_torch = bicop_pair
for cop in (bc_fast, bc_tll, bc_torch):
cop.reset()
# should go back to independent
assert cop.is_indep
Expand Down Expand Up @@ -261,7 +264,8 @@ def test_interp_on_trivial_grid():
def test_imshow_with_existing_axes():
cop = tvc.BiCop(num_step_grid=32)
us = torch.rand(100, 2)
cop.fit(us, mtd_kde="fastKDE")
cop.fit(us, mtd_kde="torchKDE")
# cop.fit(us, mtd_kde="fastKDE")
fig, outer_ax = plt.subplots()
fig2, ax2 = cop.imshow(is_log_pdf=False, ax=outer_ax, cmap="viridis")
# should have returned the same axes object
Expand Down
145 changes: 145 additions & 0 deletions tests/test_torchkde.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import pytest
import torch
import torchvinecopulib as tvc
from scipy.stats import kstest, wasserstein_distance

from . import EPS, bicop_pair


def test_1dkde_internal_grid_is_finite():
"""
1D KDE must build finite grids even if input contains NaN/±Inf.
This ensures later interpolation/extrapolation is safe.
"""
x_clean = torch.randn(2000, dtype=torch.float64)
x_bad = torch.tensor([float("nan"), float("inf"), -float("inf")], dtype=torch.float64)
x = torch.cat([x_clean, x_bad])

# Build 1D KDE (adjust path if your class lives elsewhere)
kde = tvc.kdeCDFPPF1D(x, bandwidth_method="auto")

# All internal grids must be finite
for name in ("grid_x", "grid_pdf", "grid_cdf"):
grid = getattr(kde, name)
assert torch.isfinite(grid).all(), f"{name} contains non-finite values"

# Basic validity: pdf ≥ 0; cdf in [0,1] and non-decreasing
assert (kde.grid_pdf >= -EPS).all()
assert kde.grid_cdf.min() >= -EPS and kde.grid_cdf.max() <= 1 + EPS
if kde.grid_cdf.numel() > 1:
assert kde.grid_cdf.diff().min() >= -EPS


def test_bicop_grids_and_eval_are_finite_with_nan_inf(bicop_pair):
"""
All BiCop modes (fastKDE, tll, torchKDE) must:
- build fully finite internal grids even if training data contain NaN/±Inf/OOB,
- evaluate safely on queries containing NaN/±Inf/OOB:
* finite rows → finite outputs (and proper ranges),
* non-finite rows → NaN (no crash).
"""
_, _, _, U, bc_fast, bc_tll, bc_torch = bicop_pair

# Inject some bad rows into otherwise valid data and refit fresh models
bad = torch.tensor(
[
[float("nan"), 0.3],
[0.4, float("inf")],
[-float("inf"), 0.9],
[1.1, -0.1], # OOB but finite
],
dtype=torch.float64,
device=U.device,
)
U_dirty = torch.vstack([U, bad])

modes = [("fastKDE", bc_fast), ("tll", bc_tll), ("torchKDE", bc_torch)]
for name, _ in modes:
cop = tvc.BiCop(num_step_grid=64).to(device=U.device)
cop.fit(U_dirty, mtd_kde=name)

# 1) Internal grids must be finite
for gname in ("_pdf_grid", "_cdf_grid", "_hfunc_l_grid", "_hfunc_r_grid"):
G = getattr(cop, gname)
assert torch.isfinite(G).all(), f"{name}:{gname} contains non-finite values"

# 2) Evaluation must be safe on queries with NaN/±Inf/OOB
Q = torch.tensor(
[
[0.2, 0.7], # clean
[float("nan"), 0.3], # NaN
[0.4, float("inf")], # +Inf
[-float("inf"), 0.9], # -Inf
[0.0, 1.0], # edge
[1.1, -0.1], # OOB (finite)
],
dtype=torch.float64,
device=cop.device,
)
finite = torch.isfinite(Q).all(dim=1)

# pdf/log_pdf: finite rows must be finite; pdf non-negative
pdf = cop.pdf(Q).squeeze(1)
logp = cop.log_pdf(Q).squeeze(1)
assert torch.isfinite(pdf[finite]).all()
assert torch.isfinite(logp[finite]).all()
assert (pdf[finite] >= -EPS).all()

# Non-finite rows should come back as NaN (by design) and never crash
if (~finite).any():
assert torch.isnan(pdf[~finite]).all()
assert torch.isnan(logp[~finite]).all()

# cdf/hfuncs/hinvs: finite rows within [0,1]
for fn in (cop.cdf, cop.hfunc_r, cop.hfunc_l):
out = fn(Q).squeeze(1)
assert (out[finite] >= -EPS).all() and (out[finite] <= 1 + EPS).all()
if (~finite).any():
assert torch.isnan(out[~finite]).all()

# Inverses: finite rows → valid [0,1]; non-finite rows → NaN
out_r = cop.hinv_r(Q).squeeze(1)
out_l = cop.hinv_l(Q).squeeze(1)
assert (out_r[finite] >= -EPS).all() and (out_r[finite] <= 1 + EPS).all()
assert (out_l[finite] >= -EPS).all() and (out_l[finite] <= 1 + EPS).all()
if (~finite).any():
assert torch.isnan(out_r[~finite]).all()
assert torch.isnan(out_l[~finite]).all()



def test_pit_goodness_of_fit_train_test(bicop_pair):
"""
Split U into train/test.
Fit on train; compute PIT on test via hfunc_r.
PIT should be close to Uniform[0,1] (KS and Wasserstein).
"""

_, _, _, U, bc_fast, bc_tll, bc_torch = bicop_pair
n = U.shape[0]
n_tr = int(0.6 * n)
Utr, Ute = U[:n_tr], U[n_tr:]

cases = [
("fastKDE", bc_fast),
("tll", bc_tll),
("torchKDE", bc_torch),
]

for name, _ in cases:
cop = tvc.BiCop(num_step_grid=64)
cop.fit(Utr, mtd_kde=name)

pit = cop.hfunc_r(Ute).squeeze(1).cpu().numpy() # PIT should be ~ U(0,1)

# KS vs Uniform(0,1)
ks_stat, ks_p = kstest(pit, "uniform")

# 1-Wasserstein vs iid Uniform sample of same size
uni = torch.rand_like(Ute[:, 0]).cpu().numpy()
wdist = wasserstein_distance(pit, uni)

# Lenient but meaningful thresholds (tune if your grids/resolution differ)
assert ks_stat < 0.12
assert wdist < 0.06
assert ks_p > 1e-3
2 changes: 2 additions & 0 deletions torchvinecopulib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from . import util
from .bicop import BiCop
from .vinecop import VineCop
from .util import kdeCDFPPF1D

__all__ = [
"BiCop",
"VineCop",
"util",
"kdeCDFPPF1D",
]
# dynamically grab the version you just built & installed
try:
Expand Down
Loading