Skip to content

Commit

Permalink
pull array_agg of jd/epoch from configuration
Browse files (browse the repository at this point in the history)
  • Loading branch information
johndonor3 committed May 10, 2024
1 parent 87f2897 commit a6752ed
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions src/too/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ def dump_to_parquet(
"""

from sdssdb.peewee.sdss5db import catalogdb, opsdb

path = pathlib.Path(path)

observatory = observatory.upper()
if observatory not in ["APO", "LCO"]:
raise ValueError("Invalid observatory.")

os.environ["OBSERVATORY"] = observatory.upper()

from sdssdb.peewee.sdss5db import catalogdb, opsdb

path = pathlib.Path(path)

# Check that the model connection is the same. This should be the case as
# long as database has been created using connect_to_database(), which will
Expand All @@ -67,6 +69,7 @@ def dump_to_parquet(

opsdb.AssignmentToFocal.bind(database)
Assn2Focal = opsdb.AssignmentToFocal
Configuration = opsdb.Configuration

columns = (
TooTarget.too_id,
Expand All @@ -93,12 +96,13 @@ def dump_to_parquet(
TooMeta.inertial,
TooMeta.sky_brightness_mode,
TooMeta.n_exposures,
TooMeta.observe_from_mjd,
)

targets = (
TooTarget.select(
*columns,
fn.count(Assn2Focal.catalogid).alias("count"),
fn.array_agg(Configuration.epoch).alias("jd"),
)
.join(TooMeta, on=(TooMeta.too_id == TooTarget.too_id))
.switch(TooTarget)
Expand All @@ -109,10 +113,17 @@ def dump_to_parquet(
JOIN.LEFT_OUTER,
on=(Assn2Focal.catalogid == Catalog.catalogid),
)
.join(Configuration, JOIN.LEFT_OUTER)
.group_by(*columns)
.dicts()
)

length = 0
for t in targets:
t["jd"] = [j if j else -1 for j in t["jd"]]
if len(t["jd"]) > length:
length = len(t["jd"])

dataframe = polars.from_dicts(
list(targets),
schema={
Expand Down Expand Up @@ -140,15 +151,18 @@ def dump_to_parquet(
"inertial": polars.Boolean,
"sky_brightness_mode": polars.String,
"n_exposures": polars.Int32,
"count": polars.Int32,
"observe_from_mjd": polars.Int32,
"jd": polars.Array(polars.Float32, length),
},
)

df_field = match_fields(dataframe, database)
df_observatory = df_field.filter(polars.col.observatory == observatory)

hist = dataframe["jd"].to_numpy()
start = dataframe["observe_from_mjd"].to_numpy()
n_done = [len(numpy.where(h > s)[0]) for h, s in zip(hist, start)]
n_requested = df_observatory["n_exposures"].to_numpy()
n_done = df_observatory["count"].to_numpy()
elligible = df_observatory.filter(numpy.greater(n_requested, n_done)).sort("too_id")

log.info(f"Found {len(elligible)} targets of {len(targets)} total.")
Expand Down

0 comments on commit a6752ed

Please sign in to comment.