Skip to content

Commit

Permalink
add verification of DL (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 authored May 22, 2020
1 parent 049069f commit 2d46340
Show file tree
Hide file tree
Showing 23 changed files with 559 additions and 146 deletions.
34 changes: 0 additions & 34 deletions benchmark-client/config.toml

This file was deleted.

This file was deleted.

33 changes: 33 additions & 0 deletions client-test/benchmark-config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[modules]
[modules.preprocessor]
name = "client.transfer.TransferPreparer"
path = "client-test/build/libs/client-test-all.jar"
[modules.processor]
name = "client.transfer.TransferProcessor"
path = "client-test/build/libs/client-test-all.jar"
[modules.postprocessor]
name = "client.transfer.TransferReporter"
path = "client-test/build/libs/client-test-all.jar"

[common]
concurrency = 1
run_for_sec = 200
ramp_for_sec = 10

[stats]
realtime_report_enabled = true

[test_config]
num_accounts = 100000
population_concurrency = 32

[contract]
population_contract_name = "contract.BatchCreate"
population_contract_path = "client-test/build/classes/java/main/contract/BatchCreate.class"
transfer_contract_name = "contract.Transfer"
transfer_contract_path = "client-test/build/classes/java/main/contract/Transfer.class"

[client_config]
dl_server = "localhost"
certificate = "client-test/sample-keys/certificate.pem"
private_key = "client-test/sample-keys/private-key.pem"
6 changes: 6 additions & 0 deletions benchmark-client/build.gradle → client-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ repositories {
dependencies {
compileOnly project(':kelpie')
implementation group: 'com.scalar-labs', name: 'scalardl-java-client-sdk', version: '2.0.5'
implementation group: 'com.scalar-labs', name: 'scalardb', version: '2.0.1'
implementation group: "io.github.resilience4j", name: "resilience4j-retry", version: "1.3.1"
}

shadowJar {
exclude 'contract/*'
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package benchmark.client;
package client;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.scalar.dl.client.config.ClientConfig;
import com.scalar.dl.client.service.ClientModule;
import com.scalar.dl.client.service.ClientService;
import com.scalar.kelpie.config.Config;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.time.Duration;
import java.util.Properties;

public class Common {
private static String HOST = "localhost";
private static String PORT = "50051";
private static ClientConfig config;
private static final String CERT_HOLDER_ID = "test_holder";
private static final int MAX_RETRIES = 10;
private static final Duration WAIT_DURATION = Duration.ofMillis(1000);
private static final long SLEEP_BASE_MILLIS = 100L;

public static final int INITIAL_BALANCE = 10000;

public static ClientConfig getClientConfig(Config config) {
String host = config.getUserString("client_config", "dl_server", HOST);
Expand All @@ -36,4 +45,26 @@ public static ClientService getClientService(Config config) {

return injector.getInstance(ClientService.class);
}

public static int getTotalInitialBalance(Config config) {
int numAccounts = (int) config.getUserLong("test_config", "num_accounts");

return INITIAL_BALANCE * numAccounts;
}

public static Retry getRetryWithFixedWaitDuration(String name) {
RetryConfig retryConfig =
RetryConfig.custom().maxAttempts(MAX_RETRIES).waitDuration(WAIT_DURATION).build();

return Retry.of(name, retryConfig);
}

public static Retry getRetryWithExponentialBackoff(String name) {
IntervalFunction intervalFunc = IntervalFunction.ofExponentialBackoff(SLEEP_BASE_MILLIS, 2.0);

RetryConfig retryConfig =
RetryConfig.custom().maxAttempts(MAX_RETRIES).intervalFunction(intervalFunc).build();

return Retry.of(name, retryConfig);
}
}
171 changes: 171 additions & 0 deletions client-test/src/main/java/client/transfer/TransferChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package client.transfer;

import client.Common;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.scalar.db.api.TransactionState;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.transaction.CoordinatorException;
import com.scalar.db.service.StorageModule;
import com.scalar.db.service.StorageService;
import com.scalar.db.transaction.consensuscommit.Coordinator;
import com.scalar.dl.client.service.ClientService;
import com.scalar.kelpie.config.Config;
import com.scalar.kelpie.exception.PostProcessException;
import com.scalar.kelpie.modules.PostProcessor;
import io.github.resilience4j.retry.Retry;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;

public class TransferChecker extends PostProcessor {

public TransferChecker(Config config) {
super(config);
}

@Override
public void execute() {
List<JsonObject> results = readBalancesWithRetry();

int committed = getNumOfCommittedFromCoordinator();

if (!isConsistent(results, committed)) {
throw new PostProcessException("Inconsistency happened!");
}
}

private List<JsonObject> readBalancesWithRetry() {
logInfo("reading latest assets...");

Retry retry = Common.getRetryWithExponentialBackoff("readBalances");
Supplier<List<JsonObject>> decorated = Retry.decorateSupplier(retry, this::readBalances);

try {
return decorated.get();
} catch (Exception e) {
throw new PostProcessException("Reading records failed repeatedly", e);
}
}

@Override
public void close() {}

private List<JsonObject> readBalances() {
int numAccounts = (int) config.getUserLong("test_config", "num_accounts");
List<JsonObject> results = new ArrayList<>();

boolean isFailed = false;
ClientService service = Common.getClientService(config);
String name = config.getUserString("contract", "balance_contract_name");

for (int i = 0; i < numAccounts; i++) {
try {
JsonObject argument =
Json.createObjectBuilder()
.add("asset_id", String.valueOf(i))
.add("nonce", UUID.randomUUID().toString())
.build();

JsonObject result = service.executeContract(name, argument).getResult().get();
results.add(result);
} catch (Exception e) {
// continue to read other records
isFailed = true;
}
}

if (isFailed) {
// for Retry
throw new RuntimeException("at least 1 record couldn't be read");
}

return results;
}

private int getNumOfCommittedFromCoordinator() {
Coordinator coordinator = getCoordinator();
Retry retry = Common.getRetryWithExponentialBackoff("checkCoordinator");
Function<String, Optional<Coordinator.State>> decorated =
Retry.decorateFunction(retry, id -> getState(coordinator, id));

JsonObject unknownTransactions = getPreviousState().getJsonObject("unknown_transaction");
int committed = 0;
for (String txId : unknownTransactions.keySet()) {
Optional<Coordinator.State> state;
try {
state = decorated.apply(txId);
} catch (Exception e) {
throw new PostProcessException("Reading the status failed repeatedly", e);
}
if (state.isPresent() && state.get().getState().equals(TransactionState.COMMITTED)) {
JsonArray ids = unknownTransactions.getJsonArray(txId);
logInfo(
"id: "
+ txId
+ " from: "
+ ids.getInt(0)
+ " to: "
+ ids.getInt(1)
+ " succeeded, not failed");
committed++;
}
}

return committed;
}

private Coordinator getCoordinator() {
Properties props = new Properties();
String contactPoints = config.getUserString("test_config", "contact_points");
props.setProperty("scalar.db.contact_points", contactPoints);
props.setProperty("scalar.db.username", "cassandra");
props.setProperty("scalar.db.password", "cassandra");

DatabaseConfig dbConfig = new DatabaseConfig(props);
Injector injector = Guice.createInjector(new StorageModule(dbConfig));
StorageService storage = injector.getInstance(StorageService.class);

return new Coordinator(storage);
}

private Optional<Coordinator.State> getState(Coordinator coordinator, String txId) {
try {
logInfo("reading the status of " + txId);

return coordinator.getState(txId);
} catch (CoordinatorException e) {
// convert the exception for Retry
throw new RuntimeException("Failed to read the state from the coordinator", e);
}
}

private boolean isConsistent(List<JsonObject> results, int committed) {
int totalVersion = results.stream().mapToInt(r -> r.getInt("age")).sum();
int totalBalance = results.stream().mapToInt(r -> r.getInt("balance")).sum();
int expectedTotalVersion = ((int) getStats().getSuccessCount() + committed) * 2;
int expectedTotalBalance = Common.getTotalInitialBalance(config);

logInfo("total version: " + totalVersion);
logInfo("expected total version: " + expectedTotalVersion);
logInfo("total balance: " + totalBalance);
logInfo("expected total balance: " + expectedTotalBalance);

if (totalVersion != expectedTotalVersion) {
logError("version mismatch !");
return false;
}
if (totalBalance != expectedTotalBalance) {
logError("balance mismatch !");
return false;
}
return true;
}
}
Loading

0 comments on commit 2d46340

Please sign in to comment.