Commit

mjsax committed Dec 21, 2024
1 parent 044a340 commit 62427af
Showing 20 changed files with 1,542 additions and 1,541 deletions.
@@ -40,7 +40,7 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,

log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(),
+ // context.taskId(),
record.topic(),
record.partition(),
record.offset(),
@@ -40,7 +40,7 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,

log.error(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(),
+ // context.taskId(),
record.topic(),
record.partition(),
record.offset(),
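For orientation, the two hunks above modify the log statements of two DeserializationExceptionHandler implementations. The following is a minimal, hypothetical handler (the class name LogAndSkipHandler and its behavior are assumptions, not code from this commit), written against the same deprecated ProcessorContext-based handle() signature that appears in the hunk headers; it logs the failed record's coordinates without the taskId and then skips the record:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical handler, not part of this commit: logs the record's topic,
// partition, and offset (no taskId, mirroring the change above) and continues.
public class LogAndSkipHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndSkipHandler.class);

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        log.warn(
            "Exception caught during Deserialization, topic: {}, partition: {}, offset: {}",
            record.topic(),
            record.partition(),
            record.offset(),
            exception
        );
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}

Such a handler is typically registered through StreamsConfig (classically via the default.deserialization.exception.handler property) so that it is invoked whenever a record cannot be deserialized.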

Large diffs are not rendered by default.

@@ -16,8 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;

- import org.apache.kafka.common.header.Headers;
- import org.apache.kafka.common.header.internals.RecordHeaders;
+ //import org.apache.kafka.common.header.Headers;
+ //import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.CommitCallback;
@@ -129,70 +129,70 @@ public void register(final StateStore store,
stateManager().registerStore(store, stateRestoreCallback, checkpoint);
}

- @Override
- public String topic() {
- if (recordContext == null) {
- // This is only exposed via the deprecated ProcessorContext,
- // in which case, we're preserving the pre-existing behavior
- // of returning dummy values when the record context is undefined.
- // For topic, the dummy value is `null`.
- return null;
- } else {
- return recordContext.topic();
- }
- }
-
- @Override
- public int partition() {
- if (recordContext == null) {
- // This is only exposed via the deprecated ProcessorContext,
- // in which case, we're preserving the pre-existing behavior
- // of returning dummy values when the record context is undefined.
- // For partition, the dummy value is `-1`.
- return -1;
- } else {
- return recordContext.partition();
- }
- }
-
- @Override
- public long offset() {
- if (recordContext == null) {
- // This is only exposed via the deprecated ProcessorContext,
- // in which case, we're preserving the pre-existing behavior
- // of returning dummy values when the record context is undefined.
- // For offset, the dummy value is `-1L`.
- return -1L;
- } else {
- return recordContext.offset();
- }
- }
-
- @Override
- public Headers headers() {
- if (recordContext == null) {
- // This is only exposed via the deprecated ProcessorContext,
- // in which case, we're preserving the pre-existing behavior
- // of returning dummy values when the record context is undefined.
- // For headers, the dummy value is an empty headers collection.
- return new RecordHeaders();
- } else {
- return recordContext.headers();
- }
- }
-
- @Override
- public long timestamp() {
- if (recordContext == null) {
- // This is only exposed via the deprecated ProcessorContext,
- // in which case, we're preserving the pre-existing behavior
- // of returning dummy values when the record context is undefined.
- // For timestamp, the dummy value is `0L`.
- return 0L;
- } else {
- return recordContext.timestamp();
- }
- }
+ // @Override
+ // public String topic() {
+ // if (recordContext == null) {
+ // // This is only exposed via the deprecated ProcessorContext,
+ // // in which case, we're preserving the pre-existing behavior
+ // // of returning dummy values when the record context is undefined.
+ // // For topic, the dummy value is `null`.
+ // return null;
+ // } else {
+ // return recordContext.topic();
+ // }
+ // }
+ //
+ // @Override
+ // public int partition() {
+ // if (recordContext == null) {
+ // // This is only exposed via the deprecated ProcessorContext,
+ // // in which case, we're preserving the pre-existing behavior
+ // // of returning dummy values when the record context is undefined.
+ // // For partition, the dummy value is `-1`.
+ // return -1;
+ // } else {
+ // return recordContext.partition();
+ // }
+ // }
+ //
+ // @Override
+ // public long offset() {
+ // if (recordContext == null) {
+ // // This is only exposed via the deprecated ProcessorContext,
+ // // in which case, we're preserving the pre-existing behavior
+ // // of returning dummy values when the record context is undefined.
+ // // For offset, the dummy value is `-1L`.
+ // return -1L;
+ // } else {
+ // return recordContext.offset();
+ // }
+ // }
+ //
+ // @Override
+ // public Headers headers() {
+ // if (recordContext == null) {
+ // // This is only exposed via the deprecated ProcessorContext,
+ // // in which case, we're preserving the pre-existing behavior
+ // // of returning dummy values when the record context is undefined.
+ // // For headers, the dummy value is an empty headers collection.
+ // return new RecordHeaders();
+ // } else {
+ // return recordContext.headers();
+ // }
+ // }
+ //
+ // @Override
+ // public long timestamp() {
+ // if (recordContext == null) {
+ // // This is only exposed via the deprecated ProcessorContext,
+ // // in which case, we're preserving the pre-existing behavior
+ // // of returning dummy values when the record context is undefined.
+ // // For timestamp, the dummy value is `0L`.
+ // return 0L;
+ // } else {
+ // return recordContext.timestamp();
+ // }
+ // }

@Override
public Map<String, Object> appConfigs() {
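The methods commented out in the hunk above implement the dummy-value fallback of the deprecated ProcessorContext: when no record is being processed, topic() returns null, partition() and offset() return -1, headers() returns an empty RecordHeaders, and timestamp() returns 0L. As a caller-side illustration only, here is a hypothetical processor (not code from this commit, and assuming a Kafka Streams version where the deprecated org.apache.kafka.streams.processor API is still available) that guards against those dummy values inside a punctuation callback, where no current record exists:

import java.time.Duration;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Hypothetical processor, for illustration only: shows why callers of the
// deprecated ProcessorContext must tolerate the dummy values described above.
public class AuditProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // No record is being processed during punctuation, so the deprecated
            // context reports dummy values: topic() is null, partition() and
            // offset() are -1, timestamp() is 0L, and headers() is empty.
            if (context.topic() == null) {
                // fall back to wall-clock information only; record coordinates are meaningless here
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // While a record is being processed, topic(), partition(), and offset() hold real values.
        context().forward(key, value);
    }
}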
