Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15443: Upgrade RocksDB to 9.7.3 #18275

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final
netty-transport-native-unix-common-4.1.115.Final
opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
rocksdbjni-7.9.2
rocksdbjni-9.7.3
scala-library-2.13.15
scala-logging_2.13-3.9.5
scala-reflect-2.13.15
Expand Down
2 changes: 1 addition & 1 deletion docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -3475,7 +3475,7 @@ <h5 class="anchor-heading"><a id="kafka_streams_rocksdb_monitoring" class="ancho
</tr>
<tr>
<td>number-open-files</td>
<td>The number of current open files.</td>
<td>This metric will return constant -1 because the RocksDB's counter NO_FILE_CLOSES has been removed in RocksDB 9.7.3</td>
<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
</tr>
<tr>
Expand Down
10 changes: 10 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>

<p>
Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.
Several methods related to compressed block cache configuration in the <code>BlockBasedTableConfig</code> class have been removed, including <code>blockCacheCompressedNumShardBits</code>, <code>blockCacheCompressedSize</code>, and their corresponding setters. These functionalities are now consolidated under the <code>cache</code> option, and developers should configure their compressed block cache using the <code>setCache</code> method instead.
The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerTypeenum</code> as a result the <code>number-open-files</code> metrics does not work as expected. Metric <code>number-open-files</code> returns constant -1 from now on until it will officially be removed.
The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHint</code> class, the removed methods from the <code>BlockBasedTableConfig</code> class, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
</p>

<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ versions += [
protobuf: "3.25.5", // a dependency of opentelemetryProto
pcollections: "4.0.1",
re2j: "1.7",
rocksDB: "7.9.2",
rocksDB: "9.7.3",
// When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
Expand Down
swikarpat marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.rocksdb.AbstractEventListener;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
Expand All @@ -37,6 +36,7 @@
import org.rocksdb.DbPath;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LoggerInterface;
import org.rocksdb.MemTableConfig;
import org.rocksdb.MergeOperator;
import org.rocksdb.Options;
Expand Down Expand Up @@ -332,14 +332,6 @@ public Statistics statistics() {
return dbOptions.statistics();
}

@Deprecated
public int baseBackgroundCompactions() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It is currently a no-op method which returns a default value of -1.";
log.warn(message);
return -1;
}

@Override
public Options setMaxSubcompactions(final int maxSubcompactions) {
dbOptions.setMaxSubcompactions(maxSubcompactions);
Expand Down Expand Up @@ -571,34 +563,6 @@ public long dbWriteBufferSize() {
return dbOptions.dbWriteBufferSize();
}

@Override
public Options setAccessHintOnCompactionStart(final AccessHint accessHint) {
dbOptions.setAccessHintOnCompactionStart(accessHint);
return this;
}

@Override
public AccessHint accessHintOnCompactionStart() {
return dbOptions.accessHintOnCompactionStart();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too bad that RockDB does not mark methods as deprecated but just removes stuff.

This was always an issue for us (even if we have to admit, that we are upgrade from a quite old version...). I guess we just need to bite the bullet and make this breaking change... Seem we need to start tracking RocksDB changes more closely and frequently and do KIPs early on to prepare for breaking changes. (What basically implies, that we can upgrade RocksDB only in major release if there are breaking changes...)

Or we change our "policy" for RocksDB compatibility and document it properly, reserving the right to break compatibility via RocksDB version bumps also in minor release.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this drives me crazy. Personally I think it's ok to bite the bullet on this one but let's definitely come up with a strategy for this going forward. I'd say a good compromise is to meet in the middle and do both, that is, publish an official policy for RocksDB APIs which says there is no guarantee of the usual one-year deprecation & only major version removal, and any deprecations are best-effort. I'm less worried about the deprecations and more about restricting our ability to upgrade rocksdb to only major releases.

At the same time, I think we can and should do better. I like the idea of looking ahead at each new RocksDB release to mark any changed or removed APIs exposed to our users so we can deprecate them ASAP. We can hash out the details of how often and how to share this responsibility later (for example maybe this should be one of the Streams RM responsibilities, to look for upcoming API changes in future rocks and make sure they get deprecated in that release

}

@Deprecated
public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was not affecting compaction even in earlier versions. " +
"It is currently a no-op method.";
log.warn(message);
return this;
}

@Deprecated
public boolean newTableReaderForCompactionInputs() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It is now a method which always returns false.";
log.warn(message);
return false;
}

@Override
public Options setCompactionReadaheadSize(final long compactionReadaheadSize) {
dbOptions.setCompactionReadaheadSize(compactionReadaheadSize);
Expand Down Expand Up @@ -843,7 +807,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) {
}

@Override
public Options setLogger(final org.rocksdb.Logger logger) {
public Options setLogger(final LoggerInterface logger) {
dbOptions.setLogger(logger);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this was removed, too, can you add it to the PR description.

Copy link
Contributor Author

@swikarpat swikarpat Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax thanks for feedback. I wanted to confirm setLogger() method. Parameter inside setLogger() has changed to LoggerInterface. I've made corrections and put method back in that java class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine to change it like you did. If users use a logger class that is derived from Logger they have to modify it in any case because Logger now implements interface LoggerInterface. Thus, the change of the method will most likely not directly affect users on a source level. It will affect them on binary level, though, but that is the trade-off here.

return this;
}
Expand Down Expand Up @@ -914,6 +878,16 @@ public Options setCompressionType(final CompressionType compressionType) {
return this;
}

@Override
public Options setMemtableMaxRangeDeletions(final int n) {
columnFamilyOptions.setMemtableMaxRangeDeletions(n);
return this;
}

@Override
public int memtableMaxRangeDeletions() {
return columnFamilyOptions.memtableMaxRangeDeletions();
}

@Override
public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) {
Expand Down Expand Up @@ -1464,26 +1438,6 @@ public boolean allowIngestBehind() {
return dbOptions.allowIngestBehind();
}

@Deprecated
public Options setPreserveDeletes(final boolean preserveDeletes) {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was marked for deprecation in earlier versions. " +
"The behaviour can be replicated by using user-defined timestamps. " +
"It is currently a no-op method.";
log.warn(message);
// no-op
return this;
}

@Deprecated
public boolean preserveDeletes() {
final String message = "This method has been removed from the underlying RocksDB. " +
"It was marked for deprecation in earlier versions. " +
"It is currently a no-op method with a default value of false.";
log.warn(message);
return false;
}

@Override
public Options setTwoWriteQueues(final boolean twoWriteQueues) {
dbOptions.setTwoWriteQueues(twoWriteQueues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ public void record(final long now) {
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
- valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.rocksdb.AbstractCompactionFilter.Context;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AbstractWalFilter;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
Expand Down Expand Up @@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
add("setMaxBackgroundCompactions");
add("maxBackgroundFlushes");
add("setMaxBackgroundFlushes");
add("tablePropertiesCollectorFactory");
add("setTablePropertiesCollectorFactory");
addAll(walRelatedMethods);
}
};
Expand Down Expand Up @@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class<?>[] parameterTypes) throws
case "java.util.Collection":
parameters[i] = new ArrayList<>();
break;
case "org.rocksdb.AccessHint":
parameters[i] = AccessHint.NONE;
break;
case "org.rocksdb.Cache":
parameters[i] = new LRUCache(1L);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,19 +474,17 @@ public void shouldRecordStatisticsBasedMetrics() {
final double expectedCompactionTimeMaxSensor = 24.0;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4);
final double expectedNumberOfOpenFilesSensor = -1;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
final double expectedNumberOfFileErrorsSensor = 11 + 34;

recorder.record(now);

verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class));
verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);
Expand Down
Loading