Source code for gbm.data.phaii

# phaii.py: PHAII and TTE classes
#
#     Authors: William Cleveland (USRA),
#              Adam Goldstein (USRA) and
#              Daniel Kocevski (NASA)
#
#     Portions of the code are Copyright 2020 William Cleveland and
#     Adam Goldstein, Universities Space Research Association
#     All rights reserved.
#
#     Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
import os
import astropy.io.fits as fits
import numpy as np
from collections import OrderedDict

import warnings
from astropy.utils.exceptions import AstropyUserWarning

warnings.filterwarnings("ignore", category=AstropyUserWarning)

from .data import DataFile
from .pha import PHA
from . import headers as hdr

from .primitives import TimeEnergyBins, EventList, EnergyBins, GTI

def time_slice_gti(data, gti):
    """ Helper function to create new gti when applying time slices
        
        Args:
            data (EventList or TimeEnergyBins): Either a list of events or
                                                binned data
            gti : list
                List of good time intervals
        
        Returns:        
            list: List of good time intervals that also obey time slices
    """
    slice_gti = []
    for d in data:
        for start, stop in gti:
            try:
                mask = (start <= d.time) & (d.time <= stop)
                if mask.sum():
                    slice_gti.append([d.time[mask].min(), d.time[mask].max()])
            except:
                mask = (start <= d.tstop) & (d.tstart <= stop)
                if mask.sum():
                    slice_gti.append([d.tstart[mask].min(), d.tstop[mask].max()])
    if len(slice_gti):
        return slice_gti
    return gti 

[docs]class PHAII: """PHAII class for time history and spectra. Attributes: data (:class:`~.primitives.TimeEnergyBins`): The PHAII data gti ([(float, float), ...]): A list of tuples defining the good time intervals headers (dict): The headers for each extension of the PHAII trigtime (float): The trigger time of the data, if available. time_range (float, float): The time range of the data energy_range (float, float): The energy range of the data numchans (int): The number of energy channels """ def __init__(self): super(PHAII, self).__init__() self._headers = OrderedDict() self._data = None self._gti = None self._trigtime = 0.0 def _assert_exposure(self, spec): """Sometimes the data files contain a negative exposure. That is very very naughty, and we don't like dealing with naughty things. """ mask = spec['EXPOSURE'] < 0.0 spec['EXPOSURE'][mask] = 0.0 return spec def _assert_range(self, valrange): assert valrange[0] <= valrange[1], \ 'Range must be in increasing order: (lo, hi)' return valrange def _assert_range_list(self, range_list): try: iter(range_list[0]) except: range_list = [tuple(range_list)] range_list = [self._assert_range(r) for r in range_list] return range_list @property def data(self): return self._data @property def gti(self): return self._assert_range_list(self._gti.tolist()) @property def headers(self): return self._headers @property def trigtime(self): return self._trigtime @trigtime.setter def trigtime(self, val): try: float_val = float(val) except: raise TypeError('trigtime must be a float') if float_val < 0.0: raise ValueError('trigtime must be non-negative') self._trigtime = float_val self._headers['PRIMARY']['TRIGTIME'] = float_val @property def time_range(self): return self._data.time_range @property def energy_range(self): return self._data.energy_range @property def numchans(self): return self._data.numchans
[docs] @classmethod def open(cls, filename): """Open a PHAII FITS file and return the PHAII object Args: filename (str): The filename of the FITS file Returns: :class:`PHAII`: The PHAII object """ obj = cls() obj._file_properties(filename) # open FITS file with fits.open(filename, mmap=False) as hdulist: # store headers for hdu in hdulist: obj._headers.update({hdu.name: hdu.header}) # trigger time if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()): if obj._headers['PRIMARY']['TRIGTIME'] != '': obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME']) else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 ebounds = hdulist['EBOUNDS'].data spec = hdulist['SPECTRUM'].data spec = obj._assert_exposure(spec) # convert to recarray because there are many situations where we # don't want TZERO applied, and it gets quite messy to keep track # of where we do or don't want it applied... # Unfortunately, recarray doesn't treat the unsigned integers # correctly, so they are converted to signed integers. We have to # correct this as well. counts = spec['COUNTS'].astype(np.uint16) spec = spec.view(np.recarray) spec = spec.astype([('COUNTS', np.uint16, (counts.shape[1],)), ('EXPOSURE', '>f4'), ('QUALITY', '>i2'), ('TIME', '>f8'), ('ENDTIME', '>f8')]) spec['COUNTS'] = counts # Add TZERO to event times if not trigger if obj._trigtime == 0: spec['TIME'] += obj._headers['SPECTRUM']['TZERO4'] spec['ENDTIME'] += obj._headers['SPECTRUM']['TZERO5'] # Do this for GTI as well gti = hdulist['GTI'].data gti = np.vstack((gti['START'], gti['STOP'])).squeeze().T if obj._trigtime > 0.0: gti -= obj._trigtime # create the time-energy histogram, the core of the PHAII obj._data = TimeEnergyBins(spec['COUNTS'], spec['TIME'], spec['ENDTIME'], spec['EXPOSURE'], ebounds['E_MIN'], ebounds['E_MAX']) obj._gti = gti return obj
[docs] @classmethod def from_data(cls, data, gti=None, trigtime=0.0, detector=None, object=None, ra_obj=None, dec_obj=None, err_rad=None): """Create a PHAII object from TimeEnergyBins data object. Args: data (:class:`~gbm.data.primitives.TimeEnergyBins`): The PHAII data gti ([(float, float), ...], optional): The list of tuples representing the good time intervals (start, stop). If omitted, the GTI is assumed to be [(tstart, tstop)]. trigtime (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. detector (str, optional): The detector that produced the data object (str, optional): The object being observed ra_obj (float, optional): The RA of the object dec_obj (float, optional): The Dec of the object err_rad (float, optional): The localization error radius of the object Returns: :class:`.PHAII`: The newly created PHAII object """ obj = cls() filetype = 'PHAII' obj._data = data detchans = data.numchans tstart, tstop = data.time_range try: trigtime = float(trigtime) except: raise TypeError('trigtime must be a float') if trigtime < 0.0: raise ValueError('trigtime must be non-negative') obj._trigtime = trigtime tstart += trigtime tstop += trigtime detector = None if detector == '' else detector object = None if object == '' else object ra_obj = None if ra_obj == '' else ra_obj dec_obj = None if dec_obj == '' else dec_obj err_rad = None if err_rad == '' else err_rad # create the primary extension primary_header = hdr.primary(detnam=detector, filetype=filetype, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers = [primary_header] header_names = ['PRIMARY'] # ebounds extension ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(ebounds_header) header_names.append('EBOUNDS') # spectrum extension spectrum_header = hdr.spectrum(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(spectrum_header) header_names.append('SPECTRUM') # gti extension if gti is None: gti = [data.time_range] ngti = len(gti) gti_rec = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti_rec['START'] = [one_gti[0] for one_gti in gti] gti_rec['STOP'] = [one_gti[1] for one_gti in gti] gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers.append(gti_header) header_names.append('GTI') obj._gti = gti_rec # store headers and set data properties obj._headers = {name: header for name, header in zip(header_names, headers)} return obj
[docs] def get_exposure(self, time_ranges=None): """Calculate the total exposure of a time range or time ranges of data Args: time_ranges ([(float, float), ...], optional): The time range or time ranges over which to calculate the exposure. If omitted, calculates the total exposure of the data. Returns: float: The exposure of the time selections """ if time_ranges is None: time_ranges = self.time_range time_ranges = self._assert_range_list(time_ranges) exposure = self._data.get_exposure(time_ranges=time_ranges) return exposure
[docs] def to_lightcurve(self, time_range=None, energy_range=None, channel_range=None): """Integrate the PHAII data over energy to produce a lightcurve Args: time_range ((float, float), optional): The time range of the lightcurve. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the lightcurve. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the lightcurve. If omitted, uses the entire energy range of the data. Returns: :class:`~gbm.data.primitives.TimeBins`: The lightcurve data """ # slice to desired time range if time_range is not None: temp = self._data.slice_time(*self._assert_range(time_range)) else: temp = self._data # limit integration to be over desired energy or channel range if channel_range is not None: self._assert_range(channel_range) energy_range = (self._data.emin[channel_range[0]], self._data.emax[channel_range[1]]) if energy_range is not None: emin, emax = self._assert_range(energy_range) else: emin, emax = None, None # produce the TimeBins lightcurve object lc = temp.integrate_energy(emin=emin, emax=emax) return lc
[docs] def to_spectrum(self, time_range=None, energy_range=None, channel_range=None): """Integrate the PHAII data over time to produce a count spectrum Args: time_range ((float, float), optional): The time range of the spectrum. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the spectrum. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the spectrum. If omitted, uses the entire energy range of the data. Returns: :class:`~gbm.data.primitives.EnergyBins`: The count spectrum data """ # slice to desired energy or channel range if (channel_range is not None) or (energy_range is not None): if channel_range is not None: self._assert_range(channel_range) energy_range = (self._data.emin[channel_range[0]], self._data.emax[channel_range[1]]) temp = self._data.slice_energy(*self._assert_range(energy_range)) else: temp = self._data # limit integration to be over desired time range if time_range is not None: tstart, tstop = self._assert_range(time_range) else: tstart, tstop = None, None # produce the EnergyBins count spectrum spec = temp.integrate_time(tstart=tstart, tstop=tstop) return spec
[docs] def write(self, directory, filename=None): """Writes the data to a FITS file. Args: directory (str): The directory to write the file filename (str, optional): If set, will override the standardized name """ # set filename if (self.filename is None) and (filename is None): raise NameError('Filename not set') if filename is None: filename = self.filename self._full_path = os.path.join(directory, filename) self._headers['PRIMARY']['FILENAME'] = filename # make ebounds table chan_idx = np.arange(self.numchans, dtype=int) ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'), ('E_MIN', '>f4'), ('E_MAX', '>f4')]) ebounds['CHANNEL'] = chan_idx ebounds['E_MIN'] = self._data.emin ebounds['E_MAX'] = self._data.emax # make spectrum table trigtime = self.trigtime spec = np.recarray(self._data.numtimes, dtype=[('COUNTS', '>i2', (self.numchans,)), ('EXPOSURE', '>f4'), ('QUALITY', '>i2'), ('TIME', '>f8'), ('ENDTIME', '>f8')]) spec['COUNTS'] = self._data.counts spec['EXPOSURE'] = self._data.exposure spec['QUALITY'] = np.full(self._data.numtimes, 1) spec['TIME'] = self._data.tstart spec['ENDTIME'] = self._data.tstop # make the GTI table ngti = len(self.gti) gti = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti['START'] = np.array(self.gti)[:, 0] gti['STOP'] = np.array(self.gti)[:, 1] # TZERO wreaks havoc, so we have to adjust the values based on # if TZERO=TRIGTIME or not if self.trigtime == 0.0: spec['TIME'] -= self._headers['SPECTRUM']['TSTART'] spec['ENDTIME'] -= self._headers['SPECTRUM']['TSTART'] gti['START'] -= self._headers['SPECTRUM']['TSTART'] gti['STOP'] -= self._headers['SPECTRUM']['TSTART'] # if trigtime is zero (not present), remove from headers if self.trigtime == 0.0: self._headers['PRIMARY'].pop('TRIGTIME', None) self._headers['EBOUNDS'].pop('TRIGTIME', None) self._headers['SPECTRUM'].pop('TRIGTIME', None) self._headers['GTI'].pop('TRIGTIME', None) # create FITS hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY']) hdulist.append(primary_hdu) # ebounds extension ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS', header=self._headers['EBOUNDS']) hdulist.append(ebounds_hdu) # spectrum extension spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM', header=self._headers['SPECTRUM']) if self.trigtime > 0.0: spectrum_hdu.header['TZERO4'] = self.trigtime spectrum_hdu.header['TZERO5'] = self.trigtime else: spectrum_hdu.header['TZERO4'] = self._headers['SPECTRUM']['TSTART'] spectrum_hdu.header['TZERO5'] = self._headers['SPECTRUM']['TSTART'] hdulist.append(spectrum_hdu) # the GTI extension gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'], name='GTI') if self.trigtime > 0.0: gti_hdu.header['TZERO1'] = self.trigtime gti_hdu.header['TZERO2'] = self.trigtime else: gti_hdu.header['TZERO1'] = self._headers['SPECTRUM']['TSTART'] gti_hdu.header['TZERO2'] = self._headers['SPECTRUM']['TSTART'] hdulist.append(gti_hdu) # write to file try: hdulist.writeto(self.full_path, checksum=True) except Exception as e: print(e) if self.trigtime == 0.0: spec['TIME'] += self._headers['SPECTRUM']['TSTART'] spec['ENDTIME'] += self._headers['SPECTRUM']['TSTART']
[docs] def to_pha(self, time_ranges=None, energy_range=None, channel_range=None, **kwargs): """Integrate the PHAII data over time to produce a PHA object Args: time_ranges ([(float, float), ...], optional): The time range of the spectrum. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the spectrum. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the spectrum. If omitted, uses the entire energy range of the data. **kwargs: Options passed to :meth:`gbm.data.PHA.from_data` Returns: :class:`~gbm.data.PHA`: The single spectrum PHA """ if time_ranges is None: time_ranges = [self.time_range] time_ranges = self._assert_range_list(time_ranges) specs = [] times = [] for time_range in time_ranges: spec = self.to_spectrum(time_range=time_range, energy_range=energy_range, channel_range=channel_range) # PHA needs to have the full spectral range and zero out the ones # we're not using lomask = (self.data.emin < spec.range[0]) pre_counts = np.zeros(np.sum(lomask), dtype=int) pre_lo_edges = self.data.emin[lomask] pre_hi_edges = self.data.emax[lomask] himask = (self.data.emax > spec.range[1]) post_counts = np.zeros(np.sum(himask), dtype=int) post_lo_edges = self.data.emin[himask] post_hi_edges = self.data.emax[himask] spec.counts = np.concatenate( (pre_counts, spec.counts, post_counts)) spec.lo_edges = np.concatenate( (pre_lo_edges, spec.lo_edges, post_lo_edges)) spec.hi_edges = np.concatenate( (pre_hi_edges, spec.hi_edges, post_hi_edges)) spec.exposure = np.full(spec.size, spec.exposure[0]) specs.append(spec) # need the actual time range times.append(self.data.slice_time(*time_range).time_range) # sum disjoint time selections data = EnergyBins.sum(specs) # values for the PHA object channel_mask = ~(lomask | himask) tstart, tstop = np.min(times), np.max( times) # time_ranges[0][0], time_ranges[-1][1] obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] pha = PHA.from_data(data, tstart, tstop, channel_mask=channel_mask, gti=times, trigtime=self.trigtime, object=obj, detector=self.detector, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, datatype=self.datatype.lower(), **kwargs) return pha
[docs] def slice_time(self, time_ranges): """Slice the PHAII by one or more time range. Produces a new PHAII object. Args: time_ranges ([(float, float), ...]): The time ranges to slice the data to. Returns: :class:`PHAII`: A new PHAII object with the requested time ranges """ time_ranges = self._assert_range_list(time_ranges) data = [self.data.slice_time(*self._assert_range(time_range)) \ for time_range in time_ranges] gti = time_slice_gti(data, self.gti) data = TimeEnergyBins.merge_time(data) hdr = self.headers['PRIMARY'] keys = list(hdr.keys()) obj = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None phaii = self.from_data(data, gti=gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return phaii
[docs] def slice_energy(self, energy_ranges): """Slice the PHAII by one or more energy range. Produces a new PHAII object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice the data to. Returns: :class:`PHAII`: A new PHAII object with the requested energy ranges """ energy_ranges = self._assert_range_list(energy_ranges) data = [self.data.slice_energy(*self._assert_range(energy_range)) \ for energy_range in energy_ranges] data = TimeEnergyBins.merge_energy(data) hdr = self.headers['PRIMARY'] keys = list(hdr.keys()) obj = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None phaii = self.from_data(data, gti=self.gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return phaii
[docs] def rebin_time(self, method, *args, time_range=(None, None)): """Rebin the PHAII in time given a rebinning method. Produces a new PHAII object. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function time_range ((float, float), optional): The starting and ending time to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns :class:`PHAII`: A new PHAII object with the requested rebinned data """ tstart, tstop = time_range data = self.data.rebin_time(method, *args, tstart=tstart, tstop=tstop) if tstart is None: tstart = self.time_range[0] if tstop is None: tstop = self.time_range[1] tstart, tstop = self._assert_range((tstart, tstop)) gti = [data.time_range] try: obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] except: obj = ra_obj = dec_obj = err_rad = None phaii = self.from_data(data, gti=gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return phaii
[docs] def rebin_energy(self, method, *args, energy_range=(None, None)): """Rebin the PHAII in energy given a rebinning method. Produces a new PHAII object. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function energy_range ((float, float), optional): The starting and ending energy to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns :class:`PHAII`: A new PHAII object with the requested rebinned data """ emin, emax = energy_range data = self.data.rebin_energy(method, *args, emin=emin, emax=emax) gti = self.gti obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] phaii = self.from_data(data, gti=gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return phaii
[docs]class Ctime(DataFile, PHAII): """Class for CTIME data. Attributes: data (:class:`~.primitives.TimeEnergyBins`): The PHAII data datatype (str): The datatype of the file detector (str): The GBM detector the file is associated with directory (str): The directory the file is located in energy_range (float, float): The energy range of the data filename (str): The filename full_path (str): The full path+filename gti ([(float, float), ...]): A list of tuples defining the good time intervals headers (dict): The headers for each extension of the PHAII id (str): The GBM file ID is_gbm_file (bool): True if the file is a valid GBM standard file, False if it is not. is_trigger (bool): True if the file is a GBM trigger file, False if not numchans (int): The number of energy channels trigtime (float): The trigger time of the data, if available. time_range (float, float): The time range of the data """ def __init__(self): PHAII.__init__(self)
[docs] @classmethod def from_data(cls, data, detector=None, **kwargs): obj = super().from_data(data, detector=detector, **kwargs) # set file properties trigtime = None if obj.trigtime == 0.0 else obj.trigtime obj.set_properties(detector=detector, trigtime=trigtime, tstart=obj.time_range[0], datatype='ctime', extension='pha') return obj
[docs]class Cspec(DataFile, PHAII): """Class for CSPEC data. Attributes: data (:class:`~.primitives.TimeEnergyBins`): The PHAII data datatype (str): The datatype of the file detector (str): The GBM detector the file is associated with directory (str): The directory the file is located in energy_range (float, float): The energy range of the data filename (str): The filename full_path (str): The full path+filename gti ([(float, float), ...]): A list of tuples defining the good time intervals headers (dict): The headers for each extension of the PHAII id (str): The GBM file ID is_gbm_file (bool): True if the file is a valid GBM standard file, False if it is not. is_trigger (bool): True if the file is a GBM trigger file, False if not numchans (int): The number of energy channels trigtime (float): The trigger time of the data, if available. time_range (float, float): The time range of the data """ def __init__(self): PHAII.__init__(self)
[docs] @classmethod def from_data(cls, data, detector=None, **kwargs): obj = super().from_data(data, detector=detector, **kwargs) # set file properties trigtime = None if obj.trigtime == 0.0 else obj.trigtime obj.set_properties(detector=detector, trigtime=trigtime, tstart=obj.time_range[0], datatype='cspec', extension='pha') return obj
[docs]class TTE(DataFile): """Class for Time-Tagged Event data Attributes: data (:class:`~.primitives.EventList`): The event data datatype (str): The datatype of the file detector (str): The GBM detector the file is associated with directory (str): The directory the file is located in energy_range (float, float): The energy range of the data filename (str): The filename full_path (str): The full path+filename gti ([(float, float), ...]): A list of tuples defining the good time intervals headers (dict): The headers for each extension of the TTE id (str): The GBM file ID is_gbm_file (bool): True if the file is a valid GBM standard file, False if it is not. is_trigger (bool): True if the file is a GBM trigger file, False if not numchans (int): The number of energy channels trigtime (float): The trigger time of the data, if available. time_range (float, float): The time range of the data """ def __init__(self): super(TTE, self).__init__() self._headers = OrderedDict() self._data = None self._gti = None self._trigtime = 0.0 def _assert_range(self, valrange): assert valrange[0] <= valrange[1], \ 'Range must be in increasing order: (lo, hi)' return valrange def _assert_range_list(self, range_list): try: iter(range_list[0]) except: range_list = [range_list] range_list = [self._assert_range(r) for r in range_list] return range_list @property def data(self): return self._data @property def gti(self): return self._assert_range_list(self._gti.tolist()) @property def headers(self): return self._headers @property def trigtime(self): return self._trigtime @trigtime.setter def trigtime(self, val): try: float_val = float(val) except: raise TypeError('trigtime must be a float') if float_val < 0.0: raise ValueError('trigtime must be non-negative') self._trigtime = float_val self._headers['PRIMARY']['TRIGTIME'] = float_val @property def time_range(self): return self._data.time_range @property def energy_range(self): return self._data.energy_range @property def numchans(self): return self._data.numchans
[docs] @classmethod def open(cls, filename): """Open a TTE FITS file and return the TTE object Args: filename (str): The filename of the FITS file Returns: :class:`TTE`: The TTE object """ obj = cls() obj._file_properties(filename) with fits.open(filename, mmap=False) as hdulist: for hdu in hdulist: obj._headers.update({hdu.name: hdu.header}) # trigger time if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()): if obj._headers['PRIMARY']['TRIGTIME'] != '': obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME']) else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 ebounds = hdulist['EBOUNDS'].data # convert to recarray because there are many situations where we # don't want TZERO applied, and it gets quite messy to keep track # of where we do or don't want it applied... events = hdulist['EVENTS'].data.view(np.recarray) # Add TZERO to event times if not trigger if obj._trigtime == 0: events['TIME'] += obj._headers['EVENTS']['TZERO1'] # Do this for GTI as well gti = hdulist['GTI'].data gti = np.vstack((gti['START'], gti['STOP'])).squeeze().T if obj._trigtime > 0.0: gti -= obj._trigtime # create the EventList, the core of the TTE class obj._data = EventList.from_fits_array(events, ebounds) obj._gti = gti return obj
[docs] @classmethod def from_data(cls, data, gti=None, trigtime=0.0, detector=None, object=None, ra_obj=None, dec_obj=None, err_rad=None): """Create a TTE object from an EventList data object. Args: data (:class:`~.primitives.EventList`): The event data gti ([(float, float), ...], optional): The list of tuples representing the good time intervals (start, stop). If omitted, the GTI is assumed to be [(tstart, tstop)]. trigtime (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. detector (str, optional): The detector that produced the data object (str, optional): The object being observed ra_obj (float, optional): The RA of the object dec_obj (float, optional): The Dec of the object err_rad (float, optional): The localization error radius of the object Returns: :class:`TTE`: The newly created TTE object """ obj = cls() filetype = 'GBM PHOTON LIST' obj._data = data detchans = data.numchans tstart, tstop = data.time_range try: trigtime = float(trigtime) except: raise TypeError('trigtime must be a float') if trigtime < 0.0: raise ValueError('trigtime must be non-negative') obj._trigtime = trigtime tstart += trigtime tstop += trigtime detector = None if detector == '' else detector object = None if object == '' else object ra_obj = None if ra_obj == '' else ra_obj dec_obj = None if dec_obj == '' else dec_obj err_rad = None if err_rad == '' else err_rad # create the primary extension primary_header = hdr.primary(detnam=detector, filetype=filetype, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers = [primary_header] header_names = ['PRIMARY'] # ebounds extension ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(ebounds_header) header_names.append('EBOUNDS') # spectrum extension events_header = hdr.events(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(events_header) header_names.append('EVENTS') # gti extension if gti is None: gti = [data.time_range] ngti = len(gti) gti_rec = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti_rec['START'] = [one_gti[0] for one_gti in gti] gti_rec['STOP'] = [one_gti[1] for one_gti in gti] gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers.append(gti_header) header_names.append('GTI') obj._gti = gti_rec # store headers and set data properties obj._headers = {name: header for name, header in zip(header_names, headers)} # set file info trig = None if obj.trigtime == 0.0 else obj.trigtime obj.set_properties(detector=detector, trigtime=trig, tstart=obj.time_range[0], datatype='tte', extension='fit') return obj
[docs] @classmethod def merge(cls, ttes, primary=0, force_unique=True): """Merge multiple TTE objects into a single new TTE object. Note: The amount of data in a single TTE object can be quite large. It is up to you to determine if you have enough memory to support the merge. Args: ttes (lists): A list of TTE objects to merge primary (int): The index into the list of TTE objects to designate the primary TTE object. The primary object will be the reference for the header information for the new merged TTE object. Default is the first TTE object in the list (primary=0). force_unique (bool, optional): If True, force all events to be unique via brute force sorting. If False, the EventLists will only be checked and masked for overlapping time ranges. Events can potentially be lost if the merged EventLists contain overlapping times (but not necessarily duplicate events), however this method is much faster. Default is True. Returns: :class:`TTE`: The new merged TTE object """ # merge the Event data data = [tte.data for tte in ttes] data = EventList.merge(data, sort_attrib='TIME', force_unique=force_unique) # merge the GTIs gtis = [GTI.from_list(tte.gti) for tte in ttes] merged_gti = gtis[0] for gti in gtis[1:]: merged_gti = GTI.merge(merged_gti, gti) merged_gti = merged_gti.as_list() # header info hdr = ttes[primary].headers['PRIMARY'] keys = list(hdr.keys()) trigtime = ttes[primary].trigtime detector = ttes[primary].detector object = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None obj = cls.from_data(data, gti=merged_gti, trigtime=trigtime, detector=detector, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return obj
[docs] def get_exposure(self, time_ranges=None): """Calculate the total exposure of a time range or time ranges of data Args: time_ranges ([(float, float), ...], optional): The time range or time ranges over which to calculate the exposure. If omitted, calculates the total exposure of the data. Returns: float: The exposure of the time selections """ time_ranges = self._assert_range_list(time_ranges) exposure = self._data.get_exposure(time_ranges=time_ranges) return exposure
[docs] def to_spectrum(self, time_range=None, energy_range=None, channel_range=None): """Integrate the TTE data over time to produce a count spectrum Args: time_range ([(float, float), ...], optional): The time range of the spectrum. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the spectrum. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the spectrum. If omitted, uses the entire energy range of the data. Returns: :class:`~.primitives.EnergyBins`: The spectrum data """ # slice to desired energy or channel range if (channel_range is not None) or (energy_range is not None): if channel_range is not None: self._assert_range(channel_range) energy_range = (self._data.emin[channel_range[0]], self._data.emax[channel_range[1]]) temp = self._data.slice_energy(*self._assert_range(energy_range)) else: temp = self._data if time_range is None: time_range = self.time_range # time slice segment = temp.time_slice(*self._assert_range(time_range)) spec = segment.count_spectrum return spec
[docs] def to_phaii(self, bin_method, *args, time_range=None, energy_range=None, channel_range=None, **kwargs): """Utilizing a binning function, convert the data to a PHAII object Args: bin_method (<function>): A binning function *args: Arguments to pass to the binning function time_range ([(float, float), ...], optional): The time range of the spectrum. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the spectrum. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the spectrum. If omitted, uses the entire energy range of the data. **kwargs: Options to pass to the binning function Returns: :class:`Cspec`: The PHAII object """ # slice to desired energy or channel range if (channel_range is not None) or (energy_range is not None): if channel_range is not None: self._assert_range(channel_range) energy_range = (self._data.emin[channel_range[0]], self._data.emax[channel_range[1]]) temp = self._data.energy_slice(*self._assert_range(energy_range)) else: temp = self._data # do the time binning to create the TimeEnergyBins if time_range is None: tstart, tstop = None, None else: tstart, tstop = time_range bins = temp.bin(bin_method, *args, tstart=tstart, tstop=tstop, **kwargs) # create the Cspec object if 'OBJECT' in self.headers['PRIMARY']: obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] else: obj, ra_obj, dec_obj, err_rad = None, None, None, None obj = Cspec.from_data(bins, gti=self.gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return obj
[docs] def to_pha(self, time_ranges=None, energy_range=None, channel_range=None, **kwargs): """Integrate the TTE data over time to produce a PHA object Args: time_ranges ([(float, float), ...], optional): The time range of the spectrum. If omitted, uses the entire time range of the data. energy_range ((float, float), optional): The energy range of the spectrum. If omitted, uses the entire energy range of the data. channel_range ((int, int), optional): The channel range of the spectrum. If omitted, uses the entire energy range of the data. **kwargs: Options passed to :meth:`.PHA.from_data()` Returns: :class:`~.PHA`: The PHA object """ if time_ranges is None: time_ranges = [self.time_range] time_ranges = self._assert_range_list(time_ranges) segs = [self._data.time_slice(*self._assert_range(time_range)) \ for time_range in time_ranges] times = [seg.time_range for seg in segs] el = EventList.merge(segs) if energy_range is not None: el = el.energy_slice(*self._assert_range(energy_range)) lomask = (self.data.emin < energy_range[0]) himask = (self.data.emax > energy_range[1]) elif channel_range is not None: el = el.energy_slice(*self._assert_range(channel_range)) idx = np.arange(self.numchans) lomask = (idx < channel_range[0]) himask = (idx > channel_range[1]) else: lomask = himask = np.zeros(self.numchans, dtype=bool) data = el.count_spectrum # values for the PHA object channel_mask = ~(lomask | himask) if 'OBJECT' in self.headers['PRIMARY']: obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] else: obj, ra_obj, dec_obj, err_rad = None, None, None, None pha = PHA.from_data(data, *el.time_range, channel_mask=channel_mask, gti=times, trigtime=self.trigtime, object=obj, detector=self.detector, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, datatype=self.datatype.lower(), **kwargs) return pha
[docs] def write(self, directory, filename=None): """Writes the data to a FITS file. Args: directory (str): The directory to write the file filename (str, optional): If set, will override the standardized name """ # set filename if (self.filename is None) and (filename is None): raise NameError('Filename not set') if filename is None: filename = self.filename self._full_path = os.path.join(directory, filename) self._headers['PRIMARY']['FILENAME'] = filename # ebounds table ebounds = self._data._ebounds # events table trigtime = self.trigtime events = self._data._events # gti table ngti = len(self.gti) gti = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti['START'] = np.array(self.gti)[:, 0] gti['STOP'] = np.array(self.gti)[:, 1] # TZERO wreaks havoc, so we have to adjust the values based on # if TZERO=TRIGTIME or not if self.trigtime == 0.0: events['TIME'] -= self._headers['EVENTS']['TSTART'] gti['START'] -= self._headers['EVENTS']['TSTART'] gti['STOP'] -= self._headers['EVENTS']['TSTART'] # if trigtime is zero (not present), remove from headers if self.trigtime == 0.0: self._headers['PRIMARY'].pop('TRIGTIME', None) self._headers['EBOUNDS'].pop('TRIGTIME', None) self._headers['EVENTS'].pop('TRIGTIME', None) self._headers['GTI'].pop('TRIGTIME', None) # create FITS hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY']) hdulist.append(primary_hdu) # ebounds extension ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS', header=self._headers['EBOUNDS']) hdulist.append(ebounds_hdu) # the events extension spectrum_hdu = fits.BinTableHDU(data=events, name='EVENTS', header=self._headers['EVENTS']) if self.trigtime > 0.0: spectrum_hdu.header['TZERO1'] = self.trigtime else: spectrum_hdu.header['TZERO1'] = self._headers['EVENTS']['TSTART'] hdulist.append(spectrum_hdu) # the GTI extension gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'], name='GTI') if self.trigtime > 0.0: gti_hdu.header['TZERO1'] = self.trigtime gti_hdu.header['TZERO2'] = self.trigtime else: gti_hdu.header['TZERO1'] = self._headers['EVENTS']['TSTART'] gti_hdu.header['TZERO2'] = self._headers['EVENTS']['TSTART'] hdulist.append(gti_hdu) # write to file try: hdulist.writeto(self.full_path, checksum=True) except Exception as e: print(e) if self.trigtime == 0.0: events['TIME'] += self._headers['EVENTS']['TSTART']
[docs] def slice_time(self, time_ranges): """Slice the TTE by one or more time range. Produces a new TTE object. Args: time_ranges ([(float, float), ...]): The time ranges to slice the data to. Returns: :class:`TTE`: A new TTE object with the requested time ranges """ time_ranges = self._assert_range_list(time_ranges) data = [self.data.time_slice(*self._assert_range(time_range)) \ for time_range in time_ranges] gti = time_slice_gti(data, self.gti) data = EventList.merge(data, sort_attrib='TIME') hdr = self.headers['PRIMARY'] keys = list(hdr.keys()) obj = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None tte = self.from_data(data, gti=gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return tte
[docs] def slice_energy(self, energy_ranges): """Slice the TTE by one or more energy range. Produces a new TTE object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice the data to. Returns: :class:`TTE`: A new TTE object with the requested energy ranges """ energy_ranges = self._assert_range_list(energy_ranges) data = [self.data.energy_slice(*self._assert_range(energy_range)) \ for energy_range in energy_ranges] data = EventList.merge(data, sort_attrib='TIME') hdr = self.headers['PRIMARY'] keys = list(hdr.keys()) obj = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None tte = self.from_data(data, gti=self.gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return tte
[docs] def rebin_energy(self, bin_method, *args): """Produce a TTE object with rebinned PHA channels Args: bin_method (<function>): A rebinning function *args: Arguments to be passed to the bin method Returns :class:`TTE`: A new TTE object with the rebinned PHA channels """ data = self._data.rebin_energy(bin_method, *args) detector = self.detector # header info hdr = self.headers['PRIMARY'] keys = list(hdr.keys()) object = hdr['OBJECT'] if 'OBJECT' in keys else None ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None obj = TTE.from_data(data, gti=self.gti, trigtime=self.trigtime, detector=detector, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) return obj