Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/scifem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
transfer_meshtags_to_submesh,
reverse_mark_entities,
)
from .eval import evaluate_function
from .eval import evaluate_function, create_pointwise_observation_matrix
from .interpolation import interpolation_matrix, prepare_interpolation_data


Expand Down Expand Up @@ -47,6 +47,7 @@
"interpolate_function_onto_facet_dofs",
"interpolation_matrix",
"prepare_interpolation_data",
"create_pointwise_observation_matrix",
]


Expand Down
131 changes: 131 additions & 0 deletions src/scifem/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import numpy as np
import numpy.typing as npt

__all__ = ["evaluate_function"]


def evaluate_function(
u: dolfinx.fem.Function, points: npt.ArrayLike, broadcast=True
Expand Down Expand Up @@ -72,3 +74,132 @@ def evaluate_function(
return u_out
else:
return values


if dolfinx.has_petsc4py:
from petsc4py import PETSc

__all__.append("create_pointwise_observation_matrix")

def create_pointwise_observation_matrix(
Vh: dolfinx.fem.FunctionSpace, points: np.ndarray
) -> PETSc.Mat:
"""
Constructs a sparse observation matrix :math:`B (m*bs \times N)`
such that :math:`d = B u`

Args:
Vh: dolfinx.fem.FunctionSpace
points: numpy array of shape (m, 3) or (m, 2)

Returns:
petsc4py.PETSc.Mat: The assembled observation matrix.

Note:
If :code:`Vh` is a scalar space (bs=1):
- B has :math:`m` rows.
- d[i] is the value of u at point i.

If :code:`Vh` is a vector space (bs > 1):
- B has :math:`m \times bs` rows.
- :math:`d[i \times bs + c]` is the value of component :math:`c`
of :math:`u` at point :math:`i`.
Note:
Points outside the domain will lead to zero rows in the matrix.
"""
mesh = Vh.mesh
comm = mesh.comm

# Make sure points are all in 3D for search
points = np.ascontiguousarray(points, dtype=np.float64)
original_dim = points.shape[1]
if original_dim == 2:
points_3d = np.zeros((points.shape[0], 3))
points_3d[:, :2] = points
else:
points_3d = points

# Find the cells containing the points
bb_tree = dolfinx.geometry.bb_tree(mesh, mesh.topology.dim)
cell_candidates = dolfinx.geometry.compute_collisions_points(bb_tree, points_3d)
colliding_cells = dolfinx.geometry.compute_colliding_cells(mesh, cell_candidates, points_3d)

# Setup PETSc Matrix
# Number of observation points
m_points = points.shape[0]
# Number of DOFs in the FE space
dofmap = Vh.dofmap
index_map = dofmap.index_map
bs = dofmap.bs
# Number of rows and columns
global_rows = m_points * bs
n_dofs_global = index_map.size_global * bs
local_dof_size = index_map.size_local * bs

B = PETSc.Mat().create(comm)
B.setSizes([[None, global_rows], [local_dof_size, n_dofs_global]])

# Preallocation
element = Vh.element

# 5. Evaluation Data Structures
geom_dofs = mesh.geometry.dofmap
geom_x = mesh.geometry.x
cmap = mesh.geometry.cmap

# Iterate over all observation points and fill the matrix
for i in range(m_points):
# Get cells containing point i
cells = colliding_cells.links(i)

# If point not found on this process we continue
# This means that points outside the domain will lead to zero rows
if len(cells) == 0:
continue

# We take the first cell containing the point
cell_index = cells[0]

# Pull back point to reference coordinates
cell_geom_indices = geom_dofs[cell_index]
cell_coords = geom_x[cell_geom_indices]
point_ref = cmap.pull_back(np.array([points_3d[i]]), cell_coords)
# Evaluate basis functions at reference point
basis_values = element.basix_element.tabulate(0, point_ref)[0, 0, :]

# Get Global Indices
local_dofs = dofmap.cell_dofs(cell_index)
global_block_indices = index_map.local_to_global(local_dofs).astype(np.int32)

# Insert values into matrix
if bs == 1:
# Scalar Case: Row i, Cols global_block_indices
B.setValues(
[i],
global_block_indices,
basis_values,
addv=PETSc.InsertMode.INSERT_VALUES,
)
else:
# Vector Case:
# We have 'bs' components. We populate 'bs' rows for this single point.
# Row (i*bs + 0) observes component 0 -> uses cols (global_block * bs + 0)
# Row (i*bs + 1) observes component 1 -> uses cols (global_block * bs + 1)
for comp in range(bs):
row_idx = i * bs + comp

# The columns for this component
# global_block_indices are the node indices.
# The actual matrix column is node_index * bs + comp
col_indices = global_block_indices * bs + comp

# The weights are simply the scalar basis function values
B.setValues(
[row_idx],
col_indices.astype(np.int32),
basis_values,
addv=PETSc.InsertMode.INSERT_VALUES,
)

B.assemble()
return B
171 changes: 170 additions & 1 deletion tests/test_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import dolfinx

from scifem import evaluate_function
from scifem import evaluate_function, create_pointwise_observation_matrix


@pytest.mark.parametrize(
Expand Down Expand Up @@ -114,3 +114,172 @@ def test_evaluate_vector_function_3D(cell_type):
exact = np.array(f(points.T)).T

assert np.allclose(u_values, exact)


@pytest.mark.parametrize(
"cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral]
)
def test_pointwise_observation_2d_scalar(cell_type):
"""Test 2D scalar function with block size 1 on unit square."""
# 1. Mesh & Space
comm = MPI.COMM_WORLD
msh = dolfinx.mesh.create_unit_square(comm, 10, 10, cell_type=cell_type)
V = dolfinx.fem.functionspace(msh, ("Lagrange", 1))

# 2. Function u = 2x + y
u = dolfinx.fem.Function(V)
u.interpolate(lambda x: 2 * x[0] + x[1])

# 3. Observation Points
points = np.array(
[
[0.5, 0.5, 0.0], # Expected: 2(0.5) + 0.5 = 1.5
[0.0, 0.0, 0.0], # Expected: 0
[1.0, 0.2, 0.0], # Expected: 2(1) + 0.2 = 2.2
]
)

# 4. Create Matrix
B = create_pointwise_observation_matrix(V, points)

# 5. Compute d = B * u
d = B.createVecLeft()
B.mult(u.x.petsc_vec, d)

local_vals = d.array
ranges = B.getOwnershipRange() # (start, end) row indices owned by this rank

expected_full = np.array([1.5, 0.0, 2.2])

for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])):
assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)


@pytest.mark.parametrize(
"cell_type", [dolfinx.mesh.CellType.tetrahedron, dolfinx.mesh.CellType.hexahedron]
)
def test_pointwise_observation_3d_scalar(cell_type):
comm = MPI.COMM_WORLD
msh = dolfinx.mesh.create_unit_cube(comm, 5, 5, 5, cell_type=cell_type)
V = dolfinx.fem.functionspace(msh, ("Lagrange", 1))

u = dolfinx.fem.Function(V)
# u(x,y,z) = x + y + z
u.interpolate(lambda x: x[0] + x[1] + x[2])

points = np.array(
[
[0.5, 0.5, 0.5], # Expected: 1.5
[0.1, 0.1, 0.1], # Expected: 0.3
]
)

B = create_pointwise_observation_matrix(V, points)
d = B.createVecLeft()
B.mult(u.x.petsc_vec, d)

ranges = B.getOwnershipRange()
local_vals = d.array
expected_full = np.array([1.5, 0.3])

for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])):
assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)


@pytest.mark.parametrize(
"cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral]
)
def test_pointwise_observation_2d_vector_bs2(cell_type):
"""Test 2D vector function with block size 2 on unit square."""
comm = MPI.COMM_WORLD
msh = dolfinx.mesh.create_unit_square(comm, 5, 5, cell_type=cell_type)
# Vector Function Space
V = dolfinx.fem.functionspace(msh, ("Lagrange", 1, (msh.geometry.dim,)))

u = dolfinx.fem.Function(V)
# Set u = (x, y)
u.interpolate(lambda x: (x[0], x[1]))

points = np.array([[0.5, 0.8, 0.0], [0.2, 0.3, 0.0]])

# B should be size (num_points * 2) x N
B = create_pointwise_observation_matrix(V, points)
d = B.createVecLeft()
B.mult(u.x.petsc_vec, d)

# Rows are interleaved:
# Row 0: Pt0, Comp X -> 0.5
# Row 1: Pt0, Comp Y -> 0.8
# Row 2: Pt1, Comp X -> 0.2
# Row 3: Pt1, Comp Y -> 0.3
expected_full = np.array([0.5, 0.8, 0.2, 0.3])

ranges = B.getOwnershipRange()
local_vals = d.array

for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])):
assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)


@pytest.mark.parametrize(
"cell_type", [dolfinx.mesh.CellType.tetrahedron, dolfinx.mesh.CellType.hexahedron]
)
def test_pointwise_observation_3d_vector_bs3(cell_type):
"""Test 3D vector function with block size 3 on unit cube."""
comm = MPI.COMM_WORLD
msh = dolfinx.mesh.create_unit_cube(comm, 4, 4, 4, cell_type=cell_type)
V = dolfinx.fem.functionspace(msh, ("Lagrange", 1, (msh.geometry.dim,)))

u = dolfinx.fem.Function(V)
u.interpolate(lambda x: (x[2], x[1], x[0]))

points = np.array([[0.1, 0.5, 0.9]])

# Expected:
# x=0.1, y=0.5, z=0.9
# u_x = z = 0.9
# u_y = y = 0.5
# u_z = x = 0.1
expected_full = np.array([0.9, 0.5, 0.1])

B = create_pointwise_observation_matrix(V, points)
d = B.createVecLeft()
B.mult(u.x.petsc_vec, d)

ranges = B.getOwnershipRange()
local_vals = d.array

for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])):
assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)


@pytest.mark.parametrize(
"cell_type", [dolfinx.mesh.CellType.triangle, dolfinx.mesh.CellType.quadrilateral]
)
def test_pointwise_observation_points_outside(cell_type):
"""Test behavior when points are outside the domain."""
comm = MPI.COMM_WORLD
msh = dolfinx.mesh.create_unit_square(comm, 5, 5, cell_type=cell_type)
V = dolfinx.fem.functionspace(msh, ("Lagrange", 1))
u = dolfinx.fem.Function(V)
u.interpolate(lambda x: np.ones_like(x[0])) # u=1 everywhere

points = np.array(
[
[2.0, 2.0, 0.0], # Outside
[0.5, 0.5, 0.0], # Inside
]
)

B = create_pointwise_observation_matrix(V, points)
d = B.createVecLeft()
B.mult(u.x.petsc_vec, d)

# Assume points outside the domain yield zero values
expected_full = np.array([0.0, 1.0])

ranges = B.getOwnershipRange()
local_vals = d.array

for local_idx, global_idx in enumerate(range(ranges[0], ranges[1])):
assert np.isclose(local_vals[local_idx], expected_full[global_idx], atol=1e-10)
Loading