Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 103 additions & 5 deletions cognite_toolkit/_cdf_tk/utils/interactive_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import questionary
from cognite.client.data_classes import (
Asset,
AssetList,
DataSet,
DataSetList,
UserProfileList,
filters,
)
Expand Down Expand Up @@ -42,7 +44,7 @@
T_Type = TypeVar("T_Type", bound=Asset | DataSet)


class AssetCentricInteractiveSelect(ABC):
class AssetCentricBaseSelect(ABC):
def __init__(self, client: ToolkitClient, operation: str) -> None:
self.client = client
self.operation = operation
Expand Down Expand Up @@ -220,22 +222,22 @@ def _select(
return selected


class AssetInteractiveSelect(AssetCentricInteractiveSelect):
class AssetInteractiveSelect(AssetCentricBaseSelect):
def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator:
return AssetAggregator(self.client)


class FileMetadataInteractiveSelect(AssetCentricInteractiveSelect):
class FileMetadataInteractiveSelect(AssetCentricBaseSelect):
def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator:
return FileAggregator(self.client)


class TimeSeriesInteractiveSelect(AssetCentricInteractiveSelect):
class TimeSeriesInteractiveSelect(AssetCentricBaseSelect):
def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator:
return TimeSeriesAggregator(self.client)


class EventInteractiveSelect(AssetCentricInteractiveSelect):
class EventInteractiveSelect(AssetCentricBaseSelect):
def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator:
return EventAggregator(self.client)

Expand Down Expand Up @@ -773,3 +775,99 @@ def _get_available_spaces(self, include_global: bool = False) -> SpaceList:
if include_global:
return self._available_spaces
return SpaceList([space for space in self._available_spaces if not space.is_global])


T_Option = TypeVar("T_Option", bound=DataSet | Asset)


class AssetCentricInteractive:
def __init__(self, client: ToolkitClient, operation: str) -> None:
self.client = client
self.operation = operation

@overload
def select_data_set(
self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True
) -> DataSet | None: ...

@overload
def select_data_set(
self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True
) -> DataSet: ...

@overload
def select_data_set(
self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool = True
) -> DataSetList: ...

def select_data_set(
self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True
) -> DataSet | DataSetList | None:
datasets = self.client.data_sets.list(limit=-1)
selected = self._select(
datasets, "data set", "data_set_external_id", multi, allow_empty, include_resource_counts
)
if isinstance(selected, list):
return DataSetList(selected)
return selected

@overload
def select_hierarchy(
self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True
) -> Asset | None: ...

@overload
def select_hierarchy(
self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True
) -> Asset: ...

@overload
def select_hierarchy(self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool) -> AssetList: ...

def select_hierarchy(
self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True
) -> Asset | AssetList | None:
hierarchies = self.client.assets.list(root=True, limit=-1)
selected = self._select(hierarchies, "hierarchy", "hierarchy", multi, allow_empty, include_resource_counts)
if isinstance(selected, list):
return AssetList(selected)
return selected

def _select(
self,
options: Sequence[T_Option],
display_name: str,
count_arg: Literal["data_set_external_id", "hierarchy"],
multi: bool,
allow_empty: bool = False,
include_resource_counts: bool = True,
) -> T_Option | list[T_Option] | None:
if not options and not allow_empty:
raise ToolkitValueError(f"No {display_name} is available to select.")
choices: list[questionary.Choice] = []
for option in sorted(options, key=lambda o: o.name or o.external_id or ""):
title = f"{option.name} ({option.external_id})" if option.name != option.external_id else f"{option.name}"
if include_resource_counts and option.external_id:
kwargs = {count_arg: option.external_id}

# MyPy fails to recognize that kwargs has string keys here.
asset_count = AssetAggregator(self.client).count(**kwargs) # type: ignore[misc]
event_count = EventAggregator(self.client).count(**kwargs) # type: ignore[misc]
file_count = FileAggregator(self.client).count(**kwargs) # type: ignore[misc]
time_series_count = TimeSeriesAggregator(self.client).count(**kwargs) # type: ignore[misc]
title += (
f"[Assets: {asset_count:,}, Events: {event_count:,}, "
f"Files: {file_count:,}, Time Series: {time_series_count:,}]"
)
choices.append(questionary.Choice(title=title, value=option))

message = f"Select a {display_name} to {self.operation} listed as 'name (external_id)'"
if include_resource_counts:
message += " [Assets: x, Events: x, Files: x, Time Series: x]"
if multi:
selected = questionary.checkbox(message, choices=choices).ask()
else:
selected = questionary.select(message, choices=choices).ask()
if selected is None and not allow_empty:
raise ToolkitValueError(f"No {display_name} selected. Aborting.")
return selected
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from cognite_toolkit._cdf_tk.utils.aggregators import AssetCentricAggregator
from cognite_toolkit._cdf_tk.utils.interactive_select import (
AssetCentricDestinationSelect,
AssetCentricInteractive,
AssetInteractiveSelect,
DataModelingSelect,
EventInteractiveSelect,
Expand Down Expand Up @@ -967,3 +968,89 @@ def test_select_instance_spaces_without_view_or_instance_type_no_instances(self,
selector.select_instance_space()

assert "No instances found in any space" in str(exc_info.value)


class TestAssetCentricInteractive:
@pytest.mark.parametrize(
"multi,allow_empty,expected",
[
pytest.param(False, False, DataSet("B"), id="Single select, no empty"),
pytest.param(True, False, [DataSet("B")], id="Multi select, no empty"),
pytest.param(False, True, None, id="Single select, allow empty"),
pytest.param(True, True, [], id="Multi select, allow empty"),
],
)
def test_select_dataset(
self, multi: bool, allow_empty: bool, expected: DataSet | list[DataSet] | None, monkeypatch
) -> None:
data_sets = [DataSet(external_id=letter) for letter in "ABC"]

def select_data_set(choices: list[Choice]) -> DataSet | list[DataSet] | None:
assert len(choices) == 3
if allow_empty and multi:
return []
elif allow_empty:
return None
elif multi:
return [choices[1].value] # Select "B"
else:
return choices[1].value # Select "B"

answers = [select_data_set]

with (
monkeypatch_toolkit_client() as client,
MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers),
):
client.data_sets.list.return_value = data_sets
client.assets.aggregate_count.return_value = 10
client.events.aggregate_count.return_value = 20
client.files.aggregate.return_value = [CountAggregate(30)]
client.time_series.aggregate_count.return_value = 40

selector = AssetCentricInteractive(client, "test_operation")
result = selector.select_data_set(multi, allow_empty, include_resource_counts=True)

assert result == expected

@pytest.mark.parametrize(
"multi,allow_empty,expected",
[
pytest.param(False, False, Asset(external_id="B"), id="Single select, no empty"),
pytest.param(True, False, [Asset(external_id="B")], id="Multi select, no empty"),
pytest.param(False, True, None, id="Single select, allow empty"),
pytest.param(True, True, [], id="Multi select, allow empty"),
],
)
def test_select_hierarchy(
self, multi: bool, allow_empty: bool, expected: Asset | list[Asset] | None, monkeypatch
) -> None:
hierarchies = [Asset(external_id=letter) for letter in "ABC"]

def select_hierarchy(choices: list[Choice]) -> Asset | list[Asset] | None:
assert len(choices) == 3
if allow_empty and multi:
return []
elif allow_empty:
return None
elif multi:
return [choices[1].value] # Select "B"
else:
return choices[1].value # Select "B"

answers = [select_hierarchy]

with (
monkeypatch_toolkit_client() as client,
MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers),
):
client.assets.list.return_value = hierarchies
client.assets.aggregate_count.return_value = 10
client.events.aggregate_count.return_value = 20
client.files.aggregate.return_value = [CountAggregate(30)]
client.time_series.aggregate_count.return_value = 40

selector = AssetCentricInteractive(client, "test_operation")
result = selector.select_hierarchy(multi, allow_empty, include_resource_counts=True)

assert result == expected
Loading