Source code for cosmogrb.universe.survey

import popsynth
import h5py
import numpy as np
import pandas as pd
from IPython.display import display
import collections
import warnings
from natsort import natsorted

from cosmogrb.io.grb_save import GRBSave
from cosmogrb.io.detector_save import DetectorSave
from cosmogrb.grb.grb_detector import GRBDetector
from cosmogrb.utils.file_utils import file_existing_and_readable


class Observation(object):
    def __init__(
        self, grb_save_file, grb_detector_file=None, population=None, idx=None
    ):
        """
        A small container class to access observations

        :param grb_save_file: the file location of the GRBSave
        :param grb_detector_file: the file location of the DetectorSave, if any
        :param population: the population the GRB was drawn from
        :param idx: the index of the GRB within the population
        """

        self._grb = grb_save_file
        self._detector = grb_detector_file

    @property
    def grb(self):
        return GRBSave.from_file(self._grb)

    @property
    def detector_info(self):
        if self._detector is None:

            return None

        else:

            return DetectorSave.from_file(self._detector)
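# A minimal usage sketch (the file names below are hypothetical, not from the
# source): an Observation only stores file paths and re-reads the files from
# disk on each property access, so holding many of them is cheap.
#
# >>> obs = Observation(
# ...     grb_save_file="SynthGRB_0_store.h5",
# ...     grb_detector_file="SynthGRB_0_store_detection_info.h5",
# ... )
# >>> grb = obs.grb            # loads the GRBSave from disk
# >>> det = obs.detector_info  # DetectorSave, or None if never processed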
class Survey(collections.OrderedDict):
    def __init__(self, grb_save_files, population_file, grb_detector_files=None):
        """
        A container for a survey of observed GRBs. Holds the file locations
        of all the GRBs created in the Universe. It also allows you to
        process the observations with a GRBDetector class.

        :param grb_save_files: the GRBSave file locations for the survey
        :param population_file: the population file used to generate the population
        :param grb_detector_files: the generated detector files, if the survey
            has already been processed
        """

        super(Survey, self).__init__()

        self._n_grbs = len(grb_save_files)
        self._grb_save_files = grb_save_files
        self._names = []

        # build a population from the file

        if file_existing_and_readable(population_file):

            self._population_file = population_file
            self._population = popsynth.Population.from_file(self._population_file)

        else:

            self._population_file = None
            self._population = None

            warnings.warn(f"{population_file} does not exist. Perhaps you moved it?")

        # read the GRB names from the save files

        for file_name in self._grb_save_files:

            with h5py.File(file_name, "r") as f:

                self._names.append(f.attrs["grb_name"])

        # we start off as not processed unless
        # we find that there are some detector files

        self._is_processed = False
        self._detected = np.zeros(len(grb_save_files), dtype=bool)
        self._grb_detector_files = None

        # let's see if we have detector files

        if grb_detector_files is not None:

            self._is_processed = True
            self._grb_detector_files = natsorted(grb_detector_files)

            assert len(grb_detector_files) == len(grb_save_files)

            # fill in the detected ones

            for i, f in enumerate(self._grb_detector_files):

                tmp = DetectorSave.from_file(f)

                if tmp.is_detected:

                    self._detected[i] = True

            # now fill the dict

            for name, grb_save_file, grb_detector_file in zip(
                self._names, self._grb_save_files, self._grb_detector_files
            ):

                self[name] = Observation(
                    grb_save_file=grb_save_file, grb_detector_file=grb_detector_file
                )

        else:

            for name, grb_save_file in zip(self._names, self._grb_save_files):

                self[name] = Observation(
                    grb_save_file=grb_save_file, grb_detector_file=None
                )

    @property
    def population(self):
        return self._population

    @property
    def n_detected(self):
        return self._detected.sum()

    @property
    def n_grbs(self):
        return self._n_grbs
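    # A construction sketch (the file and GRB names are hypothetical): since
    # Survey subclasses OrderedDict, each GRB name maps to its Observation and
    # the survey can be indexed like a dict.
    #
    # >>> survey = Survey(
    # ...     grb_save_files=["SynthGRB_0_store.h5", "SynthGRB_1_store.h5"],
    # ...     population_file="population.h5",
    # ... )
    # >>> survey["SynthGRB_0"].grb  # access a GRB by its stored name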
    def info(self):
        """
        Display a summary of the survey: the number of GRBs, the
        processing status, and, if processed, the number detected.
        """

        generic_info = collections.OrderedDict()

        generic_info["n_grbs"] = self._n_grbs
        generic_info["is_processed"] = self._is_processed

        if self._is_processed:

            generic_info["n_detected"] = self.n_detected

        df = pd.Series(data=generic_info, index=generic_info.keys())

        display(df.to_frame())
    def process(self, detector_type, client=None, serial=False, **kwargs):
        """
        Process the triggers or detectors in the survey. This runs the
        provided GRBDetector type on each of the GRBs and prepares the
        detection information.

        :param detector_type: a **class** (not an instance) of GRBDetector type
        :param client: the dask client used for parallel processing
        :param serial: if True, process the survey serially without dask
        """

        assert issubclass(detector_type, GRBDetector), "Not a valid GRB detector"

        if not serial:

            assert (
                client is not None
            ), "One must provide a client to process in parallel"

            args = []
            for grb_file in self._grb_save_files:

                args.append([grb_file, detector_type, kwargs])

            futures = client.map(_submit, args)
            client.gather(futures)

        else:

            for grb_file in self._grb_save_files:

                _submit([grb_file, detector_type, kwargs])

        # the survey has now had its triggers run,
        # so let's flip its status and make sure that
        # when we save it, we record the new status

        self._is_processed = True

        self._grb_detector_files = []

        # the detector files are written alongside the GRB
        # save files with a "_detection_info" suffix

        for file_name in self._grb_save_files:

            file_name_head = ".".join(file_name.split(".")[:-1])
            out_file_name = f"{file_name_head}_detection_info.h5"

            self._grb_detector_files.append(out_file_name)
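    # A processing sketch: MyDetector is a hypothetical GRBDetector subclass,
    # used only for illustration; any concrete detector class can be passed.
    #
    # >>> from dask.distributed import Client
    # >>> client = Client()
    # >>> survey.process(MyDetector, client=client)  # in parallel via dask
    # >>> survey.process(MyDetector, serial=True)    # or serially, no client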
    @property
    def is_processed(self):
        return self._is_processed
    def write(self, file_name):
        """
        Write the survey information to an HDF5 file. If the survey has
        been processed, the detector file locations are also written.

        :param file_name: the name of the file to write to
        """

        dt = h5py.string_dtype(encoding="utf-8")

        with h5py.File(file_name, "w") as f:

            f.attrs["n_grbs"] = self._n_grbs
            f.attrs["is_processed"] = self._is_processed
            f.attrs["population_file"] = self._population_file

            f.create_dataset(
                "grb_saves", data=np.array(self._grb_save_files, dtype=dt)
            )

            if self._is_processed:

                f.create_dataset(
                    "grb_dets", data=np.array(self._grb_detector_files, dtype=dt)
                )
    @classmethod
    def from_file(cls, file_name):
        """
        Recreate a survey from a file written with write()

        :param file_name: the name of the file to read
        :returns: a Survey instance
        """

        with h5py.File(file_name, "r") as f:

            n_grbs = f.attrs["n_grbs"]
            is_processed = f.attrs["is_processed"]
            population_file = f.attrs["population_file"]

            grb_files = f["grb_saves"][()].astype(str)

            grb_dets = None

            if is_processed:

                grb_dets = f["grb_dets"][()].astype(str)

        return cls(grb_files, population_file, grb_dets)
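# A save/restore round-trip sketch ("survey.h5" is a hypothetical file name):
# write() stores the file locations and processing status, and from_file()
# rebuilds the Survey from them.
#
# >>> survey.write("survey.h5")
# >>> restored = Survey.from_file("survey.h5")
# >>> restored.is_processed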
def _submit(args):

    grb_file, detector_type, kwargs = args

    # build the detector for this GRB file, run the detection
    # algorithm, and save the result next to the GRB save file

    processor = detector_type(grb_save_file_name=grb_file, **kwargs)

    processor.process()

    processor.save()