summaryrefslogtreecommitdiff
path: root/nitime/analysis/granger.py
diff options
context:
space:
mode:
Diffstat (limited to 'nitime/analysis/granger.py')
-rw-r--r--nitime/analysis/granger.py216
1 files changed, 216 insertions, 0 deletions
diff --git a/nitime/analysis/granger.py b/nitime/analysis/granger.py
new file mode 100644
index 0000000..6e1efb2
--- /dev/null
+++ b/nitime/analysis/granger.py
@@ -0,0 +1,216 @@
+"""
+
+Analyzers for the calculation of Granger 'causality'
+
+"""
+
+import numpy as np
+import nitime.algorithms as alg
+import nitime.utils as utils
+from nitime import descriptors as desc
+
+from .base import BaseAnalyzer
+
+# To suppport older versions of numpy that don't have tril_indices:
+from nitime.index_utils import tril_indices_from
+
+def fit_model(x1, x2, order=None, max_order=10,
+ criterion=utils.bayesian_information_criterion):
+ """
+ Fit the auto-regressive model used in calculation of Granger 'causality'.
+
+ Parameters
+ ----------
+
+ x1,x2: float arrays (n)
+ x1,x2 bivariate combination.
+ order: int (optional)
+ If known, the order of the autoregressive process
+ max_order: int (optional)
+ If the order is not known, this will be the maximal order to fit.
+ criterion: callable
+ A function which defines an information criterion, used to determine the
+ order of the model.
+
+ """
+ c_old = np.inf
+ n_process = 2
+ Ntotal = n_process * x1.shape[-1]
+
+ # If model order was provided as an input:
+ if order is not None:
+ lag = order + 1
+ Rxx = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
+ coef, ecov = alg.lwr_recursion(np.array(Rxx).transpose(2, 0, 1))
+
+ # If the model order is not known and provided as input:
+ else:
+ for lag in range(1, max_order):
+ Rxx_new = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
+ coef_new, ecov_new = alg.lwr_recursion(
+ np.array(Rxx_new).transpose(2, 0, 1))
+ order_new = coef_new.shape[0]
+ c_new = criterion(ecov_new, n_process, order_new, Ntotal)
+ if c_new > c_old:
+ # Keep the values you got in the last round and break out:
+ break
+
+ else:
+ # Replace the output values with the new calculated values and
+ # move on to the next order:
+ c_old = c_new
+ order = order_new
+ Rxx = Rxx_new
+ coef = coef_new
+ ecov = ecov_new
+ else:
+ e_s = ("Model estimation order did not converge at max_order = %s"
+ % max_order)
+ raise ValueError(e_s)
+
+ return order, Rxx, coef, ecov
+
+
+class GrangerAnalyzer(BaseAnalyzer):
+ """Analyzer for computing all-to-all Granger 'causality' """
+ def __init__(self, input=None, ij=None, order=None, max_order=10,
+ criterion=utils.bayesian_information_criterion, n_freqs=1024):
+ """
+ Initializer for the GrangerAnalyzer.
+
+ Parameters
+ ----------
+
+ input: nitime TimeSeries object
+ ij: List of tuples of the form: [(0, 1), (0, 2)], etc.
+ These are the indices of pairs of time-series for which the
+ analysis will be done. Defaults to all vs. all.
+ order: int (optional)
+ The order of the process. If this is not known, it will be
+ estimated from the data, using the information criterion
+ max_order: if the order is estimated, this is the maximal order to
+ estimate for.
+ n_freqs: int (optional)
+ The size of the sampling grid in the frequency domain.
+ Defaults to 1024
+ criterion:
+ XXX
+ """
+ self.data = input.data
+ self.sampling_rate = input.sampling_rate
+ self._n_process = input.shape[0]
+ self._n_freqs = n_freqs
+ self._order = order
+ self._criterion = criterion
+ self._max_order = max_order
+ if ij is None:
+ # The following gets the full list of combinations of
+ # non-same i's and j's:
+ x, y = np.meshgrid(np.arange(self._n_process),
+ np.arange(self._n_process))
+ self.ij = list(zip(x[tril_indices_from(x, -1)],
+ y[tril_indices_from(y, -1)]))
+ else:
+ self.ij = ij
+
+ @desc.setattr_on_read
+ def _model(self):
+ model = dict(order={}, autocov={}, model_coef={}, error_cov={})
+ for i, j in self.ij:
+ model[i, j] = dict()
+ order_t, Rxx_t, coef_t, ecov_t = fit_model(self.data[i],
+ self.data[j],
+ order=self._order,
+ max_order=self._max_order,
+ criterion=self._criterion)
+ model['order'][i, j] = order_t
+ model['autocov'][i, j] = Rxx_t
+ model['model_coef'][i, j] = coef_t
+ model['error_cov'][i, j] = ecov_t
+
+ return model
+
+ @desc.setattr_on_read
+ def order(self):
+ if self._order is None:
+ return self._model['order']
+ else:
+ order = {}
+ for i, j in self.ij:
+ order[i, j] = self._order
+ return order
+
+ @desc.setattr_on_read
+ def autocov(self):
+ return self._model['autocov']
+
+ @desc.setattr_on_read
+ def model_coef(self):
+ return self._model['model_coef']
+
+ @desc.setattr_on_read
+ def error_cov(self):
+ return self._model['error_cov']
+
+ @desc.setattr_on_read
+ def _granger_causality(self):
+ """
+ This returns a dict with the values computed by
+ :func:`granger_causality_xy`, rather than arrays, so that we can delay
+ the allocation of arrays as much as possible.
+
+ """
+ gc = dict(frequencies={}, gc_xy={}, gc_yx={}, gc_sim={},
+ spectral_density={})
+ for i, j in self.ij:
+ w, f_x2y, f_y2x, f_xy, Sw = \
+ alg.granger_causality_xy(self.model_coef[i, j],
+ self.error_cov[i, j],
+ n_freqs=self._n_freqs)
+
+ # All other measures are dependent on i, j:
+ gc['gc_xy'][i, j] = f_x2y
+ gc['gc_yx'][i, j] = f_y2x
+ gc['gc_sim'][i, j] = f_xy
+ gc['spectral_density'][i, j] = Sw
+
+ return gc
+
+ @desc.setattr_on_read
+ def frequencies(self):
+ return utils.get_freqs(self.sampling_rate, self._n_freqs)
+
+ def _dict2arr(self, key):
+ """
+ A helper function that will generate an array with all nan's and insert
+ the measure defined by 'key' into the array and return it to the
+ calling function. This allows us to get matrices of the measures of
+ interest, instead of a dict.
+ """
+ # Prepare the matrix for the output:
+ arr = np.empty((self._n_process,
+ self._n_process,
+ self.frequencies.shape[0]))
+
+ arr.fill(np.nan)
+
+ # 'Translate' from dict form into matrix form:
+ for i, j in self.ij:
+ arr[j, i, :] = self._granger_causality[key][i, j]
+ return arr
+
+ @desc.setattr_on_read
+ def causality_xy(self):
+ return self._dict2arr('gc_xy')
+
+ @desc.setattr_on_read
+ def causality_yx(self):
+ return self._dict2arr('gc_yx')
+
+ @desc.setattr_on_read
+ def simultaneous_causality(self):
+ return self._dict2arr('gc_sim')
+
+ @desc.setattr_on_read
+ def spectral_matrix(self):
+ return self._granger_causality['spectral_density']