From 1868814e476202152725a562f3ebdcb73487af56 Mon Sep 17 00:00:00 2001 From: Joanne Bogart Date: Wed, 29 Nov 2023 15:41:41 -0800 Subject: [PATCH 1/3] Part of implementation for using parquet format star truth --- skycatalogs/catalog_creator.py | 79 +++++++++++++++++++------------- skycatalogs/scripts/create_sc.py | 7 ++- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/skycatalogs/catalog_creator.py b/skycatalogs/catalog_creator.py index 4a3485e2..d8715d18 100644 --- a/skycatalogs/catalog_creator.py +++ b/skycatalogs/catalog_creator.py @@ -16,6 +16,7 @@ from .utils.config_utils import assemble_MW_extinction, assemble_cosmology from .utils.config_utils import assemble_object_types, assemble_provenance from .utils.config_utils import write_yaml +from .utils.star_parquet_input import _star_parquet_reader from .utils.parquet_schema_utils import make_galaxy_schema from .utils.parquet_schema_utils import make_galaxy_flux_schema from .utils.parquet_schema_utils import make_star_flux_schema @@ -194,7 +195,7 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, main_only=False, flux_parallel=16, galaxy_nside=32, galaxy_stride=1000000, provenance=None, dc2=False, sn_object_type='sncosmo', galaxy_type='cosmodc2', - include_roman_flux=False): + include_roman_flux=False, star_input_fmt='sqlite'): """ Store context for catalog creation @@ -240,6 +241,7 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, sn_object_type Which object type to use for SNe. galaxy_type Currently allowed values are cosmodc2 and diffsky include_roman_flux Calculate and write Roman flux values + star_input_fmt May be either 'sqlite' or 'parquet' Might want to add a way to specify template for output file name and template for input sedLookup file name. @@ -250,6 +252,8 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, _star_db = '/global/cfs/cdirs/lsst/groups/SSim/DC2/dc2_stellar_healpixel.db' _sn_db = '/global/cfs/cdirs/lsst/groups/SSim/DC2/cosmoDC2_v1.1.4/sne_cosmoDC2_v1.1.4_MS_DDF_healpix.db' + _star_parquet = '/global/cfs/cdirs/descssim/postDC2/UW_star_catalog' + self._galaxy_stride = galaxy_stride if pkg_root: self._pkg_root = pkg_root @@ -280,7 +284,10 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, self._star_truth = star_truth if self._star_truth is None: - self._star_truth = _star_db + if self._star_input_fmt == 'sqlite': + self._star_truth = _star_db + elif self._star_input_fmt == 'parquet': + self._star_truth = _star_parquet self._cat = None self._parts = parts @@ -830,19 +837,22 @@ def create_pointsource_pixel(self, pixel, arrow_schema, star_cat=None, if star_cat: # Get data for this pixel - cols = ','.join(['format("%s",simobjid) as id', 'ra', - 'decl as dec', - 'magNorm as magnorm', 'mura', 'mudecl as mudec', - 'radialVelocity as radial_velocity', 'parallax', - 'sedFilename as sed_filepath', 'ebv']) - q = f'select {cols} from stars where hpid={pixel} ' - with sqlite3.connect(star_cat) as conn: - star_df = pd.read_sql_query(q, conn) - - star_df['sed_filepath'] = get_star_sed_path(star_df['sed_filepath']) - + if self._star_input_fmt == 'sqlite': + cols = ','.join(['format("%s",simobjid) as id', 'ra', + 'decl as dec', 'magNorm as magnorm', 'mura', + 'mudecl as mudec', + 'radialVelocity as radial_velocity', + 'parallax', + 'sedFilename as sed_filepath', 'ebv']) + q = f'select {cols} from stars where hpid={pixel} ' + with sqlite3.connect(star_cat) as conn: + star_df = pd.read_sql_query(q, conn) + elif self._star_input_fmt == 'parquet': + star_df = _star_parquet_reader(self._star_truth, pixel, + arrow_schema) nobj = len(star_df['id']) self._logger.debug(f'Found {nobj} stars') + star_df['sed_filepath'] = get_star_sed_path(star_df['sed_filepath']) star_df['object_type'] = np.full((nobj,), 'star') star_df['host_galaxy_id'] = np.zeros((nobj,), np.int64()) @@ -873,26 +883,29 @@ def create_pointsource_pixel(self, pixel, arrow_schema, star_cat=None, params_df = pd.read_sql_query(q2, conn) nobj = len(sn_df['ra']) - sn_df['object_type'] = np.full((nobj,), self._sn_object_type) - - sn_df['MW_rv'] = make_MW_extinction_rv(sn_df['ra'], sn_df['dec']) - sn_df['MW_av'] = make_MW_extinction_av(sn_df['ra'], sn_df['dec']) - - # Add fillers for columns not relevant for sn - sn_df['sed_filepath'] = np.full((nobj), '') - sn_df['magnorm'] = np.full((nobj,), None) - sn_df['mura'] = np.full((nobj,), None) - sn_df['mudec'] = np.full((nobj,), None) - sn_df['radial_velocity'] = np.full((nobj,), None) - sn_df['parallax'] = np.full((nobj,), None) - sn_df['variability_model'] = np.full((nobj,), 'salt2_extended') - - # Form array of struct from params_df - sn_df['salt2_params'] = params_df.to_records(index=False) - out_table = pa.Table.from_pandas(sn_df, schema=arrow_schema) - self._logger.debug('Created arrow table from sn dataframe') - - writer.write_table(out_table) + if nobj > 0: + sn_df['object_type'] = np.full((nobj,), self._sn_object_type) + + sn_df['MW_rv'] = make_MW_extinction_rv(sn_df['ra'], + sn_df['dec']) + sn_df['MW_av'] = make_MW_extinction_av(sn_df['ra'], + sn_df['dec']) + + # Add fillers for columns not relevant for sn + sn_df['sed_filepath'] = np.full((nobj), '') + sn_df['magnorm'] = np.full((nobj,), None) + sn_df['mura'] = np.full((nobj,), None) + sn_df['mudec'] = np.full((nobj,), None) + sn_df['radial_velocity'] = np.full((nobj,), None) + sn_df['parallax'] = np.full((nobj,), None) + sn_df['variability_model'] = np.full((nobj,), 'salt2_extended') + + # Form array of struct from params_df + sn_df['salt2_params'] = params_df.to_records(index=False) + out_table = pa.Table.from_pandas(sn_df, schema=arrow_schema) + self._logger.debug('Created arrow table from sn dataframe') + + writer.write_table(out_table) writer.close() if self._provenance == 'yaml': diff --git a/skycatalogs/scripts/create_sc.py b/skycatalogs/scripts/create_sc.py index d1b77c27..620c7120 100644 --- a/skycatalogs/scripts/create_sc.py +++ b/skycatalogs/scripts/create_sc.py @@ -88,6 +88,10 @@ depends on value of --galaxy-type option''') parser.add_argument('--include-roman-flux', action='store_true', help='If supplied calculate & store Roman as well as LSST fluxes') +parser.add_argument('--star-input-fmt', default='sqlite', + choice=['sqlite', 'parquet'], help=''' + star truth may come from either sqlite db or collection + of parquet files''') args = parser.parse_args() @@ -139,7 +143,8 @@ provenance=provenance, dc2=args.dc2, galaxy_type=args.galaxy_type, galaxy_truth=args.galaxy_truth, - include_roman_flux=args.include_roman_flux) + include_roman_flux=args.include_roman_flux, + star_input_fmt=args.star_input_fmt) logger.info(f'Starting with healpix pixel {parts[0]}') if not args.no_galaxies: logger.info("Creating galaxy catalogs") From 386f4ef0e39cdd3e252401d79fbef4126b1e820b Mon Sep 17 00:00:00 2001 From: Joanne Bogart Date: Wed, 29 Nov 2023 16:00:29 -0800 Subject: [PATCH 2/3] Complete initial implementation for parquet star truth. --- skycatalogs/catalog_creator.py | 2 +- skycatalogs/scripts/create_sc.py | 2 +- skycatalogs/utils/star_parquet_input.py | 95 +++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 skycatalogs/utils/star_parquet_input.py diff --git a/skycatalogs/catalog_creator.py b/skycatalogs/catalog_creator.py index d8715d18..7218e700 100644 --- a/skycatalogs/catalog_creator.py +++ b/skycatalogs/catalog_creator.py @@ -283,6 +283,7 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, self._sn_object_type = sn_object_type self._star_truth = star_truth + self._star_input_fmt = star_input_fmt if self._star_truth is None: if self._star_input_fmt == 'sqlite': self._star_truth = _star_db @@ -322,7 +323,6 @@ def __init__(self, parts, area_partition=None, skycatalog_root=None, self._provenance = provenance self._dc2 = dc2 self._include_roman_flux = include_roman_flux - self._obs_sed_factory = None def _make_tophat_columns(self, dat, names, cmp): diff --git a/skycatalogs/scripts/create_sc.py b/skycatalogs/scripts/create_sc.py index 620c7120..0503eac0 100644 --- a/skycatalogs/scripts/create_sc.py +++ b/skycatalogs/scripts/create_sc.py @@ -89,7 +89,7 @@ parser.add_argument('--include-roman-flux', action='store_true', help='If supplied calculate & store Roman as well as LSST fluxes') parser.add_argument('--star-input-fmt', default='sqlite', - choice=['sqlite', 'parquet'], help=''' + choices=['sqlite', 'parquet'], help=''' star truth may come from either sqlite db or collection of parquet files''') diff --git a/skycatalogs/utils/star_parquet_input.py b/skycatalogs/utils/star_parquet_input.py new file mode 100644 index 00000000..ad932a48 --- /dev/null +++ b/skycatalogs/utils/star_parquet_input.py @@ -0,0 +1,95 @@ +# import pyarrow as pa +import os +import pyarrow.parquet as pq +import pandas as pd +import numpy as np +import numpy.ma as ma +import healpy + + +def _find_star_files(dirpath, pixel): + ''' + Parameters + ---------- + dirpath string Path to directory containing input parquet files + HTM partitioning + pixel int Healpix pixel + + Returns + ------- + List of parquet file paths for files which might have sources belonging + to specified pixel + ''' + # For now, hard-code to 3 files which have everything needed + # for joint simulation region + + fnames = ['stars_chunk_8800000000000_8900000000000.parquet', + 'stars_chunk_9200000000000_9300000000000.parquet', + 'stars_chunk_9700000000000_9800000000000.parquet'] + + return [os.path.join(dirpath, fname) for fname in fnames] + + +def _calculate_pixel_mask(ra, dec, pixel, nside=32): + ''' + Parameters + ---------- + ra Array of float + dec Array of float + pixel int + + Returns + ------- + Mask, set to 1 for all elements not in the pixel + ''' + ra = np.array(ra) + dec = np.array(dec) + in_pix = healpy.pixelfunc.ang2pix(nside, ra, dec, nest=False, lonlat=True) + mask = ma.logical_not(in_pix == pixel) + return mask + + +def _star_parquet_reader(dirpath, pixel, output_arrow_schema): + # Get requisite info from parquet files for sources in pixel. + # Next do renames and calculation for magnorm + to_read = ['simobjid', 'ra', 'decl', 'mura', 'mudecl', 'vrad', + 'parallax', 'sedfilename', 'flux_scale', 'ebv'] + rename = {'decl': 'dec', 'sedfilename': 'sed_filepath', 'mudecl': 'mudec', + 'vrad': 'radial_velocity'} + out_fields = ['id', 'ra', 'dec', 'mura', 'mudec', 'radial_velocity', + 'parallax', 'sed_filepath', 'magnorm', 'ebv'] + + paths = _find_star_files(dirpath, pixel) + out_dict = {} + for k in out_fields: + out_dict[k] = [] + for f in paths: + f_dict = {} + for k in out_fields: + f_dict[k] = [] + meta = pq.read_metadata(f) + pq_file = pq.ParquetFile(f) + for rg in range(meta.num_row_groups): + tbl = pq_file.read_row_group(rg, columns=to_read) + msk = _calculate_pixel_mask(tbl['ra'], tbl['decl'], pixel) + for col in to_read: + if col in rename: + f_dict[rename[col]] += list(ma.array(np.array(tbl[col]), + mask=msk).compressed()) + elif col == 'flux_scale': + # compute magnorm from flux_scale + tmp = ma.array(np.array(tbl['flux_scale']), + mask=msk).compressed() + f_dict['magnorm'] += list(-2.5*np.log(tmp)/np.log(10.0) - 18.402732642) + elif col == 'simobjid': + # convert simobjid to string, change name to id + tmp = [str(sim_id) for sim_id in tbl['simobjid']] + f_dict['id'] += list(ma.array(np.array(tmp), + mask=msk).compressed()) + else: + f_dict[col] += list(ma.array(np.array(tbl[col]), + mask=msk).compressed()) + for k in out_fields: + out_dict[k] += f_dict[k] + + return pd.DataFrame(out_dict) From 59c905c4b6ec61279933d54372d3ec9aaad4ecbb Mon Sep 17 00:00:00 2001 From: Joanne Bogart Date: Thu, 30 Nov 2023 16:49:14 -0800 Subject: [PATCH 3/3] skybrightness is not needed for ci environment --- .github/workflows/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd9f5bc4..af0dcf53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,8 +49,7 @@ jobs: run: | mkdir rubin_sim_data mkdir rubin_sim_data/sims_sed_library - # Just get the skybrightness, throughputs, and SED data for now. - curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/rubin_sim_data/skybrightness_may_2021.tgz | tar -C rubin_sim_data -xz + # Just get the throughputs and SED data for now. curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/rubin_sim_data/throughputs_2023_09_07.tgz | tar -C rubin_sim_data -xz curl https://s3df.slac.stanford.edu/groups/rubin/static/sim-data/sed_library/seds_170124.tar.gz | tar -C rubin_sim_data/sims_sed_library -xz