diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 37039df..2056bc0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,10 +35,6 @@ jobs: strategy: matrix: include: - # linux builds - - python-version: "3.8" - os: ubuntu-latest - - python-version: "3.9" os: ubuntu-latest @@ -49,13 +45,6 @@ jobs: os: ubuntu-latest extra-args: ['codecov'] - # macos builds - - python-version: "3.10" - os: macos-latest - - - python-version: "3.11" - os: macos-latest - runs-on: ${{ matrix.os }} steps: @@ -101,22 +90,22 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.9" - name: Install doc dependencies run: | pip install -e .[doc] git describe --tags - python -c 'import template; print(template.__version__)' + python -c 'import ctapipe_io_zfits ; print(ctapipe_io_zfits.__version__)' - name: Build docs - run: make html SPHINXOPTS="-W --keep-going -n --color -j auto" + run: make -C docs html SPHINXOPTS="-W --keep-going --color -j auto" - name: Deploy to github pages # only run on push to master if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} uses: JamesIves/github-pages-deploy-action@v4 with: - folder: build/html + folder: docs/build/html clean: true single_commit: true diff --git a/MANIFEST.in b/MANIFEST.in index 14fa977..2685c29 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ prune src/ctapipe_io_zfits/_dev_version prune .github +include src/ctapipe_io_zfits/resources/* diff --git a/docs/conf.py b/docs/conf.py index 7a7f56d..e27637b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,13 +33,21 @@ # intersphinx allows referencing other packages sphinx docs intersphinx_mapping = { "python": ("https://docs.python.org/3.9", None), + "traitlets": ("https://traitlets.readthedocs.io/en/stable/", None), + "ctapipe": ("https://ctapipe.readthedocs.io/en/v0.20.0/", None), } # -- Options for HTML output ------------------------------------------------- html_theme = "pydata_sphinx_theme" +html_theme_options = { + "navigation_with_keys": False, +} # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ["_static"] html_logo = "_static/cta.png" + +# fixes "no sub file found" for members inherited from traitlets +numpydoc_class_members_toctree = False diff --git a/pyproject.toml b/pyproject.toml index 639ded9..f2ab531 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ requires-python = ">=3.9" dependencies = [ "numpy", "protozfits", - "ctapipe~=0.19.3", + "ctapipe~=0.20.0", ] [project.optional-dependencies] @@ -44,6 +44,9 @@ all = [ "ctapipe_io_zfits[test,doc,dev]", ] +[project.entry-points.ctapipe_io] +ProtozfitsDL0EventSource = "ctapipe_io_zfits.dl0:ProtozfitsDL0EventSource" + [project.urls] repository = "https://github.com/cta-observatory/ctapipe_io_zfits" @@ -52,5 +55,8 @@ repository = "https://github.com/cta-observatory/ctapipe_io_zfits" where = ["src"] exclude = ["ctapipe_io_zfits._dev_version"] +[tool.setuptools.package-data] +ctapipe_io_zfits = ["resources/*"] + [tool.setuptools_scm] write_to = 'src/ctapipe_io_zfits/_version.py' diff --git a/src/ctapipe_io_zfits/__init__.py b/src/ctapipe_io_zfits/__init__.py index e5ea576..7db5af6 100644 --- a/src/ctapipe_io_zfits/__init__.py +++ b/src/ctapipe_io_zfits/__init__.py @@ -1,8 +1,11 @@ """ EventSource implementations for protozfits files """ +from .dl0 import ProtozfitsDL0EventSource, ProtozfitsDL0TelescopeEventSource from .version import __version__ __all__ = [ "__version__", + "ProtozfitsDL0EventSource", + "ProtozfitsDL0TelescopeEventSource", ] diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py new file mode 100644 index 0000000..203f76c --- /dev/null +++ b/src/ctapipe_io_zfits/conftest.py @@ -0,0 +1,222 @@ +from contextlib import ExitStack +from datetime import timedelta, timezone +from zoneinfo import ZoneInfo + +import astropy.units as u +import numpy as np +import pytest +from astropy.time import Time +from ctapipe.containers import EventType +from protozfits import CTA_DL0_Subarray_pb2 as DL0_Subarray +from protozfits import CTA_DL0_Telescope_pb2 as DL0_Telescope +from protozfits import ProtobufZOFits +from protozfits.CoreMessages_pb2 import AnyArray + +from ctapipe_io_zfits.time import time_to_cta_high_res + +ANY_ARRAY_TYPE_TO_NUMPY_TYPE = { + AnyArray.S8: np.int8, + AnyArray.U8: np.uint8, + AnyArray.S16: np.int16, + AnyArray.U16: np.uint16, + AnyArray.S32: np.int32, + AnyArray.U32: np.uint32, + AnyArray.S64: np.int64, + AnyArray.U64: np.uint64, + AnyArray.FLOAT: np.float32, + AnyArray.DOUBLE: np.float64, +} + +DTYPE_TO_ANYARRAY_TYPE = {v: k for k, v in ANY_ARRAY_TYPE_TO_NUMPY_TYPE.items()} + +acada_user = "ctao-acada-n" +obs_start = Time("2023-08-02T02:15:31") +timezone_cta_n = ZoneInfo("Atlantic/Canary") + + +def to_anyarray(array): + type_ = DTYPE_TO_ANYARRAY_TYPE[array.dtype.type] + return AnyArray(type=type_, data=array.tobytes()) + + +def evening_of_obs(time, tz): + dt = time.to_datetime(timezone.utc).astimezone(tz) + if dt.hour < 12: + return (dt - timedelta(days=1)).date() + + return dt.date() + + +@pytest.fixture(scope="session") +def acada_base(tmp_path_factory): + return tmp_path_factory.mktemp("acada_base_") + + +@pytest.fixture(scope="session") +def dl0_base(acada_base): + """DL0 Directory Structure. + + See Table 6 of the ACADA-DPPS ICD. + """ + dl0 = acada_base / "DL0" + dl0.mkdir(exist_ok=True) + + lst_base = dl0 / "TEL001" / acada_user / "acada-adh" + lst_events = lst_base / "events" + lst_monitoring = lst_base / "monitoring" + array_triggers = dl0 / "array" / acada_user / "acada-adh" / "triggers" + array_monitoring = dl0 / "array" / acada_user / "acada-adh" / "monitoring" + + evening = evening_of_obs(obs_start, timezone_cta_n) + + for directory in (lst_events, lst_monitoring, array_triggers, array_monitoring): + date_path = directory / f"{evening:%Y/%m/%d}" + date_path.mkdir(exist_ok=True, parents=True) + + return dl0 + + +@pytest.fixture(scope="session") +def dummy_dl0(dl0_base): + trigger_dir = dl0_base / "array" / acada_user / "acada-adh/triggers/2023/08/01/" + lst_event_dir = dl0_base / "TEL001" / acada_user / "acada-adh/events/2023/08/01/" + subarray_id = 1 + sb_id = 123 + obs_id = 456 + sb_creator_id = 1 + sdh_ids = (1, 2, 3, 4) + + obs_start_path_string = f"{obs_start.to_datetime(timezone.utc):%Y%m%dT%H%M%S}" + filename = f"SUB{subarray_id:03d}_SWAT001_{obs_start_path_string}_SBID{sb_id:019d}_OBSID{obs_id:019d}_SUBARRAY_CHUNK000.fits.fz" # noqa + # sdh_id and chunk_id will be filled later -> double {{}} + lst_event_pattern = f"TEL001_SDH{{sdh_id:03d}}_{obs_start_path_string}_SBID{sb_id:019d}_OBSID{obs_id:019d}_TEL_SHOWER_CHUNK{{chunk_id:03d}}.fits.fz" # noqa + trigger_path = trigger_dir / filename + + # subarray_data_stream = DL0_Subarray.DataStream( + # subarray_id=subarray_id, + # sb_id=sb_id, + # obs_id=obs_id, + # producer_id=1 # FIXME: what is correct here?, + # sb_creator_id=sb_creator_id, + # ) + + lst_data_stream = DL0_Telescope.DataStream( + tel_id=1, + sb_id=sb_id, + obs_id=obs_id, + waveform_scale=80.0, + waveform_offset=5.0, + sb_creator_id=sb_creator_id, + ) + camera_configuration = DL0_Telescope.CameraConfiguration( + tel_id=1, + local_run_id=789, + config_time_s=obs_start.unix, + camera_config_id=47, + pixel_id_map=to_anyarray(np.arange(1855)), + module_id_map=to_anyarray(np.arange(265)), + num_pixels=1855, + num_channels=2, + num_samples_nominal=40, + num_samples_long=0, + num_modules=265, + sampling_frequncy=1024, + ) + + time = obs_start + + ctx = ExitStack() + proto_kwargs = dict( + n_tiles=5, rows_per_tile=20, compression_block_size_kb=64 * 1024 + ) + + chunksize = 10 + events_written = {sdh_id: 0 for sdh_id in sdh_ids} + current_chunk = {sdh_id: -1 for sdh_id in sdh_ids} + lst_event_files = {} + + def open_next_event_file(sdh_id): + if sdh_id in lst_event_files: + lst_event_files[sdh_id].close() + + current_chunk[sdh_id] += 1 + chunk_id = current_chunk[sdh_id] + path = lst_event_dir / lst_event_pattern.format( + sdh_id=sdh_id, chunk_id=chunk_id + ) + f = ctx.enter_context(ProtobufZOFits(**proto_kwargs)) + f.open(str(path)) + f.move_to_new_table("DataStream") + f.write_message(lst_data_stream) + f.move_to_new_table("CameraConfiguration") + f.write_message(camera_configuration) + f.move_to_new_table("Events") + lst_event_files[sdh_id] = f + events_written[sdh_id] = 0 + + def convert_waveform(waveform): + scale = lst_data_stream.waveform_scale + offset = lst_data_stream.waveform_offset + return ((waveform + offset) * scale).astype(np.uint16) + + with ctx: + trigger_file = ctx.enter_context(ProtobufZOFits(**proto_kwargs)) + trigger_file.open(str(trigger_path)) + # trigger_file.move_to_new_table("DataStream") + # trigger_file.write_message(subarray_data_stream) + trigger_file.move_to_new_table("SubarrayEvents") + + for sdh_id in sdh_ids: + open_next_event_file(sdh_id) + + for i in range(100): + event_id = i + 1 + time_s, time_qns = time_to_cta_high_res(time) + + trigger_file.write_message( + DL0_Subarray.Event( + event_id=event_id, + trigger_type=1, + sb_id=sb_id, + obs_id=obs_id, + event_time_s=int(time_s), + event_time_qns=int(time_qns), + trigger_ids=to_anyarray(np.array([event_id])), + tel_ids=to_anyarray(np.array([1])), + ) + ) + + sdh_id = sdh_ids[i % len(sdh_ids)] + # TODO: randomize event to test actually parsing it + + # TODO: fill actual signal into waveform, not just 0 + waveform = np.zeros((1, 1855, 40), dtype=np.float32) + + lst_event_files[sdh_id].write_message( + DL0_Telescope.Event( + event_id=event_id, + tel_id=camera_configuration.tel_id, + event_type=EventType.SUBARRAY.value, + event_time_s=int(time_s), + event_time_qns=int(time_qns), + # identified as signal, low gain stored, high gain stored + pixel_status=to_anyarray(np.full(1855, 0b00001101, dtype=np.uint8)), + waveform=to_anyarray(convert_waveform(waveform)), + num_channels=1, + num_samples=40, + num_pixels_survived=1855, + ) + ) + events_written[sdh_id] += 1 + if events_written[sdh_id] >= chunksize: + open_next_event_file(sdh_id) + + time = time + 0.001 * u.s + + return trigger_path + + +@pytest.fixture(scope="session") +def dummy_tel_file(dummy_dl0, dl0_base): + name = "TEL001_SDH001_20230802T021531_SBID0000000000000000123_OBSID0000000000000000456_TEL_SHOWER_CHUNK000.fits.fz" # noqa + return dl0_base / "TEL001/ctao-acada-n/acada-adh/events/2023/08/01/" / name diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py new file mode 100644 index 0000000..3549d2d --- /dev/null +++ b/src/ctapipe_io_zfits/dl0.py @@ -0,0 +1,404 @@ +""" +DL0 Protozfits EventSource +""" +import logging +from contextlib import ExitStack +from typing import Dict, Tuple + +import numpy as np +from ctapipe.containers import ( + ArrayEventContainer, + DL0CameraContainer, + EventIndexContainer, + EventType, + ObservationBlockContainer, + PixelStatus, + SchedulingBlockContainer, + TelescopeTriggerContainer, + TriggerContainer, +) +from ctapipe.core.traits import Integer +from ctapipe.instrument import SubarrayDescription +from ctapipe.io import DataLevel, EventSource +from ctapipe.io.simteleventsource import GainChannel +from protozfits import File + +from .instrument import build_subarray_description, get_array_elements_by_id +from .multifile import MultiFiles +from .time import cta_high_res_to_time + +__all__ = [ + "ProtozfitsDL0EventSource", + "ProtozfitsDL0TelescopeEventSource", +] + +log = logging.getLogger(__name__) + +ARRAY_ELEMENTS = get_array_elements_by_id() + + +def _is_compatible(input_url, extname, allowed_protos): + from astropy.io import fits + + # this allows us to just use try/except around the opening of the fits file, + stack = ExitStack() + + with stack: + try: + hdul = stack.enter_context(fits.open(input_url)) + except Exception as e: + log.debug(f"Error trying to open input file as fits: {e}") + return False + + if extname not in hdul: + log.debug("FITS file does not contain '%s' HDU", extname) + return False + + header = hdul[extname].header + + if header["XTENSION"] != "BINTABLE": + log.debug("%s HDU is not a bintable", extname) + return False + + if not header.get("ZTABLE", False): + log.debug("ZTABLE is not in header or False") + return False + + proto_class = header.get("PBFHEAD") + if proto_class is None: + log.debug("Missing PBFHEAD key") + return False + + if proto_class not in allowed_protos: + log.debug(f"Unsupported PBFHEAD: {proto_class} not in {allowed_protos}") + return False + + return True + + +def _fill_dl0_container( + tel_event, + data_stream, + camera_config, + camera_geometry, + ignore_samples_start=0, + ignore_samples_end=0, +): + n_channels = tel_event.num_channels + n_pixels_stored = tel_event.num_pixels_survived + n_samples = tel_event.num_samples + shape = (n_channels, n_pixels_stored, n_samples) + waveform = tel_event.waveform.reshape(shape) + offset = data_stream.waveform_offset + scale = data_stream.waveform_scale + + zfits_waveform = waveform.astype(np.float32) / scale - offset + + pixel_status = tel_event.pixel_status + # FIXME: seems ACADA doesn't set pixels to "stored" when no DVR is applied + if n_pixels_stored == camera_config.num_pixels and np.all( + PixelStatus.get_dvr_status(pixel_status) == 0 + ): + pixel_status = pixel_status | PixelStatus.DVR_STORED_AS_SIGNAL + + pixel_stored = PixelStatus.get_dvr_status(pixel_status) != 0 + n_pixels_nominal = camera_geometry.n_pixels + + # fill not readout pixels with 0, reorder pixels, use 2d array when gain reduced + if n_channels == 2: + waveform = np.zeros((n_channels, n_pixels_nominal, n_samples), dtype=np.float32) + waveform[:, camera_config.pixel_id_map[pixel_stored]] = zfits_waveform + else: + waveform = np.zeros((n_pixels_nominal, n_samples), dtype=np.float32) + waveform[camera_config.pixel_id_map[pixel_stored]] = zfits_waveform[0] + + if ignore_samples_start != 0 or ignore_samples_end != 0: + start = ignore_samples_start + end = n_samples - ignore_samples_end + waveform = waveform[..., start:end] + + # reorder to nominal pixel order + pixel_status = np.zeros(n_pixels_nominal, dtype=tel_event.pixel_status.dtype) + pixel_status[camera_config.pixel_id_map] = pixel_status + + channel_info = PixelStatus.get_channel_info(pixel_status) + if n_channels == 1: + selected_gain_channel = np.where( + channel_info == PixelStatus.HIGH_GAIN_STORED, + GainChannel.HIGH, + GainChannel.LOW, + ) + else: + selected_gain_channel = None + + return DL0CameraContainer( + pixel_status=tel_event.pixel_status, + event_type=EventType(tel_event.event_type), + selected_gain_channel=selected_gain_channel, + event_time=cta_high_res_to_time( + tel_event.event_time_s, + tel_event.event_time_qns, + ), + waveform=waveform, + first_cell_id=tel_event.first_cell_id, + ) + + +class ProtozfitsDL0EventSource(EventSource): + """ + DL0 Protozfits EventSource. + + The ``input_url`` must be the subarray trigger file, the source + will then look for the other data files according to the filename and + directory schema layed out in the draft of the ACADA - DPPS ICD. + """ + + subarray_id = Integer(default_value=1).tag(config=True) + + def __init__(self, input_url=None, **kwargs): + if input_url is not None: + kwargs["input_url"] = input_url + + super().__init__(**kwargs) + # we will open a lot of files, this helps keeping it clean + self._exit_stack = ExitStack() + self._subarray_trigger_file = self._exit_stack.enter_context( + File(str(self.input_url)) + ) + self._subarray_trigger_stream = None + if hasattr(self._subarray_trigger_file, "DataStream"): + self._subarray_trigger_stream = self._subarray_trigger_file.DataStream[0] + self.sb_id = self._subarray_trigger_stream.sb_id + self.obs_id = self._subarray_trigger_stream.obs_id + self.subarray_id = self._subarray_trigger_stream.subarray_id + else: + first_event = self._subarray_trigger_file.SubarrayEvents[0] + self.sb_id = first_event.sb_id + self.obs_id = first_event.obs_id + + self._subarray = build_subarray_description(self.subarray_id) + + self._observation_blocks = { + self.obs_id: ObservationBlockContainer( + obs_id=np.uint64(self.obs_id), sb_id=np.uint64(self.sb_id) + ) + } + self._scheduling_blocks = { + self.sb_id: SchedulingBlockContainer(sb_id=np.uint64(self.sb_id)) + } + + # /DL0///acada-adh/events///
/ + self._dl0_base = self.input_url.parents[7] + self._acada_user = self.input_url.parents[5].name + self._date_dirs = self.input_url.parent.relative_to(self.input_url.parents[3]) + + self._open_telescope_files() + + def _get_tel_events_directory(self, tel_id): + return ( + self._dl0_base + / f"TEL{tel_id:03d}" + / self._acada_user + / "acada-adh/events" + / self._date_dirs + ) + + @classmethod + def is_compatible(cls, input_url): + return _is_compatible( + input_url, + extname="SubarrayEvents", + allowed_protos={"DL0v1.Subarray.Event"}, + ) + + def _open_telescope_files(self): + self._telescope_files = {} + for tel_id in self.subarray.tel: + # get the directory, where we should look for files + tel_dir = self._get_tel_events_directory(tel_id) + + try: + first_file = sorted( + tel_dir.glob(f"*_SBID*{self.sb_id}_OBSID*{self.obs_id}*.fits.fz") + )[0] + except IndexError: + self.log.warning("No events file found for tel_id %d", tel_id) + continue + + self._telescope_files[tel_id] = self._exit_stack.enter_context( + MultiFiles(first_file, parent=self) + ) + + def close(self): + self._exit_stack.__exit__(None, None, None) + + def __exit__(self, exc_type, exc_value, traceback): + self._exit_stack.__exit__(exc_type, exc_value, traceback) + + @property + def is_simulation(self) -> bool: + return False + + @property + def datalevels(self) -> Tuple[DataLevel]: + return (DataLevel.DL0,) + + @property + def subarray(self) -> SubarrayDescription: + return self._subarray + + @property + def observation_blocks(self) -> Dict[int, ObservationBlockContainer]: + return self._observation_blocks + + @property + def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]: + return self._scheduling_blocks + + def _generator(self): + for count, subarray_trigger in enumerate( + self._subarray_trigger_file.SubarrayEvents + ): + array_event = ArrayEventContainer( + count=count, + index=EventIndexContainer( + obs_id=subarray_trigger.obs_id, event_id=subarray_trigger.event_id + ), + trigger=TriggerContainer( + time=cta_high_res_to_time( + subarray_trigger.event_time_s, subarray_trigger.event_time_qns + ), + tels_with_trigger=subarray_trigger.tel_ids.tolist(), + ), + ) + + for tel_id in array_event.trigger.tels_with_trigger: + tel_file = self._telescope_files[tel_id] + tel_event = next(tel_file) + if tel_event.event_id != array_event.index.event_id: + raise ValueError( + f"Telescope event for tel_id {tel_id} has different event id!" + f" event_id of subarray event: {array_event.index.event_id}" + f" event_id of telescope event: {tel_event.event_id}" + ) + + array_event.dl0.tel[tel_id] = _fill_dl0_container( + tel_event, + tel_file.data_stream, + tel_file.camera_config, + self.subarray.tel[tel_id].camera.geometry, + ) + + yield array_event + + +class ProtozfitsDL0TelescopeEventSource(EventSource): + """ + DL0 Protozfits Telescope EventSource. + + The ``input_url`` is one of the telescope events files. + """ + + subarray_id = Integer(default_value=1).tag(config=True) + ignore_samples_start = Integer(default_value=0).tag(config=True) + ignore_samples_end = Integer(default_value=0).tag(config=True) + + @classmethod + def is_compatible(cls, input_url): + return _is_compatible( + input_url, + extname="Events", + allowed_protos={"DL0v1.Telescope.Event"}, + ) + + def __init__(self, input_url=None, **kwargs): + # this enables passing input_url as posarg, kwarg and via the config/parent + if input_url is not None: + kwargs["input_url"] = input_url + + super().__init__(**kwargs) + + # we will open a lot of files, this helps keeping it clean + self._exit_stack = ExitStack() + self._subarray = build_subarray_description(self.subarray_id) + + self._multi_file = self._exit_stack.enter_context( + MultiFiles(self.input_url, parent=self) + ) + self.sb_id = self._multi_file.data_stream.sb_id + self.obs_id = self._multi_file.data_stream.obs_id + self.tel_id = self._multi_file.data_stream.tel_id + + self._observation_blocks = { + self.obs_id: ObservationBlockContainer( + obs_id=np.uint64(self.obs_id), sb_id=np.uint64(self.sb_id) + ) + } + self._scheduling_blocks = { + self.sb_id: SchedulingBlockContainer(sb_id=np.uint64(self.sb_id)) + } + + def close(self): + self._exit_stack.__exit__(None, None, None) + + def __exit__(self, exc_type, exc_value, traceback): + self._exit_stack.__exit__(exc_type, exc_value, traceback) + + @property + def is_simulation(self) -> bool: + """If data comes from simulations""" + return False + + @property + def datalevels(self) -> Tuple[DataLevel]: + """Provided data levels""" + return (DataLevel.DL0,) + + @property + def subarray(self) -> SubarrayDescription: + """The subarray""" + return self._subarray + + @property + def observation_blocks(self) -> Dict[int, ObservationBlockContainer]: + """The observation blocks""" + return self._observation_blocks + + @property + def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]: + """The scheduling blocks""" + return self._scheduling_blocks + + def _fill_event(self, count, zfits_event) -> ArrayEventContainer: + tel_id = self.tel_id + # until ctapipe allows telescope event sources + # we have to fill an arrayevent with just one telescope here + time = cta_high_res_to_time( + zfits_event.event_time_s, zfits_event.event_time_qns + ) + array_event = ArrayEventContainer( + count=count, + index=EventIndexContainer( + obs_id=self.obs_id, + event_id=zfits_event.event_id, + ), + trigger=TriggerContainer( + tels_with_trigger=[self.tel_id], + event_type=EventType(int(zfits_event.event_type)), + time=time, + ), + ) + array_event.trigger.tel[tel_id] = TelescopeTriggerContainer(time=time) + array_event.dl0.tel[tel_id] = _fill_dl0_container( + zfits_event, + self._multi_file.data_stream, + self._multi_file.camera_config, + self.subarray.tel[tel_id].camera.geometry, + ignore_samples_start=self.ignore_samples_start, + ignore_samples_end=self.ignore_samples_end, + ) + return array_event + + def _generator(self): + for count, zfits_event in enumerate(self._multi_file): + yield self._fill_event(count, zfits_event) diff --git a/src/ctapipe_io_zfits/instrument.py b/src/ctapipe_io_zfits/instrument.py new file mode 100644 index 0000000..e8bf10d --- /dev/null +++ b/src/ctapipe_io_zfits/instrument.py @@ -0,0 +1,127 @@ +import json +from functools import cache +from importlib.resources import as_file, files +from typing import Tuple + +import astropy.units as u +from astropy.coordinates import EarthLocation +from ctapipe.coordinates import CameraFrame +from ctapipe.core import Provenance +from ctapipe.instrument import ( + CameraDescription, + CameraGeometry, + CameraReadout, + OpticsDescription, + ReflectorShape, + SizeType, + SubarrayDescription, + TelescopeDescription, +) + +OPTICS = { + "LST": OpticsDescription( + name="LST", + size_type=SizeType.LST, + n_mirrors=1, + n_mirror_tiles=198, + reflector_shape=ReflectorShape.PARABOLIC, + equivalent_focal_length=u.Quantity(28, u.m), + effective_focal_length=u.Quantity(29.30565, u.m), + mirror_area=u.Quantity(386.73, u.m**2), + ) +} + + +__all__ = [ + "build_subarray_description", +] + +RESOURCES = files("ctapipe_io_zfits") / "resources" + + +def _load_json_resource(name): + with as_file(RESOURCES / name) as path: + with path.open("r") as f: + return json.load(f) + + +@cache +def _load_subarrays(): + return _load_json_resource("subarray-ids.json") + + +@cache +def _load_array_elements(): + return _load_json_resource("array-element-ids.json") + + +@cache +def get_subarrays_by_id(): + """Get a mapping of subarray_id to subarray definition""" + data = _load_subarrays() + return {subarray["id"]: subarray for subarray in data["subarrays"]} + + +@cache +def get_array_elements_by_id(): + """Get a mapping of ae_id to array element definition""" + data = _load_array_elements() + return {ae["id"]: ae for ae in data["array_elements"]} + + +@cache +def get_array_element_ids(subarray_id: int) -> Tuple[int]: + """Get array element ids for a given subarray_id""" + subarray = get_subarrays_by_id().get(subarray_id) + if subarray_id is None: + raise ValueError(f"Unknown subarray_id: {subarray_id}") + + return tuple(subarray["array_element_ids"]) + + +def build_subarray_description(subarray_id): + try: + subarray = get_subarrays_by_id()[subarray_id] + except KeyError: + raise ValueError(f"Unknown subarray_id: {subarray_id}") + + tel_ids = get_array_element_ids(subarray_id) + array_elements = get_array_elements_by_id() + + telescopes = {} + for tel_id in tel_ids: + name = array_elements[tel_id]["name"] + + if name.startswith("LST"): + optics = OPTICS["LST"] + + with as_file(RESOURCES / "LSTCam.camgeom.fits.gz") as path: + Provenance().add_input_file(path, "CameraGeometry") + geometry = CameraGeometry.from_table(path) + geometry.frame = CameraFrame(focal_length=optics.effective_focal_length) + + with as_file(RESOURCES / "LSTCam.camreadout.fits.gz") as path: + Provenance().add_input_file(path, "CameraReadout") + readout = CameraReadout.from_table(path) + + camera = CameraDescription("LSTCam", geometry=geometry, readout=readout) + else: + raise ValueError("Only LSTs supported at the moment") + + telescopes[tel_id] = TelescopeDescription( + name=name, + optics=optics, + camera=camera, + ) + + return SubarrayDescription( + name=subarray["name"], + tel_descriptions=telescopes, + # FIXME: fill actual telescope positions + tel_positions={tel_id: [0, 0, 0] * u.m for tel_id in telescopes}, + # FIXME: fill actual reference location + # height is currently the height of LST Prod2 obs level + reference_location=EarthLocation( + lon=0 * u.deg, lat=0 * u.deg, height=2199 * u.m + ), + ) diff --git a/src/ctapipe_io_zfits/multifile.py b/src/ctapipe_io_zfits/multifile.py new file mode 100644 index 0000000..3c9b62e --- /dev/null +++ b/src/ctapipe_io_zfits/multifile.py @@ -0,0 +1,274 @@ +import re +from dataclasses import dataclass, field +from pathlib import Path +from queue import Empty, PriorityQueue +from typing import Any + +from ctapipe.core import Component, Provenance +from ctapipe.core.traits import Bool, CaselessStrEnum +from protozfits import File + +__all__ = [ + "MultiFiles", +] + + +@dataclass(order=True) +class NextEvent: + """Class to get sorted access to events from multiple files""" + + priority: int + event: Any = field(compare=False) + data_source: str = field(compare=False) + + +@dataclass() +class FileInfo: + tel_id: int + data_source: str + sb_id: int + obs_id: int + chunk: int + data_type: str = "" + sb_id_padding: int = 0 + obs_id_padding: int = 0 + chunk_padding: int = 0 + + +filename_conventions = { + # Tel001_SDH_3001_20231003T204445_sbid2000000008_obid2000000016_9.fits.fz + "acada_rel1": { + "re": re.compile( + r"Tel(?P\d+)_(?PSDH_\d+)_(?P\d{8}T\d{6})_sbid(?P\d+)_obid(?P\d+)_(?P\d+)\.fits\.fz" # noqa + ), + "template": "Tel{tel_id:03d}_{data_source}_{timestamp}_sbid{sb_id:0{sb_id_padding}d}_obid{obs_id:0{obs_id_padding}d}_{chunk:0{chunk_padding}d}.fits.fz", # noqa + }, + "acada_dpps_icd": { + # TEL001_SDH0001_20231013T220427_SBID0000000002000000013_OBSID0000000002000000027_CHUNK000.fits.fz + "re": re.compile( + r"TEL(?P\d+)_(?PSDH\d+)_(?P\d{8}T\d{6})_SBID(?P\d+)_OBSID(?P\d+)(?P_[a-zA-Z0-9_]+)?_CHUNK(?P\d+)\.fits\.fz" # noqa + ), + "template": "TEL{tel_id:03d}_{data_source}_{timestamp}_SBID{sb_id:0{sb_id_padding}d}_OBSID{obs_id:0{obs_id_padding}d}{data_type}_CHUNK{chunk:0{chunk_padding}d}.fits.fz", # noqa + }, +} + + +def get_file_info(path, convention): + path = Path(path) + + regex = filename_conventions[convention]["re"] + m = regex.match(path.name) + if m is None: + raise ValueError( + f"Filename {path.name} did not match convention" + f" {convention} with regex {regex}" + ) + + groups = m.groupdict() + sb_id = int(groups["sb_id"]) + obs_id = int(groups["obs_id"]) + chunk = int(groups["chunk"]) + + sb_id_padding = len(groups["sb_id"]) + obs_id_padding = len(groups["obs_id"]) + chunk_padding = len(groups["chunk"]) + + return FileInfo( + tel_id=int(groups["tel_id"]), + data_source=groups["data_source"], + sb_id=sb_id, + obs_id=obs_id, + chunk=chunk, + data_type=groups.get("data_type") or "", + sb_id_padding=sb_id_padding, + obs_id_padding=obs_id_padding, + chunk_padding=chunk_padding, + ) + + +class MultiFiles(Component): + """Open data sources in parallel and iterate over events in order""" + + all_source_ids = Bool( + default_value=True, + help=( + "If true, open all files for different source_ids" + "(e.g. SDH001, SDH002) in parallel." + ), + ).tag(config=True) + + all_chunks = Bool( + default_value=True, + help="If true, open subsequent chunks when current one is exhausted", + ).tag(config=True) + + filename_convention = CaselessStrEnum( + values=list(filename_conventions.keys()), + default_value="acada_dpps_icd", + ).tag(config=True) + + pure_protobuf = Bool( + default_value=False, + ).tag(config=True) + + def __init__(self, path, *args, **kwargs): + """ + Load multiple data sources in parallel, yielding events in order + + Parameters + ---------- + path : str or pathlib.Path + Path to the first chunk for one of the data sources. + Path must match the given ``filename_convention``. + Data sources of the same sb_id / obs_id will be opened in parallel + """ + super().__init__(*args, **kwargs) + + self.path = Path(path) + + if not self.path.is_file(): + raise IOError(f"input path {path} is not a file") + + file_info = get_file_info(path, convention=self.filename_convention) + self.directory = self.path.parent + self.filename_template = filename_conventions[self.filename_convention][ + "template" + ] + + # figure out how many data sources we have: + data_source_pattern = self.filename_template.format( + tel_id=file_info.tel_id, + data_source="*", + timestamp="*", + sb_id=file_info.sb_id, + obs_id=file_info.obs_id, + chunk=file_info.chunk, + data_type=file_info.data_type, + sb_id_padding=file_info.sb_id_padding, + obs_id_padding=file_info.obs_id_padding, + chunk_padding=file_info.chunk_padding, + ) + + self.log.debug( + "Looking for parallel data source using pattern: %s", data_source_pattern + ) + paths = sorted(self.directory.glob(data_source_pattern)) + self.log.debug("Found %d matching paths: %s", len(paths), paths) + self.data_sources = { + get_file_info(path, convention=self.filename_convention).data_source + for path in paths + } + self.log.debug("Found the following data sources: %s", self.data_sources) + + self._current_chunk = { + data_source: file_info.chunk - 1 for data_source in self.data_sources + } + self._open_files = {} + + self._first_file_info = file_info + self._events = PriorityQueue() + self._events_tables = {} + self._events_headers = {} + self.camera_config = None + self.data_stream = None + + for data_source in self.data_sources: + self._load_next_chunk(data_source) + + @property + def n_open_files(self): + return len(self._open_files) + + def _load_next_chunk(self, data_source): + """Open the next (or first) subrun. + + Parameters + ---------- + stream : int or None + If None, assume the single-file case and just open it. + """ + if data_source in self._open_files: + self._open_files.pop(data_source).close() + + self._current_chunk[data_source] += 1 + chunk = self._current_chunk[data_source] + + pattern = self.filename_template.format( + tel_id=self._first_file_info.tel_id, + data_source=data_source, + timestamp="*", + sb_id=self._first_file_info.sb_id, + obs_id=self._first_file_info.obs_id, + data_type=self._first_file_info.data_type, + chunk=chunk, + sb_id_padding=self._first_file_info.sb_id_padding, + obs_id_padding=self._first_file_info.obs_id_padding, + chunk_padding=self._first_file_info.chunk_padding, + ) + try: + # currently there is a timing issue between acada / EVB resulting + # in two files with chunk000, the first file technically has the last + # events of the previous ob, so we sort and take the last entry + path = sorted(self.directory.glob(pattern))[-1] + except IndexError: + raise FileNotFoundError( + f"No file found for pattern {self.directory}/{pattern}" + ) + + Provenance().add_input_file(str(path), "DL0") + self.log.info("Opening file %s", path) + file_ = File(str(path), pure_protobuf=self.pure_protobuf) + self._open_files[data_source] = file_ + + events_table = file_.Events + self._events_tables[data_source] = events_table + self._events_headers[data_source] = events_table.header + + # load first event from each stream + event = next(events_table) + self._events.put_nowait(NextEvent(event.event_id, event, data_source)) + + if self.data_stream is None: + self.data_stream = file_.DataStream[0] + + if self.camera_config is None: + self.camera_config = file_.CameraConfiguration[0] + + def close(self): + """Close the underlying files""" + for f in self._open_files.values(): + f.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def __iter__(self): + return self + + def __next__(self): + # check for the minimal event id + if not self._events: + raise StopIteration + + try: + next_event = self._events.get_nowait() + except Empty: + raise StopIteration + + data_source = next_event.data_source + event = next_event.event + + try: + new = next(self._events_tables[data_source]) + self._events.put_nowait(NextEvent(new.event_id, new, data_source)) + except StopIteration: + if self.all_chunks: + try: + self._load_next_chunk(data_source) + except FileNotFoundError: + pass + + return event diff --git a/src/ctapipe_io_zfits/paths.py b/src/ctapipe_io_zfits/paths.py new file mode 100644 index 0000000..f18e381 --- /dev/null +++ b/src/ctapipe_io_zfits/paths.py @@ -0,0 +1,67 @@ +import re +from dataclasses import dataclass +from datetime import date, datetime +from pathlib import Path +from typing import Optional, Union + + +@dataclass +class FileNameInfo: + data_source_id: str + ae_type: Optional[str] = None + ae_id: Optional[int] = None + subarray_id: Optional[int] = None + sb_id: Optional[int] = None + obs_id: Optional[int] = None + type_: Optional[str] = None + subtype: Optional[str] = None + chunk_id: Optional[int] = None + file_id: Optional[int] = None + suffix: Optional[str] = None + timestamp: Union[datetime, date, None] = None + + +#: regex to match filenames according to the ACADA DPPS ICD naming pattern +FILENAME_RE = re.compile( + r"(?:(?:SUB(?P\d+))|(?:(?PTEL|AUX)(?P\d+)))" + r"(?:_(?P[A-Z]+\d*))" + r"(?:_(?P[0-9]{8})(?:T(?P