From c71b5b77a1b66ed1c8c7c83ba2aab908695bca08 Mon Sep 17 00:00:00 2001
From: Philip Yoon
Date: Wed, 21 Aug 2024 18:38:29 -0700
Subject: [PATCH] #961: Unbroke historical chunking logic for disp-s1. See associated bug for details

---
 data_subscriber/cslc/cslc_query.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/data_subscriber/cslc/cslc_query.py b/data_subscriber/cslc/cslc_query.py
index c31e4e29..ebfe71f5 100644
--- a/data_subscriber/cslc/cslc_query.py
+++ b/data_subscriber/cslc/cslc_query.py
@@ -437,7 +437,6 @@ def query_cmr(self, args, token, cmr, settings, timerange, now):
 
     def create_download_job_params(self, query_timerange, chunk_batch_ids):
         '''Same as base class except inject batch_ids for k granules'''
-        assert len(chunk_batch_ids) == 1
         chunk_batch_ids.extend(list(self.k_batch_ids[chunk_batch_ids[0]]))
         return super().create_download_job_params(query_timerange, chunk_batch_ids)
 
@@ -477,17 +476,22 @@ def eliminate_none_frames(self, granules):
 
     def get_download_chunks(self, batch_id_to_urls_map):
         '''For CSLC chunks we must group them by the batch_id that were determined at the time of triggering'''
+        frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0])
         chunk_map = defaultdict(list)
         for batch_chunk in batch_id_to_urls_map.items():
-            chunk_map[batch_chunk[0]].append(batch_chunk) # We don't actually care about the URLs, we only care about the batch_id
-
-            '''indices = self.download_batch_ids[batch_chunk[0]]
-            for index in indices:
-                chunk_map[index].append(batch_chunk)
-                if (len(chunk_map[index]) > self.args.k):
-                    logger.error([chunk for chunk, data in chunk_map[index]])
-                    err_str = f"Number of download batches {len(chunk_map[index])} for frame {index} is greater than K {self.args.k}."
-                    raise AssertionError(err_str)'''
+
+            # Chunking is done differently between historical and forward/reprocessing
+            if self.proc_mode == "historical":
+                chunk_map[frame_id].append(batch_chunk)
+            else:
+                chunk_map[batch_chunk[0]].append(
+                    batch_chunk)  # We don't actually care about the URLs, we only care about the batch_id
+
+        if self.proc_mode == "historical":
+            if (len(chunk_map[frame_id]) != self.args.k):
+                logger.error([chunk for chunk, data in chunk_map[frame_id]])
+                err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} does not equal K {self.args.k}."
+                raise AssertionError(err_str)
 
         return chunk_map.values()
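
Note (illustrative only, not part of the patch): the standalone Python sketch below mirrors how the reworked get_download_chunks grouping is intended to behave. The batch-id format "f<frame_id>_a<acq_index>", the stand-in split_download_batch_id helper, the sample ids, and the K value are assumptions made for this sketch; only the historical-vs-forward branching and the post-loop K-count check come from the patch itself.

from collections import defaultdict

def split_download_batch_id(batch_id):
    # Hypothetical stand-in for the real helper; assumes ids look like "f<frame_id>_a<acq_index>"
    frame, acq = batch_id.split("_")
    return frame[1:], acq[1:]

def get_download_chunks(batch_id_to_urls_map, proc_mode, k):
    # Standalone sketch of the patched grouping logic (the real code is a class method
    # that reads self.proc_mode and self.args.k)
    frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0])
    chunk_map = defaultdict(list)

    for batch_chunk in batch_id_to_urls_map.items():
        if proc_mode == "historical":
            # Historical: every acquisition batch of the frame lands in one chunk keyed by frame_id
            chunk_map[frame_id].append(batch_chunk)
        else:
            # Forward/reprocessing: each batch_id stays its own chunk
            chunk_map[batch_chunk[0]].append(batch_chunk)

    if proc_mode == "historical" and len(chunk_map[frame_id]) != k:
        raise AssertionError(
            f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} "
            f"does not equal K {k}.")

    return chunk_map.values()

# Hypothetical data: K=3 acquisition batches for one frame
batches = {"f831_a100": ["url1"], "f831_a101": ["url2"], "f831_a102": ["url3"]}
print(len(get_download_chunks(batches, "historical", k=3)))  # 1 chunk containing all 3 batches
print(len(get_download_chunks(batches, "forward", k=3)))     # 3 chunks, one per batch_id

In historical mode all K acquisition batches of a frame are grouped into a single download chunk, which is why the patch looks up frame_id before the loop, drops the old per-batch assertion, and verifies the chunk size equals K only after grouping completes.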