diff --git a/dask_glm/algorithms.py b/dask_glm/algorithms.py index 1320e7b..e2c4c96 100644 --- a/dask_glm/algorithms.py +++ b/dask_glm/algorithms.py @@ -9,9 +9,9 @@ import numpy as np import dask.array as da from scipy.optimize import fmin_l_bfgs_b +from dask.array.utils import normalize_to_array - -from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client +from dask_glm.utils import dot, normalize, scatter_array, get_distributed_client, maybe_to_cupy from dask_glm.families import Logistic from dask_glm.regularizers import Regularizer @@ -225,14 +225,22 @@ def admm(X, y, regularizer='l1', lamduh=0.1, rho=1, over_relax=1, def create_local_gradient(func): @functools.wraps(func) def wrapped(beta, X, y, z, u, rho): - return func(beta, X, y) + rho * (beta - z + u) + beta = maybe_to_cupy(beta, X) + z = maybe_to_cupy(z, X) + u = maybe_to_cupy(u, X) + res = func(beta, X, y) + rho * (beta - z + u) + return normalize_to_array(res) return wrapped def create_local_f(func): @functools.wraps(func) def wrapped(beta, X, y, z, u, rho): - return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u, - beta - z + u) + beta = maybe_to_cupy(beta, X) + z = maybe_to_cupy(z, X) + u = maybe_to_cupy(u, X) + res = func(beta, X, y) + (rho / 2) * np.dot(beta - z + u, + beta - z + u) + return normalize_to_array(res) return wrapped f = create_local_f(pointwise_loss) @@ -286,7 +294,7 @@ def wrapped(beta, X, y, z, u, rho): if primal_res < eps_pri and dual_res < eps_dual: break - return z + return maybe_to_cupy(z, X) def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): @@ -339,19 +347,20 @@ def lbfgs(X, y, regularizer=None, lamduh=1.0, max_iter=100, tol=1e-4, beta0 = np.zeros(p) def compute_loss_grad(beta, X, y): + beta = maybe_to_cupy(beta, X) scatter_beta = scatter_array( beta, dask_distributed_client) if dask_distributed_client else beta loss_fn = pointwise_loss(scatter_beta, X, y) gradient_fn = pointwise_gradient(scatter_beta, X, y) loss, gradient = compute(loss_fn, gradient_fn) - return loss, gradient.copy() + return normalize_to_array(loss), normalize_to_array(gradient.copy()) with dask.config.set(fuse_ave_width=0): # optimizations slows this down beta, loss, info = fmin_l_bfgs_b( compute_loss_grad, beta0, fprime=None, args=(X, y), iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter) - + beta = maybe_to_cupy(beta, X) return beta diff --git a/dask_glm/tests/test_admm.py b/dask_glm/tests/test_admm.py index 7b0373a..648053e 100644 --- a/dask_glm/tests/test_admm.py +++ b/dask_glm/tests/test_admm.py @@ -7,7 +7,7 @@ from dask_glm.algorithms import admm, local_update from dask_glm.families import Logistic, Normal from dask_glm.regularizers import L1 -from dask_glm.utils import make_y +from dask_glm.utils import make_y, to_dask_cupy_array_xy @pytest.mark.parametrize('N', [1000, 10000]) @@ -46,11 +46,16 @@ def wrapped(beta, X, y, z, u, rho): @pytest.mark.parametrize('N', [1000, 10000]) @pytest.mark.parametrize('nchunks', [5, 10]) @pytest.mark.parametrize('p', [1, 5, 10]) -def test_admm_with_large_lamduh(N, p, nchunks): +@pytest.mark.parametrize('is_cupy', [True, False]) +def test_admm_with_large_lamduh(N, p, nchunks, is_cupy): X = da.random.random((N, p), chunks=(N // nchunks, p)) beta = np.random.random(p) y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + if is_cupy: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) + X, y = persist(X, y) z = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=500) diff --git a/dask_glm/tests/test_algos_families.py b/dask_glm/tests/test_algos_families.py index 1f1cfc4..04d2690 100644 --- a/dask_glm/tests/test_algos_families.py +++ b/dask_glm/tests/test_algos_families.py @@ -10,7 +10,8 @@ gradient_descent, admm) from dask_glm.families import Logistic, Normal, Poisson from dask_glm.regularizers import Regularizer -from dask_glm.utils import sigmoid, make_y +from dask_glm.utils import (sigmoid, make_y, maybe_to_cupy, + to_dask_cupy_array_xy) def add_l1(f, lam): @@ -46,8 +47,14 @@ def make_intercept_data(N, p, seed=20009): [(100, 2, 20009), (250, 12, 90210), (95, 6, 70605)]) -def test_methods(N, p, seed, opt): +@pytest.mark.parametrize('is_cupy', [True, False]) +def test_methods(N, p, seed, opt, is_cupy): X, y = make_intercept_data(N, p, seed=seed) + + if is_cupy: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) + coefs = opt(X, y) p = sigmoid(X.dot(coefs).compute()) @@ -64,16 +71,22 @@ def test_methods(N, p, seed, opt): @pytest.mark.parametrize('N', [1000]) @pytest.mark.parametrize('nchunks', [1, 10]) @pytest.mark.parametrize('family', [Logistic, Normal, Poisson]) -def test_basic_unreg_descent(func, kwargs, N, nchunks, family): +@pytest.mark.parametrize('is_cupy', [True, False]) +def test_basic_unreg_descent(func, kwargs, N, nchunks, family, is_cupy): beta = np.random.normal(size=2) M = len(beta) X = da.random.random((N, M), chunks=(N // nchunks, M)) y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + if is_cupy: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) + X, y = persist(X, y) result = func(X, y, family=family, **kwargs) test_vec = np.random.normal(size=2) + test_vec = maybe_to_cupy(test_vec, X) opt = family.pointwise_loss(result, X, y).compute() test_val = family.pointwise_loss(test_vec, X, y).compute() @@ -90,16 +103,22 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family): @pytest.mark.parametrize('family', [Logistic, Normal, Poisson]) @pytest.mark.parametrize('lam', [0.01, 1.2, 4.05]) @pytest.mark.parametrize('reg', [r() for r in Regularizer.__subclasses__()]) -def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg): +@pytest.mark.parametrize('is_cupy', [True, False]) +def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg, is_cupy): beta = np.random.normal(size=2) M = len(beta) X = da.random.random((N, M), chunks=(N // nchunks, M)) y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + if is_cupy: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) + X, y = persist(X, y) result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs) test_vec = np.random.normal(size=2) + test_vec = maybe_to_cupy(test_vec, X) f = reg.add_reg_f(family.pointwise_loss, lam) @@ -120,8 +139,12 @@ def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg): 'threading', 'multiprocessing' ]) -def test_determinism(func, kwargs, scheduler): +@pytest.mark.parametrize('is_cupy', [True, False]) +def test_determinism(func, kwargs, scheduler, is_cupy): X, y = make_intercept_data(1000, 10) + if is_cupy: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) with dask.config.set(scheduler=scheduler): a = func(X, y, **kwargs) diff --git a/dask_glm/tests/test_estimators.py b/dask_glm/tests/test_estimators.py index d2212c4..b3c13f8 100644 --- a/dask_glm/tests/test_estimators.py +++ b/dask_glm/tests/test_estimators.py @@ -4,6 +4,7 @@ from dask_glm.estimators import LogisticRegression, LinearRegression, PoissonRegression from dask_glm.datasets import make_classification, make_regression, make_poisson from dask_glm.regularizers import Regularizer +from dask_glm.utils import to_dask_cupy_array_xy @pytest.fixture(params=[r() for r in Regularizer.__subclasses__()]) @@ -44,9 +45,17 @@ def test_pr_init(solver): @pytest.mark.parametrize('fit_intercept', [True, False]) -@pytest.mark.parametrize('is_sparse', [True, False]) -def test_fit(fit_intercept, is_sparse): +@pytest.mark.parametrize('is_sparse,is_cupy', [ + (True, False), + (False, False), + (False, True)]) +def test_fit(fit_intercept, is_sparse, is_cupy): X, y = make_classification(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse) + + if is_cupy and not is_sparse: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) + lr = LogisticRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) @@ -54,9 +63,15 @@ def test_fit(fit_intercept, is_sparse): @pytest.mark.parametrize('fit_intercept', [True, False]) -@pytest.mark.parametrize('is_sparse', [True, False]) -def test_lm(fit_intercept, is_sparse): +@pytest.mark.parametrize('is_sparse,is_cupy', [ + (True, False), + (False, False), + (False, True)]) +def test_lm(fit_intercept, is_sparse, is_cupy): X, y = make_regression(n_samples=100, n_features=5, chunksize=10, is_sparse=is_sparse) + if is_cupy and not is_sparse: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) lr = LinearRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) @@ -65,10 +80,16 @@ def test_lm(fit_intercept, is_sparse): @pytest.mark.parametrize('fit_intercept', [True, False]) -@pytest.mark.parametrize('is_sparse', [True, False]) -def test_big(fit_intercept, is_sparse): +@pytest.mark.parametrize('is_sparse,is_cupy', [ + (True, False), + (False, False), + (False, True)]) +def test_big(fit_intercept, is_sparse, is_cupy): with dask.config.set(scheduler='synchronous'): X, y = make_classification(is_sparse=is_sparse) + if is_cupy and not is_sparse: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) lr = LogisticRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) @@ -78,10 +99,16 @@ def test_big(fit_intercept, is_sparse): @pytest.mark.parametrize('fit_intercept', [True, False]) -@pytest.mark.parametrize('is_sparse', [True, False]) -def test_poisson_fit(fit_intercept, is_sparse): +@pytest.mark.parametrize('is_sparse,is_cupy', [ + (True, False), + (False, False), + (False, True)]) +def test_poisson_fit(fit_intercept, is_sparse, is_cupy): with dask.config.set(scheduler='synchronous'): X, y = make_poisson(is_sparse=is_sparse) + if is_cupy and not is_sparse: + cupy = pytest.importorskip('cupy') + X, y = to_dask_cupy_array_xy(X, y, cupy) pr = PoissonRegression(fit_intercept=fit_intercept) pr.fit(X, y) pr.predict(X) diff --git a/dask_glm/utils.py b/dask_glm/utils.py index 0fe3429..0ae88eb 100644 --- a/dask_glm/utils.py +++ b/dask_glm/utils.py @@ -41,7 +41,7 @@ def sigmoid(x): @dispatch(object) def exp(A): - return A.exp() + return np.exp(A) @dispatch(float) @@ -91,7 +91,7 @@ def sign(A): @dispatch(object) def log1p(A): - return A.log1p() + return np.log1p(A) @dispatch(np.ndarray) @@ -205,3 +205,25 @@ def get_distributed_client(): return get_client() except ValueError: return None + + +def maybe_to_cupy(beta, X): + """ convert beta, a numpy array, to a cupy array + if X is a cupy array or dask cupy array + """ + if "cupy" in str(type(X)) or \ + hasattr(X, '_meta') and 'cupy' in str(type(X._meta)): + import cupy + return cupy.asarray(beta) + return beta + + +def to_dask_cupy_array(X, cupy): + """ convert a dask numpy array to a dask cupy array + """ + return X.map_blocks(lambda x: cupy.asarray(x), + dtype=X.dtype, meta=cupy.asarray(X._meta)) + + +def to_dask_cupy_array_xy(X, y, cupy): + return to_dask_cupy_array(X, cupy), to_dask_cupy_array(y, cupy)