feat: add support for multiple indices
raphael0202 committed Jan 31, 2024
1 parent 335dffe commit 4edabfe
Showing 15 changed files with 1,215 additions and 978 deletions.
2 changes: 1 addition & 1 deletion .env
@@ -41,7 +41,7 @@ MEM_LIMIT=4294967296
# on dev connect to the same network as off-server
COMMON_NET_NAME=po_default

# Sentry DNS for bug tracking
# Sentry DNS for bug tracking, used only in staging and production
SENTRY_DNS=

# Log level to use, DEBUG by default in dev
164 changes: 102 additions & 62 deletions app/_import.py
@@ -11,7 +11,7 @@
from redis import Redis

from app._types import JSONType
from app.config import Config, TaxonomyConfig, settings
from app.config import Config, IndexConfig, TaxonomyConfig
from app.indexing import (
DocumentProcessor,
generate_index_object,
@@ -25,7 +25,7 @@


class BaseDocumentFetcher(abc.ABC):
def __init__(self, config: Config) -> None:
def __init__(self, config: IndexConfig) -> None:
self.config = config

@abc.abstractmethod
@@ -42,10 +42,10 @@ def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
pass


def load_document_fetcher(config: Config) -> BaseDocumentFetcher:
def load_document_fetcher(config: IndexConfig) -> BaseDocumentFetcher:
"""Load the document fetcher class from the config.
:param config: the config object
:param config: the index configuration to use
:return: the initialized document fetcher
"""
fetcher_cls = load_class_object_from_string(config.document_fetcher)
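
For orientation, a concrete fetcher only has to implement the `fetch_document` hook declared above; `load_document_fetcher` then resolves it from the dotted path stored in the index configuration's `document_fetcher` field and instantiates it with that configuration. A minimal sketch, assuming the `BaseDocumentFetcher` and `JSONType` shown in this diff — the class name, ID field, and dotted path are purely illustrative:

```python
from app._types import JSONType
from app._import import BaseDocumentFetcher


class ProductFetcher(BaseDocumentFetcher):  # hypothetical fetcher class
    def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
        # `item` is the raw Redis stream entry; returning None skips it.
        code = item.get("code")  # "code" is an illustrative ID field
        if code is None:
            return None
        # A real fetcher would load the full document from its primary store here.
        return {"code": code, **item}
```

The index configuration would then reference it with something like `document_fetcher: "my_module.ProductFetcher"` (path hypothetical).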
@@ -54,6 +54,7 @@ def load_document_fetcher(config: Config) -> BaseDocumentFetcher:

def get_processed_since(
redis_client: Redis,
redis_stream_name: str,
start_timestamp_ms: int,
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
@@ -63,19 +64,19 @@ def get_processed_since(
timestamp.
:param redis_client: the Redis client
:param redis_stream_name: the name of the Redis stream to read from
:param start_timestamp_ms: the timestamp to start from, in milliseconds
:param id_field_name: the name of the field containing the ID
:param document_fetcher: the document fetcher
:param batch_size: the size of the batch to fetch, defaults to 100
:yield: a tuple containing the timestamp (in milliseconds) and the document
"""
stream_name = settings.redis_import_stream_name
fetched_ids = set()
# We start from the given timestamp
min_id = f"{start_timestamp_ms}-0"

while True:
batch = redis_client.xrange(stream_name, min=min_id, count=batch_size)
batch = redis_client.xrange(redis_stream_name, min=min_id, count=batch_size)
if not batch:
# We reached the end of the stream
break
@@ -93,7 +94,7 @@ def get_processed_since(
logger.debug(f"Skipping ID {id_} because it was already fetched")
continue
fetched_ids.add(id_)
document = document_fetcher.fetch_document(stream_name, item)
document = document_fetcher.fetch_document(redis_stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
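
As background for the loop above: Redis stream entry IDs have the form `<timestamp_ms>-<sequence>`, which is what makes "read everything since a timestamp" possible with plain `XRANGE` calls. A minimal sketch of that pagination, assuming a local redis-py client; the stream name and starting timestamp are illustrative:

```python
from redis import Redis

redis_client = Redis(host="localhost", port=6379, decode_responses=True)

start_timestamp_ms = 1706655600000  # illustrative starting point
min_id = f"{start_timestamp_ms}-0"  # entries at or after this timestamp

while True:
    batch = redis_client.xrange("product_updates", min=min_id, count=100)
    if not batch:
        break  # reached the end of the stream
    for entry_id, fields in batch:
        print(entry_id, fields)
    # advance past the last entry seen so the next call does not repeat it
    last_ts, last_seq = batch[-1][0].split("-")
    min_id = f"{last_ts}-{int(last_seq) + 1}"
```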
@@ -102,46 +103,58 @@

def get_new_updates(
redis_client: Redis,
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
stream_names: list[str],
id_field_names: dict[str, str],
document_fetchers: dict[str, BaseDocumentFetcher],
batch_size: int = 100,
):
) -> Iterator[tuple[str, int, JSONType]]:
"""Reads new updates from Redis Stream, starting from the moment this
function is called.
The function will block until new updates are available.
:param redis_client: the Redis client
:param id_field_name: the name of the field containing the ID
:param document_fetcher: the document fetcher
:param stream_names: the names of the Redis streams to read from
:param id_field_names: the name of the field containing the ID for each
stream
:param document_fetchers: the document fetcher for each stream
:param batch_size: the size of the batch to fetch, defaults to 100.
:yield: a tuple containing the timestamp (in milliseconds) and the document
:yield: a tuple containing the stream name, the timestamp (in
milliseconds) and the document
"""
stream_name = settings.redis_import_stream_name
# We start from the last ID
min_id = "$"
min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {
stream_name: "$" for stream_name in stream_names
}
while True:
logger.debug(
"Listening to new updates from stream %s (ID: %s)", stream_name, min_id
"Listening to new updates from streams %s (ID: %s)", stream_names, min_ids
)
# We use block=0 to wait indefinitely for new updates
response = redis_client.xread({stream_name: min_id}, block=0, count=batch_size)
response = redis_client.xread(streams=min_ids, block=0, count=batch_size)
response = cast(list[tuple[str, list[tuple[str, dict]]]], response)
# We only have one stream, so we only have one response
# The response is a list of tuples (stream_name, batch)
_, batch = response[0]
# We update the min_id to the last ID of the batch
min_id = batch[-1][0]
for timestamp_id, item in batch:
id_ = item[id_field_name]
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
yield timestamp, document

for stream_name, batch in response:
# We update the min_id to the last ID of the batch
min_id = batch[-1][0]
min_ids[stream_name] = min_id
id_field_name = id_field_names[stream_name]
document_fetcher = document_fetchers[stream_name]
for timestamp_id, item in batch:
id_ = item[id_field_name]
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
logger.debug(
"Stream %s: Skipping ID %s because it was not found",
stream_name,
id_,
)
continue
yield stream_name, timestamp, document
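
The multi-stream behaviour above rests on the fact that `XREAD` accepts a mapping of stream names to last-seen IDs and returns one batch per stream that has new entries. A minimal sketch of that pattern, assuming a local redis-py client; both stream names are illustrative:

```python
from redis import Redis

redis_client = Redis(host="localhost", port=6379, decode_responses=True)

# One last-seen ID per stream; "$" means "only entries added after this call"
last_ids = {"product_updates_off": "$", "product_updates_obf": "$"}

while True:
    # block=0 waits indefinitely until at least one stream has new entries
    response = redis_client.xread(last_ids, block=0, count=100)
    for stream_name, entries in response:
        # resume after the last entry seen on this stream next time around
        last_ids[stream_name] = entries[-1][0]
        for entry_id, fields in entries:
            timestamp_ms = int(entry_id.split("-")[0])
            print(stream_name, timestamp_ms, fields)
```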


def get_document_dict(
@@ -224,22 +237,20 @@ def update_alias(es_client: Elasticsearch, next_index: str, index_alias: str):
:param index_alias: the alias to update
"""
es_client.indices.update_aliases(
body={
"actions": [
{
"remove": {
"alias": index_alias,
"index": f"{index_alias}-*",
},
},
{"add": {"alias": index_alias, "index": next_index}},
],
},
actions=[
{
"remove": {
"alias": index_alias,
"index": f"{index_alias}-*",
}
},
{"add": {"alias": index_alias, "index": next_index}},
]
)
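
The rewrite above also moves from the raw `body=` payload to the `actions=` keyword argument used by recent versions of the Elasticsearch Python client; the request semantics are unchanged. As a standalone sketch of the alias swap (client URL, alias, and index names are illustrative), both actions are applied in a single request, so readers of the alias never see an intermediate state:

```python
from elasticsearch import Elasticsearch

es_client = Elasticsearch("http://localhost:9200")

index_alias = "food"                      # illustrative alias
next_index = f"{index_alias}-2024-01-31"  # illustrative dated index name

es_client.indices.update_aliases(
    actions=[
        # detach every index currently behind the alias...
        {"remove": {"alias": index_alias, "index": f"{index_alias}-*"}},
        # ...and attach the freshly built one in the same request
        {"add": {"alias": index_alias, "index": next_index}},
    ]
)
```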


def import_parallel(
config: Config,
config: IndexConfig,
file_path: Path,
next_index: str,
num_items: int | None,
@@ -278,12 +289,12 @@ def import_parallel(
logger.error("Encountered errors: %s", errors)


def import_taxonomies(config: Config, next_index: str):
def import_taxonomies(config: IndexConfig, next_index: str):
"""Import taxonomies into Elasticsearch.
A single taxonomy index is used to store all taxonomy items.
:param config: the configuration to use
:param config: the index configuration to use
:param next_index: the index to write to
"""
# open a connection for this process
@@ -305,6 +316,7 @@ def import_taxonomies(config: Config, next_index: str):


def get_redis_products(
stream_name: str,
processor: DocumentProcessor,
fetcher: BaseDocumentFetcher,
index: str,
@@ -313,6 +325,7 @@
):
"""Fetch IDs of documents to update from Redis.
:param stream_name: the name of the Redis stream to read from
:param processor: the document processor to use to process the documents
:param fetcher: the document fetcher to use to fetch the documents from
the document ID
@@ -325,6 +338,7 @@
processed = 0
for _, row in get_processed_since(
redis_client,
stream_name,
last_updated_timestamp_ms,
id_field_name,
document_fetcher=fetcher,
@@ -334,12 +348,18 @@
logger.info("Processed %d updates from Redis", processed)


def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
def get_redis_updates(
es_client: Elasticsearch, index: str, config: IndexConfig
) -> None:
"""Fetch updates from Redis and index them.
:param index: the index to write to
:param config: the configuration to use
:param config: the index configuration to use
"""
if config.redis_stream_name is None:
logger.info(f"Redis updates are disabled for index {index}")
return

processor = DocumentProcessor(config)
fetcher = load_document_fetcher(config)
# Ensure all documents are searchable after the import
@@ -362,7 +382,12 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
for success, info in parallel_bulk(
es_client,
get_redis_products(
processor, fetcher, index, id_field_name, last_updated_timestamp_ms
config.redis_stream_name,
processor,
fetcher,
index,
id_field_name,
last_updated_timestamp_ms,
),
):
if not success:
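
For context, the call above streams documents straight from Redis into `elasticsearch.helpers.parallel_bulk`, which yields one `(success, info)` tuple per document. A minimal, self-contained sketch of that helper — the generator, index name, and documents are illustrative, not the project's actual data:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es_client = Elasticsearch("http://localhost:9200")


def generate_actions():
    # Illustrative documents; the real code yields them from the Redis stream.
    for i in range(1000):
        yield {
            "_index": "food-2024-01-31",  # illustrative index name
            "_id": str(i),
            "_source": {"code": str(i)},
        }


# parallel_bulk fans the actions out over worker threads and reports each
# document's outcome, so failures can be logged individually.
for success, info in parallel_bulk(es_client, generate_actions(), thread_count=4):
    if not success:
        print("Failed to index a document:", info)
```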
@@ -372,7 +397,7 @@ def get_redis_updates(es_client: Elasticsearch, index: str, config: Config):
def run_full_import(
file_path: Path,
num_processes: int,
config: Config,
config: IndexConfig,
num_items: int | None = None,
):
"""Run a full data import from a JSONL.
@@ -386,7 +411,7 @@
:param file_path: the path of the JSONL file to import
:param num_processes: the number of processes to use to perform parallel
import
:param config: the configuration to use
:param config: the index configuration to use
:param num_items: the number of items to import, defaults to None (all)
"""
es_client = connection.get_es_client()
@@ -421,10 +446,10 @@
update_alias(es_client, next_index, config.index.name)


def perform_taxonomy_import(config: Config) -> None:
def perform_taxonomy_import(config: IndexConfig) -> None:
"""Create a new index for taxonomies and import them.
:param config: the configuration to use
:param config: the index configuration to use
"""
es_client = connection.get_es_client()
# we create a temporary index to import to
@@ -461,20 +486,35 @@ def run_update_daemon(config: Config) -> None:
logger.error("Could not connect to Redis")
return

document_fetcher = load_document_fetcher(config)
processor = DocumentProcessor(config)
if len(list(config.indices)) != 1:
raise ValueError("Only one index is supported")

processors: dict[str, DocumentProcessor] = {}
document_fetchers: dict[str, BaseDocumentFetcher] = {}
id_field_names: dict[str, str] = {}
stream_name_to_index_id: dict[str, str] = {}

for index_id, index_config in config.indices.items():
stream_name = index_config.redis_stream_name
if stream_name is not None:
processors[stream_name] = DocumentProcessor(index_config)
document_fetchers[stream_name] = load_document_fetcher(index_config)
id_field_names[stream_name] = index_config.index.id_field_name
stream_name_to_index_id[stream_name] = index_id

for _, document in get_new_updates(
for stream_name, _, document in get_new_updates(
redis_client,
config.index.id_field_name,
document_fetcher,
list(id_field_names.keys()),
id_field_names=id_field_names,
document_fetchers=document_fetchers,
):
document = processor.from_dict(document)
if not document:
processed_document = processors[stream_name].from_dict(document)
if not processed_document:
continue
_id = document.pop("_id")
_id = processed_document.pop("_id")
index_id = stream_name_to_index_id[stream_name]
es_client.index(
index=config.index.name,
body=document,
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
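
To summarize the shape the daemon now relies on: each entry of `config.indices` carries its own index name, ID field, and optional Redis stream, and the per-stream lookup tables route every update to the right index. A toy sketch of that relationship — these are not the project's real classes, and every name below is illustrative:

```python
from dataclasses import dataclass


@dataclass
class IndexSketch:
    name: str           # Elasticsearch index (alias) name
    id_field_name: str  # field used as the document _id


@dataclass
class IndexConfigSketch:
    index: IndexSketch
    redis_stream_name: str | None  # None disables live Redis updates


indices = {
    "off": IndexConfigSketch(IndexSketch("food", "code"), "product_updates_off"),
    "obf": IndexConfigSketch(IndexSketch("beauty", "code"), "product_updates_obf"),
}

# One lookup entry per index that has a stream configured; updates read from a
# stream are then routed to the matching Elasticsearch index.
stream_name_to_index_id = {
    cfg.redis_stream_name: index_id
    for index_id, cfg in indices.items()
    if cfg.redis_stream_name is not None
}
print(stream_name_to_index_id)  # {'product_updates_off': 'off', 'product_updates_obf': 'obf'}
```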