-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes for deployment #97
Conversation
@@ -57,7 +57,7 @@ def build_consumer(kafka_options: KafkaOptions, logger: logging.Logger) -> Consu | |||
kafka_topics = collect_kafka_topics(kafka_options) | |||
logger.info("Subscribing to the following Kafka topics: %s", kafka_topics) | |||
consumer.subscribe(kafka_topics) | |||
return Consumer(consumer_options) | |||
return consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was it fast enough for ingestor? subscribe
makes the consumer slower since broker
tries to find the best partitian every time it polls a message.
I'm just checking since we had this problem in live data reduction: https://docs.docs.esss.dk/dmsc/groups/dram/data-reduction/data-streaming.html#assign-topicpartition-to-a-consumer-manually-and-do-not-subscribe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the previous code, we were subscribing to the topics before instantiating a new consumer. That resulted in a new consumer which did not received any messages.
Once I made the change, I started getting the messages.
That said, I do not know the implementation of the kafka library, so I do not know if that is relevant or the problem was solved by divine intervention
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the previous code, we were subscribing to the topics before instantiating a new consumer.
Ah yeah I found out : D.... that was a bug I guess.. (sorry)
but I was talking about a different thing. You can either subscribe a topic or assign a topic-partition to the consumer and latter is much faster.
But since the file ingerstor is not expecting too many messages I think it'll be fine.
Just let me know if you find it too slow. We have a solution.
logger.info("WRDN message received.") | ||
return True | ||
else: | ||
logger.error("Unexpected data type: %s", data_type) | ||
logger.info("Message of type %s ignored", message_type) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we're expecting other types of message under this topic...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct!!!
We should expect other types of messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Patch fixes for deployment
I'll just... merge it...? |
This PR includes all the last minute changes that were necessary to run the online and offline ingestor separately in the staging environment serving YMIR and CODA.
Changes summary: