Source code for emcee.mh

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
A vanilla Metropolis-Hastings sampler

"""

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["MHSampler"]

import numpy as np

from . import autocorr
from .sampler import Sampler


# === MHSampler ===
[docs]class MHSampler(Sampler): """ The most basic possible Metropolis-Hastings style MCMC sampler :param cov: The covariance matrix to use for the proposal distribution. :param dim: Number of dimensions in the parameter space. :param lnpostfn: A function that takes a vector in the parameter space as input and returns the natural logarithm of the posterior probability for that position. :param args: (optional) A list of extra positional arguments for ``lnpostfn``. ``lnpostfn`` will be called with the sequence ``lnpostfn(p, *args, **kwargs)``. :param kwargs: (optional) A list of extra keyword arguments for ``lnpostfn``. ``lnpostfn`` will be called with the sequence ``lnpostfn(p, *args, **kwargs)``. """ def __init__(self, cov, *args, **kwargs): super(MHSampler, self).__init__(*args, **kwargs) self.cov = cov def reset(self): super(MHSampler, self).reset() self._chain = np.empty((0, self.dim)) self._lnprob = np.empty(0)
[docs] def sample(self, p0, lnprob=None, randomstate=None, thin=1, storechain=True, iterations=1): """ Advances the chain ``iterations`` steps as an iterator :param p0: The initial position vector. :param lnprob0: (optional) The log posterior probability at position ``p0``. If ``lnprob`` is not provided, the initial value is calculated. :param rstate0: (optional) The state of the random number generator. See the :func:`random_state` property for details. :param iterations: (optional) The number of steps to run. :param thin: (optional) If you only want to store and yield every ``thin`` samples in the chain, set thin to an integer greater than 1. :param storechain: (optional) By default, the sampler stores (in memory) the positions and log-probabilities of the samples in the chain. If you are using another method to store the samples to a file or if you don't need to analyse the samples after the fact (for burn-in for example) set ``storechain`` to ``False``. At each iteration, this generator yields: * ``pos`` - The current positions of the chain in the parameter space. * ``lnprob`` - The value of the log posterior at ``pos`` . * ``rstate`` - The current state of the random number generator. """ self.random_state = randomstate p = np.array(p0) if lnprob is None: lnprob = self.get_lnprob(p) # Resize the chain in advance. if storechain: N = int(iterations / thin) self._chain = np.concatenate((self._chain, np.zeros((N, self.dim))), axis=0) self._lnprob = np.append(self._lnprob, np.zeros(N)) i0 = self.iterations # Use range instead of xrange for python 3 compatability for i in range(int(iterations)): self.iterations += 1 # Calculate the proposal distribution. q = self._random.multivariate_normal(p, self.cov) newlnprob = self.get_lnprob(q) diff = newlnprob - lnprob # M-H acceptance ratio if diff < 0: diff = np.exp(diff) - self._random.rand() if diff > 0: p = q lnprob = newlnprob self.naccepted += 1 if storechain and i % thin == 0: ind = i0 + int(i / thin) self._chain[ind, :] = p self._lnprob[ind] = lnprob # Heavy duty iterator action going on right here... yield p, lnprob, self.random_state
@property def acor(self): """ An estimate of the autocorrelation time for each parameter (length: ``dim``). """ return self.get_autocorr_time()
[docs] def get_autocorr_time(self, low=10, high=None, step=1, c=10, fast=False): """ Compute an estimate of the autocorrelation time for each parameter (length: ``dim``). :param low: (Optional[int]) The minimum window size to test. (default: ``10``) :param high: (Optional[int]) The maximum window size to test. (default: ``x.shape[axis] / (2*c)``) :param step: (Optional[int]) The step size for the window search. (default: ``1``) :param c: (Optional[float]) The minimum number of autocorrelation times needed to trust the estimate. (default: ``10``) :param fast: (Optional[bool]) If ``True``, only use the first ``2^n`` (for the largest power) entries for efficiency. (default: False) """ return autocorr.integrated_time(self.chain, axis=0, low=low, high=high, step=step, c=c, fast=fast)