From 62427afe392913afae630e56601053067309faad Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 20 Dec 2024 20:49:16 -0800 Subject: [PATCH] Verification --- .../LogAndContinueExceptionHandler.java | 2 +- .../errors/LogAndFailExceptionHandler.java | 2 +- .../streams/processor/ProcessorContext.java | 566 +++++------ .../internals/AbstractProcessorContext.java | 132 +-- .../ForwardingDisabledProcessorContext.java | 244 ++--- .../internals/GlobalProcessorContextImpl.java | 30 +- .../internals/ProcessorContextImpl.java | 98 +- .../internals/KTableTransformValuesTest.java | 6 +- .../AbstractProcessorContextTest.java | 158 ++-- ...orwardingDisabledProcessorContextTest.java | 26 +- .../GlobalProcessorContextImplTest.java | 34 +- .../internals/ProcessorContextImplTest.java | 238 ++--- .../internals/ProcessorContextTest.java | 52 +- .../internals/ProcessorNodeTest.java | 14 +- .../processor/internals/StreamTaskTest.java | 4 +- .../test/InternalMockProcessorContext.java | 132 +-- .../test/MockInternalProcessorContext.java | 79 +- .../kafka/test/NoOpProcessorContext.java | 20 +- .../processor/MockProcessorContext.java | 358 +++---- .../streams/MockProcessorContextTest.java | 888 +++++++++--------- 20 files changed, 1542 insertions(+), 1541 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index 3622da137c16e..f512db9e994ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -40,7 +40,7 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, log.warn( "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), +// context.taskId(), record.topic(), record.partition(), record.offset(), diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java index aaef5ca050b78..4fef049224a80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -40,7 +40,7 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, log.error( "Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), +// context.taskId(), record.topic(), record.partition(), record.offset(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 38c27646b0f99..a6102ac430127 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -16,21 +16,21 @@ */ package org.apache.kafka.streams.processor; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.errors.StreamsException; -import 
org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.io.File; -import java.time.Duration; -import java.util.Map; +//import org.apache.kafka.common.header.Headers; +//import org.apache.kafka.common.serialization.Deserializer; +//import org.apache.kafka.common.serialization.Serde; +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.StreamsMetrics; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.errors.StreamsException; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +//import org.apache.kafka.streams.processor.api.ProcessorSupplier; +//import org.apache.kafka.streams.state.StoreBuilder; +// +//import java.io.File; +//import java.time.Duration; +//import java.util.Map; /** * Processor context interface. @@ -43,272 +43,272 @@ */ @SuppressWarnings("deprecation") public interface ProcessorContext { - - /** - * Return the application id. - * - * @return the application id - */ - String applicationId(); - - /** - * Return the task id. - * - * @return the task id - */ - TaskId taskId(); - - /** - * Return the default key serde. - * - * @return the key serializer - */ - Serde keySerde(); - - /** - * Return the default value serde. - * - * @return the value serializer - */ - Serde valueSerde(); - - /** - * Return the state directory for the partition. - * - * @return the state directory - */ - File stateDir(); - - /** - * Return Metrics instance. - * - * @return StreamsMetrics - */ - StreamsMetrics metrics(); - - /** - * Register and possibly restores the specified storage engine. - * - * @param store the storage engine - * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart - * - * @throws IllegalStateException If store gets registered after initialized is already finished - * @throws StreamsException if the store's change log does not contain the partition - */ - void register(final StateStore store, - final StateRestoreCallback stateRestoreCallback); - - /** - * Get the state store given the store name. - * - * @param name The store name - * @param The type or interface of the store to return - * @return The state store instance - * - * @throws ClassCastException if the return type isn't a type or interface of the actual returned store. - */ - S getStateStore(final String name); - - /** - * Schedule a periodic operation for processors. A processor may call this method during - * {@link org.apache.kafka.streams.kstream.ValueTransformer#init(ProcessorContext) initialization} or - * {@link org.apache.kafka.streams.kstream.ValueTransformer#transform(Object) processing} to - * schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}. - * The type parameter controls what notion of time is used for punctuation: - *
-     *   • {@link PunctuationType#STREAM_TIME} — uses "stream time", which is advanced by the processing of messages
-     *     in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
-     *     The first punctuation will be triggered by the first record that is processed.
-     *     NOTE: Only advanced if messages arrive
-     *   • {@link PunctuationType#WALL_CLOCK_TIME} — uses system time (the wall-clock time),
-     *     which is advanced independent of whether new messages arrive.
-     *     The first punctuation will be triggered after interval has elapsed.
-     *     NOTE: This is best effort only as its granularity is limited by how long an iteration of the
-     *     processing loop takes to complete
- * - * Skipping punctuations: Punctuations will not be triggered more than once at any given timestamp. - * This means that "missed" punctuation will be skipped. - * It's possible to "miss" a punctuation if: - *
-     *   • with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval
-     *   • with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...
- * - * @param interval the time interval between punctuations (supported minimum is 1 millisecond) - * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME} - * @param callback a function consuming timestamps representing the current stream or system time - * @return a handle allowing cancellation of the punctuation schedule established by this method - * @throws IllegalArgumentException if the interval is not representable in milliseconds - */ - Cancellable schedule(final Duration interval, - final PunctuationType type, - final Punctuator callback); - - /** - * Forward a key/value pair to all downstream processors. - * Used the input record's timestamp as timestamp for the output record. - * - *

If this method is called with {@link Punctuator#punctuate(long)} the record that - * is sent downstream won't have any associated record metadata like topic, partition, or offset. - * - * @param key key - * @param value value - */ - void forward(final K key, final V value); - - /** - * Forward a key/value pair to the specified downstream processors. - * Can be used to set the timestamp of the output record. - * - *

If this method is called with {@link Punctuator#punctuate(long)} the record that - * is sent downstream won't have any associated record metadata like topic, partition, or offset. - * - * @param key key - * @param value value - * @param to the options to use when forwarding - */ - void forward(final K key, final V value, final To to); - - /** - * Request a commit. - */ - void commit(); - - /** - * Return the topic name of the current input record; could be {@code null} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated topic. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid topic name, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the topic name - */ - String topic(); - - /** - * Return the partition id of the current input record; could be {@code -1} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated partition id. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid partition id, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the partition id - */ - int partition(); - - /** - * Return the offset of the current input record; could be {@code -1} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated offset. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid offset, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the offset - */ - long offset(); - - /** - * Return the headers of the current input record; could be an empty header if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record might not have any associated headers. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide valid headers, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the headers - */ - Headers headers(); - - /** - * Return the current timestamp. - * - *

If it is triggered while processing a record streamed from the source processor, - * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from - * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. - * Note, that an upstream {@link org.apache.kafka.streams.processor.api.Processor} might have set a new timestamp by calling - * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(org.apache.kafka.streams.processor.api.Record)}. - * In particular, some Kafka Streams DSL operators set result record timestamps explicitly, - * to guarantee deterministic results. - * - *

If it is triggered while processing a record generated not from the source processor (for example, - * if this method is invoked from the punctuate call), timestamp is defined as the current - * task's stream time, which is defined as the largest timestamp of any record processed by the task. - * - * @return the timestamp - */ - long timestamp(); - - /** - * Return all the application config properties as key/value pairs. - * - *

The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig} - * object and associated to the ProcessorContext. - * - *

The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property - * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG} - * will be of type {@link Class}, even if it was specified as a String to - * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}). - * - * @return all the key/values from the StreamsConfig properties - */ - Map appConfigs(); - - /** - * Return all the application config properties with the given key prefix, as key/value pairs - * stripping the prefix. - * - *

The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig} - * object and associated to the ProcessorContext. - * - * @param prefix the properties prefix - * @return the key/values matching the given prefix from the StreamsConfig properties. - */ - Map appConfigsWithPrefix(final String prefix); - - /** - * Return the current system timestamp (also called wall-clock time) in milliseconds. - * - *

Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. - * Thus, it may return a different value compared to {@code System.currentTimeMillis()}. - * - * @return the current system timestamp in milliseconds - */ - long currentSystemTimeMs(); - - /** - * Return the current stream-time in milliseconds. - * - *

Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far - * (including the currently processed record), i.e., it can be considered a high-watermark. - * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. - * - *

Note: this method is not supported for global processors (cf. - * {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) Topology.addGlobalStore(...)} - * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}), - * because there is no concept of stream-time for this case. - * Calling this method in a global processor will result in an {@link UnsupportedOperationException}. - * - * @return the current stream-time in milliseconds - */ - long currentStreamTimeMs(); +// +// /** +// * Return the application id. +// * +// * @return the application id +// */ +// String applicationId(); +// +// /** +// * Return the task id. +// * +// * @return the task id +// */ +// TaskId taskId(); +// +// /** +// * Return the default key serde. +// * +// * @return the key serializer +// */ +// Serde keySerde(); +// +// /** +// * Return the default value serde. +// * +// * @return the value serializer +// */ +// Serde valueSerde(); +// +// /** +// * Return the state directory for the partition. +// * +// * @return the state directory +// */ +// File stateDir(); +// +// /** +// * Return Metrics instance. +// * +// * @return StreamsMetrics +// */ +// StreamsMetrics metrics(); +// +// /** +// * Register and possibly restores the specified storage engine. +// * +// * @param store the storage engine +// * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart +// * +// * @throws IllegalStateException If store gets registered after initialized is already finished +// * @throws StreamsException if the store's change log does not contain the partition +// */ +// void register(final StateStore store, +// final StateRestoreCallback stateRestoreCallback); +// +// /** +// * Get the state store given the store name. +// * +// * @param name The store name +// * @param The type or interface of the store to return +// * @return The state store instance +// * +// * @throws ClassCastException if the return type isn't a type or interface of the actual returned store. +// */ +// S getStateStore(final String name); +// +// /** +// * Schedule a periodic operation for processors. A processor may call this method during +// * {@link org.apache.kafka.streams.kstream.ValueTransformer#init(ProcessorContext) initialization} or +// * {@link org.apache.kafka.streams.kstream.ValueTransformer#transform(Object) processing} to +// * schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}. +// * The type parameter controls what notion of time is used for punctuation: +// *

+// *   • {@link PunctuationType#STREAM_TIME} — uses "stream time", which is advanced by the processing of messages
+// *     in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+// *     The first punctuation will be triggered by the first record that is processed.
+// *     NOTE: Only advanced if messages arrive
+// *   • {@link PunctuationType#WALL_CLOCK_TIME} — uses system time (the wall-clock time),
+// *     which is advanced independent of whether new messages arrive.
+// *     The first punctuation will be triggered after interval has elapsed.
+// *     NOTE: This is best effort only as its granularity is limited by how long an iteration of the
+// *     processing loop takes to complete
+// * +// * Skipping punctuations: Punctuations will not be triggered more than once at any given timestamp. +// * This means that "missed" punctuation will be skipped. +// * It's possible to "miss" a punctuation if: +// *
+// *   • with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval
+// *   • with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...
+// * +// * @param interval the time interval between punctuations (supported minimum is 1 millisecond) +// * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME} +// * @param callback a function consuming timestamps representing the current stream or system time +// * @return a handle allowing cancellation of the punctuation schedule established by this method +// * @throws IllegalArgumentException if the interval is not representable in milliseconds +// */ +// Cancellable schedule(final Duration interval, +// final PunctuationType type, +// final Punctuator callback); +// +// /** +// * Forward a key/value pair to all downstream processors. +// * Used the input record's timestamp as timestamp for the output record. +// * +// *

If this method is called with {@link Punctuator#punctuate(long)} the record that +// * is sent downstream won't have any associated record metadata like topic, partition, or offset. +// * +// * @param key key +// * @param value value +// */ +// void forward(final K key, final V value); +// +// /** +// * Forward a key/value pair to the specified downstream processors. +// * Can be used to set the timestamp of the output record. +// * +// *

If this method is called with {@link Punctuator#punctuate(long)} the record that +// * is sent downstream won't have any associated record metadata like topic, partition, or offset. +// * +// * @param key key +// * @param value value +// * @param to the options to use when forwarding +// */ +// void forward(final K key, final V value, final To to); +// +// /** +// * Request a commit. +// */ +// void commit(); +// +// /** +// * Return the topic name of the current input record; could be {@code null} if it is not +// * available. +// * +// *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) +// * punctuation callback}, or while processing a record that was forwarded by a punctuation +// * callback, the record won't have an associated topic. +// * Another example is +// * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} +// * (and siblings), that do not always guarantee to provide a valid topic name, as they might be +// * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. +// * +// * @return the topic name +// */ +// String topic(); +// +// /** +// * Return the partition id of the current input record; could be {@code -1} if it is not +// * available. +// * +// *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) +// * punctuation callback}, or while processing a record that was forwarded by a punctuation +// * callback, the record won't have an associated partition id. +// * Another example is +// * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} +// * (and siblings), that do not always guarantee to provide a valid partition id, as they might be +// * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. +// * +// * @return the partition id +// */ +// int partition(); +// +// /** +// * Return the offset of the current input record; could be {@code -1} if it is not +// * available. +// * +// *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) +// * punctuation callback}, or while processing a record that was forwarded by a punctuation +// * callback, the record won't have an associated offset. +// * Another example is +// * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} +// * (and siblings), that do not always guarantee to provide a valid offset, as they might be +// * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. +// * +// * @return the offset +// */ +// long offset(); +// +// /** +// * Return the headers of the current input record; could be an empty header if it is not +// * available. +// * +// *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) +// * punctuation callback}, or while processing a record that was forwarded by a punctuation +// * callback, the record might not have any associated headers. +// * Another example is +// * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} +// * (and siblings), that do not always guarantee to provide valid headers, as they might be +// * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. +// * +// * @return the headers +// */ +// Headers headers(); +// +// /** +// * Return the current timestamp. +// * +// *

If it is triggered while processing a record streamed from the source processor, +// * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from +// * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. +// * Note, that an upstream {@link org.apache.kafka.streams.processor.api.Processor} might have set a new timestamp by calling +// * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(org.apache.kafka.streams.processor.api.Record)}. +// * In particular, some Kafka Streams DSL operators set result record timestamps explicitly, +// * to guarantee deterministic results. +// * +// *

If it is triggered while processing a record generated not from the source processor (for example, +// * if this method is invoked from the punctuate call), timestamp is defined as the current +// * task's stream time, which is defined as the largest timestamp of any record processed by the task. +// * +// * @return the timestamp +// */ +// long timestamp(); +// +// /** +// * Return all the application config properties as key/value pairs. +// * +// *

The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig} +// * object and associated to the ProcessorContext. +// * +// *

The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property +// * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG} +// * will be of type {@link Class}, even if it was specified as a String to +// * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}). +// * +// * @return all the key/values from the StreamsConfig properties +// */ +// Map appConfigs(); +// +// /** +// * Return all the application config properties with the given key prefix, as key/value pairs +// * stripping the prefix. +// * +// *

The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig} +// * object and associated to the ProcessorContext. +// * +// * @param prefix the properties prefix +// * @return the key/values matching the given prefix from the StreamsConfig properties. +// */ +// Map appConfigsWithPrefix(final String prefix); +// +// /** +// * Return the current system timestamp (also called wall-clock time) in milliseconds. +// * +// *

Note: this method returns the internally cached system timestamp from the Kafka Stream runtime. +// * Thus, it may return a different value compared to {@code System.currentTimeMillis()}. +// * +// * @return the current system timestamp in milliseconds +// */ +// long currentSystemTimeMs(); +// +// /** +// * Return the current stream-time in milliseconds. +// * +// *

Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far +// * (including the currently processed record), i.e., it can be considered a high-watermark. +// * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. +// * +// *

Note: this method is not supported for global processors (cf. +// * {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) Topology.addGlobalStore(...)} +// * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}), +// * because there is no concept of stream-time for this case. +// * Calling this method in a global processor will result in an {@link UnsupportedOperationException}. +// * +// * @return the current stream-time in milliseconds +// */ +// long currentStreamTimeMs(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 0256df19a5a13..44efecb887ed8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; +//import org.apache.kafka.common.header.Headers; +//import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.CommitCallback; @@ -129,70 +129,70 @@ public void register(final StateStore store, stateManager().registerStore(store, stateRestoreCallback, checkpoint); } - @Override - public String topic() { - if (recordContext == null) { - // This is only exposed via the deprecated ProcessorContext, - // in which case, we're preserving the pre-existing behavior - // of returning dummy values when the record context is undefined. - // For topic, the dummy value is `null`. - return null; - } else { - return recordContext.topic(); - } - } - - @Override - public int partition() { - if (recordContext == null) { - // This is only exposed via the deprecated ProcessorContext, - // in which case, we're preserving the pre-existing behavior - // of returning dummy values when the record context is undefined. - // For partition, the dummy value is `-1`. - return -1; - } else { - return recordContext.partition(); - } - } - - @Override - public long offset() { - if (recordContext == null) { - // This is only exposed via the deprecated ProcessorContext, - // in which case, we're preserving the pre-existing behavior - // of returning dummy values when the record context is undefined. - // For offset, the dummy value is `-1L`. - return -1L; - } else { - return recordContext.offset(); - } - } - - @Override - public Headers headers() { - if (recordContext == null) { - // This is only exposed via the deprecated ProcessorContext, - // in which case, we're preserving the pre-existing behavior - // of returning dummy values when the record context is undefined. - // For headers, the dummy value is an empty headers collection. - return new RecordHeaders(); - } else { - return recordContext.headers(); - } - } - - @Override - public long timestamp() { - if (recordContext == null) { - // This is only exposed via the deprecated ProcessorContext, - // in which case, we're preserving the pre-existing behavior - // of returning dummy values when the record context is undefined. - // For timestamp, the dummy value is `0L`. 
- return 0L; - } else { - return recordContext.timestamp(); - } - } +// @Override +// public String topic() { +// if (recordContext == null) { +// // This is only exposed via the deprecated ProcessorContext, +// // in which case, we're preserving the pre-existing behavior +// // of returning dummy values when the record context is undefined. +// // For topic, the dummy value is `null`. +// return null; +// } else { +// return recordContext.topic(); +// } +// } +// +// @Override +// public int partition() { +// if (recordContext == null) { +// // This is only exposed via the deprecated ProcessorContext, +// // in which case, we're preserving the pre-existing behavior +// // of returning dummy values when the record context is undefined. +// // For partition, the dummy value is `-1`. +// return -1; +// } else { +// return recordContext.partition(); +// } +// } +// +// @Override +// public long offset() { +// if (recordContext == null) { +// // This is only exposed via the deprecated ProcessorContext, +// // in which case, we're preserving the pre-existing behavior +// // of returning dummy values when the record context is undefined. +// // For offset, the dummy value is `-1L`. +// return -1L; +// } else { +// return recordContext.offset(); +// } +// } +// +// @Override +// public Headers headers() { +// if (recordContext == null) { +// // This is only exposed via the deprecated ProcessorContext, +// // in which case, we're preserving the pre-existing behavior +// // of returning dummy values when the record context is undefined. +// // For headers, the dummy value is an empty headers collection. +// return new RecordHeaders(); +// } else { +// return recordContext.headers(); +// } +// } +// +// @Override +// public long timestamp() { +// if (recordContext == null) { +// // This is only exposed via the deprecated ProcessorContext, +// // in which case, we're preserving the pre-existing behavior +// // of returning dummy values when the record context is undefined. +// // For timestamp, the dummy value is `0L`. 
+// return 0L; +// } else { +// return recordContext.timestamp(); +// } +// } @Override public Map appConfigs() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java index 17028abe34b36..a0ec0879e567b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java @@ -16,22 +16,22 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.Cancellable; +//import org.apache.kafka.common.header.Headers; +//import org.apache.kafka.common.serialization.Serde; +//import org.apache.kafka.streams.StreamsMetrics; +//import org.apache.kafka.streams.errors.StreamsException; +//import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; - -import java.io.File; -import java.time.Duration; -import java.util.Map; +//import org.apache.kafka.streams.processor.PunctuationType; +//import org.apache.kafka.streams.processor.Punctuator; +//import org.apache.kafka.streams.processor.StateRestoreCallback; +//import org.apache.kafka.streams.processor.StateStore; +////import org.apache.kafka.streams.processor.TaskId; +//import org.apache.kafka.streams.processor.To; +// +//import java.io.File; +//import java.time.Duration; +//import java.util.Map; import java.util.Objects; /** @@ -49,111 +49,111 @@ public ForwardingDisabledProcessorContext(final ProcessorContext delegate) { this.delegate = Objects.requireNonNull(delegate, "delegate"); } - @Override - public String applicationId() { - return delegate.applicationId(); - } - - @Override - public TaskId taskId() { - return delegate.taskId(); - } - - @Override - public Serde keySerde() { - return delegate.keySerde(); - } - - @Override - public Serde valueSerde() { - return delegate.valueSerde(); - } - - @Override - public File stateDir() { - return delegate.stateDir(); - } - - @Override - public StreamsMetrics metrics() { - return delegate.metrics(); - } - - @Override - public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallback) { - delegate.register(store, stateRestoreCallback); - } - - @Override - public S getStateStore(final String name) { - return delegate.getStateStore(name); - } - - @Override - public Cancellable schedule(final Duration interval, - final PunctuationType type, - final Punctuator callback) throws IllegalArgumentException { - return delegate.schedule(interval, type, callback); - } - - @Override - public void forward(final K key, final V value) { - throw new StreamsException(EXPLANATION); - } - - @Override - public void forward(final K key, final V value, final To to) { - throw new StreamsException(EXPLANATION); - } - - @Override - public void 
commit() { - delegate.commit(); - } - - @Override - public String topic() { - return delegate.topic(); - } - - @Override - public int partition() { - return delegate.partition(); - } - - @Override - public long offset() { - return delegate.offset(); - } - - @Override - public Headers headers() { - return delegate.headers(); - } - - @Override - public long timestamp() { - return delegate.timestamp(); - } - - @Override - public Map appConfigs() { - return delegate.appConfigs(); - } - - @Override - public Map appConfigsWithPrefix(final String prefix) { - return delegate.appConfigsWithPrefix(prefix); - } - - @Override - public long currentSystemTimeMs() { - return delegate.currentSystemTimeMs(); - } - - @Override - public long currentStreamTimeMs() { - return delegate.currentStreamTimeMs(); - } +// @Override +// public String applicationId() { +// return delegate.applicationId(); +// } +// +// @Override +// public TaskId taskId() { +// return delegate.taskId(); +// } +// +// @Override +// public Serde keySerde() { +// return delegate.keySerde(); +// } +// +// @Override +// public Serde valueSerde() { +// return delegate.valueSerde(); +// } +// +// @Override +// public File stateDir() { +// return delegate.stateDir(); +// } +// +// @Override +// public StreamsMetrics metrics() { +// return delegate.metrics(); +// } +// +// @Override +// public void register(final StateStore store, +// final StateRestoreCallback stateRestoreCallback) { +// delegate.register(store, stateRestoreCallback); +// } +// +// @Override +// public S getStateStore(final String name) { +// return delegate.getStateStore(name); +// } +// +// @Override +// public Cancellable schedule(final Duration interval, +// final PunctuationType type, +// final Punctuator callback) throws IllegalArgumentException { +// return delegate.schedule(interval, type, callback); +// } +// +// @Override +// public void forward(final K key, final V value) { +// throw new StreamsException(EXPLANATION); +// } +// +// @Override +// public void forward(final K key, final V value, final To to) { +// throw new StreamsException(EXPLANATION); +// } +// +// @Override +// public void commit() { +// delegate.commit(); +// } +// +// @Override +// public String topic() { +// return delegate.topic(); +// } +// +// @Override +// public int partition() { +// return delegate.partition(); +// } +// +// @Override +// public long offset() { +// return delegate.offset(); +// } +// +// @Override +// public Headers headers() { +// return delegate.headers(); +// } +// +// @Override +// public long timestamp() { +// return delegate.timestamp(); +// } +// +// @Override +// public Map appConfigs() { +// return delegate.appConfigs(); +// } +// +// @Override +// public Map appConfigsWithPrefix(final String prefix) { +// return delegate.appConfigsWithPrefix(prefix); +// } +// +// @Override +// public long currentSystemTimeMs() { +// return delegate.currentSystemTimeMs(); +// } +// +// @Override +// public long currentStreamTimeMs() { +// return delegate.currentStreamTimeMs(); +// } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 01b694863fd0d..f19d3e87679d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -24,7 +24,7 @@ import 
org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -82,20 +82,20 @@ public void forward(final Record record, final String childName) { throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } - @Override - public void forward(final KIn key, final VIn value) { - forward(new Record<>(key, value, recordContext().timestamp(), headers())); - } - - /** - * No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes - */ - @Override - public void forward(final K key, final V value, final To to) { - if (!currentNode().children().isEmpty()) { - throw new IllegalStateException("This method should only be called on 'GlobalStateStore.flush' that should not have any children."); - } - } +// @Override +// public void forward(final KIn key, final VIn value) { +// forward(new Record<>(key, value, recordContext().timestamp(), headers())); +// } +// +// /** +// * No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes +// */ +// @Override +// public void forward(final K key, final V value, final To to) { +// if (!currentNode().children().isEmpty()) { +// throw new IllegalStateException("This method should only be called on 'GlobalStateStore.flush' that should not have any children."); +// } +// } @Override public void forward(final FixedKeyRecord record) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 1f864591f4cb5..4e776af676152 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -184,31 +184,31 @@ public S getStateStore(final String name) { return (S) wrapWithReadWriteStore(store); } - @Override - public void forward(final K key, - final V value) { - final Record toForward = new Record<>( - key, - value, - recordContext.timestamp(), - headers() - ); - forward(toForward); - } - - @Override - public void forward(final K key, - final V value, - final To to) { - final ToInternal toInternal = new ToInternal(to); - final Record toForward = new Record<>( - key, - value, - toInternal.hasTimestamp() ? 
toInternal.timestamp() : recordContext.timestamp(), - headers() - ); - forward(toForward, toInternal.child()); - } +// @Override +// public void forward(final K key, +// final V value) { +// final Record toForward = new Record<>( +// key, +// value, +// recordContext.timestamp(), +// headers() +// ); +// forward(toForward); +// } +// +// @Override +// public void forward(final K key, +// final V value, +// final To to) { +// final ToInternal toInternal = new ToInternal(to); +// final Record toForward = new Record<>( +// key, +// value, +// toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(), +// headers() +// ); +// forward(toForward, toInternal.child()); +// } @Override public void forward(final FixedKeyRecord record) { @@ -313,29 +313,29 @@ public Cancellable schedule(final Duration interval, return streamTask.schedule(intervalMs, type, callback); } - @Override - public String topic() { - throwUnsupportedOperationExceptionIfStandby("topic"); - return super.topic(); - } - - @Override - public int partition() { - throwUnsupportedOperationExceptionIfStandby("partition"); - return super.partition(); - } - - @Override - public long offset() { - throwUnsupportedOperationExceptionIfStandby("offset"); - return super.offset(); - } - - @Override - public long timestamp() { - throwUnsupportedOperationExceptionIfStandby("timestamp"); - return super.timestamp(); - } +// @Override +// public String topic() { +// throwUnsupportedOperationExceptionIfStandby("topic"); +// return super.topic(); +// } +// +// @Override +// public int partition() { +// throwUnsupportedOperationExceptionIfStandby("partition"); +// return super.partition(); +// } +// +// @Override +// public long offset() { +// throwUnsupportedOperationExceptionIfStandby("offset"); +// return super.offset(); +// } +// +// @Override +// public long timestamp() { +// throwUnsupportedOperationExceptionIfStandby("timestamp"); +// return super.timestamp(); +// } @Override public long currentStreamTimeMs() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java index 402a27e461a13..b9ffee5ff99a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java @@ -465,9 +465,9 @@ private static void throwIfStoresNotAvailable(final ProcessorContext context, final List missing = new ArrayList<>(); for (final String storedName : expectedStoredNames) { - if (context.getStateStore(storedName) == null) { - missing.add(storedName); - } +// if (context.getStateStore(storedName) == null) { +// missing.add(storedName); +// } } if (!missing.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 26b8d36f3b08f..c13329dfa44aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; 
+//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.query.Position; @@ -48,10 +48,10 @@ import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.nullValue; +//import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.is; +//import static org.hamcrest.Matchers.emptyIterable; +//import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ -89,76 +89,76 @@ public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() { assertThrows(NullPointerException.class, () -> context.register(null, null)); } - @Test - public void shouldReturnNullTopicIfNoRecordContext() { - context.setRecordContext(null); - assertThat(context.topic(), is(nullValue())); - } - - @Test - public void shouldNotThrowNullPointerExceptionOnTopicIfRecordContextTopicIsNull() { - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, new RecordHeaders())); - assertThat(context.topic(), nullValue()); - } - - @Test - public void shouldReturnTopicFromRecordContext() { - assertThat(context.topic(), equalTo(recordContext.topic())); - } - - @Test - public void shouldReturnNullIfTopicEqualsNonExistTopic() { - context.setRecordContext(null); - assertThat(context.topic(), nullValue()); - } - - @Test - public void shouldReturnDummyPartitionIfNoRecordContext() { - context.setRecordContext(null); - assertThat(context.partition(), is(-1)); - } - - @Test - public void shouldReturnPartitionFromRecordContext() { - assertThat(context.partition(), equalTo(recordContext.partition())); - } - - @Test - public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() { - context.setRecordContext(null); - try { - context.offset(); - } catch (final IllegalStateException e) { - // pass - } - } - - @Test - public void shouldReturnOffsetFromRecordContext() { - assertThat(context.offset(), equalTo(recordContext.offset())); - } - - @Test - public void shouldReturnDummyTimestampIfNoRecordContext() { - context.setRecordContext(null); - assertThat(context.timestamp(), is(0L)); - } - - @Test - public void shouldReturnTimestampFromRecordContext() { - assertThat(context.timestamp(), equalTo(recordContext.timestamp())); - } - - @Test - public void shouldReturnHeadersFromRecordContext() { - assertThat(context.headers(), equalTo(recordContext.headers())); - } - - @Test - public void shouldReturnEmptyHeadersIfHeadersAreNotSet() { - context.setRecordContext(null); - assertThat(context.headers(), is(emptyIterable())); - } +// @Test +// public void shouldReturnNullTopicIfNoRecordContext() { +// context.setRecordContext(null); +// assertThat(context.topic(), is(nullValue())); +// } +// +// @Test +// public void shouldNotThrowNullPointerExceptionOnTopicIfRecordContextTopicIsNull() { +// context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, new RecordHeaders())); +// assertThat(context.topic(), nullValue()); +// } +// +// @Test +// public void shouldReturnTopicFromRecordContext() { +// assertThat(context.topic(), equalTo(recordContext.topic())); +// } +// +// @Test +// public void shouldReturnNullIfTopicEqualsNonExistTopic() { +// context.setRecordContext(null); +// 
assertThat(context.topic(), nullValue()); +// } +// +// @Test +// public void shouldReturnDummyPartitionIfNoRecordContext() { +// context.setRecordContext(null); +// assertThat(context.partition(), is(-1)); +// } +// +// @Test +// public void shouldReturnPartitionFromRecordContext() { +// assertThat(context.partition(), equalTo(recordContext.partition())); +// } +// +// @Test +// public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() { +// context.setRecordContext(null); +// try { +// context.offset(); +// } catch (final IllegalStateException e) { +// // pass +// } +// } +// +// @Test +// public void shouldReturnOffsetFromRecordContext() { +// assertThat(context.offset(), equalTo(recordContext.offset())); +// } +// +// @Test +// public void shouldReturnDummyTimestampIfNoRecordContext() { +// context.setRecordContext(null); +// assertThat(context.timestamp(), is(0L)); +// } +// +// @Test +// public void shouldReturnTimestampFromRecordContext() { +// assertThat(context.timestamp(), equalTo(recordContext.timestamp())); +// } +// +// @Test +// public void shouldReturnHeadersFromRecordContext() { +// assertThat(context.headers(), equalTo(recordContext.headers())); +// } +// +// @Test +// public void shouldReturnEmptyHeadersIfHeadersAreNotSet() { +// context.setRecordContext(null); +// assertThat(context.headers(), is(emptyIterable())); +// } @Test public void appConfigsShouldReturnParsedValues() { @@ -227,11 +227,11 @@ public void forward(final Record record) {} @Override public void forward(final Record record, final String childName) {} - @Override - public void forward(final K key, final V value) {} - - @Override - public void forward(final K key, final V value, final To to) {} +// @Override +// public void forward(final K key, final V value) {} +// +// @Override +// public void forward(final K key, final V value, final To to) {} @Override public void commit() {} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java index 1f132b50e542e..16e8b1a274174 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.errors.StreamsException; +//import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +//import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertThrows; +//import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; public class ForwardingDisabledProcessorContextTest { @@ -35,13 +35,13 @@ public void setUp() { context = new ForwardingDisabledProcessorContext(mock(ProcessorContext.class)); } - @Test - public void shouldThrowOnForward() { - assertThrows(StreamsException.class, () -> context.forward("key", "value")); - } - - @Test - public void shouldThrowOnForwardWithTo() { - assertThrows(StreamsException.class, () -> context.forward("key", "value", To.all())); - } +// @Test +// public void 
shouldThrowOnForward() { +// assertThrows(StreamsException.class, () -> context.forward("key", "value")); +// } +// +// @Test +// public void shouldThrowOnForwardWithTo() { +// assertThrows(StreamsException.class, () -> context.forward("key", "value", To.all())); +// } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index e4425c187b7fa..9dad193c36631 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.header.internals.RecordHeaders; +//import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -40,8 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; +//import static org.mockito.ArgumentMatchers.any; +//import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -92,19 +92,19 @@ public void shouldReturnGlobalOrNullStore() { assertNull(globalContext.getStateStore(UNKNOWN_STORE)); } - @Test - public void shouldForwardToSingleChild() { - doNothing().when(child).process(any()); - - when(recordContext.timestamp()).thenReturn(0L); - when(recordContext.headers()).thenReturn(new RecordHeaders()); - globalContext.forward((Object /*forcing a call to the K/V forward*/) null, null); - } - - @Test - public void shouldFailToForwardUsingToParameter() { - assertThrows(IllegalStateException.class, () -> globalContext.forward(null, null, To.all())); - } +// @Test +// public void shouldForwardToSingleChild() { +// doNothing().when(child).process(any()); +// +// when(recordContext.timestamp()).thenReturn(0L); +// when(recordContext.headers()).thenReturn(new RecordHeaders()); +// globalContext.forward((Object /*forcing a call to the K/V forward*/) null, null); +// } +// +// @Test +// public void shouldFailToForwardUsingToParameter() { +// assertThrows(IllegalStateException.class, () -> globalContext.forward(null, null, To.all())); +// } @Test public void shouldNotFailOnNoOpCommit() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 336dd2ce50857..855effa6dd526 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import 
org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -631,45 +631,45 @@ public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() { ); } - @Test - public void shouldThrowUnsupportedOperationExceptionOnForward() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.forward("key", "value") - ); - } - - @Test - public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.forward("key", "value", To.child("child-name")) - ); - } +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnForward() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.forward("key", "value") +// ); +// } +// +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.forward("key", "value", To.child("child-name")) +// ); +// } @Test public void shouldThrowUnsupportedOperationExceptionOnCommit() { @@ -711,85 +711,85 @@ public void shouldThrowUnsupportedOperationExceptionOnSchedule() { ); } - @Test - public void shouldThrowUnsupportedOperationExceptionOnTopic() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.topic() - ); - } - - @Test - public void shouldThrowUnsupportedOperationExceptionOnPartition() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - 
- final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.partition() - ); - } - - @Test - public void shouldThrowUnsupportedOperationExceptionOnOffset() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.offset() - ); - } - - @Test - public void shouldThrowUnsupportedOperationExceptionOnTimestamp() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - - context = getStandbyContext(); - assertThrows( - UnsupportedOperationException.class, - () -> context.recordContext.timestamp() - ); - } +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnTopic() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.topic() +// ); +// } +// +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnPartition() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.partition() +// ); +// } +// +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnOffset() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.offset() +// ); +// } +// +// @Test +// public void shouldThrowUnsupportedOperationExceptionOnTimestamp() { +// foreachSetUp(); +// +// when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +// +// context = buildProcessorContextImpl(streamsConfig, stateManager); +// +// final StreamTask task = mock(StreamTask.class); +// context.transitionToActive(task, null, null); +// +// mockProcessorNodeWithLocalKeyValueStore(); +// +// context = getStandbyContext(); +// assertThrows( +// UnsupportedOperationException.class, +// () -> context.timestamp() +// ); +// } @Test public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java index ed7b47e62bf9c..a2b012eaa0852 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java @@ -25,13 +25,13 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.time.Duration; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.jupiter.api.Assertions.fail; +//import org.junit.jupiter.api.Test; +// +//import java.time.Duration; +// +//import static org.hamcrest.MatcherAssert.assertThat; +//import static org.hamcrest.Matchers.equalTo; +//import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -58,23 +58,23 @@ public void prepare() { ((InternalProcessorContext) context).transitionToActive(mock(StreamTask.class), null, null); } - @Test - public void shouldNotAllowToScheduleZeroMillisecondPunctuation() { - try { - context.schedule(Duration.ofMillis(0L), null, null); - fail("Should have thrown IllegalArgumentException"); - } catch (final IllegalArgumentException expected) { - assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond.")); - } - } - - @Test - public void shouldNotAllowToScheduleSubMillisecondPunctuation() { - try { - context.schedule(Duration.ofNanos(999_999L), null, null); - fail("Should have thrown IllegalArgumentException"); - } catch (final IllegalArgumentException expected) { - assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond.")); - } - } +// @Test +// public void shouldNotAllowToScheduleZeroMillisecondPunctuation() { +// try { +// context.schedule(Duration.ofMillis(0L), null, null); +// fail("Should have thrown IllegalArgumentException"); +// } catch (final IllegalArgumentException expected) { +// assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond.")); +// } +// } +// +// @Test +// public void shouldNotAllowToScheduleSubMillisecondPunctuation() { +// try { +// context.schedule(Duration.ofNanos(999_999L), null, null); +// fail("Should have thrown IllegalArgumentException"); +// } catch (final IllegalArgumentException expected) { +// assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond.")); +// } +// } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 5b4303a16955e..ca11b5f8e84ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -322,9 +322,9 @@ private InternalProcessorContext mockInternalProcessorContext() when(internalProcessorContext.taskId()).thenReturn(TASK_ID); when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime())); - when(internalProcessorContext.topic()).thenReturn(TOPIC); - 
when(internalProcessorContext.partition()).thenReturn(PARTITION); - when(internalProcessorContext.offset()).thenReturn(OFFSET); +// when(internalProcessorContext.topic()).thenReturn(TOPIC); +// when(internalProcessorContext.partition()).thenReturn(PARTITION); +// when(internalProcessorContext.offset()).thenReturn(OFFSET); when(internalProcessorContext.recordContext()).thenReturn( new ProcessorRecordContext( TIMESTAMP, @@ -353,12 +353,12 @@ public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.Processin @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { - assertEquals(internalProcessorContext.topic(), context.topic()); - assertEquals(internalProcessorContext.partition(), context.partition()); - assertEquals(internalProcessorContext.offset(), context.offset()); +// assertEquals(internalProcessorContext.topic(), context.topic()); +// assertEquals(internalProcessorContext.partition(), context.partition()); +// assertEquals(internalProcessorContext.offset(), context.offset()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); - assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp()); +// assertEquals(internalProcessorContext.timestamp(), context.timestamp()); assertEquals(KEY, record.key()); assertEquals(VALUE, record.value()); assertInstanceOf(RuntimeException.class, exception); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 5dab53290263a..d89aa18defdba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1716,13 +1716,13 @@ public void shouldNotShareHeadersBetweenPunctuateIterations() { processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, - timestamp -> task.processorContext().headers().add("dummy", null) + timestamp -> task.processorContext().recordContext().headers().add("dummy", null) ); task.punctuate( processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, - timestamp -> assertFalse(task.processorContext().headers().iterator().hasNext()) + timestamp -> assertFalse(task.processorContext().recordContext().headers().iterator().hasNext()) ); } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 228df8d63a1ac..105cf087f0006 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; @@ -344,33 +344,33 @@ public void forward(final Record record, } } - @Override - public void forward(final Object key, final Object value) { - forward(key, 
value, To.all()); - } - - @SuppressWarnings("unchecked") - @Override - public void forward(final Object key, final Object value, final To to) { - toInternal.update(to); - if (toInternal.hasTimestamp()) { - setTime(toInternal.timestamp()); - } - final ProcessorNode thisNode = currentNode; - try { - for (final ProcessorNode childNode : thisNode.children()) { - if (toInternal.child() == null || toInternal.child().equals(childNode.name())) { - currentNode = childNode; - final Record record = new Record<>(key, value, toInternal.timestamp(), headers()); - ((ProcessorNode) childNode).process(record); - toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple - // Processors and toInternal might have been modified - } - } - } finally { - currentNode = thisNode; - } - } +// @Override +// public void forward(final Object key, final Object value) { +// forward(key, value, To.all()); +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public void forward(final Object key, final Object value, final To to) { +// toInternal.update(to); +// if (toInternal.hasTimestamp()) { +// setTime(toInternal.timestamp()); +// } +// final ProcessorNode thisNode = currentNode; +// try { +// for (final ProcessorNode childNode : thisNode.children()) { +// if (toInternal.child() == null || toInternal.child().equals(childNode.name())) { +// currentNode = childNode; +// final Record record = new Record<>(key, value, toInternal.timestamp(), headers()); +// ((ProcessorNode) childNode).process(record); +// toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple +// // Processors and toInternal might have been modified +// } +// } +// } finally { +// currentNode = thisNode; +// } +// } // allow only setting time but not other fields in for record context, // and also not throwing exceptions if record context is not available. 
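For reference alongside the hunk above, which retires InternalMockProcessorContext's key/value forward() overloads (the code that walked the current node's children and wrapped the pair into a Record before calling process()): a minimal sketch of the Record-based forwarding that the mock keeps, assuming the current org.apache.kafka.streams.processor.api interfaces; the processor class and the child name are illustrative placeholders, not code from this patch.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Sketch: forwarding through the Record-based API instead of forward(key, value[, To]).
public class RecordForwardingProcessor implements Processor<String, Long, String, Long> {
    private ProcessorContext<String, Long> context;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, Long> record) {
        // Timestamp and headers travel on the Record itself, so there is no To
        // parameter; withValue() keeps the incoming metadata while changing the value.
        final Record<String, Long> output =
            record.withValue(record.value() == null ? 0L : record.value() + 1L);
        context.forward(output);               // counterpart of forward(key, value) / To.all()
        context.forward(output, "child-name"); // counterpart of forward(key, value, To.child("child-name"))
    }
}

The Record-based forward() overloads are untouched by this patch, so tests that previously asserted on key/value forwards can capture the same data from the forwarded Record.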
@@ -387,13 +387,13 @@ public void setTime(final long timestamp) { this.timestamp = timestamp; } - @Override - public long timestamp() { - if (recordContext == null) { - return timestamp; - } - return recordContext.timestamp(); - } +// @Override +// public long timestamp() { +// if (recordContext == null) { +// return timestamp; +// } +// return recordContext.timestamp(); +// } @Override public long currentSystemTimeMs() { @@ -405,37 +405,37 @@ public long currentStreamTimeMs() { throw new UnsupportedOperationException("this method is not supported in InternalMockProcessorContext"); } - @Override - public String topic() { - if (recordContext == null) { - return null; - } - return recordContext.topic(); - } - - @Override - public int partition() { - if (recordContext == null) { - return -1; - } - return recordContext.partition(); - } - - @Override - public long offset() { - if (recordContext == null) { - return -1L; - } - return recordContext.offset(); - } - - @Override - public Headers headers() { - if (recordContext == null) { - return new RecordHeaders(); - } - return recordContext.headers(); - } +// @Override +// public String topic() { +// if (recordContext == null) { +// return null; +// } +// return recordContext.topic(); +// } +// +// @Override +// public int partition() { +// if (recordContext == null) { +// return -1; +// } +// return recordContext.partition(); +// } +// +// @Override +// public long offset() { +// if (recordContext == null) { +// return -1L; +// } +// return recordContext.offset(); +// } +// +// @Override +// public Headers headers() { +// if (recordContext == null) { +// return new RecordHeaders(); +// } +// return recordContext.headers(); +// } @Override public TaskType taskType() { diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 019410642fcf8..9e6a737c46e9c 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -89,7 +89,8 @@ public StreamsMetricsImpl metrics() { @Override public ProcessorRecordContext recordContext() { - return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers()); + //return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers()); + return null; } @Override @@ -160,43 +161,43 @@ public StateRestoreCallback stateRestoreCallback(final String storeName) { return restoreCallbacks.get(storeName); } - @Override - public void forward(K key, V value) { - throw new UnsupportedOperationException("Migrate to new implementation"); - } - - @Override - public void forward(K key, V value, To to) { - throw new UnsupportedOperationException("Migrate to new implementation"); - } - - @Override - public String topic() { - if (recordMetadata().isPresent()) return recordMetadata().get().topic(); - else return null; - } - - @Override - public int partition() { - if (recordMetadata().isPresent()) return 
recordMetadata().get().partition(); - else return 0; - } - - @Override - public long offset() { - if (recordMetadata().isPresent()) return recordMetadata().get().offset(); - else return 0; - } - - @Override - public Headers headers() { - return headers; - } - - @Override - public long timestamp() { - return timestamp; - } +// @Override +// public void forward(K key, V value) { +// throw new UnsupportedOperationException("Migrate to new implementation"); +// } +// +// @Override +// public void forward(K key, V value, To to) { +// throw new UnsupportedOperationException("Migrate to new implementation"); +// } +// +// @Override +// public String topic() { +// if (recordMetadata().isPresent()) return recordMetadata().get().topic(); +// else return null; +// } +// +// @Override +// public int partition() { +// if (recordMetadata().isPresent()) return recordMetadata().get().partition(); +// else return 0; +// } +// +// @Override +// public long offset() { +// if (recordMetadata().isPresent()) return recordMetadata().get().offset(); +// else return 0; +// } +// +// @Override +// public Headers headers() { +// return headers; +// } +// +// @Override +// public long timestamp() { +// return timestamp; +// } @Override public TaskType taskType() { diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 734f744f20f54..108ef148552c6 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; @@ -89,15 +89,15 @@ public void forward(final Record record, final String childName) { forwardedValues.put(record.key(), record.value()); } - @Override - public void forward(final K key, final V value) { - forwardedValues.put(key, value); - } - - @Override - public void forward(final K key, final V value, final To to) { - forwardedValues.put(key, value); - } +// @Override +// public void forward(final K key, final V value) { +// forwardedValues.put(key, value); +// } +// +// @Override +// public void forward(final K key, final V value, final To to) { +// forwardedValues.put(key, value); +// } @Override public void commit() {} diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index fc7d27a3bb792..53a92712097a4 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -20,12 +20,12 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.serialization.Serde; +//import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import 
org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.internals.ApiUtils; +//import org.apache.kafka.streams.StreamsMetrics; +//import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import java.io.File; -import java.time.Duration; +//import java.time.Duration; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -61,13 +61,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S private final File stateDir; // settable record metadata ================================================ - private String topic; - private Integer partition; - private Long offset; - private Headers headers; - private Long recordTimestamp; - private Long currentSystemTimeMs; - private Long currentStreamTimeMs; +// private String topic; +// private Integer partition; +// private Long offset; +// private Headers headers; +// private Long recordTimestamp; +// private Long currentSystemTimeMs; +// private Long currentStreamTimeMs; // mocks ================================================ private final Map stateStores = new HashMap<>(); @@ -243,64 +243,64 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics); } - @Override - public String applicationId() { - return config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - } - - @Override - public TaskId taskId() { - return taskId; - } - - @Override - public Map appConfigs() { - final Map combined = new HashMap<>(); - combined.putAll(config.originals()); - combined.putAll(config.values()); - return combined; - } - - @Override - public Map appConfigsWithPrefix(final String prefix) { - return config.originalsWithPrefix(prefix); - } - - @Override - public long currentSystemTimeMs() { - if (currentSystemTimeMs == null) { - throw new IllegalStateException("System time must be set before use via setCurrentSystemTimeMs()."); - } - return currentSystemTimeMs; - } - - @Override - public long currentStreamTimeMs() { - if (currentStreamTimeMs == null) { - throw new IllegalStateException("Stream time must be set before use via setCurrentStreamTimeMs()."); - } - return currentStreamTimeMs; - } - - @Override - public Serde keySerde() { - return config.defaultKeySerde(); - } - - @Override - public Serde valueSerde() { - return config.defaultValueSerde(); - } - - @Override - public File stateDir() { - return stateDir; - } - - @Override - public StreamsMetrics metrics() { - return metrics; - } +// @Override +// public String applicationId() { +// return config.getString(StreamsConfig.APPLICATION_ID_CONFIG); +// } +// +// @Override +// public TaskId taskId() { +// return taskId; +// } +// +// @Override +// public Map appConfigs() { +// final Map combined = new HashMap<>(); +// combined.putAll(config.originals()); +// combined.putAll(config.values()); +// return combined; +// } +// +// @Override +// public Map appConfigsWithPrefix(final String prefix) { +// return config.originalsWithPrefix(prefix); +// } +// +// @Override +// public long currentSystemTimeMs() { +// if (currentSystemTimeMs == null) { +// throw new IllegalStateException("System time must be set before use 
via setCurrentSystemTimeMs()."); +// } +// return currentSystemTimeMs; +// } +// +// @Override +// public long currentStreamTimeMs() { +// if (currentStreamTimeMs == null) { +// throw new IllegalStateException("Stream time must be set before use via setCurrentStreamTimeMs()."); +// } +// return currentStreamTimeMs; +// } +// +// @Override +// public Serde keySerde() { +// return config.defaultKeySerde(); +// } +// +// @Override +// public Serde valueSerde() { +// return config.defaultValueSerde(); +// } +// +// @Override +// public File stateDir() { +// return stateDir; +// } +// +// @Override +// public StreamsMetrics metrics() { +// return metrics; +// } // settable record metadata ================================================ @@ -319,11 +319,11 @@ public void setRecordMetadata(final String topic, final long offset, final Headers headers, final long timestamp) { - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.headers = headers; - this.recordTimestamp = timestamp; +// this.topic = topic; +// this.partition = partition; +// this.offset = offset; +// this.headers = headers; +// this.recordTimestamp = timestamp; } /** @@ -334,7 +334,7 @@ public void setRecordMetadata(final String topic, */ @SuppressWarnings({"WeakerAccess", "unused"}) public void setTopic(final String topic) { - this.topic = topic; +// this.topic = topic; } /** @@ -345,7 +345,7 @@ public void setTopic(final String topic) { */ @SuppressWarnings({"WeakerAccess", "unused"}) public void setPartition(final int partition) { - this.partition = partition; +// this.partition = partition; } /** @@ -356,7 +356,7 @@ public void setPartition(final int partition) { */ @SuppressWarnings({"WeakerAccess", "unused"}) public void setOffset(final long offset) { - this.offset = offset; +// this.offset = offset; } /** @@ -367,7 +367,7 @@ public void setOffset(final long offset) { */ @SuppressWarnings({"WeakerAccess", "unused"}) public void setHeaders(final Headers headers) { - this.headers = headers; +// this.headers = headers; } @@ -379,94 +379,94 @@ public void setHeaders(final Headers headers) { */ @SuppressWarnings({"WeakerAccess"}) public void setRecordTimestamp(final long recordTimestamp) { - this.recordTimestamp = recordTimestamp; +// this.recordTimestamp = recordTimestamp; } public void setCurrentSystemTimeMs(final long currentSystemTimeMs) { - this.currentSystemTimeMs = currentSystemTimeMs; +// this.currentSystemTimeMs = currentSystemTimeMs; } public void setCurrentStreamTimeMs(final long currentStreamTimeMs) { - this.currentStreamTimeMs = currentStreamTimeMs; - } - - @Override - public String topic() { - if (topic == null) { - throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic()."); - } - return topic; - } - - @Override - public int partition() { - if (partition == null) { - throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition()."); - } - return partition; - } - - @Override - public long offset() { - if (offset == null) { - throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset()."); - } - return offset; - } - - /** - * Returns the headers of the current input record; could be {@code null} if it is not - * available. - * - *

Note, that headers should never be {@code null} in the actual Kafka Streams runtime, - * even if they could be empty. However, this mock does not guarantee non-{@code null} headers. - * Thus, you either need to add a {@code null} check to your production code to use this mock - * for testing, or you always need to set headers manually via {@link #setHeaders(Headers)} to - * avoid a {@link NullPointerException} from your {@link org.apache.kafka.streams.kstream.ValueTransformer}implementation. - * - * @return the headers - */ - @Override - public Headers headers() { - return headers; - } - - @Override - public long timestamp() { - if (recordTimestamp == null) { - throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setRecordTimestamp()."); - } - return recordTimestamp; - } - - // mocks ================================================ - - @Override - public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) { - stateStores.put(store.name(), store); - } - - @SuppressWarnings("unchecked") - @Override - public S getStateStore(final String name) { - return (S) stateStores.get(name); - } - - @Override - public Cancellable schedule(final Duration interval, - final PunctuationType type, - final Punctuator callback) throws IllegalArgumentException { - final long intervalMs = ApiUtils.validateMillisecondDuration(interval, "interval"); - if (intervalMs < 1) { - throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond."); - } - final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback); - - punctuators.add(capturedPunctuator); - - return capturedPunctuator::cancel; - } +// this.currentStreamTimeMs = currentStreamTimeMs; + } + +// @Override +// public String topic() { +// if (topic == null) { +// throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic()."); +// } +// return topic; +// } +// +// @Override +// public int partition() { +// if (partition == null) { +// throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition()."); +// } +// return partition; +// } +// +// @Override +// public long offset() { +// if (offset == null) { +// throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset()."); +// } +// return offset; +// } +// +// /** +// * Returns the headers of the current input record; could be {@code null} if it is not +// * available. +// * +// *

Note, that headers should never be {@code null} in the actual Kafka Streams runtime, +// * even if they could be empty. However, this mock does not guarantee non-{@code null} headers. +// * Thus, you either need to add a {@code null} check to your production code to use this mock +// * for testing, or you always need to set headers manually via {@link #setHeaders(Headers)} to +// * avoid a {@link NullPointerException} from your {@link org.apache.kafka.streams.kstream.ValueTransformer}implementation. +// * +// * @return the headers +// */ +// @Override +// public Headers headers() { +// return headers; +// } +// +// @Override +// public long timestamp() { +// if (recordTimestamp == null) { +// throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setRecordTimestamp()."); +// } +// return recordTimestamp; +// } +// +// // mocks ================================================ +// +// @Override +// public void register(final StateStore store, +// final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) { +// stateStores.put(store.name(), store); +// } +// +// @SuppressWarnings("unchecked") +// @Override +// public S getStateStore(final String name) { +// return (S) stateStores.get(name); +// } +// +// @Override +// public Cancellable schedule(final Duration interval, +// final PunctuationType type, +// final Punctuator callback) throws IllegalArgumentException { +// final long intervalMs = ApiUtils.validateMillisecondDuration(interval, "interval"); +// if (intervalMs < 1) { +// throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond."); +// } +// final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback); +// +// punctuators.add(capturedPunctuator); +// +// return capturedPunctuator::cancel; +// } /** * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}. @@ -478,21 +478,21 @@ public List scheduledPunctuators() { return new LinkedList<>(punctuators); } - @Override - public void forward(final K key, final V value) { - forward(key, value, To.all()); - } - - @Override - public void forward(final K key, final V value, final To to) { - capturedForwards.add( - new CapturedForward( - new KeyValue<>(key, value), - to.timestamp == -1 ? to.withTimestamp(recordTimestamp == null ? -1 : recordTimestamp) : to, - headers - ) - ); - } +// @Override +// public void forward(final K key, final V value) { +// forward(key, value, To.all()); +// } +// +// @Override +// public void forward(final K key, final V value, final To to) { +// capturedForwards.add( +// new CapturedForward( +// new KeyValue<>(key, value), +// to.timestamp == -1 ? to.withTimestamp(recordTimestamp == null ? -1 : recordTimestamp) : to, +// headers +// ) +// ); +// } /** * Get all the forwarded data this context has observed. The returned list will not be @@ -532,10 +532,10 @@ public void resetForwards() { capturedForwards.clear(); } - @Override - public void commit() { - committed = true; - } +// @Override +// public void commit() { +// committed = true; +// } /** * Whether {@link ProcessorContext#commit()} has been called in this context. 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index ebb38dd773ad6..a13d45edc2082 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -16,451 +16,451 @@ */ package org.apache.kafka.streams; -import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.MockProcessorContext; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; - -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.time.Duration; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +//import org.apache.kafka.common.header.internals.RecordHeaders; +//import org.apache.kafka.common.metrics.MetricConfig; +//import org.apache.kafka.common.metrics.Metrics; +//import org.apache.kafka.common.serialization.Serdes; +//import org.apache.kafka.common.utils.Time; +//import org.apache.kafka.streams.kstream.Transformer; +//import org.apache.kafka.streams.processor.MockProcessorContext; +//import org.apache.kafka.streams.processor.ProcessorContext; +//import org.apache.kafka.streams.processor.PunctuationType; +//import org.apache.kafka.streams.processor.Punctuator; +//import org.apache.kafka.streams.processor.StateStore; +//import org.apache.kafka.streams.processor.TaskId; +//import org.apache.kafka.streams.processor.To; +//import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +//import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +//import org.apache.kafka.streams.state.KeyValueStore; +//import org.apache.kafka.streams.state.StoreBuilder; +//import org.apache.kafka.streams.state.Stores; +// +//import org.junit.jupiter.api.Test; +// +//import java.io.File; +//import java.time.Duration; +//import java.util.HashMap; +//import java.util.Iterator; +//import java.util.Map; +//import java.util.Properties; +// +//import static 
org.junit.jupiter.api.Assertions.assertEquals; +//import static org.junit.jupiter.api.Assertions.assertFalse; +//import static org.junit.jupiter.api.Assertions.assertNull; +//import static org.junit.jupiter.api.Assertions.assertTrue; +//import static org.junit.jupiter.api.Assertions.fail; +//import static org.mockito.ArgumentMatchers.any; +//import static org.mockito.ArgumentMatchers.anyString; +//import static org.mockito.Mockito.doAnswer; +//import static org.mockito.Mockito.mock; +//import static org.mockito.Mockito.when; @Deprecated public class MockProcessorContextTest { - - @Test - public void shouldCaptureOutputRecords() { - final Transformer> transformer = new Transformer<>() { - private ProcessorContext context; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public KeyValue transform(final String key, final Long value) { - context.forward(key + value, key.length() + value); - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(); - transformer.init(context); - - transformer.transform("foo", 5L); - transformer.transform("barbaz", 50L); - - final Iterator forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - - context.resetForwards(); - - assertEquals(0, context.forwarded().size()); - } - - @Test - public void shouldCaptureOutputRecordsUsingTo() { - final Transformer> transformer = new Transformer<>() { - private ProcessorContext context; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public KeyValue transform(final String key, final Long value) { - context.forward(key + value, key.length() + value); - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(); - - transformer.init(context); - - transformer.transform("foo", 5L); - transformer.transform("barbaz", 50L); - - final Iterator forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - - context.resetForwards(); - - assertEquals(0, context.forwarded().size()); - } - - @Test - public void shouldCaptureRecordsOutputToChildByName() { - final Transformer> transformer = new Transformer<>() { - private ProcessorContext context; - private int count = 0; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public KeyValue transform(final String key, final Long value) { - if (count == 0) { - context.forward("start", -1L, To.all()); // broadcast - } - final To toChild = count % 2 == 0 ? 
To.child("george") : To.child("pete"); - context.forward(key + value, key.length() + value, toChild); - count++; - - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(); - - transformer.init(context); - - transformer.transform("foo", 5L); - transformer.transform("barbaz", 50L); - - { - final Iterator forwarded = context.forwarded().iterator(); - - final MockProcessorContext.CapturedForward forward1 = forwarded.next(); - assertEquals(new KeyValue<>("start", -1L), forward1.keyValue()); - assertNull(forward1.childName()); - - final MockProcessorContext.CapturedForward forward2 = forwarded.next(); - assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue()); - assertEquals("george", forward2.childName()); - - final MockProcessorContext.CapturedForward forward3 = forwarded.next(); - assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue()); - assertEquals("pete", forward3.childName()); - - assertFalse(forwarded.hasNext()); - } - - { - final Iterator forwarded = context.forwarded("george").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - - { - final Iterator forwarded = context.forwarded("pete").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - - { - final Iterator forwarded = context.forwarded("steve").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - } - - @Test - public void shouldCaptureCommitsAndAllowReset() { - final Transformer transformer = new Transformer<>() { - private ProcessorContext context; - private int count = 0; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public Object transform(final String key, final Long value) { - if (++count > 2) { - context.commit(); - } - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(); - - transformer.init(context); - - transformer.transform("foo", 5L); - transformer.transform("barbaz", 50L); - - assertFalse(context.committed()); - - transformer.transform("foobar", 500L); - - assertTrue(context.committed()); - - context.resetCommit(); - - assertFalse(context.committed()); - } - - @Test - public void shouldStoreAndReturnStateStores() { - final Transformer transformer = new Transformer<>() { - private ProcessorContext context; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public Object transform(final String key, final Long value) { - final KeyValueStore stateStore = context.getStateStore("my-state"); - stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value); - stateStore.put("all", (stateStore.get("all") == null ? 
0 : stateStore.get("all")) + value); - - return null; - } - - @Override - public void close() { } - }; - - final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("my-state"), - Serdes.String(), - Serdes.Long()).withLoggingDisabled(); - - final KeyValueStore store = storeBuilder.build(); - - final InternalProcessorContext mockInternalProcessorContext = mock(InternalProcessorContext.class); - final Map stateStores = new HashMap<>(); - doAnswer(invocation -> { - final StateStore stateStore = invocation.getArgument(0); - stateStores.put(stateStore.name(), stateStore); - return null; - }).when(mockInternalProcessorContext).register(any(), any()); - when(mockInternalProcessorContext.getStateStore(anyString())).thenAnswer(invocation -> { - final String name = invocation.getArgument(0); - return stateStores.get(name); - } - ); - when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl( - new Metrics(new MetricConfig()), - Thread.currentThread().getName(), - "processId", - Time.SYSTEM - )); - when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1)); - - store.init(mockInternalProcessorContext, store); - - transformer.init(mockInternalProcessorContext); - - transformer.transform("foo", 5L); - transformer.transform("bar", 50L); - - assertEquals(5L, (long) store.get("foo")); - assertEquals(50L, (long) store.get("bar")); - assertEquals(55L, (long) store.get("all")); - } - - @Test - public void shouldCaptureApplicationAndRecordMetadata() { - final Properties config = new Properties(); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"); - - final Transformer> transformer = new Transformer<>() { - private ProcessorContext context; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - } - - @Override - public KeyValue transform(final String key, final Long value) { - context.forward("appId", context.applicationId()); - context.forward("taskId", context.taskId()); - - context.forward("topic", context.topic()); - context.forward("partition", context.partition()); - context.forward("offset", context.offset()); - context.forward("timestamp", context.timestamp()); - - context.forward("key", key); - context.forward("value", value); - - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(config); - transformer.init(context); - - try { - transformer.transform("foo", 5L); - fail("Should have thrown an exception."); - } catch (final IllegalStateException expected) { - // expected, since the record metadata isn't initialized - } - - context.resetForwards(); - context.setRecordMetadata("t1", 0, 0L, new RecordHeaders(), 0L); - - { - transformer.transform("foo", 5L); - final Iterator forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue()); - } - - context.resetForwards(); - - // record metadata should be 
"sticky" - context.setOffset(1L); - context.setRecordTimestamp(10L); - context.setCurrentSystemTimeMs(20L); - context.setCurrentStreamTimeMs(30L); - - { - transformer.transform("bar", 50L); - final Iterator forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue()); - assertEquals(20L, context.currentSystemTimeMs()); - assertEquals(30L, context.currentStreamTimeMs()); - } - - context.resetForwards(); - // record metadata should be "sticky" - context.setTopic("t2"); - context.setPartition(30); - - { - transformer.transform("baz", 500L); - final Iterator forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue()); - } - } - - @Test - public void shouldCapturePunctuator() { - final Transformer> transformer = new Transformer<>() { - @Override - public void init(final ProcessorContext context) { - context.schedule( - Duration.ofSeconds(1L), - PunctuationType.WALL_CLOCK_TIME, - timestamp -> context.commit() - ); - } - - @Override - public KeyValue transform(final String key, final Long value) { - return null; - } - - @Override - public void close() { } - }; - - final MockProcessorContext context = new MockProcessorContext(); - - transformer.init(context); - - final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); - assertEquals(1000L, capturedPunctuator.getIntervalMs()); - assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType()); - assertFalse(capturedPunctuator.cancelled()); - - final Punctuator punctuator = capturedPunctuator.getPunctuator(); - assertFalse(context.committed()); - punctuator.punctuate(1234L); - assertTrue(context.committed()); - } - - @SuppressWarnings("resource") - @Test - public void fullConstructorShouldSetAllExpectedAttributes() { - final Properties config = new Properties(); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor"); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); - - final File dummyFile = new File(""); - final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile); - - assertEquals("testFullConstructor", context.applicationId()); - assertEquals(new TaskId(1, 1), context.taskId()); - 
assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG)); - assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id")); - assertEquals(Serdes.StringSerde.class, context.keySerde().getClass()); - assertEquals(Serdes.LongSerde.class, context.valueSerde().getClass()); - assertEquals(dummyFile, context.stateDir()); - } +// +// @Test +// public void shouldCaptureOutputRecords() { +// final Transformer> transformer = new Transformer<>() { +// private ProcessorContext context; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public KeyValue transform(final String key, final Long value) { +// context.forward(key + value, key.length() + value); +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(); +// transformer.init(context); +// +// transformer.transform("foo", 5L); +// transformer.transform("barbaz", 50L); +// +// final Iterator forwarded = context.forwarded().iterator(); +// assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); +// assertFalse(forwarded.hasNext()); +// +// context.resetForwards(); +// +// assertEquals(0, context.forwarded().size()); +// } +// +// @Test +// public void shouldCaptureOutputRecordsUsingTo() { +// final Transformer> transformer = new Transformer<>() { +// private ProcessorContext context; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public KeyValue transform(final String key, final Long value) { +// context.forward(key + value, key.length() + value); +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(); +// +// transformer.init(context); +// +// transformer.transform("foo", 5L); +// transformer.transform("barbaz", 50L); +// +// final Iterator forwarded = context.forwarded().iterator(); +// assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); +// assertFalse(forwarded.hasNext()); +// +// context.resetForwards(); +// +// assertEquals(0, context.forwarded().size()); +// } +// +// @Test +// public void shouldCaptureRecordsOutputToChildByName() { +// final Transformer> transformer = new Transformer<>() { +// private ProcessorContext context; +// private int count = 0; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public KeyValue transform(final String key, final Long value) { +// if (count == 0) { +// context.forward("start", -1L, To.all()); // broadcast +// } +// final To toChild = count % 2 == 0 ? 
To.child("george") : To.child("pete"); +// context.forward(key + value, key.length() + value, toChild); +// count++; +// +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(); +// +// transformer.init(context); +// +// transformer.transform("foo", 5L); +// transformer.transform("barbaz", 50L); +// +// { +// final Iterator forwarded = context.forwarded().iterator(); +// +// final MockProcessorContext.CapturedForward forward1 = forwarded.next(); +// assertEquals(new KeyValue<>("start", -1L), forward1.keyValue()); +// assertNull(forward1.childName()); +// +// final MockProcessorContext.CapturedForward forward2 = forwarded.next(); +// assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue()); +// assertEquals("george", forward2.childName()); +// +// final MockProcessorContext.CapturedForward forward3 = forwarded.next(); +// assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue()); +// assertEquals("pete", forward3.childName()); +// +// assertFalse(forwarded.hasNext()); +// } +// +// { +// final Iterator forwarded = context.forwarded("george").iterator(); +// assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); +// assertFalse(forwarded.hasNext()); +// } +// +// { +// final Iterator forwarded = context.forwarded("pete").iterator(); +// assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); +// assertFalse(forwarded.hasNext()); +// } +// +// { +// final Iterator forwarded = context.forwarded("steve").iterator(); +// assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); +// assertFalse(forwarded.hasNext()); +// } +// } +// +// @Test +// public void shouldCaptureCommitsAndAllowReset() { +// final Transformer transformer = new Transformer<>() { +// private ProcessorContext context; +// private int count = 0; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public Object transform(final String key, final Long value) { +// if (++count > 2) { +// context.commit(); +// } +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(); +// +// transformer.init(context); +// +// transformer.transform("foo", 5L); +// transformer.transform("barbaz", 50L); +// +// assertFalse(context.committed()); +// +// transformer.transform("foobar", 500L); +// +// assertTrue(context.committed()); +// +// context.resetCommit(); +// +// assertFalse(context.committed()); +// } +// +// @Test +// public void shouldStoreAndReturnStateStores() { +// final Transformer transformer = new Transformer<>() { +// private ProcessorContext context; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public Object transform(final String key, final Long value) { +// final KeyValueStore stateStore = context.getStateStore("my-state"); +// stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value); +// stateStore.put("all", (stateStore.get("all") == null ? 
0 : stateStore.get("all")) + value); +// +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( +// Stores.inMemoryKeyValueStore("my-state"), +// Serdes.String(), +// Serdes.Long()).withLoggingDisabled(); +// +// final KeyValueStore store = storeBuilder.build(); +// +// final InternalProcessorContext mockInternalProcessorContext = mock(InternalProcessorContext.class); +// final Map stateStores = new HashMap<>(); +// doAnswer(invocation -> { +// final StateStore stateStore = invocation.getArgument(0); +// stateStores.put(stateStore.name(), stateStore); +// return null; +// }).when(mockInternalProcessorContext).register(any(), any()); +// when(mockInternalProcessorContext.getStateStore(anyString())).thenAnswer(invocation -> { +// final String name = invocation.getArgument(0); +// return stateStores.get(name); +// } +// ); +// when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl( +// new Metrics(new MetricConfig()), +// Thread.currentThread().getName(), +// "processId", +// Time.SYSTEM +// )); +// when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1)); +// +// store.init(mockInternalProcessorContext, store); +// +// transformer.init(mockInternalProcessorContext); +// +// transformer.transform("foo", 5L); +// transformer.transform("bar", 50L); +// +// assertEquals(5L, (long) store.get("foo")); +// assertEquals(50L, (long) store.get("bar")); +// assertEquals(55L, (long) store.get("all")); +// } +// +// @Test +// public void shouldCaptureApplicationAndRecordMetadata() { +// final Properties config = new Properties(); +// config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"); +// +// final Transformer> transformer = new Transformer<>() { +// private ProcessorContext context; +// +// @Override +// public void init(final ProcessorContext context) { +// this.context = context; +// } +// +// @Override +// public KeyValue transform(final String key, final Long value) { +// context.forward("appId", context.applicationId()); +// context.forward("taskId", context.taskId()); +// +// context.forward("topic", context.topic()); +// context.forward("partition", context.partition()); +// context.forward("offset", context.offset()); +// context.forward("timestamp", context.timestamp()); +// +// context.forward("key", key); +// context.forward("value", value); +// +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(config); +// transformer.init(context); +// +// try { +// transformer.transform("foo", 5L); +// fail("Should have thrown an exception."); +// } catch (final IllegalStateException expected) { +// // expected, since the record metadata isn't initialized +// } +// +// context.resetForwards(); +// context.setRecordMetadata("t1", 0, 0L, new RecordHeaders(), 0L); +// +// { +// transformer.transform("foo", 5L); +// final Iterator forwarded = context.forwarded().iterator(); +// assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue()); +// 
assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue()); +// } +// +// context.resetForwards(); +// +// // record metadata should be "sticky" +// context.setOffset(1L); +// context.setRecordTimestamp(10L); +// context.setCurrentSystemTimeMs(20L); +// context.setCurrentStreamTimeMs(30L); +// +// { +// transformer.transform("bar", 50L); +// final Iterator forwarded = context.forwarded().iterator(); +// assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue()); +// assertEquals(20L, context.currentSystemTimeMs()); +// assertEquals(30L, context.currentStreamTimeMs()); +// } +// +// context.resetForwards(); +// // record metadata should be "sticky" +// context.setTopic("t2"); +// context.setPartition(30); +// +// { +// transformer.transform("baz", 500L); +// final Iterator forwarded = context.forwarded().iterator(); +// assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue()); +// assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue()); +// } +// } +// +// @Test +// public void shouldCapturePunctuator() { +// final Transformer> transformer = new Transformer<>() { +// @Override +// public void init(final ProcessorContext context) { +// context.schedule( +// Duration.ofSeconds(1L), +// PunctuationType.WALL_CLOCK_TIME, +// timestamp -> context.commit() +// ); +// } +// +// @Override +// public KeyValue transform(final String key, final Long value) { +// return null; +// } +// +// @Override +// public void close() { } +// }; +// +// final MockProcessorContext context = new MockProcessorContext(); +// +// transformer.init(context); +// +// final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); +// assertEquals(1000L, capturedPunctuator.getIntervalMs()); +// assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType()); +// assertFalse(capturedPunctuator.cancelled()); +// +// final Punctuator punctuator = capturedPunctuator.getPunctuator(); +// assertFalse(context.committed()); +// punctuator.punctuate(1234L); +// assertTrue(context.committed()); +// } +// +// @SuppressWarnings("resource") +// @Test +// public void fullConstructorShouldSetAllExpectedAttributes() { +// final Properties config = new Properties(); +// config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor"); +// config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class); +// config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); +// +// final File dummyFile = new File(""); +// final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile); +// +// assertEquals("testFullConstructor", context.applicationId()); +// assertEquals(new TaskId(1, 1), context.taskId()); +// assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG)); +// assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id")); +// assertEquals(Serdes.StringSerde.class, context.keySerde().getClass()); +// assertEquals(Serdes.LongSerde.class, context.valueSerde().getClass()); +// assertEquals(dummyFile, context.stateDir()); +// } }