diff --git a/.env b/.env index 356de98e..9ebc6fed 100644 --- a/.env +++ b/.env @@ -41,7 +41,7 @@ MEM_LIMIT=4294967296 # on dev connect to the same network as off-server COMMON_NET_NAME=po_default -# Sentry DNS for bug tracking +# Sentry DNS for bug tracking, used only in staging and production SENTRY_DNS= # Log level to use, DEBUG by default in dev diff --git a/app/_import.py b/app/_import.py index 353fe44b..ec357cb3 100644 --- a/app/_import.py +++ b/app/_import.py @@ -11,7 +11,7 @@ from redis import Redis from app._types import JSONType -from app.config import Config, TaxonomyConfig, settings +from app.config import Config, IndexConfig, TaxonomyConfig from app.indexing import ( DocumentProcessor, generate_index_object, @@ -25,7 +25,7 @@ class BaseDocumentFetcher(abc.ABC): - def __init__(self, config: Config) -> None: + def __init__(self, config: IndexConfig) -> None: self.config = config @abc.abstractmethod @@ -42,10 +42,10 @@ def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None: pass -def load_document_fetcher(config: Config) -> BaseDocumentFetcher: +def load_document_fetcher(config: IndexConfig) -> BaseDocumentFetcher: """Load the document fetcher class from the config. - :param config: the config object + :param config: the index configuration to use :return: the initialized document fetcher """ fetcher_cls = load_class_object_from_string(config.document_fetcher) @@ -54,6 +54,7 @@ def load_document_fetcher(config: Config) -> BaseDocumentFetcher: def get_processed_since( redis_client: Redis, + redis_stream_name: str, start_timestamp_ms: int, id_field_name: str, document_fetcher: BaseDocumentFetcher, @@ -63,19 +64,19 @@ def get_processed_since( timestamp. :param redis_client: the Redis client + :param redis_stream_name: the name of the Redis stream to read from :param start_timestamp_ms: the timestamp to start from, in milliseconds :param id_field_name: the name of the field containing the ID :param document_fetcher: the document fetcher :param batch_size: the size of the batch to fetch, defaults to 100 :yield: a tuple containing the timestamp (in milliseconds) and the document """ - stream_name = settings.redis_import_stream_name fetched_ids = set() # We start from the given timestamp min_id = f"{start_timestamp_ms}-0" while True: - batch = redis_client.xrange(stream_name, min=min_id, count=batch_size) + batch = redis_client.xrange(redis_stream_name, min=min_id, count=batch_size) if not batch: # We reached the end of the stream break @@ -93,7 +94,7 @@ def get_processed_since( logger.debug(f"Skipping ID {id_} because it was already fetched") continue fetched_ids.add(id_) - document = document_fetcher.fetch_document(stream_name, item) + document = document_fetcher.fetch_document(redis_stream_name, item) if document is None: logger.debug(f"Skipping ID {id_} because it was not found") continue @@ -102,46 +103,58 @@ def get_processed_since( def get_new_updates( redis_client: Redis, - id_field_name: str, - document_fetcher: BaseDocumentFetcher, + stream_names: list[str], + id_field_names: dict[str, str], + document_fetchers: dict[str, BaseDocumentFetcher], batch_size: int = 100, -): +) -> Iterator[tuple[str, int, JSONType]]: """Reads new updates from Redis Stream, starting from the moment this function is called. The function will block until new updates are available. 
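+
+    Several streams can be listened to at once; each yielded update is
+    tagged with the name of the stream it comes from, so that the caller
+    can route the document to the matching index.
+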
:param redis_client: the Redis client - :param id_field_name: the name of the field containing the ID - :param document_fetcher: the document fetcher + :param stream_names: the names of the Redis streams to read from + :param id_field_names: the name of the field containing the ID for each + stream + :param document_fetchers: the document fetcher for each stream :param batch_size: the size of the batch to fetch, defaults to 100. - :yield: a tuple containing the timestamp (in milliseconds) and the document + :yield: a tuple containing the stream name, the timestamp (in + milliseconds) and the document """ - stream_name = settings.redis_import_stream_name # We start from the last ID - min_id = "$" + min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = { + stream_name: "$" for stream_name in stream_names + } while True: logger.debug( - "Listening to new updates from stream %s (ID: %s)", stream_name, min_id + "Listening to new updates from streams %s (ID: %s)", stream_names, min_ids ) # We use block=0 to wait indefinitely for new updates - response = redis_client.xread({stream_name: min_id}, block=0, count=batch_size) + response = redis_client.xread(streams=min_ids, block=0, count=batch_size) response = cast(list[tuple[str, list[tuple[str, dict]]]], response) - # We only have one stream, so we only have one response # The response is a list of tuples (stream_name, batch) - _, batch = response[0] - # We update the min_id to the last ID of the batch - min_id = batch[-1][0] - for timestamp_id, item in batch: - id_ = item[id_field_name] - logger.debug("Fetched ID: %s", id_) - # Get the timestamp from the ID - timestamp = int(timestamp_id.split("-")[0]) - document = document_fetcher.fetch_document(stream_name, item) - if document is None: - logger.debug(f"Skipping ID {id_} because it was not found") - continue - yield timestamp, document + + for stream_name, batch in response: + # We update the min_id to the last ID of the batch + min_id = batch[-1][0] + min_ids[stream_name] = min_id + id_field_name = id_field_names[stream_name] + document_fetcher = document_fetchers[stream_name] + for timestamp_id, item in batch: + id_ = item[id_field_name] + logger.debug("Fetched ID: %s", id_) + # Get the timestamp from the ID + timestamp = int(timestamp_id.split("-")[0]) + document = document_fetcher.fetch_document(stream_name, item) + if document is None: + logger.debug( + "Stream %s: Skipping ID %s because it was not found", + stream_name, + id_, + ) + continue + yield stream_name, timestamp, document def get_document_dict( @@ -224,22 +237,20 @@ def update_alias(es_client: Elasticsearch, next_index: str, index_alias: str): :param index_alias: the alias to update """ es_client.indices.update_aliases( - body={ - "actions": [ - { - "remove": { - "alias": index_alias, - "index": f"{index_alias}-*", - }, - }, - {"add": {"alias": index_alias, "index": next_index}}, - ], - }, + actions=[ + { + "remove": { + "alias": index_alias, + "index": f"{index_alias}-*", + } + }, + {"add": {"alias": index_alias, "index": next_index}}, + ] ) def import_parallel( - config: Config, + config: IndexConfig, file_path: Path, next_index: str, num_items: int | None, @@ -278,12 +289,12 @@ def import_parallel( logger.error("Encountered errors: %s", errors) -def import_taxonomies(config: Config, next_index: str): +def import_taxonomies(config: IndexConfig, next_index: str): """Import taxonomies into Elasticsearch. A single taxonomy index is used to store all taxonomy items. 
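+
+    Every taxonomy listed in the index configuration's `taxonomy.sources`
+    is imported into this single index.
+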
- :param config: the configuration to use + :param config: the index configuration to use :param next_index: the index to write to """ # open a connection for this process @@ -305,6 +316,7 @@ def import_taxonomies(config: Config, next_index: str): def get_redis_products( + stream_name: str, processor: DocumentProcessor, fetcher: BaseDocumentFetcher, index: str, @@ -313,6 +325,7 @@ def get_redis_products( ): """Fetch IDs of documents to update from Redis. + :param stream_name: the name of the Redis stream to read from :param processor: the document processor to use to process the documents :param fetcher: the document fetcher to use to fetch the documents from the document ID @@ -325,6 +338,7 @@ def get_redis_products( processed = 0 for _, row in get_processed_since( redis_client, + stream_name, last_updated_timestamp_ms, id_field_name, document_fetcher=fetcher, @@ -334,12 +348,18 @@ def get_redis_products( logger.info("Processed %d updates from Redis", processed) -def get_redis_updates(es_client: Elasticsearch, index: str, config: Config): +def get_redis_updates( + es_client: Elasticsearch, index: str, config: IndexConfig +) -> None: """Fetch updates from Redis and index them. :param index: the index to write to - :param config: the configuration to use + :param config: the index configuration to use """ + if config.redis_stream_name is None: + logger.info(f"Redis updates are disabled for index {index}") + return + processor = DocumentProcessor(config) fetcher = load_document_fetcher(config) # Ensure all documents are searchable after the import @@ -362,7 +382,12 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config): for success, info in parallel_bulk( es_client, get_redis_products( - processor, fetcher, index, id_field_name, last_updated_timestamp_ms + config.redis_stream_name, + processor, + fetcher, + index, + id_field_name, + last_updated_timestamp_ms, ), ): if not success: @@ -372,7 +397,7 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config): def run_full_import( file_path: Path, num_processes: int, - config: Config, + config: IndexConfig, num_items: int | None = None, ): """Run a full data import from a JSONL. @@ -386,7 +411,7 @@ def run_full_import( :param file_path: the path of the JSONL file to import :param num_processes: the number of processes to use to perform parallel import - :param config: the configuration to use + :param config: the index configuration to use :param num_items: the number of items to import, defaults to None (all) """ es_client = connection.get_es_client() @@ -421,10 +446,10 @@ def run_full_import( update_alias(es_client, next_index, config.index.name) -def perform_taxonomy_import(config: Config) -> None: +def perform_taxonomy_import(config: IndexConfig) -> None: """Create a new index for taxonomies and import them. 
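+
+    The taxonomies are imported into a temporary index, which is then
+    exposed through the taxonomy index alias.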
-    :param config: the configuration to use
+    :param config: the index configuration to use
     """
     es_client = connection.get_es_client()
     # we create a temporary index to import to
@@ -461,20 +486,35 @@
         logger.error("Could not connect to Redis")
         return
 
-    document_fetcher = load_document_fetcher(config)
-    processor = DocumentProcessor(config)
+    if len(config.indices) != 1:
+        raise ValueError("Only one index is currently supported by the update daemon")
+
+    processors: dict[str, DocumentProcessor] = {}
+    document_fetchers: dict[str, BaseDocumentFetcher] = {}
+    id_field_names: dict[str, str] = {}
+    stream_name_to_index_id: dict[str, str] = {}
+
+    for index_id, index_config in config.indices.items():
+        stream_name = index_config.redis_stream_name
+        if stream_name is not None:
+            processors[stream_name] = DocumentProcessor(index_config)
+            document_fetchers[stream_name] = load_document_fetcher(index_config)
+            id_field_names[stream_name] = index_config.index.id_field_name
+            stream_name_to_index_id[stream_name] = index_id
 
-    for _, document in get_new_updates(
+    for stream_name, _, document in get_new_updates(
         redis_client,
-        config.index.id_field_name,
-        document_fetcher,
+        list(id_field_names.keys()),
+        id_field_names=id_field_names,
+        document_fetchers=document_fetchers,
     ):
-        document = processor.from_dict(document)
-        if not document:
+        processed_document = processors[stream_name].from_dict(document)
+        if not processed_document:
             continue
-        _id = document.pop("_id")
+        _id = processed_document.pop("_id")
+        index_id = stream_name_to_index_id[stream_name]
         es_client.index(
-            index=config.index.name,
-            body=document,
+            index=config.indices[index_id].index.name,
+            body=processed_document,
             id=_id,
         )
diff --git a/app/api.py b/app/api.py
index cce96d46..33a57c2d 100644
--- a/app/api.py
+++ b/app/api.py
@@ -32,12 +32,18 @@
     # failure, but we add a warning message as it's not expected in a
     # production setting
     logger.warning("Main configuration is not set, set the CONFIG_PATH envvar")
-    FILTER_QUERY_BUILDER = None
-    RESULT_PROCESSOR = None
+    FILTER_QUERY_BUILDERS = {}
+    RESULT_PROCESSORS = {}
 else:
     # we cache query builders and result processors here for faster processing
-    FILTER_QUERY_BUILDER = build_elasticsearch_query_builder(config.CONFIG)
-    RESULT_PROCESSOR = load_result_processor(config.CONFIG)
+    FILTER_QUERY_BUILDERS = {
+        index_id: build_elasticsearch_query_builder(index_config)
+        for index_id, index_config in config.CONFIG.indices.items()
+    }
+    RESULT_PROCESSORS = {
+        index_id: load_result_processor(index_config)
+        for index_id, index_config in config.CONFIG.indices.items()
+    }
 
 
 app = FastAPI(
@@ -57,14 +63,46 @@
     connection.get_es_client()
 
 
+INDEX_ID_QUERY_PARAM = Query(
+    description="""Index ID to use for the search. If not provided, the default index is used.
+    If there is only one index, this parameter is not needed."""
+)
+
+
+def check_index_id_is_defined(index_id: str | None, config: config.Config) -> None:
+    """Check that the index ID is defined in the configuration.
+
+    Raise an HTTPException if it's not the case.
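+    A 400 error is raised if several indices are configured but no index
+    ID was given; a 404 error is raised if the given ID does not match
+    any configured index.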
+
+    :param index_id: index ID to check
+    :param config: configuration to check against
+    """
+    if index_id is None:
+        if len(config.indices) > 1:
+            raise HTTPException(
+                status_code=400,
+                detail=f"Index ID must be provided when there is more than one index; available indices: {list(config.indices.keys())}",
+            )
+    elif index_id not in config.indices:
+        raise HTTPException(
+            status_code=404,
+            detail=f"Index ID '{index_id}' not found; available indices: {list(config.indices.keys())}",
+        )
+
+
 @app.get("/document/{identifier}")
-def get_document(identifier: str):
+def get_document(
+    identifier: str, index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None
+):
     """Fetch a document from Elasticsearch with a specific ID."""
     check_config_is_defined()
     global_config = cast(config.Config, config.CONFIG)
-    id_field_name = global_config.index.id_field_name
+    check_index_id_is_defined(index_id, global_config)
+    index_id, index_config = global_config.get_index_config(index_id)
+
+    id_field_name = index_config.index.id_field_name
     results = (
-        Search(index=global_config.index.name)
+        Search(index=index_config.index.name)
         .query("term", **{id_field_name: identifier})
         .extra(size=1)
         .execute()
@@ -125,10 +163,16 @@
             and be sortable. If it is not provided, results are sorted by descending relevance score."""
         ),
     ] = None,
+    index_id: Annotated[
+        str | None,
+        INDEX_ID_QUERY_PARAM,
+    ] = None,
 ) -> SearchResponse:
     check_config_is_defined()
     global_config = cast(config.Config, config.CONFIG)
-    result_processor = cast(BaseResultProcessor, RESULT_PROCESSOR)
+    check_index_id_is_defined(index_id, global_config)
+    index_id, index_config = global_config.get_index_config(index_id)
+    result_processor = cast(BaseResultProcessor, RESULT_PROCESSORS[index_id])
     if q is None and sort_by is None:
         raise HTTPException(
             status_code=400, detail="`sort_by` must be provided when `q` is missing"
         )
@@ -157,12 +201,11 @@
         langs=langs_set,
         size=page_size,
         page=page,
-        config=global_config,
+        config=index_config,
         sort_by=sort_by,
         # filter query builder is generated from elasticsearch mapping and
-        # takes ~40ms to generate, build-it before hand as we're using global
-        # Config
-        filter_query_builder=FILTER_QUERY_BUILDER,
+        # takes ~40ms to generate, build it beforehand to avoid this delay
+        filter_query_builder=FILTER_QUERY_BUILDERS[index_id],
     )
 
     logger.debug("Elasticsearch query: %s", query.to_dict())
@@ -196,16 +239,19 @@
         int | None,
         Query(description="Fuzziness level to use, defaults to no fuzziness."),
     ] = None,
+    index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None,
 ):
     check_config_is_defined()
     global_config = cast(config.Config, config.CONFIG)
+    check_index_id_is_defined(index_id, global_config)
+    index_id, index_config = global_config.get_index_config(index_id)
     taxonomy_names_list = taxonomy_names.split(",")
     query = build_completion_query(
         q=q,
         taxonomy_names=taxonomy_names_list,
         lang=lang,
         size=size,
-        config=global_config,
+        config=index_config,
         fuzziness=fuzziness,
     )
     try:
@@ -234,11 +280,19 @@ def html_search(
     page_size: int = 24,
     langs: str = "fr,en",
     sort_by: str | None = None,
+    index_id: Annotated[str | None, INDEX_ID_QUERY_PARAM] = None,
 ):
     if not q:
         return templates.TemplateResponse("search.html", {"request": request})
 
-    results = search(q=q, langs=langs, page_size=page_size, page=page, sort_by=sort_by)
+    results = search(
+        q=q,
+        langs=langs,
+        page_size=page_size,
+        page=page,
+        sort_by=sort_by,
+        index_id=index_id,
+    )
     template_data: dict[str, Any] = {
         "q": q or "",
q or "", "request": request, diff --git a/app/cli/main.py b/app/cli/main.py index 1d23985c..7d1c8bd3 100644 --- a/app/cli/main.py +++ b/app/cli/main.py @@ -28,6 +28,12 @@ def import_data( file_okay=True, exists=True, ), + index_id: Optional[str] = typer.Option( + default=None, + help="Each index has its own configuration in the configuration file, " + "and the ID is used to know which index to use. " + "If there is only one index, this option is not needed.", + ), ): """Import data into Elasticsearch.""" import time @@ -46,10 +52,16 @@ def import_data( start_time = time.perf_counter() check_config_is_defined() global_config = cast(config.Config, config.CONFIG) + index_id, index_config = global_config.get_index_config(index_id) + if index_config is None: + raise typer.BadParameter( + "You must specify an index ID when there are multiple indices" + ) + run_full_import( input_path, num_processes, - global_config, + index_config, num_items=num_items, ) end_time = time.perf_counter() @@ -65,6 +77,12 @@ def import_taxonomies( file_okay=True, exists=True, ), + index_id: Optional[str] = typer.Option( + default=None, + help="Each index has its own configuration in the configuration file, " + "and the ID is used to know which index to use. " + "If there is only one index, this option is not needed.", + ), ): """Import taxonomies into Elasticsearch.""" import time @@ -82,9 +100,14 @@ def import_taxonomies( check_config_is_defined() global_config = cast(config.Config, config.CONFIG) + index_id, index_config = global_config.get_index_config(index_id) + if index_config is None: + raise typer.BadParameter( + "You must specify an index ID when there are multiple indices" + ) start_time = time.perf_counter() - perform_taxonomy_import(global_config) + perform_taxonomy_import(index_config) end_time = time.perf_counter() logger.info("Import time: %s seconds", end_time - start_time) diff --git a/app/config.py b/app/config.py index 22979dd0..87528c6a 100644 --- a/app/config.py +++ b/app/config.py @@ -39,11 +39,6 @@ class Settings(BaseSettings): elasticsearch_url: str = "http://localhost:9200" redis_host: str = "localhost" redis_port: int = 6379 - # the name of the Redis stream to read from when listening to product - # updates - redis_import_stream_name: str = "product_update" - # TODO: this should be in the config below - openfoodfacts_base_url: str = "https://world.openfoodfacts.org" sentry_dns: str | None = None log_level: LoggingLevel = LoggingLevel.INFO taxonomy_cache_dir: Path = Path("data/taxonomies") @@ -215,7 +210,7 @@ def has_lang_subfield(self) -> bool: return self.type in (FieldType.taxonomy, FieldType.text_lang) -class IndexConfig(BaseModel): +class ESIndexConfig(BaseModel): name: Annotated[str, Field(description="name of the index alias to use")] id_field_name: Annotated[ str, Field(description="name of the field to use for `_id`") @@ -240,7 +235,7 @@ class TaxonomyIndexConfig(BaseModel): name: Annotated[ str, Field(description="name of the taxonomy index alias to use"), - ] = "taxonomy" + ] number_of_shards: Annotated[ int, Field(description="number of shards to use for the index") ] = 4 @@ -272,8 +267,10 @@ class TaxonomyConfig(BaseModel): ] -class Config(BaseModel): - index: Annotated[IndexConfig, Field(description="configuration of the index")] +class IndexConfig(BaseModel): + index: Annotated[ + ESIndexConfig, Field(description="configuration of the Elasticsearch index") + ] fields: Annotated[ dict[str, FieldConfig], Field( @@ -337,6 +334,14 @@ class Config(BaseModel): set[str], 
Field(description="list of documents IDs to ignore") ] = Field(default_factory=set) + redis_stream_name: Annotated[ + str | None, + Field( + description="name of the Redis stream to read from when listening to document updates. " + "If not provided, document updates won't be listened to for this index." + ), + ] = None + @model_validator(mode="after") def taxonomy_name_should_be_defined(self): """Validator that checks that for if `taxonomy_type` is defined for a @@ -354,7 +359,7 @@ def taxonomy_name_should_be_defined(self): @model_validator(mode="after") def field_references_must_exist_and_be_valid(self): - """Validator that checks that every field reference in IndexConfig + """Validator that checks that every field reference in ESIndexConfig refers to an existing field and is valid.""" if self.index.id_field_name not in self.fields: raise ValueError( @@ -402,6 +407,51 @@ def get_taxonomy_langs(self) -> set[str]: # langs will be stored in a unique `other` subfield return (set(self.taxonomy.exported_langs)) & set(ANALYZER_LANG_MAPPING) + +class Config(BaseModel): + indices: dict[str, IndexConfig] = Field( + description="configuration of indices. " + "The key is the ID of the index that can be referenced at query time. " + "One index corresponds to a specific set of documents and can be queried independently." + ) + default_index: Annotated[ + str, + Field( + description="the default index to use when no index is specified in the query", + ), + ] + + @model_validator(mode="after") + def defaut_index_must_exist(self): + """Validator that checks that default_index exists.""" + if self.default_index not in self.indices: + raise ValueError( + f"default_index={self.default_index} but index was not declared (available indices: {list(self.indices.keys())})" + ) + return self + + @model_validator(mode="after") + def redis_stream_name_should_be_unique(self): + """Validator that checks that every redis_stream_name is unique.""" + redis_stream_names = [ + index.redis_stream_name + for index in self.indices.values() + if index.redis_stream_name is not None + ] + if len(redis_stream_names) != len(set(redis_stream_names)): + raise ValueError("redis_stream_name should be unique") + return self + + def get_index_config(self, index_id: str | None) -> tuple[str, IndexConfig]: + """Return a (index_id, IndexConfig) for the given index_id. + + If no index_id is provided, the default index is used. + If the index_id is not found, (index_id, None) is returned. + """ + if index_id is None: + index_id = self.default_index + return index_id, self.indices[index_id] + @classmethod def from_yaml(cls, path: Path) -> "Config": """Create a Config from a yaml configuration file.""" diff --git a/app/indexing.py b/app/indexing.py index 0860e722..3eec8c76 100644 --- a/app/indexing.py +++ b/app/indexing.py @@ -12,6 +12,7 @@ Config, FieldConfig, FieldType, + IndexConfig, TaxonomyConfig, TaxonomySourceConfig, ) @@ -263,7 +264,7 @@ class DocumentProcessor: into a dict that is ready to be indexed by Elasticsearch. 
""" - def __init__(self, config: Config) -> None: + def __init__(self, config: IndexConfig) -> None: self.config = config self.supported_langs = config.get_supported_langs() self.taxonomy_langs = config.get_taxonomy_langs() @@ -339,7 +340,7 @@ def from_dict(self, data: JSONType) -> JSONType | None: return inputs -def generate_mapping_object(config: Config) -> Mapping: +def generate_mapping_object(config: IndexConfig) -> Mapping: mapping = Mapping() supported_langs = config.supported_langs taxonomy_langs = config.taxonomy.exported_langs @@ -356,7 +357,7 @@ def generate_mapping_object(config: Config) -> Mapping: return mapping -def generate_index_object(index_name: str, config: Config) -> Index: +def generate_index_object(index_name: str, config: IndexConfig) -> Index: index = Index(index_name) index.settings( number_of_shards=config.index.number_of_shards, @@ -367,7 +368,7 @@ def generate_index_object(index_name: str, config: Config) -> Index: return index -def generate_taxonomy_mapping_object(config: Config) -> Mapping: +def generate_taxonomy_mapping_object(config: IndexConfig) -> Mapping: mapping = Mapping() supported_langs = config.supported_langs mapping.field("id", dsl_field.Keyword(required=True)) @@ -395,7 +396,7 @@ def generate_taxonomy_mapping_object(config: Config) -> Mapping: return mapping -def generate_taxonomy_index_object(index_name: str, config: Config) -> Index: +def generate_taxonomy_index_object(index_name: str, config: IndexConfig) -> Index: index = Index(index_name) taxonomy_index_config = config.taxonomy.index index.settings( diff --git a/app/postprocessing.py b/app/postprocessing.py index 38da8280..890f1fdb 100644 --- a/app/postprocessing.py +++ b/app/postprocessing.py @@ -1,12 +1,12 @@ from elasticsearch_dsl.response import Response from app._types import JSONType -from app.config import Config, FieldType +from app.config import FieldType, IndexConfig from app.utils import load_class_object_from_string class BaseResultProcessor: - def __init__(self, config: Config) -> None: + def __init__(self, config: IndexConfig) -> None: self.config = config def process(self, response: Response, projection: set[str] | None) -> JSONType: @@ -46,10 +46,10 @@ def process_after(self, result: JSONType) -> JSONType: return result -def load_result_processor(config: Config) -> BaseResultProcessor | None: +def load_result_processor(config: IndexConfig) -> BaseResultProcessor | None: """Load the result processor class from the config. 
- :param config: the config object + :param config: the index configuration to use :return: the initialized result processor """ result_processor_cls = ( diff --git a/app/query.py b/app/query.py index 59383f45..483e2192 100644 --- a/app/query.py +++ b/app/query.py @@ -17,7 +17,7 @@ SearchResponseError, SuccessSearchResponse, ) -from app.config import Config, FieldType +from app.config import FieldType, IndexConfig from app.indexing import generate_index_object from app.postprocessing import BaseResultProcessor from app.utils import get_logger @@ -25,7 +25,7 @@ logger = get_logger(__name__) -def build_elasticsearch_query_builder(config: Config) -> ElasticsearchQueryBuilder: +def build_elasticsearch_query_builder(config: IndexConfig) -> ElasticsearchQueryBuilder: index = generate_index_object(config.index.name, config) options = SchemaAnalyzer(index.to_dict()).query_builder_options() options["default_operator"] = ElasticsearchQueryBuilder.MUST @@ -50,7 +50,7 @@ def __call__(self, tree): return self.visit(tree) -def build_query_clause(query: str, langs: set[str], config: Config) -> Query: +def build_query_clause(query: str, langs: set[str], config: IndexConfig) -> Query: fields = [] supported_langs = config.get_supported_langs() taxonomy_langs = config.get_taxonomy_langs() @@ -166,12 +166,12 @@ def parse_lucene_dsl_query( return filter_query, remaining_terms -def parse_sort_by_parameter(sort_by: str | None, config: Config) -> str | None: +def parse_sort_by_parameter(sort_by: str | None, config: IndexConfig) -> str | None: """Parse `sort_by` parameter, special handling is performed for `text_lang` subfield. :param sort_by: the raw `sort_by` value - :param config: the Config to use + :param config: the index configuration to use :return: None if `sort_by` is not provided or the final value otherwise """ if sort_by is None: @@ -193,7 +193,7 @@ def parse_sort_by_parameter(sort_by: str | None, config: Config) -> str | None: return sort_by -def create_aggregation_clauses(config: Config) -> dict[str, Agg]: +def create_aggregation_clauses(config: IndexConfig) -> dict[str, Agg]: """Create term bucket aggregation clauses for all relevant fields as defined in the config. """ @@ -209,7 +209,7 @@ def build_search_query( langs: set[str], size: int, page: int, - config: Config, + config: IndexConfig, filter_query_builder: ElasticsearchQueryBuilder, sort_by: str | None = None, ) -> Query: @@ -220,7 +220,7 @@ def build_search_query( select language subfields for some field types :param size: number of results to return :param page: requested page (starts at 1). - :param config: configuration to use + :param config: the index configuration to use :param filter_query_builder: luqum elasticsearch query builder :param sort_by: sorting key, defaults to None (=relevance-based sorting) :return: the built Query @@ -263,7 +263,7 @@ def build_completion_query( taxonomy_names: list[str], lang: str, size: int, - config: Config, + config: IndexConfig, fuzziness: int | None = 2, ): """Build an elasticsearch_dsl completion Query. 
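+
+    It is used by the taxonomy autocomplete endpoint to suggest taxonomy
+    entries matching the user input.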
@@ -272,7 +272,7 @@ def build_completion_query( :param taxonomy_names: a list of taxonomies we want to search in :param lang: the language we want search in :param size: number of results to return - :param config: configuration to use + :param config: the index configuration to use :param fuzziness: fuzziness parameter for completion query :return: the built Query """ diff --git a/config_schema.json b/config_schema.json index c006a0e0..925684dd 100644 --- a/config_schema.json +++ b/config_schema.json @@ -1,5 +1,43 @@ { "$defs": { + "ESIndexConfig": { + "properties": { + "name": { + "description": "name of the index alias to use", + "title": "Name", + "type": "string" + }, + "id_field_name": { + "description": "name of the field to use for `_id`", + "title": "Id Field Name", + "type": "string" + }, + "last_modified_field_name": { + "description": "name of the field containing the date of last modification, used for incremental updates using Redis queues. The field value must be an int/float representing the timestamp.", + "title": "Last Modified Field Name", + "type": "string" + }, + "number_of_shards": { + "default": 4, + "description": "number of shards to use for the index", + "title": "Number Of Shards", + "type": "integer" + }, + "number_of_replicas": { + "default": 1, + "description": "number of replicas to use for the index", + "title": "Number Of Replicas", + "type": "integer" + } + }, + "required": [ + "name", + "id_field_name", + "last_modified_field_name" + ], + "title": "ESIndexConfig", + "type": "object" + }, "FieldConfig": { "properties": { "name": { @@ -103,38 +141,125 @@ }, "IndexConfig": { "properties": { - "name": { - "description": "name of the index alias to use", - "title": "Name", + "index": { + "allOf": [ + { + "$ref": "#/$defs/ESIndexConfig" + } + ], + "description": "configuration of the Elasticsearch index" + }, + "fields": { + "additionalProperties": { + "$ref": "#/$defs/FieldConfig" + }, + "description": "configuration of all fields in the index, keys are field names and values contain the field configuration", + "title": "Fields", + "type": "object" + }, + "split_separator": { + "default": ",", + "description": "separator to use when splitting values, for fields that have split=True", + "title": "Split Separator", "type": "string" }, - "id_field_name": { - "description": "name of the field to use for `_id`", - "title": "Id Field Name", + "lang_separator": { + "default": "_", + "description": "for `text_lang` FieldType, the separator between the name of the field and the language code, ex: product_name_it if lang_separator=\"_\"", + "title": "Lang Separator", "type": "string" }, - "last_modified_field_name": { - "description": "name of the field containing the date of last modification, used for incremental updates using Redis queues. The field value must be an int/float representing the timestamp.", - "title": "Last Modified Field Name", + "taxonomy": { + "allOf": [ + { + "$ref": "#/$defs/TaxonomyConfig" + } + ], + "description": "configuration of the taxonomies used" + }, + "supported_langs": { + "description": "A list of all supported languages, it is used to build index mapping", + "items": { + "type": "string" + }, + "title": "Supported Langs", + "type": "array" + }, + "document_fetcher": { + "description": "The full qualified reference to the document fetcher, i.e. 
the class responsible from fetching the document using the document ID present in the Redis Stream.", + "examples": [ + "app.openfoodfacts.DocumentFetcher" + ], + "title": "Document Fetcher", "type": "string" }, - "number_of_shards": { - "default": 4, - "description": "number of shards to use for the index", - "title": "Number Of Shards", - "type": "integer" + "preprocessor": { + "anyOf": [ + { + "description": "The full qualified reference to the preprocessor to use before data import. This is used to adapt the data schema or to add search-a-licious specific fields for example.", + "examples": [ + "app.openfoodfacts.DocumentPreprocessor" + ], + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Preprocessor" }, - "number_of_replicas": { - "default": 1, - "description": "number of replicas to use for the index", - "title": "Number Of Replicas", - "type": "integer" + "result_processor": { + "anyOf": [ + { + "description": "The full qualified reference to the elasticsearch result processor to use after search query to Elasticsearch. This is used to add custom fields for example.", + "examples": [ + "app.openfoodfacts.ResultProcessor" + ], + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Result Processor" + }, + "match_phrase_boost": { + "default": 2.0, + "description": "How much we boost exact matches on individual fields", + "title": "Match Phrase Boost", + "type": "number" + }, + "document_denylist": { + "description": "list of documents IDs to ignore", + "items": { + "type": "string" + }, + "title": "Document Denylist", + "type": "array", + "uniqueItems": true + }, + "redis_stream_name": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "name of the Redis stream to read from when listening to document updates. If not provided, document updates won't be listened to for this index.", + "title": "Redis Stream Name" } }, "required": [ - "name", - "id_field_name", - "last_modified_field_name" + "index", + "fields", + "taxonomy", + "supported_langs", + "document_fetcher" ], "title": "IndexConfig", "type": "object" @@ -177,7 +302,6 @@ "TaxonomyIndexConfig": { "properties": { "name": { - "default": "taxonomy", "description": "name of the taxonomy index alias to use", "title": "Name", "type": "string" @@ -195,6 +319,9 @@ "type": "integer" } }, + "required": [ + "name" + ], "title": "TaxonomyIndexConfig", "type": "object" }, @@ -223,112 +350,23 @@ } }, "properties": { - "index": { - "allOf": [ - { - "$ref": "#/$defs/IndexConfig" - } - ], - "description": "configuration of the index" - }, - "fields": { + "indices": { "additionalProperties": { - "$ref": "#/$defs/FieldConfig" + "$ref": "#/$defs/IndexConfig" }, - "description": "configuration of all fields in the index, keys are field names and values contain the field configuration", - "title": "Fields", + "description": "configuration of indices. The key is the ID of the index that can be referenced at query time. 
One index corresponds to a specific set of documents and can be queried independently.", + "title": "Indices", "type": "object" }, - "split_separator": { - "default": ",", - "description": "separator to use when splitting values, for fields that have split=True", - "title": "Split Separator", - "type": "string" - }, - "lang_separator": { - "default": "_", - "description": "for `text_lang` FieldType, the separator between the name of the field and the language code, ex: product_name_it if lang_separator=\"_\"", - "title": "Lang Separator", - "type": "string" - }, - "taxonomy": { - "allOf": [ - { - "$ref": "#/$defs/TaxonomyConfig" - } - ], - "description": "configuration of the taxonomies used" - }, - "supported_langs": { - "description": "A list of all supported languages, it is used to build index mapping", - "items": { - "type": "string" - }, - "title": "Supported Langs", - "type": "array" - }, - "document_fetcher": { - "description": "The full qualified reference to the document fetcher, i.e. the class responsible from fetching the document using the document ID present in the Redis Stream.", - "examples": [ - "app.openfoodfacts.DocumentFetcher" - ], - "title": "Document Fetcher", + "default_index": { + "description": "the default index to use when no index is specified in the query", + "title": "Default Index", "type": "string" - }, - "preprocessor": { - "anyOf": [ - { - "description": "The full qualified reference to the preprocessor to use before data import. This is used to adapt the data schema or to add search-a-licious specific fields for example.", - "examples": [ - "app.openfoodfacts.DocumentPreprocessor" - ], - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "title": "Preprocessor" - }, - "result_processor": { - "anyOf": [ - { - "description": "The full qualified reference to the elasticsearch result processor to use after search query to Elasticsearch. 
This is used to add custom fields for example.", - "examples": [ - "app.openfoodfacts.ResultProcessor" - ], - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "title": "Result Processor" - }, - "match_phrase_boost": { - "default": 2.0, - "description": "How much we boost exact matches on individual fields", - "title": "Match Phrase Boost", - "type": "number" - }, - "document_denylist": { - "description": "list of documents IDs to ignore", - "items": { - "type": "string" - }, - "title": "Document Denylist", - "type": "array", - "uniqueItems": true } }, "required": [ - "index", - "fields", - "taxonomy", - "supported_langs", - "document_fetcher" + "indices", + "default_index" ], "title": "JSON schema for search-a-licious configuration file", "type": "object", diff --git a/data/config/openfoodfacts.yml b/data/config/openfoodfacts.yml index 5b1b0204..a7eeccb9 100644 --- a/data/config/openfoodfacts.yml +++ b/data/config/openfoodfacts.yml @@ -1,360 +1,365 @@ -index: - id_field_name: code - last_modified_field_name: last_modified_t - name: openfoodfacts - number_of_replicas: 1 - number_of_shards: 4 -fields: - code: - required: true - type: keyword - obsolete: - required: true - type: bool - product_name: - full_text_search: true - type: text_lang - generic_name: - full_text_search: true - type: text_lang - abbreviated_product_name: - type: text_lang - categories: - full_text_search: true - input_field: categories_tags - taxonomy_name: category - type: taxonomy - labels: - full_text_search: true - input_field: labels_tags - taxonomy_name: label - type: taxonomy - brands: - full_text_search: true - split: true - type: text - brands_tags: - type: keyword - bucket_agg: true - stores: - split: true - type: text - emb_codes: - split: true - type: text - lang: - type: keyword - bucket_agg: true - lc: - type: keyword - owner: - type: keyword - bucket_agg: true - quantity: - type: text - categories_tags: - type: keyword - bucket_agg: true - labels_tags: - type: keyword - bucket_agg: true - countries_tags: - type: keyword - bucket_agg: true - states_tags: - type: keyword - bucket_agg: true - origins_tags: - type: keyword - ingredients_tags: - type: keyword - unique_scans_n: - type: integer - scans_n: - type: integer - nutrition_grades: - type: keyword - bucket_agg: true - ecoscore_grade: - type: keyword - bucket_agg: true - nova_groups: - type: keyword - bucket_agg: true - last_modified_t: - type: date - created_t: - type: date - images: - type: disabled - additives_n: - type: integer - allergens_tags: - type: keyword - ecoscore_data: - type: disabled - ecoscore_score: - type: integer - forest_footprint_data: - type: disabled - ingredients_analysis_tags: - type: keyword - ingredients_n: - type: integer - nova_group: - type: integer - nutrient_levels: - type: disabled - nutriments: - type: object - nutriscore_data: - type: disabled - nutriscore_grade: - type: keyword - traces_tags: - type: keyword - unknown_ingredients_n: - type: integer - popularity_key: - type: long - nutriscore_score: - type: integer - completeness: - type: float -document_denylist: -- '8901552007122' -lang_separator: _ -match_phrase_boost: 2.0 -preprocessor: app.openfoodfacts.DocumentPreprocessor -document_fetcher: app.openfoodfacts.DocumentFetcher -result_processor: app.openfoodfacts.ResultProcessor -split_separator: ',' -taxonomy: - sources: - - name: category - url: https://static.openfoodfacts.org/data/taxonomies/categories.full.json - - name: label - url: 
https://static.openfoodfacts.org/data/taxonomies/labels.full.json - - name: additive - url: https://static.openfoodfacts.org/data/taxonomies/additives.full.json - - name: allergen - url: https://static.openfoodfacts.org/data/taxonomies/allergens.full.json - - name: amino_acid - url: https://static.openfoodfacts.org/data/taxonomies/amino_acids.full.json - - name: country - url: https://static.openfoodfacts.org/data/taxonomies/countries.full.json - - name: data_quality - url: https://static.openfoodfacts.org/data/taxonomies/data_quality.full.json - - name: food_group - url: https://static.openfoodfacts.org/data/taxonomies/food_groups.full.json - - name: improvement - url: https://static.openfoodfacts.org/data/taxonomies/improvements.full.json - - name: ingredient - url: https://static.openfoodfacts.org/data/taxonomies/ingredients.full.json - - name: ingredients_analysis - url: https://static.openfoodfacts.org/data/taxonomies/ingredients_analysis.full.json - - name: ingredients_processing - url: https://static.openfoodfacts.org/data/taxonomies/ingredients_processing.full.json - - name: language - url: https://static.openfoodfacts.org/data/taxonomies/languages.full.json - - name: mineral - url: https://static.openfoodfacts.org/data/taxonomies/minerals.full.json - - name: misc - url: https://static.openfoodfacts.org/data/taxonomies/misc.full.json - - name: nova_group - url: https://static.openfoodfacts.org/data/taxonomies/nova_groups.full.json - - name: nucleotide - url: https://static.openfoodfacts.org/data/taxonomies/nucleotides.full.json - - name: nutrient - url: https://static.openfoodfacts.org/data/taxonomies/nutrients.full.json - - name: origin - url: https://static.openfoodfacts.org/data/taxonomies/origins.full.json - - name: other_nutritional_substance - url: https://static.openfoodfacts.org/data/taxonomies/other_nutritional_substances.full.json - - name: packaging_material - url: https://static.openfoodfacts.org/data/taxonomies/packaging_materials.full.json - - name: packaging_recycling - url: https://static.openfoodfacts.org/data/taxonomies/packaging_recycling.full.json - - name: packaging_shape - url: https://static.openfoodfacts.org/data/taxonomies/packaging_shapes.full.json - - name: periods_after_opening - url: https://static.openfoodfacts.org/data/taxonomies/periods_after_opening.full.json - - name: preservation - url: https://static.openfoodfacts.org/data/taxonomies/preservation.full.json - - name: state - url: https://static.openfoodfacts.org/data/taxonomies/states.full.json - - name: vitamin - url: https://static.openfoodfacts.org/data/taxonomies/vitamins.full.json - - name: brand - url: https://static.openfoodfacts.org/data/taxonomies/brands.full.json - exported_langs: - - en - - fr - - es - - de - - it - - nl - index: - number_of_replicas: 1 - number_of_shards: 4 -supported_langs: -- aa -- ab -- ae -- af -- ak -- am -- ar -- as -- at -- au -- ay -- az -- be -- bg -- bi -- bn -- br -- bs -- ca -- ch -- co -- cs -- cu -- cy -- da -- de -- dv -- dz -- el -- en -- eo -- es -- et -- eu -- fa -- fi -- fj -- fo -- fr -- fy -- ga -- gb -- gd -- gl -- gn -- gp -- gu -- gv -- ha -- he -- hi -- hk -- ho -- hr -- ht -- hu -- hy -- hz -- id -- in -- io -- is -- it -- iw -- ja -- jp -- jv -- ka -- kk -- kl -- km -- kn -- ko -- ku -- ky -- la -- lb -- lc -- ln -- lo -- lt -- lu -- lv -- mg -- mh -- mi -- mk -- ml -- mn -- mo -- mr -- ms -- mt -- my -- na -- nb -- nd -- ne -- nl -- nn -- 'no' -- nr -- ny -- oc -- om -- pa -- pl -- ps -- pt -- qq -- qu -- re -- rm -- rn -- ro -- rs -- ru -- 
rw -- sd -- se -- sg -- sh -- si -- sk -- sl -- sm -- sn -- so -- sq -- sr -- ss -- st -- sv -- sw -- ta -- te -- tg -- th -- ti -- tk -- tl -- tn -- to -- tr -- ts -- ug -- uk -- ur -- us -- uz -- ve -- vi -- wa -- wo -- xh -- xx -- yi -- yo -- zh -- zu +indices: + "off": + index: + id_field_name: code + last_modified_field_name: last_modified_t + name: openfoodfacts + number_of_replicas: 1 + number_of_shards: 4 + fields: + code: + required: true + type: keyword + obsolete: + required: true + type: bool + product_name: + full_text_search: true + type: text_lang + generic_name: + full_text_search: true + type: text_lang + abbreviated_product_name: + type: text_lang + categories: + full_text_search: true + input_field: categories_tags + taxonomy_name: category + type: taxonomy + labels: + full_text_search: true + input_field: labels_tags + taxonomy_name: label + type: taxonomy + brands: + full_text_search: true + split: true + type: text + brands_tags: + type: keyword + bucket_agg: true + stores: + split: true + type: text + emb_codes: + split: true + type: text + lang: + type: keyword + bucket_agg: true + lc: + type: keyword + owner: + type: keyword + bucket_agg: true + quantity: + type: text + categories_tags: + type: keyword + bucket_agg: true + labels_tags: + type: keyword + bucket_agg: true + countries_tags: + type: keyword + bucket_agg: true + states_tags: + type: keyword + bucket_agg: true + origins_tags: + type: keyword + ingredients_tags: + type: keyword + unique_scans_n: + type: integer + scans_n: + type: integer + nutrition_grades: + type: keyword + bucket_agg: true + ecoscore_grade: + type: keyword + bucket_agg: true + nova_groups: + type: keyword + bucket_agg: true + last_modified_t: + type: date + created_t: + type: date + images: + type: disabled + additives_n: + type: integer + allergens_tags: + type: keyword + ecoscore_data: + type: disabled + ecoscore_score: + type: integer + forest_footprint_data: + type: disabled + ingredients_analysis_tags: + type: keyword + ingredients_n: + type: integer + nova_group: + type: integer + nutrient_levels: + type: disabled + nutriments: + type: object + nutriscore_data: + type: disabled + nutriscore_grade: + type: keyword + traces_tags: + type: keyword + unknown_ingredients_n: + type: integer + popularity_key: + type: long + nutriscore_score: + type: integer + completeness: + type: float + document_denylist: + - '8901552007122' + lang_separator: _ + match_phrase_boost: 2.0 + preprocessor: app.openfoodfacts.DocumentPreprocessor + document_fetcher: app.openfoodfacts.DocumentFetcher + result_processor: app.openfoodfacts.ResultProcessor + split_separator: ',' + redis_stream_name: product_updates_off + taxonomy: + sources: + - name: category + url: https://static.openfoodfacts.org/data/taxonomies/categories.full.json + - name: label + url: https://static.openfoodfacts.org/data/taxonomies/labels.full.json + - name: additive + url: https://static.openfoodfacts.org/data/taxonomies/additives.full.json + - name: allergen + url: https://static.openfoodfacts.org/data/taxonomies/allergens.full.json + - name: amino_acid + url: https://static.openfoodfacts.org/data/taxonomies/amino_acids.full.json + - name: country + url: https://static.openfoodfacts.org/data/taxonomies/countries.full.json + - name: data_quality + url: https://static.openfoodfacts.org/data/taxonomies/data_quality.full.json + - name: food_group + url: https://static.openfoodfacts.org/data/taxonomies/food_groups.full.json + - name: improvement + url: 
https://static.openfoodfacts.org/data/taxonomies/improvements.full.json + - name: ingredient + url: https://static.openfoodfacts.org/data/taxonomies/ingredients.full.json + - name: ingredients_analysis + url: https://static.openfoodfacts.org/data/taxonomies/ingredients_analysis.full.json + - name: ingredients_processing + url: https://static.openfoodfacts.org/data/taxonomies/ingredients_processing.full.json + - name: language + url: https://static.openfoodfacts.org/data/taxonomies/languages.full.json + - name: mineral + url: https://static.openfoodfacts.org/data/taxonomies/minerals.full.json + - name: misc + url: https://static.openfoodfacts.org/data/taxonomies/misc.full.json + - name: nova_group + url: https://static.openfoodfacts.org/data/taxonomies/nova_groups.full.json + - name: nucleotide + url: https://static.openfoodfacts.org/data/taxonomies/nucleotides.full.json + - name: nutrient + url: https://static.openfoodfacts.org/data/taxonomies/nutrients.full.json + - name: origin + url: https://static.openfoodfacts.org/data/taxonomies/origins.full.json + - name: other_nutritional_substance + url: https://static.openfoodfacts.org/data/taxonomies/other_nutritional_substances.full.json + - name: packaging_material + url: https://static.openfoodfacts.org/data/taxonomies/packaging_materials.full.json + - name: packaging_recycling + url: https://static.openfoodfacts.org/data/taxonomies/packaging_recycling.full.json + - name: packaging_shape + url: https://static.openfoodfacts.org/data/taxonomies/packaging_shapes.full.json + - name: periods_after_opening + url: https://static.openfoodfacts.org/data/taxonomies/periods_after_opening.full.json + - name: preservation + url: https://static.openfoodfacts.org/data/taxonomies/preservation.full.json + - name: state + url: https://static.openfoodfacts.org/data/taxonomies/states.full.json + - name: vitamin + url: https://static.openfoodfacts.org/data/taxonomies/vitamins.full.json + - name: brand + url: https://static.openfoodfacts.org/data/taxonomies/brands.full.json + exported_langs: + - en + - fr + - es + - de + - it + - nl + index: + number_of_replicas: 1 + number_of_shards: 4 + name: off_taxonomy + supported_langs: + - aa + - ab + - ae + - af + - ak + - am + - ar + - as + - at + - au + - ay + - az + - be + - bg + - bi + - bn + - br + - bs + - ca + - ch + - co + - cs + - cu + - cy + - da + - de + - dv + - dz + - el + - en + - eo + - es + - et + - eu + - fa + - fi + - fj + - fo + - fr + - fy + - ga + - gb + - gd + - gl + - gn + - gp + - gu + - gv + - ha + - he + - hi + - hk + - ho + - hr + - ht + - hu + - hy + - hz + - id + - in + - io + - is + - it + - iw + - ja + - jp + - jv + - ka + - kk + - kl + - km + - kn + - ko + - ku + - ky + - la + - lb + - lc + - ln + - lo + - lt + - lu + - lv + - mg + - mh + - mi + - mk + - ml + - mn + - mo + - mr + - ms + - mt + - my + - na + - nb + - nd + - ne + - nl + - nn + - 'no' + - nr + - ny + - oc + - om + - pa + - pl + - ps + - pt + - qq + - qu + - re + - rm + - rn + - ro + - rs + - ru + - rw + - sd + - se + - sg + - sh + - si + - sk + - sl + - sm + - sn + - so + - sq + - sr + - ss + - st + - sv + - sw + - ta + - te + - tg + - th + - ti + - tk + - tl + - tn + - to + - tr + - ts + - ug + - uk + - ur + - us + - uz + - ve + - vi + - wa + - wo + - xh + - xx + - yi + - yo + - zh + - zu +default_index: "off" \ No newline at end of file diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 48355d33..1b4ddf89 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -11,7 +11,14 @@ @pytest.fixture def 
default_config(): - """Fixture that returns default Open Food Facts configuration for tests.""" + """Fixture that returns default Open Food Facts index configuration for + tests.""" + yield Config.from_yaml(DEFAULT_CONFIG_PATH).indices["off"] + + +@pytest.fixture +def default_global_config(): + """Fixture that returns default global configuration for tests.""" yield Config.from_yaml(DEFAULT_CONFIG_PATH) diff --git a/tests/unit/data/openfoodfacts_config.yml b/tests/unit/data/openfoodfacts_config.yml index 4fbf11f3..a7eeccb9 100644 --- a/tests/unit/data/openfoodfacts_config.yml +++ b/tests/unit/data/openfoodfacts_config.yml @@ -1,360 +1,365 @@ -index: - id_field_name: code - last_modified_field_name: last_modified_t - name: openfoodfacts - number_of_replicas: 1 - number_of_shards: 4 -fields: - code: - required: true - type: keyword - obsolete: - required: true - type: bool - product_name: - full_text_search: true - type: text_lang - generic_name: - full_text_search: true - type: text_lang - abbreviated_product_name: - type: text_lang - categories: - full_text_search: true - input_field: categories_tags - taxonomy_name: category - type: taxonomy - labels: - full_text_search: true - input_field: labels_tags - taxonomy_name: label - type: taxonomy - brands: - full_text_search: true - split: true - type: text - brands_tags: - type: keyword - bucket_agg: true - stores: - split: true - type: text - emb_codes: - split: true - type: text - lang: - type: keyword - bucket_agg: true - lc: - type: keyword - owner: - type: keyword - bucket_agg: true - quantity: - type: text - categories_tags: - type: keyword - bucket_agg: true - labels_tags: - type: keyword - bucket_agg: true - countries_tags: - type: keyword - bucket_agg: true - states_tags: - type: keyword - bucket_agg: true - origins_tags: - type: keyword - ingredients_tags: - type: keyword - unique_scans_n: - type: integer - scans_n: - type: integer - nutrition_grades: - type: keyword - bucket_agg: true - ecoscore_grade: - type: keyword - bucket_agg: true - nova_groups: - type: keyword - bucket_agg: true - last_modified_t: - type: date - created_t: - type: date - images: - type: disabled - additives_n: - type: integer - allergens_tags: - type: keyword - ecoscore_data: - type: disabled - ecoscore_score: - type: integer - forest_footprint_data: - type: disabled - ingredients_analysis_tags: - type: keyword - ingredients_n: - type: integer - nova_group: - type: integer - nutrient_levels: - type: disabled - nutriments: - type: object - nutriscore_data: - type: disabled - nutriscore_grade: - type: keyword - traces_tags: - type: keyword - unknown_ingredients_n: - type: integer - popularity_key: - type: integer - nutriscore_score: - type: integer - completeness: - type: float -document_denylist: -- '8901552007122' -lang_separator: _ -match_phrase_boost: 2.0 -preprocessor: app.openfoodfacts.DocumentPreprocessor -document_fetcher: app.openfoodfacts.DocumentFetcher -result_processor: app.openfoodfacts.ResultProcessor -split_separator: ',' -taxonomy: - sources: - - name: category - url: https://static.openfoodfacts.org/data/taxonomies/categories.full.json - - name: label - url: https://static.openfoodfacts.org/data/taxonomies/labels.full.json - - name: additive - url: https://static.openfoodfacts.org/data/taxonomies/additives.full.json - - name: allergen - url: https://static.openfoodfacts.org/data/taxonomies/allergens.full.json - - name: amino_acid - url: https://static.openfoodfacts.org/data/taxonomies/amino_acids.full.json - - name: country - url: 
https://static.openfoodfacts.org/data/taxonomies/countries.full.json - - name: data_quality - url: https://static.openfoodfacts.org/data/taxonomies/data_quality.full.json - - name: food_group - url: https://static.openfoodfacts.org/data/taxonomies/food_groups.full.json - - name: improvement - url: https://static.openfoodfacts.org/data/taxonomies/improvements.full.json - - name: ingredient - url: https://static.openfoodfacts.org/data/taxonomies/ingredients.full.json - - name: ingredients_analysis - url: https://static.openfoodfacts.org/data/taxonomies/ingredients_analysis.full.json - - name: ingredients_processing - url: https://static.openfoodfacts.org/data/taxonomies/ingredients_processing.full.json - - name: language - url: https://static.openfoodfacts.org/data/taxonomies/languages.full.json - - name: mineral - url: https://static.openfoodfacts.org/data/taxonomies/minerals.full.json - - name: misc - url: https://static.openfoodfacts.org/data/taxonomies/misc.full.json - - name: nova_group - url: https://static.openfoodfacts.org/data/taxonomies/nova_groups.full.json - - name: nucleotide - url: https://static.openfoodfacts.org/data/taxonomies/nucleotides.full.json - - name: nutrient - url: https://static.openfoodfacts.org/data/taxonomies/nutrients.full.json - - name: origin - url: https://static.openfoodfacts.org/data/taxonomies/origins.full.json - - name: other_nutritional_substance - url: https://static.openfoodfacts.org/data/taxonomies/other_nutritional_substances.full.json - - name: packaging_material - url: https://static.openfoodfacts.org/data/taxonomies/packaging_materials.full.json - - name: packaging_recycling - url: https://static.openfoodfacts.org/data/taxonomies/packaging_recycling.full.json - - name: packaging_shape - url: https://static.openfoodfacts.org/data/taxonomies/packaging_shapes.full.json - - name: periods_after_opening - url: https://static.openfoodfacts.org/data/taxonomies/periods_after_opening.full.json - - name: preservation - url: https://static.openfoodfacts.org/data/taxonomies/preservation.full.json - - name: state - url: https://static.openfoodfacts.org/data/taxonomies/states.full.json - - name: vitamin - url: https://static.openfoodfacts.org/data/taxonomies/vitamins.full.json - - name: brand - url: https://static.openfoodfacts.org/data/taxonomies/brands.full.json - exported_langs: - - en - - fr - - es - - de - - it - - nl - index: - number_of_replicas: 1 - number_of_shards: 4 -supported_langs: -- aa -- ab -- ae -- af -- ak -- am -- ar -- as -- at -- au -- ay -- az -- be -- bg -- bi -- bn -- br -- bs -- ca -- ch -- co -- cs -- cu -- cy -- da -- de -- dv -- dz -- el -- en -- eo -- es -- et -- eu -- fa -- fi -- fj -- fo -- fr -- fy -- ga -- gb -- gd -- gl -- gn -- gp -- gu -- gv -- ha -- he -- hi -- hk -- ho -- hr -- ht -- hu -- hy -- hz -- id -- in -- io -- is -- it -- iw -- ja -- jp -- jv -- ka -- kk -- kl -- km -- kn -- ko -- ku -- ky -- la -- lb -- lc -- ln -- lo -- lt -- lu -- lv -- mg -- mh -- mi -- mk -- ml -- mn -- mo -- mr -- ms -- mt -- my -- na -- nb -- nd -- ne -- nl -- nn -- 'no' -- nr -- ny -- oc -- om -- pa -- pl -- ps -- pt -- qq -- qu -- re -- rm -- rn -- ro -- rs -- ru -- rw -- sd -- se -- sg -- sh -- si -- sk -- sl -- sm -- sn -- so -- sq -- sr -- ss -- st -- sv -- sw -- ta -- te -- tg -- th -- ti -- tk -- tl -- tn -- to -- tr -- ts -- ug -- uk -- ur -- us -- uz -- ve -- vi -- wa -- wo -- xh -- xx -- yi -- yo -- zh -- zu +indices: + "off": + index: + id_field_name: code + last_modified_field_name: last_modified_t + name: openfoodfacts + 
+indices:
+  "off":
+    index:
+      id_field_name: code
+      last_modified_field_name: last_modified_t
+      name: openfoodfacts
+      number_of_replicas: 1
+      number_of_shards: 4
+    fields:
+      code:
+        required: true
+        type: keyword
+      obsolete:
+        required: true
+        type: bool
+      product_name:
+        full_text_search: true
+        type: text_lang
+      generic_name:
+        full_text_search: true
+        type: text_lang
+      abbreviated_product_name:
+        type: text_lang
+      categories:
+        full_text_search: true
+        input_field: categories_tags
+        taxonomy_name: category
+        type: taxonomy
+      labels:
+        full_text_search: true
+        input_field: labels_tags
+        taxonomy_name: label
+        type: taxonomy
+      brands:
+        full_text_search: true
+        split: true
+        type: text
+      brands_tags:
+        type: keyword
+        bucket_agg: true
+      stores:
+        split: true
+        type: text
+      emb_codes:
+        split: true
+        type: text
+      lang:
+        type: keyword
+        bucket_agg: true
+      lc:
+        type: keyword
+      owner:
+        type: keyword
+        bucket_agg: true
+      quantity:
+        type: text
+      categories_tags:
+        type: keyword
+        bucket_agg: true
+      labels_tags:
+        type: keyword
+        bucket_agg: true
+      countries_tags:
+        type: keyword
+        bucket_agg: true
+      states_tags:
+        type: keyword
+        bucket_agg: true
+      origins_tags:
+        type: keyword
+      ingredients_tags:
+        type: keyword
+      unique_scans_n:
+        type: integer
+      scans_n:
+        type: integer
+      nutrition_grades:
+        type: keyword
+        bucket_agg: true
+      ecoscore_grade:
+        type: keyword
+        bucket_agg: true
+      nova_groups:
+        type: keyword
+        bucket_agg: true
+      last_modified_t:
+        type: date
+      created_t:
+        type: date
+      images:
+        type: disabled
+      additives_n:
+        type: integer
+      allergens_tags:
+        type: keyword
+      ecoscore_data:
+        type: disabled
+      ecoscore_score:
+        type: integer
+      forest_footprint_data:
+        type: disabled
+      ingredients_analysis_tags:
+        type: keyword
+      ingredients_n:
+        type: integer
+      nova_group:
+        type: integer
+      nutrient_levels:
+        type: disabled
+      nutriments:
+        type: object
+      nutriscore_data:
+        type: disabled
+      nutriscore_grade:
+        type: keyword
+      traces_tags:
+        type: keyword
+      unknown_ingredients_n:
+        type: integer
+      popularity_key:
+        type: long
+      nutriscore_score:
+        type: integer
+      completeness:
+        type: float
+    document_denylist:
+    - '8901552007122'
+    lang_separator: _
+    match_phrase_boost: 2.0
+    preprocessor: app.openfoodfacts.DocumentPreprocessor
+    document_fetcher: app.openfoodfacts.DocumentFetcher
+    result_processor: app.openfoodfacts.ResultProcessor
+    split_separator: ','
+    redis_stream_name: product_updates_off
+    taxonomy:
+      sources:
+      - name: category
+        url: https://static.openfoodfacts.org/data/taxonomies/categories.full.json
+      - name: label
+        url: https://static.openfoodfacts.org/data/taxonomies/labels.full.json
+      - name: additive
+        url: https://static.openfoodfacts.org/data/taxonomies/additives.full.json
+      - name: allergen
+        url: https://static.openfoodfacts.org/data/taxonomies/allergens.full.json
+      - name: amino_acid
+        url: https://static.openfoodfacts.org/data/taxonomies/amino_acids.full.json
+      - name: country
+        url: https://static.openfoodfacts.org/data/taxonomies/countries.full.json
+      - name: data_quality
+        url: https://static.openfoodfacts.org/data/taxonomies/data_quality.full.json
+      - name: food_group
+        url: https://static.openfoodfacts.org/data/taxonomies/food_groups.full.json
+      - name: improvement
+        url: https://static.openfoodfacts.org/data/taxonomies/improvements.full.json
+      - name: ingredient
+        url: https://static.openfoodfacts.org/data/taxonomies/ingredients.full.json
+      - name: ingredients_analysis
+        url: https://static.openfoodfacts.org/data/taxonomies/ingredients_analysis.full.json
+      - name: ingredients_processing
+        url: https://static.openfoodfacts.org/data/taxonomies/ingredients_processing.full.json
+      - name: language
+        url: https://static.openfoodfacts.org/data/taxonomies/languages.full.json
+      - name: mineral
+        url: https://static.openfoodfacts.org/data/taxonomies/minerals.full.json
+      - name: misc
+        url: https://static.openfoodfacts.org/data/taxonomies/misc.full.json
+      - name: nova_group
+        url: https://static.openfoodfacts.org/data/taxonomies/nova_groups.full.json
+      - name: nucleotide
+        url: https://static.openfoodfacts.org/data/taxonomies/nucleotides.full.json
+      - name: nutrient
+        url: https://static.openfoodfacts.org/data/taxonomies/nutrients.full.json
+      - name: origin
+        url: https://static.openfoodfacts.org/data/taxonomies/origins.full.json
+      - name: other_nutritional_substance
+        url: https://static.openfoodfacts.org/data/taxonomies/other_nutritional_substances.full.json
+      - name: packaging_material
+        url: https://static.openfoodfacts.org/data/taxonomies/packaging_materials.full.json
+      - name: packaging_recycling
+        url: https://static.openfoodfacts.org/data/taxonomies/packaging_recycling.full.json
+      - name: packaging_shape
+        url: https://static.openfoodfacts.org/data/taxonomies/packaging_shapes.full.json
+      - name: periods_after_opening
+        url: https://static.openfoodfacts.org/data/taxonomies/periods_after_opening.full.json
+      - name: preservation
+        url: https://static.openfoodfacts.org/data/taxonomies/preservation.full.json
+      - name: state
+        url: https://static.openfoodfacts.org/data/taxonomies/states.full.json
+      - name: vitamin
+        url: https://static.openfoodfacts.org/data/taxonomies/vitamins.full.json
+      - name: brand
+        url: https://static.openfoodfacts.org/data/taxonomies/brands.full.json
+      exported_langs:
+      - en
+      - fr
+      - es
+      - de
+      - it
+      - nl
+      index:
+        number_of_replicas: 1
+        number_of_shards: 4
+        name: off_taxonomy
+    supported_langs:
+    - aa
+    - ab
+    - ae
+    - af
+    - ak
+    - am
+    - ar
+    - as
+    - at
+    - au
+    - ay
+    - az
+    - be
+    - bg
+    - bi
+    - bn
+    - br
+    - bs
+    - ca
+    - ch
+    - co
+    - cs
+    - cu
+    - cy
+    - da
+    - de
+    - dv
+    - dz
+    - el
+    - en
+    - eo
+    - es
+    - et
+    - eu
+    - fa
+    - fi
+    - fj
+    - fo
+    - fr
+    - fy
+    - ga
+    - gb
+    - gd
+    - gl
+    - gn
+    - gp
+    - gu
+    - gv
+    - ha
+    - he
+    - hi
+    - hk
+    - ho
+    - hr
+    - ht
+    - hu
+    - hy
+    - hz
+    - id
+    - in
+    - io
+    - is
+    - it
+    - iw
+    - ja
+    - jp
+    - jv
+    - ka
+    - kk
+    - kl
+    - km
+    - kn
+    - ko
+    - ku
+    - ky
+    - la
+    - lb
+    - lc
+    - ln
+    - lo
+    - lt
+    - lu
+    - lv
+    - mg
+    - mh
+    - mi
+    - mk
+    - ml
+    - mn
+    - mo
+    - mr
+    - ms
+    - mt
+    - my
+    - na
+    - nb
+    - nd
+    - ne
+    - nl
+    - nn
+    - 'no'
+    - nr
+    - ny
+    - oc
+    - om
+    - pa
+    - pl
+    - ps
+    - pt
+    - qq
+    - qu
+    - re
+    - rm
+    - rn
+    - ro
+    - rs
+    - ru
+    - rw
+    - sd
+    - se
+    - sg
+    - sh
+    - si
+    - sk
+    - sl
+    - sm
+    - sn
+    - so
+    - sq
+    - sr
+    - ss
+    - st
+    - sv
+    - sw
+    - ta
+    - te
+    - tg
+    - th
+    - ti
+    - tk
+    - tl
+    - tn
+    - to
+    - tr
+    - ts
+    - ug
+    - uk
+    - ur
+    - us
+    - uz
+    - ve
+    - vi
+    - wa
+    - wo
+    - xh
+    - xx
+    - yi
+    - yo
+    - zh
+    - zu
+default_index: "off"
\ No newline at end of file
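For context, a minimal sketch of how this reshaped YAML is meant to be consumed, assuming Config.from_yaml parses the file into a Config whose .indices dict holds one IndexConfig per entry; the attribute names below simply mirror the YAML keys and are illustrative, not taken from the change itself:

    from app.config import Config

    config = Config.from_yaml("tests/unit/data/openfoodfacts_config.yml")
    off = config.indices["off"]  # the single "off" IndexConfig
    # Per-index settings now live on the entry itself:
    assert off.redis_stream_name == "product_updates_off"
    assert off.index.name == "openfoodfacts"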
diff --git a/tests/unit/test__import.py b/tests/unit/test__import.py
index ccd05a3d..b171adf6 100644
--- a/tests/unit/test__import.py
+++ b/tests/unit/test__import.py
@@ -5,6 +5,8 @@
 from typing import cast
 from unittest.mock import MagicMock, patch
 
+from redis import Redis
+
 from app._import import (
     BaseDocumentFetcher,
     gen_documents,
@@ -16,7 +18,7 @@
     update_alias,
 )
 from app._types import JSONType
-from app.config import Config
+from app.config import Config, IndexConfig
 from app.indexing import DocumentProcessor
@@ -28,7 +30,7 @@ def __init__(self, xrange_return_values: list):
         self.call_count = 0
 
     def xrange(
         self, name: str, min: str = "-", max: str = "+", count: int | None = None
     ):
-        assert name == "product_update"
+        assert name == "product_updates_off"
         assert max == "+"
         assert count == 100
         if self.call_count >= len(self.xrange_return_values):
@@ -38,20 +40,19 @@
 
 
 class TestDocumentFetcher(BaseDocumentFetcher):
-    def __init__(self, config: Config, *args, **kwargs):
+    def __init__(self, config: IndexConfig, *args, **kwargs):
         self.missing_documents = kwargs.pop("missing_documents", set())
         super().__init__(config, *args, **kwargs)
 
     def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
-        assert stream_name == "product_update"
+        assert stream_name == "product_updates_off"
         id_ = item["code"]
         if id_ in self.missing_documents:
             return None
         return {"code": id_, "name": f"Document {id_}"}
 
 
-def test_get_processed_since(default_config):
-    default_config = cast(Config, default_config)
+def test_get_processed_since(default_config: IndexConfig):
     id_field_name = default_config.index.id_field_name
     return_values = [
         [
@@ -65,7 +66,7 @@
             ("1629878400004-0", {"code": "4"}),
         ]
     ]
-    redis_client = TestRedisXrangeClient(return_values)
+    redis_client = cast(Redis, TestRedisXrangeClient(return_values))
     # Wed Aug 25 08:00:00 2021 UTC
     start_timestamp_ms = 1629878400000  # Example start timestamp
     document_fetcher = TestDocumentFetcher(default_config, missing_documents={"4"})
@@ -74,6 +75,7 @@
     results = list(
         get_processed_since(
             redis_client,
+            cast(str, default_config.redis_stream_name),
             start_timestamp_ms,
             id_field_name,
             document_fetcher,
@@ -88,7 +90,11 @@
 
     results = list(
         get_processed_since(
-            redis_client, start_timestamp_ms, id_field_name, document_fetcher
+            redis_client,
+            cast(str, default_config.redis_stream_name),
+            start_timestamp_ms,
+            id_field_name,
+            document_fetcher,
         )
     )
     # Check that results are empty
@@ -101,7 +107,7 @@
         self.call_count = 0
 
     def xread(self, streams: dict, block: int, count: int | None = None):
-        assert set(streams.keys()) == {"product_update"}
+        assert set(streams.keys()) == {"product_updates_off"}
         assert block == 0
         assert count == 100
         if self.call_count >= len(self.xread_return_values):
@@ -110,44 +116,51 @@
         return self.xread_return_values[self.call_count - 1]
 
 
-def test_get_new_updates(default_config):
+def test_get_new_updates(default_config: IndexConfig):
+    redis_stream_name = cast(str, default_config.redis_stream_name)
     return_values = [
         [
             (
-                "product_update",
+                redis_stream_name,
                 [("1629878400002-0", {"code": "4"})],
             )
         ],
         [
             (
-                "product_update",
+                redis_stream_name,
                 [("1629878400000-0", {"code": "1"})],
             )
         ],
         [
             (
-                "product_update",
+                redis_stream_name,
                 [("1629878400001-0", {"code": "2"})],
             )
         ],
        [
             (
-                "product_update",
+                redis_stream_name,
                 [("1629878400003-0", {"code": "3"})],
             )
         ],
     ]
-    default_config = cast(Config, default_config)
-    redis_client = TestRedisXreadClient(return_values)
+    redis_client = cast(Redis, TestRedisXreadClient(return_values))
    document_fetcher = TestDocumentFetcher(default_config, missing_documents={"4"})
 
     # Call the function and iterate over the results
     updates_iter = get_new_updates(
-        redis_client, default_config.index.id_field_name, document_fetcher
+        redis_client,
+        [redis_stream_name],
+        {redis_stream_name: default_config.index.id_field_name},
+        {redis_stream_name: document_fetcher},
     )
     results = next(updates_iter)
-    assert results == (1629878400000, {"code": "1", "name": "Document 1"})
+    assert results == (
+        redis_stream_name,
+        1629878400000,
+        {"code": "1", "name": "Document 1"},
+    )
 
 
 def test_load_document_fetcher(default_config):
@@ -262,16 +275,17 @@
     )
 
 
-def test_run_update_daemon(default_config):
+def test_run_update_daemon(default_global_config: Config):
+    off_config: IndexConfig = default_global_config.indices["off"]
     es_client_mock = MagicMock()
     redis_client_mock = MagicMock()
-    document_fetcher_mock = TestDocumentFetcher(default_config)
+    document_fetcher_mock = TestDocumentFetcher(off_config)
 
     # Replace with your desired test data
     updates = [
-        (1629878400000, {"code": "1", "name": "Document 1"}),
-        (1629878400001, {"code": "2", "name": "Document 2"}),
-        (1629878400002, {"code": "3", "name": "Document 3"}),
+        ("product_updates_off", 1629878400000, {"code": "1", "name": "Document 1"}),
+        ("product_updates_off", 1629878400001, {"code": "2", "name": "Document 2"}),
+        ("product_updates_off", 1629878400002, {"code": "3", "name": "Document 3"}),
     ]
 
     # Mock the necessary dependencies
@@ -285,12 +299,12 @@
         "app._import.load_document_fetcher", load_document_fetcher_mock
     ), patch("app._import.get_new_updates", MagicMock(return_value=updates)):
         # Call the function
-        run_update_daemon(default_config)
+        run_update_daemon(default_global_config)
 
         # Assertions
         connection_mock.get_es_client.assert_called_once()
         connection_mock.get_redis_client.assert_called_once()
-        load_document_fetcher_mock.assert_called_once_with(default_config)
+        load_document_fetcher_mock.assert_called_once_with(off_config)
 
         for i, mock_call in enumerate(es_client_mock.index.mock_calls):
             assert mock_call.args == ()
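The fake Redis clients above mirror redis-py's reply shapes: xrange returns a flat list of (id, fields) tuples for a single stream, while xread returns a list of (stream_name, entries) pairs covering every stream passed in, which is what lets get_new_updates yield (stream_name, timestamp, document) triples. A minimal, illustrative consumer of the iterator under those assumptions:

    # updates_iter as built in test_get_new_updates above; stream entry IDs
    # have the form "<milliseconds>-<sequence>", so the timestamp yielded is
    # the part before the dash.
    for stream_name, timestamp_ms, document in updates_iter:
        print(f"{stream_name} @ {timestamp_ms}: {document['code']}")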
diff --git a/tests/unit/test_indexing.py b/tests/unit/test_indexing.py
index 446eeedb..31032f13 100644
--- a/tests/unit/test_indexing.py
+++ b/tests/unit/test_indexing.py
@@ -70,7 +70,7 @@ def test_process_text_lang_field(data, input_field, split, expected):
             )
         ],
         exported_langs=["en"],
-        index=TaxonomyIndexConfig(),
+        index=TaxonomyIndexConfig(name="off_taxonomy"),
     )
diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py
index 4e0f6687..5a471585 100644
--- a/tests/unit/test_query.py
+++ b/tests/unit/test_query.py
@@ -6,7 +6,7 @@
 from luqum.parser import parser
 
 from app._types import JSONType
-from app.config import Config
+from app.config import IndexConfig
 from app.query import (
     UnknownOperationRemover,
     build_search_query,
@@ -158,7 +158,7 @@
     page: int,
     sort_by: str | None,
     update_results: bool,
-    default_config: Config,
+    default_config: IndexConfig,
     default_filter_query_builder: ElasticsearchQueryBuilder,
 ):
     query = build_search_query(