Commit 51457c9

Fixed checking background offline ingestors, moved ingestor information as default variables

nitrosx committed Jan 14, 2025
1 parent 6dc7533 commit 51457c9

Showing 4 changed files with 45 additions and 31 deletions.
src/scicat_communication.py (5 changes: 3 additions & 2 deletions)

@@ -4,7 +4,7 @@
 import logging
 from dataclasses import asdict
 from typing import Any
-from urllib.parse import quote, urljoin
+from urllib.parse import quote, urljoin, quote_plus

 import requests
 from scicat_configuration import SciCatOptions
@@ -121,14 +121,15 @@ def check_dataset_by_pid(
     pid: str, config: SciCatOptions, logger: logging.Logger
 ) -> bool:
     response = _get_from_scicat(
-        url=urljoin(config.host_address, f"datasets/{quote(pid)}"),
+        url=urljoin(config.host_address, f"datasets/{quote_plus(pid)}"),
         headers=config.headers,
         timeout=config.timeout,
         stream=config.stream,
         verify=config.verify,
     )
     dataset_exists: bool
     if not response.ok:
+        logger.info("Request url : \n%s", response.url)
         logger.error(
             "Failed to check dataset existence by pid with status code: %s. "
             "Error message from scicat backend: \n%s\n"
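
Why quote_plus: SciCat PIDs usually contain a slash (prefix/suffix), and urllib.parse.quote leaves "/" unescaped by default, so the PID would be split across two URL path segments. A minimal sketch of the difference (the PID value below is hypothetical):

# Minimal sketch: why quote_plus is needed for PIDs embedded in a URL path.
# The PID below is hypothetical; real SciCat PIDs follow "prefix/suffix".
from urllib.parse import quote, quote_plus

pid = "20.500.12269/0a1b2c3d"
print(quote(pid))       # 20.500.12269/0a1b2c3d   ("/" kept, splits the path)
print(quote_plus(pid))  # 20.500.12269%2F0a1b2c3d ("/" encoded as %2F)
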
src/scicat_dataset.py (41 changes: 19 additions & 22 deletions)

@@ -7,6 +7,7 @@
 import os.path
 import pathlib
 import re
+import urllib
 import uuid
 from collections.abc import Callable, Iterable
 from dataclasses import asdict, dataclass, field
@@ -99,6 +100,7 @@ def to_list(value: Any) -> list:
         "dict": to_dict,
         "list": to_list,
         "email": to_string,
+        "link": to_string,
         # TODO: Add email converter
     }
 )
@@ -137,6 +139,9 @@ def convert_to_type(input_value: Any, dtype_desc: str) -> Any:
         "str-replace": lambda value, recipe: str(value).replace(
             recipe.pattern, recipe.replacement
         ),
+        "urlsafe": lambda value, recipe: urllib.parse.quote_plus(value),
+        "to-lower": lambda value, recipe: str(value).lower(),
+        "to-upper": lambda value, recipe: str(value).upper()
     }
 )
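
The three new operators slot into the existing value-operator registry. How the registry is consulted is not shown in this hunk, so the apply_operator helper below is an assumption for illustration; only the registry entries mirror the diff:

# Sketch of the new value operators; the registry entries mirror the diff,
# but apply_operator is a hypothetical helper, not code from this commit.
import urllib.parse

_OPERATORS = {
    "urlsafe": lambda value, recipe: urllib.parse.quote_plus(value),
    "to-lower": lambda value, recipe: str(value).lower(),
    "to-upper": lambda value, recipe: str(value).upper(),
}

def apply_operator(name: str, value, recipe=None):
    # Unknown operator names fall back to the identity transform.
    return _OPERATORS.get(name, lambda v, _r: v)(value, recipe)

print(apply_operator("urlsafe", "mcstas data/run 01"))  # mcstas+data%2Frun+01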

@@ -168,12 +173,16 @@ def extract_variables_values(
     variables: dict[str, MetadataSchemaVariable],
     h5file: h5py.File,
     config: OfflineIngestorConfig,
+    schema_id: str
 ) -> dict:
+    nexus_file = pathlib.Path(config.nexus_file)
     variable_map = {
         "ingestor_run_id": str(uuid.uuid4()),
-        "data_file_path": pathlib.Path(config.nexus_file),
+        "data_file_path": str(nexus_file),
+        "data_file_name": str(nexus_file.name),
         "now": datetime.datetime.now(tz=datetime.UTC).isoformat(),
+        "ingestor_files_directory": config.ingestion.file_handling.ingestor_files_directory,
+        "ingestor_metadata_schema_id": schema_id,
     }
     for variable_name, variable_recipe in variables.items():
         source = variable_recipe.source
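
This is the "moved ingestor information as default variables" half of the commit: the bookkeeping values become ordinary template variables available to every metadata schema, rather than a hard-coded scientificMetadata field. A sketch of the resulting defaults, with hypothetical values:

# Illustrative defaults produced by extract_variables_values after this
# commit; all concrete values below are hypothetical.
import datetime
import pathlib
import uuid

nexus_file = pathlib.Path("/data/coda/run_00042.nxs")
variable_map = {
    "ingestor_run_id": str(uuid.uuid4()),
    "data_file_path": str(nexus_file),       # "/data/coda/run_00042.nxs"
    "data_file_name": str(nexus_file.name),  # "run_00042.nxs"
    "now": datetime.datetime.now(tz=datetime.UTC).isoformat(),
    "ingestor_files_directory": "/ingestor/files",      # from config
    "ingestor_metadata_schema_id": "coda-base-schema",  # schema_id argument
}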
@@ -464,7 +473,6 @@ def _render_variable_as_type(value: Any, variable_map: dict, dtype: str) -> Any:

 def _create_scientific_metadata(
     *,
-    metadata_schema_id: str,
     sm_schemas: list[MetadataItem],
     variable_map: dict,
 ) -> dict:
@@ -481,24 +489,15 @@
     """
     return {
-        # Default field
-        "ingestor_metadata_schema_id": {
-            "value": metadata_schema_id,
-            "unit": "",
-            "human_name": "Ingestor metadata schema ID",
-            "type": "string",
-        },
-        **{
-            field.machine_name: {
-                "value": _render_variable_as_type(
-                    field.value, variable_map, field.type
-                ),
-                "unit": getattr(field, "unit", ""),
-                "human_name": getattr(field, "human_name", field.machine_name),
-                "type": field.type,
-            }
-            for field in sm_schemas
-        },
+        field.machine_name: {
+            "value": _render_variable_as_type(
+                field.value, variable_map, field.type
+            ),
+            "unit": getattr(field, "unit", ""),
+            "human_name": getattr(field, "human_name", field.machine_name),
+            "type": field.type,
+        }
+        for field in sm_schemas
     }
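
The body is now a plain dict comprehension: the schema-id entry is gone from scientificMetadata, and only schema-defined fields remain. Each entry keeps the same four-key shape, for example (field name and values are made up):

# Illustrative shape of one scientificMetadata entry built by the
# comprehension above; the field and its values are hypothetical.
{
    "sample_temperature": {
        "value": 4.2,
        "unit": "K",
        "human_name": "Sample temperature",
        "type": "number",
    }
}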


@@ -522,7 +521,6 @@ def _validate_metadata_schemas(

 def create_scicat_dataset_instance(
     *,
-    metadata_schema_id: str,  # metadata-schema["id"]
     metadata_schema: dict[str, MetadataItem],  # metadata-schema["schema"]
     variable_map: dict,
     data_file_list: list[DataFileListItem],
@@ -553,7 +551,6 @@
         numberOfFiles=len(data_file_list),
         isPublished=False,
         scientificMetadata=_create_scientific_metadata(
-            metadata_schema_id=metadata_schema_id,
             sm_schemas=_filter_by_field_type(
                 metadata_schema.values(), SCIENTIFIC_METADATA_TYPE
             ), # Scientific metadata schemas
src/scicat_offline_ingestor.py (6 changes: 4 additions & 2 deletions)

@@ -150,7 +150,10 @@ def main() -> None:

     # define variables values
     variable_map = extract_variables_values(
-        metadata_schema.variables, h5file, config
+        metadata_schema.variables,
+        h5file,
+        config,
+        metadata_schema.id
     )

     data_file_list = create_data_file_list(
@@ -165,7 +168,6 @@
     # Prepare scicat dataset instance(entry)
     logger.info("Preparing scicat dataset instance ...")
     local_dataset_instance = create_scicat_dataset_instance(
-        metadata_schema_id=metadata_schema.id,
         metadata_schema=metadata_schema.schema,
         variable_map=variable_map,
         data_file_list=data_file_list,
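
Together with the scicat_dataset.py changes, the schema id now enters through variable extraction instead of dataset assembly. A condensed sketch of the updated wiring, with arguments the diff truncates elided as comments:

# Condensed wiring after this commit; surrounding objects
# (metadata_schema, h5file, config, data_file_list) come from main().
variable_map = extract_variables_values(
    metadata_schema.variables,
    h5file,
    config,
    metadata_schema.id,  # feeds the ingestor_metadata_schema_id variable
)

local_dataset_instance = create_scicat_dataset_instance(
    metadata_schema=metadata_schema.schema,  # metadata_schema_id removed
    variable_map=variable_map,
    data_file_list=data_file_list,
    # ... remaining keyword arguments elided in the diff
)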
src/scicat_online_ingestor.py (24 changes: 19 additions & 5 deletions)

@@ -60,15 +60,24 @@ def dump_message_to_file_if_needed(
     logger.info("Message file saved")


-def _individual_message_commit(job_id, message, consumer, logger: logging.Logger):
+def _individual_message_commit(
+    job_id,
+    message,
+    consumer,
+    logger: logging.Logger
+):
     logger.info("Executing commit for message with job id %s", job_id)
     consumer.commit(message=message)


 def _check_offline_ingestors(
-    offline_ingestors, consumer, config, logger: logging.Logger
+    offline_ingestors,
+    consumer,
+    config,
+    logger: logging.Logger
 ) -> int:
+    logger.info("%s offline ingestors running", len(offline_ingestors))
+    jobs_done = []
     for job_id, job_item in offline_ingestors.items():
         result = job_item["proc"].poll()
         if result is not None:
@@ -81,15 +90,20 @@ def _check_offline_ingestors(
             # check if we need to commit the individual message
             if config.kafka.individual_message_commit:
                 _individual_message_commit(
-                    job_id, job_item["message"], consumer, logger
+                    job_id,
+                    job_item["message"],
+                    consumer,
+                    logger
                 )
             else:
                 logger.error("Offline ingestor error for job id %s", job_id)
             logger.info(
                 "Removed ingestor for message with job id %s from queue", job_id
             )
-            offline_ingestors.pop(job_id)
+            jobs_done.append(job_id)
+    logger.info("%s offline ingestors done", len(jobs_done))
+    for job_id in jobs_done:
+        offline_ingestors.pop(job_id)
     return len(offline_ingestors)
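
This is the "fixed checking background offline ingestors" half of the commit: the old code popped entries from offline_ingestors while iterating over offline_ingestors.items(), which raises RuntimeError as soon as a job finishes. A minimal reproduction of the bug and the collect-then-pop fix:

# Minimal reproduction of the bug fixed above: mutating a dict while
# iterating over it raises RuntimeError in Python 3.
jobs = {"job-1": "done", "job-2": "running"}
try:
    for job_id, state in jobs.items():
        if state == "done":
            jobs.pop(job_id)  # mutation during iteration
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration

# The committed fix: remember finished ids, pop after the loop ends.
jobs = {"job-1": "done", "job-2": "running"}
jobs_done = [job_id for job_id, state in jobs.items() if state == "done"]
for job_id in jobs_done:
    jobs.pop(job_id)
print(jobs)  # {'job-2': 'running'}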

