Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/phylim/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ModelPsubs,
classify_matrix,
)
from phylim.delta_col import calc_delta_col
from phylim.eval_identifiability import IdentCheckRes, eval_identifiability


Expand Down Expand Up @@ -94,6 +95,7 @@ class PhyloLimitRec:
model_name: Union[str, None]
boundary_values: Union[list[dict], None]
nondlc_and_identity: Union[dict[tuple[str, ...], MatrixCategory], None]
delta_col: dict[str, float]

def to_rich_dict(self) -> dict:
result = self.check.to_rich_dict()
Expand All @@ -104,6 +106,7 @@ def to_rich_dict(self) -> dict:
result["nondlc_and_identity"] = {
k[0]: v.value for k, v in self.nondlc_and_identity.items()
}
result["delta_col"] = self.delta_col or {}
result["version"] = __version__
return result

Expand Down Expand Up @@ -169,14 +172,15 @@ def main(
boundary_values = check_bound_app(inference).vio
psubs_labelled = classify_psubs_app(inference)
result = eval_identifiability(psubs_labelled, tree, self.strict)

delta_col = calc_delta_col(load_psubs(_get_lf(inference)))
return PhyloLimitRec(
check=result,
model_name=inference.name,
boundary_values=boundary_values,
nondlc_and_identity={
k: v for k, v in psubs_labelled.items() if v is not DLC
},
delta_col=delta_col,
)


Expand Down
39 changes: 39 additions & 0 deletions src/phylim/delta_col.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import numpy

from phylim.classify_matrix import ModelPsubs


def min_diff_from_diag(
m: numpy.ndarray, diag_indices: tuple, off_diag_indices: numpy.ndarray
) -> numpy.ndarray:
"""compute difference for each column between diagonal and
the largest off-diagonal element

Args:
m (numpy.ndarray): a matrix
diag_indices (tuple): diagonal indices
off_diag_indices (numpy.ndarray): off-diagonal indices in boolean
"""
return m[diag_indices] - m.max(axis=0, where=off_diag_indices, initial=m.min())


def min_col_diff(
m: numpy.ndarray,
diag_indices: tuple, # = DIAG,
off_diag_indices: numpy.ndarray, # = OFFDIAG,
) -> float:
return min_diff_from_diag(m, diag_indices, off_diag_indices).min()


def calc_delta_col(psubs: ModelPsubs) -> dict:
"""calculate delta_col for given psubs"""
psubs_dict = {}
for key, value in psubs.items():
p = value.to_array()
shape = p.shape
mask = numpy.ones(shape, bool)
mask[numpy.diag_indices(shape[0])] = False
diag_indices = numpy.diag_indices(shape[0])
offdiag_indices = mask
psubs_dict[key] = min_col_diff(p, diag_indices, offdiag_indices)
return psubs_dict
18 changes: 18 additions & 0 deletions tests/test_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,21 @@ def test_phylim_filter_app_fail():
filter_app = phylim_filter(strict=False)
result2 = filter_app(_model_res)
assert isinstance(result2, model_result)


def test_delta_col():
rec_app = phylim()
record = rec_app(_model_res)
delta_col_dict = record.delta_col
assert isinstance(delta_col_dict, dict)
assert all(isinstance(v, float) for v in delta_col_dict.values())


def test_calc_delta_col_tree_coverage():
tree = _model_res.lf.tree
expected_edge_count = len(list(tree.preorder(include_self=False)))
rec_app = phylim()
record = rec_app(_model_res)
delta_col_dict = record.delta_col

assert len(delta_col_dict) == expected_edge_count
51 changes: 51 additions & 0 deletions tests/test_delta_col.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import numpy
import pytest
from cogent3.util.dict_array import DictArray

from phylim.classify_matrix import ModelPsubs
from phylim.delta_col import calc_delta_col


@pytest.mark.parametrize(
"matrix,expected_delta_col",
[
(
numpy.array(
[
[1.0, 0.0, 0.0, 0.0],
[0.0, 0.9, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.1, 0.0, 1.0],
]
),
0.8,
),
(
numpy.array(
[
[1.0, 0.0, 0.0, 0.0],
[0.0, 0.1, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.9, 0.0, 1.0],
]
),
-0.8,
),
(
numpy.array(
[
[1.0, 0.0, 0.0, 0.0],
[0.0, 0.5, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.5, 0.0, 1.0],
]
),
0.0,
),
],
)
def test_calc_delta_col_hand_constructed(matrix, expected_delta_col):
dict_array = DictArray(matrix)
psubs = ModelPsubs(psubs={("test_edge",): dict_array}, source="test")
result = calc_delta_col(psubs)
assert numpy.isclose(result[("test_edge",)], expected_delta_col)