Skip to content

Commit

Permalink
Skip empty masks
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyh committed Dec 21, 2023
1 parent 8007c03 commit d1bca28
Showing 1 changed file with 46 additions and 44 deletions.
90 changes: 46 additions & 44 deletions eodatasets3/wagl.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@
)
PRODUCT_SUITE_FROM_GRANULE = re.compile("(L1[GTPCS]{1,2})")

# Map wagl dataset band IDs to ESA MTD.xml Band IDs
WAGL_TO_ESA_BAND_NUMBER = {
"1": "0",
"2": "1",
"3": "2",
"4": "3",
"5": "4",
"6": "5",
"7": "6",
"8": "7",
"8A": "8",
"9": "9",
"10": "10",
"11": "11",
"12": "12",
}


class ProductMaturity(Enum):
provisional = "provisional"
Expand Down Expand Up @@ -102,13 +119,13 @@ def _unpack_products(
for pathname in [p for p in img_paths if f"/{product.upper()}/" in p]:
with do(f"Path {pathname!r}"):
dataset = h5group[pathname]
mtd_dict = missing_packets(dataset, granule)
band_masks = get_quality_masks(dataset, granule)
band_name = utils.normalise_band_name(dataset.attrs["alias"])
write_measurement_h5(
p,
f"{product}:{band_name}",
dataset,
mtd_dict=mtd_dict,
band_masks=band_masks,
overview_resampling=Resampling.average,
file_id=_file_id(dataset),
)
Expand All @@ -130,19 +147,23 @@ def _unpack_products(
)


def missing_packets(dataset: h5py.Dataset, granule: "Granule"):
# a dictionary of band_id: (mask_type, mask_offset)
# (the offset is the file path within the data package. this might
# be within the zip file, or directory)
BandMasks = Dict[str, Tuple[str, str]]


def get_quality_masks(dataset: h5py.Dataset, granule: "Granule") -> BandMasks:
"""
If missing packets, returns metadata dict and None if not
Get a dictionary of any quality masks to apply for each band.
"""
# Define bands to be masked
band_ids_mask = {str(i) for i in range(0, 13)}
band_ids_mask.add("8A")
if dataset.attrs["band_id"] not in band_ids_mask:
return None

if dataset.attrs["band_id"] not in WAGL_TO_ESA_BAND_NUMBER.keys():
return {}

platform_name = granule.wagl_metadata["source_datasets"]["platform_id"].lower()
if "sentinel" not in platform_name:
return None
return {}

level1_data_path = granule.source_level1_data

Expand Down Expand Up @@ -174,7 +195,7 @@ def missing_packets(dataset: h5py.Dataset, granule: "Granule"):
type,
f"zip+file://{level1_data_path.as_posix()}!/{mask_offset}",
)

return mtd_dict
# Sinergise data comes as a directory.
elif level1_data_path.is_dir():
mtd_dict = missing_packets_from_xml(
Expand All @@ -189,13 +210,12 @@ def missing_packets(dataset: h5py.Dataset, granule: "Granule"):
if not mask_path.exists():
mask_path = granule_metadata_xml.parent / "qi" / Path(location).name
mtd_dict[band_id] = (type, mask_path.as_posix())
return mtd_dict
else:
raise RuntimeError(
"Input level 1 data must be a zip file (ESA) or directory (Sinergise)"
)

return mtd_dict


def missing_packets_from_xml(granule_metadata_xml: bytes):
root = Et.fromstring(granule_metadata_xml, forbid_dtd=True)
Expand All @@ -210,33 +230,23 @@ def missing_packets_from_xml(granule_metadata_xml: bytes):
}


def mask_h5(dataset, mtd_dict):
# Map wagl dataset band IDs to ESA MTD.xml Band IDs
band_mapping = {
"1": "0",
"2": "1",
"3": "2",
"4": "3",
"5": "4",
"6": "5",
"7": "6",
"8": "7",
"8A": "8",
"9": "9",
"10": "10",
"11": "11",
"12": "12",
}
esa_band = band_mapping[dataset.attrs["band_id"]]
type, mask_string_path = mtd_dict[esa_band]
def load_and_mask_data(g: h5py.Dataset, masks: BandMasks):
band_id = g.attrs["band_id"]
esa_band = WAGL_TO_ESA_BAND_NUMBER.get(band_id)
if (esa_band is None) or (esa_band not in masks):
# No mask needs to be applied.
data_array = g[:] if hasattr(g, "chunks") else g
return data_array

type, mask_string_path = masks[esa_band]
if type == "MSK_TECQUA":
return mask_h5_vector(
dataset,
g,
mask_string_path,
)
elif type == "MSK_QUALIT":
return mask_h5_raster(
dataset,
g,
mask_string_path,
)
else:
Expand Down Expand Up @@ -295,7 +305,7 @@ def write_measurement_h5(
p: DatasetAssembler,
full_name: str,
g: h5py.Dataset,
mtd_dict: dict = None,
band_masks: Optional[BandMasks] = None,
overviews=images.DEFAULT_OVERVIEWS,
overview_resampling=Resampling.nearest,
expand_valid_data=True,
Expand All @@ -304,15 +314,7 @@ def write_measurement_h5(
"""
Write a measurement by copying it from a hdf5 dataset.
"""
if hasattr(g, "chunks"):
data_array = g[:]
else:
data_array = g

if mtd_dict is not None:
data = mask_h5(g, mtd_dict)
else:
data = data_array
data = load_and_mask_data(g, band_masks or {})

product_name, band_name = full_name.split(":")
p.write_measurement_numpy(
Expand Down

0 comments on commit d1bca28

Please sign in to comment.