-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13722: remove usage of old ProcessorContext #18292
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - I just have a question regarding if recordContext
could ever be null, I'm guessing the chances for that are slim to none, but I wanted to ask.
@@ -40,7 +40,7 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, | |||
|
|||
log.warn( | |||
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", | |||
context.taskId(), | |||
// context.taskId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just delete this? Same for below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the "verification part", so it' won't get merged.
internalProcessorContext.partition(), | ||
internalProcessorContext.offset(), | ||
internalProcessorContext.headers(), | ||
internalProcessorContext.recordContext().topic(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there ever any chance recordContext()
could return null?
We don't have to address this now, more just asking the question in general
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair question. Looking into the code, I believe it should never be null
.
While AbstractProcessorContext
has a null
-check, eg
public String topic() {
if (recordContext == null) {
// This is only exposed via the deprecated ProcessorContext,
// in which case, we're preserving the pre-existing behavior
// of returning dummy values when the record context is undefined.
// For topic, the dummy value is `null`.
return null;
} else {
return recordContext.topic();
}
}
the comment seems to indicate that it could only be null
for the old Processor API which we did remove. I am also not 100% sure, but my read it, it should never be null
. -- We did not see any failing tests either, what I would hope is verification enough?
Looking into the code a little bit more, I did not find any call to setRecordContext
which would pass in null
(to unset it). So recordContext
would be null
only initially when the ProcessorContext
is created, but it seems it's always set properly before we process records or call punctuation. -- So the only issue with being null
could be if we would not set it after ProcessorContext
creation and before processing data or calling punctuations, but it seems we are doing the right thing (after it was set once, we could maybe have a wrong object if we forget to call setRecordContext
correctly, but it could not be null
any longer).
Does this help?
We want to deprecate an remove the old ProcessorContext. Thus, we need to refactor Kafak Streams runtime code, to not make calls into the old ProcessorContext but only into new code path.
b7d8987
to
07915d7
Compare
We want to deprecate and remove the old ProcessorContext. Thus, we need
to refactor Kafka Streams runtime code, to not make calls into the old
ProcessorContext but only into new code path.