import numba as nb
import numpy as np
from numba.core import types
from numba.typed import Dict
from cosmogrb.utils.interpolation import Interp1D
from cosmogrb.utils.response_file import RSP
@nb.njit(fastmath=True, cache=False)
def _digitize(photon_energies, energy_edges, cum_matrix):
pha_channels = np.zeros(len(photon_energies))
for i in range(len(photon_energies)):
idx = np.searchsorted(energy_edges, photon_energies[i]) - 1
# get a uniform random number
r = np.random.random()
# get the pha channel from the cumulative distribution
pha = int(np.abs(cum_matrix[idx] - r).argmin())
pha_channels[i] = pha
return pha_channels
[docs]class Response(object):
def __init__(
self,
matrix,
geometric_area,
energy_edges,
channel_edges=None,
channel_starts_at=0,
):
self._matrix = matrix.astype('f8')
self._energy_edges = energy_edges.astype('f8')
self._energy_width = np.diff(self._energy_edges)
self._energy_mean = (
self._energy_edges[:-1] + self._energy_edges[1:]) / 2.0
self._emin = self._energy_edges[0]
self._emax = self._energy_edges[-1]
self._channel_edges = channel_edges.astype('f8')
self._channel_width = np.diff(self._channel_edges)
self._channel_mean = (
self._channel_edges[:-1] + self._channel_edges[1:]) / 2.0
self._channels = np.arange(len(self._channel_width), dtype=np.int64)
self._build_effective_area_curve()
self._geometric_area = geometric_area
self._construct_probabilities()
# def effective_area(self, energy):
# """
# The effective area of the detector
# in cm^2
# :returns:
# :rtype:
# """
# return self._effective_area(energy)
def _build_effective_area_curve(self):
# first compute the effective area curve
# by summing over the matrix and then create
# and interpolation for later
ea_curve = self._matrix.sum(axis=1)
# we just a numba jitclass to interpolate
# we want this passable to functions locally
self.effective_area = Interp1D(
self._energy_mean,
ea_curve,
self._energy_mean.min(),
self._energy_mean.max()
)
# However is very difficult to serialize
# this shit. So, some functions may just want
# to do the interpolation themselves. We pack
# these variables up for that
# self.effective_area_dict = Dict.empty(
# key_type=types.unicode_type,
# value_type=types.float64[:]
# )
# The typed-dict can be used from the interpreter.
# NOTE: you can't pickle these fucking things..
# self.effective_area_dict["mean_energy"] = self._energy_mean
# self.effective_area_dict["ea_curve"] = ea_curve
self.effective_area_packed = np.vstack((self._energy_mean, ea_curve))
idx = ea_curve[:-10].argmax()
self._max_energy = self._energy_mean[idx]
@property
def effective_area_max(self):
return self._max_energy
[docs] def get_photon_bin(self, energy):
return np.searchsorted(self._energy_edges, energy) - 1
def _construct_probabilities(self):
self._probability_matrix = self._matrix / self._geometric_area
# self._probability_matrix = self._matrix
# sum along the response to get the
# the total probability in each photon bin
self._total_probability_per_bin = self._probability_matrix.sum(axis=1)
# np.linalg.norm(self._probability_matrix, axis=1, keepdims=True)
non_zero_idx = self._total_probability_per_bin > 0
# needs to be non zero... fix later
self._normed_probability_matrix = self._probability_matrix
self._normed_probability_matrix[np.where(non_zero_idx)[0], :] = (
self._normed_probability_matrix[np.where(non_zero_idx)[0], :]
/ self._total_probability_per_bin[non_zero_idx, np.newaxis]
)
self._detection_probability = 1 - \
np.exp(-self._total_probability_per_bin)
self._cumulative_maxtrix = np.cumsum(
self._normed_probability_matrix, axis=1)
[docs] def digitize(self, photon_energies):
"""
digitze the photon into a energy bin
via the energy dispersion
:param photon_energy:
:returns: (pha_channel, detected)
:rtype:
"""
np.random.seed()
pha_channels = _digitize(
photon_energies, self._energy_edges, self._cumulative_maxtrix,
)
return pha_channels
@property
def energy_edges(self):
return self._energy_edges
@property
def emin(self):
return self._emin
@property
def emax(self):
return self._emax
@property
def channel_edges(self):
return self._channel_edges
@property
def channels(self):
return self._channels
@property
def matrix(self):
return self._matrix
@property
def geometric_area(self):
return self._geometric_area
[docs] def set_function(self, integral_function=None):
"""
Set the function to be used for the convolution
:param integral_function: a function f = f(e1,e2) which returns the integral of the model between e1 and e2
:type integral_function: callable
"""
self._integral_function = integral_function
[docs] def convolve(self, t1, t2):
true_fluxes = np.array(
[
self._integral_function(
self._energy_edges[i], self._energy_edges[i + 1], t1, t2
)
for i in range(len(self._energy_edges) - 1)
]
)
# Sometimes some channels have 0 lenths, or maybe they start at 0, where
# many functions (like a power law) are not defined. In the response these
# channels have usually a 0, but unfortunately for a computer
# inf * zero != zero. Thus, let's force this. We avoid checking this situation
# in details because this would have a HUGE hit on performances
idx = np.isfinite(true_fluxes)
true_fluxes[~idx] = 0
folded_counts = np.dot(true_fluxes, self._matrix)
return folded_counts
[docs] def to_fits(
self,
filename,
telescope_name="telescope",
instrument_name="detector",
overwrite=False,
):
"""
Write the current matrix into a OGIP FITS file
:param filename : the name of the FITS file to be created
:type filename : str
:param telescope_name : a name for the telescope/experiment which this matrix applies to
:param instrument_name : a name for the instrument which this matrix applies to
:param overwrite: True or False, whether to overwrite or not the output file
:return: None
"""
fits_file = RSP(
self._energy_edges,
self._channel_edges,
self.matrix.T, # we transpose teh matrix earlier for speed
telescope_name,
instrument_name,
)
fits_file.writeto(filename, clobber=overwrite)
__all__ = ["Response"]