Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions cosipy/statistics/likelihood_functions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import itertools
import logging
import operator
from typing import Optional

from cosipy import UnBinnedData
from cosipy.interfaces.expectation_interface import ExpectationInterface, ExpectationDensityInterface
from cosipy.util.iterables import itertools_batched
from cosipy.util.iterables import asarray

logger = logging.getLogger(__name__)

Expand All @@ -23,15 +25,22 @@

class UnbinnedLikelihood(UnbinnedLikelihoodInterface):
def __init__(self,
expectation:ExpectationDensityInterface,
batch_size:int = 100000):
expectation: ExpectationDensityInterface,
batch_size: Optional[int] = None):
"""
Will get the number of events from the response and bkg expectation_density iterators

Parameters
----------
response
bkg
expectation : ExpectationDensityInterface
Object that provides the expected counts and the
``expectation_density()`` iterator used to compute the likelihood.

batch_size : int or None, optional
Number of density values to process at a time in ``get_log_like()``.
If None, all values are processed in a single batch.

This parameter only affects iteration when the expectation density
is provided as an iterator that is not a numpy array. If it is already
an array batching is not applied.
"""

self._expectation = expectation
Expand Down Expand Up @@ -61,17 +70,23 @@ def get_log_like(self) -> float:
# Based on the system
nobservations = 0
density_log_sum = 0

for density_iter_chunk in itertools_batched(self._expectation.expectation_density(), self._batch_size):

density = np.fromiter(density_iter_chunk, dtype=float)

if np.any(density == 0):
# np.log(0) = -inf for any event, no need to keep iterationg

expectation_density = self._expectation.expectation_density()

if (self._batch_size is None) or isinstance(expectation_density, np.ndarray):
chunks = [expectation_density]
else:
chunks = itertools_batched(expectation_density, self._batch_size)

for chunk in chunks:
density = asarray(chunk, dtype=np.float64, force_dtype=False)

# We don't have to continue
if density.min() <= 0.0:
return -np.inf

density_log_sum += np.sum(np.log(density))
nobservations += density.size
density_log_sum += np.sum(np.log(density))

self._nobservations = nobservations

Expand Down
26 changes: 25 additions & 1 deletion cosipy/util/iterables.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import itertools
from typing import Union, Iterable, Optional
import numpy.typing as npt

import numpy as np

Expand All @@ -18,8 +20,30 @@ def itertools_batched(iterable, n, *, strict=False):
raise ValueError('batched(): incomplete batch')
yield batch

def asarray(a, dtype):
def asarray(a : Union[npt.ArrayLike, Iterable], dtype:npt.DTypeLike, force_dtype = True):
"""
Convert an iterable or an array-like object into a numpy array.

Parameters
----------
a: Iterable or array-like object
dtype: Desired type (e.g. np.float64)
force_dtype: If True, it is guaranteed that the output will have the specified dtype. If False,
we will attempt to infer the data-type from the input data, and dtype will be considered a fallback option.
Relaxing the dtype requirement can prevent an unnecessary copy if the input type does not exactly match the
requested dtype (e.g. np.float32 vs np.float64)

Returns
-------

"""
if hasattr(a, "__len__"):
# np.asarray does not work with an object without __len__
if not force_dtype:
# the data-type is inferred from the input data.
dtype = None

return np.asarray(a, dtype = dtype)
else:
# fromiter needs a dtype
return np.fromiter(a, dtype = dtype)
Empty file added tests/statistics/__init__.py
Empty file.
74 changes: 74 additions & 0 deletions tests/statistics/test_unbinned_likelihood.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import numpy as np
from typing import Iterable

from cosipy.interfaces.expectation_interface import ExpectationDensityInterface
from cosipy.statistics.likelihood_functions import UnbinnedLikelihood

class MockExpectationDensity(ExpectationDensityInterface):
"""
A minimal mock implementation of ExpectationDensityInterface
to feed predictable data into UnbinnedLikelihood for testing.
"""
def __init__(self, counts: float, density: Iterable[float]):
self._counts = counts
self._density = density

def expected_counts(self) -> float:
return self._counts

def expectation_density(self) -> Iterable[float]:
return self._density


def test_unbinned_likelihood_nobservations():
"""Test that the nobservations property correctly counts the density iterable."""
mock_exp = MockExpectationDensity(counts=10.0, density=[2.0, 3.0, 5.0])
likelihood = UnbinnedLikelihood(expectation=mock_exp)

assert likelihood.nobservations == 3


def test_unbinned_likelihood_get_log_like_success():
"""Test the correct mathematical calculation of the log-likelihood."""

counts = 10.0
density = [2.0, 3.0, 5.0]
mock_exp = MockExpectationDensity(counts=counts, density=density)

likelihood = UnbinnedLikelihood(expectation=mock_exp)
result = likelihood.get_log_like()

expected_log_like = np.log(30.0) - counts

assert np.isclose(result, expected_log_like)
assert likelihood.nobservations == 3


def test_unbinned_likelihood_negative_or_zero_density():
"""Test that a density <= 0 immediately returns -np.inf."""

mock_exp = MockExpectationDensity(counts=10.0, density=[2.0, 0.0, 5.0])
likelihood = UnbinnedLikelihood(expectation=mock_exp)

result = likelihood.get_log_like()

assert result == -np.inf


def test_unbinned_likelihood_with_generator_and_batching():
"""Test that batching works correctly when given a generator instead of a list."""

def density_generator():
for val in [2.0, 3.0, 5.0, 4.0]:
yield val

counts = 15.0
mock_exp = MockExpectationDensity(counts=counts, density=density_generator())

likelihood = UnbinnedLikelihood(expectation=mock_exp, batch_size=2)
result = likelihood.get_log_like()

expected_log_like = np.log(120.0) - counts

assert np.isclose(result, expected_log_like)
assert likelihood.nobservations == 4
65 changes: 65 additions & 0 deletions tests/util/test_iterables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from cosipy.util.iterables import asarray
import numpy as np

def test_asarray():

# From array
a = np.asarray([1,2,3], dtype = int)

# Same dtype. No copy
b = asarray(a, int)
assert isinstance(b, np.ndarray)
assert b.dtype == int
assert np.shares_memory(a, b)
assert np.allclose(b, a)

# Different dtype
b = asarray(a, np.float64)
assert isinstance(b, np.ndarray)
assert b.dtype == np.float64
assert np.allclose(b, a)

# Soft dtype requirement. No copy
b = asarray(a, np.float64, force_dtype=False)
assert isinstance(b, np.ndarray)
assert b.dtype == int
assert np.shares_memory(a,b)
assert np.allclose(b, a)

# From array-like
a = [1,2,3]

# With force dtype
b = asarray(a, np.float64)
assert isinstance(b, np.ndarray)
assert b.dtype == np.float64
assert np.allclose(b, a)

b = asarray(a, np.int32)
assert isinstance(b, np.ndarray)
assert b.dtype == np.int32
assert np.allclose(b, a)

# Relax dtype. No copy
b = asarray(a, np.float64, force_dtype=False)
assert isinstance(b, np.ndarray)
assert b.dtype == np.asarray(a).dtype # numpy infers int64
assert np.allclose(b, a)

# From generator w/o len
a_list = [1,2,3]
def gen():
for i in a_list:
yield i

a = gen()
b = asarray(a, np.float64)
assert isinstance(b, np.ndarray)
assert b.dtype == np.float64
assert np.allclose(b, a_list)

a = gen()
b = asarray(a, np.int32)
assert isinstance(b, np.ndarray)
assert b.dtype == np.int32
assert np.allclose(b, a_list)
Loading