diff --git a/src/too/dump.py b/src/too/dump.py
index 275c9c3..af4e35f 100644
--- a/src/too/dump.py
+++ b/src/too/dump.py
@@ -42,13 +42,15 @@ def dump_to_parquet(
 
     """
 
-    from sdssdb.peewee.sdss5db import catalogdb, opsdb
-
-    path = pathlib.Path(path)
-
     observatory = observatory.upper()
     if observatory not in ["APO", "LCO"]:
         raise ValueError("Invalid observatory.")
+
+    os.environ["OBSERVATORY"] = observatory.upper()
+
+    from sdssdb.peewee.sdss5db import catalogdb, opsdb
+
+    path = pathlib.Path(path)
 
     # Check that the model connection is the same. This should be the case as
     # long as database has been created using connect_to_database(), which will
@@ -67,6 +69,7 @@ def dump_to_parquet(
 
     opsdb.AssignmentToFocal.bind(database)
     Assn2Focal = opsdb.AssignmentToFocal
+    Configuration = opsdb.Configuration
 
     columns = (
         TooTarget.too_id,
@@ -93,12 +96,13 @@ def dump_to_parquet(
         TooMeta.inertial,
         TooMeta.sky_brightness_mode,
         TooMeta.n_exposures,
+        TooMeta.observe_from_mjd,
     )
 
     targets = (
         TooTarget.select(
            *columns,
-            fn.count(Assn2Focal.catalogid).alias("count"),
+            fn.array_agg(Configuration.epoch).alias("jd"),
        )
        .join(TooMeta, on=(TooMeta.too_id == TooTarget.too_id))
        .switch(TooTarget)
@@ -109,10 +113,17 @@ def dump_to_parquet(
            JOIN.LEFT_OUTER,
            on=(Assn2Focal.catalogid == Catalog.catalogid),
        )
+        .join(Configuration, JOIN.LEFT_OUTER)
        .group_by(*columns)
        .dicts()
    )
 
+    length = 0
+    for t in targets:
+        t["jd"] = [j if j else -1 for j in t["jd"]]
+        if len(t["jd"]) > length:
+            length = len(t["jd"])
+
     dataframe = polars.from_dicts(
         list(targets),
         schema={
@@ -140,15 +151,18 @@ def dump_to_parquet(
            "inertial": polars.Boolean,
            "sky_brightness_mode": polars.String,
            "n_exposures": polars.Int32,
-            "count": polars.Int32,
+            "observe_from_mjd": polars.Int32,
+            "jd": polars.Array(polars.Float32, length),
        },
    )
 
    df_field = match_fields(dataframe, database)
    df_observatory = df_field.filter(polars.col.observatory == observatory)
 
+    hist = dataframe["jd"].to_numpy()
+    start = dataframe["observe_from_mjd"].to_numpy()
+    n_done = [len(numpy.where(h > s)[0]) for h, s in zip(hist, start)]
    n_requested = df_observatory["n_exposures"].to_numpy()
-    n_done = df_observatory["count"].to_numpy()
 
    elligible = df_observatory.filter(numpy.greater(n_requested, n_done)).sort("too_id")
    log.info(f"Found {len(elligible)} targets of {len(targets)} total.")
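
A minimal, self-contained sketch of the eligibility logic this patch introduces, separate from the actual dump_to_parquet code: each target carries a fixed-length "jd" array of configuration epochs, padded with -1 where no configuration exists, and n_done counts the epochs that fall after observe_from_mjd. The values below are made up for illustration and assume the epochs and observe_from_mjd are on the same time scale.

import numpy

# Hypothetical per-target data: "jd" arrays padded to a common length with -1,
# as done in the loop over targets in the patch above.
jd = numpy.array(
    [
        [60400.1, 60401.2, -1.0],  # observed twice
        [-1.0, -1.0, -1.0],        # never observed
    ]
)
observe_from_mjd = numpy.array([60400, 60000])
n_requested = numpy.array([2, 3])  # n_exposures per target

# Count epochs after the start MJD; the -1 padding never passes the comparison.
n_done = [len(numpy.where(h > s)[0]) for h, s in zip(jd, observe_from_mjd)]

# A target remains eligible while it still needs exposures.
eligible = numpy.greater(n_requested, n_done)
print(n_done)    # [2, 0]
print(eligible)  # [False  True]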