forked from PyDMD/PyDMD
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathsubspacedmd.py
More file actions
201 lines (156 loc) · 6.38 KB
/
subspacedmd.py
File metadata and controls
201 lines (156 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
Derived module from dmdbase.py for Subspace DMD.
Reference:
Takeishi, Naoya, Yoshinobu Kawahara, and Takehisa Yairi. "Subspace dynamic mode
decomposition for stochastic Koopman analysis." Physical Review E 96.3 (2017):
033310.
"""
import numpy as np
from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
def reducedsvd(X, r=None):
"""
Computes the reduced SVD of `X` taking only the first `r` modes.
:param np.ndarray X: Matrix to be used for SVD.
:param int r: Number of modes to be retained, if `None` the rank of `X` is
used instead.
:rtype: tuple
:return: Left singular vectors, singular values and right singular vectors.
"""
U, s, V = np.linalg.svd(X, full_matrices=True)
if r is None:
r = np.linalg.matrix_rank(X)
return U[:, :r], s[:r], V.conj().T[:, :r]
class SubspaceDMDOperator(DMDOperator):
"""
Subspace Dynamic Mode Decomposition operator class.
:param svd_rank: the rank for the truncation; if -1 all the columns of
:math:`U_q` are used, if `svd_rank` is an integer grater than zero
it is used as the number of columns retained from :math:`U_q`.
`svd_rank=0` or float values are not supported.
:type svd_rank: int
:param rescale_mode: Scale Atilde as shown in
10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its
eigendecomposition. None means no rescaling, 'auto' means automatic
rescaling using singular values, otherwise the scaling factors.
:type rescale_mode: {'auto'} or None or numpy.ndarray
:param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by
magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary
part to break ties) if `sorted_eigs='real'`. Default: False.
:type sorted_eigs: {'real', 'abs'} or False
"""
def __init__(self, svd_rank, rescale_mode, sorted_eigs):
super().__init__(
svd_rank=svd_rank,
exact=True,
forward_backward=False,
rescale_mode=rescale_mode,
sorted_eigs=sorted_eigs,
tikhonov_regularization=None,
)
self._Atilde = None
self._modes = None
self._Lambda = None
def compute_operator(self, Yp, Yf):
"""
Compute the low-rank operator.
Computes also modes, eigenvalues and DMD amplitudes.
:param numpy.ndarray Yp: Matrix `Yp` as defined in the original paper.
:param numpy.ndarray Yp: Matrix `Yf` as defined in the original paper.
"""
n = Yp.shape[0] // 2
_, _, Vp = reducedsvd(Yp)
O = Yf.dot(Vp).dot(Vp.T.conj())
Uq, _, _ = reducedsvd(O)
r = min(np.linalg.matrix_rank(O), n)
if self._svd_rank > 0:
r = min(r, self._svd_rank)
Uq1, Uq2 = Uq[:n, :r], Uq[n:, :r]
U, S, V = reducedsvd(Uq1)
self._Atilde = self._least_square_operator(U, S, V, Uq2)
self._compute_eigenquantities()
M = Uq2.dot(V) * np.reciprocal(S)
self._compute_modes(M)
def _compute_modes(self, M):
"""
Private method that computes eigenvalues and eigenvectors of the
high-dimensional operator (stored in self.modes and self.Lambda).
:param numpy.ndarray M: Matrix `M` as defined in the original paper.
"""
if self._rescale_mode is None:
W = self.eigenvectors
else:
# compute W as shown in arXiv:1409.5496 (section 2.4)
factors_sqrt = np.diag(np.power(self._rescale_mode, 0.5))
W = factors_sqrt.dot(self.eigenvectors)
# compute the eigenvectors of the high-dimensional operator
high_dimensional_eigenvectors = M.dot(W) * np.reciprocal(
self.eigenvalues
)
# eigenvalues are the same of lowrank
high_dimensional_eigenvalues = self.eigenvalues
self._modes = high_dimensional_eigenvectors
self._Lambda = high_dimensional_eigenvalues
class SubspaceDMD(DMDBase):
"""
Subspace Dynamic Mode Decomposition
:param svd_rank: the rank for the truncation; if -1 all the columns of
:math:`U_q` are used, if `svd_rank` is an integer grater than zero
it is used as the number of columns retained from :math:`U_q`.
`svd_rank=0` or float values are not supported.
:type svd_rank: int
:param opt: argument to control the computation of DMD modes amplitudes.
See :class:`DMDBase`. Default is False.
:type opt: bool or int
:param rescale_mode: Scale Atilde as shown in
10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its
eigendecomposition. None means no rescaling, 'auto' means automatic
rescaling using singular values, otherwise the scaling factors.
:type rescale_mode: {'auto'} or None or numpy.ndarray
:param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by
magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary
part to break ties) if `sorted_eigs='real'`. Default: False.
:type sorted_eigs: {'real', 'abs'} or False
"""
def __init__(
self,
svd_rank=-1,
opt=False,
rescale_mode=None,
sorted_eigs=False,
):
self._tlsq_rank = 0
self._original_time = None
self._dmd_time = None
self._opt = opt
self._b = None
self._snapshots_holder = None
self._modes_activation_bitmask_proxy = None
self._Atilde = SubspaceDMDOperator(
svd_rank=svd_rank,
rescale_mode=rescale_mode,
sorted_eigs=sorted_eigs,
)
def fit(self, X):
"""
Compute the Subspace Dynamic Modes Decomposition to the input data.
:param X: the input snapshots.
:type X: numpy.ndarray or iterable
"""
self._reset()
self._snapshots_holder = Snapshots(X)
n_samples = self.snapshots.shape[1]
Y0 = self.snapshots[:, :-3]
Y1 = self.snapshots[:, 1:-2]
Y2 = self.snapshots[:, 2:-1]
Y3 = self.snapshots[:, 3:]
Yp = np.vstack((Y0, Y1))
Yf = np.vstack((Y2, Y3))
self.operator.compute_operator(Yp, Yf)
# Default timesteps
self._set_initial_time_dictionary(
{"t0": 0, "tend": n_samples - 1, "dt": 1}
)
self._b = self._compute_amplitudes()
return self