Skip to content

Commit

Permalink
KAFKA-15443: Upgrade RocksDB to 9.7.3
Browse files Browse the repository at this point in the history
This PR addresses the following compatibility issues introduced by the RocksDB upgrade:

Removal of AccessHint: The AccessHint class was completely removed in RocksDB 9.7.3. This required removing all import statements, variable declarations, method parameters, method return types, and static method calls related to AccessHint in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java Unused methods are removed in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java

Removal of NO_FILE_CLOSES: The NO_FILE_CLOSES metric was also removed in RocksDB 9.7.3. The calculation for numberOfOpenFiles in RocksDBMetricsRecorder.java has been adjusted to now track the total number of file opens since the last reset. The previous calculation, which subtracted NO_FILE_CLOSES from NO_FILE_OPENS, is no longer possible.
The reason RocksDB team removed NO_FILE_CLOSES it seems to me they detected not properly working: https://github.com/search?q=repo%3Afacebook%2Frocksdb+NO_FILE_CLOSES&type=issues
  • Loading branch information
swikarpat committed Jan 4, 2025
1 parent 409a43e commit 124ef06
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 31 deletions.
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final
netty-transport-native-unix-common-4.1.115.Final
opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
rocksdbjni-7.9.2
rocksdbjni-9.7.3
scala-library-2.13.15
scala-logging_2.13-3.9.5
scala-reflect-2.13.15
Expand Down
5 changes: 0 additions & 5 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -3473,11 +3473,6 @@ <h5 class="anchor-heading"><a id="kafka_streams_rocksdb_monitoring" class="ancho
<td>The maximum duration in ms of disc compactions.</td>
<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
</tr>
<tr>
<td>number-open-files</td>
<td>The number of current open files.</td>
<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
</tr>
<tr>
<td>number-file-errors-total</td>
<td>The total number of file errors occurred.</td>
Expand Down
17 changes: 15 additions & 2 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>

<p>
In this release the <code>ClientInstanceIds</code> instance stores the global consumer<code>Uuid</code> for the
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
Expand Down Expand Up @@ -173,6 +171,21 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>

<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>

<p>
Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.

<code>org.rocksdb.Options.setMemtableMaxRangeDeletions(int p1): ColumnFamilyOptionsInterface</code> has been added, providing control over the maximum number of range deletions in the memtable.
<code>org.rocksdb.Options.memtableMaxRangeDeletions(): int</code> has been added, allowing retrieval of the configured maximum number of range deletions in the memtable.

The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerTypeenum</code>.
Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHintclass</code>, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
</p>

<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ versions += [
protobuf: "3.25.5", // a dependency of opentelemetryProto
pcollections: "4.0.1",
re2j: "1.7",
rocksDB: "7.9.2",
rocksDB: "9.7.3",
// When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.rocksdb.AbstractEventListener;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
Expand All @@ -37,6 +36,7 @@
import org.rocksdb.DbPath;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LoggerInterface;
import org.rocksdb.MemTableConfig;
import org.rocksdb.MergeOperator;
import org.rocksdb.Options;
Expand Down Expand Up @@ -571,16 +571,7 @@ public long dbWriteBufferSize() {
return dbOptions.dbWriteBufferSize();
}

@Override
public Options setAccessHintOnCompactionStart(final AccessHint accessHint) {
dbOptions.setAccessHintOnCompactionStart(accessHint);
return this;
}

@Override
public AccessHint accessHintOnCompactionStart() {
return dbOptions.accessHintOnCompactionStart();
}

@Deprecated
public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) {
Expand Down Expand Up @@ -843,7 +834,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) {
}

@Override
public Options setLogger(final org.rocksdb.Logger logger) {
public Options setLogger(final LoggerInterface logger) {
dbOptions.setLogger(logger);
return this;
}
Expand Down Expand Up @@ -914,6 +905,16 @@ public Options setCompressionType(final CompressionType compressionType) {
return this;
}

@Override
public Options setMemtableMaxRangeDeletions(final int n) {
columnFamilyOptions.setMemtableMaxRangeDeletions(n);
return this;
}

@Override
public int memtableMaxRangeDeletions() {
return columnFamilyOptions.memtableMaxRangeDeletions();
}

@Override
public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ public void record(final long now) {
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
- valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.rocksdb.AbstractCompactionFilter.Context;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
Expand Down Expand Up @@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
add("setMaxBackgroundCompactions");
add("maxBackgroundFlushes");
add("setMaxBackgroundFlushes");
add("tablePropertiesCollectorFactory");
add("setTablePropertiesCollectorFactory");
addAll(walRelatedMethods);
}
};
Expand Down Expand Up @@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class<?>[] parameterTypes) throws
case "java.util.Collection":
parameters[i] = new ArrayList<>();
break;
case "org.rocksdb.AccessHint":
parameters[i] = AccessHint.NONE;
break;
case "org.rocksdb.Cache":
parameters[i] = new LRUCache(1L);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,19 +474,17 @@ public void shouldRecordStatisticsBasedMetrics() {
final double expectedCompactionTimeMaxSensor = 24.0;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4);
final double expectedNumberOfOpenFilesSensor = -1;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
final double expectedNumberOfFileErrorsSensor = 11 + 34;

recorder.record(now);

verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class));
verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);
Expand Down

0 comments on commit 124ef06

Please sign in to comment.