# binned.py: Module containing data binning functions for pre-binned data
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import numpy as np
import warnings
[docs]def combine_by_factor(counts, exposure, old_edges, bin_factor):
"""Rebins binned data to a multiple factor
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
bin_factor (int): The number of consecutive bins to be combined
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
assert bin_factor >= 1, "bin_factor must be a positive integer"
bin_factor = int(bin_factor)
new_edges = old_edges[::bin_factor]
# make sure the number of bins is a multiple of the bin_factor
mod = counts.shape[0] % bin_factor
# if the number of bins is not a multiple of the bin_factor,
# then remove the modulo bins
if mod != 0:
counts = counts[:-mod]
exposure = exposure[:-mod]
# reshape and combine counts and exposure
new_counts = np.sum(counts.reshape(-1, bin_factor), axis=1)
new_exposure = np.sum(exposure.reshape(-1, bin_factor), axis=1)
return new_counts, new_exposure, new_edges
[docs]def rebin_by_time(counts, exposure, old_edges, dt):
"""Rebins binned data to a specified temporal bin width.
If the requested bin width is smaller than some of the original bin widths,
those bins will be left as is.
If the requested bin width is an exact factor of all the current bin widths,
the resulting bin width will be exactly as requested. If the requested bin
width is not an exact factor, then the resulting bin width will be
approximately the requested bin width without exceeding the requested bin
width.
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
dt (float): The requested temporal bin width in seconds
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
assert dt > 0.0, "Requested bin width must be > 0.0 s"
num_old_bins = counts.size
dts = old_edges[1:] - old_edges[0:-1]
# if the the requested bin width is a factor of the current bin width for all bins,
# call combine_by_factor. this is an easier task
warnings.filterwarnings("ignore", category=RuntimeWarning)
if np.sum((dt % dts) == 0.0) == num_old_bins:
return combine_by_factor(counts, exposure, old_edges, int(dt / dts[0]))
# print('Temporal factor for rebinning is not a perfect multiple of all bin widths.')
# print('Bin widths will be approximate to and will not exceed the temporal factor.')
# cycle through current bins and add up bins until we have approximately reached,
# but not exceeded, the requested binwidth
new_edges = [0]
istart = 0
while True:
dtscum = np.cumsum(dts[istart:])
iend = istart + np.sum(dtscum <= dt)
if iend < istart:
iend = istart
new_edges.append(iend)
if iend >= num_old_bins - 1:
break
istart = iend + 1
# create the new rebinned arrays
new_edges = np.array(new_edges)
bounds_idx = np.array((new_edges[0:-1], new_edges[1:])).T
new_exposure = np.array(
[np.sum(exposure[i[0]:i[1] + 1]) for i in bounds_idx])
new_counts = np.array([np.sum(counts[i[0]:i[1] + 1]) for i in bounds_idx])
new_edges = old_edges[new_edges]
return new_counts, new_exposure, new_edges
[docs]def combine_into_one(counts, exposure, old_edges):
"""Combines binned data into a single bin
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
new_counts = np.array([np.sum(counts)])
new_exposure = np.array([np.sum(exposure)])
new_edges = old_edges[[0, -1]]
return new_counts, new_exposure, new_edges
[docs]def rebin_by_snr(counts, exposure, old_edges, background_counts, snr):
"""Rebins binned data such that each bin is above a minimum signal-to-noise ratio
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
background_counts (np.array): The background counts in each bin
snr (float): The minimum signal-to-ratio threshold
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
num_old_bins = counts.size
# make sure there is a non-zero background
mask = (background_counts > 0.0)
if np.sum(mask) == 0:
raise ValueError(
'Background counts are all non-positive. Cannot bin by SNR.')
# cycle through current bins and combine bins until we have exceeded the requested snr
new_edges = [0]
istart = 0
while True:
countscum = np.cumsum(counts[istart:])
backgroundscum = np.cumsum(background_counts[istart:])
snrcum = np.cumsum(
(countscum - backgroundscum) / np.sqrt(backgroundscum))
iend = istart + np.sum(snrcum < snr) + 1
if iend < istart:
iend = istart
new_edges.append(iend)
if (iend >= num_old_bins - 1):
break
istart = iend
if len(old_edges) - 1 not in new_edges:
new_edges.append(len(old_edges) - 1)
# create the new rebinned arrays
new_edges = np.array(new_edges)
numbins = len(new_edges) - 1
new_counts = [np.sum(counts[new_edges[i]:new_edges[i + 1]]) for i in
range(numbins)]
new_exposure = [np.sum(exposure[new_edges[i]:new_edges[i + 1]]) for i in
range(numbins)]
new_edges = old_edges[new_edges]
return np.array(new_counts), np.array(new_exposure), new_edges
[docs]def rebin_by_edge_index(counts, exposure, old_edges, new_edge_index):
"""Rebins binned data based on an array of bin edge indices
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
new_edge_index (np.array): The edge indices for the new binned data
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
# new edges
num_bins = new_edge_index.size - 1
nei = new_edge_index.astype(dtype=int)
new_edges = old_edges[nei]
# combine the counts and exposure
new_exposure = np.zeros(num_bins, dtype=float)
new_counts = np.zeros(num_bins, dtype=float)
for i in range(num_bins):
start_idx = new_edge_index[i]
end_idx = new_edge_index[i + 1]
new_counts[i] = np.sum(counts[start_idx:end_idx])
new_exposure[i] = np.sum(exposure[start_idx:end_idx])
return new_counts, new_exposure, new_edges
[docs]def rebin_by_edge(counts, exposure, old_edges, new_edges):
"""Rebins binned data based on an array of bin edge indices
Args:
counts (np.array): The counts in each bin
exposure (np.array): The exposure of each bin
old_edges (np.array): The time edges of each bin
new_edges (np.array): The new edges of the binned data
Returns:
(np.array, np.array, np.array): The counts and exposure in each bin \
and the new bin edges
"""
# new edges
num_bins = new_edges.size - 1
# combine the counts and exposure
old_edges_list = old_edges.tolist()
new_exposure = np.zeros(num_bins, dtype=float)
new_counts = np.zeros(num_bins, dtype=float)
for i in range(num_bins):
start_idx = old_edges_list.index(new_edges[i])
end_idx = old_edges_list.index(new_edges[i + 1])
new_counts[i] = np.sum(counts[start_idx:end_idx])
new_exposure[i] = np.sum(exposure[start_idx:end_idx])
return new_counts, new_exposure, new_edges