gstlal-inspiral  0.4.2
 All Classes Namespaces Files Functions Variables Pages
tests.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 Defines various nose unit tests
5 
6 """
7 
8 import numpy as np
9 
10 from .mh import MHSampler
11 from .ensemble import EnsembleSampler
12 from .ptsampler import PTSampler
13 
14 logprecision = -4
15 
16 
17 def lnprob_gaussian(x, icov):
18  return -np.dot(x, np.dot(icov, x)) / 2.0
19 
20 
21 def lnprob_gaussian_nan(x, icov):
22  #if walker's parameters are zeros => return NaN
23  if not (np.array(x)).any():
24  result = np.nan
25  else:
26  result = -np.dot(x, np.dot(icov, x)) / 2.0
27 
28  return result
29 
30 
31 def log_unit_sphere_volume(ndim):
32  if ndim % 2 == 0:
33  logfactorial = 0.0
34  for i in range(1, ndim / 2 + 1):
35  logfactorial += np.log(i)
36  return ndim / 2.0 * np.log(np.pi) - logfactorial
37  else:
38  logfactorial = 0.0
39  for i in range(1, ndim + 1, 2):
40  logfactorial += np.log(i)
41  return (ndim + 1) / 2.0 * np.log(2.0) \
42  + (ndim - 1) / 2.0 * np.log(np.pi) - logfactorial
43 
44 
45 class LnprobGaussian(object):
46  def __init__(self, icov, cutoff=None):
47  """Initialize a gaussian PDF with the given inverse covariance
48  matrix. If not ``None``, ``cutoff`` truncates the PDF at the
49  given number of sigma from the origin (i.e. the PDF is
50  non-zero only on an ellipse aligned with the principal axes of
51  the distribution). Without this cutoff, thermodynamic
52  integration with a flat prior is logarithmically divergent."""
53 
54  self.icov = icov
55  self.cutoff = cutoff
56 
57  def __call__(self, x):
58  dist2 = lnprob_gaussian(x, self.icov)
59 
60  if self.cutoff is not None:
61  if -dist2 > self.cutoff * self.cutoff / 2.0:
62  return float('-inf')
63  else:
64  return dist2
65  else:
66  return dist2
67 
68 
69 def ln_flat(x):
70  return 0.0
71 
72 
73 class Tests:
74 
75  def setUp(self):
76  self.nwalkers = 100
77  self.ndim = 5
78 
79  self.ntemp = 20
80 
81  self.N = 1000
82 
83  self.mean = np.zeros(self.ndim)
84  self.cov = 0.5 - np.random.rand(self.ndim ** 2) \
85  .reshape((self.ndim, self.ndim))
86  self.cov = np.triu(self.cov)
87  self.cov += self.cov.T - np.diag(self.cov.diagonal())
88  self.cov = np.dot(self.cov, self.cov)
89  self.icov = np.linalg.inv(self.cov)
90  self.p0 = [0.1 * np.random.randn(self.ndim)
91  for i in range(self.nwalkers)]
92  self.truth = np.random.multivariate_normal(self.mean, self.cov, 100000)
93 
94  def check_sampler(self, N=None, p0=None):
95  if N is None:
96  N = self.N
97  if p0 is None:
98  p0 = self.p0
99 
100  for i in self.sampler.sample(p0, iterations=N):
101  pass
102 
103  assert np.mean(self.sampler.acceptance_fraction) > 0.25
104  assert np.all(self.sampler.acceptance_fraction > 0)
105 
106  chain = self.sampler.flatchain
107  maxdiff = 10. ** (logprecision)
108  assert np.all((np.mean(chain, axis=0) - self.mean) ** 2 / self.N ** 2
109  < maxdiff)
110  assert np.all((np.cov(chain, rowvar=0) - self.cov) ** 2 / self.N ** 2
111  < maxdiff)
112 
113  def check_pt_sampler(self, cutoff, N=None, p0=None):
114  if N is None:
115  N = self.N
116  if p0 is None:
117  p0 = self.p0
118 
119  for i in self.sampler.sample(p0, iterations=N):
120  pass
121 
122  # Weaker assertions on acceptance fraction
123  assert np.mean(self.sampler.acceptance_fraction) > 0.1
124  assert np.mean(self.sampler.tswap_acceptance_fraction) > 0.1
125 
126  maxdiff = 10.0 ** logprecision
127 
128  chain = np.reshape(self.sampler.chain[0, ...],
129  (-1, self.sampler.chain.shape[-1]))
130 
131  log_volume = self.ndim * np.log(cutoff) \
132  + log_unit_sphere_volume(self.ndim) \
133  + 0.5 * np.log(np.linalg.det(self.cov))
134  gaussian_integral = self.ndim / 2.0 * np.log(2.0 * np.pi) \
135  + 0.5 * np.log(np.linalg.det(self.cov))
136 
137  lnZ, dlnZ = self.sampler.thermodynamic_integration_log_evidence()
138 
139  assert np.abs(lnZ - (gaussian_integral - log_volume)) < 3 * dlnZ
140  assert np.all((np.mean(chain, axis=0) - self.mean) ** 2.0 / N ** 2.0
141  < maxdiff)
142  assert np.all((np.cov(chain, rowvar=0) - self.cov) ** 2.0 / N ** 2.0
143  < maxdiff)
144 
145  def test_mh(self):
146  self.sampler = MHSampler(self.cov, self.ndim, lnprob_gaussian,
147  args=[self.icov])
148  self.check_sampler(N=self.N * self.nwalkers, p0=self.p0[0])
149 
150  def test_ensemble(self):
151  self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
152  lnprob_gaussian, args=[self.icov])
153  self.check_sampler()
154 
155  def test_nan_lnprob(self):
156  self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
157  lnprob_gaussian_nan,
158  args=[self.icov])
159 
160  # If a walker is right at zero, ``lnprobfn`` returns ``np.nan``.
161  p0 = self.p0
162  p0[0] = 0.0
163 
164  try:
165  self.check_sampler(p0=p0)
166  except ValueError:
167  # This should fail *immediately* with a ``ValueError``.
168  return
169 
170  assert False, "We should never get here."
171 
172  def test_inf_nan_params(self):
173  self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
174  lnprob_gaussian, args=[self.icov])
175 
176  # Set one of the walkers to have a ``np.nan`` value.
177  p0 = self.p0
178  p0[0][0] = np.nan
179 
180  try:
181  self.check_sampler(p0=p0)
182 
183  except ValueError:
184  # This should fail *immediately* with a ``ValueError``.
185  pass
186 
187  else:
188  assert False, "The sampler should have failed by now."
189 
190  # Set one of the walkers to have a ``np.inf`` value.
191  p0[0][0] = np.inf
192 
193  try:
194  self.check_sampler(p0=p0)
195 
196  except ValueError:
197  # This should fail *immediately* with a ``ValueError``.
198  pass
199 
200  else:
201  assert False, "The sampler should have failed by now."
202 
203  # Set one of the walkers to have a ``np.inf`` value.
204  p0[0][0] = -np.inf
205 
206  try:
207  self.check_sampler(p0=p0)
208 
209  except ValueError:
210  # This should fail *immediately* with a ``ValueError``.
211  pass
212 
213  else:
214  assert False, "The sampler should have failed by now."
215 
216  # def test_parallel(self):
217  # self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
218  # lnprob_gaussian, args=[self.icov],
219  # threads=2)
220  # self.check_sampler()
221 
222  def test_pt_sampler(self):
223  cutoff = 10.0
224  self.sampler = PTSampler(self.ntemp, self.nwalkers, self.ndim,
225  LnprobGaussian(self.icov, cutoff=cutoff),
226  ln_flat)
227  p0 = np.random.multivariate_normal(mean=self.mean, cov=self.cov,
228  size=(self.ntemp, self.nwalkers))
229  self.check_pt_sampler(cutoff, p0=p0, N=1000)
230 
231  def test_blobs(self):
232  lnprobfn = lambda p: (-0.5 * np.sum(p ** 2), np.random.rand())
233  self.sampler = EnsembleSampler(self.nwalkers, self.ndim, lnprobfn)
234  self.check_sampler()
235 
236  # Make sure that the shapes of everything are as expected.
237  assert (self.sampler.chain.shape == (self.nwalkers, self.N, self.ndim)
238  and len(self.sampler.blobs) == self.N
239  and len(self.sampler.blobs[0]) == self.nwalkers), \
240  "The blob dimensions are wrong."
241 
242  # Make sure that the blobs aren't all the same.
243  blobs = self.sampler.blobs
244  assert np.any([blobs[-1] != blobs[i] for i in range(len(blobs) - 1)])