From 3209ab5c1a279778ebac6145c8fa41842f7b4d3b Mon Sep 17 00:00:00 2001 From: Jakob Wagner Date: Tue, 2 Jul 2024 17:56:41 +0200 Subject: [PATCH 1/2] add masked operator dataset --- src/continuiti/data/__init__.py | 3 +- src/continuiti/data/dataset.py | 288 ++++++++++++++++++++++++++------ 2 files changed, 236 insertions(+), 55 deletions(-) diff --git a/src/continuiti/data/__init__.py b/src/continuiti/data/__init__.py index d194c3ce..d737af1a 100644 --- a/src/continuiti/data/__init__.py +++ b/src/continuiti/data/__init__.py @@ -5,11 +5,12 @@ Every data set is a list of `(x, u, y, v)` tuples. """ -from .dataset import OperatorDataset +from .dataset import OperatorDataset, MaskedOperatorDataset from .utility import split, dataset_loss __all__ = [ "OperatorDataset", + "MaskedOperatorDataset", "split", "dataset_loss", ] diff --git a/src/continuiti/data/dataset.py b/src/continuiti/data/dataset.py index ea13c1dc..43f40a9f 100644 --- a/src/continuiti/data/dataset.py +++ b/src/continuiti/data/dataset.py @@ -7,8 +7,9 @@ import torch import torch.utils.data as td -from typing import Optional, Tuple -from abc import ABC, abstractmethod +from torch.nn.utils.rnn import pad_sequence +from typing import Optional, Tuple, List, Union +from abc import ABC from continuiti.transforms import Transform from continuiti.operators.shape import OperatorShapes, TensorShape @@ -16,28 +17,37 @@ class OperatorDatasetBase(td.Dataset, ABC): """Abstract base class of a dataset for operator training.""" - shapes: OperatorShapes + def __init__(self, shapes: OperatorShapes, n_observations: int) -> None: + super().__init__() + self.shapes = shapes + self.n_observations = n_observations - @abstractmethod - def __len__(self) -> int: - """Return the number of samples. + def _apply_transformations( + self, src: List[Tuple[torch.Tensor, Optional[Transform]]] + ) -> List[torch.Tensor]: + """Applies class transformations to four tensors. + + Args: + src: Returns: - number of samples in the entire set. + Input src with class transformations applied. """ + out = [] + for src_tensor, transformation in src: + if transformation is None: + out.append(src_tensor) + continue + out.append(transformation(src_tensor)) + return out - @abstractmethod - def __getitem__( - self, idx - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: - """Retrieves the input-output pair at the specified index and applies transformations. - - Parameters: - - idx: The index of the sample to retrieve. + def __len__(self) -> int: + """Return the number of observations in the dataset. Returns: - A tuple containing the three input tensors and the output tensor for the given index. + Number of observations in the entire dataset. """ + return self.n_observations class OperatorDataset(OperatorDatasetBase): @@ -57,7 +67,6 @@ class OperatorDataset(OperatorDatasetBase): Attributes: shapes: Shape of all tensors. - transform: Transformations for each tensor. """ def __init__( @@ -85,39 +94,25 @@ def __init__( assert x_size == u_size, "Inconsistent number of sensors." assert y_size == v_size, "Inconsistent number of evaluations." - super().__init__() - self.x = x self.u = u self.y = y self.v = v # used to initialize architectures - self.shapes = OperatorShapes( + shapes = OperatorShapes( x=TensorShape(dim=x_dim, size=x_size), u=TensorShape(dim=u_dim, size=u_size), y=TensorShape(dim=y_dim, size=y_size), v=TensorShape(dim=v_dim, size=v_size), ) - self.transform = { - dim: tf - for dim, tf in [ - ("x", x_transform), - ("u", u_transform), - ("y", y_transform), - ("v", v_transform), - ] - if tf is not None - } + super().__init__(shapes, len(x)) - def __len__(self) -> int: - """Return the number of samples. - - Returns: - Number of samples in the entire set. - """ - return self.x.size(0) + self.x_transform = x_transform + self.u_transform = u_transform + self.y_transform = y_transform + self.v_transform = v_transform def __getitem__( self, @@ -131,29 +126,214 @@ def __getitem__( Returns: A tuple containing the three input tensors and the output tensor for the given index. """ - return self._apply_transformations( - self.x[idx], self.u[idx], self.y[idx], self.v[idx] + tensors = self._apply_transformations( + [ + (self.x[idx], self.x_transform), + (self.u[idx], self.u_transform), + (self.y[idx], self.y_transform), + (self.v[idx], self.v_transform), + ] ) - def _apply_transformations( - self, x: torch.Tensor, u: torch.Tensor, y: torch.Tensor, v: torch.Tensor - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: - """Applies class transformations to four tensors. + return tensors[0], tensors[1], tensors[2], tensors[3] + + +class MaskedOperatorDataset(OperatorDatasetBase): + """A dataset for operator training containing masks in addition to tensors describing the mapping. + + Data, especially described on unstructured grids, can vary in the number of evaluations or sensors. Even + measurements of phenomena do not always contain the same number of sensors and or evaluations. This dataset is able + to handle datasets that have differing number of sensors or evaluations. For this masks, both for the input and + output space, describe which values are relevant to the dataset and which are irrelevant padding values, that + should be ignored during training and evaluation. Padding tensors is important to efficiently handle data in + batches. + + Args: + x: Tensor of shape (num_observations, x_dim, num_sensors) with sensor positions or list containing + num_observations tensors of shape (x_dim, ...). + u: Tensor of shape (num_observations, u_dim, num_sensors) with evaluations of the input functions at sensor + positions or list containing num_observations tensors of shape (u_dim, ...). + y: Tensor of shape (num_observations, y_dim, num_evaluations) with evaluation positions or list containing + num_observations tensors of shape (y_dim, ...). + v: Tensor of shape (num_observations, v_dim, num_evaluations) with ground truth operator mappings or list + containing num_observations tensors of shape (v_dim, ...). + ipt_mask:Boolean tensor of shape (num_observations, num_sensors) with True indicating that a value pair of the + input space should be taken into consideration during training. + opt_mask: Boolean tensor of shape (num_observations, num_evaluations) with True indicating that a value pair of + the output space should be taken into consideration during training. + x_transform: Transformation applied to x. + u_transform: Transformation applied to u. + y_transform: Transformation applied to y. + v_transform: Transformation applied to v. + + """ + + def __init__( + self, + x: Union[torch.Tensor, List[torch.Tensor]], + u: Union[torch.Tensor, List[torch.Tensor]], + y: Union[torch.Tensor, List[torch.Tensor]], + v: Union[torch.Tensor, List[torch.Tensor]], + ipt_mask: Optional[torch.Tensor] = None, + opt_mask: Optional[torch.Tensor] = None, + x_transform: Optional[Transform] = None, + u_transform: Optional[Transform] = None, + y_transform: Optional[Transform] = None, + v_transform: Optional[Transform] = None, + ) -> None: + assert ( + len(x) == len(u) == len(y) == len(v) + ), f"All tensors need to have the same number of observations, but found {len(x)}, {len(u)}, {len(y)}, {len(v)}." + ipt_is_list = isinstance(x, list) + assert self._is_valid_space(x, u, ipt_mask) + opt_is_list = isinstance(y, list) + assert self._is_valid_space(y, v, opt_mask) + + if ipt_is_list: + x, u, ipt_mask = self._pad_list_space(x, u) + + if opt_is_list: + y, v, opt_mask = self._pad_list_space(y, v) + + self.x = x + self.u = u + self.y = y + self.v = v + + self.ipt_mask = ipt_mask + self.opt_mask = opt_mask + + self.x_transform = x_transform + self.u_transform = u_transform + self.y_transform = y_transform + self.v_transform = v_transform + + super().__init__( + shapes=OperatorShapes( + x=TensorShape( + dim=x[0].size(1), size=torch.Size([]) + ), # size agnostic dataset + u=TensorShape(dim=u[0].size(1), size=torch.Size([])), + y=TensorShape(dim=y[0].size(1), size=torch.Size([])), + v=TensorShape(dim=v[0].size(1), size=torch.Size([])), + ), + n_observations=len(x), + ) + + def _is_valid_space( + self, + member: Union[torch.Tensor, List[torch.Tensor]], + values: Union[torch.Tensor, List[torch.Tensor]], + mask: Optional[torch.Tensor], + ) -> bool: + """Asseses whether a space is in alignment with its respective requirements. + + Depending on whether a space is described by a list of tensors or a tensor certain argument need to align. + All observations need to have the same dimensions in their respective samples. The domain and the function + values need to be described by the same number of observations. Args: - x: Tensor of shape (num_observations, x_dim, num_sensors...) with sensor positions. - u: Tensor of shape (num_observations, u_dim, num_sensors...) with evaluations of the input functions at sensor positions. - y: Tensor of shape (num_observations, y_dim, num_evaluations...) with evaluation positions. - v: Tensor of shape (num_observations, v_dim, num_evaluations...) with ground truth operator mappings. + member: Tensor of locations. + values: Function values evaluated in the domain locations. + mask: Boolean mask where a True value indicates that a specific sample should be taken into consideration. Returns: - Input samples with class transformations applied. + A boolean value True when the space description is valid. """ - sample = {"x": x, "u": u, "y": y, "v": v} + assert type(member) is type( + values + ), f"All types of tensors in one space need to match. But found {type(member)} and {type(values)}." + + ndim: int + if mask is not None: + assert isinstance( + member, torch.Tensor + ), f"When providing a mask the member and values need to be tensors. But found {type(member)}" + ndim = member.dim() - 1 # remove batch dimension + else: + assert all( + [di.size(0) == member[0].size(0) for di in member[1:]] + ), "Dimensions of all samples of the member need to match." + assert all( + [vi.size(0) == values[0].size(0) for vi in values[1:]] + ), "Dimensions of all function values need to match." + ndim = member[0].dim() + + assert ( + ndim == 2 + ), f"{self.__class__.__name__} currently only supports exactly one dim and one size dimension." + + return True + + def _pad_list_space( + self, member: List[torch.Tensor], values: List[torch.Tensor] + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Transforms a space described by lists of tensors (image) to a space described by padded tensors and a mask. + + Args: + member: List of tensors describing the locations of the samples. + values: List of tensors describing the function values of the samples. + + Returns: + padded member tensor, padded values tensor, and matching mask. + """ + assert not any( + [torch.any(torch.isinf(mi)) for mi in member] + ), "Expects domain to be truncated in finite space." + + member_padded = pad_sequence( + [mi.transpose(0, 1) for mi in member], + batch_first=True, + padding_value=torch.inf, + ).transpose(1, 2) + values_padded = pad_sequence( + [vi.transpose(0, 1) for vi in values], batch_first=True, padding_value=0 + ).transpose(1, 2) + + mask = member_padded != torch.inf + member_padded[ + ~mask + ] = 0 # mask often applied by adding a tensor with -inf values in masked locations (e.g. in scaled dot product). + + return member_padded, values_padded, mask + + def __getitem__( + self, + idx: int, + ) -> Tuple[ + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + Optional[torch.Tensor], + Optional[torch.Tensor], + ]: + """Retrieves the input-output pair at the specified index and applies transformations. + + Parameters: + idx: The index of the sample to retrieve. + + Returns: + A tuple containing the three input tensors, the output tensor, and masks for both the input and output for + the given index. + """ + tensors = self._apply_transformations( + [ + (self.x[idx], self.x_transform), + (self.u[idx], self.u_transform), + (self.y[idx], self.y_transform), + (self.v[idx], self.v_transform), + ] + ) + + if self.ipt_mask is not None: + ipt_mask = self.ipt_mask[idx] + else: + ipt_mask = None - # transform - for dim, val in sample.items(): - if dim in self.transform: - sample[dim] = self.transform[dim](val) + if self.opt_mask is not None: + opt_mask = self.opt_mask[idx] + else: + opt_mask = None - return sample["x"], sample["u"], sample["y"], sample["v"] + return tensors[0], tensors[1], tensors[2], tensors[3], ipt_mask, opt_mask From 933e100367371e0d800956e0cea6bb1ba5d7453c Mon Sep 17 00:00:00 2001 From: Jakob Wagner Date: Tue, 2 Jul 2024 17:57:06 +0200 Subject: [PATCH 2/2] add test masked operator dataset --- tests/data/test_dataset.py | 152 +++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/tests/data/test_dataset.py b/tests/data/test_dataset.py index a589bd3a..1e8feef3 100644 --- a/tests/data/test_dataset.py +++ b/tests/data/test_dataset.py @@ -1,6 +1,11 @@ +import pytest +from typing import Tuple, List import torch +from random import randint from torch.utils.data import DataLoader from continuiti.data.selfsupervised import SelfSupervisedOperatorDataset +from continuiti.data import MaskedOperatorDataset +from continuiti.transforms import Normalize def test_dataset(): @@ -32,3 +37,150 @@ def test_dataset(): assert y.shape[1] == coordinate_dim assert v.shape[1] == num_channels assert (v == f(y)).all() + + +class TestMaskedDataset: + observations = 19 + batch_size = 2 + x_dim = 3 + u_dim = 5 + y_dim = 7 + v_dim = 11 + max_sensors = 13 + max_evaluations = 17 + + @pytest.fixture + def random_masked_tensors( + self, + ) -> Tuple[ + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + ]: + x = torch.rand(self.observations, self.x_dim, self.max_sensors) + u = torch.rand(self.observations, self.u_dim, self.max_sensors) + ipt_mask = torch.rand(self.observations, self.max_sensors) > 0.25 + + y = torch.rand(self.observations, self.y_dim, self.max_evaluations) + v = torch.rand(self.observations, self.v_dim, self.max_evaluations) + opt_mask = torch.rand(self.observations, self.max_evaluations) > 0.25 + + return x, u, ipt_mask, y, v, opt_mask + + @pytest.fixture + def random_lists( + self, + ) -> Tuple[ + List[torch.Tensor], List[torch.Tensor], List[torch.Tensor], List[torch.Tensor] + ]: + n_sensors = [randint(1, self.max_sensors) for _ in range(self.observations)] + x = [torch.rand(self.x_dim, n) for n in n_sensors] + u = [torch.rand(self.u_dim, n) for n in n_sensors] + + n_observations = [ + randint(1, self.max_evaluations) for _ in range(self.observations) + ] + y = [torch.rand(self.y_dim, n) for n in n_observations] + v = [torch.rand(self.v_dim, n) for n in n_observations] + + return x, u, y, v + + @pytest.fixture + def random_masked_dataset(self, random_masked_tensors): + x, u, ipt_mask, y, v, opt_mask = random_masked_tensors + return MaskedOperatorDataset( + x=x, u=u, y=y, v=v, ipt_mask=ipt_mask, opt_mask=opt_mask + ) + + def test_can_initialize_tensor(self, random_masked_tensors): + x, u, ipt_mask, y, v, opt_mask = random_masked_tensors + dataset = MaskedOperatorDataset( + x=x, u=u, y=y, v=v, ipt_mask=ipt_mask, opt_mask=opt_mask + ) + assert isinstance(dataset, MaskedOperatorDataset) + + def test_can_initalize_list(self, random_lists): + x, u, y, v = random_lists + dataset = MaskedOperatorDataset(x=x, u=u, y=y, v=v) + assert isinstance(dataset, MaskedOperatorDataset) + + def test_dataset_length(self, random_masked_dataset): + assert len(random_masked_dataset) == self.observations + + def test_masking(self, random_masked_dataset, random_masked_tensors): + _, _, ipt_mask, _, _, opt_mask = random_masked_tensors + + for ( + ( + _, + _, + _, + _, + ipt_mask_i, + opt_mask_i, + ), + ipt_mask_gt, + opt_mask_gt, + ) in zip(iter(random_masked_dataset), ipt_mask, opt_mask): + assert torch.all(ipt_mask_i == ipt_mask_gt) + assert torch.all(opt_mask_i == opt_mask_gt) + + def test_unmasked_samples(self, random_masked_dataset): + for x, u, y, v, _, _ in random_masked_dataset: + assert not torch.allclose(x, torch.zeros(x.shape)) + assert not torch.allclose(u, torch.zeros(u.shape)) + assert not torch.allclose(y, torch.zeros(y.shape)) + assert not torch.allclose(v, torch.zeros(v.shape)) + + def test_batch_masking(self, random_masked_dataset): + dataloader = DataLoader( + random_masked_dataset, + shuffle=True, + batch_size=self.batch_size, + drop_last=True, + ) + + for x, u, y, v, ipt_mask, opt_mask in dataloader: + assert ( + len(x) + == len(u) + == len(y) + == len(v) + == len(ipt_mask) + == len(opt_mask) + == self.batch_size + ) + + def test_masked_and_transform(self, random_masked_tensors): + x, u, ipt_mask, y, v, opt_mask = random_masked_tensors + dataset = MaskedOperatorDataset( + x=x, + u=u, + y=y, + v=v, + ipt_mask=ipt_mask, + opt_mask=opt_mask, + x_transform=Normalize( + torch.ones(self.x_dim, 1) / 2.0, + torch.ones(self.x_dim, 1) * torch.sqrt(torch.tensor([1 / 12])), + ), + u_transform=Normalize( + torch.ones(self.u_dim, 1) / 2.0, + torch.ones(self.u_dim, 1) * torch.sqrt(torch.tensor([1 / 12])), + ), + y_transform=Normalize( + torch.ones(self.y_dim, 1) / 2.0, + torch.ones(self.y_dim, 1) * torch.sqrt(torch.tensor([1 / 12])), + ), + v_transform=Normalize( + torch.ones(self.v_dim, 1) / 2.0, + torch.ones(self.v_dim, 1) * torch.sqrt(torch.tensor([1 / 12])), + ), + ) + dataloader = DataLoader(dataset, batch_size=self.batch_size) + + for x, u, y, v, ipt_mask, opt_mask in dataloader: + assert True