Skip to content

Commit

Permalink
#961: Unbroke historical chunking logic for disp-s1. See associated bug for details
Browse files Browse the repository at this point in the history
  • Loading branch information
philipjyoon committed Aug 22, 2024
1 parent 626b064 commit c71b5b7
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions data_subscriber/cslc/cslc_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ def query_cmr(self, args, token, cmr, settings, timerange, now):
def create_download_job_params(self, query_timerange, chunk_batch_ids):
    '''Same as base class except inject batch_ids for k granules.

    The batch ids stored in self.k_batch_ids under the first entry of
    chunk_batch_ids are appended to chunk_batch_ids, and the extended list
    is then handed to the base-class implementation.

    Note that chunk_batch_ids is extended in place, so the caller's list is
    modified. It must contain at least one batch id, and that id must be a
    key of self.k_batch_ids; otherwise IndexError / KeyError is raised.

    Returns whatever the base-class create_download_job_params returns.
    '''

    # Deliberately no "exactly one batch id" assertion here: get_download_chunks
    # groups several batches under a single frame in historical mode, so a chunk
    # may presumably carry more than one id -- NOTE(review): confirm with caller.
    chunk_batch_ids.extend(self.k_batch_ids[chunk_batch_ids[0]])
    return super().create_download_job_params(query_timerange, chunk_batch_ids)

Expand Down Expand Up @@ -477,17 +476,22 @@ def eliminate_none_frames(self, granules):
def get_download_chunks(self, batch_id_to_urls_map):
    '''For CSLC chunks we must group them by the batch_id that were determined at the time of triggering.

    Chunking depends on self.proc_mode:

    * "historical": every (batch_id, urls) pair is collected into a single
      chunk keyed by the frame id that split_download_batch_id returns for
      the first batch_id in the map. That chunk must contain exactly
      self.args.k batches.
    * any other mode: each batch_id gets a chunk of its own.

    batch_id_to_urls_map maps a download batch_id to its URLs and must be
    non-empty (the first key is always inspected; an empty map raises
    IndexError).

    Returns a view over the chunks, each chunk being a list of
    (batch_id, urls) tuples.

    Raises AssertionError in historical mode when the number of batches
    collected for the frame differs from self.args.k.
    '''

    # Only the first batch_id is parsed for the frame id.
    # NOTE(review): this presumes all batches in the map belong to one frame -- confirm.
    frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0])
    is_historical = self.proc_mode == "historical"

    chunk_map = defaultdict(list)
    for batch_chunk in batch_id_to_urls_map.items():
        # Chunking is done differently between historical and forward/reprocessing.
        # We don't actually care about the URLs, we only care about the batch_id
        chunk_key = frame_id if is_historical else batch_chunk[0]
        chunk_map[chunk_key].append(batch_chunk)

    if is_historical and len(chunk_map[frame_id]) != self.args.k:
        # Log the offending batch ids before failing so the mismatch can be diagnosed
        logger.error([chunk for chunk, data in chunk_map[frame_id]])
        err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} does not equal K {self.args.k}."
        raise AssertionError(err_str)

    return chunk_map.values()

Expand Down

0 comments on commit c71b5b7

Please sign in to comment.