KAFKA-15443: Upgrade RocksDB to 9.7.3
This PR addresses the following compatibility issues introduced by the RocksDB upgrade:

Removal of AccessHint: The AccessHint class was completely removed in RocksDB 9.7.3. This required removing all import statements, variable declarations, method parameters, method return types, and static method calls related to AccessHint in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java and RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java. Unused methods were also removed from RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java.

Removal of NO_FILE_CLOSES: The NO_FILE_CLOSES ticker was also removed in RocksDB 9.7.3. The previous calculation of numberOfOpenFiles in RocksDBMetricsRecorder.java, which subtracted NO_FILE_CLOSES from NO_FILE_OPENS, is therefore no longer possible, and the number-open-files metric is now reported as a constant -1.
It appears the RocksDB team removed NO_FILE_CLOSES because the counter was not working properly: https://github.com/search?q=repo%3Afacebook%2Frocksdb+NO_FILE_CLOSES&type=issues
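
As an illustration, here is a minimal sketch of what can still be read from the RocksDB statistics after the upgrade; the helper class and method names below are hypothetical and not part of this change:

```java
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

// Hypothetical helper, not part of this PR: TickerType.NO_FILE_CLOSES no longer
// exists in RocksDB 9.7.3, so an "opens minus closes" difference cannot be formed.
public final class OpenFileTickers {

    // Total number of file opens since the statistics were last reset --
    // the only file-open ticker still available.
    public static long fileOpensSinceReset(final Statistics statistics) {
        return statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS);
    }

    // The value Kafka Streams now reports for the number-open-files metric.
    public static long numberOpenFilesMetric() {
        return -1L;
    }
}
```

The recorder change itself is visible in the RocksDBMetricsRecorder.java diff below.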

KAFKA-15443: Upgrade RocksDB to 9.7.3

Update docs/ops.html

Co-authored-by: Bruno Cadonna <[email protected]>

Update docs/streams/upgrade-guide.html

Co-authored-by: Bruno Cadonna <[email protected]>

Update docs/streams/upgrade-guide.html

Co-authored-by: Bruno Cadonna <[email protected]>

Update upgrade-guide.html
swikarpat committed Jan 8, 2025
1 parent 5efaae6 commit 3a4c2b7
Showing 8 changed files with 31 additions and 72 deletions.
2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final
netty-transport-native-unix-common-4.1.115.Final
opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
rocksdbjni-7.9.2
rocksdbjni-9.7.3
scala-library-2.13.15
scala-logging_2.13-3.9.5
scala-reflect-2.13.15
2 changes: 1 addition & 1 deletion docs/ops.html
@@ -3475,7 +3475,7 @@ <h5 class="anchor-heading"><a id="kafka_streams_rocksdb_monitoring" class="ancho
</tr>
<tr>
<td>number-open-files</td>
<td>The number of current open files.</td>
<td>This metric returns a constant -1 because RocksDB's NO_FILE_CLOSES counter was removed in RocksDB 9.7.3.</td>
<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
</tr>
<tr>
10 changes: 10 additions & 0 deletions docs/streams/upgrade-guide.html
@@ -173,6 +173,16 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>

<p>
Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.
Several methods related to compressed block cache configuration in the <code>BlockBasedTableConfig</code> class have been removed, including <code>blockCacheCompressedNumShardBits</code>, <code>blockCacheCompressedSize</code>, and their corresponding setters. These functionalities are now consolidated under the <code>cache</code> option, and developers should configure their compressed block cache using the <code>setCache</code> method instead.
The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerType</code> enum; as a result, the <code>number-open-files</code> metric no longer works as expected and returns a constant -1 until it is officially removed.
The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
Some data types used in RocksDB's Java API have been modified. These changes, together with the removed class, the removed field, and the new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHint</code> class, the removed methods from the <code>BlockBasedTableConfig</code> class, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
</p>
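
For users of rocksdb.config.setter, a minimal sketch of a config setter that compiles against RocksDB 9.7.3 is shown below; the class name, cache type, and cache size are illustrative assumptions, not code from this commit:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    // A single block cache shared by all state stores (size is illustrative).
    private static final Cache CACHE = new LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Calls such as options.setAccessHintOnCompactionStart(AccessHint.NORMAL)
        // no longer compile against RocksDB 9.7.3 and must simply be removed.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // The compressed-block-cache setters are gone; configure a single cache instead.
        tableConfig.setBlockCache(CACHE);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache is shared across stores, so it is intentionally not closed here.
    }
}
```

Custom setters that previously called setAccessHintOnCompactionStart or the removed BlockBasedTableConfig methods will not compile until those calls are removed or replaced; code relying on the previous setLogger(org.rocksdb.Logger) signature should also be reviewed.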

<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
@@ -119,7 +119,7 @@ versions += [
protobuf: "3.25.5", // a dependency of opentelemetryProto
pcollections: "4.0.1",
re2j: "1.7",
rocksDB: "7.9.2",
rocksDB: "9.7.3",
// When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -22,7 +22,6
import org.rocksdb.AbstractEventListener;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
@@ -37,6 +36,7
import org.rocksdb.DbPath;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LoggerInterface;
import org.rocksdb.MemTableConfig;
import org.rocksdb.MergeOperator;
import org.rocksdb.Options;
@@ -332,14 +332,6 @@ public Statistics statistics() {
return dbOptions.statistics();
}

@Deprecated
public int baseBackgroundCompactions() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It is currently a no-op method which returns a default value of -1.";
log.warn(message);
return -1;
}

@Override
public Options setMaxSubcompactions(final int maxSubcompactions) {
dbOptions.setMaxSubcompactions(maxSubcompactions);
@@ -571,34 +563,6 @@ public long dbWriteBufferSize() {
return dbOptions.dbWriteBufferSize();
}

@Override
public Options setAccessHintOnCompactionStart(final AccessHint accessHint) {
dbOptions.setAccessHintOnCompactionStart(accessHint);
return this;
}

@Override
public AccessHint accessHintOnCompactionStart() {
return dbOptions.accessHintOnCompactionStart();
}

@Deprecated
public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was not affecting compaction even in earlier versions. " +
"It is currently a no-op method.";
log.warn(message);
return this;
}

@Deprecated
public boolean newTableReaderForCompactionInputs() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It is now a method which always returns false.";
log.warn(message);
return false;
}

@Override
public Options setCompactionReadaheadSize(final long compactionReadaheadSize) {
dbOptions.setCompactionReadaheadSize(compactionReadaheadSize);
@@ -843,7 +807,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) {
}

@Override
public Options setLogger(final org.rocksdb.Logger logger) {
public Options setLogger(final LoggerInterface logger) {
dbOptions.setLogger(logger);
return this;
}
@@ -914,6 +878,16 @@ public Options setCompressionType(final CompressionType compressionType) {
return this;
}

@Override
public Options setMemtableMaxRangeDeletions(final int n) {
columnFamilyOptions.setMemtableMaxRangeDeletions(n);
return this;
}

@Override
public int memtableMaxRangeDeletions() {
return columnFamilyOptions.memtableMaxRangeDeletions();
}

@Override
public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) {
@@ -1464,26 +1438,6 @@ public boolean allowIngestBehind() {
return dbOptions.allowIngestBehind();
}

@Deprecated
public Options setPreserveDeletes(final boolean preserveDeletes) {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was marked for deprecation in earlier versions. " +
"The behaviour can be replicated by using user-defined timestamps. " +
"It is currently a no-op method.";
log.warn(message);
// no-op
return this;
}

@Deprecated
public boolean preserveDeletes() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was marked for deprecation in earlier versions. " +
"It is currently a no-op method with a default value of false.";
log.warn(message);
return false;
}

@Override
public Options setTwoWriteQueues(final boolean twoWriteQueues) {
dbOptions.setTwoWriteQueues(twoWriteQueues);
RocksDBMetricsRecorder.java
@@ -462,8 +462,7 @@ public void record(final long now) {
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
- valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -28,7 +28,6
import org.rocksdb.AbstractCompactionFilter.Context;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
@@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
add("setMaxBackgroundCompactions");
add("maxBackgroundFlushes");
add("setMaxBackgroundFlushes");
add("tablePropertiesCollectorFactory");
add("setTablePropertiesCollectorFactory");
addAll(walRelatedMethods);
}
};
@@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class<?>[] parameterTypes) throws
case "java.util.Collection":
parameters[i] = new ArrayList<>();
break;
case "org.rocksdb.AccessHint":
parameters[i] = AccessHint.NONE;
break;
case "org.rocksdb.Cache":
parameters[i] = new LRUCache(1L);
break;
RocksDBMetricsRecorderTest.java
@@ -474,19 +474,17 @@ public void shouldRecordStatisticsBasedMetrics() {
final double expectedCompactionTimeMaxSensor = 24.0;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4);
final double expectedNumberOfOpenFilesSensor = -1;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
final double expectedNumberOfFileErrorsSensor = 11 + 34;

recorder.record(now);

verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class));
verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);
