feat: add support for multiple indices #81

Merged · 1 commit · Jan 31, 2024
2 changes: 1 addition & 1 deletion .env
@@ -41,7 +41,7 @@ MEM_LIMIT=4294967296
# on dev connect to the same network as off-server
COMMON_NET_NAME=po_default

# Sentry DNS for bug tracking
# Sentry DNS for bug tracking, used only in staging and production
SENTRY_DNS=

# Log level to use, DEBUG by default in dev
164 changes: 102 additions & 62 deletions app/_import.py
@@ -11,7 +11,7 @@
from redis import Redis

from app._types import JSONType
from app.config import Config, TaxonomyConfig, settings
from app.config import Config, IndexConfig, TaxonomyConfig
from app.indexing import (
DocumentProcessor,
generate_index_object,
@@ -25,7 +25,7 @@


class BaseDocumentFetcher(abc.ABC):
def __init__(self, config: Config) -> None:
def __init__(self, config: IndexConfig) -> None:
self.config = config

@abc.abstractmethod
@@ -42,10 +42,10 @@ def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
pass
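
For context, each index configures its fetcher as a dotted class path (config.document_fetcher), and load_document_fetcher below instantiates it with the IndexConfig. A minimal sketch of a concrete fetcher, not part of this PR, assuming a hypothetical item layout where the document ID lives in a "code" field:

class DummyDocumentFetcher(BaseDocumentFetcher):
    """Illustrative fetcher only; field names are hypothetical."""

    def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
        # A real fetcher would load the full document from a database or API
        # using the ID carried by the Redis stream item.
        code = item.get("code")  # hypothetical ID field
        if code is None:
            return None
        return {"_id": code, "source_stream": stream_name}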


def load_document_fetcher(config: Config) -> BaseDocumentFetcher:
def load_document_fetcher(config: IndexConfig) -> BaseDocumentFetcher:
"""Load the document fetcher class from the config.

:param config: the config object
:param config: the index configuration to use
:return: the initialized document fetcher
"""
fetcher_cls = load_class_object_from_string(config.document_fetcher)
@@ -54,6 +54,7 @@ def load_document_fetcher(config: Config) -> BaseDocumentFetcher:

def get_processed_since(
redis_client: Redis,
redis_stream_name: str,
start_timestamp_ms: int,
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
@@ -63,19 +64,19 @@ def get_processed_since(
timestamp.

:param redis_client: the Redis client
:param redis_stream_name: the name of the Redis stream to read from
:param start_timestamp_ms: the timestamp to start from, in milliseconds
:param id_field_name: the name of the field containing the ID
:param document_fetcher: the document fetcher
:param batch_size: the size of the batch to fetch, defaults to 100
:yield: a tuple containing the timestamp (in milliseconds) and the document
"""
stream_name = settings.redis_import_stream_name
fetched_ids = set()
# We start from the given timestamp
min_id = f"{start_timestamp_ms}-0"

while True:
batch = redis_client.xrange(stream_name, min=min_id, count=batch_size)
batch = redis_client.xrange(redis_stream_name, min=min_id, count=batch_size)
if not batch:
# We reached the end of the stream
break
@@ -93,7 +94,7 @@ def get_processed_since(
logger.debug(f"Skipping ID {id_} because it was already fetched")
continue
fetched_ids.add(id_)
document = document_fetcher.fetch_document(stream_name, item)
document = document_fetcher.fetch_document(redis_stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
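
Redis stream entry IDs embed a millisecond timestamp ("<ms>-<sequence>"), which is what lets get_processed_since seek to f"{start_timestamp_ms}-0" and replay everything recorded since that moment. A standalone sketch of the same XRANGE pattern, with a hypothetical stream name and timestamp:

from redis import Redis

redis_client = Redis(decode_responses=True)
start_timestamp_ms = 1706000000000  # hypothetical starting point
min_id = f"{start_timestamp_ms}-0"  # entry IDs are "<ms>-<sequence>"
batch = redis_client.xrange("product_updates", min=min_id, count=100)
for entry_id, item in batch:
    timestamp_ms = int(entry_id.split("-")[0])  # timestamp is encoded in the ID
    print(timestamp_ms, item)
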
@@ -102,46 +103,58 @@

def get_new_updates(
redis_client: Redis,
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
stream_names: list[str],
id_field_names: dict[str, str],
document_fetchers: dict[str, BaseDocumentFetcher],
batch_size: int = 100,
):
) -> Iterator[tuple[str, int, JSONType]]:
"""Reads new updates from Redis Stream, starting from the moment this
function is called.

The function will block until new updates are available.

:param redis_client: the Redis client
:param id_field_name: the name of the field containing the ID
:param document_fetcher: the document fetcher
:param stream_names: the names of the Redis streams to read from
:param id_field_names: the name of the field containing the ID for each
stream
:param document_fetchers: the document fetcher for each stream
:param batch_size: the size of the batch to fetch, defaults to 100.
:yield: a tuple containing the timestamp (in milliseconds) and the document
:yield: a tuple containing the stream name, the timestamp (in
milliseconds) and the document
"""
stream_name = settings.redis_import_stream_name
# We start from the last ID
min_id = "$"
min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {
stream_name: "$" for stream_name in stream_names
}
while True:
logger.debug(
"Listening to new updates from stream %s (ID: %s)", stream_name, min_id
"Listening to new updates from streams %s (ID: %s)", stream_names, min_ids
)
# We use block=0 to wait indefinitely for new updates
response = redis_client.xread({stream_name: min_id}, block=0, count=batch_size)
response = redis_client.xread(streams=min_ids, block=0, count=batch_size)
response = cast(list[tuple[str, list[tuple[str, dict]]]], response)
# We only have one stream, so we only have one response
# The response is a list of tuples (stream_name, batch)
_, batch = response[0]
# We update the min_id to the last ID of the batch
min_id = batch[-1][0]
for timestamp_id, item in batch:
id_ = item[id_field_name]
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
yield timestamp, document

for stream_name, batch in response:
# We update the min_id to the last ID of the batch
min_id = batch[-1][0]
min_ids[stream_name] = min_id
id_field_name = id_field_names[stream_name]
document_fetcher = document_fetchers[stream_name]
for timestamp_id, item in batch:
id_ = item[id_field_name]
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
logger.debug(
"Stream %s: Skipping ID %s because it was not found",
stream_name,
id_,
)
continue
yield stream_name, timestamp, document
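
The key change here is that XREAD takes a mapping of stream names to last-seen IDs and returns one (stream name, batch) pair per stream that has new entries, so a single daemon can now follow one stream per index. A minimal sketch of that pattern, with hypothetical stream names:

from redis import Redis

redis_client = Redis(decode_responses=True)
# "$" means "only entries added after this call starts"
min_ids = {"product_updates": "$", "brand_updates": "$"}
while True:
    # block=0 waits indefinitely until at least one stream has new entries
    response = redis_client.xread(streams=min_ids, block=0, count=100)
    for stream_name, batch in response:
        min_ids[stream_name] = batch[-1][0]  # resume after the last seen entry
        for entry_id, item in batch:
            print(stream_name, entry_id, item)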


def get_document_dict(
@@ -224,22 +237,20 @@ def update_alias(es_client: Elasticsearch, next_index: str, index_alias: str):
:param index_alias: the alias to update
"""
es_client.indices.update_aliases(
body={
"actions": [
{
"remove": {
"alias": index_alias,
"index": f"{index_alias}-*",
},
},
{"add": {"alias": index_alias, "index": next_index}},
],
},
actions=[
{
"remove": {
"alias": index_alias,
"index": f"{index_alias}-*",
}
},
{"add": {"alias": index_alias, "index": next_index}},
]
)
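
This hunk also moves from passing body= to the actions= keyword of the elasticsearch-py 8 client; the remove/add pair is applied in a single request, so the alias never points at zero or two indices. A hedged usage sketch with hypothetical alias and index names:

from elasticsearch import Elasticsearch

es_client = Elasticsearch("http://localhost:9200")
es_client.indices.update_aliases(
    actions=[
        # detach the alias from whatever dated index currently holds it
        {"remove": {"alias": "food", "index": "food-*"}},
        # and point it at the freshly imported index
        {"add": {"alias": "food", "index": "food-2024-01-31"}},
    ]
)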


def import_parallel(
config: Config,
config: IndexConfig,
file_path: Path,
next_index: str,
num_items: int | None,
@@ -278,12 +289,12 @@ def import_parallel(
logger.error("Encountered errors: %s", errors)


def import_taxonomies(config: Config, next_index: str):
def import_taxonomies(config: IndexConfig, next_index: str):
"""Import taxonomies into Elasticsearch.

A single taxonomy index is used to store all taxonomy items.

:param config: the configuration to use
:param config: the index configuration to use
:param next_index: the index to write to
"""
# open a connection for this process
@@ -305,6 +316,7 @@ def import_taxonomies(config: IndexConfig, next_index: str):


def get_redis_products(
stream_name: str,
processor: DocumentProcessor,
fetcher: BaseDocumentFetcher,
index: str,
@@ -313,6 +325,7 @@
):
"""Fetch IDs of documents to update from Redis.

:param stream_name: the name of the Redis stream to read from
:param processor: the document processor to use to process the documents
:param fetcher: the document fetcher to use to fetch the documents from
the document ID
@@ -325,6 +338,7 @@
processed = 0
for _, row in get_processed_since(
redis_client,
stream_name,
last_updated_timestamp_ms,
id_field_name,
document_fetcher=fetcher,
@@ -334,12 +348,18 @@
logger.info("Processed %d updates from Redis", processed)


def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
def get_redis_updates(
es_client: Elasticsearch, index: str, config: IndexConfig
) -> None:
"""Fetch updates from Redis and index them.

:param index: the index to write to
:param config: the configuration to use
:param config: the index configuration to use
"""
if config.redis_stream_name is None:
logger.info(f"Redis updates are disabled for index {index}")
return

processor = DocumentProcessor(config)
fetcher = load_document_fetcher(config)
# Ensure all documents are searchable after the import
@@ -362,7 +382,12 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
for success, info in parallel_bulk(
es_client,
get_redis_products(
processor, fetcher, index, id_field_name, last_updated_timestamp_ms
config.redis_stream_name,
processor,
fetcher,
index,
id_field_name,
last_updated_timestamp_ms,
),
):
if not success:
@@ -372,7 +397,7 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
def run_full_import(
file_path: Path,
num_processes: int,
config: Config,
config: IndexConfig,
num_items: int | None = None,
):
"""Run a full data import from a JSONL.
@@ -386,7 +411,7 @@ def run_full_import(
:param file_path: the path of the JSONL file to import
:param num_processes: the number of processes to use to perform parallel
import
:param config: the configuration to use
:param config: the index configuration to use
:param num_items: the number of items to import, defaults to None (all)
"""
es_client = connection.get_es_client()
@@ -421,10 +446,10 @@ def run_full_import(
update_alias(es_client, next_index, config.index.name)


def perform_taxonomy_import(config: Config) -> None:
def perform_taxonomy_import(config: IndexConfig) -> None:
"""Create a new index for taxonomies and import them.

:param config: the configuration to use
:param config: the index configuration to use
"""
es_client = connection.get_es_client()
# we create a temporary index to import to
@@ -461,20 +486,35 @@ def run_update_daemon(config: Config) -> None:
logger.error("Could not connect to Redis")
return

document_fetcher = load_document_fetcher(config)
processor = DocumentProcessor(config)
if len(list(config.indices)) != 1:
raise ValueError("Only one index is supported")

processors: dict[str, DocumentProcessor] = {}
document_fetchers: dict[str, BaseDocumentFetcher] = {}
id_field_names: dict[str, str] = {}
stream_name_to_index_id: dict[str, str] = {}

for index_id, index_config in config.indices.items():
stream_name = index_config.redis_stream_name
if stream_name is not None:
processors[stream_name] = DocumentProcessor(index_config)
document_fetchers[stream_name] = load_document_fetcher(index_config)
id_field_names[stream_name] = index_config.index.id_field_name
stream_name_to_index_id[stream_name] = index_id

for _, document in get_new_updates(
for stream_name, _, document in get_new_updates(
redis_client,
config.index.id_field_name,
document_fetcher,
list(id_field_names.keys()),
id_field_names=id_field_names,
document_fetchers=document_fetchers,
):
document = processor.from_dict(document)
if not document:
processed_document = processors[stream_name].from_dict(document)
if not processed_document:
continue
_id = document.pop("_id")
_id = processed_document.pop("_id")
index_id = stream_name_to_index_id[stream_name]
es_client.index(
index=config.index.name,
body=document,
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
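
With the lookup tables built above, the daemon routes every update to the processor and Elasticsearch index of whichever index configuration owns its stream. A hedged sketch of that routing for a hypothetical two-index setup (stream and index names are placeholders, not from this PR):

from elasticsearch import Elasticsearch

es_client = Elasticsearch("http://localhost:9200")
stream_name_to_index_id = {"product_updates": "food", "brand_updates": "brands"}
index_names = {"food": "food-2024-01-31", "brands": "brands-2024-01-31"}

stream_name = "product_updates"  # which stream the update came from
processed_document = {"_id": "123", "name": "example"}  # already run through its processor
_id = processed_document.pop("_id")
es_client.index(
    index=index_names[stream_name_to_index_id[stream_name]],
    body=processed_document,
    id=_id,
)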