"""
.. module:: likelihood_class
:synopsis: Definition of the major likelihoods
.. moduleauthor:: Julien Lesgourgues <lesgourg@cern.ch>
.. moduleauthor:: Benjamin Audren <benjamin.audren@epfl.ch>
Contains the definition of the base likelihood class :class:`Likelihood`, with
basic functions, as well as more specific likelihood classes that may be reused
to implement new ones.
"""
import os
import numpy as np
import math
import warnings
import re
import scipy.constants as const
import io_mp
[docs]class Likelihood(object):
"""
General class that all likelihoods will inherit from.
"""
def __init__(self, path, data, command_line):
"""
It copies the content of self.path from the initialization routine of
the :class:`Data <data.Data>` class, and defines a handful of useful
methods, that every likelihood might need.
If the nuisance parameters required to compute this likelihood are not
defined (either fixed or varying), the code will stop.
Parameters
----------
data : class
Initialized instance of :class:`Data <data.Data>`
command_line : NameSpace
NameSpace containing the command line arguments
"""
self.name = self.__class__.__name__
self.folder = os.path.abspath(os.path.join(
data.path['MontePython'], 'likelihoods', self.name))
if not data.log_flag:
path = os.path.join(command_line.folder, 'log.param')
# Define some default fields
self.data_directory = ''
# Store all the default fields stored, for the method read_file.
self.default_values = ['data_directory']
# Recover the values potentially read in the input.param file.
if hasattr(data, self.name):
exec("attributes = [e for e in dir(data.%s) if e.find('__') == -1]" % self.name)
for elem in attributes:
exec("setattr(self, elem, getattr(data.%s, elem))" % self.name)
# Read values from the data file
self.read_from_file(path, data, command_line)
# Default state
self.need_update = True
# Check if the nuisance parameters are defined
error_flag = False
try:
for nuisance in self.use_nuisance:
if nuisance not in data.get_mcmc_parameters(['nuisance']):
error_flag = True
warnings.warn(
nuisance + " must be defined, either fixed or " +
"varying, for %s likelihood" % self.name)
self.nuisance = self.use_nuisance
except AttributeError:
self.use_nuisance = []
self.nuisance = []
# If at least one is missing, raise an exception.
if error_flag:
raise io_mp.LikelihoodError(
"Check your nuisance parameter list for your set of"
"experiments")
# Append to the log.param the value used (WARNING: so far no comparison
# is done to ensure that the experiments share the same parameters)
if data.log_flag:
io_mp.log_likelihood_parameters(self, command_line)
[docs] def loglkl(self, cosmo, data):
"""
Placeholder to remind that this function needs to be defined for a
new likelihood.
Raises
------
NotImplementedError
"""
raise NotImplementedError(
'Must implement method loglkl() in your likelihood')
[docs] def read_from_file(self, path, data, command_line):
"""
Extract the information from the log.param concerning this likelihood.
If the log.param is used, check that at least one item for each
likelihood is recovered. Otherwise, it means the log.param does not
contain information on the likelihood. This happens when the first run
fails early, before calling the likelihoods, and the program did not
log the information. This check might not be completely secure, but it
is better than nothing.
.. warning::
This checks relies on the fact that a likelihood should always have
at least **one** line of code written in the likelihood.data file.
This should be always true, but in case a run fails with the error
message described below, think about it.
.. warning::
As of version 2.0.2, you can specify likelihood options in the
parameter file. They have complete priority over the ones specified
in the `likelihood.data` file, and it will be reflected in the
`log.param` file.
"""
# Counting how many lines are read.
counter = 0
self.path = path
self.dictionary = {}
if os.path.isfile(path):
data_file = open(path, 'r')
for line in data_file:
if line.find('#') == -1:
if line.find(self.name+'.') != -1:
# Recover the name and value from the .data file
regexp = re.match(
"%s.(.*)\s*=\s*(.*)" % self.name, line)
name, value = (
elem.strip() for elem in regexp.groups())
# If this name was already defined in the parameter
# file, be sure to take this value instead. Beware,
# there are a few parameters which are always
# predefined, such as data_directory, which should be
# ignored in this check.
is_ignored = False
if name not in self.default_values:
try:
value = getattr(self, name)
is_ignored = True
except AttributeError:
pass
if not is_ignored:
exec('self.'+name+' = '+value)
value = getattr(self, name)
counter += 1
self.dictionary[name] = value
data_file.seek(0)
data_file.close()
# Checking that at least one line was read, exiting otherwise
if counter == 0:
raise io_mp.ConfigurationError(
"No information on %s likelihood " % self.name +
"was found in the %s file.\n" % path +
"This can result from a failed initialization of a previous " +
"run. To solve this, you can do a \n " +
"]$ rm -rf %s \n " % command_line.folder +
"Be sure there is noting in it before doing this !")
[docs] def get_cl(self, cosmo, l_max=-1):
"""
Return the :math:`C_{\ell}` from the cosmological code in
:math:`\mu {\\rm K}^2`
"""
# get C_l^XX from the cosmological code
cl = cosmo.lensed_cl(l_max)
# convert dimensionless C_l's to C_l in muK**2
T = cosmo.T_cmb()
for key in cl.iterkeys():
# All quantities need to be multiplied by this factor, except the
# phi-phi term, that is already dimensionless
if key not in ['pp', 'ell']:
cl[key] *= (T*1.e6)**2
return cl
[docs] def need_cosmo_arguments(self, data, dictionary):
"""
Ensure that the arguments of dictionary are defined to the correct
value in the cosmological code
.. warning::
So far there is no way to enforce a parameter where `smaller is
better`. A bigger value will always overried any smaller one
(`cl_max`, etc...)
Parameters
----------
data : dict
Initialized instance of :class:`data`
dictionary : dict
Desired precision for some cosmological parameters
"""
array_flag = False
for key, value in dictionary.iteritems():
try:
data.cosmo_arguments[key]
try:
float(data.cosmo_arguments[key])
num_flag = True
except ValueError:
num_flag = False
except TypeError:
num_flag = True
array_flag = True
except KeyError:
try:
float(value)
num_flag = True
data.cosmo_arguments[key] = 0
except ValueError:
num_flag = False
data.cosmo_arguments[key] = ''
except TypeError:
num_flag = True
array_flag = True
if num_flag is False:
if data.cosmo_arguments[key].find(value) == -1:
data.cosmo_arguments[key] += ' '+value+' '
else:
if array_flag is False:
if float(data.cosmo_arguments[key]) < value:
data.cosmo_arguments[key] = value
else:
data.cosmo_arguments[key] = '%.2g' % value[0]
for i in range(1, len(value)):
data.cosmo_arguments[key] += ',%.2g' % (value[i])
[docs] def read_contamination_spectra(self, data):
for nuisance in self.use_nuisance:
# read spectrum contamination (so far, assumes only temperature
# contamination; will be trivial to generalize to polarization when
# such templates will become relevant)
setattr(self, "%s_contamination" % nuisance,
np.zeros(self.l_max+1, 'float64'))
try:
File = open(os.path.join(
self.data_directory, getattr(self, "%s_file" % nuisance)),
'r')
for line in File:
l = int(float(line.split()[0]))
if ((l >= 2) and (l <= self.l_max)):
exec "self.%s_contamination[l]=float(line.split()[1])/(l*(l+1.)/2./math.pi)" % nuisance
except:
print 'Warning: you did not pass a file name containing '
print 'a contamination spectrum regulated by the nuisance '
print 'parameter '+nuisance
# read renormalization factor
# if it is not there, assume it is one, i.e. do not renormalize
try:
# do the following operation:
# self.nuisance_contamination *= float(self.nuisance_scale)
setattr(self, "%s_contamination" % nuisance,
getattr(self, "%s_contamination" % nuisance) *
float(getattr(self, "%s_scale" % nuisance)))
except AttributeError:
pass
# read central value of nuisance parameter
# if it is not there, assume one by default
try:
getattr(self, "%s_prior_center" % nuisance)
except AttributeError:
setattr(self, "%s_prior_center" % nuisance, 1.)
# read variance of nuisance parameter
# if it is not there, assume flat prior (encoded through
# variance=0)
try:
getattr(self, "%s_prior_variance" % nuisance)
except:
setattr(self, "%s_prior_variance" % nuisance, 0.)
[docs] def add_contamination_spectra(self, cl, data):
# Recover the current value of the nuisance parameter.
for nuisance in self.use_nuisance:
nuisance_value = float(
data.mcmc_parameters[nuisance]['current'] *
data.mcmc_parameters[nuisance]['scale'])
# add contamination spectra multiplied by nuisance parameters
for l in range(2, self.l_max):
exec "cl['tt'][l] += nuisance_value*self.%s_contamination[l]" % nuisance
return cl
[docs] def add_nuisance_prior(self, lkl, data):
# Recover the current value of the nuisance parameter.
for nuisance in self.use_nuisance:
nuisance_value = float(
data.mcmc_parameters[nuisance]['current'] *
data.mcmc_parameters[nuisance]['scale'])
# add prior on nuisance parameters
if getattr(self, "%s_prior_variance" % nuisance) > 0:
# convenience variables
prior_center = getattr(self, "%s_prior_center" % nuisance)
prior_variance = getattr(self, "%s_prior_variance" % nuisance)
lkl += -0.5*((nuisance_value-prior_center)/prior_variance)**2
return lkl
[docs] def computeLikelihood(self, ctx):
"""
Interface with CosmoHammer
Parameters
----------
ctx : Context
Contains several dictionaries storing data and cosmological
information
"""
# Recover both instances from the context
cosmo = ctx.get("cosmo")
data = ctx.get("data")
loglkl = self.loglkl(cosmo, data)
return loglkl
###################################
#
# END OF GENERIC LIKELIHOOD CLASS
#
###################################
###################################
# PRIOR TYPE LIKELIHOOD
# --> H0,...
###################################
[docs]class Likelihood_prior(Likelihood):
[docs] def loglkl(self):
raise NotImplementedError('Must implement method loglkl() in your likelihood')
###################################
# NEWDAT TYPE LIKELIHOOD
# --> spt,boomerang,etc.
###################################
[docs]class Likelihood_newdat(Likelihood):
def __init__(self, path, data, command_line):
Likelihood.__init__(self, path, data, command_line)
self.need_cosmo_arguments(
data, {'lensing': 'yes', 'output': 'tCl lCl pCl'})
# open .newdat file
newdatfile = open(
os.path.join(self.data_directory, self.file), 'r')
# find beginning of window functions file names
window_name = newdatfile.readline().strip('\n').replace(' ', '')
# initialize list of fist and last band for each type
band_num = np.zeros(6, 'int')
band_min = np.zeros(6, 'int')
band_max = np.zeros(6, 'int')
# read number of bands for each of the six types TT, EE, BB, EB, TE, TB
line = newdatfile.readline()
for i in range(6):
band_num[i] = int(line.split()[i])
# read string equal to 'BAND_SELECTION' or not
line = str(newdatfile.readline()).strip('\n').replace(' ', '')
# if yes, read 6 lines containing 'min, max'
if (line == 'BAND_SELECTION'):
for i in range(6):
line = newdatfile.readline()
band_min[i] = int(line.split()[0])
band_max[i] = int(line.split()[1])
# if no, set min to 1 and max to band_num (=use all bands)
else:
band_min = [1 for i in range(6)]
band_max = band_num
# read line defining calibration uncertainty
# contains: flag (=0 or 1), calib, calib_uncertainty
line = newdatfile.readline()
calib = float(line.split()[1])
if (int(line.split()[0]) == 0):
self.calib_uncertainty = 0
else:
self.calib_uncertainty = float(line.split()[2])
# read line defining beam uncertainty
# contains: flag (=0, 1 or 2), beam_width, beam_sigma
line = newdatfile.readline()
beam_type = int(line.split()[0])
if (beam_type > 0):
self.has_beam_uncertainty = True
else:
self.has_beam_uncertainty = False
beam_width = float(line.split()[1])
beam_sigma = float(line.split()[2])
# read flag (= 0, 1 or 2) for lognormal distributions and xfactors
line = newdatfile.readline()
likelihood_type = int(line.split()[0])
if (likelihood_type > 0):
self.has_xfactors = True
else:
self.has_xfactors = False
# declare array of quantitites describing each point of measurement
# size yet unknown, it will be found later and stored as
# self.num_points
self.obs = np.array([], 'float64')
self.var = np.array([], 'float64')
self.beam_error = np.array([], 'float64')
self.has_xfactor = np.array([], 'bool')
self.xfactor = np.array([], 'float64')
# temporary array to know which bands are actually used
used_index = np.array([], 'int')
index = -1
# scan the lines describing each point of measurement
for cltype in range(6):
if (int(band_num[cltype]) != 0):
# read name (but do not use it)
newdatfile.readline()
for band in range(int(band_num[cltype])):
# read one line corresponding to one measurement
line = newdatfile.readline()
index += 1
# if we wish to actually use this measurement
if ((band >= band_min[cltype]-1) and
(band <= band_max[cltype]-1)):
used_index = np.append(used_index, index)
self.obs = np.append(
self.obs, float(line.split()[1])*calib**2)
self.var = np.append(
self.var,
(0.5*(float(line.split()[2]) +
float(line.split()[3]))*calib**2)**2)
self.xfactor = np.append(
self.xfactor, float(line.split()[4])*calib**2)
if ((likelihood_type == 0) or
((likelihood_type == 2) and
(int(line.split()[7]) == 0))):
self.has_xfactor = np.append(
self.has_xfactor, [False])
if ((likelihood_type == 1) or
((likelihood_type == 2) and
(int(line.split()[7]) == 1))):
self.has_xfactor = np.append(
self.has_xfactor, [True])
if (beam_type == 0):
self.beam_error = np.append(self.beam_error, 0.)
if (beam_type == 1):
l_mid = float(line.split()[5]) +\
0.5*(float(line.split()[5]) +
float(line.split()[6]))
self.beam_error = np.append(
self.beam_error,
abs(math.exp(
-l_mid*(l_mid+1)*1.526e-8*2.*beam_sigma *
beam_width)-1.))
if (beam_type == 2):
if (likelihood_type == 2):
self.beam_error = np.append(
self.beam_error, float(line.split()[8]))
else:
self.beam_error = np.append(
self.beam_error, float(line.split()[7]))
# now, skip and unused part of the file (with sub-correlation
# matrices)
for band in range(int(band_num[cltype])):
newdatfile.readline()
# number of points that we will actually use
self.num_points = np.shape(self.obs)[0]
# total number of points, including unused ones
full_num_points = index+1
# read full correlation matrix
full_covmat = np.zeros((full_num_points, full_num_points), 'float64')
for point in range(full_num_points):
full_covmat[point] = newdatfile.readline().split()
# extract smaller correlation matrix for points actually used
covmat = np.zeros((self.num_points, self.num_points), 'float64')
for point in range(self.num_points):
covmat[point] = full_covmat[used_index[point], used_index]
# recalibrate this correlation matrix
covmat *= calib**4
# redefine the correlation matrix, the observed points and their
# variance in case of lognormal likelihood
if (self.has_xfactors):
for i in range(self.num_points):
for j in range(self.num_points):
if (self.has_xfactor[i]):
covmat[i, j] /= (self.obs[i]+self.xfactor[i])
if (self.has_xfactor[j]):
covmat[i, j] /= (self.obs[j]+self.xfactor[j])
for i in range(self.num_points):
if (self.has_xfactor[i]):
self.var[i] /= (self.obs[i]+self.xfactor[i])**2
self.obs[i] = math.log(self.obs[i]+self.xfactor[i])
# invert correlation matrix
self.inv_covmat = np.linalg.inv(covmat)
# read window function files a first time, only for finding the
# smallest and largest l's for each point
self.win_min = np.zeros(self.num_points, 'int')
self.win_max = np.zeros(self.num_points, 'int')
for point in range(self.num_points):
for line in open(os.path.join(
self.data_directory, 'windows', window_name) +
str(used_index[point]+1), 'r'):
if any([float(line.split()[i]) != 0.
for i in range(1, len(line.split()))]):
if (self.win_min[point] == 0):
self.win_min[point] = int(line.split()[0])
self.win_max[point] = int(line.split()[0])
# infer from format of window function files whether we will use
# polarisation spectra or not
num_col = len(line.split())
if (num_col == 2):
self.has_pol = False
else:
if (num_col == 5):
self.has_pol = True
else:
raise io_mp.LikelihoodError(
"In likelihood %s. " % self.name +
"Window function files are understood if they contain " +
"2 columns (l TT), or 5 columns (l TT TE EE BB)." +
"In this case the number of columns is %d" % num_col)
# define array of window functions
self.window = np.zeros(
(self.num_points, max(self.win_max)+1, num_col-1), 'float64')
# go again through window function file, this time reading window
# functions; that are distributed as: l TT (TE EE BB) where the last
# columns contaim W_l/l, not W_l we mutiply by l in order to store the
# actual W_l
for point in range(self.num_points):
for line in open(os.path.join(
self.data_directory, 'windows', window_name) +
str(used_index[point]+1), 'r'):
l = int(line.split()[0])
if (((self.has_pol is False) and (len(line.split()) != 2))
or ((self.has_pol is True) and
(len(line.split()) != 5))):
raise io_mp.LikelihoodError(
"In likelihood %s. " % self.name +
"for a given experiment, all window functions should" +
" have the same number of columns, 2 or 5. " +
"This is not the case here.")
if ((l >= self.win_min[point]) and (l <= self.win_max[point])):
self.window[point, l, :] = [
float(line.split()[i])
for i in range(1, len(line.split()))]
self.window[point, l, :] *= l
# eventually, initialise quantitites used in the marginalization over
# nuisance parameters
if ((self.has_xfactors) and
((self.calib_uncertainty > 1.e-4) or
(self.has_beam_uncertainty))):
self.halfsteps = 5
self.margeweights = np.zeros(2*self.halfsteps+1, 'float64')
for i in range(-self.halfsteps, self.halfsteps+1):
self.margeweights[i+self.halfsteps] = np.exp(
-(float(i)*3./float(self.halfsteps))**2/2)
self.margenorm = sum(self.margeweights)
# store maximum value of l needed by window functions
self.l_max = max(self.win_max)
# impose that the cosmological code computes Cl's up to maximum l
# needed by the window function
self.need_cosmo_arguments(data, {'l_max_scalars': self.l_max})
# deal with nuisance parameters
try:
self.use_nuisance
self.nuisance = self.use_nuisance
except:
self.use_nuisance = []
self.nuisance = []
self.read_contamination_spectra(data)
# end of initialisation
[docs] def loglkl(self, cosmo, data):
# get Cl's from the cosmological code
cl = self.get_cl(cosmo)
# add contamination spectra multiplied by nuisance parameters
cl = self.add_contamination_spectra(cl, data)
# get likelihood
lkl = self.compute_lkl(cl, cosmo, data)
# add prior on nuisance parameters
lkl = self.add_nuisance_prior(lkl, data)
return lkl
[docs] def compute_lkl(self, cl, cosmo, data):
# checks that Cl's have been computed up to high enough l given window
# function range. Normally this has been imposed before, so this test
# could even be supressed.
if (np.shape(cl['tt'])[0]-1 < self.l_max):
raise io_mp.LikelihoodError(
"%s computed Cls till l=" % data.cosmological_module_name +
"%d " % (np.shape(cl['tt'])[0]-1) +
"while window functions need %d." % self.l_max)
# compute theoretical bandpowers, store them in theo[points]
theo = np.zeros(self.num_points, 'float64')
for point in range(self.num_points):
# find bandpowers B_l by convolving C_l's with [(l+1/2)/2pi W_l]
for l in range(self.win_min[point], self.win_max[point]):
theo[point] += cl['tt'][l]*self.window[point, l, 0] *\
(l+0.5)/2./math.pi
if (self.has_pol):
theo[point] += (
cl['te'][l]*self.window[point, l, 1] +
cl['ee'][l]*self.window[point, l, 2] +
cl['bb'][l]*self.window[point, l, 3]) *\
(l+0.5)/2./math.pi
# allocate array for differencve between observed and theoretical
# bandpowers
difference = np.zeros(self.num_points, 'float64')
# depending on the presence of lognormal likelihood, calibration
# uncertainty and beam uncertainity, use several methods for
# marginalising over nuisance parameters:
# first method: numerical integration over calibration uncertainty:
if (self.has_xfactors and
((self.calib_uncertainty > 1.e-4) or
self.has_beam_uncertainty)):
chisq_tmp = np.zeros(2*self.halfsteps+1, 'float64')
chisqcalib = np.zeros(2*self.halfsteps+1, 'float64')
beam_error = np.zeros(self.num_points, 'float64')
# loop over various beam errors
for ibeam in range(2*self.halfsteps+1):
# beam error
for point in range(self.num_points):
if (self.has_beam_uncertainty):
beam_error[point] = 1.+self.beam_error[point] *\
(ibeam-self.halfsteps)*3/float(self.halfsteps)
else:
beam_error[point] = 1.
# loop over various calibraion errors
for icalib in range(2*self.halfsteps+1):
# calibration error
calib_error = 1+self.calib_uncertainty*(
icalib-self.halfsteps)*3/float(self.halfsteps)
# compute difference between observed and theoretical
# points, after correcting the later for errors
for point in range(self.num_points):
# for lognormal likelihood, use log(B_l+X_l)
if (self.has_xfactor[point]):
difference[point] = self.obs[point] -\
math.log(
theo[point]*beam_error[point] *
calib_error+self.xfactor[point])
# otherwise use B_l
else:
difference[point] = self.obs[point] -\
theo[point]*beam_error[point]*calib_error
# find chisq with those corrections
# chisq_tmp[icalib] = np.dot(np.transpose(difference),
# np.dot(self.inv_covmat, difference))
chisq_tmp[icalib] = np.dot(
difference, np.dot(self.inv_covmat, difference))
minchisq = min(chisq_tmp)
# find chisq marginalized over calibration uncertainty (if any)
tot = 0
for icalib in range(2*self.halfsteps+1):
tot += self.margeweights[icalib]*math.exp(
max(-30., -(chisq_tmp[icalib]-minchisq)/2.))
chisqcalib[ibeam] = -2*math.log(tot/self.margenorm)+minchisq
# find chisq marginalized over beam uncertainty (if any)
if (self.has_beam_uncertainty):
minchisq = min(chisqcalib)
tot = 0
for ibeam in range(2*self.halfsteps+1):
tot += self.margeweights[ibeam]*math.exp(
max(-30., -(chisqcalib[ibeam]-minchisq)/2.))
chisq = -2*math.log(tot/self.margenorm)+minchisq
else:
chisq = chisqcalib[0]
# second method: marginalize over nuisance parameters (if any)
# analytically
else:
# for lognormal likelihood, theo[point] should contain log(B_l+X_l)
if (self.has_xfactors):
for point in range(self.num_points):
if (self.has_xfactor[point]):
theo[point] = math.log(theo[point]+self.xfactor[point])
# find vector of difference between observed and theoretical
# bandpowers
difference = self.obs-theo
# find chisq
chisq = np.dot(
np.transpose(difference), np.dot(self.inv_covmat, difference))
# correct eventually for effect of analytic marginalization over
# nuisance parameters
if ((self.calib_uncertainty > 1.e-4) or self.has_beam_uncertainty):
denom = 1.
tmpi = np.dot(self.inv_covmat, theo)
chi2op = np.dot(np.transpose(difference), tmp)
chi2pp = np.dot(np.transpose(theo), tmp)
# TODO beam is not defined here !
if (self.has_beam_uncertainty):
for points in range(self.num_points):
beam[point] = self.beam_error[point]*theo[point]
tmp = np.dot(self.inv_covmat, beam)
chi2dd = np.dot(np.transpose(beam), tmp)
chi2pd = np.dot(np.transpose(theo), tmp)
chi2od = np.dot(np.transpose(difference), tmp)
if (self.calib_uncertainty > 1.e-4):
wpp = 1/(chi2pp+1/self.calib_uncertainty**2)
chisq = chisq-wpp*chi2op**2
denom = denom/wpp*self.calib_uncertainty**2
else:
wpp = 0
if (self.has_beam_uncertainty):
wdd = 1/(chi2dd-wpp*chi2pd**2+1)
chisq = chisq-wdd*(chi2od-wpp*chi2op*chi2pd)**2
denom = denom/wdd
chisq += math.log(denom)
# finally, return ln(L)=-chi2/2
self.lkl = -0.5 * chisq
return self.lkl
###################################
# CLIK TYPE LIKELIHOOD
# --> clik_fake_planck,clik_wmap,etc.
###################################
[docs]class Likelihood_clik(Likelihood):
def __init__(self, path, data, command_line):
Likelihood.__init__(self, path, data, command_line)
self.need_cosmo_arguments(
data, {'lensing': 'yes', 'output': 'tCl lCl pCl'})
try:
import clik
except ImportError:
raise io_mp.MissingLibraryError(
"You must first activate the binaries from the Clik " +
"distribution. Please run : \n " +
"]$ source /path/to/clik/bin/clik_profile.sh \n " +
"and try again.")
# for lensing, some routines change. Intializing a flag for easier
# testing of this condition
#if self.name == 'Planck_lensing':
if 'lensing' in self.name and 'Planck' in self.name:
self.lensing = True
else:
self.lensing = False
try:
if self.lensing:
self.clik = clik.clik_lensing(self.path_clik)
try:
self.l_max = max(self.clik.get_lmax())
# following 2 lines for compatibility with lensing likelihoods of 2013 and before
# (then, clik.get_lmax() just returns an integer for lensing likelihoods;
# this behavior was for clik versions < 10)
except:
self.l_max = self.clik.get_lmax()
else:
self.clik = clik.clik(self.path_clik)
self.l_max = max(self.clik.get_lmax())
except clik.lkl.CError:
raise io_mp.LikelihoodError(
"The path to the .clik file for the likelihood "
"%s was not found where indicated." % self.name +
" Note that the default path to search for it is"
" one directory above the path['clik'] field. You"
" can change this behaviour in all the "
"Planck_something.data, to reflect your local configuration, "
"or alternatively, move your .clik files to this place.")
except KeyError:
raise io_mp.LikelihoodError(
"In the %s.data file, the field 'clik' of the " % self.name +
"path dictionary is expected to be defined. Please make sure"
" it is the case in you configuration file")
self.need_cosmo_arguments(
data, {'l_max_scalars': self.l_max})
self.nuisance = list(self.clik.extra_parameter_names)
# line added to deal with a bug in planck likelihood release: A_planck called A_Planck in plik_lite
if (self.name == 'Planck_highl_lite'):
for i in range(len(self.nuisance)):
if (self.nuisance[i] == 'A_Planck'):
self.nuisance[i] = 'A_planck'
print "In %s, MontePython corrected nuisance parameter name A_Planck to A_planck" % self.name
# testing if the nuisance parameters are defined. If there is at least
# one non defined, raise an exception.
exit_flag = False
nuisance_parameter_names = data.get_mcmc_parameters(['nuisance'])
for nuisance in self.nuisance:
if nuisance not in nuisance_parameter_names:
exit_flag = True
print '%20s\tmust be a fixed or varying nuisance parameter' % nuisance
if exit_flag:
raise io_mp.LikelihoodError(
"The likelihood %s " % self.name +
"expected some nuisance parameters that were not provided")
# deal with nuisance parameters
try:
self.use_nuisance
except:
self.use_nuisance = []
# Add in use_nuisance all the parameters that have non-flat prior
for nuisance in self.nuisance:
if hasattr(self, '%s_prior_center' % nuisance):
self.use_nuisance.append(nuisance)
[docs] def loglkl(self, cosmo, data):
nuisance_parameter_names = data.get_mcmc_parameters(['nuisance'])
# get Cl's from the cosmological code
cl = self.get_cl(cosmo)
# testing for lensing
if self.lensing:
try:
length = len(self.clik.get_lmax())
tot = np.zeros(
np.sum(self.clik.get_lmax()) + length +
len(self.clik.get_extra_parameter_names()))
# following 3 lines for compatibility with lensing likelihoods of 2013 and before
# (then, clik.get_lmax() just returns an integer for lensing likelihoods,
# and the length is always 2 for cl['pp'], cl['tt'])
except:
length = 2
tot = np.zeros(2*self.l_max+length + len(self.clik.get_extra_parameter_names()))
else:
length = len(self.clik.get_has_cl())
tot = np.zeros(
np.sum(self.clik.get_lmax()) + length +
len(self.clik.get_extra_parameter_names()))
# fill with Cl's
index = 0
if not self.lensing:
for i in range(length):
if (self.clik.get_lmax()[i] > -1):
for j in range(self.clik.get_lmax()[i]+1):
if (i == 0):
tot[index+j] = cl['tt'][j]
if (i == 1):
tot[index+j] = cl['ee'][j]
if (i == 2):
tot[index+j] = cl['bb'][j]
if (i == 3):
tot[index+j] = cl['te'][j]
if (i == 4):
tot[index+j] = 0 #cl['tb'][j] class does not compute tb
if (i == 5):
tot[index+j] = 0 #cl['eb'][j] class does not compute eb
index += self.clik.get_lmax()[i]+1
else:
try:
for i in range(length):
if (self.clik.get_lmax()[i] > -1):
for j in range(self.clik.get_lmax()[i]+1):
if (i == 0):
tot[index+j] = cl['pp'][j]
if (i == 1):
tot[index+j] = cl['tt'][j]
if (i == 2):
tot[index+j] = cl['ee'][j]
if (i == 3):
tot[index+j] = cl['bb'][j]
if (i == 4):
tot[index+j] = cl['te'][j]
if (i == 5):
tot[index+j] = 0 #cl['tb'][j] class does not compute tb
if (i == 6):
tot[index+j] = 0 #cl['eb'][j] class does not compute eb
index += self.clik.get_lmax()[i]+1
# following 8 lines for compatibility with lensing likelihoods of 2013 and before
# (then, clik.get_lmax() just returns an integer for lensing likelihoods,
# and the length is always 2 for cl['pp'], cl['tt'])
except:
for i in range(length):
for j in range(self.l_max):
if (i == 0):
tot[index+j] = cl['pp'][j]
if (i == 1):
tot[index+j] = cl['tt'][j]
index += self.l_max+1
# fill with nuisance parameters
for nuisance in self.clik.get_extra_parameter_names():
# line added to deal with a bug in planck likelihood release: A_planck called A_Planck in plik_lite
if (self.name == 'Planck_highl_lite'):
if nuisance == 'A_Planck':
nuisance = 'A_planck'
if nuisance in nuisance_parameter_names:
nuisance_value = data.mcmc_parameters[nuisance]['current'] *\
data.mcmc_parameters[nuisance]['scale']
else:
raise io_mp.LikelihoodError(
"the likelihood needs a parameter %s. " % nuisance +
"You must pass it through the input file " +
"(as a free nuisance parameter or a fixed parameter)")
#print "found one nuisance with name",nuisance
tot[index] = nuisance_value
index += 1
# compute likelihood
#print "lkl:",self.clik(tot)
lkl = self.clik(tot)[0]
# add prior on nuisance parameters
lkl = self.add_nuisance_prior(lkl, data)
return lkl
###################################
# MOCK CMB TYPE LIKELIHOOD
# --> mock planck, cmbpol, etc.
###################################
[docs]class Likelihood_mock_cmb(Likelihood):
def __init__(self, path, data, command_line):
Likelihood.__init__(self, path, data, command_line)
self.need_cosmo_arguments(
data, {'lensing': 'yes', 'output': 'tCl lCl pCl'})
################
# Noise spectrum
################
# convert arcmin to radians
self.theta_fwhm *= np.array([math.pi/60/180])
self.sigma_T *= np.array([math.pi/60/180])
self.sigma_P *= np.array([math.pi/60/180])
# compute noise in muK**2
self.noise_T = np.zeros(self.l_max+1, 'float64')
self.noise_P = np.zeros(self.l_max+1, 'float64')
for l in range(self.l_min, self.l_max+1):
self.noise_T[l] = 0
self.noise_P[l] = 0
for channel in range(self.num_channels):
self.noise_T[l] += self.sigma_T[channel]**-2 *\
math.exp(
-l*(l+1)*self.theta_fwhm[channel]**2/8/math.log(2))
self.noise_P[l] += self.sigma_P[channel]**-2 *\
math.exp(
-l*(l+1)*self.theta_fwhm[channel]**2/8/math.log(2))
self.noise_T[l] = 1/self.noise_T[l]
self.noise_P[l] = 1/self.noise_P[l]
# impose that the cosmological code computes Cl's up to maximum l
# needed by the window function
self.need_cosmo_arguments(data, {'l_max_scalars': self.l_max})
###########
# Read data
###########
# If the file exists, initialize the fiducial values
self.Cl_fid = np.zeros((3, self.l_max+1), 'float64')
self.fid_values_exist = False
if os.path.exists(os.path.join(
self.data_directory, self.fiducial_file)):
self.fid_values_exist = True
fid_file = open(os.path.join(
self.data_directory, self.fiducial_file), 'r')
line = fid_file.readline()
while line.find('#') != -1:
line = fid_file.readline()
while (line.find('\n') != -1 and len(line) == 1):
line = fid_file.readline()
for l in range(self.l_min, self.l_max+1):
ll = int(line.split()[0])
self.Cl_fid[0, ll] = float(line.split()[1])
self.Cl_fid[1, ll] = float(line.split()[2])
self.Cl_fid[2, ll] = float(line.split()[3])
line = fid_file.readline()
# Else the file will be created in the loglkl() function.
# end of initialisation
return
[docs] def loglkl(self, cosmo, data):
# get Cl's from the cosmological code (returned in muK**2 units)
cl = self.get_cl(cosmo)
# get likelihood
lkl = self.compute_lkl(cl, cosmo, data)
return lkl
[docs] def compute_lkl(self, cl, cosmo, data):
# Write fiducial model spectra if needed (return an imaginary number in
# that case)
if self.fid_values_exist is False:
# Store the values now.
fid_file = open(os.path.join(
self.data_directory, self.fiducial_file), 'w')
fid_file.write('# Fiducial parameters')
for key, value in data.mcmc_parameters.iteritems():
fid_file.write(', %s = %.5g' % (
key, value['current']*value['scale']))
fid_file.write('\n')
for l in range(self.l_min, self.l_max+1):
fid_file.write("%5d " % l)
fid_file.write("%.8g " % (cl['tt'][l]+self.noise_T[l]))
fid_file.write("%.8g " % (cl['ee'][l]+self.noise_P[l]))
fid_file.write("%.8g " % cl['te'][l])
fid_file.write("\n")
print '\n\n'
warnings.warn(
"Writing fiducial model in %s, for %s likelihood" % (
self.data_directory+self.fiducial_file, self.name))
return 1j
# compute likelihood
chi2 = 0
Cov_obs = np.zeros((2, 2), 'float64')
Cov_the = np.zeros((2, 2), 'float64')
Cov_mix = np.zeros((2, 2), 'float64')
for l in range(self.l_min, self.l_max+1):
Cov_obs = np.array([
[self.Cl_fid[0, l], self.Cl_fid[2, l]],
[self.Cl_fid[2, l], self.Cl_fid[1, l]]])
Cov_the = np.array([
[cl['tt'][l]+self.noise_T[l], cl['te'][l]],
[cl['te'][l], cl['ee'][l]+self.noise_P[l]]])
det_obs = np.linalg.det(Cov_obs)
det_the = np.linalg.det(Cov_the)
det_mix = 0.
for i in range(2):
Cov_mix = np.copy(Cov_the)
Cov_mix[:, i] = Cov_obs[:, i]
det_mix += np.linalg.det(Cov_mix)
chi2 += (2.*l+1.)*self.f_sky *\
(det_mix/det_the + math.log(det_the/det_obs) - 2)
return -chi2/2
###################################
# MPK TYPE LIKELIHOOD
# --> sdss, wigglez, etc.
###################################
[docs]class Likelihood_mpk(Likelihood):
def __init__(self, path, data, command_line, common=False, common_dict={}):
Likelihood.__init__(self, path, data, command_line)
# require P(k) from class
self.need_cosmo_arguments(data, {'output': 'mPk'})
if common:
self.add_common_knowledge(common_dict)
try:
self.use_halofit
except:
self.use_halofit = False
if self.use_halofit:
self.need_cosmo_arguments(data, {'non linear': 'halofit'})
# read values of k (in h/Mpc)
self.k_size = self.max_mpk_kbands_use-self.min_mpk_kbands_use+1
self.mu_size = 1
self.k = np.zeros((self.k_size), 'float64')
self.kh = np.zeros((self.k_size), 'float64')
datafile = open(self.data_directory+self.kbands_file, 'r')
for i in range(self.num_mpk_kbands_full):
line = datafile.readline()
if i+2 > self.min_mpk_kbands_use and i < self.max_mpk_kbands_use:
self.kh[i-self.min_mpk_kbands_use+1] = float(line.split()[0])
datafile.close()
khmax = self.kh[-1]
# check if need hight value of k for giggleZ
try:
self.use_giggleZ
except:
self.use_giggleZ = False
# Try a new model, with an additional nuisance parameter. Note
# that the flag use_giggleZPP0 being True requires use_giggleZ
# to be True as well. Note also that it is defined globally,
# and not for every redshift bin.
if self.use_giggleZ:
try:
self.use_giggleZPP0
except:
self.use_giggleZPP0 = False
else:
self.use_giggleZPP0 = False
# If the flag use_giggleZPP0 is set to True, the nuisance parameters
# P0_a, P0_b, P0_c and P0_d are expected.
if self.use_giggleZPP0:
if 'P0_a' not in data.get_mcmc_parameters(['nuisance']):
raise io_mp.LikelihoodError(
"In likelihood %s. " % self.name +
"P0_a is not defined in the .param file, whereas this " +
"nuisance parameter is required when the flag " +
"'use_giggleZPP0' is set to true for WiggleZ")
if self.use_giggleZ:
datafile = open(self.data_directory+self.giggleZ_fidpk_file, 'r')
line = datafile.readline()
k = float(line.split()[0])
line_number = 1
while (k < self.kh[0]):
line = datafile.readline()
k = float(line.split()[0])
line_number += 1
ifid_discard = line_number-2
while (k < khmax):
line = datafile.readline()
k = float(line.split()[0])
line_number += 1
datafile.close()
self.k_fid_size = line_number-ifid_discard+1
khmax = k
if self.use_halofit:
khmax *= 2
# require k_max and z_max from the cosmological module
self.need_cosmo_arguments(
data, {'P_k_max_h/Mpc': khmax, 'z_max_pk': self.redshift})
# read information on different regions in the sky
try:
self.has_regions
except:
self.has_regions = False
if (self.has_regions):
self.num_regions = len(self.used_region)
self.num_regions_used = 0
for i in range(self.num_regions):
if (self.used_region[i]):
self.num_regions_used += 1
if (self.num_regions_used == 0):
raise io_mp.LikelihoodError(
"In likelihood %s. " % self.name +
"Mpk: no regions begin used in this data set")
else:
self.num_regions = 1
self.num_regions_used = 1
self.used_region = [True]
# read window functions
self.n_size = self.max_mpk_points_use-self.min_mpk_points_use+1
self.window = np.zeros(
(self.num_regions, self.n_size, self.k_size), 'float64')
datafile = open(self.data_directory+self.windows_file, 'r')
for i_region in range(self.num_regions):
if self.num_regions > 1:
line = datafile.readline()
for i in range(self.num_mpk_points_full):
line = datafile.readline()
if (i+2 > self.min_mpk_points_use and
i < self.max_mpk_points_use):
for j in range(self.k_size):
self.window[i_region, i-self.min_mpk_points_use+1, j]=\
float(line.split()[j+self.min_mpk_kbands_use-1])
datafile.close()
# read measurements
self.P_obs = np.zeros((self.num_regions, self.n_size), 'float64')
self.P_err = np.zeros((self.num_regions, self.n_size), 'float64')
datafile = open(self.data_directory+self.measurements_file, 'r')
for i_region in range(self.num_regions):
for i in range(2):
line = datafile.readline()
for i in range(self.num_mpk_points_full):
line = datafile.readline()
if (i+2 > self.min_mpk_points_use and
i < self.max_mpk_points_use):
self.P_obs[i_region, i-self.min_mpk_points_use+1] = \
float(line.split()[3])
self.P_err[i_region, i-self.min_mpk_points_use+1] = \
float(line.split()[4])
datafile.close()
# read covariance matrices
try:
self.covmat_file
self.use_covmat = True
except:
self.use_covmat = False
self.invcov = np.zeros(
(self.num_regions, self.n_size, self.n_size), 'float64')
if self.use_covmat:
cov = np.zeros((self.n_size, self.n_size), 'float64')
invcov_tmp = np.zeros((self.n_size, self.n_size), 'float64')
datafile = open(self.data_directory+self.covmat_file, 'r')
for i_region in range(self.num_regions):
for i in range(1):
line = datafile.readline()
for i in range(self.num_mpk_points_full):
line = datafile.readline()
if (i+2 > self.min_mpk_points_use and
i < self.max_mpk_points_use):
for j in range(self.num_mpk_points_full):
if (j+2 > self.min_mpk_points_use and
j < self.max_mpk_points_use):
cov[i-self.min_mpk_points_use+1,
j-self.min_mpk_points_use+1] =\
float(line.split()[j])
invcov_tmp = np.linalg.inv(cov)
for i in range(self.n_size):
for j in range(self.n_size):
self.invcov[i_region, i, j] = invcov_tmp[i, j]
datafile.close()
else:
for i_region in range(self.num_regions):
for j in range(self.n_size):
self.invcov[i_region, j, j] = \
1./(self.P_err[i_region, j]**2)
# read fiducial model
if self.use_giggleZ:
self.P_fid = np.zeros((self.k_fid_size), 'float64')
self.k_fid = np.zeros((self.k_fid_size), 'float64')
datafile = open(self.data_directory+self.giggleZ_fidpk_file, 'r')
for i in range(ifid_discard):
line = datafile.readline()
for i in range(self.k_fid_size):
line = datafile.readline()
self.k_fid[i] = float(line.split()[0])
self.P_fid[i] = float(line.split()[1])
datafile.close()
return
[docs] def add_common_knowledge(self, common_dictionary):
"""
Add to a class the content of a shared dictionary of attributes
The purpose of this method is to set some attributes globally for a Pk
likelihood, that are shared amongst all the redshift bins (in
WiggleZ.data for instance, a few flags and numbers are defined that
will be transfered to wigglez_a, b, c and d
"""
for key, value in common_dictionary.iteritems():
# First, check if the parameter exists already
try:
exec("self.%s" % key)
warnings.warn(
"parameter %s from likelihood %s will be replaced by " +
"the common knowledge routine" % (key, self.name))
except:
if type(value) != type('foo'):
exec("self.%s = %s" % (key, value))
else:
exec("self.%s = '%s'" % (key, value))
# compute likelihood
[docs] def loglkl(self, cosmo, data):
# reduced Hubble parameter
h = cosmo.h()
# WiggleZ specific
if self.use_scaling:
# angular diameter distance at this redshift, in Mpc
d_angular = cosmo.angular_distance(self.redshift)
# radial distance at this redshift, in Mpc, is simply 1/H (itself
# in Mpc^-1). Hz is an array, with only one element.
r, Hz = cosmo.z_of_r([self.redshift])
d_radial = 1/Hz[0]
# scaling factor = (d_angular**2 * d_radial)^(1/3) for the
# fiducial cosmology used in the data files of the observations
# divided by the same quantity for the cosmology we are comparing with.
# The fiducial values are stored in the .data files for
# each experiment, and are truly in Mpc. Beware for a potential
# difference with CAMB conventions here.
scaling = pow(
(self.d_angular_fid/d_angular)**2 *
(self.d_radial_fid/d_radial), 1./3.)
else:
scaling = 1
# get rescaled values of k in 1/Mpc
self.k = self.kh*h*scaling
# get P(k) at right values of k, convert it to (Mpc/h)^3 and rescale it
P_lin = np.zeros((self.k_size), 'float64')
# If the flag use_giggleZ is set to True, the power spectrum retrieved
# from Class will get rescaled by the fiducial power spectrum given by
# the GiggleZ N-body simulations CITE
if self.use_giggleZ:
P = np.zeros((self.k_fid_size), 'float64')
for i in range(self.k_fid_size):
P[i] = cosmo.pk(self.k_fid[i]*h, self.redshift)
power = 0
# The following create a polynome in k, which coefficients are
# stored in the .data files of the experiments.
for j in range(6):
power += self.giggleZ_fidpoly[j]*self.k_fid[i]**j
# rescale P by fiducial model and get it in (Mpc/h)**3
P[i] *= pow(10, power)*(h/scaling)**3/self.P_fid[i]
if self.use_giggleZPP0:
# Shot noise parameter addition to GiggleZ model. It should
# recover the proper nuisance parameter, depending on the name.
# I.e., Wigglez_A should recover P0_a, etc...
tag = self.name[-2:] # circle over "_a", "_b", etc...
P0_value = data.mcmc_parameters['P0'+tag]['current'] *\
data.mcmc_parameters['P0'+tag]['scale']
P_lin = np.interp(self.kh,self.k_fid,P+P0_value)
else:
# get P_lin by interpolation. It is still in (Mpc/h)**3
P_lin = np.interp(self.kh, self.k_fid, P)
else:
# get rescaled values of k in 1/Mpc
self.k = self.kh*h*scaling
# get values of P(k) in Mpc**3
for i in range(self.k_size):
P_lin[i] = cosmo.pk(self.k[i], self.redshift)
# get rescaled values of P(k) in (Mpc/h)**3
P_lin *= (h/scaling)**3
W_P_th = np.zeros((self.n_size), 'float64')
# starting analytic marginalisation over bias
# Define quantities living in all the regions possible. If only a few
# regions are selected in the .data file, many elements from these
# arrays will stay at 0.
P_data_large = np.zeros(
(self.n_size*self.num_regions_used), 'float64')
W_P_th_large = np.zeros(
(self.n_size*self.num_regions_used), 'float64')
cov_dat_large = np.zeros(
(self.n_size*self.num_regions_used), 'float64')
cov_th_large = np.zeros(
(self.n_size*self.num_regions_used), 'float64')
normV = 0
# infer P_th from P_lin. It is still in (Mpc/h)**3. TODO why was it
# called P_lin in the first place ? Couldn't we use now P_th all the
# way ?
P_th = P_lin
# Loop over all the available regions
for i_region in range(self.num_regions):
# In each region that was selected with the array of flags
# self.used_region, define boundaries indices, and fill in the
# corresponding windowed power spectrum. All the unused regions
# will still be set to zero as from the initialization, which will
# not contribute anything in the final sum.
if self.used_region[i_region]:
imin = i_region*self.n_size
imax = (i_region+1)*self.n_size-1
W_P_th = np.dot(self.window[i_region, :], P_th)
for i in range(self.n_size):
P_data_large[imin+i] = self.P_obs[i_region, i]
W_P_th_large[imin+i] = W_P_th[i]
cov_dat_large[imin+i] = np.dot(
self.invcov[i_region, i, :],
self.P_obs[i_region, :])
cov_th_large[imin+i] = np.dot(
self.invcov[i_region, i, :],
W_P_th[:])
# Explain what it is TODO
normV += np.dot(W_P_th_large, cov_th_large)
# Sort of bias TODO ?
b_out = np.sum(W_P_th_large*cov_dat_large) / \
np.sum(W_P_th_large*cov_th_large)
# Explain this formula better, link to article ?
chisq = np.dot(P_data_large, cov_dat_large) - \
np.dot(W_P_th_large, cov_dat_large)**2/normV
return -chisq/2
[docs]class Likelihood_sn(Likelihood):
def __init__(self, path, data, command_line):
Likelihood.__init__(self, path, data, command_line)
# try and import pandas
try:
import pandas
except ImportError:
raise io_mp.MissingLibraryError(
"This likelihood has a lot of IO manipulation. You have "
"to install the 'pandas' library to use it. Please type:\n"
"`(sudo) pip install pandas --user`")
# check that every conflicting experiments is not present in the list
# of tested experiments, in which case, complain
if hasattr(self, 'conflicting_experiments'):
for conflict in self.conflicting_experiments:
if conflict in data.experiments:
raise io_mp.LikelihoodError(
'conflicting %s measurements, you can ' % conflict +
' have either %s or %s ' % (self.name, conflict) +
'as an experiment, not both')
# Read the configuration file, supposed to be called self.settings.
# Note that we unfortunately can not
# immediatly execute the file, as it is not formatted as strings.
assert hasattr(self, 'settings') is True, (
"You need to provide a settings file")
self.read_configuration_file()
[docs] def read_configuration_file(self):
"""
Extract Python variables from the configuration file
This routine performs the equivalent to the program "inih" used in the
original c++ library.
"""
settings_path = os.path.join(self.data_directory, self.settings)
with open(settings_path, 'r') as config:
for line in config:
# Dismiss empty lines and commented lines
if line and line.find('#') == -1:
lhs, rhs = [elem.strip() for elem in line.split('=')]
# lhs will always be a string, so set the attribute to this
# likelihood. The right hand side requires more work.
# First case, if set to T or F for True or False
if str(rhs) in ['T', 'F']:
rhs = True if str(rhs) == 'T' else False
# It can also be a path, starting with 'data/'. We remove
# this leading folder path
elif str(rhs).find('data/') != -1:
rhs = rhs.replace('data/', '')
else:
# Try to convert it to a float
try:
rhs = float(rhs)
# If it fails, it is a string
except ValueError:
rhs = str(rhs)
# Set finally rhs to be a parameter of the class
setattr(self, lhs, rhs)
[docs] def read_matrix(self, path):
"""
extract the matrix from the path
This routine uses the blazing fast pandas library (0.10 seconds to load
a 740x740 matrix). If not installed, it uses a custom routine that is
twice as slow (but still 4 times faster than the straightforward
numpy.loadtxt method)
.. note::
the length of the matrix is stored on the first line... then it has
to be unwrapped. The pandas routine read_table understands this
immediatly, though.
"""
from pandas import read_table
path = os.path.join(self.data_directory, path)
# The first line should contain the length.
with open(path, 'r') as text:
length = int(text.readline())
# Note that this function does not require to skiprows, as it
# understands the convention of writing the length in the first
# line
matrix = read_table(path).as_matrix().reshape((length, length))
return matrix
[docs] def read_light_curve_parameters(self):
"""
Read the file jla_lcparams.txt containing the SN data
.. note::
the length of the resulting array should be equal to the length of
the covariance matrices stored in C00, etc...
"""
from pandas import read_table
path = os.path.join(self.data_directory, self.data_file)
# Recover the names of the columns. The names '3rdvar' and 'd3rdvar'
# will be changed, because 3rdvar is not a valid variable name
with open(path, 'r') as text:
clean_first_line = text.readline()[1:].strip()
names = [e.strip().replace('3rd', 'third')
for e in clean_first_line.split()]
lc_parameters = read_table(
path, sep=' ', names=names, header=0, index_col=False)
return lc_parameters
[docs]class Likelihood_clocks(Likelihood):
"""Base implementation of H(z) measurements"""
def __init__(self, path, data, command_line):
Likelihood.__init__(self, path, data, command_line)
# Read the content of the data file, containing z, Hz and error
total = np.loadtxt(
os.path.join(self.data_directory, self.data_file))
# Store the columns separately
self.z = total[:, 0]
self.Hz = total[:, 1]
self.err = total[:, 2]
[docs] def loglkl(self, cosmo, data):
# Store the speed of light in km/s
c_light_km_per_sec = const.c/1000.
chi2 = 0
# Loop over the redshifts
for index, z in enumerate(self.z):
# Query the cosmo module for the Hubble rate (in 1/Mpc), and
# convert it to km/s/Mpc
H_cosmo = cosmo.Hubble(z)*c_light_km_per_sec
# Add to the tota chi2
chi2 += (self.Hz[index]-H_cosmo)**2/self.err[index]**2
return -0.5 * chi2