Skip to content

Commit

Permalink
Merge pull request #26 from cfpb/lf_group_by_flag
Browse files (browse the repository at this point in the history)
lf group by flag
  • Loading branch information
lchen-2101 authored Jan 14, 2025
2 parents d8e464b + e5a6f0a commit 01f8101
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions validation_aws/src/sbl_validation_processor/results_aggregator.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -178,19 +178,24 @@ def aggregate_validation_results(bucket, key, results):

lf_to_use = max_err_lf if use_max_err_lf else lf

validation_groups = lf_to_use.select("validation_id").unique().sort(pl.col("validation_id")).collect()
use_lf_group_by = bool(json.loads(os.getenv("USE_LF_GROUP_BY", "false").lower()))

log.info(f"total validation groups: {validation_groups.height}")
if use_lf_group_by:
df = lf_to_use.group_by(pl.col("validation_id")).head(max_group_size).collect()
validation_group_results = df_to_dicts(df)
else:
validation_groups = lf_to_use.select("validation_id").unique().sort(pl.col("validation_id")).collect()

for validation_id in validation_groups["validation_id"]:
log.info(f"valiation_id: {validation_id}")

validation_group_result = lf.filter(pl.col("validation_id") == validation_id).head(max_group_size).collect()
log.info(f"validation_results: {validation_group_result.height}")
# log.info(f"total validation groups: {validation_groups.height}")

validation_group_results.extend(grouped_df_to_dicts(validation_group_result))
log.info(f"validation groups iter mem: {process.memory_info().rss / mb_factor} MB")
for validation_id in validation_groups["validation_id"]:
# log.info(f"valiation_id: {validation_id}")

validation_group_result = lf_to_use.filter(pl.col("validation_id") == validation_id).head(max_group_size).collect()
# log.info(f"validation_results: {validation_group_result.height}")

validation_group_results.extend(grouped_df_to_dicts(validation_group_result))
# log.info(f"validation groups iter mem: {process.memory_info().rss / mb_factor} MB")
log.info(f"mem after json lf collect with using truncated lf? {use_max_err_lf}: {process.memory_info().rss / mb_factor} MB")

if error_counts + warning_counts == 0:
Expand All @@ -203,6 +208,10 @@ def aggregate_validation_results(bucket, key, results):
)

build_final_json(validation_group_results, results)
log.info(f'use_lf_group_by: {use_lf_group_by}')
log.info(f'results["syntax_errors"]["details"]: {len(results["syntax_errors"]["details"])}')
log.info(f'results["logic_errors"]["details"]: {len(results["logic_errors"]["details"])}')
log.info(f'results["logic_warnings"]["details"]: {len(results["logic_warnings"]["details"])}')
submission.state = final_state
submission.validation_results = results
db_session.commit()
Expand Down

0 comments on commit 01f8101

Please sign in to comment.