
Commit

Merge remote-tracking branch 'upstream/develop' into feature/optimize_stake2_code
lxcmyf committed Aug 23, 2023
2 parents 699c044 + a4100b6 commit 4998589
Showing 43 changed files with 1,409 additions and 276 deletions.
57 changes: 5 additions & 52 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
@@ -29,7 +29,6 @@
import org.tron.core.db.RecentBlockStore;
import org.tron.core.db.RecentTransactionStore;
import org.tron.core.db.TransactionStore;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.exception.BadItemException;
import org.tron.core.exception.HeaderNotFound;
import org.tron.core.exception.ItemNotFoundException;
@@ -245,54 +244,6 @@ public class ChainBaseManager {
@Setter
private long lowestBlockNum = -1; // except num = 0.

public void closeOneStore(ITronChainBase database) {
logger.info("******** Begin to close {}. ********", database.getName());
try {
database.close();
} catch (Exception e) {
logger.info("Failed to close {}.", database.getName(), e);
} finally {
logger.info("******** End to close {}. ********", database.getName());
}
}

public void closeAllStore() {
dbStatService.shutdown();
closeOneStore(transactionRetStore);
closeOneStore(recentBlockStore);
closeOneStore(transactionHistoryStore);
closeOneStore(transactionStore);
closeOneStore(accountStore);
closeOneStore(blockStore);
closeOneStore(blockIndexStore);
closeOneStore(accountIdIndexStore);
closeOneStore(accountIndexStore);
closeOneStore(witnessScheduleStore);
closeOneStore(assetIssueStore);
closeOneStore(dynamicPropertiesStore);
closeOneStore(abiStore);
closeOneStore(codeStore);
closeOneStore(contractStore);
closeOneStore(contractStateStore);
closeOneStore(storageRowStore);
closeOneStore(exchangeStore);
closeOneStore(proposalStore);
closeOneStore(votesStore);
closeOneStore(delegatedResourceStore);
closeOneStore(delegatedResourceAccountIndexStore);
closeOneStore(assetIssueV2Store);
closeOneStore(exchangeV2Store);
closeOneStore(nullifierStore);
closeOneStore(merkleTreeStore);
closeOneStore(delegationStore);
closeOneStore(proofStore);
closeOneStore(commonStore);
closeOneStore(commonDataBase);
closeOneStore(pbftSignDataStore);
closeOneStore(sectionBloomStore);
closeOneStore(accountAssetStore);
}

// for test only
public List<ByteString> getWitnesses() {
return witnessScheduleStore.getActiveWitnesses();
@@ -316,9 +267,7 @@ public BlockCapsule getHead() throws HeaderNotFound {
}

public synchronized BlockId getHeadBlockId() {
return new BlockId(
dynamicPropertiesStore.getLatestBlockHeaderHash(),
dynamicPropertiesStore.getLatestBlockHeaderNumber());
return new BlockId(dynamicPropertiesStore.getLatestBlockHeaderHash());
}

public long getHeadBlockNum() {
@@ -434,6 +383,10 @@ private void init() {
this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL;
}

public void shutdown() {
dbStatService.shutdown();
}

public boolean isLiteNode() {
return getNodeType() == NodeType.LITE;
}
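
The ChainBaseManager hunks above remove closeOneStore()/closeAllStore() and leave only a thin shutdown() for dbStatService; the begin/failed/end close logging moves into each store's own close() (see the TronDatabase.java hunk below and the similar close() hunk that follows it). A minimal sketch of that pattern, with placeholder names rather than the project's classes:

    import java.util.List;

    // Sketch only: each store guards and logs its own close(), so a manager can simply
    // iterate and call close() instead of wrapping every store in a closeOneStore() helper.
    abstract class SelfClosingStore {
      abstract String name();

      // Hypothetical hook standing in for dbSource.closeDB(), revokingDB.close(), etc.
      abstract void releaseResources() throws Exception;

      public final void close() {
        System.out.printf("******** Begin to close %s. ********%n", name());
        try {
          releaseResources();
        } catch (Exception e) {
          System.out.printf("Failed to close %s: %s%n", name(), e);
        } finally {
          System.out.printf("******** End to close %s. ********%n", name());
        }
      }
    }

    final class StoreManager {
      private final List<SelfClosingStore> stores;

      StoreManager(List<SelfClosingStore> stores) {
        this.stores = stores;
      }

      // The manager no longer needs to know how any individual store shuts down.
      void shutdown() {
        stores.forEach(SelfClosingStore::close);
      }
    }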
@@ -346,6 +346,10 @@ public static byte[] getOwner(Transaction.Contract contract) {
}
}
return owner.toByteArray();
} catch (InvalidProtocolBufferException invalidProtocolBufferException) {
logger.warn("InvalidProtocolBufferException occurred because {}, please verify the interface "
+ "input parameters", invalidProtocolBufferException.getMessage());
return new byte[0];
} catch (Exception ex) {
logger.error(ex.getMessage());
return new byte[0];
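
The getOwner hunk above narrows the error handling: a protobuf parse failure is caught as InvalidProtocolBufferException and logged as a warning about the caller's input, while anything else still falls through to the generic catch. A standalone illustration of that catch ordering, using a hypothetical parse helper rather than the project's unpacking code:

    import com.google.protobuf.InvalidProtocolBufferException;

    final class OwnerBytes {
      // Returns the owner address bytes, or an empty array when extraction fails.
      static byte[] ownerOrEmpty(byte[] rawContractParameter) {
        try {
          return parseOwner(rawContractParameter);
        } catch (InvalidProtocolBufferException e) {
          // Malformed input: warn and point the caller at its parameters.
          System.err.println("Invalid contract parameter, please verify the input: " + e.getMessage());
          return new byte[0];
        } catch (Exception e) {
          // Anything unexpected still ends up here.
          System.err.println("Unexpected failure while extracting owner: " + e.getMessage());
          return new byte[0];
        }
      }

      // Hypothetical stand-in for the real protobuf unpacking done in the diff.
      private static byte[] parseOwner(byte[] raw) throws InvalidProtocolBufferException {
        if (raw == null || raw.length == 0) {
          throw new InvalidProtocolBufferException("empty contract parameter");
        }
        return raw;
      }
    }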
9 changes: 8 additions & 1 deletion chainbase/src/main/java/org/tron/core/db/TronDatabase.java
@@ -94,7 +94,14 @@ public void reset() {
*/
@Override
public void close() {
dbSource.closeDB();
logger.info("******** Begin to close {}. ********", getName());
try {
dbSource.closeDB();
} catch (Exception e) {
logger.warn("Failed to close {}.", getName(), e);
} finally {
logger.info("******** End to close {}. ********", getName());
}
}

public abstract void put(byte[] key, T item);
@@ -182,7 +182,14 @@ public String getName() {

@Override
public void close() {
revokingDB.close();
logger.info("******** Begin to close {}. ********", getName());
try {
revokingDB.close();
} catch (Exception e) {
logger.warn("Failed to close {}.", getName(), e);
} finally {
logger.info("******** End to close {}. ********", getName());
}
}

@Override
161 changes: 158 additions & 3 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
@@ -3,10 +3,26 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.bouncycastle.util.encoders.Hex;
@@ -17,6 +33,7 @@
import org.tron.common.storage.leveldb.LevelDbDataSourceImpl;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.JsonUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.capsule.BytesCapsule;
@@ -42,6 +59,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
private BloomFilter<byte[]>[] bloomFilters = new BloomFilter[2];
// filterStartBlock record the start block of the active filter
private volatile long filterStartBlock = INVALID_BLOCK;
private volatile long currentBlockNum = INVALID_BLOCK;
// currentFilterIndex records the index of the active filter
private volatile int currentFilterIndex = 0;

@@ -57,6 +75,16 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
// replace persistentStore and optimizes startup performance
private RecentTransactionStore recentTransactionStore;

private final Path cacheFile0;
private final Path cacheFile1;
private final Path cacheProperties;
private final Path cacheDir;
private AtomicBoolean isValid = new AtomicBoolean(false);

@Getter
@Setter
private volatile boolean alive;

public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
this.name = name;
this.TRANSACTION_COUNT =
@@ -85,6 +113,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache");
this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0");
this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1");
this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties");

}

@@ -110,6 +142,11 @@ private void initCache() {
}

public void init() {
if (recovery()) {
isValid.set(true);
setAlive(true);
return;
}
long size = recentTransactionStore.size();
if (size != MAX_BLOCK_SIZE) {
// 0. load from persistentStore
@@ -129,6 +166,8 @@ public void init() {
logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.",
bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(),
System.currentTimeMillis() - start);
isValid.set(true);
setAlive(true);
}

@Override
Expand Down Expand Up @@ -172,7 +211,7 @@ public void put(byte[] key, byte[] value) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
}
bloomFilters[currentFilterIndex].put(key);

currentBlockNum = blockNum;
if (lastMetricBlock != blockNum) {
lastMetricBlock = blockNum;
Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE,
@@ -208,22 +247,138 @@ public Iterator<Entry<byte[], byte[]>> iterator() {
}

@Override
public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
public synchronized void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
isValid.set(false);
batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes()));
isValid.set(true);
}

@Override
public void close() {
reset();
if (!isAlive()) {
return;
}
dump();
bloomFilters[0] = null;
bloomFilters[1] = null;
persistentStore.close();
setAlive(false);
}

@Override
public void reset() {
}

private boolean recovery() {
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("recovery bloomFilters start.");
CompletableFuture<Boolean> loadProperties = CompletableFuture.supplyAsync(this::loadProperties);
CompletableFuture<Boolean> tk0 = loadProperties.thenApplyAsync(
v -> recovery(0, this.cacheFile0));
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync(
v -> recovery(1, this.cacheFile1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
}

private boolean recovery(int index, Path file) {
try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
logger.info("recovery bloomFilter[{}] from file.", index);
long start = System.currentTimeMillis();
bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel());
logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean handleException(Throwable e) {
bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
try {
Files.deleteIfExists(this.cacheFile0);
Files.deleteIfExists(this.cacheFile1);
} catch (Exception ignored) {

}
logger.info("recovery bloomFilters failed. {}", e.getMessage());
logger.info("rollback to previous mode.");
return false;
}

private void dump() {
if (!isValid.get()) {
logger.info("bloomFilters is not valid.");
}
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("dump bloomFilters start.");
CompletableFuture<Void> task0 = CompletableFuture.runAsync(
() -> dump(0, this.cacheFile0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(
() -> dump(1, this.cacheFile1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
}

private void dump(int index, Path file) {
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean loadProperties() {
try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream(
this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)),
StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.load(r);
filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock"));
currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum"));
currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex"));
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void writeProperties() {
try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock));
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum));
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex));
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore);
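
Most of the TxCacheDB additions above persist the two Guava Bloom filters to disk on shutdown (dump) and read them back on startup (recovery), together with a small txCache.properties file carrying filterStartBlock, currentBlockNum, and currentFilterIndex, so the cache can be rebuilt without replaying RecentTransactionStore. A trimmed, standalone sketch of the filter round trip, with illustrative paths and sizes rather than the project's configuration:

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    final class BloomFilterSnapshot {
      // Write the filter to a snapshot file, e.g. at shutdown.
      static void dump(BloomFilter<byte[]> filter, Path file) throws IOException {
        try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
          filter.writeTo(out);
        }
      }

      // Read the filter back; DELETE_ON_CLOSE mirrors the diff, so a snapshot is
      // consumed exactly once and can never be recovered stale.
      static BloomFilter<byte[]> recover(Path file) throws IOException {
        try (InputStream in = new BufferedInputStream(Files.newInputStream(
            file, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
          return BloomFilter.readFrom(in, Funnels.byteArrayFunnel());
        }
      }

      public static void main(String[] args) throws IOException {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"), "bloomFilters_demo");
        BloomFilter<byte[]> filter = BloomFilter.create(Funnels.byteArrayFunnel(), 1_000_000);
        filter.put("some-tx-id".getBytes(StandardCharsets.UTF_8));
        dump(filter, file);
        BloomFilter<byte[]> restored = recover(file);
        System.out.println(restored.mightContain("some-tx-id".getBytes(StandardCharsets.UTF_8))); // true
      }
    }

If recovery fails for any reason, the diff above falls back to freshly created filters and deletes the snapshot files, so a corrupt or missing cache never blocks startup.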
@@ -284,10 +284,6 @@ public synchronized void disable() {

@Override
public void shutdown() {
logger.info("******** Begin to pop revokingDb. ********");
logger.info("******** Before revokingDb size: {}.", size);
checkTmpStore.close();
logger.info("******** End to pop revokingDb. ********");
if (pruneCheckpointThread != null) {
pruneCheckpointThread.shutdown();
}