Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

<perf>(xatransaction): Improve the xaTransaction list interface #596

Merged
merged 5 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void setPaths(Set<String> paths) {

public static class ListXATransactionsRequest {
private int size;
private String path;
private Map<String, Long> offsets = Collections.synchronizedMap(new HashMap<>());

public int getSize() {
Expand All @@ -75,6 +76,14 @@ public Map<String, Long> getOffsets() {
public void setOffsets(Map<String, Long> offsets) {
this.offsets = offsets;
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}
}

@Override
Expand Down Expand Up @@ -229,6 +238,7 @@ public void handle(
host.getAccountManager().getAdminUA(),
xaRequest.getData().getOffsets(),
xaRequest.getData().getSize(),
xaRequest.getData().getPath(),
(exception, xaTransactionListResponse) -> {
if (logger.isDebugEnabled()) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,44 @@ public void asyncGetXATransaction(
}
}

private ListXAReduceCallback getListXACallback(
int count, Map<String, Long> offsets, int size, ListXATransactionsCallback callback) {
List<XAResponse.ChainErrorMessage> errors =
Collections.synchronizedList(new LinkedList<>());
Map<String, Long> nextOffsets = new ConcurrentHashMap<>(offsets);

return (chain, listXAResponse) -> {

// first time to list, reset offsets
if (nextOffsets.get(chain) == -1) {
nextOffsets.put(chain, listXAResponse.getTotal() - 1);
}

XATransactionListResponse response = new XATransactionListResponse();
if (Objects.nonNull(listXAResponse.getChainErrorMessage())) {
errors.add(listXAResponse.getChainErrorMessage());
if (!errors.isEmpty()) {
XAResponse xaResponse = new XAResponse();
xaResponse.setStatus(-1);
xaResponse.setChainErrorMessages(errors);
response.setXaResponse(xaResponse);
}
} else {
response.setXaList(listXAResponse.getXaTransactions());
// update offsets
Long nextOffset =
nextOffsets.get(chain) - listXAResponse.getXaTransactions().size();
nextOffsets.put(chain, nextOffset);
response.setNextOffsets(nextOffsets);
if (nextOffsets.get(chain) == -1) {
response.setFinished(true);
}
response.recoverUsername(accountManager);
}
callback.onResponse(null, response);
};
}

public interface ListXATransactionsCallback {
void onResponse(WeCrossException e, XATransactionListResponse xaTransactionListResponse);
}
Expand Down Expand Up @@ -728,6 +766,7 @@ public void asyncListXATransactions(
UniversalAccount ua,
Map<String, Long> offsets,
int size,
String chainPath,
ListXATransactionsCallback callback) {

try {
Expand All @@ -742,7 +781,17 @@ public void asyncListXATransactions(

XATransactionListResponse response = new XATransactionListResponse();

List<Path> chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet());
ListXAReduceCallback reduceCallback = null;
List<Path> chainPaths = new ArrayList<>();
if (Objects.isNull(chainPath) || chainPath.isEmpty()) {
chainPaths = setToList(zoneManager.getAllChainsInfo(false).keySet());
// has sort operation callback
reduceCallback = getListXAReduceCallback(offsets.size(), offsets, size, callback);
} else {
chainPaths.add(Path.decode(chainPath));
// Remove sort operation callback
reduceCallback = getListXACallback(offsets.size(), offsets, size, callback);
}

int chainNum = chainPaths.size();
if (chainNum == 0) {
Expand All @@ -761,8 +810,6 @@ public void asyncListXATransactions(
}
}

ListXAReduceCallback reduceCallback =
getListXAReduceCallback(offsets.size(), offsets, size, callback);
for (String chain : offsets.keySet()) {
if (!requireIgnore || offsets.get(chain) != -1L) {
asyncListXATransactions(
Expand Down
Loading