From 124ef0642ee6b758c7e03a1ee63a11ff03987897 Mon Sep 17 00:00:00 2001 From: swikar1 <123534169+swikar1@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:24:29 -0800 Subject: [PATCH] KAFKA-15443: Upgrade RocksDB to 9.7.3 This PR addresses the following compatibility issues introduced by the RocksDB upgrade: Removal of AccessHint: The AccessHint class was completely removed in RocksDB 9.7.3. This required removing all import statements, variable declarations, method parameters, method return types, and static method calls related to AccessHint in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java Unused methods are removed in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java Removal of NO_FILE_CLOSES: The NO_FILE_CLOSES metric was also removed in RocksDB 9.7.3. The calculation for numberOfOpenFiles in RocksDBMetricsRecorder.java has been adjusted to now track the total number of file opens since the last reset. The previous calculation, which subtracted NO_FILE_CLOSES from NO_FILE_OPENS, is no longer possible. The reason RocksDB team removed NO_FILE_CLOSES it seems to me they detected not properly working: https://github.com/search?q=repo%3Afacebook%2Frocksdb+NO_FILE_CLOSES&type=issues --- LICENSE-binary | 2 +- docs/ops.html | 5 ---- docs/streams/upgrade-guide.html | 17 ++++++++++++-- gradle/dependencies.gradle | 2 +- ...ToDbOptionsColumnFamilyOptionsAdapter.java | 23 ++++++++++--------- .../metrics/RocksDBMetricsRecorder.java | 3 +-- ...OptionsColumnFamilyOptionsAdapterTest.java | 6 ++--- .../metrics/RocksDBMetricsRecorderTest.java | 8 +++---- 8 files changed, 35 insertions(+), 31 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 5383cf60ec6e5..686ebe39030e1 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final netty-transport-native-unix-common-4.1.115.Final opentelemetry-proto-1.0.0-alpha plexus-utils-3.5.1 -rocksdbjni-7.9.2 +rocksdbjni-9.7.3 scala-library-2.13.15 scala-logging_2.13-3.9.5 scala-reflect-2.13.15 diff --git a/docs/ops.html b/docs/ops.html index 02ea5bd788f5d..fc5c3d171172b 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3473,11 +3473,6 @@
< More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG can be found in KIP-295.

-

Streams API changes in 4.0.0

-

In this release the ClientInstanceIds instance stores the global consumerUuid for the KIP-714 @@ -173,6 +171,21 @@

Streams API See KIP-1112 for more details.

+

Streams API changes in 4.0.0

+ +

+ Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes. + The org.rocksdb.AccessHint class, along with its associated methods, has been removed. + + org.rocksdb.Options.setMemtableMaxRangeDeletions(int p1): ColumnFamilyOptionsInterface has been added, providing control over the maximum number of range deletions in the memtable. + org.rocksdb.Options.memtableMaxRangeDeletions(): int has been added, allowing retrieval of the configured maximum number of range deletions in the memtable. + + The org.rocksdb.Options.setLogger() method now accepts a LoggerInterface as a parameter instead of the previous Logger. + The NO_FILE_CLOSES field has been removed from the org.rocksdb.TickerTypeenum. + Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations. + These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the rocksdb.config.setter, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed AccessHintclass, the NO_FILE_CLOSES field from TickerType, or relying on the previous signature of setLogger() will need to update their implementations. +

+

Streams API changes in 3.9.0

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 52ef2fcfde1b4..473edec0c6619 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -119,7 +119,7 @@ versions += [ protobuf: "3.25.5", // a dependency of opentelemetryProto pcollections: "4.0.1", re2j: "1.7", - rocksDB: "7.9.2", + rocksDB: "9.7.3", // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // has the version field as mandatory in its configuration, see // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index b7b611f8be0d3..5a8083ac094c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -22,7 +22,6 @@ import org.rocksdb.AbstractEventListener; import org.rocksdb.AbstractSlice; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; @@ -37,6 +36,7 @@ import org.rocksdb.DbPath; import org.rocksdb.Env; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LoggerInterface; import org.rocksdb.MemTableConfig; import org.rocksdb.MergeOperator; import org.rocksdb.Options; @@ -571,16 +571,7 @@ public long dbWriteBufferSize() { return dbOptions.dbWriteBufferSize(); } - @Override - public Options setAccessHintOnCompactionStart(final AccessHint accessHint) { - dbOptions.setAccessHintOnCompactionStart(accessHint); - return this; - } - @Override - public AccessHint accessHintOnCompactionStart() { - return dbOptions.accessHintOnCompactionStart(); - } @Deprecated public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) { @@ -843,7 +834,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) { } @Override - public Options setLogger(final org.rocksdb.Logger logger) { + public Options setLogger(final LoggerInterface logger) { dbOptions.setLogger(logger); return this; } @@ -914,6 +905,16 @@ public Options setCompressionType(final CompressionType compressionType) { return this; } + @Override + public Options setMemtableMaxRangeDeletions(final int n) { + columnFamilyOptions.setMemtableMaxRangeDeletions(n); + return this; + } + + @Override + public int memtableMaxRangeDeletions() { + return columnFamilyOptions.memtableMaxRangeDeletions(); + } @Override public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 03b1f7eaf02f1..10e8cb804fece 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -462,8 +462,7 @@ public void record(final long now) { writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); - numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS) - - valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES); + numberOfOpenFiles = -1; numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); memtableFlushTimeSum += memtableFlushTimeData.getSum(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 5ddcf5bef551e..08248b020544e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -28,7 +28,6 @@ import org.rocksdb.AbstractCompactionFilter.Context; import org.rocksdb.AbstractCompactionFilterFactory; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionPriority; @@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { add("setMaxBackgroundCompactions"); add("maxBackgroundFlushes"); add("setMaxBackgroundFlushes"); + add("tablePropertiesCollectorFactory"); + add("setTablePropertiesCollectorFactory"); addAll(walRelatedMethods); } }; @@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class[] parameterTypes) throws case "java.util.Collection": parameters[i] = new ArrayList<>(); break; - case "org.rocksdb.AccessHint": - parameters[i] = AccessHint.NONE; - break; case "org.rocksdb.Cache": parameters[i] = new LRUCache(1L); break; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index 7ec3f4bf38c76..a0c068b59ee5d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -474,10 +474,8 @@ public void shouldRecordStatisticsBasedMetrics() { final double expectedCompactionTimeMaxSensor = 24.0; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); - when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); - when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); - final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4); + final double expectedNumberOfOpenFilesSensor = -1; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); @@ -485,8 +483,8 @@ public void shouldRecordStatisticsBasedMetrics() { recorder.record(now); - verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class)); - verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class)); verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class)); verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class)); verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);