Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka configuration. #15

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions resources/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"bootstrap_servers": [
"HOST:9092"
],
"individual_message_commit": false,
"enable_auto_commit": true,
"auto_offset_reset": "earliest"
},
Expand Down
34 changes: 33 additions & 1 deletion src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ def build_main_arg_parser() -> argparse.ArgumentParser:

@dataclass
class RunOptions:
"""RunOptions dataclass to store the configuration options.

Most of options don't have default values because they are expected
to be set by the user either in the configuration file or through
command line arguments.
"""

config_file: str
verbose: bool
file_log: bool
Expand All @@ -102,12 +109,36 @@ class RunOptions:
pyscicat: Optional[str] = None


@dataclass
class kafkaOptions:
"""KafkaOptions dataclass to store the configuration options.

Default values are provided as they are not
expected to be set by command line arguments.
"""

topics: list[str] | str = "KAFKA_TOPIC_1,KAFKA_TOPIC_2"
"""List of Kafka topics. Multiple topics can be separated by commas."""
group_id: str = "GROUP_ID"
"""Kafka consumer group ID."""
bootstrap_servers: list[str] | str = "localhost:9092"
"""List of Kafka bootstrap servers. Multiple servers can be separated by commas."""
individual_message_commit: bool = False
"""Commit for each topic individually."""
enable_auto_commit: bool = True
"""Enable Kafka auto commit."""
auto_offset_reset: str = "earliest"
"""Kafka auto offset reset."""


@dataclass
class ScicatConfig:
original_dict: Mapping
"""Original configuration dictionary in the json file."""
run_options: RunOptions
"""Merged configuration dictionary with command line arguments."""
kafka_options: kafkaOptions
"""Kafka configuration options read from files."""

def to_dict(self) -> dict:
"""Return the configuration as a dictionary."""
Expand All @@ -119,7 +150,7 @@ def to_dict(self) -> dict:
if isinstance(value, Mapping):
original_dict[key] = dict(value)

copied = ScicatConfig(original_dict, self.run_options)
copied = ScicatConfig(original_dict, self.run_options, self.kafka_options)
return asdict(copied)


Expand Down Expand Up @@ -153,4 +184,5 @@ def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig:
return ScicatConfig(
original_dict=MappingProxyType(config_dict),
run_options=RunOptions(**run_option_dict),
kafka_options=kafkaOptions(**config_dict.setdefault('kafka', dict())),
)
15 changes: 15 additions & 0 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging

from scicat_configuration import build_main_arg_parser, build_scicat_config
from scicat_kafka import build_consumer
from scicat_logging import build_logger


def quit(logger: logging.Logger) -> None:
"""Log the message and exit the program."""
import sys

logger.info("Exiting ingestor")
sys.exit()


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_main_arg_parser()
Expand All @@ -14,3 +25,7 @@ def main() -> None:
# Log the configuration as dictionary so that it is easier to read from the logs
logger.info('Starting the Scicat Ingestor with the following configuration:')
logger.info(config.to_dict())

# Kafka consumer
if build_consumer(config.kafka_options, logger) is None:
quit(logger)
62 changes: 62 additions & 0 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging

from confluent_kafka import Consumer

from scicat_configuration import kafkaOptions


def collect_consumer_options(options: kafkaOptions) -> dict:
"""Build a Kafka consumer and configure it according to the ``options``."""
from dataclasses import asdict

# Build logger and formatter
config_dict = {
key.replace('_', '.'): value
for key, value in asdict(options).items()
if key not in ('topics', 'individual_message_commit')
}
config_dict['enable.auto.commit'] = (
not options.individual_message_commit
) and options.enable_auto_commit
return config_dict


def collect_kafka_topics(options: kafkaOptions) -> list[str]:
"""Return the Kafka topics as a list."""
if isinstance(options.topics, str):
return options.topics.split(',')
elif isinstance(options.topics, list):
return options.topics
else:
raise TypeError('The topics must be a list or a comma-separated string.')


def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer:
"""Build a Kafka consumer and configure it according to the ``options``."""
consumer_options = collect_consumer_options(kafka_options)
logger.info('Connecting to Kafka with the following parameters:')
logger.info(consumer_options)
consumer = Consumer(consumer_options)
if not validate_consumer(consumer, logger):
return None

kafka_topics = collect_kafka_topics(kafka_options)
logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}')
consumer.subscribe(kafka_topics)
return Consumer(consumer_options)


def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
try:
consumer.list_topics(timeout=1)
except Exception as err:
logger.error(
"Kafka consumer could not be instantiated. "
f"Error message from kafka thread: \n{err}"
)
return False
else:
logger.info('Kafka consumer successfully instantiated')
return True
3 changes: 2 additions & 1 deletion tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from scicat_configuration import RunOptions, ScicatConfig
from scicat_configuration import RunOptions, ScicatConfig, kafkaOptions


@pytest.fixture
Expand All @@ -22,6 +22,7 @@ def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig:
check_by_job_id=True,
pyscicat='test',
),
kafka_options=kafkaOptions(),
)


Expand Down
6 changes: 6 additions & 0 deletions tests/test_scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,9 @@ def test_scicat_config_original_dict_read_only(scicat_config: ScicatConfig) -> N
assert isinstance(scicat_config.original_dict, MappingProxyType)
for sub_option in scicat_config.original_dict.values():
assert isinstance(sub_option, MappingProxyType)


def test_scicat_config_kafka_options(scicat_config: ScicatConfig) -> None:
"""Test if the Kafka options are correctly read."""
assert scicat_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"]
assert scicat_config.kafka_options.enable_auto_commit