Source code for cosmogrb.lightcurve.light_curve_storage

import collections
import logging

import coloredlogs
import matplotlib.pyplot as plt
import numba as nb
import numpy as np
import pandas as pd
from IPython.display import display

import cosmogrb.utils.logging
from cosmogrb.utils.plotting import channel_plot, step_plot

logger = logging.getLogger("cosmogrb.lightcurve_storage")


[docs]class LightCurveStorage(object): def __init__( self, name, tstart, tstop, time_adjustment, pha, times, pha_source, times_source, pha_background, times_background, channels, ebounds, T0, instrument, extra_info, ): """ Container class for light curve objects :param pha: :param times: :param pha_source: :param times_source: :param pha_background: :param times_background: :param channels: :param ebounds: :param T0: :returns: :rtype: """ self._name = name self._tstart = tstart self._tstop = tstop self._time_adjustment = time_adjustment self._pha = pha.astype(int) self._times = times self._n_counts = self._times.shape[0] self._pha_source = pha_source self._times_source = times_source self._n_counts_source = self._times_source.shape[0] self._pha_background = pha_background self._times_background = times_background self._n_counts_background = self._times_background.shape[0] self._channels = channels self._ebounds = ebounds self._T0 = T0 self._instrument = instrument self._extra_info = extra_info @property def name(self): return self._name @property def instrument(self): return self._instrument @property def tstart(self): return self._tstart @property def tstop(self): return self._tstop @property def n_counts(self): return self._n_counts @property def n_counts_source(self): return self._n_counts_source @property def n_counts_background(self): return self._n_counts_background @property def T0(self): return self._T0 @property def channels(self): return self._channels @property def ebounds(self): return self._ebounds @property def times(self): return self._times @property def pha(self): return self._pha @property def pha_source(self): return self._pha_source @property def times_source(self): return self._times_source @property def pha_background(self): return self._pha_background @property def times_background(self): return self._times_background @property def time_adjustment(self): return self._time_adjustment @property def extra_info(self): return self._extra_info def _select_channel(self, emin, emax, pha, original_idx=None): """ return the idx of events between certain channels :param emin: :param emax: :param pha: :param original_idx: :returns: :rtype: """ if original_idx is None: original_idx = np.ones_like(pha, dtype=bool) if emin is not None: chan = self._ebounds.searchsorted(emin) idx_lo = pha > chan original_idx = np.logical_and(original_idx, idx_lo) if emax is not None: chan = self._ebounds.searchsorted(emax) idx_hi = pha < chan original_idx = np.logical_and(original_idx, idx_hi) return original_idx def _select_time(self, tmin, tmax, times, original_idx=None): """ return the idx of event between certain times :param tmin: :param tmax: :param times: :param original_idx: :returns: :rtype: """ return select_time(tmin, tmax, times, original_idx)
[docs] def get_idx_over_interval(self, tmin, tmax): """ returns the selection over an interval of the full light curve :param tmin: :param tmax: :returns: :rtype: """ return self._select_time(tmin, tmax, self._times)
def _bin_lightcurve(self, dt, emin, emax, times, pha, tmin=None, tmax=None): """ bin the light curve for plotting :param dt: :param emin: :param emax: :param times: :param pha: :param tmin: :param tmax: :returns: :rtype: """ if tmin is None: tmin = times.min() if tmax is None: tmax = times.max() assert ( tmin < tmax ), f"the specified tmin and tmax are out of order ({tmin} > {tmax})" logger.debug(f"attempting to bin {len(times)} counts") bins = np.arange(tmin, tmax, dt) logger.debug( f"created {len(bins)} spanning {tmin} to {tmax} at a cadence of {dt}") idx = np.ones_like(times, dtype=bool) # filter channels if requested idx = self._select_channel(emin, emax, pha, idx) times = times[idx] # histogram the counts and convert to a rate counts, edges = np.histogram(times, bins=bins) rate = counts / dt xbins = np.vstack([edges[:-1], edges[1:]]).T return xbins, rate
[docs] def binned_counts(self, dt, emin, emax, tmin=None, tmax=None): """ get the time bins and counts for a given selection :param dt: :param emin: :param emax: :param times: :param pha: :param tmin: :param tmax: :returns: :rtype: """ bins, rate = self._bin_lightcurve( dt, emin, emax, self._times, self._pha, tmin, tmax ) counts = (rate * dt).astype(int) return bins, counts
def _display_lightcurve( self, times, pha, dt=1, tmin=None, tmax=None, emin=None, emax=None, ax=None, **kwargs, ): if ax is None: fig, ax = plt.subplots() else: fig = ax.get_figure() # do not try to plot one photon if len(times) <= 1: logger.debug("tried to plot a light curve with no photons") # logging.warn('there were no') return fig xbins, rate = self._bin_lightcurve( dt=dt, emin=emin, emax=emax, times=times, pha=pha, tmin=tmin, tmax=tmax ) logger.debug(f"there are {len(xbins)} bins") step_plot(xbins, rate, ax=ax, **kwargs) ax.set_xlabel("time") ax.set_ylabel("rate") return fig
[docs] def display_lightcurve( self, dt=1, tmin=None, tmax=None, emin=None, emax=None, ax=None, **kwargs ): """FIXME! briefly describe function :param dt: :param tmin: :param tmax: :param emin: :param emax: :param ax: :returns: :rtype: """ logger.debug("DISPLAY TOTAL LIGHTCURVE") fig = self._display_lightcurve( times=self._times, pha=self._pha, dt=dt, tmin=tmin, tmax=tmax, emin=emin, emax=emax, ax=ax, **kwargs, ) return fig
[docs] def display_background( self, dt=1, tmin=None, tmax=None, emin=None, emax=None, ax=None, **kwargs ): """ display the background light curve :param dt: :param tmin: :param tmax: :param emin: :param emax: :param ax: :returns: :rtype: """ logger.debug("DISPLAY BACKGROUND LIGHTCURVE") fig = self._display_lightcurve( times=self._times_background, pha=self._pha_background, dt=dt, tmin=tmin, tmax=tmax, emin=emin, emax=emax, ax=ax, **kwargs, ) return fig
[docs] def display_source( self, dt=1, tmin=None, tmax=None, emin=None, emax=None, ax=None, **kwargs ): """ display the source only light curve :param dt: :param tmin: :param tmax: :param emin: :param emax: :param ax: :returns: :rtype: """ logger.debug("DISPLAY SOURCE LIGHTCURVE") fig = self._display_lightcurve( times=self._times_source, pha=self._pha_source, dt=dt, tmin=tmin, tmax=tmax, emin=emin, emax=emax, ax=ax, **kwargs, ) return fig
def _bin_spectrum(self, tmin, tmax, times, pha): """ bin the spectrum into counts :param tmin: :param tmax: :param times: :param pha: :returns: :rtype: """ idx = np.ones_like(times, dtype=bool) # filter times idx = self._select_time(tmin, tmax, times, idx) # down select the pha pha = pha[idx] channels = np.append(self._channels, self._channels[-1] + 1) counts, _ = np.histogram(pha, bins=channels - 0.5) return counts def _display_count_spectrum( self, times, pha, tmin=None, tmax=None, ax=None, **kwargs ): """ generic count spectrum plotter :param times: :param pha: :param tmin: :param tmax: :param ax: :returns: :rtype: """ if ax is None: fig, ax = plt.subplots() else: fig = ax.get_figure() emin = self._ebounds[:-1] emax = self._ebounds[1:] if tmin is None: tmin = self._tstart if tmax is None: tmax = self._tstop counts = self._bin_spectrum(tmin, tmax, times, pha) # plot counts and background for the currently selected data channel_plot(ax, emin, emax, counts, **kwargs) return fig
[docs] def display_count_spectrum(self, tmin=None, tmax=None, ax=None, **kwargs): """ display the total count spectrum :param tmin: :param tmax: :param ax: :returns: :rtype: """ fig = self._display_count_spectrum( self._times, self._pha, tmin, tmax, ax, **kwargs ) return fig
[docs] def display_count_spectrum_source(self, tmin=None, tmax=None, ax=None, **kwargs): """ display the source count spectrum :param tmin: :param tmax: :param ax: :returns: :rtype: """ fig = self._display_count_spectrum( self._times_source, self._pha_source, tmin, tmax, ax, **kwargs ) return fig
[docs] def display_count_spectrum_background( self, tmin=None, tmax=None, ax=None, **kwargs ): """ display the background count spectrum :param tmin: :param tmax: :param ax: :returns: :rtype: """ fig = self._display_count_spectrum( self._times_background, self._pha_background, tmin, tmax, ax, **kwargs ) return fig
def __repr__(self): return self._output().to_string()
[docs] def info(self): self._output(as_display=True)
def _output(self, as_display=False): std_dict = collections.OrderedDict() std_dict["name"] = self._name std_dict["instrument"] = self._instrument std_dict["tstart"] = self._tstart std_dict["tstop"] = self._tstop std_dict["time adjustment"] = self._time_adjustment std_dict["T0"] = self._T0 std_dict["n_counts"] = self._n_counts std_dict["n_counts_source"] = self._n_counts_source std_dict["n_counts_background"] = self._n_counts_background if as_display: std_df = pd.Series(data=std_dict, index=std_dict.keys()) display(std_df.to_frame()) if self._extra_info: extra_df = pd.Series( data=self._extra_info, index=self._extra_info.keys() ) display(extra_df.to_frame()) else: if self._extra_info: for k, v in self._extra_info.items(): std_dict[k] = v return pd.Series(data=std_dict, index=std_dict.keys())
[docs]@nb.njit(fastmath=True, nogil=True, cache=True) def select_time(tmin, tmax, times, original_idx=None): N = times.shape[0] if original_idx is None: original_idx = np.zeros(N, dtype=nb.bool_) if tmin is None: tmin = times[0] m = np.searchsorted(times, tmin) for n in range(m, N): if (tmax is not None) and (times[n] < tmax): original_idx[n] = 1 else: break return original_idx