Skip to content

Commit

Permalink
feat(db): rocksdb backup opt
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed Mar 12, 2024
1 parent e81a5aa commit 270deef
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 58 deletions.
28 changes: 27 additions & 1 deletion chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package org.tron.core.db2.common;

import com.google.common.collect.Maps;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.core.db.common.iterator.DBIterator;

@Slf4j(topic = "DB")
public class RocksDB implements DB<byte[], byte[]>, Flusher {

@Getter
Expand Down Expand Up @@ -83,4 +91,22 @@ public DB<byte[], byte[]> newInstance() {
public void stat() {
this.db.stat();
}
}

public void backup(String dir) {
try {
db.backup(dir);
} catch (RocksDBException | IllegalArgumentException | IllegalStateException e) {
logger.warn("Backup {} to {} failed: {}", this.getDbName(), dir, e.getMessage());
}
}

public static void destroy(String name, String dir) {
try {
// delete engine.properties first
Files.deleteIfExists(Paths.get(dir, name, "engine.properties"));
org.rocksdb.RocksDB.destroyDB(Paths.get(dir, name).toString(), new Options());
} catch (RocksDBException | IOException e) {
logger.warn("Destroy {} from {} failed: {}", name, dir, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {

// add a persistent storage, the store name is: trans-cache
// when fullnode startup, transactionCache initializes transactions from this store
@Getter
private DB<byte[], byte[]> persistentStore;

// replace persistentStore and optimizes startup performance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public synchronized void close() {
@Override
public synchronized void reset() {
head().reset();
head().close();
head = head.getRoot().newInstance();
head = head.getRoot();
}

@Override
Expand Down
17 changes: 16 additions & 1 deletion common/src/main/java/org/tron/common/utils/PropUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -106,4 +111,14 @@ public static boolean writeProperty(String file, String key, String value) {
return true;
}

}
public static void writeProperties(String file, Map<String, String> params) {
try (Writer w = Files.newBufferedWriter(Paths.get(file), StandardCharsets.UTF_8)) {
Properties properties = new Properties();
params.forEach(properties::setProperty);
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
4 changes: 4 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -1575,10 +1575,14 @@ private static void initRocksDbSettings(Config config) {
int targetFileSizeMultiplier = config.hasPath(prefix + "targetFileSizeMultiplier") ? config
.getInt(prefix + "targetFileSizeMultiplier") : 1;

int maxOpenFiles = config.hasPath(prefix + "maxOpenFiles")
? config.getInt(prefix + "maxOpenFiles") : 5000;

PARAMETER.rocksDBCustomSettings = RocksDbSettings
.initCustomSettings(levelNumber, compactThreads, blocksize, maxBytesForLevelBase,
maxBytesForLevelMultiplier, level0FileNumCompactionTrigger,
targetFileSizeBase, targetFileSizeMultiplier);
PARAMETER.rocksDBCustomSettings.withMaxOpenFiles(maxOpenFiles);
RocksDbSettings.loggingSettings();
}

Expand Down
146 changes: 92 additions & 54 deletions framework/src/main/java/org/tron/core/db/backup/BackupDbUtil.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package org.tron.core.db.backup;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.error.TronDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.PropUtil;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.args.Args;
import org.tron.core.db.RevokingDatabase;
import org.tron.core.db2.core.Chainbase;
import org.tron.core.db2.core.SnapshotManager;
import org.tron.core.db2.core.SnapshotRoot;
import org.tron.core.db.TronDatabase;
import org.tron.core.db.TronStoreWithRevoking;
import org.tron.core.db2.common.RocksDB;
import org.tron.core.db2.common.TxCacheDB;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.store.DynamicPropertiesStore;

@Slf4j(topic = "DB")
@Component
Expand All @@ -26,11 +40,42 @@ public class BackupDbUtil {

@Getter
private static final int DB_BACKUP_STATE_DEFAULT = 11;
@Getter
@Autowired
private RevokingDatabase db;
private List<ITronChainBase<?>> stores;
@Autowired
private DynamicPropertiesStore dynamicPropertiesStore;
private List<RocksDB> rocksDBsToBackup;
private CommonParameter parameter = Args.getInstance();


@PostConstruct
private void init() {
rocksDBsToBackup = stores.stream().map(this::getRocksDB)
.filter(Objects::nonNull)
.filter(db -> !"tmp".equalsIgnoreCase(db.getDbName()))
.filter(db -> !db.getDbName().startsWith("checkpoint"))
.collect(Collectors.toList());
}

private RocksDB getRocksDB(ITronChainBase<?> store) {
if (store instanceof TronStoreWithRevoking && ((TronStoreWithRevoking<?>) store).getDb()
.getClass() == RocksDB.class) {
return (RocksDB) ((TronStoreWithRevoking<?>) store).getDb();
} else if (store instanceof TronStoreWithRevoking
&& ((TronStoreWithRevoking<?>) store).getDb()
.getClass() == TxCacheDB.class
&& ((TxCacheDB) ((TronStoreWithRevoking<?>) store).getDb()).getPersistentStore().getClass()
== RocksDB.class) {
return (RocksDB) ((TxCacheDB) ((TronStoreWithRevoking<?>) store).getDb())
.getPersistentStore();
} else if (store instanceof TronDatabase && ((TronDatabase<?>) store).getDbSource().getClass()
== RocksDbDataSourceImpl.class) {
return new RocksDB((RocksDbDataSourceImpl) ((TronDatabase<?>) store).getDbSource());
} else {
return null;
}
}

private int getBackupState() {
try {
return Integer.valueOf(PropUtil
Expand All @@ -41,25 +86,22 @@ private int getBackupState() {
}
}

private void setBackupState(int status) {
PropUtil.writeProperty(parameter.getDbBackupConfig()
.getPropPath(), BackupDbUtil.DB_BACKUP_STATE,
String.valueOf(status));
private void setBackupState(int status, long blockNum) {
Map<String, String> params = new HashMap<>();
params.put("header", String.valueOf(blockNum));
params.put(BackupDbUtil.DB_BACKUP_STATE, String.valueOf(status));
PropUtil.writeProperties(parameter.getDbBackupConfig().getPropPath(), params);
}

private void switchBackupState() {
private void switchBackupState(long blockNum) {
switch (State.valueOf(getBackupState())) {
case BAKINGONE:
setBackupState(State.BAKEDONE.getStatus());
case BAKEDTWO:
setBackupState(State.BAKEDONE.getStatus(), blockNum);
break;
case BAKEDONE:
setBackupState(State.BAKEDTWO.getStatus());
break;
case BAKINGTWO:
setBackupState(State.BAKEDTWO.getStatus());
break;
case BAKEDTWO:
setBackupState(State.BAKEDONE.getStatus());
setBackupState(State.BAKEDTWO.getStatus(), blockNum);
break;
default:
break;
Expand All @@ -68,43 +110,35 @@ private void switchBackupState() {

public void doBackup(BlockCapsule block) {
long t1 = System.currentTimeMillis();
long header = dynamicPropertiesStore.getLatestBlockHeaderNumberFromDB();
try {
switch (State.valueOf(getBackupState())) {
case BAKINGONE:
case BAKEDTWO:
deleteBackup(DB_BACKUP_INDEX1);
backup(DB_BACKUP_INDEX1);
switchBackupState();
switchBackupState(header);
deleteBackup(DB_BACKUP_INDEX2);
break;
case BAKEDONE:
deleteBackup(DB_BACKUP_INDEX2);
backup(DB_BACKUP_INDEX2);
switchBackupState();
deleteBackup(DB_BACKUP_INDEX1);
break;
case BAKINGTWO:
deleteBackup(DB_BACKUP_INDEX2);
backup(DB_BACKUP_INDEX2);
switchBackupState();
switchBackupState(header);
deleteBackup(DB_BACKUP_INDEX1);
break;
case BAKEDTWO:
deleteBackup(DB_BACKUP_INDEX1);
backup(DB_BACKUP_INDEX1);
switchBackupState();
deleteBackup(DB_BACKUP_INDEX2);
break;
default:
logger.warn("invalid backup state {}.", getBackupState());
}
} catch (RocksDBException | SecurityException e) {
logger.warn("Backup db error.", e);
}
long timeUsed = System.currentTimeMillis() - t1;
logger
.info("Current block number is {}, backup all store use {} ms!", block.getNum(), timeUsed);
if (timeUsed >= 3000) {
logger.warn("Backing up db uses too much time. {} ms.", timeUsed);
long timeUsed = System.currentTimeMillis() - t1;
logger
.info("Current block number is {},root header is {}, backup all store use {} ms!",
block.getNum(), header, timeUsed);
if (timeUsed >= 3000) {
logger.warn("Backing up db uses too much time. {} ms.", timeUsed);
}
} catch (RocksDBException | SecurityException | IOException e) {
throw new TronDBException("Backup rocksdb on " + header + " failed.", e);
}
}

Expand All @@ -117,17 +151,20 @@ private void backup(int i) throws RocksDBException {
} else {
throw new RuntimeException(String.format("error backup with undefined index %d", i));
}
List<Chainbase> stores = ((SnapshotManager) db).getDbs();
for (Chainbase store : stores) {
if (((SnapshotRoot) (store.getHead().getRoot())).getDb().getClass()
== org.tron.core.db2.common.RocksDB.class) {
((org.tron.core.db2.common.RocksDB) ((SnapshotRoot) (store.getHead().getRoot())).getDb())
.getDb().backup(path);
}
String finalPath = path;
rocksDBsToBackup.parallelStream().forEach(db -> db.backup(finalPath));
List<String> backedDBs = Arrays.stream(Objects.requireNonNull(
Paths.get(finalPath).toFile().listFiles(File::isDirectory)))
.map(File::getName).collect(Collectors.toList());
List<String> dbNames = rocksDBsToBackup.stream().map(RocksDB::getDbName)
.collect(Collectors.toList());
dbNames.removeAll(backedDBs);
if (!dbNames.isEmpty()) {
throw new RocksDBException("Some db not backed up: " + dbNames);
}
}

private void deleteBackup(int i) {
private void deleteBackup(int i) throws RocksDBException, IOException {
String path = "";
if (i == DB_BACKUP_INDEX1) {
path = parameter.getDbBackupConfig().getBak1path();
Expand All @@ -136,13 +173,14 @@ private void deleteBackup(int i) {
} else {
throw new RuntimeException(String.format("error deleteBackup with undefined index %d", i));
}
List<Chainbase> stores = ((SnapshotManager) db).getDbs();
for (Chainbase store : stores) {
if (((SnapshotRoot) (store.getHead().getRoot())).getDb().getClass()
== org.tron.core.db2.common.RocksDB.class) {
((org.tron.core.db2.common.RocksDB) (((SnapshotRoot) (store.getHead().getRoot())).getDb()))
.getDb().deleteDbBakPath(path);
}
String finalPath = path;
rocksDBsToBackup.parallelStream().forEach(db -> RocksDB.destroy(db.getDbName(), finalPath));
FileUtils.cleanDirectory(new File(finalPath)); // clean bak dir by File or other.
List<String> backedDBs = Arrays.stream(Objects.requireNonNull(
Paths.get(finalPath).toFile().listFiles(File::isDirectory)))
.map(File::getName).collect(Collectors.toList());
if (!backedDBs.isEmpty()) {
throw new RocksDBException("Some db not delete: " + backedDBs);
}
}

Expand Down
3 changes: 3 additions & 0 deletions framework/src/test/java/org/tron/core/db2/ChainbaseTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.tron.core.db2;

import com.google.common.collect.Streams;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void testPrefixQueryForLeveldb() {
testDb(chainbase);
testRoot(dataSource);
chainbase.reset();
Assert.assertFalse(chainbase.iterator().hasNext());
chainbase.close();
}

Expand All @@ -95,6 +97,7 @@ public void testPrefixQueryForRocksdb() {
testDb(chainbase);
testRoot(dataSource);
chainbase.reset();
Assert.assertEquals(0, Streams.stream(chainbase.iterator()).count());
chainbase.close();
}

Expand Down
Loading

0 comments on commit 270deef

Please sign in to comment.