Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 140 additions & 1 deletion pyest/filters/Gmkf.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numpy as np
import pyest.gm as pygm
from pyest.filters import GaussianMixturePredict, GaussianMixtureUpdate
from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict
from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict, EkfdUpdate
from pyest.utils import make_tuple


Expand Down Expand Up @@ -250,3 +250,142 @@ def lin_gauss_likelihood_agreement(z, m, P, H, R, L=None, h_args=()):
h_args = make_tuple(h_args)
Hk = H(*h_args)
return pygm.eval_mvnpdf(z, Hk@m, Hk@P@Hk.T + LRLt)

class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate):
""" Discrete Gaussian Mixture Extended Kalman Filter Update

Parameters
----------
h : callable
measurement function of the form :math:`h(x, ...)`
H : callable
(nz,nx) measurement Jacobian matrix of the form :math:`H(x)`
R : ndarray
(ny,ny) measurement noise covariance matrix
L : (optional) ndarray
(nz,ny) mapping matrix mapping measurement noise into
measurement space
cov_method : (optional) string
method to use for covariance update. Valid options include 'general'
(default), 'Joseph', 'standard', and 'KWK'.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def __lin_gauss_likelihood_agreement(self, z, zhat, W):
return pygm.eval_mvnpdf(z, zhat, W)

def lin_gauss_cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False):
""" compute the product of the linear Gaussian likelihood function and
another Gaussian pdf

Parameters
----------
m : ndarray
(nx,) prior mean
P : ndarray
(nx,nx) prior covariance matrix
z : ndarray
(nz,) measurement
h_args : (optional) tuple
deterministic parameters to be passed to measurement function
interm_vals : (optional) Boolean
if True, returns intermediate values from computation. False by default

Returns
-------
mp : ndarray
(nx,) posterior mean
Pp : ndarray
(nx,nx) posterior state error covariance
q : float
likelihood agreement, :math:`q = N(z; Hm, HPH' + R)`

If interm_vals is true, additionally returns a dictionary containing:
W : ndarray
(nz,nz) innovatations covariance
C : (ndarray)
(nx,nz) cross-covariance
K : (ndarray)
gain matrix
zhat : (ndarray)
predicted measurement
"""
mp, Pp, interm = super().update(m, P, z, interm_vals=True, h_args=h_args)
q = self.__lin_gauss_likelihood_agreement(z, interm['zhat'], interm['W'])

if not interm_vals:
return mp, Pp, q
else:
return mp, Pp, q, interm

def cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False):
""" compute the product of the linear Gaussian likelihood function and
another Gaussian pdf

Parameters
----------
m : ndarray
(nx,) prior mean
P : ndarray
(nx,nx) prior covariance matrix
z : ndarray
(nz,) measurement
h_args : (optional) tuple
deterministic parameters to be passed to measurement function
interm_vals : (optional) Boolean
if True, returns intermediate values from computation. False by default

Returns
-------
mp : ndarray
(nx,) posterior mean
Pp : ndarray
(nx,nx) posterior state error covariance
q : float
likelihood agreement, :math:`q = N(z; Hm, HPH' + R)`

If interm_vals is true, additionally returns a dictionary containing:
W : ndarray
(nz,nz) innovatations covariance
C : ndarray
(nx,nz) cross-covariance
K : ndarray
gain matrix
zhat : (ndarray)
predicted measurement
"""
return self.lin_gauss_cond_likelihood_prod(m, P, z, h_args=h_args, interm_vals=interm_vals)

def update(self, pkm, zk, unnormalized=False, h_args=(), *args, **kwargs):
""" measurement-update of gm

Parameters
----------
pkm : GaussianMixture
prior density at time tk
zk : ndarray
(nz,) measurement at time k
unnormalized : optional
if True, returns unnormalized distribution. False by default
h_args : (optional) tuple
deterministic parameters to be passed to measurement function
"""
wkp = np.empty_like(pkm.w, dtype=float)
mkp = np.empty_like(pkm.m, dtype=float)
Pkp = np.empty_like(pkm.P, dtype=float)
for i, (wm,mm,Pm) in enumerate(pkm):
print(f"updating mixand {i+1} out of {len(pkm)}")
mkp[i], Pkp[i], q, interm_vals = self.lin_gauss_cond_likelihood_prod(mm, Pm, zk, h_args=h_args, interm_vals=True)
print(f"Q values: {q}")
print(f"true z: {zk}")
print(f"z_hat: {interm_vals['zhat']}")
# print(f"W: {interm_vals["W"]}")
# TODO: test (z-zhat).T @ inv(W) @ (z-zhat)
wkp[i] = wm*q

if not unnormalized:
wkp /= np.sum(wkp)

return pygm.GaussianMixture(wkp, mkp, Pkp)
Loading