Diffstat (limited to 'nitime/algorithms/autoregressive.py')
-rw-r--r--  nitime/algorithms/autoregressive.py  525
1 file changed, 525 insertions, 0 deletions
diff --git a/nitime/algorithms/autoregressive.py b/nitime/algorithms/autoregressive.py
new file mode 100644
index 0000000..019a59d
--- /dev/null
+++ b/nitime/algorithms/autoregressive.py
@@ -0,0 +1,525 @@
+r"""
+
+Autoregressive (AR) processes are processes of the form:
+
+.. math::
+
+ x(n) = a(1)x(n-1) + a(2)x(n-2) + ... + a(P)x(n-P) + e(n)
+
+where e(n) is a white noise process. The use of 'e' suggests interpreting
+the linear combination of the P past values of x(n) as the minimum mean
+square error linear predictor of x(n). Thus
+
+.. math::
+
+ e(n) = x(n) - a(1)x(n-1) - a(2)x(n-2) - ... - a(P)x(n-P)
+
+Due to whiteness, e(n) is also pointwise uncorrelated -- i.e.,
+
+.. math::
+ :nowrap:
+
+ \begin{align*}
+   \text{(i)} && E\{e(n)e^{*}(m)\}& = \sigma_{e}^{2}\delta(n-m) &\\
+ \text{(ii)} && E\{e(n)x^{*}(m)\} & = 0 & m\neq n\\
+ \text{(iii)} && E\{|e|^{2}\} = E\{e(n)e^{*}(n)\} &= E\{e(n)x^{*}(n)\} &
+ \end{align*}
+
+These principles form the basis of the methods in this module for
+estimating the AR coefficients and the error/innovations power.
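+
+For illustration, here is a minimal NumPy sketch (the coefficients 0.6 and
+-0.3 are arbitrary) showing that, given the true a(i), the innovations e(n)
+are recovered exactly from x(n):
+
+>>> import numpy as np
+>>> np.random.seed(0)
+>>> e = np.random.randn(2048)
+>>> x = np.zeros(2048)
+>>> for n in range(2, 2048):
+...     x[n] = 0.6 * x[n - 1] - 0.3 * x[n - 2] + e[n]
+>>> err = x[2:] - 0.6 * x[1:-1] + 0.3 * x[:-2]
+>>> np.allclose(err, e[2:])
+True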
+"""
+
+
+import numpy as np
+from nitime.lazy import scipy_linalg as linalg
+
+import nitime.utils as utils
+from .spectral import freq_response
+
+
+def AR_est_YW(x, order, rxx=None):
+ r"""Determine the autoregressive (AR) model of a random process x using
+    the Yule-Walker equations. The AR model takes this convention:
+
+ .. math::
+
+ x(n) = a(1)x(n-1) + a(2)x(n-2) + \dots + a(p)x(n-p) + e(n)
+
+    where e(n) is a zero-mean white noise process with variance sig_sq,
+    and p is the order of the AR model. This method returns the
+    coefficients a(i) and the innovations variance sig_sq.
+
+ The orthogonality property of minimum mean square error estimates
+ states that
+
+ .. math::
+
+ E\{e(n)x^{*}(n-k)\} = 0 \quad 1\leq k\leq p
+
+ Inserting the definition of the error signal into the equations above
+    yields the Yule-Walker system of equations:
+
+ .. math::
+
+        R_{xx}(k) = \sum_{i=1}^{p}a(i)R_{xx}(k-i) \quad 1\leq k\leq p
+
+ Similarly, the variance of the error process is
+
+ .. math::
+
+       E\{e(n)e^{*}(n)\} = E\{e(n)x^{*}(n)\} = R_{xx}(0)-\sum_{i=1}^{p}a(i)R_{xx}^{*}(i)
+
+
+ Parameters
+ ----------
+ x : ndarray
+ The sampled autoregressive random process
+
+ order : int
+ The order p of the AR system
+
+ rxx : ndarray (optional)
+ An optional, possibly unbiased estimate of the autocorrelation of x
+
+ Returns
+ -------
+ ak, sig_sq : The estimated AR coefficients and innovations variance
+
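+    Examples
+    --------
+    A minimal sketch, simulating an AR(2) process directly with NumPy
+    (the coefficient values 0.9 and -0.5 are illustrative):
+
+    >>> import numpy as np
+    >>> np.random.seed(0)
+    >>> v = np.random.randn(4096)
+    >>> x = np.zeros(4096)
+    >>> for n in range(2, 4096):
+    ...     x[n] = 0.9 * x[n - 1] - 0.5 * x[n - 2] + v[n]
+    >>> ak, sig_sq = AR_est_YW(x, 2)
+    >>> ak.shape
+    (2,)
+
+    With enough samples, ak should approach (0.9, -0.5) and sig_sq
+    should approach 1.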
+ """
+    if isinstance(rxx, np.ndarray):
+ r_m = rxx[:order + 1]
+ else:
+ r_m = utils.autocorr(x)[:order + 1]
+
+ Tm = linalg.toeplitz(r_m[:order])
+ y = r_m[1:]
+ ak = linalg.solve(Tm, y)
+ sigma_v = r_m[0].real - np.dot(r_m[1:].conj(), ak).real
+ return ak, sigma_v
+
+
+def AR_est_LD(x, order, rxx=None):
+ r"""Levinson-Durbin algorithm for solving the Hermitian Toeplitz
+ system of Yule-Walker equations in the AR estimation problem
+
+ .. math::
+
+ T^{(p)}a^{(p)} = \gamma^{(p+1)}
+
+ where
+
+ .. math::
+ :nowrap:
+
+ \begin{align*}
+ T^{(p)} &= \begin{pmatrix}
+ R_{0} & R_{1}^{*} & \cdots & R_{p-1}^{*}\\
+ R_{1} & R_{0} & \cdots & R_{p-2}^{*}\\
+ \vdots & \vdots & \ddots & \vdots\\
+       R_{p-1} & R_{p-2} & \cdots & R_{0}
+ \end{pmatrix}\\
+    a^{(p)} &=\begin{pmatrix} a_1 & a_2 & \cdots & a_p \end{pmatrix}^{T}\\
+ \gamma^{(p+1)}&=\begin{pmatrix}R_1 & R_2 & \cdots & R_p \end{pmatrix}^{T}
+ \end{align*}
+
+    and :math:`R_k` is the autocorrelation at the kth lag
+
+ Parameters
+ ----------
+
+ x : ndarray
+ the zero-mean stochastic process
+ order : int
+        the AR model order -- i.e., the rank of the system.
+ rxx : ndarray, optional
+ (at least) order+1 samples of the autocorrelation sequence
+
+ Returns
+ -------
+
+ ak, sig_sq
+ The AR coefficients for 1 <= k <= p, and the variance of the
+ driving white noise process
+
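+    Examples
+    --------
+    A minimal check, on arbitrary data, that the Levinson-Durbin
+    recursion agrees with the direct Yule-Walker solve in AR_est_YW:
+
+    >>> import numpy as np
+    >>> np.random.seed(0)
+    >>> x = np.random.randn(2048)
+    >>> ak_ld, ss_ld = AR_est_LD(x, 4)
+    >>> ak_yw, ss_yw = AR_est_YW(x, 4)
+    >>> np.allclose(ak_ld, ak_yw)
+    True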
+ """
+
+    if isinstance(rxx, np.ndarray):
+ rxx_m = rxx[:order + 1]
+ else:
+ rxx_m = utils.autocorr(x)[:order + 1]
+ w = np.zeros((order + 1, ), rxx_m.dtype)
+    # initialize the recursion with the R[0]w[1] = r[1] solution (p=1)
+ b = rxx_m[0].real
+ w_k = rxx_m[1] / b
+ w[1] = w_k
+ p = 2
+ while p <= order:
+ b *= 1 - (w_k * w_k.conj()).real
+ w_k = (rxx_m[p] - (w[1:p] * rxx_m[1:p][::-1]).sum()) / b
+ # update w_k from k=1,2,...,p-1
+ # with a correction from w*_i i=p-1,p-2,...,1
+ w[1:p] = w[1:p] - w_k * w[1:p][::-1].conj()
+ w[p] = w_k
+ p += 1
+ b *= 1 - (w_k * w_k.conj()).real
+ return w[1:], b
+
+
+def lwr_recursion(r):
+ r"""Perform a Levinson-Wiggins[Whittle]-Robinson recursion to
+ find the coefficients a(i) that satisfy the matrix version
+ of the Yule-Walker system of P + 1 equations:
+
+ sum_{i=0}^{P} a(i)r(k-i) = 0, for k = {1,2,...,P}
+
+ with the additional equation
+
+    sum_{i=0}^{P} a(i)r(-i) = V
+
+ where V is the covariance matrix of the innovations process,
+ and a(0) is fixed at the identity matrix
+
+ Also note that r is defined as:
+
+ r(k) = E{ X(t)X*(t-k) } ( * = conjugate transpose )
+ r(-k) = r*(k)
+
+
+ This routine adapts the algorithm found in eqs (1)-(11)
+ in Morf, Vieira, Kailath 1978
+
+ Parameters
+ ----------
+
+    r : ndarray, shape (P + 1, nc, nc)
+        the sequence of autocovariance matrices r(0), r(1), ..., r(P),
+        where nc is the number of channels
+
+ Returns
+ -------
+
+ a : ndarray (P,nc,nc)
+ coefficient sequence of order P
+ sigma : ndarray (nc,nc)
+ covariance estimate
+
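+    Examples
+    --------
+    A minimal sketch, treating a scalar AR(1) process as the 1x1 matrix
+    case: for x(n) = phi*x(n-1) + e(n) with unit-variance noise, the
+    theoretical autocovariance is r(k) = phi**k / (1 - phi**2). Since
+    a(0) is fixed at the identity, the recursion returns a(1) = -phi:
+
+    >>> import numpy as np
+    >>> phi = 0.5
+    >>> r = (phi ** np.arange(3) / (1 - phi ** 2)).reshape(3, 1, 1)
+    >>> a, sigma = lwr_recursion(r)
+    >>> np.allclose(a[0], -phi)
+    True
+    >>> np.allclose(sigma, 1.0)
+    True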
+ """
+
+ # r is (P+1, nc, nc)
+ nc = r.shape[1]
+ P = r.shape[0] - 1
+
+ a = np.zeros((P, nc, nc)) # ar coefs
+ b = np.zeros_like(a) # lp coefs
+    sigb = np.zeros_like(r[0])  # backward prediction error covariance
+    sigf = np.zeros_like(r[0])  # forward prediction error covariance
+ delta = np.zeros_like(r[0])
+
+ # initialize
+ idnt = np.eye(nc)
+ sigf[:] = r[0]
+ sigb[:] = r[0]
+
+ # iteratively find sequences A_{p+1}(i) and B_{p+1}(i)
+ for p in range(P):
+
+ # calculate delta_{p+1}
+ # delta_{p+1} = r(p+1) + sum_{i=1}^{p} a(i)r(p+1-i)
+ delta[:] = r[p + 1]
+ for i in range(1, p + 1):
+ delta += np.dot(a[i - 1], r[p + 1 - i])
+
+        # intermediate values
+        # XXX: these should be solved as linear systems rather than
+        # by forming explicit inverses
+ ka = np.dot(delta, linalg.inv(sigb))
+ kb = np.dot(delta.conj().T, linalg.inv(sigf))
+
+ # store a_{p} before updating sequence to a_{p+1}
+ ao = a.copy()
+ # a_{p+1}(i) = a_{p}(i) - ka*b_{p}(p+1-i) for i in {1,2,...,p}
+ # b_{p+1}(i) = b_{p}(i) - kb*a_{p}(p+1-i) for i in {1,2,...,p}
+ for i in range(1, p + 1):
+ a[i - 1] -= np.dot(ka, b[p - i])
+ for i in range(1, p + 1):
+ b[i - 1] -= np.dot(kb, ao[p - i])
+
+ a[p] = -ka
+ b[p] = -kb
+
+ sigf = np.dot(idnt - np.dot(ka, kb), sigf)
+ sigb = np.dot(idnt - np.dot(kb, ka), sigb)
+
+ return a, sigf
+
+
+def MAR_est_LWR(x, order, rxx=None):
+ r"""
+    Multivariate AR (MAR) estimation, using the LWR algorithm, as in
+    Morf, Vieira, and Kailath (1978).
+
+
+ Parameters
+ ----------
+ x : ndarray
+ The sampled autoregressive random process
+
+ order : int
+ The order P of the AR system
+
+    rxx : ndarray (optional)
+        An optional, possibly unbiased estimate of the autocovariance of x,
+        in the same layout returned by utils.autocov_vector
+
+ Returns
+ -------
+ a, ecov : The system coefficients and the estimated covariance
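+
+    Examples
+    --------
+    A minimal sketch, assuming x holds one channel per row, i.e. is
+    shaped (n_channels, n_timepoints):
+
+    >>> import numpy as np
+    >>> np.random.seed(0)
+    >>> x = np.random.randn(2, 1000)
+    >>> a, ecov = MAR_est_LWR(x, 2)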
+ """
+    if isinstance(rxx, np.ndarray):
+        Rxx = rxx
+    else:
+        Rxx = utils.autocov_vector(x, nlags=order)
+    a, ecov = lwr_recursion(Rxx.transpose(2, 0, 1))
+    return a, ecov
+
+
+def AR_psd(ak, sigma_v, n_freqs=1024, sides='onesided'):
+ r"""
+    Compute the PSD of an AR process, based on the process coefficients
+    and the innovations variance
+
+    Parameters
+    ----------
+    ak : ndarray
+        The AR coefficients a(i) of the process
+    sigma_v : float
+        The variance of the innovations (driving white noise) process
+    n_freqs : int
+        The number of spacings on the frequency grid from [-PI,PI).
+        If sides=='onesided', n_freqs/2+1 frequencies are computed from [0,PI]
+
+    sides : str (optional)
+        Indicates whether to return a one-sided or two-sided PSD
+
+    Returns
+    -------
+    (w, ar_psd)
+    w : Array of normalized frequencies from [-.5, .5) or [0, .5]
+    ar_psd : A PSD estimate computed by sigma_v / |1-a(f)|**2, where
+        a(f) = DTFT(ak)
+
+
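+    Examples
+    --------
+    A minimal sketch with illustrative AR(2) coefficients; per the
+    convention above, sides=='onesided' yields n_freqs/2 + 1 grid points:
+
+    >>> import numpy as np
+    >>> w, psd = AR_psd(np.array([0.9, -0.5]), 1.0, n_freqs=1024)
+    >>> w.shape, psd.shape
+    ((513,), (513,))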
+ """
+ # compute the psd as |H(f)|**2, where H(f) is the transfer function
+ # for this model s[n] = a1*s[n-1] + a2*s[n-2] + ... aP*s[n-P] + v[n]
+    # Taken as an IIR system with unit-variance white noise input e[n]
+ # and output s[n],
+ # b0*e[n] = w0*s[n] + w1*s[n-1] + w2*s[n-2] + ... + wP*s[n-P],
+ # where b0 = sqrt(VAR{v[n]}), w0 = 1, and wk = -ak for k>0
+ # the transfer function here is H(f) = DTFT(w)
+ # leading to Sxx(f)/Exx(f) = |H(f)|**2 = VAR{v[n]} / |W(f)|**2
+ w, hw = freq_response(sigma_v ** 0.5, a=np.r_[1, -ak],
+ n_freqs=n_freqs, sides=sides)
+ ar_psd = (hw * hw.conj()).real
+ return (w, 2 * ar_psd) if sides == 'onesided' else (w, ar_psd)
+
+
+#-----------------------------------------------------------------------------
+# Granger causality analysis
+#-----------------------------------------------------------------------------
+def transfer_function_xy(a, n_freqs=1024):
+ r"""Helper routine to compute the transfer function H(w) based
+    on the sequence of coefficient matrices A(i). The z transforms
+ follow from this definition:
+
+ X[t] + sum_{k=1}^P a[k]X[t-k] = Err[t]
+
+ Parameters
+ ----------
+
+ a : ndarray, shape (P, 2, 2)
+ sequence of coef matrices describing an mAR process
+ n_freqs : int, optional
+ number of frequencies to compute in range [0,PI]
+
+ Returns
+ -------
+
+ Hw : ndarray
+ The transfer function from innovations process vector to
+ mAR process X
+
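+    Examples
+    --------
+    A minimal sketch with one lag and uncoupled channels (the value -0.5
+    is illustrative); the off-diagonal transfer functions are then
+    identically zero:
+
+    >>> import numpy as np
+    >>> a = np.array([[[-0.5, 0.0], [0.0, -0.5]]])  # shape (1, 2, 2)
+    >>> w, Hw = transfer_function_xy(a, n_freqs=64)
+    >>> np.allclose(Hw[0, 1], 0) and np.allclose(Hw[1, 0], 0)
+    True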
+ """
+ # these concatenations follow from the observation that A(0) is
+ # implicitly the identity matrix
+ ai = np.r_[1, a[:, 0, 0]]
+ bi = np.r_[0, a[:, 0, 1]]
+ ci = np.r_[0, a[:, 1, 0]]
+ di = np.r_[1, a[:, 1, 1]]
+
+ # compute A(w) such that A(w)X(w) = Err(w)
+ w, aw = freq_response(ai, n_freqs=n_freqs)
+ _, bw = freq_response(bi, n_freqs=n_freqs)
+ _, cw = freq_response(ci, n_freqs=n_freqs)
+ _, dw = freq_response(di, n_freqs=n_freqs)
+
+ A = np.array([[aw, bw], [cw, dw]])
+ # compute the transfer function from Err to X. Since Err(w) is 1(w),
+ # the transfer function H(w) = A^(-1)(w)
+ # (use 2x2 matrix shortcut)
+ detA = (A[0, 0] * A[1, 1] - A[0, 1] * A[1, 0])
+ Hw = np.array([[dw, -bw], [-cw, aw]])
+ Hw /= detA
+ return w, Hw
+
+
+def spectral_matrix_xy(Hw, cov):
+ r"""Compute the spectral matrix S(w), from the convention:
+
+ X[t] + sum_{k=1}^P a[k]X[t-k] = Err[t]
+
+ The formulation follows from Ding, Chen, Bressler 2008,
+ pg 6 eqs (11) to (15)
+
+ The transfer function H(w) should be computed first from
+ transfer_function_xy()
+
+ Parameters
+ ----------
+
+ Hw : ndarray (2, 2, n_freqs)
+ Pre-computed transfer function from transfer_function_xy()
+
+ cov : ndarray (2, 2)
+ The covariance between innovations processes in Err[t]
+
+ Returns
+ -------
+
+    Sw : ndarray (2, 2, n_freqs)
+ matrix of spectral density functions
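+
+    Examples
+    --------
+    A minimal sketch, continuing the uncoupled two-channel example from
+    transfer_function_xy; with an identity innovations covariance the
+    cross-spectra vanish:
+
+    >>> import numpy as np
+    >>> a = np.array([[[-0.5, 0.0], [0.0, -0.5]]])
+    >>> w, Hw = transfer_function_xy(a, n_freqs=64)
+    >>> Sw = spectral_matrix_xy(Hw, np.eye(2))
+    >>> np.allclose(Sw[0, 1], 0) and np.allclose(Sw[1, 0], 0)
+    True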
+ """
+
+ nw = Hw.shape[-1]
+    # now compute spectral density function estimate
+ # S(w) = H(w)SigH*(w)
+ Sw = np.empty((2, 2, nw), 'D')
+
+ # do a shortcut for 2x2:
+ # compute T(w) = SigH*(w)
+ # t00 = Sig[0,0] * H*_00(w) + Sig[0,1] * H*_10(w)
+ t00 = cov[0, 0] * Hw[0, 0].conj() + cov[0, 1] * Hw[0, 1].conj()
+ # t01 = Sig[0,0] * H*_01(w) + Sig[0,1] * H*_11(w)
+ t01 = cov[0, 0] * Hw[1, 0].conj() + cov[0, 1] * Hw[1, 1].conj()
+ # t10 = Sig[1,0] * H*_00(w) + Sig[1,1] * H*_10(w)
+ t10 = cov[1, 0] * Hw[0, 0].conj() + cov[1, 1] * Hw[0, 1].conj()
+ # t11 = Sig[1,0] * H*_01(w) + Sig[1,1] * H*_11(w)
+ t11 = cov[1, 0] * Hw[1, 0].conj() + cov[1, 1] * Hw[1, 1].conj()
+
+ # now S(w) = H(w)T(w)
+ Sw[0, 0] = Hw[0, 0] * t00 + Hw[0, 1] * t10
+ Sw[0, 1] = Hw[0, 0] * t01 + Hw[0, 1] * t11
+ Sw[1, 0] = Hw[1, 0] * t00 + Hw[1, 1] * t10
+ Sw[1, 1] = Hw[1, 0] * t01 + Hw[1, 1] * t11
+
+ return Sw
+
+
+def coherence_from_spectral(Sw):
+ r"""Compute the spectral coherence between processes X and Y,
+ given their spectral matrix S(w)
+
+ Parameters
+ ----------
+
+ Sw : ndarray
+        spectral matrix
+
+    Returns
+    -------
+
+    Cw : ndarray
+        the magnitude-squared coherence |Sxy(w)|**2 / (Sxx(w) * Syy(w)),
+        as a function of frequency
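+
+    Examples
+    --------
+    A minimal sketch (illustrative values): uncoupled channels have zero
+    coherence at every frequency:
+
+    >>> import numpy as np
+    >>> a = np.array([[[-0.5, 0.0], [0.0, -0.5]]])
+    >>> w, Hw = transfer_function_xy(a, n_freqs=64)
+    >>> Sw = spectral_matrix_xy(Hw, np.eye(2))
+    >>> np.allclose(coherence_from_spectral(Sw), 0)
+    True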
+ """
+
+ Sxx = Sw[0, 0].real
+ Syy = Sw[1, 1].real
+
+ Sxy_mod_sq = (Sw[0, 1] * Sw[1, 0]).real
+ Sxy_mod_sq /= Sxx
+ Sxy_mod_sq /= Syy
+ return Sxy_mod_sq
+
+
+def interdependence_xy(Sw):
+ r"""Compute the 'total interdependence' between processes X and Y,
+ given their spectral matrix S(w)
+
+ Parameters
+ ----------
+
+ Sw : ndarray
+ spectral matrix
+
+ Returns
+ -------
+
+ fxy(w)
+ interdependence function of frequency
+ """
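+    Examples
+    --------
+    A minimal sketch (illustrative values): zero coherence implies zero
+    total interdependence, since fxy(w) = -log(1 - C(w)):
+
+    >>> import numpy as np
+    >>> a = np.array([[[-0.5, 0.0], [0.0, -0.5]]])
+    >>> w, Hw = transfer_function_xy(a, n_freqs=64)
+    >>> Sw = spectral_matrix_xy(Hw, np.eye(2))
+    >>> np.allclose(interdependence_xy(Sw), 0)
+    True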
+
+ Cw = coherence_from_spectral(Sw)
+ return -np.log(1 - Cw)
+
+
+def granger_causality_xy(a, cov, n_freqs=1024):
+ r"""Compute the Granger causality between processes X and Y, which
+ are linked in a multivariate autoregressive (mAR) model parameterized
+ by coefficient matrices a(i) and the innovations covariance matrix
+
+ X[t] + sum_{k=1}^P a[k]X[t-k] = Err[t]
+
+ Parameters
+ ----------
+
+ a : ndarray, (P,2,2)
+ coefficient matrices characterizing the autoregressive mixing
+ cov : ndarray, (2,2)
+ covariance matrix characterizing the innovations vector
+ n_freqs: int
+        number of frequencies to compute in the Fourier transform
+
+ Returns
+ -------
+
+ w, f_x_on_y, f_y_on_x, f_xy, Sw
+ 1) vector of frequencies
+ 2) function of the Granger causality of X on Y
+ 3) function of the Granger causality of Y on X
+ 4) function of the 'instantaneous causality' between X and Y
+ 5) spectral density matrix
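+
+    Examples
+    --------
+    A minimal sanity check (illustrative values): for uncoupled channels
+    with identity innovations covariance, all three causality measures
+    vanish at every frequency:
+
+    >>> import numpy as np
+    >>> a = np.array([[[-0.5, 0.0], [0.0, -0.5]]])
+    >>> w, fx2y, fy2x, fxy, Sw = granger_causality_xy(a, np.eye(2), n_freqs=64)
+    >>> np.allclose(fx2y, 0) and np.allclose(fy2x, 0) and np.allclose(fxy, 0)
+    True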
+ """
+
+ w, Hw = transfer_function_xy(a, n_freqs=n_freqs)
+
+ sigma = cov[0, 0]
+ upsilon = cov[0, 1]
+ gamma = cov[1, 1]
+
+ # this transformation of the transfer functions computes the
+ # Granger causality of Y on X
+ gamma2 = gamma - upsilon ** 2 / sigma
+
+ Hxy = Hw[0, 1]
+ Hxx_hat = Hw[0, 0] + (upsilon / sigma) * Hxy
+
+ xx_auto_component = (sigma * Hxx_hat * Hxx_hat.conj()).real
+ cross_component = gamma2 * Hxy * Hxy.conj()
+ Sxx = xx_auto_component + cross_component
+ f_y_on_x = np.log(Sxx.real / xx_auto_component)
+
+ # this transformation computes the Granger causality of X on Y
+ sigma2 = sigma - upsilon ** 2 / gamma
+
+ Hyx = Hw[1, 0]
+ Hyy_hat = Hw[1, 1] + (upsilon / gamma) * Hyx
+ yy_auto_component = (gamma * Hyy_hat * Hyy_hat.conj()).real
+ cross_component = sigma2 * Hyx * Hyx.conj()
+ Syy = yy_auto_component + cross_component
+ f_x_on_y = np.log(Syy.real / yy_auto_component)
+
+ # now compute cross densities, using the latest transformation
+ Hxx = Hw[0, 0]
+ Hyx = Hw[1, 0]
+ Hxy_hat = Hw[0, 1] + (upsilon / gamma) * Hxx
+ Sxy = sigma2 * Hxx * Hyx.conj() + gamma * Hxy_hat * Hyy_hat.conj()
+ Syx = sigma2 * Hyx * Hxx.conj() + gamma * Hyy_hat * Hxy_hat.conj()
+
+ # can safely throw away imaginary part
+ # since Sxx and Syy are real, and Sxy == Syx*
+ detS = (Sxx * Syy - Sxy * Syx).real
+ f_xy = xx_auto_component * yy_auto_component
+ f_xy /= detS
+ f_xy = np.log(f_xy)
+
+ return w, f_x_on_y, f_y_on_x, f_xy, np.array([[Sxx, Sxy], [Syx, Syy]])