From 3a5d231c7b86c1fab44525bc9c426cf34fa8cea6 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Thu, 23 Jan 2025 16:25:39 -0800
Subject: [PATCH] fix failing tests

---
 .../processor/internals/ProcessorContextImplTest.java     | 6 +++++-
 .../java/org/apache/kafka/streams/TopologyTestDriver.java | 5 +++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 336dd2ce50857..37f1def3dd358 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -645,6 +645,8 @@ public void shouldThrowUnsupportedOperationExceptionOnForward() {
         mockProcessorNodeWithLocalKeyValueStore();
         context = getStandbyContext();
 
+        context.recordContext = mock(ProcessorRecordContext.class);
+
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value")
@@ -665,6 +667,8 @@ public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
         mockProcessorNodeWithLocalKeyValueStore();
         context = getStandbyContext();
 
+        context.recordContext = mock(ProcessorRecordContext.class);
+
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value", To.child("child-name"))
@@ -787,7 +791,7 @@ public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
         context = getStandbyContext();
         assertThrows(
             UnsupportedOperationException.class,
-            () -> context.recordContext.timestamp()
+            () -> context.timestamp()
         );
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2fc8400239d61..a4cee67ad5fae 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -61,6 +62,7 @@
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -447,7 +449,6 @@ private void setupGlobalTask(final Time mockWallClockTime,
                 streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(null);
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -492,6 +493,7 @@ private void setupTask(final StreamsConfig streamsConfig,
             streamsMetrics,
             cache
         );
+        context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
 
         task = new StreamTask(
             TASK_ID,
@@ -511,7 +513,6 @@ private void setupTask(final StreamsConfig streamsConfig,
         );
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
-        task.processorContext().setRecordContext(null);
         for (final TopicPartition tp: task.inputPartitions()) {
             task.updateNextOffsets(tp, new OffsetAndMetadata(0, Optional.empty(), ""));
         }