forked from PyDMD/PyDMD
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathutils.py
More file actions
320 lines (274 loc) · 11 KB
/
utils.py
File metadata and controls
320 lines (274 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""Utilities module."""
import warnings
from numbers import Number
from typing import NamedTuple
from collections import namedtuple
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
# Named return types for the decomposition helpers below:
# - SVD: returned by compute_svd (truncated left-singular vectors U,
#   singular values s, right-singular vectors V).
# - TLSQ: returned by compute_tlsq (the two denoised data matrices).
# - RQB: returned by compute_rqb (orthonormal basis Q, projected data B,
#   and the random test matrix used for sketching).
SVD = namedtuple("SVD", ["U", "s", "V"])
TLSQ = namedtuple("TLSQ", ["X_denoised", "Y_denoised"])
RQB = namedtuple("RQB", ["Q", "B", "test_matrix"])
def _svht(sigma_svd: np.ndarray, rows: int, cols: int) -> int:
"""
Singular Value Hard Threshold.
:param sigma_svd: Singual values computed by SVD
:type sigma_svd: np.ndarray
:param rows: Number of rows of original data matrix.
:type rows: int
:param cols: Number of columns of original data matrix.
:type cols: int
:return: Computed rank.
:rtype: int
References:
Gavish, Matan, and David L. Donoho, The optimal hard threshold for
singular values is, IEEE Transactions on Information Theory 60.8
(2014): 5040-5053.
https://ieeexplore.ieee.org/document/6846297
"""
beta = np.divide(*sorted((rows, cols)))
omega = 0.56 * beta**3 - 0.95 * beta**2 + 1.82 * beta + 1.43
tau = np.median(sigma_svd) * omega
rank = np.sum(sigma_svd > tau)
if rank == 0:
warnings.warn(
"SVD optimal rank is 0. The largest singular values are "
"indistinguishable from noise. Setting rank truncation to 1.",
RuntimeWarning,
)
rank = 1
return rank
def _compute_rank(
sigma_svd: np.ndarray, rows: int, cols: int, svd_rank: Number
) -> int:
"""
Rank computation for the truncated Singular Value Decomposition.
:param sigma_svd: 1D singular values of SVD.
:type sigma_svd: np.ndarray
:param rows: Number of rows of original matrix.
:type rows: int
:param cols: Number of columns of original matrix.
:type cols: int
:param svd_rank: the rank for the truncation; If 0, the method computes
the optimal rank and uses it for truncation; if positive interger,
the method uses the argument for the truncation; if float between 0
and 1, the rank is the number of the biggest singular values that
are needed to reach the 'energy' specified by `svd_rank`; if -1,
the method does not compute truncation. Default is 0.
:type svd_rank: int or float
:return: the computed rank truncation.
:rtype: int
References:
Gavish, Matan, and David L. Donoho, The optimal hard threshold for
singular values is, IEEE Transactions on Information Theory 60.8
(2014): 5040-5053.
"""
if svd_rank == 0:
rank = _svht(sigma_svd, rows, cols)
elif 0 < svd_rank < 1:
cumulative_energy = np.cumsum(sigma_svd**2 / (sigma_svd**2).sum())
rank = np.searchsorted(cumulative_energy, svd_rank) + 1
elif svd_rank >= 1 and isinstance(svd_rank, int):
rank = min(svd_rank, sigma_svd.size)
else:
rank = min(rows, cols)
return rank
def compute_rank(X: np.ndarray, svd_rank: Number = 0) -> int:
    """
    Rank computation for the truncated Singular Value Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Default is 0.
    :type svd_rank: int or float
    :return: the computed rank truncation.
    :rtype: int

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    """
    # Only the spectrum is needed here, so skip assembling U and V.
    singular_values = np.linalg.svd(X, compute_uv=False)
    return _compute_rank(singular_values, X.shape[0], X.shape[1], svd_rank)
def compute_tlsq(
    X: np.ndarray, Y: np.ndarray, tlsq_rank: int
) -> NamedTuple(
    "TLSQ", [("X_denoised", np.ndarray), ("Y_denoised", np.ndarray)]
):
    """
    Compute Total Least Square.

    :param X: the first matrix;
    :type X: np.ndarray
    :param Y: the second matrix;
    :type Y: np.ndarray
    :param tlsq_rank: the rank for the truncation; If 0, the method
        does not compute any noise reduction; if positive number, the
        method uses the argument for the SVD truncation used in the TLSQ
        method.
    :type tlsq_rank: int
    :return: the denoised matrix X, the denoised matrix Y
    :rtype: NamedTuple("TLSQ", [('X_denoised', np.ndarray),
                                ('Y_denoised', np.ndarray)])

    References:
    https://arxiv.org/pdf/1703.11004.pdf
    https://arxiv.org/pdf/1502.03854.pdf
    """
    # Do not perform tlsq. Wrap in TLSQ so callers can rely on the named
    # fields advertised by the return type: previously a bare tuple was
    # returned here, breaking `.X_denoised` / `.Y_denoised` access on
    # this path (still unpacks as a 2-tuple, so it stays compatible).
    if tlsq_rank == 0:
        return TLSQ(X, Y)

    # Right-singular vectors of the stacked data [X; Y].
    V = np.linalg.svd(np.append(X, Y, axis=0), full_matrices=False)[-1]
    rank = min(tlsq_rank, V.shape[0])
    # Projector onto the dominant `rank`-dimensional row space.
    VV = V[:rank, :].conj().T.dot(V[:rank, :])
    return TLSQ(X.dot(VV), Y.dot(VV))
def compute_svd(
    X: np.ndarray, svd_rank: Number = 0
) -> NamedTuple(
    "SVD", [("U", np.ndarray), ("s", np.ndarray), ("V", np.ndarray)]
):
    """
    Truncated Singular Value Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Default is 0.
    :type svd_rank: int or float
    :return: the truncated left-singular vectors matrix, the truncated
        singular values array, the truncated right-singular vectors matrix.
    :rtype: NamedTuple("SVD", [('U', np.ndarray),
                               ('s', np.ndarray),
                               ('V', np.ndarray)])

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    """
    left, spectrum, right_h = np.linalg.svd(X, full_matrices=False)
    rank = _compute_rank(spectrum, X.shape[0], X.shape[1], svd_rank)
    # NumPy returns V^H; conjugate-transpose back so the columns are the
    # right-singular vectors, then truncate every factor to `rank`.
    right = right_h.conj().T
    return SVD(left[:, :rank], spectrum[:rank], right[:, :rank])
def compute_rqb(
    X: np.ndarray,
    svd_rank: Number,
    oversampling: int,
    power_iters: int,
    test_matrix: np.ndarray = None,
    seed: int = None,
) -> NamedTuple(
    "RQB", [("Q", np.ndarray), ("B", np.ndarray), ("test_matrix", np.ndarray)]
):
    """
    Randomized QB Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Use this parameter to
        define the target rank of the input matrix.
    :type svd_rank: int or float
    :param oversampling: Number of additional samples (beyond the target
        rank) to use when computing the random test matrix. Note that values
        in the range [5, 10] tend to be sufficient.
    :type oversampling: int
    :param power_iters: Number of power iterations to perform when executing
        the Randomized QB Decomposition. Note that as many as 1 to 2 power
        iterations often lead to considerable improvements.
    :type power_iters: int
    :param test_matrix: The random test matrix that will be used when
        executing the Randomized QB Decomposition. If not provided, the
        `svd_rank` and `oversampling` parameters will be used to compute the
        random matrix.
    :type test_matrix: numpy.ndarray
    :param seed: Seed used to initialize the random generator when computing
        random test matrices.
    :type seed: int
    :return: the orthonormal basis matrix, the transformed data matrix, and
        the random test matrix.
    :rtype: NamedTuple("RQB", [('Q', np.ndarray),
                               ('B', np.ndarray),
                               ('test_matrix', np.ndarray)])

    References:
    N. Benjamin Erichson, Lionel Mathelin, J. Nathan Kutz, Steven L. Brunton.
    Randomized dynamic mode decomposition. SIAM Journal on Applied Dynamical
    Systems, 18, 2019.
    """
    if X.ndim != 2:
        raise ValueError("Please ensure that input data is a 2D array.")

    # Draw a Gaussian test matrix when the caller did not supply one; its
    # width is the target rank plus the requested oversampling.
    if test_matrix is None:
        num_cols = X.shape[-1]
        target_rank = compute_rank(X, svd_rank)
        generator = np.random.default_rng(seed)
        test_matrix = generator.standard_normal(
            (num_cols, target_rank + oversampling)
        )

    # Sketch the range of X with the test matrix.
    sketch = X.dot(test_matrix)

    # Power iterations sharpen the sketch toward the dominant subspace,
    # re-orthonormalizing at each step for numerical stability.
    for _ in range(power_iters):
        basis, _ = np.linalg.qr(sketch)
        projected, _ = np.linalg.qr(X.conj().T.dot(basis))
        sketch = X.dot(projected)

    # Orthonormal basis for the sketch, then project X onto it.
    Q, _ = np.linalg.qr(sketch)
    B = Q.conj().T.dot(X)
    return RQB(Q, B, test_matrix)
def pseudo_hankel_matrix(X: np.ndarray, d: int) -> np.ndarray:
    """
    Arrange the snapshots in the matrix `X` into a (pseudo) Hankel matrix,
    where each column stacks `d` consecutive snapshots (columns) of `X`.

    :Example:

        >>> a = np.array([[1, 2, 3, 4, 5]])
        >>> pseudo_hankel_matrix(a, d=2)
        array([[1, 2, 3, 4],
               [2, 3, 4, 5]])
        >>> pseudo_hankel_matrix(a, d=4)
        array([[1, 2],
               [2, 3],
               [3, 4],
               [4, 5]])
        >>> a = np.array([1, 2, 3, 4, 5, 6]).reshape(2, 3)
        >>> print(a)
        [[1 2 3]
         [4 5 6]]
        >>> pseudo_hankel_matrix(a, d=2)
        array([[1, 2],
               [4, 5],
               [2, 3],
               [5, 6]])
    """
    # Slide a (d, n_rows) window along X.T: each window holds d consecutive
    # snapshots. There are X.shape[1] - d + 1 such windows.
    n_windows = X.shape[1] - d + 1
    windows = sliding_window_view(X.T, (d, X.shape[0]))[:, 0]
    # Flatten every window into one column of the resulting Hankel matrix.
    return windows.reshape(n_windows, -1).T
def differentiate(X, dt):
    """
    Method for performing 2nd order finite difference. Assumes that the
    input matrix X is 2D, with uniformly-sampled snapshots filling each
    column. Requires dt, which is the time step between each snapshot.

    Interior points use central differences; the first and last snapshots
    use one-sided 2nd-order formulas.

    :param X: 1D or 2D array of uniformly-sampled snapshots (columns).
    :type X: np.ndarray
    :param dt: time step between consecutive snapshots.
    :type dt: float
    :return: array of the same (squeezed) shape with the time derivatives.
    :rtype: np.ndarray
    :raises ValueError: if X is not a 1D/2D numpy array or holds fewer
        than 3 snapshots.
    """
    if not isinstance(X, np.ndarray) or X.ndim > 2:
        raise ValueError("Please ensure that input data is a 1D or 2D array.")
    if X.ndim == 1:
        X = X[None]
    # The one-sided stencils below reference X[:, 2] and X[:, -3]; fewer
    # than 3 snapshots previously crashed with a raw IndexError.
    if X.shape[1] < 3:
        raise ValueError(
            "At least 3 snapshots are required for 2nd order "
            "finite differences."
        )
    X_prime = np.empty(X.shape)
    # Central difference for interior snapshots.
    X_prime[:, 1:-1] = (X[:, 2:] - X[:, :-2]) / (2 * dt)
    # 2nd-order one-sided differences at the boundaries.
    X_prime[:, 0] = (-3 * X[:, 0] + 4 * X[:, 1] - X[:, 2]) / (2 * dt)
    X_prime[:, -1] = (3 * X[:, -1] - 4 * X[:, -2] + X[:, -3]) / (2 * dt)
    return np.squeeze(X_prime)