#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
An affine invariant Markov chain Monte Carlo (MCMC) sampler.
Goodman & Weare, Ensemble Samplers With Affine Invariance
Comm. App. Math. Comp. Sci., Vol. 5 (2010), No. 1, 65–80
"""
from __future__ import (division, print_function, absolute_import,
unicode_literals)
__all__ = ["EnsembleSampler"]
import numpy as np
from . import autocorr
from .sampler import Sampler
from .interruptible_pool import InterruptiblePool
class EnsembleSampler(Sampler):
    """
    A generalized Ensemble sampler that uses 2 ensembles for parallelization.

    The ``__init__`` function will raise an ``AssertionError`` if
    ``k < 2 * dim`` (and you haven't set the ``live_dangerously`` parameter)
    or if ``k`` is odd.

    **Warning**: The :attr:`chain` member of this object has the shape:
    ``(nwalkers, nlinks, dim)`` where ``nlinks`` is the number of steps
    taken by the chain and ``k`` is the number of walkers.  Use the
    :attr:`flatchain` property to get the chain flattened to
    ``(nwalkers * nlinks, dim)``.  For users of pre-1.0 versions, this
    shape is different so be careful!

    :param nwalkers:
        The number of Goodman & Weare "walkers".

    :param dim:
        Number of dimensions in the parameter space.

    :param lnpostfn:
        A function that takes a vector in the parameter space as input and
        returns the natural logarithm of the posterior probability for that
        position.

    :param a: (optional)
        The proposal scale parameter. (default: ``2.0``)

    :param args: (optional)
        A list of extra positional arguments for ``lnpostfn``. ``lnpostfn``
        will be called with the sequence ``lnpostfn(p, *args, **kwargs)``.

    :param kwargs: (optional)
        A dict of extra keyword arguments for ``lnpostfn``. ``lnpostfn``
        will be called with the sequence ``lnpostfn(p, *args, **kwargs)``.

    :param postargs: (optional)
        Alias of ``args`` for backwards compatibility.

    :param threads: (optional)
        The number of threads to use for parallelization. If ``threads == 1``,
        then the ``multiprocessing`` module is not used but if
        ``threads > 1``, then a ``Pool`` object is created and calls to
        ``lnpostfn`` are run in parallel.

    :param pool: (optional)
        An alternative method of using the parallelized algorithm. If
        provided, the value of ``threads`` is ignored and the
        object provided by ``pool`` is used for all parallelization. It
        can be any object with a ``map`` method that follows the same
        calling sequence as the built-in ``map`` function.

    :param runtime_sortingfn: (optional)
        A function implementing custom runtime load-balancing. See
        :ref:`loadbalance` for more information.

    """
    def __init__(self, nwalkers, dim, lnpostfn, a=2.0, args=None, kwargs=None,
                 postargs=None, threads=1, pool=None, live_dangerously=False,
                 runtime_sortingfn=None):
        # ``None`` sentinels instead of mutable default arguments so that
        # separate sampler instances never share the same list/dict object.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs

        self.k = nwalkers
        self.a = a
        self.threads = threads
        self.pool = pool
        self.runtime_sortingfn = runtime_sortingfn

        # ``postargs`` is a deprecated alias of ``args``; it wins if given.
        if postargs is not None:
            args = postargs
        super(EnsembleSampler, self).__init__(dim, lnpostfn, args=args,
                                              kwargs=kwargs)

        # Do a little bit of _magic_ to make the likelihood call with
        # ``args`` and ``kwargs`` pickleable.
        self.lnprobfn = _function_wrapper(self.lnprobfn, self.args,
                                          self.kwargs)

        # The stretch move splits the walkers into two halves, so the
        # walker count must be even; and an under-populated ensemble
        # degenerates unless the user opts in to living dangerously.
        assert self.k % 2 == 0, "The number of walkers must be even."
        if not live_dangerously:
            assert self.k >= 2 * self.dim, (
                "The number of walkers needs to be more than twice the "
                "dimension of your parameter space... unless you're "
                "crazy!")

        if self.threads > 1 and self.pool is None:
            self.pool = InterruptiblePool(self.threads)

    def clear_blobs(self):
        """
        Clear the ``blobs`` list.

        """
        self._blobs = []

    def reset(self):
        """
        Clear the ``chain`` and ``lnprobability`` array. Also reset the
        bookkeeping parameters.

        """
        super(EnsembleSampler, self).reset()
        self.naccepted = np.zeros(self.k)
        self._chain = np.empty((self.k, 0, self.dim))
        self._lnprob = np.empty((self.k, 0))

        # Initialize list for storing optional metadata blobs.
        self.clear_blobs()

    def sample(self, p0, lnprob0=None, rstate0=None, blobs0=None,
               iterations=1, thin=1, storechain=True, mh_proposal=None):
        """
        Advance the chain ``iterations`` steps as a generator.

        :param p0:
            A list of the initial positions of the walkers in the
            parameter space. It should have the shape ``(nwalkers, dim)``.

        :param lnprob0: (optional)
            The list of log posterior probabilities for the walkers at
            positions given by ``p0``. If ``lnprob is None``, the initial
            values are calculated. It should have the shape ``(k,)``.

        :param rstate0: (optional)
            The state of the random number generator.
            See the :attr:`Sampler.random_state` property for details.

        :param iterations: (optional)
            The number of steps to run.

        :param thin: (optional)
            If you only want to store and yield every ``thin`` samples in the
            chain, set thin to an integer greater than 1.

        :param storechain: (optional)
            By default, the sampler stores (in memory) the positions and
            log-probabilities of the samples in the chain. If you are
            using another method to store the samples to a file or if you
            don't need to analyse the samples after the fact (for burn-in
            for example) set ``storechain`` to ``False``.

        :param mh_proposal: (optional)
            A function that returns a list of positions for ``nwalkers``
            walkers given a current list of positions of the same size. See
            :class:`utils.MH_proposal_axisaligned` for an example.

        At each iteration, this generator yields:

        * ``pos`` - A list of the current positions of the walkers in the
          parameter space. The shape of this object will be
          ``(nwalkers, dim)``.

        * ``lnprob`` - The list of log posterior probabilities for the
          walkers at positions given by ``pos``. The shape of this object
          is ``(nwalkers,)``.

        * ``rstate`` - The current state of the random number generator.

        * ``blobs`` - (optional) The metadata "blobs" associated with the
          current position. The value is only returned if ``lnpostfn``
          returns blobs too.

        """
        # Try to set the initial value of the random number generator. This
        # fails silently if it doesn't work but that's what we want because
        # we'll just interpret any garbage as letting the generator stay in
        # its current state.
        self.random_state = rstate0

        p = np.array(p0)
        halfk = self.k // 2

        # If the initial log-probabilities were not provided, calculate them
        # now.
        lnprob = lnprob0
        blobs = blobs0
        if lnprob is None:
            lnprob, blobs = self._get_lnprob(p)

        # Check to make sure that the probability function didn't return
        # ``np.nan``.
        if np.any(np.isnan(lnprob)):
            raise ValueError("The initial lnprob was NaN.")

        # Store the initial size of the stored chain.
        i0 = self._chain.shape[1]

        # Here, we resize chain in advance for performance. This actually
        # makes a pretty big difference.
        if storechain:
            N = int(iterations) // int(thin)
            self._chain = np.concatenate((self._chain,
                                          np.zeros((self.k, N, self.dim))),
                                         axis=1)
            self._lnprob = np.concatenate((self._lnprob,
                                           np.zeros((self.k, N))), axis=1)

        for i in range(int(iterations)):
            self.iterations += 1

            # If we were passed a Metropolis-Hastings proposal
            # function, use it.
            if mh_proposal is not None:
                # Draw proposed positions & evaluate lnprob there
                q = mh_proposal(p)
                newlnp, blob = self._get_lnprob(q)

                # Accept if newlnp is better; and ...
                acc = (newlnp > lnprob)

                # ... sometimes accept for steps that got worse
                worse = np.flatnonzero(~acc)
                acc[worse] = ((newlnp[worse] - lnprob[worse]) >
                              np.log(self._random.rand(len(worse))))
                del worse

                # Update the accepted walkers.
                lnprob[acc] = newlnp[acc]
                p[acc] = q[acc]
                self.naccepted[acc] += 1

                if blob is not None:
                    assert blobs is not None, (
                        "If you start sampling with a given lnprob, you also "
                        "need to provide the current list of blobs at that "
                        "position.")
                    ind = np.arange(self.k)[acc]
                    for j in ind:
                        blobs[j] = blob[j]

            else:
                # Loop over the two ensembles, calculating the proposed
                # positions.

                # Slices for the first and second halves
                first, second = slice(halfk), slice(halfk, self.k)
                for S0, S1 in [(first, second), (second, first)]:
                    q, newlnp, acc, blob = self._propose_stretch(p[S0], p[S1],
                                                                 lnprob[S0])
                    if np.any(acc):
                        # Update the positions, log probabilities and
                        # acceptance counts.  NOTE: ``S0`` is a slice, so
                        # ``lnprob[S0]`` etc. are views and the fancy
                        # assignment propagates back into the full arrays.
                        lnprob[S0][acc] = newlnp[acc]
                        p[S0][acc] = q[acc]
                        self.naccepted[S0][acc] += 1

                        if blob is not None:
                            assert blobs is not None, (
                                "If you start sampling with a given lnprob, "
                                "you also need to provide the current list of "
                                "blobs at that position.")
                            ind = np.arange(len(acc))[acc]
                            indfull = np.arange(self.k)[S0][acc]
                            for j in range(len(ind)):
                                blobs[indfull[j]] = blob[ind[j]]

            if storechain and i % thin == 0:
                ind = i0 + i // thin
                self._chain[:, ind, :] = p
                self._lnprob[:, ind] = lnprob
                if blobs is not None:
                    self._blobs.append(list(blobs))

            # Yield the result as an iterator so that the user can do all
            # sorts of fun stuff with the results so far.
            if blobs is not None:
                # This is a bit of a hack to keep things backwards compatible.
                yield p, lnprob, self.random_state, blobs
            else:
                yield p, lnprob, self.random_state

    def _propose_stretch(self, p0, p1, lnprob0):
        """
        Propose a new position for one sub-ensemble given the positions of
        another.

        :param p0:
            The positions from which to jump.

        :param p1:
            The positions of the other ensemble.

        :param lnprob0:
            The log-probabilities at ``p0``.

        This method returns:

        * ``q`` - The new proposed positions for the walkers in ``ensemble``.

        * ``newlnprob`` - The vector of log-probabilities at the positions
          given by ``q``.

        * ``accept`` - A vector of type ``bool`` indicating whether or not
          the proposed position for each walker should be accepted.

        * ``blob`` - The new meta data blobs or ``None`` if nothing was
          returned by ``lnprobfn``.

        """
        s = np.atleast_2d(p0)
        Ns = len(s)
        c = np.atleast_2d(p1)
        Nc = len(c)

        # Generate the vectors of random numbers that will produce the
        # proposal.  ``zz`` is drawn from g(z) proportional to 1/sqrt(z) on
        # [1/a, a] (Goodman & Weare 2010, eq. 10) via inverse-CDF sampling.
        zz = ((self.a - 1.) * self._random.rand(Ns) + 1) ** 2. / self.a
        rint = self._random.randint(Nc, size=(Ns,))

        # Calculate the proposed positions and the log-probability there.
        q = c[rint] - zz[:, np.newaxis] * (c[rint] - s)
        newlnprob, blob = self._get_lnprob(q)

        # Decide whether or not the proposals should be accepted.  The
        # z^(dim-1) factor keeps the stretch move detailed-balanced.
        lnpdiff = (self.dim - 1.) * np.log(zz) + newlnprob - lnprob0
        accept = (lnpdiff > np.log(self._random.rand(len(lnpdiff))))

        return q, newlnprob, accept, blob

    def _get_lnprob(self, pos=None):
        """
        Calculate the vector of log-probability for the walkers.

        :param pos: (optional)
            The position vector in parameter space where the probability
            should be calculated. This defaults to the current position
            unless a different one is provided.

        This method returns:

        * ``lnprob`` - A vector of log-probabilities with one entry for each
          walker in this sub-ensemble.

        * ``blob`` - The list of meta data returned by the ``lnpostfn`` at
          this position or ``None`` if nothing was returned.

        """
        if pos is None:
            p = self.pos
        else:
            p = pos

        # Check that the parameters are in physical ranges.
        if np.any(np.isinf(p)):
            raise ValueError("At least one parameter value was infinite.")
        if np.any(np.isnan(p)):
            raise ValueError("At least one parameter value was NaN.")

        # If the `pool` property of the sampler has been set (i.e. we want
        # to use `multiprocessing`), use the `pool`'s map method. Otherwise,
        # just use the built-in `map` function.
        if self.pool is not None:
            M = self.pool.map
        else:
            M = map

        # sort the tasks according to (user-defined) some runtime guess
        if self.runtime_sortingfn is not None:
            p, idx = self.runtime_sortingfn(p)

        # Run the log-probability calculations (optionally in parallel).
        results = list(M(self.lnprobfn, [p[i] for i in range(len(p))]))

        # ``lnpostfn`` may return a bare scalar or a ``(lnprob, blob)``
        # pair; distinguish the two by attempting to unpack.
        try:
            lnprob = np.array([float(l[0]) for l in results])
            blob = [l[1] for l in results]
        except (IndexError, TypeError):
            lnprob = np.array([float(l) for l in results])
            blob = None

        # sort it back according to the original order - get the same
        # chain irrespective of the runtime sorting fn
        if self.runtime_sortingfn is not None:
            orig_idx = np.argsort(idx)
            lnprob = lnprob[orig_idx]
            p = [p[i] for i in orig_idx]
            if blob is not None:
                blob = [blob[i] for i in orig_idx]

        # Check for lnprob returning NaN.
        if np.any(np.isnan(lnprob)):
            # Print some debugging stuff.  ``np.asarray`` is needed because
            # the runtime-sorting branch above may have turned ``p`` into a
            # plain list, which does not support boolean-mask indexing.
            print("NaN value of lnprob for parameters: ")
            for pars in np.asarray(p)[np.isnan(lnprob)]:
                print(pars)

            # Finally raise exception.
            raise ValueError("lnprob returned NaN.")

        return lnprob, blob

    @property
    def blobs(self):
        """
        Get the list of "blobs" produced by sampling. The result is a list
        (of length ``iterations``) of ``list`` s (of length ``nwalkers``) of
        arbitrary objects. **Note**: this will actually be an empty list if
        your ``lnpostfn`` doesn't return any metadata.

        """
        return self._blobs

    @property
    def chain(self):
        """
        A pointer to the Markov chain itself. The shape of this array is
        ``(k, iterations, dim)``.

        """
        return super(EnsembleSampler, self).chain

    @property
    def flatchain(self):
        """
        A shortcut for accessing chain flattened along the zeroth (walker)
        axis.

        """
        s = self.chain.shape
        return self.chain.reshape(s[0] * s[1], s[2])

    @property
    def lnprobability(self):
        """
        A pointer to the matrix of the value of ``lnprobfn`` produced at each
        step for each walker. The shape is ``(k, iterations)``.

        """
        return super(EnsembleSampler, self).lnprobability

    @property
    def flatlnprobability(self):
        """
        A shortcut to return the equivalent of ``lnprobability`` but aligned
        to ``flatchain`` rather than ``chain``.

        """
        return super(EnsembleSampler, self).lnprobability.flatten()

    @property
    def acceptance_fraction(self):
        """
        An array (length: ``k``) of the fraction of steps accepted for each
        walker.

        """
        return super(EnsembleSampler, self).acceptance_fraction

    @property
    def acor(self):
        """
        An estimate of the autocorrelation time for each parameter (length:
        ``dim``).

        """
        return self.get_autocorr_time()

    def get_autocorr_time(self, low=10, high=None, step=1, c=10, fast=False):
        """
        Compute an estimate of the autocorrelation time for each parameter
        (length: ``dim``).

        :param low: (Optional[int])
            The minimum window size to test.
            (default: ``10``)

        :param high: (Optional[int])
            The maximum window size to test.
            (default: ``x.shape[axis] / (2*c)``)

        :param step: (Optional[int])
            The step size for the window search.
            (default: ``1``)

        :param c: (Optional[float])
            The minimum number of autocorrelation times needed to trust the
            estimate.
            (default: ``10``)

        :param fast: (Optional[bool])
            If ``True``, only use the first ``2^n`` (for the largest power)
            entries for efficiency.
            (default: False)

        """
        return autocorr.integrated_time(np.mean(self.chain, axis=0), axis=0,
                                        low=low, high=high, step=step, c=c,
                                        fast=fast)
class _function_wrapper(object):
"""
This is a hack to make the likelihood function pickleable when ``args``
or ``kwargs`` are also included.
"""
def __init__(self, f, args, kwargs):
self.f = f
self.args = args
self.kwargs = kwargs
def __call__(self, x):
try:
return self.f(x, *self.args, **self.kwargs)
except:
import traceback
print("emcee: Exception while calling your likelihood function:")
print(" params:", x)
print(" args:", self.args)
print(" kwargs:", self.kwargs)
print(" exception:")
traceback.print_exc()
raise