Skip to content

Commit

Permalink
Merge pull request #245 from ydb-platform/operation_manager
Browse files Browse the repository at this point in the history
Updated implementation of the long operations
  • Loading branch information
alex268 authored Mar 26, 2024
2 parents 0b47bb5 + 3385446 commit 1d640da
Show file tree
Hide file tree
Showing 39 changed files with 1,615 additions and 1,096 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
- develop
- release*
pull_request:
type: [opened, reopened, edited]
type: [opened, reopened, edited, synchronize]

jobs:
build:
Expand Down Expand Up @@ -57,5 +57,5 @@ jobs:
run: mvn $MAVEN_ARGS test

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4

2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
- develop
- release*
pull_request:
type: [opened, reopened, edited]
type: [opened, reopened, edited, synchronize]

jobs:
build:
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
<artifactId>ydb-sdk-topic</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-export</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-coordination</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.operation.OperationUtils;
import tech.ydb.core.operation.Operation;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.proto.coordination.AlterNodeRequest;
import tech.ydb.proto.coordination.CreateNodeRequest;
import tech.ydb.proto.coordination.DescribeNodeRequest;
Expand All @@ -39,6 +40,12 @@ private String validatePath(String path) {
return path.startsWith("/") ? path : rpc.getDatabase() + "/" + path;
}

private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
return GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
.build();
}

@Override
public CoordinationSession createSession(String path, CoordinationSessionSettings settings) {
return new SessionImpl(rpc, Clock.systemUTC(), validatePath(path), settings);
Expand All @@ -48,34 +55,34 @@ public CoordinationSession createSession(String path, CoordinationSessionSetting
public CompletableFuture<Status> createNode(String path, CoordinationNodeSettings settings) {
CreateNodeRequest request = CreateNodeRequest.newBuilder()
.setPath(validatePath(path))
.setOperationParams(OperationUtils.createParams(settings))
.setOperationParams(Operation.buildParams(settings))
.setConfig(settings.getConfig().toProto())
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
return rpc.createNode(request, grpcSettings);
}

@Override
public CompletableFuture<Status> alterNode(String path, CoordinationNodeSettings settings) {
AlterNodeRequest request = AlterNodeRequest.newBuilder()
.setPath(validatePath(path))
.setOperationParams(OperationUtils.createParams(settings))
.setOperationParams(Operation.buildParams(settings))
.setConfig(settings.getConfig().toProto())
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
return rpc.alterNode(request, grpcSettings);
}

@Override
public CompletableFuture<Status> dropNode(String path, DropCoordinationNodeSettings settings) {
DropNodeRequest request = DropNodeRequest.newBuilder()
.setPath(validatePath(path))
.setOperationParams(OperationUtils.createParams(settings))
.setOperationParams(Operation.buildParams(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
return rpc.dropNode(request, grpcSettings);
}

Expand All @@ -84,10 +91,10 @@ public CompletableFuture<Result<NodeConfig>> describeNode(String path,
DescribeCoordinationNodeSettings settings) {
DescribeNodeRequest request = DescribeNodeRequest.newBuilder()
.setPath(validatePath(path))
.setOperationParams(OperationUtils.createParams(settings))
.setOperationParams(Operation.buildParams(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
return rpc.describeNode(request, grpcSettings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.operation.OperationManager;
import tech.ydb.core.operation.OperationBinder;
import tech.ydb.proto.coordination.AlterNodeRequest;
import tech.ydb.proto.coordination.AlterNodeResponse;
import tech.ydb.proto.coordination.CreateNodeRequest;
Expand All @@ -28,17 +28,17 @@
* @author Kirill Kurdyukov
*/
class RpcImpl implements Rpc {
private static final Function<Result<CreateNodeResponse>, Status> CREATE_NODE_STATUS = OperationManager
.syncStatusUnwrapper(CreateNodeResponse::getOperation);
private static final Function<Result<CreateNodeResponse>, Status> CREATE_NODE_STATUS = OperationBinder
.bindSync(CreateNodeResponse::getOperation);

private static final Function<Result<AlterNodeResponse>, Status> ALTER_NODE_STATUS = OperationManager
.syncStatusUnwrapper(AlterNodeResponse::getOperation);
private static final Function<Result<AlterNodeResponse>, Status> ALTER_NODE_STATUS = OperationBinder
.bindSync(AlterNodeResponse::getOperation);

private static final Function<Result<DropNodeResponse>, Status> DROP_NODE_STATUS = OperationManager
.syncStatusUnwrapper(DropNodeResponse::getOperation);
private static final Function<Result<DropNodeResponse>, Status> DROP_NODE_STATUS = OperationBinder
.bindSync(DropNodeResponse::getOperation);

private static final Function<Result<DescribeNodeResponse>, Result<DescribeNodeResult>> DESCRIBE_NODE_RESULT =
OperationManager.syncResultUnwrapper(DescribeNodeResponse::getOperation, DescribeNodeResult.class);
OperationBinder.bindSync(DescribeNodeResponse::getOperation, DescribeNodeResult.class);

private final GrpcTransport transport;

Expand Down
42 changes: 0 additions & 42 deletions core/src/main/java/tech/ydb/core/StatusExtractor.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.impl.auth.GrpcAuthRpc;
import tech.ydb.core.operation.OperationManager;
import tech.ydb.core.operation.OperationBinder;
import tech.ydb.proto.auth.YdbAuth;
import tech.ydb.proto.auth.v1.AuthServiceGrpc;

Expand Down Expand Up @@ -105,7 +105,7 @@ private void tryLogin(CompletableFuture<Token> future) {
.build();

transport.unaryCall(AuthServiceGrpc.getLoginMethod(), grpcSettings, request)
.thenApply(OperationManager.syncResultUnwrapper(
.thenApply(OperationBinder.bindSync(
YdbAuth.LoginResponse::getOperation,
YdbAuth.LoginResult.class
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import tech.ydb.core.impl.auth.AuthCallOptions;
import tech.ydb.core.impl.pool.EndpointRecord;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.core.operation.OperationManager;
import tech.ydb.core.operation.OperationBinder;
import tech.ydb.proto.discovery.DiscoveryProtos;
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;

Expand Down Expand Up @@ -60,7 +60,7 @@ public CompletableFuture<Result<DiscoveryProtos.ListEndpointsResult>> listEndpoi

return transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request)
.whenComplete((res, ex) -> transport.close())
.thenApply(OperationManager.syncResultUnwrapper(
.thenApply(OperationBinder.bindSync(
DiscoveryProtos.ListEndpointsResponse::getOperation,
DiscoveryProtos.ListEndpointsResult.class
));
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/tech/ydb/core/operation/AsyncOperation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package tech.ydb.core.operation;

import java.util.concurrent.ScheduledExecutorService;

/**
*
* @author Aleksandr Gorshenin
*/
interface AsyncOperation<T> extends Operation<T> {
ScheduledExecutorService getScheduler();
}
61 changes: 61 additions & 0 deletions core/src/main/java/tech/ydb/core/operation/FailedOperation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package tech.ydb.core.operation;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import tech.ydb.core.Result;
import tech.ydb.core.Status;

/**
*
* @author Aleksandr Gorshenin
*/
class FailedOperation<T> implements Operation<T> {
private final T value;
private final Status status;

FailedOperation(T value, Status status) {
this.value = value;
this.status = status;
}

@Override
public String getId() {
return null;
}

@Override
public boolean isReady() {
return true;
}

@Override
public T getValue() {
return value;
}

@Override
public CompletableFuture<Status> cancel() {
return CompletableFuture.completedFuture(status);
}

@Override
public CompletableFuture<Status> forget() {
return CompletableFuture.completedFuture(status);
}

@Override
public CompletableFuture<Result<Boolean>> fetch() {
return CompletableFuture.completedFuture(Result.fail(status));
}

@Override
public <R> Operation<R> transform(Function<T, R> func) {
return new FailedOperation<>(func.apply(value), status);
}

@Override
public String toString() {
return "FailedOperation{status=" + status + "}";
}
}
69 changes: 41 additions & 28 deletions core/src/main/java/tech/ydb/core/operation/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,57 @@
import javax.annotation.Nullable;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.settings.OperationSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.OperationProtos;
import tech.ydb.proto.common.CommonProtos;

/**
* @author Kirill Kurdyukov
* @author Aleksandr Gorshenin
* @param <T> type of the operation result
*/
public class Operation<V> {
private final String operationId;
private final OperationManager operationManager;
private final CompletableFuture<Result<V>> resultFuture;

Operation(String operationId, OperationManager operationManager, CompletableFuture<Result<V>> resultFuture) {
this.operationId = operationId;
this.operationManager = operationManager;
this.resultFuture = resultFuture;
}
public interface Operation<T> {

public <T> Operation<T> transform(Function<V, T> transform) {
return new Operation<>(
this.operationId,
this.operationManager,
this.resultFuture.thenApply(result -> result.map(transform))
);
}
@Nullable
String getId();

boolean isReady();

@Nullable
public String getOperationId() {
return operationId;
}
T getValue();

public CompletableFuture<Result<V>> getResultFuture() {
return resultFuture;
}
CompletableFuture<Status> cancel();
CompletableFuture<Status> forget();

CompletableFuture<Result<Boolean>> fetch();

<R> Operation<R> transform(Function<T, R> mapper);

static OperationProtos.OperationParams buildParams(OperationSettings settings) {
OperationProtos.OperationParams.Builder builder = OperationProtos.OperationParams.newBuilder();

if (settings.getOperationTimeout() != null) {
builder.setOperationTimeout(ProtobufUtils.durationToProto(settings.getOperationTimeout()));
}
if (settings.getCancelTimeout() != null) {
builder.setCancelAfter(ProtobufUtils.durationToProto(settings.getCancelTimeout()));
}
if (settings.getReportCostInfo() != null) {
if (settings.getReportCostInfo()) {
builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.ENABLED);
} else {
builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.DISABLED);
}
}

public CompletableFuture<Result<V>> cancel() {
if (resultFuture.isDone()) {
return resultFuture;
if (settings.isAsyncMode()) {
builder.setOperationMode(OperationProtos.OperationParams.OperationMode.ASYNC);
} else {
builder.setOperationMode(OperationProtos.OperationParams.OperationMode.SYNC);
}

return operationManager.cancel(this)
.thenCompose(cancelOperationResponseResult -> getResultFuture());
return builder.build();
}
}
Loading

0 comments on commit 1d640da

Please sign in to comment.