Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(db): optimize RocksDB's data backup function #5757

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package org.tron.core.db2.common;

import com.google.common.collect.Maps;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.core.db.common.iterator.DBIterator;

@Slf4j(topic = "DB")
public class RocksDB implements DB<byte[], byte[]>, Flusher {

@Getter
Expand Down Expand Up @@ -83,4 +91,22 @@ public DB<byte[], byte[]> newInstance() {
public void stat() {
this.db.stat();
}
}

public void backup(String dir) {
try {
db.backup(dir);
} catch (RocksDBException | IllegalArgumentException | IllegalStateException e) {
logger.warn("Backup {} to {} failed: {}", this.getDbName(), dir, e.getMessage());
}
}

public static void destroy(String name, String dir) {
try {
// delete engine.properties first
Files.deleteIfExists(Paths.get(dir, name, "engine.properties"));
org.rocksdb.RocksDB.destroyDB(Paths.get(dir, name).toString(), new Options());
} catch (RocksDBException | IOException e) {
logger.warn("Destroy {} from {} failed: {}", name, dir, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {

// add a persistent storage, the store name is: trans-cache
// when fullnode startup, transactionCache initializes transactions from this store
@Getter
private DB<byte[], byte[]> persistentStore;

// replace persistentStore and optimizes startup performance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public synchronized void close() {
@Override
public synchronized void reset() {
head().reset();
head().close();
head = head.getRoot().newInstance();
head = head.getRoot();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the change here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid closing databases that need to be backed up:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you give more detailed descriptions about this change in the PR? cuz I can not know the reason why should make this change through issue

}

@Override
Expand Down
17 changes: 16 additions & 1 deletion common/src/main/java/org/tron/common/utils/PropUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -106,4 +111,14 @@ public static boolean writeProperty(String file, String key, String value) {
return true;
}

}
public static void writeProperties(String file, Map<String, String> params) {
try (Writer w = Files.newBufferedWriter(Paths.get(file), StandardCharsets.UTF_8)) {
Properties properties = new Properties();
params.forEach(properties::setProperty);
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
4 changes: 4 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -1584,10 +1584,14 @@ private static void initRocksDbSettings(Config config) {
int targetFileSizeMultiplier = config.hasPath(prefix + "targetFileSizeMultiplier") ? config
.getInt(prefix + "targetFileSizeMultiplier") : 1;

int maxOpenFiles = config.hasPath(prefix + "maxOpenFiles")
? config.getInt(prefix + "maxOpenFiles") : 5000;

PARAMETER.rocksDBCustomSettings = RocksDbSettings
.initCustomSettings(levelNumber, compactThreads, blocksize, maxBytesForLevelBase,
maxBytesForLevelMultiplier, level0FileNumCompactionTrigger,
targetFileSizeBase, targetFileSizeMultiplier);
PARAMETER.rocksDBCustomSettings.withMaxOpenFiles(maxOpenFiles);
RocksDbSettings.loggingSettings();
}

Expand Down
146 changes: 92 additions & 54 deletions framework/src/main/java/org/tron/core/db/backup/BackupDbUtil.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package org.tron.core.db.backup;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.error.TronDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.PropUtil;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.args.Args;
import org.tron.core.db.RevokingDatabase;
import org.tron.core.db2.core.Chainbase;
import org.tron.core.db2.core.SnapshotManager;
import org.tron.core.db2.core.SnapshotRoot;
import org.tron.core.db.TronDatabase;
import org.tron.core.db.TronStoreWithRevoking;
import org.tron.core.db2.common.RocksDB;
import org.tron.core.db2.common.TxCacheDB;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.store.DynamicPropertiesStore;

@Slf4j(topic = "DB")
@Component
Expand All @@ -26,11 +40,42 @@ public class BackupDbUtil {

@Getter
private static final int DB_BACKUP_STATE_DEFAULT = 11;
@Getter
@Autowired
private RevokingDatabase db;
private List<ITronChainBase<?>> stores;
@Autowired
private DynamicPropertiesStore dynamicPropertiesStore;
private List<RocksDB> rocksDBsToBackup;
private CommonParameter parameter = Args.getInstance();


@PostConstruct
private void init() {
rocksDBsToBackup = stores.stream().map(this::getRocksDB)
.filter(Objects::nonNull)
.filter(db -> !"tmp".equalsIgnoreCase(db.getDbName()))
.filter(db -> !db.getDbName().startsWith("checkpoint"))
.collect(Collectors.toList());
}

private RocksDB getRocksDB(ITronChainBase<?> store) {
if (store instanceof TronStoreWithRevoking && ((TronStoreWithRevoking<?>) store).getDb()
.getClass() == RocksDB.class) {
return (RocksDB) ((TronStoreWithRevoking<?>) store).getDb();
} else if (store instanceof TronStoreWithRevoking
&& ((TronStoreWithRevoking<?>) store).getDb()
.getClass() == TxCacheDB.class
&& ((TxCacheDB) ((TronStoreWithRevoking<?>) store).getDb()).getPersistentStore().getClass()
== RocksDB.class) {
return (RocksDB) ((TxCacheDB) ((TronStoreWithRevoking<?>) store).getDb())
.getPersistentStore();
} else if (store instanceof TronDatabase && ((TronDatabase<?>) store).getDbSource().getClass()
== RocksDbDataSourceImpl.class) {
return new RocksDB((RocksDbDataSourceImpl) ((TronDatabase<?>) store).getDbSource());
} else {
return null;
}
}

private int getBackupState() {
try {
return Integer.valueOf(PropUtil
Expand All @@ -41,25 +86,22 @@ private int getBackupState() {
}
}

private void setBackupState(int status) {
PropUtil.writeProperty(parameter.getDbBackupConfig()
.getPropPath(), BackupDbUtil.DB_BACKUP_STATE,
String.valueOf(status));
private void setBackupState(int status, long blockNum) {
Map<String, String> params = new HashMap<>();
params.put("header", String.valueOf(blockNum));
params.put(BackupDbUtil.DB_BACKUP_STATE, String.valueOf(status));
PropUtil.writeProperties(parameter.getDbBackupConfig().getPropPath(), params);
}

private void switchBackupState() {
private void switchBackupState(long blockNum) {
switch (State.valueOf(getBackupState())) {
case BAKINGONE:
setBackupState(State.BAKEDONE.getStatus());
case BAKEDTWO:
setBackupState(State.BAKEDONE.getStatus(), blockNum);
break;
case BAKEDONE:
setBackupState(State.BAKEDTWO.getStatus());
break;
case BAKINGTWO:
setBackupState(State.BAKEDTWO.getStatus());
break;
case BAKEDTWO:
setBackupState(State.BAKEDONE.getStatus());
setBackupState(State.BAKEDTWO.getStatus(), blockNum);
break;
default:
break;
Expand All @@ -68,43 +110,35 @@ private void switchBackupState() {

public void doBackup(BlockCapsule block) {
long t1 = System.currentTimeMillis();
long header = dynamicPropertiesStore.getLatestBlockHeaderNumberFromDB();
try {
switch (State.valueOf(getBackupState())) {
case BAKINGONE:
case BAKEDTWO:
deleteBackup(DB_BACKUP_INDEX1);
backup(DB_BACKUP_INDEX1);
switchBackupState();
switchBackupState(header);
deleteBackup(DB_BACKUP_INDEX2);
break;
case BAKEDONE:
deleteBackup(DB_BACKUP_INDEX2);
backup(DB_BACKUP_INDEX2);
switchBackupState();
deleteBackup(DB_BACKUP_INDEX1);
break;
case BAKINGTWO:
deleteBackup(DB_BACKUP_INDEX2);
backup(DB_BACKUP_INDEX2);
switchBackupState();
switchBackupState(header);
deleteBackup(DB_BACKUP_INDEX1);
break;
case BAKEDTWO:
deleteBackup(DB_BACKUP_INDEX1);
backup(DB_BACKUP_INDEX1);
switchBackupState();
deleteBackup(DB_BACKUP_INDEX2);
break;
default:
logger.warn("invalid backup state {}.", getBackupState());
}
} catch (RocksDBException | SecurityException e) {
logger.warn("Backup db error.", e);
}
long timeUsed = System.currentTimeMillis() - t1;
logger
.info("Current block number is {}, backup all store use {} ms!", block.getNum(), timeUsed);
if (timeUsed >= 3000) {
logger.warn("Backing up db uses too much time. {} ms.", timeUsed);
long timeUsed = System.currentTimeMillis() - t1;
logger
.info("Current block number is {},root header is {}, backup all store use {} ms!",
block.getNum(), header, timeUsed);
if (timeUsed >= 3000) {
logger.warn("Backing up db uses too much time. {} ms.", timeUsed);
}
} catch (RocksDBException | SecurityException | IOException e) {
throw new TronDBException("Backup rocksdb on " + header + " failed.", e);
}
}

Expand All @@ -117,17 +151,20 @@ private void backup(int i) throws RocksDBException {
} else {
throw new RuntimeException(String.format("error backup with undefined index %d", i));
}
List<Chainbase> stores = ((SnapshotManager) db).getDbs();
for (Chainbase store : stores) {
if (((SnapshotRoot) (store.getHead().getRoot())).getDb().getClass()
== org.tron.core.db2.common.RocksDB.class) {
((org.tron.core.db2.common.RocksDB) ((SnapshotRoot) (store.getHead().getRoot())).getDb())
.getDb().backup(path);
}
String finalPath = path;
rocksDBsToBackup.parallelStream().forEach(db -> db.backup(finalPath));
List<String> backedDBs = Arrays.stream(Objects.requireNonNull(
Paths.get(finalPath).toFile().listFiles(File::isDirectory)))
.map(File::getName).collect(Collectors.toList());
List<String> dbNames = rocksDBsToBackup.stream().map(RocksDB::getDbName)
.collect(Collectors.toList());
dbNames.removeAll(backedDBs);
if (!dbNames.isEmpty()) {
throw new RocksDBException("Some db not backed up: " + dbNames);
}
}

private void deleteBackup(int i) {
private void deleteBackup(int i) throws RocksDBException, IOException {
String path = "";
if (i == DB_BACKUP_INDEX1) {
path = parameter.getDbBackupConfig().getBak1path();
Expand All @@ -136,13 +173,14 @@ private void deleteBackup(int i) {
} else {
throw new RuntimeException(String.format("error deleteBackup with undefined index %d", i));
}
List<Chainbase> stores = ((SnapshotManager) db).getDbs();
for (Chainbase store : stores) {
if (((SnapshotRoot) (store.getHead().getRoot())).getDb().getClass()
== org.tron.core.db2.common.RocksDB.class) {
((org.tron.core.db2.common.RocksDB) (((SnapshotRoot) (store.getHead().getRoot())).getDb()))
.getDb().deleteDbBakPath(path);
}
String finalPath = path;
rocksDBsToBackup.parallelStream().forEach(db -> RocksDB.destroy(db.getDbName(), finalPath));
FileUtils.cleanDirectory(new File(finalPath)); // clean bak dir by File or other.
List<String> backedDBs = Arrays.stream(Objects.requireNonNull(
Paths.get(finalPath).toFile().listFiles(File::isDirectory)))
.map(File::getName).collect(Collectors.toList());
if (!backedDBs.isEmpty()) {
throw new RocksDBException("Some db not delete: " + backedDBs);
}
}

Expand Down
3 changes: 3 additions & 0 deletions framework/src/test/java/org/tron/core/db2/ChainbaseTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.tron.core.db2;

import com.google.common.collect.Streams;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void testPrefixQueryForLeveldb() {
testDb(chainbase);
testRoot(dataSource);
chainbase.reset();
Assert.assertFalse(chainbase.iterator().hasNext());
chainbase.close();
}

Expand All @@ -95,6 +97,7 @@ public void testPrefixQueryForRocksdb() {
testDb(chainbase);
testRoot(dataSource);
chainbase.reset();
Assert.assertEquals(0, Streams.stream(chainbase.iterator()).count());
chainbase.close();
}

Expand Down
Loading
Loading