From 4f3e305277712af8e13783e03b1a10eefd858863 Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Mon, 8 Dec 2025 18:25:16 +0100 Subject: [PATCH] Implement pointwise observation operator --- src/scifem/__init__.py | 3 +- src/scifem/eval.py | 131 +++++++++++++++++++++++++++++++ tests/test_eval.py | 171 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 303 insertions(+), 2 deletions(-) diff --git a/src/scifem/__init__.py b/src/scifem/__init__.py index 6f2cd3a..fa511d5 100644 --- a/src/scifem/__init__.py +++ b/src/scifem/__init__.py @@ -19,7 +19,7 @@ transfer_meshtags_to_submesh, reverse_mark_entities, ) -from .eval import evaluate_function +from .eval import evaluate_function, create_pointwise_observation_matrix from .interpolation import interpolation_matrix, prepare_interpolation_data @@ -47,6 +47,7 @@ "interpolate_function_onto_facet_dofs", "interpolation_matrix", "prepare_interpolation_data", + "create_pointwise_observation_matrix", ] diff --git a/src/scifem/eval.py b/src/scifem/eval.py index f879c3e..acd946b 100644 --- a/src/scifem/eval.py +++ b/src/scifem/eval.py @@ -3,6 +3,8 @@ import numpy as np import numpy.typing as npt +__all__ = ["evaluate_function"] + def evaluate_function( u: dolfinx.fem.Function, points: npt.ArrayLike, broadcast=True @@ -72,3 +74,132 @@ def evaluate_function( return u_out else: return values + + +if dolfinx.has_petsc4py: + from petsc4py import PETSc + + __all__.append("create_pointwise_observation_matrix") + + def create_pointwise_observation_matrix( + Vh: dolfinx.fem.FunctionSpace, points: np.ndarray + ) -> PETSc.Mat: + """ + Constructs a sparse observation matrix :math:`B (m*bs \times N)` + such that :math:`d = B u` + + Args: + Vh: dolfinx.fem.FunctionSpace + points: numpy array of shape (m, 3) or (m, 2) + + Returns: + petsc4py.PETSc.Mat: The assembled observation matrix. + + Note: + If :code:`Vh` is a scalar space (bs=1): + - B has :math:`m` rows. + - d[i] is the value of u at point i. + + If :code:`Vh` is a vector space (bs > 1): + - B has :math:`m \times bs` rows. + - :math:`d[i \times bs + c]` is the value of component :math:`c` + of :math:`u` at point :math:`i`. + Note: + Points outside the domain will lead to zero rows in the matrix. + """ + mesh = Vh.mesh + comm = mesh.comm + + # Make sure points are all in 3D for search + points = np.ascontiguousarray(points, dtype=np.float64) + original_dim = points.shape[1] + if original_dim == 2: + points_3d = np.zeros((points.shape[0], 3)) + points_3d[:, :2] = points + else: + points_3d = points + + # Find the cells containing the points + bb_tree = dolfinx.geometry.bb_tree(mesh, mesh.topology.dim) + cell_candidates = dolfinx.geometry.compute_collisions_points(bb_tree, points_3d) + colliding_cells = dolfinx.geometry.compute_colliding_cells(mesh, cell_candidates, points_3d) + + # Setup PETSc Matrix + # Number of observation points + m_points = points.shape[0] + # Number of DOFs in the FE space + dofmap = Vh.dofmap + index_map = dofmap.index_map + bs = dofmap.bs + # Number of rows and columns + global_rows = m_points * bs + n_dofs_global = index_map.size_global * bs + local_dof_size = index_map.size_local * bs + + B = PETSc.Mat().create(comm) + B.setSizes([[None, global_rows], [local_dof_size, n_dofs_global]]) + + # Preallocation + element = Vh.element + + # 5. Evaluation Data Structures + geom_dofs = mesh.geometry.dofmap + geom_x = mesh.geometry.x + cmap = mesh.geometry.cmap + + # Iterate over all observation points and fill the matrix + for i in range(m_points): + # Get cells containing point i + cells = colliding_cells.links(i) + + # If point not found on this process we continue + # This means that points outside the domain will lead to zero rows + if len(cells) == 0: + continue + + # We take the first cell containing the point + cell_index = cells[0] + + # Pull back point to reference coordinates + cell_geom_indices = geom_dofs[cell_index] + cell_coords = geom_x[cell_geom_indices] + point_ref = cmap.pull_back(np.array([points_3d[i]]), cell_coords) + # Evaluate basis functions at reference point + basis_values = element.basix_element.tabulate(0, point_ref)[0, 0, :] + + # Get Global Indices + local_dofs = dofmap.cell_dofs(cell_index) + global_block_indices = index_map.local_to_global(local_dofs).astype(np.int32) + + # Insert values into matrix + if bs == 1: + # Scalar Case: Row i, Cols global_block_indices + B.setValues( + [i], + global_block_indices, + basis_values, + addv=PETSc.InsertMode.INSERT_VALUES, + ) + else: + # Vector Case: + # We have 'bs' components. We populate 'bs' rows for this single point. + # Row (i*bs + 0) observes component 0 -> uses cols (global_block * bs + 0) + # Row (i*bs + 1) observes component 1 -> uses cols (global_block * bs + 1) + for comp in range(bs): + row_idx = i * bs + comp + + # The columns for this component + # global_block_indices are the node indices. + # The actual matrix column is node_index * bs + comp + col_indices = global_block_indices * bs + comp + + # The weights are simply the scalar basis function values + B.setValues( + [row_idx], + col_indices.astype(np.int32), + basis_values, + addv=PETSc.InsertMode.INSERT_VALUES, + ) + + B.assemble() + return B diff --git a/tests/test_eval.py b/tests/test_eval.py index 0f331fe..1c7d9d4 100644 --- a/tests/test_eval.py +++ b/tests/test_eval.py @@ -5,7 +5,7 @@ import numpy as np import dolfinx -from scifem import evaluate_function +from scifem import evaluate_function, create_pointwise_observation_matrix @pytest.mark.parametrize( @@ -114,3 +114,172 @@ def test_evaluate_vector_function_3D(cell_type): exact = np.array(f(points.T)).T assert np.allclose(u_values, exact) + + +@pytest.mark.parametrize( + "cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral] +) +def test_pointwise_observation_2d_scalar(cell_type): + """Test 2D scalar function with block size 1 on unit square.""" + # 1. Mesh & Space + comm = MPI.COMM_WORLD + msh = dolfinx.mesh.create_unit_square(comm, 10, 10, cell_type=cell_type) + V = dolfinx.fem.functionspace(msh, ("Lagrange", 1)) + + # 2. Function u = 2x + y + u = dolfinx.fem.Function(V) + u.interpolate(lambda x: 2 * x[0] + x[1]) + + # 3. Observation Points + points = np.array( + [ + [0.5, 0.5, 0.0], # Expected: 2(0.5) + 0.5 = 1.5 + [0.0, 0.0, 0.0], # Expected: 0 + [1.0, 0.2, 0.0], # Expected: 2(1) + 0.2 = 2.2 + ] + ) + + # 4. Create Matrix + B = create_pointwise_observation_matrix(V, points) + + # 5. Compute d = B * u + d = B.createVecLeft() + B.mult(u.x.petsc_vec, d) + + local_vals = d.array + ranges = B.getOwnershipRange() # (start, end) row indices owned by this rank + + expected_full = np.array([1.5, 0.0, 2.2]) + + for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])): + assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10) + + +@pytest.mark.parametrize( + "cell_type", [dolfinx.mesh.CellType.tetrahedron, dolfinx.mesh.CellType.hexahedron] +) +def test_pointwise_observation_3d_scalar(cell_type): + comm = MPI.COMM_WORLD + msh = dolfinx.mesh.create_unit_cube(comm, 5, 5, 5, cell_type=cell_type) + V = dolfinx.fem.functionspace(msh, ("Lagrange", 1)) + + u = dolfinx.fem.Function(V) + # u(x,y,z) = x + y + z + u.interpolate(lambda x: x[0] + x[1] + x[2]) + + points = np.array( + [ + [0.5, 0.5, 0.5], # Expected: 1.5 + [0.1, 0.1, 0.1], # Expected: 0.3 + ] + ) + + B = create_pointwise_observation_matrix(V, points) + d = B.createVecLeft() + B.mult(u.x.petsc_vec, d) + + ranges = B.getOwnershipRange() + local_vals = d.array + expected_full = np.array([1.5, 0.3]) + + for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])): + assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10) + + +@pytest.mark.parametrize( + "cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral] +) +def test_pointwise_observation_2d_vector_bs2(cell_type): + """Test 2D vector function with block size 2 on unit square.""" + comm = MPI.COMM_WORLD + msh = dolfinx.mesh.create_unit_square(comm, 5, 5, cell_type=cell_type) + # Vector Function Space + V = dolfinx.fem.functionspace(msh, ("Lagrange", 1, (msh.geometry.dim,))) + + u = dolfinx.fem.Function(V) + # Set u = (x, y) + u.interpolate(lambda x: (x[0], x[1])) + + points = np.array([[0.5, 0.8, 0.0], [0.2, 0.3, 0.0]]) + + # B should be size (num_points * 2) x N + B = create_pointwise_observation_matrix(V, points) + d = B.createVecLeft() + B.mult(u.x.petsc_vec, d) + + # Rows are interleaved: + # Row 0: Pt0, Comp X -> 0.5 + # Row 1: Pt0, Comp Y -> 0.8 + # Row 2: Pt1, Comp X -> 0.2 + # Row 3: Pt1, Comp Y -> 0.3 + expected_full = np.array([0.5, 0.8, 0.2, 0.3]) + + ranges = B.getOwnershipRange() + local_vals = d.array + + for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])): + assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10) + + +@pytest.mark.parametrize( + "cell_type", [dolfinx.mesh.CellType.tetrahedron, dolfinx.mesh.CellType.hexahedron] +) +def test_pointwise_observation_3d_vector_bs3(cell_type): + """Test 3D vector function with block size 3 on unit cube.""" + comm = MPI.COMM_WORLD + msh = dolfinx.mesh.create_unit_cube(comm, 4, 4, 4, cell_type=cell_type) + V = dolfinx.fem.functionspace(msh, ("Lagrange", 1, (msh.geometry.dim,))) + + u = dolfinx.fem.Function(V) + u.interpolate(lambda x: (x[2], x[1], x[0])) + + points = np.array([[0.1, 0.5, 0.9]]) + + # Expected: + # x=0.1, y=0.5, z=0.9 + # u_x = z = 0.9 + # u_y = y = 0.5 + # u_z = x = 0.1 + expected_full = np.array([0.9, 0.5, 0.1]) + + B = create_pointwise_observation_matrix(V, points) + d = B.createVecLeft() + B.mult(u.x.petsc_vec, d) + + ranges = B.getOwnershipRange() + local_vals = d.array + + for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])): + assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10) + + +@pytest.mark.parametrize( + "cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral] +) +def test_pointwise_observation_points_outside(cell_type): + """Test behavior when points are outside the domain.""" + comm = MPI.COMM_WORLD + msh = dolfinx.mesh.create_unit_square(comm, 5, 5, cell_type=cell_type) + V = dolfinx.fem.functionspace(msh, ("Lagrange", 1)) + u = dolfinx.fem.Function(V) + u.interpolate(lambda x: np.ones_like(x[0])) # u=1 everywhere + + points = np.array( + [ + [2.0, 2.0, 0.0], # Outside + [0.5, 0.5, 0.0], # Inside + ] + ) + + B = create_pointwise_observation_matrix(V, points) + d = B.createVecLeft() + B.mult(u.x.petsc_vec, d) + + # Assume points outside the domain yield zero values + expected_full = np.array([0.0, 1.0]) + + ranges = B.getOwnershipRange() + local_vals = d.array + + for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])): + assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)