Source code for gbm.finder

# finder.py: Module containing data finder and data catalog classes
#
#     Authors: William Cleveland (USRA),
#              Adam Goldstein (USRA) and
#              Daniel Kocevski (NASA)
#
#     Portions of the code are Copyright 2020 William Cleveland and
#     Adam Goldstein, Universities Space Research Association
#     All rights reserved.
#
#     Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
import os
import socket
import ssl
import sys
import time
from ftplib import FTP_TLS
from urllib.request import urlopen

import numpy as np
from astropy import units as astro_units
from astropy.coordinates import SkyCoord

from gbm.time import Met


class FtpFinder:
    """A base class for the interface to the HEASARC FTP archive of GBM data.
    Specifically, it creates a connection to heasarc.gsfc.nasa.gov.

    Attributes:
        num_files (int): Number of files in the current directory
        files (list of str): The list of files in the current directory

    Note:
        This class should not be directly instantiated, but rather inherited.
    """
    _ftp = FTP_TLS(host='heasarc.gsfc.nasa.gov')
    _ftp.login()
    _ftp.prot_p()

    def __init__(self):
        self._downloading_file = None
        self._download_dir = None
        self._file_list = []

    def __del__(self):
        self._ftp.close()

    def _reconnect(self):
        """Attempt a reconnect in case the connection was lost
        """
        self._ftp.close()
        self._ftp = FTP_TLS(host='heasarc.gsfc.nasa.gov')
        self._ftp.login()
        self._ftp.prot_p()

    def _ftp_status(self, chunk):
        """FTP GET callback function that downloads and reports the percent
        progress of the download.

        Args:
            chunk (bytes): The byte data to be written
        """
        # append to file
        file_path = os.path.join(self._download_dir, self._downloading_file)
        with open(file_path, 'ab') as f:
            f.write(chunk)

        self._transferred_bytes += len(chunk)
        percent = float(self._transferred_bytes) / float(self._total_bytes)

        # download bar
        bar = ('=' * int(percent * 30)).ljust(30)
        # format percent and print along with download bar
        percent = str("{0:.2f}".format(percent * 100.0))
        sys.stdout.write(
            "\r%s [%s] %s%%" % (self._downloading_file, bar, percent))

        # file download is finished
        if self._transferred_bytes == self._total_bytes:
            sys.stdout.write('\n')
        sys.stdout.flush()

    def _ftp_silent(self, chunk):
        """FTP GET callback function that silently downloads a file.

        Args:
            chunk (bytes): The byte data to be written
        """
        # append to file
        file_path = os.path.join(self._download_dir, self._downloading_file)
        with open(file_path, 'ab') as f:
            f.write(chunk)

    def _construct_path(self, id):
        return NotImplemented

    def _file_filter(self, file_list, filetype, extension, dets=None):
        """Filters the directory for the requested filetype, extension, and
        detectors

        Args:
            file_list (list of str): The list of files to filter
            filetype (str): The type of file, e.g. 'cspec'
            extension (str): The file extension, e.g. '.pha'
            dets (list, optional): The detectors. If omitted, then files for
                                   all detectors are returned

        Returns:
            list: The filtered file list
        """
        files = [f for f in file_list if
                 (filetype in f) & (f.endswith(extension))]
        if dets is not None:
            if isinstance(dets, str):
                dets = [dets]
            files = [f for f in files if
                     any('_' + det + '_' in f for det in dets)]
        return files

    def _get(self, download_dir, files, verbose=True):
        """Downloads a list of files from FTP

        Args:
            download_dir (str): The download directory location
            files (list of str): The list of files to download
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.

        Returns:
            list of str: The full paths to the downloaded files
        """
        if verbose:
            callback = self._ftp_status
        else:
            callback = self._ftp_silent

        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
        self._download_dir = download_dir

        # download each file
        filepaths = []
        for file in files:
            # have to save in self because this can't be passed as an argument
            # in the callback
            self._downloading_file = file

            # download file
            self._ftp.voidcmd('TYPE I')
            self._total_bytes = self._ftp.size(file)
            self._transferred_bytes = 0
            self._ftp.retrbinary('RETR ' + file, callback=callback)
            filepaths.append(os.path.join(download_dir, file))
        return filepaths

    @property
    def num_files(self):
        return len(self._file_list)

    @property
    def files(self):
        return self._file_list

    def ls(self, id):
        """List the directory contents of an FTP directory associated with
        a trigger or data set.

        Args:
            id (str): The id associated with a trigger or data set

        Returns:
            list of str: Alphabetically ordered file list
        """
        path = self._construct_path(id)
        try:
            files = self._ftp.nlst(path)
        except AttributeError:
            print('Connection appears to have failed. Attempting to reconnect...')
            try:
                self._reconnect()
                print('Reconnected.')
                return self.ls(id)
            except:
                raise RuntimeError('Failed to reconnect.')
        except:
            raise FileExistsError('{} does not exist'.format(path))

        files = sorted([os.path.basename(f) for f in files])
        return files
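
# A minimal sketch (not part of the original module) of the subclass contract
# described in the FtpFinder docstring: a derived finder only needs to define
# a _root and a _construct_path() that maps an identifier to an FTP directory,
# after which ls() and the _get() download machinery work unchanged. 'MyFinder'
# and its path layout are hypothetical; TriggerFtp and ContinuousFtp below are
# the real implementations of this pattern.
#
#     class MyFinder(FtpFinder):
#         _root = '/fermi/data/gbm/triggers'
#
#         def _construct_path(self, id):
#             # e.g. trigger number -> /fermi/data/gbm/triggers/20YY/bnID/current
#             return os.path.join(self._root, '20' + id[0:2], 'bn' + id,
#                                 'current')
#
#     finder = MyFinder()
#     print(finder.ls('170817529'))   # files in the constructed directory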


class TriggerFtp(FtpFinder):
    """A class that interfaces with the HEASARC FTP trigger directories.
    An instance of this class will represent the available files associated
    with a single trigger.

    An instance can be created without a trigger number, however a trigger
    number will need to be set by set_trigger(tnum) to query and download
    files. An instance can also be changed from one trigger number to another
    without having to create a new instance. If multiple instances are created
    and exist simultaneously, they will all use a single FTP connection.

    Note:
        Since HEASARC transitioned to FTPS, some users have had issues
        connecting to the HEASARC FTP site via Python's ftplib for no obvious
        reason, while it works flawlessly for others (even on the same
        platform). Currently the thought is that this may be related to the
        underlying OpenSSL version that is installed. If you have connection
        problems using this, you may consider upgrading your OpenSSL to see
        if that solves your problem. A potential solution is to do the
        following:

        * $ pip3 install pyopenssl
        * $ pip3 install requests[security]

    Parameters:
        tnum (str, optional): A valid trigger number

    Attributes:
        num_files (int): Number of files in the current directory
        files (list of str): The list of files in the current directory
    """
    _root = '/fermi/data/gbm/triggers'

    def __init__(self, tnum=None):
        self._downloading_file = None
        self._download_dir = None
        self._tnum = None
        self._file_list = []
        if tnum is not None:
            try:
                self._file_list = self.ls(tnum)
                self._ftp.cwd(self._construct_path(tnum))
                self._tnum = tnum
            except FileExistsError:
                raise ValueError(
                    '{} is not a valid trigger number'.format(tnum))

    def set_trigger(self, tnum):
        """Set the trigger number. If the object was previously associated
        with a trigger number, this will effectively change the working
        directory to that of the new trigger number. If the trigger number is
        invalid, an exception will be raised, and no directory change will be
        made.

        Args:
            tnum (str): A valid trigger number
        """
        try:
            self._file_list = self.ls(tnum)
            self._ftp.cwd(self._construct_path(tnum))
            self._tnum = tnum
        except FileExistsError:
            self._tnum = None
            self._file_list = []
            raise ValueError('{} is not a valid trigger number'.format(tnum))

    def ls_ctime(self):
        """List all ctime files for the trigger

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'ctime', 'pha')

    def ls_cspec(self):
        """List all cspec files for the trigger

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'cspec', 'pha')

    def ls_tte(self):
        """List all tte files for the trigger

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'tte', 'fit')

    def ls_rsp(self, ctime=True, cspec=True):
        """List all response Type-I files for the trigger

        Args:
            ctime (bool, optional): If True, list the ctime responses.
                                    Default is True.
            cspec (bool, optional): If True, list the cspec responses.
                                    Default is True.

        Returns:
            list of str: The file list
        """
        files = []
        if cspec:
            files.extend(self._file_filter(self.files, 'cspec', 'rsp'))
        if ctime:
            files.extend(self._file_filter(self.files, 'ctime', 'rsp'))
        return files

    def ls_rsp2(self, ctime=True, cspec=True):
        """List all response Type-II files for the trigger

        Args:
            ctime (bool, optional): If True, list the ctime responses.
                                    Default is True.
            cspec (bool, optional): If True, list the cspec responses.
                                    Default is True.

        Returns:
            list of str: The file list
        """
        files = []
        if cspec:
            files.extend(self._file_filter(self.files, 'cspec', 'rsp2'))
        if ctime:
            files.extend(self._file_filter(self.files, 'ctime', 'rsp2'))
        return files

    def ls_lightcurve(self):
        """List all lightcurve plots for the trigger

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'lc', 'pdf')

    def ls_cat_files(self):
        """List all catalog files for the trigger

        Returns:
            list of str: The file list
        """
        files = []
        files.extend(self._file_filter(self.files, 'bcat', 'fit'))
        files.extend(self._file_filter(self.files, 'scat', 'fit'))
        files.extend(self._file_filter(self.files, 'tcat', 'fit'))
        return files

    def ls_trigdat(self):
        """List the trigger data (trigdat) file for the trigger

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'trigdat', 'fit')

    def ls_localization(self):
        """List all localization files for the trigger

        Returns:
            list of str: The file list
        """
        files = []
        files.extend(self._file_filter(self.files, 'healpix', 'fit'))
        files.extend(self._file_filter(self.files, 'skymap', 'png'))
        files.extend(self._file_filter(self.files, 'loclist', 'txt'))
        files.extend(self._file_filter(self.files, 'locprob', 'fit'))
        files.extend(self._file_filter(self.files, 'locplot', 'png'))
        return files

    def get_ctime(self, download_dir, dets=None, **kwargs):
        """Download the ctime files for the trigger

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'ctime', 'pha', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_cspec(self, download_dir, dets=None, **kwargs):
        """Download the cspec files for the trigger

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'cspec', 'pha', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_tte(self, download_dir, dets=None, **kwargs):
        """Download the TTE files for the trigger

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'tte', 'fit', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_rsp(self, download_dir, ctime=True, cspec=True, dets=None,
                **kwargs):
        """Download the response Type-I files for the trigger

        Args:
            download_dir (str): The download directory
            ctime (bool, optional): If True, download the ctime responses.
                                    Default is True.
            cspec (bool, optional): If True, download the cspec responses.
                                    Default is True.
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = []
        if cspec:
            files.extend(
                self._file_filter(self.files, 'cspec', 'rsp', dets=dets))
        if ctime:
            files.extend(
                self._file_filter(self.files, 'ctime', 'rsp', dets=dets))
        self._get(download_dir, files, **kwargs)

    def get_rsp2(self, download_dir, ctime=True, cspec=True, dets=None,
                 **kwargs):
        """Download the response Type-II files for the trigger

        Args:
            download_dir (str): The download directory
            ctime (bool, optional): If True, download the ctime responses.
                                    Default is True.
            cspec (bool, optional): If True, download the cspec responses.
                                    Default is True.
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = []
        if cspec:
            files.extend(
                self._file_filter(self.files, 'cspec', 'rsp2', dets=dets))
        if ctime:
            files.extend(
                self._file_filter(self.files, 'ctime', 'rsp2', dets=dets))
        self._get(download_dir, files, **kwargs)

    def get_lightcurve(self, download_dir, **kwargs):
        """Download the lightcurve plots for the trigger

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'lc', 'pdf')
        self._get(download_dir, files, **kwargs)

    def get_cat_files(self, download_dir, **kwargs):
        """Download all catalog files for the trigger

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = []
        files.extend(self._file_filter(self.files, 'bcat', 'fit'))
        files.extend(self._file_filter(self.files, 'scat', 'fit'))
        files.extend(self._file_filter(self.files, 'tcat', 'fit'))
        self._get(download_dir, files, **kwargs)

    def get_trigdat(self, download_dir, **kwargs):
        """Download the trigger data (trigdat) file for the trigger

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'trigdat', 'fit')
        self._get(download_dir, files, **kwargs)

    def get_localization(self, download_dir, **kwargs):
        """Download all localization files for the trigger

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = []
        files.extend(self._file_filter(self.files, 'healpix', 'fit'))
        files.extend(self._file_filter(self.files, 'skymap', 'png'))
        files.extend(self._file_filter(self.files, 'loclist', 'txt'))
        files.extend(self._file_filter(self.files, 'locprob', 'fit'))
        files.extend(self._file_filter(self.files, 'locplot', 'png'))
        self._get(download_dir, files, **kwargs)

    def get_healpix(self, download_dir, **kwargs):
        """Download the healpix localization file for the trigger.

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'healpix', 'fit')
        self._get(download_dir, files, **kwargs)

    def get_all(self, download_dir, **kwargs):
        """Download all files associated with the trigger

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        self._get(download_dir, self._file_list, **kwargs)

    def _construct_path(self, str_trigger_num):
        """Constructs the FTP path for a trigger

        Args:
            str_trigger_num (str): The trigger number

        Returns:
            str: The path of the FTP directory for the trigger
        """
        year = '20' + str_trigger_num[0:2]
        path = os.path.join(self._root, year, 'bn' + str_trigger_num,
                            'current')
        return path
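
# A usage sketch for TriggerFtp, assuming network access to
# heasarc.gsfc.nasa.gov. The trigger numbers and detector names below are
# examples only; any valid GBM trigger number works.
#
#     trig_finder = TriggerFtp('170817529')
#     trig_finder.ls_cat_files()                    # bcat/scat/tcat files
#     trig_finder.ls_rsp(cspec=True, ctime=False)   # cspec Type-I responses
#     trig_finder.get_tte('./gbm_data', dets=['n1', 'n2', 'nb'], verbose=True)
#     trig_finder.set_trigger('190114873')          # reuse the same connection
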
# mark: TODO: Need date range functionality


class ContinuousFtp(FtpFinder):
    """A class that interfaces with the HEASARC FTP continuous daily data
    directories. An instance of this class will represent the available files
    associated with a single day.

    An instance can be created without a time, however a time will need to be
    set by set_time() to query and download files. An instance can also be
    changed from one time to another without having to create a new instance.
    If multiple instances are created and exist simultaneously, they will all
    use a single FTP connection.

    Note:
        Since HEASARC transitioned to FTPS, some users have had issues
        connecting to the HEASARC FTP site via Python's ftplib for no obvious
        reason, while it works flawlessly for others (even on the same
        platform). Currently the thought is that this may be related to the
        underlying OpenSSL version that is installed. If you have connection
        problems using this, you may consider upgrading your OpenSSL to see
        if that solves your problem. A potential solution is to do the
        following:

        * $ pip3 install pyopenssl
        * $ pip3 install requests[security]

    Parameters:
        met (float, optional): A time in MET. Either met, utc, or gps must
                               be set.
        utc (str, optional): A UTC time in ISO format: YYYY-MM-DDTHH:MM:SS
        gps (float, optional): A GPS time

    Attributes:
        num_files (int): Number of files in the current directory
        files (list of str): The list of files in the current directory
    """
    _root = '/fermi/data/gbm/daily'

    def __init__(self, met=None, utc=None, gps=None):
        self._downloading_file = None
        self._download_dir = None
        self._file_list = []
        self._met = None
        if met is not None:
            self._met = Met(met)
        elif utc is not None:
            self._met = Met.from_iso(utc)
        elif gps is not None:
            self._met = Met.from_gps(gps)

        if self._met is not None:
            try:
                self._file_list = self.ls(self._met)
                self._ftp.cwd(self._construct_path(self._met))
            except FileExistsError:
                raise ValueError('{} is not a valid MET'.format(self._met))

    def set_time(self, met=None, utc=None, gps=None):
        """Set the time. If the object was previously associated with a
        different time, this will effectively change the working directory to
        that of the new time. If the time is invalid, an exception will be
        raised, and no directory change will be made. Only one of met, utc,
        or gps should be defined.

        Args:
            met (float, optional): A time in MET.
            utc (str, optional): A UTC time in ISO format: YYYY-MM-DDTHH:MM:SS
            gps (float, optional): A GPS time
        """
        if met is not None:
            self._met = Met(met)
        elif utc is not None:
            self._met = Met.from_iso(utc)
        elif gps is not None:
            self._met = Met.from_gps(gps)
        else:
            raise ValueError('Either met, utc, or gps must be specified')

        try:
            self._file_list = self.ls(self._met)
            self._ftp.cwd(self._construct_path(self._met))
        except FileExistsError:
            badtime = self._met
            self._met = None
            self._file_list = []
            raise ValueError('{} is not a valid MET'.format(badtime))

    def ls_ctime(self):
        """List all ctime files

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'ctime', 'pha')

    def ls_cspec(self):
        """List all cspec files

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'cspec', 'pha')

    def ls_poshist(self):
        """List the poshist file

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'poshist', 'fit')

    def ls_spechist(self):
        """List all spechist files

        Returns:
            list of str: The file list
        """
        return self._file_filter(self.files, 'spechist', 'fit')

    def ls_tte(self, full_day=False):
        """List all TTE files

        Args:
            full_day (bool, optional): If True, will return the TTE files for
                the full day. If False, will return the TTE files for the
                hour covering the specified time. Default is False.

        Returns:
            list of str: The file list
        """
        files = []
        files.extend(self._file_filter(self.files, 'tte', 'fit.gz'))
        files.extend(self._file_filter(self.files, 'tte', 'fit'))
        if not full_day:
            files = self._filter_tte(files)
        return files

    def get_ctime(self, download_dir, dets=None, **kwargs):
        """Download the ctime files

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'ctime', 'pha', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_cspec(self, download_dir, dets=None, **kwargs):
        """Download the cspec files

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'cspec', 'pha', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_poshist(self, download_dir, **kwargs):
        """Download the poshist file

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'poshist', 'fit')
        self._get(download_dir, files, **kwargs)

    def get_spechist(self, download_dir, dets=None, **kwargs):
        """Download the spechist files

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = self._file_filter(self.files, 'spechist', 'fit', dets=dets)
        self._get(download_dir, files, **kwargs)

    def get_tte(self, download_dir, dets=None, full_day=False, **kwargs):
        """Download all TTE files associated with a time.

        Note:
            Unless you have a high-bandwidth connection and can handle
            downloading several GBs, it is not recommended to download the
            full day of TTE data.

        Args:
            download_dir (str): The download directory
            dets (list, optional): The detectors' data to download.
                                   If omitted, will download all.
            full_day (bool, optional): If True, will download the TTE files
                for the full day. If False, will download the TTE files for
                the hour covering the specified time. Default is False.
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        files = []
        files.extend(self._file_filter(self.files, 'tte', 'fit.gz',
                                       dets=dets))
        files.extend(self._file_filter(self.files, 'tte', 'fit', dets=dets))
        if not full_day:
            files = self._filter_tte(files)
        self._get(download_dir, files, **kwargs)

    def get_all(self, download_dir, **kwargs):
        """Download all files within a daily directory.

        Note:
            Use at your own risk. Unless you have a high-bandwidth connection
            and can handle downloading several GBs, this function is not
            recommended for use.

        Args:
            download_dir (str): The download directory
            verbose (bool, optional): If True, will output the download
                                      status. Default is True.
        """
        self._get(download_dir, self._file_list, **kwargs)

    def _construct_path(self, met_obj):
        """Constructs the FTP path for a time

        Args:
            met_obj (:class:`.time.Met`): The MET time object

        Returns:
            str: The path of the FTP directory for the time
        """
        path = os.path.join(self._root,
                            met_obj.datetime.strftime('%Y/%m/%d'), 'current')
        return path

    def _filter_tte(self, files):
        """Filters a list of TTE files for only the files that contain the
        desired time

        Args:
            files (list of str): The list of TTE files

        Returns:
            list of str: The filtered list of files
        """
        id = self._met.ymd_h
        files = [f for f in files if id in f]
        return files
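
# A usage sketch for ContinuousFtp, assuming network access; the UTC time,
# MET, and detector name below are examples only.
#
#     cont_finder = ContinuousFtp(utc='2019-01-14T20:57:02')
#     cont_finder.ls_poshist()                     # the daily poshist file
#     cont_finder.get_ctime('./gbm_data', dets=['n0'])
#     cont_finder.get_tte('./gbm_data', dets=['n0'])  # only the covering hour
#     cont_finder.set_time(met=581002688.0)        # switch to a different day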


class HeasarcBrowse():
    """A class that interfaces with the HEASARC Browse API. This can be
    called directly, but is primarily intended as a base class.

    The class makes a query to HEASARC's w3query.pl perl script in
    BATCHRETRIEVALCATALOG mode. All fields and rows are retrieved so that this
    class, on instantiation, contains the full set of catalog data. Any
    queries based on row or column selections/slices are then done locally,
    instead of making repeated requests to the HEASARC.

    Parameters:
        table (str, optional): The name of the table to be passed to the
                               w3query.pl script.
        verbose (bool, optional): Default is True

    Attributes:
        columns (np.array): The names of the columns available in the table
        num_cols (int): The total number of columns (fields) in the data table
        num_rows (int): The total number of rows in the data table
    """

    def __init__(self, table=None, verbose=True):
        self._verbose = verbose
        host = 'https://heasarc.gsfc.nasa.gov'
        script = 'cgi-bin/W3Browse/w3query.pl'
        query = 'tablehead=name=BATCHRETRIEVALCATALOG_2.0+{}&Fields=All'.format(
            table)
        # have to add this because HEASARC changed the default behavior
        # without telling anyone
        query += '&ResultMax=0'

        if table is not None:
            self._is_connected(host)
            self._header, self._table = self._read_table(
                host + '/' + script + '?' + query)
            self._typedefs = self._auto_typedefs()

    @property
    def num_rows(self):
        return self._table.shape[0]

    @property
    def num_cols(self):
        return self._table.shape[1]

    @property
    def columns(self):
        return self._header

    def _is_connected(self, host):
        try:
            # connect to the host -- tells us if the host is actually
            # reachable
            socket.create_connection((host.split('/')[-1], 80))
            return True
        except OSError:
            raise OSError("Either you are not connected to the internet or "
                          "{0} is down.".format(host))
        return False

    def _read_table(self, url):
        """Read the table from HEASARC

        Args:
            url (str): The URL including the query to the HEASARC perl script

        Returns:
            header (np.array): The column names of the table
            table (np.array): The complete data table, unformatted
        """
        # secure connection
        context = ssl._create_unverified_context()
        page = urlopen(url, context=context)

        if self._verbose:
            print('Downloading Catalog from HEASARC via w3query.pl...')
            t0 = time.time()

        # get content, decode to ascii, and split into lines
        lines = page.read().decode('utf8').splitlines(False)

        if self._verbose:
            print('Finished in {} s'.format(int(time.time() - t0)))

        # now we have to do the following because HEASARC changed the behavior
        # of their public script without telling anyone
        lines = lines[1:-1]

        # table header
        header = np.array([col.strip() for col in lines[0].split('|')])

        # the table data
        lines = lines[1:]
        lines = [line for line in lines if '|' in line]
        table = np.array(
            [item.strip() for line in lines for item in line.split('|')])
        table = table.reshape(-1, header.size)

        # another undocumented and unannounced change to HEASARC browse:
        # they added an additional '|' delimiter at the beginning and end of
        # each line
        header = header[1:-1]
        table = table[:, 1:-1]

        # clean nulls from table
        table[(table == 'null') | (table == '')] = 'nan'
        return (header, table)

    def _auto_typedefs(self):
        """Auto-detect the datatype for each column of the table.
        The HEASARC tables are returned as strings, with no definition of
        datatypes, so we have to do a little work to guess what the proper
        types are. This usually works pretty well. Can be overridden in a
        derived class after the base class __init__ has been called.
        """
        typedefs = []
        # cycle through each column
        for i in range(self.num_cols):
            col = self._table[:, i]
            j = 0
            while True:
                # cycle to the first non-null entry
                if col[j] == 'nan':
                    j += 1
                    continue

                # if an entry is a digit, set as integer
                if col[j].isdigit():
                    typedefs.append('int')
                else:
                    # otherwise try applying float
                    try:
                        float(col[j])
                        typedefs.append('float')
                    except:
                        # if float fails, then must be a string; try datetime
                        try:
                            Met.from_iso(col[j])
                            typedefs.append('datetime')
                        # all else fails, this is definitely a string
                        except ValueError:
                            typedefs.append('str')
                break

        return np.array(typedefs)

    def _apply_typedef(self, typedef, column):
        """Apply the type definition to a column of data.

        Args:
            typedef (str): The type definition
            column (np.array): A column of data

        Returns:
            np.array: The column of data converted to the requested type
        """
        if typedef == 'int':
            try:
                newcol = column.astype('int')
            except:
                # nan doesn't work for ints, for now. Not the best solution...
                mask = (column == 'nan')
                newcol = np.copy(column)
                newcol[mask] = '-99999'
                newcol = newcol.astype('int', copy=False)
        elif typedef == 'float':
            newcol = column.astype('float')
        elif typedef == 'datetime':
            newcol = column
            # newcol = np.array([Met.from_iso(item).datetime for item in column])
        else:
            newcol = column
        return newcol

    def _colname_to_idx(self, colname):
        """Convert a column name to the index into the table array

        Args:
            colname (str): The column name

        Returns:
            int: The index into the table array
        """
        if colname not in self._header:
            raise ValueError('{} not a valid column name'.format(colname))
        idx = np.where(self._header == colname)[0][0]
        return idx

    def get_table(self, columns=None):
        """Return the table data as a record array with proper type
        conversions. Missing values are treated as type-converted ``np.nan``.

        Args:
            columns (list of str, optional): The columns to return. If
                                             omitted, returns all columns.

        Returns:
            np.recarray: A record array containing the requested data
        """
        if columns is None:
            columns = self.columns
        idx = np.array([self._colname_to_idx(column) for column in columns])
        data = [self._apply_typedef(self._typedefs[i], self._table[:, i])
                for i in idx]
        table = np.rec.fromarrays(data, names=','.join(columns))
        return table

    def column_range(self, column):
        """Return the data range for a given column

        Args:
            column (str): The column name

        Returns:
            tuple: The (lo, hi) range of the data column
        """
        idx = self._colname_to_idx(column)
        col = self._apply_typedef(self._typedefs[idx], self._table[:, idx])
        col.sort()
        return (col[0], col[-1])

    def slice(self, column, lo=None, hi=None):
        """Perform row slices of the data table based on a conditional of a
        single column

        Args:
            column (str): The column name
            lo (optional): The minimum (inclusive) value of the slice. If not
                           set, uses the lowest range of the data in the
                           column.
            hi (optional): The maximum (inclusive) value of the slice. If not
                           set, uses the highest range of the data in the
                           column.

        Returns:
            :class:`HeasarcBrowse`: Returns a new catalog with the sliced rows
        """
        # have to apply the types and create a mask
        idx = self._colname_to_idx(column)
        col = self._apply_typedef(self._typedefs[idx], self._table[:, idx])
        if lo is None:
            lo, _ = self.column_range(column)
        if hi is None:
            _, hi = self.column_range(column)
        mask = (col >= lo) & (col <= hi)

        # create a new object and fill it with the sliced data
        obj = HeasarcBrowse()
        obj._header = np.copy(self._header)
        obj._table = self._table[mask, :]
        obj._typedefs = np.copy(self._typedefs)
        return obj

    def slices(self, columns):
        """Perform row slices of the data table based on a conditional of
        multiple columns

        Args:
            columns (list of tuples): A list of tuples, where each tuple is
                (column, lo, hi). The 'column' is the column name, 'lo' is
                the lowest bounding value, and 'hi' is the highest bounding
                value. If no low or high bounding is desired, set to None.
                See :meth:`slice()` for more info.

        Returns:
            :class:`HeasarcBrowse`: Returns a new catalog with the sliced
            rows.
        """
        numcols = len(columns)
        obj = self
        for i in range(numcols):
            obj = obj.slice(columns[i][0], lo=columns[i][1], hi=columns[i][2])
        return obj
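
# A sketch of the local row/column operations provided by HeasarcBrowse,
# assuming network access. 'fermigtrig' is the table name used by
# TriggerCatalog below; specific column names depend on the table, so this
# sketch only uses names discovered at runtime from the columns property.
#
#     browse = HeasarcBrowse(table='fermigtrig')
#     print(browse.num_rows, browse.num_cols)
#     print(browse.columns)                          # available column names
#     rec = browse.get_table()                       # full np.recarray
#     first_col = browse.columns[0]
#     lo, hi = browse.column_range(first_col)        # (min, max) of a column
#     subset = browse.slice(first_col, lo=lo, hi=hi)  # row-sliced catalog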


class TriggerCatalog(HeasarcBrowse):
    """Class that interfaces with the GBM Trigger Catalog via HEASARC Browse.

    Note:
        Because this calls HEASARC's w3query.pl script on initialization,
        it may take several seconds for the object to load.

    Parameters:
        coord_units_deg (bool, optional): If True, converts the hms
            sexagesimal format output by HEASARC to decimal degrees.
            Default is True.
        verbose (bool, optional): Default is True

    Attributes:
        columns (np.array): The names of the columns available in the table
        num_cols (int): The total number of columns (fields) in the data table
        num_rows (int): The total number of rows in the data table
    """

    def __init__(self, coord_units_deg=True, **kwargs):
        super().__init__(table='fermigtrig', **kwargs)

        # override detector mask typedef
        idx = self._colname_to_idx('detector_mask')
        self._typedefs[idx] = 'str'

        # heasarc only provides these coordinates in hms. if we want
        # decimal degrees, do the conversion and update the table and typedefs
        if coord_units_deg:
            idx1 = self._colname_to_idx('ra')
            idx2 = self._colname_to_idx('dec')
            coords = SkyCoord(self._table[:, idx1], self._table[:, idx2],
                              unit=(astro_units.hourangle, astro_units.deg))
            self._table[:, idx1] = coords.ra.degree.astype('str')
            self._table[:, idx2] = coords.dec.degree.astype('str')
            self._typedefs[idx1] = 'float'
            self._typedefs[idx2] = 'float'
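
# A sketch showing the effect of coord_units_deg on TriggerCatalog: with the
# default conversion, the 'ra' and 'dec' columns come back as floats in
# decimal degrees. Network access is assumed, and loading may take several
# seconds.
#
#     trigcat = TriggerCatalog()
#     tab = trigcat.get_table(columns=['ra', 'dec'])
#     print(tab['ra'][:5], tab['dec'][:5])           # decimal-degree floats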


class BurstCatalog(HeasarcBrowse):
    """Class that interfaces with the GBM Burst Catalog via HEASARC Browse.

    Note:
        Because this calls HEASARC's w3query.pl script on initialization,
        it may take several seconds up to a couple of minutes for the object
        to load.

    Parameters:
        coord_units_deg (bool, optional): If True, converts the hms
            sexagesimal format output by HEASARC to decimal degrees.
            Default is True.
        verbose (bool, optional): Default is True

    Attributes:
        columns (np.array): The names of the columns available in the table
        num_cols (int): The total number of columns (fields) in the data table
        num_rows (int): The total number of rows in the data table
    """

    def __init__(self, coord_units_deg=True, **kwargs):
        super().__init__(table='fermigbrst', **kwargs)

        # override detector mask typedef
        idx = self._colname_to_idx('bcat_detector_mask')
        self._typedefs[idx] = 'str'

        # heasarc only provides these coordinates in hms. if we want
        # decimal degrees, do the conversion and update the table and typedefs
        if coord_units_deg:
            idx1 = self._colname_to_idx('ra')
            idx2 = self._colname_to_idx('dec')
            coords = SkyCoord(self._table[:, idx1], self._table[:, idx2],
                              unit=(astro_units.hourangle, astro_units.deg))
            self._table[:, idx1] = coords.ra.degree.astype('str')
            self._table[:, idx2] = coords.dec.degree.astype('str')
            self._typedefs[idx1] = 'float'
            self._typedefs[idx2] = 'float'
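
# A sketch combining BurstCatalog with the inherited slices() method for a
# multi-column selection. The 't90' and 'fluence' column names are examples
# from the fermigbrst table and should be checked against
# BurstCatalog().columns before use.
#
#     burstcat = BurstCatalog()
#     short = burstcat.slices([('t90', None, 2.0), ('fluence', 1e-6, None)])
#     rec = short.get_table(columns=['t90', 'fluence'])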