Source code for gbm.spectra.fitting

# fitting: Classes and functions for MLE of count spectra
#
#     Authors: William Cleveland (USRA),
#              Adam Goldstein (USRA) and
#              Daniel Kocevski (NASA)
#
#     Portions of the code are Copyright 2020 William Cleveland and
#     Adam Goldstein, Universities Space Research Association
#     All rights reserved.
#
#     Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
import warnings
from datetime import datetime

import numpy as np
from numpy.random import multivariate_normal
from scipy.linalg import inv
from scipy.misc import derivative
from scipy.optimize import minimize, brentq
from scipy.stats import chi2

from ..background import BackgroundRates, BackgroundSpectrum
from ..data.primitives import EnergyBins


[docs]class SpectralFitter: """Base class for jointly-fitting spectra from multiple detectors. This is a (large) wrapper around `scipy.optimize.minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_. This class should not be instantiated by the user, but instead a child class for a specific fit statistic should be instantiated. Child classes should be designed with the following requirements: 1. An ``__init__`` method that takes in the inputs in the Parent ``__init__`` method. Can have additional arguments/keywords. 2. A ``_fold_model`` method that accepts the model function and the list of parameters. It must return a single number that represents the likelihood for a single detector/dataset. Parameters: pha_list (list of :class:`~gbm.data.PHA`): The PHA objects containg the count spectrum for each detector bkgd_list (list of :class:`~gbm.background.BackgroundRates` or \ list of :class:`~gbm.background.BackgroundSpectrum`): The background rates object for each detector or background spectrum for each detector. If given the background rates object, the times in the corresponding PHA object will be used for the limits of integration. rsp_list (list of :class:`~gbm.data.RSP`): The response object for each detector statistic (<function>): The function to be used as a fit statistic channel_masks (list of np.array(dtype=bool), optional): A boolean mask for each detector that indicates which energy channels are to be fit. Note: The channel_mask will typically be set by the :attr:`valid_channels` attribute in the PHA object. You can set channel_masks to override the PHA :attr:`valid_channels`, but this is usually unnecessary. method (str, optional): The fitting algorithm, which should be one of the options for scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two algorithms that provide support for fitting with bounds and constraints. Other algorithms will ignore bounds and constraints. Attributes: covariance (np.array): The covariance matrix of the fitted parameters. Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the covariance matrix may not be valid. detectors (list of str): The detectors used in the fit dof (int): The number of degrees of freedom energy_range (float, float): The full energy range of the fit function_components (list of str): The names of the individual model components, if appropriate function_name (str): The name of the model function being fit hessian (np.array): The Hessian matrix of the fitted parameters, for which the covariance matrix is -inverse(Hessian). Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the Hessian matrix may not be valid. jacobian (np.array): The Jacobian vector of the likelihood as a function of the parameters. message (str): The message from the fitter on convergence/non-convergence num_components (int): Number of function components num_sets (int): Number of datasets/detectors being fit parameters (np.array): The parameter values at the maximum likelihood statistic (float): The fit statistic (likelihood) success (bool): True if the fit was successful, False otherwise. symmetric_errors (np.array): The 1-sigma uncertainty on the parameters assuming higher-order terms of the Taylor expansion of the likelihood about the maximum likelihood solution is negligible. If those terms are not negligible, errors could be very incorrect or event NaN. See attributes :attr:`covariance` and :attr:`hessian`. """ def __init__(self, pha_list, bkgd_list, rsp_list, statistic, channel_masks=None, method='SLSQP'): # check that we have the right numbers of data, backgrounds, responses, # and fit masks self._num_sets = len(pha_list) if (len(bkgd_list) != self._num_sets) or ( len(rsp_list) != self._num_sets): raise ValueError('Number of datasets, backgrounds, and responses ' \ 'must be the same') # set the energy channel masks if channel_masks is not None: if len(channel_masks) != self._num_sets: raise ValueError('If channel_masks is set, must be equal to the \ number of datasets') # store response, exposure, and channel masks self._rsp = rsp_list self._exposure = np.array([pha.exposure for pha in pha_list]) if channel_masks is None: channel_masks = [] for pha in pha_list: chan_mask = np.zeros(pha.numchans, dtype=bool) chan_mask[pha.valid_channels] = True channel_masks.append(chan_mask) self._chan_masks = channel_masks # extract observed counts and apply channel mask self._data = self._apply_masks([pha.data.counts for pha in pha_list]) # extract background rates and variances and apply channel masks self._back_rates = [] self._back_var = [] for bkgd, pha in zip(bkgd_list, pha_list): if isinstance(bkgd, BackgroundRates): bkgd_spec = bkgd.integrate_time(*pha.time_range) elif isinstance(bkgd, BackgroundSpectrum): bkgd_spec = bkgd else: raise ValueError('Unknown Background object') self._back_rates.append(bkgd_spec.rates) self._back_var.append(bkgd_spec.rate_uncertainty ** 2) self._back_rates = self._apply_masks(self._back_rates) self._back_var = self._apply_masks(self._back_var) # fitter/function info self._stat = statistic self._method = method self._function = None self._res = None @property def statistic(self): if self._res is not None: return self._res.fun @property def dof(self): free_params = np.sum(self._function.free) datapts = np.sum([chan_mask.sum() for chan_mask in self._chan_masks]) return datapts - free_params @property def function_name(self): if self._function is not None: return self._function.name @property def function_components(self): if self._function is not None: try: return self._function.names except: pass @property def num_components(self): if self._function is not None: return self._function.num_components @property def detectors(self): return [rsp.detector for rsp in self._rsp] @property def energy_range(self): emin = np.concatenate( self._apply_masks([rsp.ebounds['E_MIN'] for rsp in self._rsp])) emax = np.concatenate( self._apply_masks([rsp.ebounds['E_MAX'] for rsp in self._rsp])) return (np.min(emin), np.max(emax)) @property def num_sets(self): return self._num_sets @property def success(self): if self._res is not None: return self._res.success @property def message(self): if self._res is not None: return self._res.message @property def parameters(self): if self._res is not None: return self._res.x @property def jacobian(self): if self._res is not None: return self._res.jac @property def hessian(self): if self._res is not None: return self._hessian(self.parameters, self._function) @property def covariance(self): if self._res is not None: cov = inv(-self.hessian) id = np.identity(self.parameters.size, dtype=bool) return cov @property def symmetric_errors(self): if self._res is not None: id = np.identity(self.parameters.size, dtype=bool) return np.sqrt(np.abs(self.covariance[id]))
[docs] @classmethod def load(cls, filename): """Loads a fitter object that has been saved to disk in a compressed numpy file. Thanks to the cool properties of the numpy format, this method will load the complete state of the fitter when it was saved. Args: filename (str): The complete filename of the saved file """ loaded = np.load(filename, allow_pickle=True) obj = loaded['obj'].reshape(-1)[0] print('Fit loaded from {}'.format(loaded['time'])) return obj
[docs] def save(self, filename): """Saves the fitter object to a compressed binary numpy file. The full state of the fitter is serialized and saved, and it can be restored using the :meth:`load` method. Args: filename (str): The complete filename to save the fitter object to """ now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') np.savez_compressed(filename, time=now, obj=self)
[docs] def fit(self, function, **kwargs): """Fit the given the function to the data Args: function (:class:`~.functions.Function`): The function object to use **kwargs: Keyword arguments passed to the fitter """ self._function = function # treat the fixed parameters init_params = np.array(function.default_values)[function.free].tolist() # do the fit, also returns the jacobian res = minimize(self._eval_stat, init_params, args=(function,), method=self._method, bounds=function.parameter_bounds(), jac=True, **kwargs) self._res = res
[docs] def asymmetric_errors(self, cl=0.683): """Calculate the asymmetric uncertainty on each fitted parameter. This function uses `scipy.optimize.brentq <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.brentq.html>`_ to find the root of: crit = -2(LL_max-LL), where *crit* is the chi-square critical value defined by the number of free parameters and the confidence level, and *LL_max* is the maximum log-likelihood. The brentq function uses a bracketing range to find the roots, and this function uses the best-fit values of the parameters for one side of the bracket and the min/max values defined in the fitted function for the other side of the bracket. Args: cl (float, optional): The confidence level of the uncertainties. Default is 0.683 (1 sigma). Returns: np.array: A (``nparams``, 2) array where each row reprensents the \ (negative, positive) uncertainty on each parameter. """ # must have a fit if self._res is None: raise RuntimeError('Fit has not been performed') # calculate the critical value nparams = self.parameters.size crit = chi2.ppf(cl, nparams) # this is the objective function for which we are finding the root def the_func(param, index): params = self.parameters.copy() params[index] = param test_like = self._fold_model(self._function.fit_eval, params) delta = 2.0 * (-self._res.fun - test_like) return delta - crit # Find the lower and upper bound of the uncertainty. Here, we use # Brent's Method to determine the bounds, which requires a bracketing # range of the solution. One side of the bracket is the maximum # likelihood value. If the function parameter has a finite support, # the corresponding end of the support is used for the other side of the # bracket, otherwise, the other side of the bracket is iteratively # decreased (increased) until the lower (upper) bound is constrained # by Brent's Method. This has to be done because we don't know a priori # how far away from maximum likelihood the bound is. # lower bound root1 = np.zeros(nparams) for i in range(nparams): param = self.parameters[i] while (True): if self._function.min_values[i] == -np.inf: if param < 0.0: minval = 2.0 * param elif param > 0.1: minval = 0.5 * param else: minval = param - 1.0 else: minval = self._function.min_values[i] # if we're at the bounds, we can't bracket the peak try: r, o = brentq(the_func, self.parameters[i], minval, args=(i,), full_output=True, maxiter=1000) break except ValueError: if minval == self._function.min_values[i]: warnings.warn("Parameter exists at its lower bound") r = minval break param = minval root1[i] = r # upper bound root2 = np.zeros(nparams) for i in range(nparams): param = self.parameters[i] while (True): if self._function.max_values[i] == np.inf: if param < -0.1: maxval = 0.5 * param elif param > 0.0: maxval = 2.0 * param else: maxval = 1.0 + param else: maxval = self._function.max_values[i] try: r, o = brentq(the_func, self.parameters[i], maxval, args=(i,), full_output=True, maxiter=1000) break except ValueError: # if we're at the bounds, we can't bracket the peak if maxval == self._function.max_values[i]: warnings.warn("Parameter exists at its upper bound") r = maxval break param = maxval root2[i] = r # convert the bounds to -/+ uncertainties errs = np.zeros((nparams, 2)) errs[:, 0] = np.abs(self.parameters - root1) errs[:, 1] = np.abs(self.parameters - root2) return errs
[docs] def residuals(self, sigma=True): """The fit residuals for each dataset in units of differential counts or model sigma Args: sigma (bool, optional): If True, return in units of model sigma, otherwise return in units of differential counts. Default is True. Returns: tuple: A 4-tuple containing: - *list of np.array*: Energy centroids - *list of np.array*: Energy channel widths - *list of np.array*: Fit residuals - *list of np.array*: Residual uncertainties """ chanwidths = self._apply_masks( [rsp.channel_widths for rsp in self._rsp]) # calculate the count spectrum model model = self.model_count_spectrum() # residuals are the differential model counts subtracted from the # differential source counts above background resid = [] for i in range(self.num_sets): back_rates = self._back_rates[i] / chanwidths[i] rates = self._data[i] / (self._exposure[i] * chanwidths[i]) resid.append((rates - back_rates) - model[i].rates) # can calculate the residuals as a function of the model uncertainty model_var = self.model_variance() if sigma: resid = [resid[i] / np.sqrt(model_var[i]) for i in range(self.num_sets)] resid_err = np.ones_like(resid) else: resid_err = [np.sqrt(var) for var in model_var] # the energy centroids and widths energy = self._apply_masks( [rsp.channel_centroids for rsp in self._rsp]) ewidths = [chanwidth / 2.0 for chanwidth in chanwidths] return (energy, ewidths, resid, resid_err)
[docs] def model_variance(self): """The differential source model variance, accounting for the data variance and background model variance. Returns: list of np.array: The model variance for each dataset. """ chanwidths = self._apply_masks( [rsp.channel_widths for rsp in self._rsp]) # model variance: need positive data counts, background rate and # variance and exposure mvar = [] for i in range(self.num_sets): counts = self._rsp[i].fold_spectrum(self._function.fit_eval, self.parameters) counts = counts[self._chan_masks[i]] model_rate = counts / (self._exposure[i]) model_rate[counts < 0.0] = 0.0 mvar.append(self._back_var[i] + \ (model_rate + self._back_rates[i]) / \ (np.abs(self._exposure[i]) * chanwidths[i])) return mvar
[docs] def model_count_spectrum(self): """The fitted model count spectrum Returns: list of :class:`~gbm.data.primitives.EnergyBins`: An EnergyBins \ object for each dataset """ if self._res is None: raise RuntimeError('Fit has not been performed') ebins = [] for i in range(self.num_sets): rates = self._rsp[i].fold_spectrum(self._function.fit_eval, self.parameters) counts = rates * self._exposure[i] ebin = EnergyBins(counts[self._chan_masks[i]], self._rsp[i].ebounds['E_MIN'][ self._chan_masks[i]], self._rsp[i].ebounds['E_MAX'][ self._chan_masks[i]], [self._exposure[i]] * self._chan_masks[i].sum()) ebins.append(ebin) return ebins
[docs] def data_count_spectrum(self, upper_limits_sigma=2.0): """The observed source count spectrum, which is (total-background). Data bins with source counts less than the model variance are converted to upper limits. Args: upper_limits_sigma (float, optional): The upper limits sigma. Default is 2. Returns: tuple: A 5-tuple containing: - *list of np.array*: Energy centroids - *list of np.array*: Energy channel widths - *list of np.array*: differential count spectrum - *list of np.array*: 1-sigma count spectrum errors - *list of np.array*: Upper limits boolean mask for each dataset. The upper limits mask is a boolean mask where True indicates the element is an upper limit. """ chanwidths = self._apply_masks( [rsp.channel_widths for rsp in self._rsp]) mvar = self.model_variance() # calculate the differential count spectrum and upper limit masks src_spectra = [] ulmasks = [] for i in range(self.num_sets): src_counts = (self._data[i] - self._back_rates[i] * \ self._exposure[i]) / (self._exposure[i] * \ chanwidths[i]) ulmask = src_counts < np.sqrt(mvar[i]) src_counts[ulmask] = upper_limits_sigma * np.sqrt(mvar[i])[ulmask] src_spectra.append(src_counts) ulmasks.append(ulmask) energy = self._apply_masks( [rsp.channel_centroids for rsp in self._rsp]) ewidths = [chanwidth / 2.0 for chanwidth in chanwidths] src_errs = [np.sqrt(var) for var in mvar] return (energy, ewidths, src_spectra, src_errs, ulmasks)
[docs] def spectrum(self, which, numpoints=1000, components=False): r"""The model photon, energy, or :math:`\nu F_\nu` spectrum. Args: which (str): Which spectrum to return. Either 'photon', 'energy', or 'nufnu' numpoints (int, optional): The number of grid points in energy. Default is 1000. components (bool, optional): If True, calculate the spectrum for each individual model components (if there are multiple components). Default is False. Returns: tuple: A 2-tuple containing: - *np.array*: The energy grid at which the spectrum is calculated - *np.array* or *list of np.array*: model spectrum values, or spectrum values for each component """ if self._res is None: raise RuntimeError('Fit has not been performed') grid = np.logspace(*np.log10(self.energy_range), numpoints) pmodel = self._function.fit_eval(self.parameters, grid, components=components) # multiply by energy or energy^2 if returning energy or nufnu spectrum if which == 'energy': pmodel *= grid[np.newaxis,:] elif which == 'nufnu': pmodel *= grid[np.newaxis,:]**2 else: pass return (grid, pmodel)
[docs] def sample_parameters(self, **kwargs): """Produce samples of the fitted parameters. This uses the covariance matrix and assumes a multivariate normal distribuion. Caveat Emptor. Args: **kwargs: Options (like size) to be passed to numpy.random.multivariate_normal Returns: np.array: The random parameter samples """ samples = multivariate_normal(self.parameters, self.covariance, **kwargs) return samples
[docs] def sample_spectrum(self, which, num_samples=1000, numpoints=1000, components=False): r"""Produce samples of the model photon, energy, or :math:`\nu F_\nu` spectrum. This uses the covariance matrix and assumes a multivariate normal distribuion for the parameters. Caveat Emptor. Args: which (str): Which spectrum to return. Either 'photon', 'energy', or 'nufnu' num_samples (int, optional): The number of spectrum to sample. Default is 1000. numpoints (int, optional): The number of grid points in energy. Default is 1000. components (bool, optional): If True, calculate the spectrum for each individual model components (if there are multiple components). Default is False. Returns: tuple: A 2-tuple containing: - *np.array*: The energy grid at which the spectrum is calculated - *np.array*: Array of shape (``num_samples``, ``numpoints``) if there is only one component or (``num_samples``, ``num_components``, ``numpoints``) if there are multiple components. """ params = self.sample_parameters(size=num_samples) grid = np.logspace(*np.log10(self.energy_range), numpoints) if self.num_components > 1: model = np.array([self._function.fit_eval(param, grid, components=components) \ for param in params]) else: model = np.array([self._function.fit_eval(param, grid) \ for param in params]) if which == 'energy': model *= grid[np.newaxis, :] elif which == 'nufnu': model *= grid[np.newaxis, :] ** 2 else: pass return (grid, model)
[docs] def sample_flux(self, erange, num_samples=1000, numpoints=1000, **kwargs): """Produce samples of the energy-integrated photon or energy flux. This uses the covariance matrix and assumes a multivariate normal distribuion for the parameters. Caveat Emptor. Args: erange (float, float): Which spectrum to return. Either 'photon', 'energy', or 'nufnu' num_samples (int): The number of spectra to sample. Default is 1000. numpoints (int): The number of grid points in energy. Default is 1000. **kwargs: Options to be passed to :meth:`~gbm.spectra.functions.Function.integrate`. Returns: np.array: The sampled flux """ params = self.sample_parameters(size=num_samples) flux = [ self._function.integrate(param_vec, erange, num_points=numpoints, **kwargs) for param_vec in params] return flux
def _fold_model(self, function, params): """Folds the model throught the spectrum and calculates the fit stat Note: This is an empty function in the base class, and in inherited class must define this. Args: function (:class:`~.functions.Function`): The function object to use params (list): The parameters values Returns: float: The fit statistic value """ pass def _eval_stat(self, params, function): """Evaluate the statistic (and jacobian of the statistic as a function of the model parameters). Args: params (list): The parameters values function (class:`~.functions.Function`) The function object to use Returns: (float, np.array): The fit statistic and the jacobian of the fit statistic """ stat = self._fold_model(function.fit_eval, params) jac = self._jacobian(params, function) return -stat, -jac def _jacobian(self, params, function): """Calculate the Jacobian of the fit statistic as a function of the model parameters. This is evalated numerically using finite differences. Args: params (list): The parameters values function (class:`~.functions.Function`) The function object to use Returns: np.array: The Jacobian """ num_params = len(params) grad = np.zeros(num_params) # for each parameter, do partial derivative using finite differences for i in range(num_params): params_temp = params.copy() dx = np.abs((1.0 + function.delta_rel[i]) * params[i] - params[i]) grad[i] = self._pderiv(function.fit_eval, params_temp, index=i, dx=dx) return grad def _hessian(self, params, function): """Calculate the Hessian of the fit statistic as a function of the model parameters. This is evalated numerically using finite differences. Args: params (list): The parameters values function (class:`~.functions.Function`) The function object to use Returns: np.array: The Hessian matrix """ num_params = len(params) hess = np.zeros((num_params, num_params)) for i in range(num_params): for j in range(num_params): # Hessian is symmetric, so we only have to calculate either the # upper or lower triangle if i > j: hess[i, j] = hess[j, i] continue params_temp = params.copy() dx1 = np.abs( (1.0 + function.delta_rel[i]) * params[i] - params[i]) dx2 = np.abs( (1.0 + function.delta_rel[j]) * params[j] - params[j]) hess[i, j] = self._mixed_pderiv(function.fit_eval, params_temp, i, j, dx1, dx2) return hess def _pderiv(self, function, params, index=0, **kwargs): """Partial derivative of function with respect to one of its parameters This is a wrapper around scipy.misc.derivative to allow a partial derivative to be calculated Args: function (class:`~.functions.Function`) The function object to use params (list): The parameters values index (int, optional): The index into the parameter list, representing the parameter for which the partial derivative will be calculated. Default is 0 **kwargs: Options to pass to scipy.misc.derivative (such as the ``dx`` argument) Returns: float: The partial derivative """ def wraps(x): params[index] = x the_args = [function, params] return self._fold_model(*the_args) return derivative(wraps, params[index], **kwargs) def _mixed_pderiv(self, function, params, index1, index2, dx1, dx2): """Mixed second partial derivative of function with respect to its parameters. This calls _pderiv. Args: function (class:`~.functions.Function`) The function object to use params (list): The parameters values index1 (int): The index into the parameter list, representing the parameter for which the first partial derivative will be calculated. index2 (int): The index into the parameter list, representing the parameter for which the second partial derivative will be calculated. dx1 (float): The step size for the first parameter dx2 (flot): The step size for the second parameter Returns: float: The second mixed partial derivative """ def wraps(x): params[index2] = x the_args = [function, params] return self._pderiv(*the_args, index=index1, dx=dx1) return derivative(wraps, params[index2], dx=dx2) def _apply_masks(self, a_list): """Apply the fit masks to a list of arrays Args: a_list (list of np.array): The list of arrays Returns: list of np.array: The masked array """ return [one_list[one_mask] \ for one_list, one_mask in zip(a_list, self._chan_masks)]
[docs]class SpectralFitterChisq(SpectralFitter): """Jointly-fit datasets using Chi-Squared Likelihood. Parameters: pha_list (list of :class:`~gbm.data.PHA`): The PHA objects containg the count spectrum for each detector bkgd_list (list of :class:`~gbm.background.BackgroundRates`): The background rates object for each detector rsp_list (list of :class:`~gbm.data.RSP`): The response object for each detector channel_masks (list of np.array(dtype=bool), optional): A boolean mask for each detector that indicates which energy channels are to be fit. Note: The channel_mask will typically be set by the :attr:`valid_channels` attribute in the PHA object. You can set channel_masks to override the PHA :attr:`valid_channels`, but this is usually unnecessary. method (str, optional): The fitting algorithm, which should be one of the options for scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two algorithms that provide support for fitting with bounds and constraints. Other algorithms will ignore bounds and constraints. Attributes: covariance (np.array): The covariance matrix of the fitted parameters. Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the covariance matrix may not be valid. detectors (list of str): The detectors used in the fit dof (int): The number of degrees of freedom energy_range (float, float): The full energy range of the fit function_components (list of str): The names of the individual model components, if appropriate function_name (str): The name of the model function being fit hessian (np.array): The Hessian matrix of the fitted parameters, for which the covariance matrix is -inverse(Hessian). Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the Hessian matrix may not be valid. jacobian (np.array): The Jacobian vector of the likelihood as a function of the parameters. message (str): The message from the fitter on convergence/non-convergence num_components (int): Number of function components num_sets (int): Number of datasets/detectors being fit parameters (np.array): The parameter values at the maximum likelihood statistic (float): The fit statistic (likelihood) success (bool): True if the fit was successful, False otherwise. symmetric_errors (np.array): The 1-sigma uncertainty on the parameters assuming higher-order terms of the Taylor expansion of the likelihood about the maximum likelihood solution is negligible. If those terms are not negligible, errors could be very incorrect or event NaN. See attributes :attr:`covariance` and :attr:`hessian`. """ def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs): super().__init__(pha_list, bkgd_list, rsp_list, chisq, **kwargs) def _fold_model(self, function, params): stat = np.zeros(self.num_sets) for i in range(self.num_sets): # fold model through response and convert to raw model counts model = self._rsp[i].fold_spectrum(function, params, channel_mask=self._chan_masks[ i]) # perform chisq calculation for one dataset stat[i] = self._stat(self._data[i], self._back_rates[i], self._back_var[i], model, self._exposure[i]) return stat.sum()
[docs]class SpectralFitterPgstat(SpectralFitter): """Jointly-fit datasets using Profile Gaussian likelihood (PGSTAT). Parameters: pha_list (list of :class:`~gbm.data.PHA`): The PHA objects containg the count spectrum for each detector bkgd_list (list of :class:`~gbm.background.BackgroundRates`): The background rates object for each detector rsp_list (list of :class:`~gbm.data.RSP`): The response object for each detector channel_masks (list of np.array(dtype=bool), optional): A boolean mask for each detector that indicates which energy channels are to be fit. Note: The channel_mask will typically be set by the :attr:`valid_channels` attribute in the PHA object. You can set channel_masks to override the PHA :attr:`valid_channels`, but this is usually unnecessary. back_exp (list of float, optional): Set if different than the source exposure method (str, optional): The fitting algorithm, which should be one of the options for scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two algorithms that provide support for fitting with bounds and constraints. Other algorithms will ignore bounds and constraints. Attributes: covariance (np.array): The covariance matrix of the fitted parameters. Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the covariance matrix may not be valid. detectors (list of str): The detectors used in the fit dof (int): The number of degrees of freedom energy_range (float, float): The full energy range of the fit function_components (list of str): The names of the individual model components, if appropriate function_name (str): The name of the model function being fit hessian (np.array): The Hessian matrix of the fitted parameters, for which the covariance matrix is -inverse(Hessian). Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the Hessian matrix may not be valid. jacobian (np.array): The Jacobian vector of the likelihood as a function of the parameters. message (str): The message from the fitter on convergence/non-convergence num_components (int): Number of function components num_sets (int): Number of datasets/detectors being fit parameters (np.array): The parameter values at the maximum likelihood statistic (float): The fit statistic (likelihood) success (bool): True if the fit was successful, False otherwise. symmetric_errors (np.array): The 1-sigma uncertainty on the parameters assuming higher-order terms of the Taylor expansion of the likelihood about the maximum likelihood solution is negligible. If those terms are not negligible, errors could be very incorrect or event NaN. See attributes :attr:`covariance` and :attr:`hessian`. """ def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs): super().__init__(pha_list, bkgd_list, rsp_list, pgstat, **kwargs) # background exposure (may == source exposure) if back_exp is None: back_exp = self._exposure self._back_exp = back_exp def _fold_model(self, function, params): stat = np.zeros(self.num_sets) for i in range(self.num_sets): # fold model through response and convert to raw model counts model = self._rsp[i].fold_spectrum(function, params, channel_mask=self._chan_masks[i]) # perform pgstat calculation for one dataset stat[i] = self._stat(self._data[i], model, self._exposure[i], self._back_rates[i]*self._back_exp[i], self._back_var[i], self._back_exp[i]) return stat.sum()
[docs]class SpectralFitterCstat(SpectralFitter): """Jointly-fit datasets using C-Stat (Poisson source with Poisson background) Parameters: pha_list (list of :class:`~gbm.data.PHA`): The PHA objects containg the count spectrum for each detector bkgd_list (list of :class:`~gbm.background.BackgroundRates`): The background rates object for each detector rsp_list (list of :class:`~gbm.data.RSP`): The response object for each detector channel_masks (list of np.array(dtype=bool), optional): A boolean mask for each detector that indicates which energy channels are to be fit. Note: The channel_mask will typically be set by the :attr:`valid_channels` attribute in the PHA object. You can set channel_masks to override the PHA :attr:`valid_channels`, but this is usually unnecessary. back_exp (list of float, optional): Set if different than the source exposure method (str, optional): The fitting algorithm, which should be one of the options for scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two algorithms that provide support for fitting with bounds and constraints. Other algorithms will ignore bounds and constraints. Attributes: covariance (np.array): The covariance matrix of the fitted parameters. Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the covariance matrix may not be valid. detectors (list of str): The detectors used in the fit dof (int): The number of degrees of freedom energy_range (float, float): The full energy range of the fit function_components (list of str): The names of the individual model components, if appropriate function_name (str): The name of the model function being fit hessian (np.array): The Hessian matrix of the fitted parameters, for which the covariance matrix is -inverse(Hessian). Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the Hessian matrix may not be valid. jacobian (np.array): The Jacobian vector of the likelihood as a function of the parameters. message (str): The message from the fitter on convergence/non-convergence num_components (int): Number of function components num_sets (int): Number of datasets/detectors being fit parameters (np.array): The parameter values at the maximum likelihood statistic (float): The fit statistic (likelihood) success (bool): True if the fit was successful, False otherwise. symmetric_errors (np.array): The 1-sigma uncertainty on the parameters assuming higher-order terms of the Taylor expansion of the likelihood about the maximum likelihood solution is negligible. If those terms are not negligible, errors could be very incorrect or event NaN. See attributes :attr:`covariance` and :attr:`hessian`. """ def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs): super().__init__(pha_list, bkgd_list, rsp_list, cstat, **kwargs) # background exposure (may == source exposure) if back_exp is None: back_exp = self._exposure self._back_exp = back_exp def _fold_model(self, function, params): stat = np.zeros(self.num_sets) for i in range(self.num_sets): # fold model through response and convert to raw model counts model = self._rsp[i].fold_spectrum(function, params, channel_mask=self._chan_masks[ i]) # perform cstat calculation for one dataset stat[i] = self._stat(self._data[i], model, self._exposure[i], self._back_rates[i]*self._back_exp[i], self._back_exp[i]) return stat.sum()
[docs]class SpectralFitterPstat(SpectralFitter): """Jointly-fit datasets using P-Stat (Poisson source with known background). This statistic assumes non-zero counts, therefore any channels with zero counts will be masked out and not used in fitting. Parameters: pha_list (list of :class:`~gbm.data.PHA`): The PHA objects containg the count spectrum for each detector bkgd_list (list of :class:`~gbm.background.BackgroundRates`): The background rates object for each detector rsp_list (list of :class:`~gbm.data.RSP`): The response object for each detector channel_masks (list of np.array(dtype=bool), optional): A boolean mask for each detector that indicates which energy channels are to be fit. Note: The channel_mask will typically be set by the :attr:`valid_channels` attribute in the PHA object. You can set channel_masks to override the PHA :attr:`valid_channels`, but this is usually unnecessary. back_exp (list of float, optional): Set if different than the source exposure method (str, optional): The fitting algorithm, which should be one of the options for scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two algorithms that provide support for fitting with bounds and constraints. Other algorithms will ignore bounds and constraints. Attributes: covariance (np.array): The covariance matrix of the fitted parameters. Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the covariance matrix may not be valid. detectors (list of str): The detectors used in the fit dof (int): The number of degrees of freedom energy_range (float, float): The full energy range of the fit function_components (list of str): The names of the individual model components, if appropriate function_name (str): The name of the model function being fit hessian (np.array): The Hessian matrix of the fitted parameters, for which the covariance matrix is -inverse(Hessian). Note: The assumption of negligible higher-order terms in the Taylor expansion may not be valid and therefore the Hessian matrix may not be valid. jacobian (np.array): The Jacobian vector of the likelihood as a function of the parameters. message (str): The message from the fitter on convergence/non-convergence num_components (int): Number of function components num_sets (int): Number of datasets/detectors being fit parameters (np.array): The parameter values at the maximum likelihood statistic (float): The fit statistic (likelihood) success (bool): True if the fit was successful, False otherwise. symmetric_errors (np.array): The 1-sigma uncertainty on the parameters assuming higher-order terms of the Taylor expansion of the likelihood about the maximum likelihood solution is negligible. If those terms are not negligible, errors could be very incorrect or event NaN. See attributes :attr:`covariance` and :attr:`hessian`. """ def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs): super().__init__(pha_list, bkgd_list, rsp_list, pstat, **kwargs) def _fold_model(self, function, params): stat = np.zeros(self.num_sets) for i in range(self.num_sets): # fold model through response and convert to raw model counts model = self._rsp[i].fold_spectrum(function, params, channel_mask=self._chan_masks[ i]) # perform pstat calculation for one dataset stat[i] = self._stat(self._data[i], model, self._exposure[i], self._back_rates[i]) return stat.sum()
# -------------------------------------------------------------------------- # FIT STATISTICS
[docs]def chisq(obs_counts, back_rates, back_var, mod_rates, exposure): """Chi-Square Likelihood Args: obs_counts (np.array): The total observed counts back_rates (np.array): The background model count rates back_var (np.array): The background model count rate variance mod_rates (np.array): The model source rates exposure (float): The source exposure Returns: float: The chi-squared statistic """ mask = (obs_counts - back_rates * exposure) > 0 like = ((obs_counts[mask] - back_rates[mask] * exposure) - mod_rates[ mask] * exposure) ** 2 / \ (obs_counts[mask] + back_var[mask] * exposure ** 2) return -like.sum()
[docs]def cstat(obs_counts, mod_rates, exposure, back_counts, back_exp): """C-Statistic Likelihood for Poisson source + Poisson background. The "W" statistic from the `XSPEC Statistics Appendix <https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_. Args: obs_counts (np.array): The total observed counts mod_rates (np.array): The model source rates exposure (float): The source exposure back_counts (np.array): The background model counts back_exp (float): The background exposure Returns: float: The C-statistic """ sum_exp = exposure + back_exp # special cases for when the observed counts = 0 or when the background # model counts = 0 mask1 = (obs_counts == 0) mask2 = (back_counts == 0) & (mod_rates < obs_counts/(sum_exp)) mask3 = (back_counts == 0) & (mod_rates >= obs_counts/(sum_exp)) w = np.zeros_like(obs_counts) w[mask1] = exposure*mod_rates[mask1] - back_counts[mask1]*np.log(back_exp/sum_exp) w[mask2] = -back_exp*mod_rates[mask2] - obs_counts[mask2]*np.log(exposure/sum_exp) w[mask3] = exposure*mod_rates[mask3] + obs_counts[mask3]* \ (np.log(obs_counts[mask3]) - np.log(exposure*mod_rates[mask3])-1.0) # for all other cases: mask = ~(mask1 | mask2 | mask3) mod_rates_nz = mod_rates[mask] obs_counts_nz = obs_counts[mask] back_counts_nz = back_counts[mask] d = np.sqrt((sum_exp*mod_rates_nz - obs_counts_nz - back_counts_nz)**2 + \ 4.0*sum_exp*back_counts_nz*mod_rates_nz) f = (obs_counts_nz + back_counts_nz - sum_exp*mod_rates_nz + d) / (2.0*sum_exp) w[mask] = exposure*mod_rates_nz + sum_exp*f -\ obs_counts_nz*np.log(exposure*mod_rates_nz + exposure*f) - \ back_counts_nz*np.log(back_exp*f) - \ obs_counts_nz*(1.0-np.log(obs_counts_nz)) - \ back_counts_nz*(1.0-np.log(back_counts_nz)) return -w.sum()
[docs]def pstat(obs_counts, mod_rates, exposure, back_rates): """Likelihood for Poisson source + known background. The "pstat" statistic from the `XSPEC Statistics Appendix <https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_. Note: Elements with zero counts are masked out and not figured in the statistic. Args: obs_counts (np.array): The total observed counts mod_rates (np.array): The model source rates exposure (float): The source exposure back_rates (np.array): The background model count rates Returns: float: The C-statistic """ mask = (obs_counts > 0) & (mod_rates + back_rates > 0.0) pstat = exposure * (mod_rates[mask] + back_rates[mask]) - \ obs_counts[mask]*np.log(exposure*(mod_rates[mask] + \ back_rates[mask])) - obs_counts[mask] * \ (1.0-np.log(obs_counts[mask])) return -pstat.sum()
[docs]def pgstat(obs_counts, mod_rates, exposure, back_counts, back_var, back_exp): """Profile Gaussian Likelihood. From the `XSPEC Statistics Appendix <https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_. Args: obs_counts (np.array): The total observed counts mod_rates (np.array): The model source rates exposure (float): The source exposure back_counts (np.array): The background model counts back_var (np.array): The background model count variance back_exp (float): The background exposure Returns: float: The PG-stat """ mask = (obs_counts > 0) pg = np.zeros_like(obs_counts) # special case for zero observed counts pg[~mask] = exposure * mod_rates[~mask] + back_counts[~mask] * ( exposure / back_exp) \ - np.sqrt(back_var[~mask]) * (exposure / back_exp) ** 2 / 2.0 # for all other cases: obs_counts_nz = obs_counts[mask] mod_rates_nz = mod_rates[mask] back_counts_nz = back_counts[mask] back_var_nz = back_var[mask] d = np.sqrt((exposure * back_var_nz - back_exp * back_counts_nz + \ back_exp ** 2 * mod_rates_nz) ** 2 - \ 4.0 * back_exp ** 2 * ( exposure * back_var_nz * mod_rates_nz - \ obs_counts_nz * back_var_nz - \ back_exp * back_counts_nz * mod_rates_nz)) f = (-(exposure * back_var_nz - back_exp * back_counts_nz + \ back_exp ** 2 * mod_rates_nz) + d) / (2.0 * back_exp ** 2) pg[mask] = back_exp * (mod_rates_nz + f) - \ obs_counts_nz * np.log( exposure * mod_rates_nz + exposure * f) + \ (back_counts_nz - back_exp * f) ** 2 / ( 2.0 * back_var_nz) - \ obs_counts_nz * (1.0 - np.log(obs_counts_nz)) return -pg.sum()