diff --git a/LICENSE-binary b/LICENSE-binary index 5383cf60ec6e5..686ebe39030e1 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final netty-transport-native-unix-common-4.1.115.Final opentelemetry-proto-1.0.0-alpha plexus-utils-3.5.1 -rocksdbjni-7.9.2 +rocksdbjni-9.7.3 scala-library-2.13.15 scala-logging_2.13-3.9.5 scala-reflect-2.13.15 diff --git a/docs/ops.html b/docs/ops.html index 02ea5bd788f5d..18a1bf44c5cba 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3475,7 +3475,7 @@
Streams API See KIP-1112 for more details.

+

+ Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes. + The org.rocksdb.AccessHint class, along with its associated methods, has been removed. + Several methods related to compressed block cache configuration in the BlockBasedTableConfig class have been removed, including blockCacheCompressedNumShardBits, blockCacheCompressedSize, and their corresponding setters. These functionalities are now consolidated under the cache option, and developers should configure their compressed block cache using the setCache method instead. + The NO_FILE_CLOSES field has been removed from the org.rocksdb.TickerTypeenum as a result the number-open-files metrics does not work as expected. Metric number-open-files returns constant -1 from now on until it will officially be removed. + The org.rocksdb.Options.setLogger() method now accepts a LoggerInterface as a parameter instead of the previous Logger. + Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations. + These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the rocksdb.config.setter, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed AccessHint class, the removed methods from the BlockBasedTableConfig class, the NO_FILE_CLOSES field from TickerType, or relying on the previous signature of setLogger() will need to update their implementations. +

+

Streams API changes in 3.9.0

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 52ef2fcfde1b4..473edec0c6619 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -119,7 +119,7 @@ versions += [ protobuf: "3.25.5", // a dependency of opentelemetryProto pcollections: "4.0.1", re2j: "1.7", - rocksDB: "7.9.2", + rocksDB: "9.7.3", // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // has the version field as mandatory in its configuration, see // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index b7b611f8be0d3..5b2f1e06b344f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -22,7 +22,6 @@ import org.rocksdb.AbstractEventListener; import org.rocksdb.AbstractSlice; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; @@ -37,6 +36,7 @@ import org.rocksdb.DbPath; import org.rocksdb.Env; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LoggerInterface; import org.rocksdb.MemTableConfig; import org.rocksdb.MergeOperator; import org.rocksdb.Options; @@ -332,14 +332,6 @@ public Statistics statistics() { return dbOptions.statistics(); } - @Deprecated - public int baseBackgroundCompactions() { - final String message = "This method has been removed from the underlying RocksDB. " + - "It is currently a no-op method which returns a default value of -1."; - log.warn(message); - return -1; - } - @Override public Options setMaxSubcompactions(final int maxSubcompactions) { dbOptions.setMaxSubcompactions(maxSubcompactions); @@ -571,34 +563,6 @@ public long dbWriteBufferSize() { return dbOptions.dbWriteBufferSize(); } - @Override - public Options setAccessHintOnCompactionStart(final AccessHint accessHint) { - dbOptions.setAccessHintOnCompactionStart(accessHint); - return this; - } - - @Override - public AccessHint accessHintOnCompactionStart() { - return dbOptions.accessHintOnCompactionStart(); - } - - @Deprecated - public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) { - final String message = "This method has been removed from the underlying RocksDB. " + - "It was not affecting compaction even in earlier versions. " + - "It is currently a no-op method."; - log.warn(message); - return this; - } - - @Deprecated - public boolean newTableReaderForCompactionInputs() { - final String message = "This method has been removed from the underlying RocksDB. " + - "It is now a method which always returns false."; - log.warn(message); - return false; - } - @Override public Options setCompactionReadaheadSize(final long compactionReadaheadSize) { dbOptions.setCompactionReadaheadSize(compactionReadaheadSize); @@ -843,7 +807,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) { } @Override - public Options setLogger(final org.rocksdb.Logger logger) { + public Options setLogger(final LoggerInterface logger) { dbOptions.setLogger(logger); return this; } @@ -914,6 +878,16 @@ public Options setCompressionType(final CompressionType compressionType) { return this; } + @Override + public Options setMemtableMaxRangeDeletions(final int n) { + columnFamilyOptions.setMemtableMaxRangeDeletions(n); + return this; + } + + @Override + public int memtableMaxRangeDeletions() { + return columnFamilyOptions.memtableMaxRangeDeletions(); + } @Override public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) { @@ -1464,26 +1438,6 @@ public boolean allowIngestBehind() { return dbOptions.allowIngestBehind(); } - @Deprecated - public Options setPreserveDeletes(final boolean preserveDeletes) { - final String message = "This method has been removed from the underlying RocksDB. " + - "It was marked for deprecation in earlier versions. " + - "The behaviour can be replicated by using user-defined timestamps. " + - "It is currently a no-op method."; - log.warn(message); - // no-op - return this; - } - - @Deprecated - public boolean preserveDeletes() { - final String message = "This method has been removed from the underlying RocksDB. " + - "It was marked for deprecation in earlier versions. " + - "It is currently a no-op method with a default value of false."; - log.warn(message); - return false; - } - @Override public Options setTwoWriteQueues(final boolean twoWriteQueues) { dbOptions.setTwoWriteQueues(twoWriteQueues); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 03b1f7eaf02f1..10e8cb804fece 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -462,8 +462,7 @@ public void record(final long now) { writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); - numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS) - - valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES); + numberOfOpenFiles = -1; numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); memtableFlushTimeSum += memtableFlushTimeData.getSum(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 5ddcf5bef551e..08248b020544e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -28,7 +28,6 @@ import org.rocksdb.AbstractCompactionFilter.Context; import org.rocksdb.AbstractCompactionFilterFactory; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionPriority; @@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { add("setMaxBackgroundCompactions"); add("maxBackgroundFlushes"); add("setMaxBackgroundFlushes"); + add("tablePropertiesCollectorFactory"); + add("setTablePropertiesCollectorFactory"); addAll(walRelatedMethods); } }; @@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class[] parameterTypes) throws case "java.util.Collection": parameters[i] = new ArrayList<>(); break; - case "org.rocksdb.AccessHint": - parameters[i] = AccessHint.NONE; - break; case "org.rocksdb.Cache": parameters[i] = new LRUCache(1L); break; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index 7ec3f4bf38c76..a0c068b59ee5d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -474,10 +474,8 @@ public void shouldRecordStatisticsBasedMetrics() { final double expectedCompactionTimeMaxSensor = 24.0; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); - when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); - when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); - final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4); + final double expectedNumberOfOpenFilesSensor = -1; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); @@ -485,8 +483,8 @@ public void shouldRecordStatisticsBasedMetrics() { recorder.record(now); - verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class)); - verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class)); verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class)); verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class)); verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);