diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c9b1f291f..41105b801 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -7,7 +7,7 @@ on: - develop - release* pull_request: - type: [opened, reopened, edited] + type: [opened, reopened, edited, synchronize] jobs: build: @@ -57,5 +57,5 @@ jobs: run: mvn $MAVEN_ARGS test - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 603332eba..773cbac17 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -7,7 +7,7 @@ on: - develop - release* pull_request: - type: [opened, reopened, edited] + type: [opened, reopened, edited, synchronize] jobs: build: diff --git a/bom/pom.xml b/bom/pom.xml index 702d14540..2c5cee3f2 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -80,6 +80,11 @@ ydb-sdk-topic ${project.version} + + tech.ydb + ydb-sdk-export + ${project.version} + tech.ydb ydb-sdk-coordination diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/ClientImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/ClientImpl.java index f76a6750a..02aecd1f5 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/ClientImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/ClientImpl.java @@ -13,7 +13,8 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.operation.OperationUtils; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.proto.coordination.AlterNodeRequest; import tech.ydb.proto.coordination.CreateNodeRequest; import tech.ydb.proto.coordination.DescribeNodeRequest; @@ -39,6 +40,12 @@ private String validatePath(String path) { return path.startsWith("/") ? path : rpc.getDatabase() + "/" + path; } + private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + return GrpcRequestSettings.newBuilder() + .withDeadline(settings.getRequestTimeout()) + .build(); + } + @Override public CoordinationSession createSession(String path, CoordinationSessionSettings settings) { return new SessionImpl(rpc, Clock.systemUTC(), validatePath(path), settings); @@ -48,11 +55,11 @@ public CoordinationSession createSession(String path, CoordinationSessionSetting public CompletableFuture createNode(String path, CoordinationNodeSettings settings) { CreateNodeRequest request = CreateNodeRequest.newBuilder() .setPath(validatePath(path)) - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setConfig(settings.getConfig().toProto()) .build(); - GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); return rpc.createNode(request, grpcSettings); } @@ -60,11 +67,11 @@ public CompletableFuture createNode(String path, CoordinationNodeSetting public CompletableFuture alterNode(String path, CoordinationNodeSettings settings) { AlterNodeRequest request = AlterNodeRequest.newBuilder() .setPath(validatePath(path)) - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setConfig(settings.getConfig().toProto()) .build(); - GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); return rpc.alterNode(request, grpcSettings); } @@ -72,10 +79,10 @@ public CompletableFuture alterNode(String path, CoordinationNodeSettings public CompletableFuture dropNode(String path, DropCoordinationNodeSettings settings) { DropNodeRequest request = DropNodeRequest.newBuilder() .setPath(validatePath(path)) - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .build(); - GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); return rpc.dropNode(request, grpcSettings); } @@ -84,10 +91,10 @@ public CompletableFuture> describeNode(String path, DescribeCoordinationNodeSettings settings) { DescribeNodeRequest request = DescribeNodeRequest.newBuilder() .setPath(validatePath(path)) - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .build(); - GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); return rpc.describeNode(request, grpcSettings); } diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/RpcImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/RpcImpl.java index 15e0be74f..2f5004fd7 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/RpcImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/RpcImpl.java @@ -10,7 +10,7 @@ import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.proto.coordination.AlterNodeRequest; import tech.ydb.proto.coordination.AlterNodeResponse; import tech.ydb.proto.coordination.CreateNodeRequest; @@ -28,17 +28,17 @@ * @author Kirill Kurdyukov */ class RpcImpl implements Rpc { - private static final Function, Status> CREATE_NODE_STATUS = OperationManager - .syncStatusUnwrapper(CreateNodeResponse::getOperation); + private static final Function, Status> CREATE_NODE_STATUS = OperationBinder + .bindSync(CreateNodeResponse::getOperation); - private static final Function, Status> ALTER_NODE_STATUS = OperationManager - .syncStatusUnwrapper(AlterNodeResponse::getOperation); + private static final Function, Status> ALTER_NODE_STATUS = OperationBinder + .bindSync(AlterNodeResponse::getOperation); - private static final Function, Status> DROP_NODE_STATUS = OperationManager - .syncStatusUnwrapper(DropNodeResponse::getOperation); + private static final Function, Status> DROP_NODE_STATUS = OperationBinder + .bindSync(DropNodeResponse::getOperation); private static final Function, Result> DESCRIBE_NODE_RESULT = - OperationManager.syncResultUnwrapper(DescribeNodeResponse::getOperation, DescribeNodeResult.class); + OperationBinder.bindSync(DescribeNodeResponse::getOperation, DescribeNodeResult.class); private final GrpcTransport transport; diff --git a/core/src/main/java/tech/ydb/core/StatusExtractor.java b/core/src/main/java/tech/ydb/core/StatusExtractor.java deleted file mode 100644 index 0668f0626..000000000 --- a/core/src/main/java/tech/ydb/core/StatusExtractor.java +++ /dev/null @@ -1,42 +0,0 @@ -package tech.ydb.core; - -import java.util.List; -import java.util.function.Function; - -import tech.ydb.proto.StatusCodesProtos.StatusIds; -import tech.ydb.proto.YdbIssueMessage.IssueMessage; - -public class StatusExtractor implements Function, Result> { - private final Function statusCodeExtractor; - private final Function> issueListExtractor; - - public StatusExtractor(Function statusCodeExtractor, - Function> issueListExtractor) { - this.statusCodeExtractor = statusCodeExtractor; - this.issueListExtractor = issueListExtractor; - } - - public Function getStatusCodeExtractor() { - return statusCodeExtractor; - } - - public Function> getIssueListExtractor() { - return issueListExtractor; - } - - @Override - public Result apply(Result result) { - if (!result.isSuccess()) { - return result; - } - final Status status = Status.of( - StatusCode.fromProto(statusCodeExtractor.apply(result.getValue())), - result.getStatus().getConsumedRu(), - Issue.fromPb(issueListExtractor.apply(result.getValue())) - ); - if (!status.isSuccess()) { - return Result.fail(status); - } - return Result.success(result.getValue(), status); - } -} diff --git a/core/src/main/java/tech/ydb/core/auth/StaticCredentialsRpc.java b/core/src/main/java/tech/ydb/core/auth/StaticCredentialsRpc.java index f9c71018f..759c491a0 100644 --- a/core/src/main/java/tech/ydb/core/auth/StaticCredentialsRpc.java +++ b/core/src/main/java/tech/ydb/core/auth/StaticCredentialsRpc.java @@ -20,7 +20,7 @@ import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.impl.auth.GrpcAuthRpc; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.proto.auth.YdbAuth; import tech.ydb.proto.auth.v1.AuthServiceGrpc; @@ -105,7 +105,7 @@ private void tryLogin(CompletableFuture future) { .build(); transport.unaryCall(AuthServiceGrpc.getLoginMethod(), grpcSettings, request) - .thenApply(OperationManager.syncResultUnwrapper( + .thenApply(OperationBinder.bindSync( YdbAuth.LoginResponse::getOperation, YdbAuth.LoginResult.class )) diff --git a/core/src/main/java/tech/ydb/core/impl/discovery/GrpcDiscoveryRpc.java b/core/src/main/java/tech/ydb/core/impl/discovery/GrpcDiscoveryRpc.java index bc2355997..0bbb801b4 100644 --- a/core/src/main/java/tech/ydb/core/impl/discovery/GrpcDiscoveryRpc.java +++ b/core/src/main/java/tech/ydb/core/impl/discovery/GrpcDiscoveryRpc.java @@ -15,7 +15,7 @@ import tech.ydb.core.impl.auth.AuthCallOptions; import tech.ydb.core.impl.pool.EndpointRecord; import tech.ydb.core.impl.pool.ManagedChannelFactory; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.proto.discovery.DiscoveryProtos; import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc; @@ -60,7 +60,7 @@ public CompletableFuture> listEndpoi return transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request) .whenComplete((res, ex) -> transport.close()) - .thenApply(OperationManager.syncResultUnwrapper( + .thenApply(OperationBinder.bindSync( DiscoveryProtos.ListEndpointsResponse::getOperation, DiscoveryProtos.ListEndpointsResult.class )); diff --git a/core/src/main/java/tech/ydb/core/operation/AsyncOperation.java b/core/src/main/java/tech/ydb/core/operation/AsyncOperation.java new file mode 100644 index 000000000..8f9d1e53b --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/AsyncOperation.java @@ -0,0 +1,11 @@ +package tech.ydb.core.operation; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * + * @author Aleksandr Gorshenin + */ +interface AsyncOperation extends Operation { + ScheduledExecutorService getScheduler(); +} diff --git a/core/src/main/java/tech/ydb/core/operation/FailedOperation.java b/core/src/main/java/tech/ydb/core/operation/FailedOperation.java new file mode 100644 index 000000000..7e0a127a2 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/FailedOperation.java @@ -0,0 +1,61 @@ +package tech.ydb.core.operation; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; + +/** + * + * @author Aleksandr Gorshenin + */ +class FailedOperation implements Operation { + private final T value; + private final Status status; + + FailedOperation(T value, Status status) { + this.value = value; + this.status = status; + } + + @Override + public String getId() { + return null; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public T getValue() { + return value; + } + + @Override + public CompletableFuture cancel() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture forget() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture> fetch() { + return CompletableFuture.completedFuture(Result.fail(status)); + } + + @Override + public Operation transform(Function func) { + return new FailedOperation<>(func.apply(value), status); + } + + @Override + public String toString() { + return "FailedOperation{status=" + status + "}"; + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/Operation.java b/core/src/main/java/tech/ydb/core/operation/Operation.java index a10f170b1..49cbc146e 100644 --- a/core/src/main/java/tech/ydb/core/operation/Operation.java +++ b/core/src/main/java/tech/ydb/core/operation/Operation.java @@ -6,44 +6,57 @@ import javax.annotation.Nullable; import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.settings.OperationSettings; +import tech.ydb.core.utils.ProtobufUtils; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.common.CommonProtos; /** * @author Kirill Kurdyukov + * @author Aleksandr Gorshenin + * @param type of the operation result */ -public class Operation { - private final String operationId; - private final OperationManager operationManager; - private final CompletableFuture> resultFuture; - - Operation(String operationId, OperationManager operationManager, CompletableFuture> resultFuture) { - this.operationId = operationId; - this.operationManager = operationManager; - this.resultFuture = resultFuture; - } +public interface Operation { - public Operation transform(Function transform) { - return new Operation<>( - this.operationId, - this.operationManager, - this.resultFuture.thenApply(result -> result.map(transform)) - ); - } + @Nullable + String getId(); + + boolean isReady(); @Nullable - public String getOperationId() { - return operationId; - } + T getValue(); - public CompletableFuture> getResultFuture() { - return resultFuture; - } + CompletableFuture cancel(); + CompletableFuture forget(); + + CompletableFuture> fetch(); + + Operation transform(Function mapper); + + static OperationProtos.OperationParams buildParams(OperationSettings settings) { + OperationProtos.OperationParams.Builder builder = OperationProtos.OperationParams.newBuilder(); + + if (settings.getOperationTimeout() != null) { + builder.setOperationTimeout(ProtobufUtils.durationToProto(settings.getOperationTimeout())); + } + if (settings.getCancelTimeout() != null) { + builder.setCancelAfter(ProtobufUtils.durationToProto(settings.getCancelTimeout())); + } + if (settings.getReportCostInfo() != null) { + if (settings.getReportCostInfo()) { + builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.ENABLED); + } else { + builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.DISABLED); + } + } - public CompletableFuture> cancel() { - if (resultFuture.isDone()) { - return resultFuture; + if (settings.isAsyncMode()) { + builder.setOperationMode(OperationProtos.OperationParams.OperationMode.ASYNC); + } else { + builder.setOperationMode(OperationProtos.OperationParams.OperationMode.SYNC); } - return operationManager.cancel(this) - .thenCompose(cancelOperationResponseResult -> getResultFuture()); + return builder.build(); } } diff --git a/core/src/main/java/tech/ydb/core/operation/OperationBinder.java b/core/src/main/java/tech/ydb/core/operation/OperationBinder.java new file mode 100644 index 000000000..10262142e --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/OperationBinder.java @@ -0,0 +1,114 @@ +package tech.ydb.core.operation; + +import java.util.function.Function; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.OperationProtos; + +/** + * + * @author Aleksandr Gorshenin + */ +public class OperationBinder { + @VisibleForTesting + static final Status UNEXPECTED_ASYNC = Status.of(StatusCode.CLIENT_INTERNAL_ERROR) + .withIssues(Issue.of("Unexpected async result of operation", Issue.Severity.ERROR)); + + private OperationBinder() { } + + private static Status status(OperationProtos.Operation operation) { + StatusCode code = StatusCode.fromProto(operation.getStatus()); + Double consumedRu = null; + if (operation.hasCostInfo()) { + consumedRu = operation.getCostInfo().getConsumedUnits(); + } + + return Status.of(code, consumedRu, Issue.fromPb(operation.getIssuesList())); + } + + private static Result result(OperationProtos.Operation operation, Class resultClass) { + Status status = status(operation); + if (!status.isSuccess()) { + return Result.fail(status); + } + + try { + M resultMessage = operation.getResult().unpack(resultClass); + return Result.success(resultMessage, status); + } catch (InvalidProtocolBufferException ex) { + return Result.error("Can't unpack message " + resultClass.getName(), ex); + } + } + + public static Function, Result> bindSync( + Function method, Class resultClass + ) { + return (result) -> { + if (!result.isSuccess()) { + return result.map(null); + } + OperationProtos.Operation operation = method.apply(result.getValue()); + if (!operation.getReady()) { + return Result.fail(UNEXPECTED_ASYNC); + } + return result(operation, resultClass); + }; + } + + public static Function, Status> bindSync(Function method) { + return (result) -> { + if (!result.isSuccess()) { + return result.getStatus(); + } + OperationProtos.Operation operation = method.apply(result.getValue()); + if (!operation.getReady()) { + return UNEXPECTED_ASYNC; + } + return status(operation); + }; + } + + public static Function, Operation> bindAsync( + GrpcTransport transport, Function method + ) { + return (result) -> { + if (!result.isSuccess()) { + Status status = result.getStatus(); + return new FailedOperation<>(status, status); + } + + OperationProtos.Operation operation = method.apply(result.getValue()); + if (operation.getReady()) { + return new ReadyOperation<>(operation.getId(), status(operation)); + } + + return new OperationImpl<>(transport, operation.getId(), OperationBinder::status); + }; + } + + public static Function, Operation>> bindAsync( + GrpcTransport transport, Function method, Class resultClass + ) { + return (result) -> { + if (!result.isSuccess()) { + Status status = result.getStatus(); + return new FailedOperation<>(Result.fail(status), status); + } + + OperationProtos.Operation operation = method.apply(result.getValue()); + if (operation.getReady()) { + return new ReadyOperation<>(operation.getId(), result(operation, resultClass)); + } + + return new OperationImpl<>(transport, operation.getId(), o -> result(o, resultClass)); + }; + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/OperationImpl.java b/core/src/main/java/tech/ydb/core/operation/OperationImpl.java new file mode 100644 index 000000000..be0cbcad2 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/OperationImpl.java @@ -0,0 +1,183 @@ +package tech.ydb.core.operation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.operation.v1.OperationServiceGrpc; + +/** + * + * @author Aleksandr Gorshenin + */ +class OperationImpl implements AsyncOperation { + private static final StatusMapper CANCEL_OPERATION = StatusMapper.of( + OperationProtos.CancelOperationResponse::getStatus, + OperationProtos.CancelOperationResponse::getIssuesList + ); + + private static final StatusMapper FORGET_OPERATION = StatusMapper.of( + OperationProtos.ForgetOperationResponse::getStatus, + OperationProtos.ForgetOperationResponse::getIssuesList + ); + + private final GrpcTransport transport; + private final String id; + private final Function valueExtractor; + private volatile T value = null; + + OperationImpl(GrpcTransport transport, String id, Function extractor) { + this.transport = transport; + this.id = id; + this.valueExtractor = extractor; + } + + @Override + public ScheduledExecutorService getScheduler() { + return transport.getScheduler(); + } + + @Override + public String getId() { + return id; + } + + @Override + public boolean isReady() { + return value != null; + } + + @Override + public T getValue() { + return value; + } + + @Override + public String toString() { + return "AsyncOperation{id=" + id + ", ready=" + (value != null) + "}"; + } + + @Override + public CompletableFuture cancel() { + if (value != null) { + return CompletableFuture.completedFuture(ReadyOperation.ALREADY_DONE_STATUS); + } + + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder().build(); + OperationProtos.CancelOperationRequest request = OperationProtos.CancelOperationRequest.newBuilder() + .setId(id) + .build(); + + return transport + .unaryCall(OperationServiceGrpc.getCancelOperationMethod(), settings, request) + .thenApply(CANCEL_OPERATION); + } + + @Override + public CompletableFuture forget() { + if (value != null) { + return CompletableFuture.completedFuture(ReadyOperation.ALREADY_DONE_STATUS); + } + + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder().build(); + OperationProtos.ForgetOperationRequest request = OperationProtos.ForgetOperationRequest.newBuilder() + .setId(id) + .build(); + + return transport + .unaryCall(OperationServiceGrpc.getForgetOperationMethod(), settings, request) + .thenApply(FORGET_OPERATION); + } + + @Override + public CompletableFuture> fetch() { + if (value != null) { + return CompletableFuture.completedFuture(Result.success(Boolean.TRUE)); + } + + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder().build(); + OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest.newBuilder() + .setId(id) + .build(); + + return transport + .unaryCall(OperationServiceGrpc.getGetOperationMethod(), settings, request) + .thenApply(res -> res.map(this::handleOperation)); + } + + private boolean handleOperation(OperationProtos.GetOperationResponse resp) { + OperationProtos.Operation operation = resp.getOperation(); + if (!operation.getReady()) { + return false; + } + + this.value = valueExtractor.apply(operation); + return true; + } + + @Override + public Operation transform(Function mapper) { + return new Proxy<>(mapper); + } + + private class Proxy implements AsyncOperation { + private final Function mapper; + + Proxy(Function mapper) { + this.mapper = mapper; + } + + @Override + public ScheduledExecutorService getScheduler() { + return transport.getScheduler(); + } + + @Override + public String getId() { + return id; + } + + @Override + public boolean isReady() { + return value != null; + } + + @Override + public R getValue() { + if (value == null) { + return null; + } + return mapper.apply(value); + } + + @Override + public CompletableFuture cancel() { + return OperationImpl.this.cancel(); + } + + @Override + public CompletableFuture forget() { + return OperationImpl.this.forget(); + } + + @Override + public CompletableFuture> fetch() { + return OperationImpl.this.fetch(); + } + + @Override + public Operation transform(Function func) { + return new Proxy<>(mapper.andThen(func)); + } + + @Override + public String toString() { + return "ProxyAsyncOperation{id=" + id + ", ready=" + (value != null) + "}"; + } + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/OperationManager.java b/core/src/main/java/tech/ydb/core/operation/OperationManager.java deleted file mode 100644 index 708c1ebf8..000000000 --- a/core/src/main/java/tech/ydb/core/operation/OperationManager.java +++ /dev/null @@ -1,215 +0,0 @@ -package tech.ydb.core.operation; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import tech.ydb.core.Issue; -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.proto.OperationProtos; -import tech.ydb.proto.operation.v1.OperationServiceGrpc; - -/** - * @author Kirill Kurdyukov - */ -public final class OperationManager { - - private static final Logger logger = LoggerFactory.getLogger(OperationManager.class); - private static final Status ASYNC_ARE_UNSUPPORTED = Status.of(StatusCode.CLIENT_INTERNAL_ERROR) - .withIssues(Issue.of("Async operations are not supported", Issue.Severity.ERROR)); - private static final long OPERATION_CHECK_TIMEOUT_MS = 1_000; - - private final GrpcTransport grpcTransport; - private final ScheduledExecutorService scheduledExecutorService; - private final GrpcRequestSettings requestSettings = GrpcRequestSettings.newBuilder().build(); - - public OperationManager(GrpcTransport grpcTransport) { - this.grpcTransport = grpcTransport; - this.scheduledExecutorService = grpcTransport.getScheduler(); - } - - @VisibleForTesting - static Status status(OperationProtos.Operation operation) { - StatusCode code = StatusCode.fromProto(operation.getStatus()); - Double consumedRu = null; - if (operation.hasCostInfo()) { - consumedRu = operation.getCostInfo().getConsumedUnits(); - } - - return Status.of(code, consumedRu, Issue.fromPb(operation.getIssuesList())); - } - - public static Function, Result> syncResultUnwrapper( - Function operationExtractor, - Class resultClass - ) { - return (result) -> { - if (!result.isSuccess()) { - return result.map(null); - } - OperationProtos.Operation operation = operationExtractor.apply(result.getValue()); - if (operation.getReady()) { - Status status = status(operation); - if (!status.isSuccess()) { - return Result.fail(status); - } - - try { - M resultMessage = operation.getResult().unpack(resultClass); - return Result.success(resultMessage, status); - } catch (InvalidProtocolBufferException ex) { - return Result.error("Can't unpack message " + resultClass.getName(), ex); - } - } - return Result.fail(ASYNC_ARE_UNSUPPORTED); - }; - } - - public static Function, Status> syncStatusUnwrapper( - Function operationExtractor - ) { - return (result) -> { - if (!result.isSuccess()) { - return result.getStatus(); - } - - OperationProtos.Operation operation = operationExtractor.apply(result.getValue()); - if (operation.getReady()) { - return status(operation); - } - - return ASYNC_ARE_UNSUPPORTED; - }; - } - - public Function, Operation> operationUnwrapper( - Function operationExtractor, - Class resultClass - ) { - return (result) -> { - if (!result.isSuccess()) { - return new Operation<>( - null, - null, - CompletableFuture.completedFuture(result.map(null)) - ); - } - - OperationProtos.Operation operationProto = operationExtractor.apply(result.getValue()); - - Operation operation = new Operation<>(operationProto.getId(), this, new CompletableFuture<>()); - - completeOperation(operationProto, operation, resultClass); - - return operation; - }; - } - - private void completeOperation( - final OperationProtos.Operation operationProto, - final Operation operation, - final Class resultClass - ) { - if (operation.getResultFuture().isDone()) { - return; - } - - final Status status = status(operationProto); - - if (operationProto.getReady()) { - if (status.isSuccess()) { - try { - V unpackResult = operationProto.getResult().unpack(resultClass); - - operation.getResultFuture().complete(Result.success(unpackResult, status)); - } catch (InvalidProtocolBufferException ex) { - operation.getResultFuture().completeExceptionally(ex); - } - } else { - operation.getResultFuture().complete(Result.fail(status)); - } - - return; - } - - scheduledExecutorService.schedule( - () -> { - assert operation.getOperationId() != null; - if (operation.getResultFuture().isDone()) { - return; - } - - OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest - .newBuilder() - .setId(operation.getOperationId()) - .build(); - - grpcTransport.unaryCall( - OperationServiceGrpc.getGetOperationMethod(), - requestSettings, - request - ).whenComplete( - (getOperationResponseResult, throwable) -> { - if (throwable != null) { - operation.getResultFuture().completeExceptionally(throwable); - } else if (getOperationResponseResult != null) { - if (getOperationResponseResult.isSuccess()) { - completeOperation( - getOperationResponseResult.getValue().getOperation(), - operation, - resultClass - ); - } else { - operation.getResultFuture().complete( - getOperationResponseResult.map(null) - ); - } - } - } - ); - }, - OPERATION_CHECK_TIMEOUT_MS, - TimeUnit.MILLISECONDS - ); - } - - CompletableFuture> cancel( - final Operation operation - ) { - assert operation.getOperationId() != null; - return grpcTransport.unaryCall( - OperationServiceGrpc.getCancelOperationMethod(), - GrpcRequestSettings.newBuilder() - .build(), - OperationProtos.CancelOperationRequest.newBuilder() - .setId(operation.getOperationId()) - .build() - ).whenComplete( - (cancelOperationResponseResult, throwable) -> { - if (throwable != null) { - logger.error("Fail cancel polling operation with id: {}", - operation.getOperationId(), throwable); - } - - if (cancelOperationResponseResult.isSuccess()) { - logger.info("Success cancel polling operation with id: {}", operation.getOperationId()); - - operation.getResultFuture().complete(Result.fail(Status.of(StatusCode.CANCELLED))); - } else { - logger.error("Fail cancel polling operation with id: {}", operation.getOperationId()); - } - } - ); - } -} diff --git a/core/src/main/java/tech/ydb/core/operation/OperationTray.java b/core/src/main/java/tech/ydb/core/operation/OperationTray.java new file mode 100644 index 000000000..fef3be432 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/OperationTray.java @@ -0,0 +1,72 @@ +package tech.ydb.core.operation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Aleksandr Gorshenin + */ +public class OperationTray { + private static final Logger logger = LoggerFactory.getLogger(OperationTray.class); + + private OperationTray() { } + + public static CompletableFuture fetchOperation(Operation operation, int rateSeconds) { + CompletableFuture future = new CompletableFuture<>(); + if (operation.isReady()) { + logger.debug("{} is already done", operation); + future.complete(operation.getValue()); + return future; + } + + if (operation instanceof AsyncOperation) { + long started = System.currentTimeMillis(); + fetch(null, future, (AsyncOperation) operation, started, rateSeconds); + return future; + } + + logger.error("unknown type of {}", operation); + throw new IllegalArgumentException("Unknown type of operation"); + } + + private static boolean complete(Throwable th, CompletableFuture f, AsyncOperation o, long elapsed) { + if (th != null) { + logger.error("cannot fetch the operation {}, {} ms elapsed", o, elapsed, th); + f.completeExceptionally(th); + return false; + } + + if (!o.isReady()) { + return false; + } + + logger.info("{} is done, {} ms elapsed", o, elapsed); + f.complete(o.getValue()); + return true; + } + + private static void fetch(Throwable th, CompletableFuture f, AsyncOperation o, long started, int rs) { + long elapsed = System.currentTimeMillis() - started; + + if (complete(th, f, o, elapsed)) { + return; + } + + logger.info("fetch the operation {} update in {} seconds, {} ms elapsed", o, rs, elapsed); + o.fetch().whenComplete((res, th2) -> { + long elapsed2 = System.currentTimeMillis() - started; + if (complete(th2, f, o, elapsed2)) { + return; + } + + if (res != null) { + logger.info("got operation {} status {}, schedule next update in {} seconds", o, res, rs); + } + o.getScheduler().schedule(() -> fetch(th, f, o, started, rs), rs, TimeUnit.SECONDS); + }); + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/OperationUtils.java b/core/src/main/java/tech/ydb/core/operation/OperationUtils.java deleted file mode 100644 index e7d74f4a7..000000000 --- a/core/src/main/java/tech/ydb/core/operation/OperationUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -package tech.ydb.core.operation; - -import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.settings.BaseRequestSettings; -import tech.ydb.core.settings.OperationSettings; -import tech.ydb.core.utils.ProtobufUtils; -import tech.ydb.proto.OperationProtos; -import tech.ydb.proto.common.CommonProtos; - -/** - * @author Kirill Kurdyukov - */ -public class OperationUtils { - - private OperationUtils() { - } - - public static OperationProtos.OperationParams createParams(OperationSettings settings) { - OperationProtos.OperationParams.Builder builder = OperationProtos.OperationParams.newBuilder(); - - if (settings.getOperationTimeout() != null) { - builder.setOperationTimeout(ProtobufUtils.durationToProto(settings.getOperationTimeout())); - } - if (settings.getCancelTimeout() != null) { - builder.setOperationTimeout(ProtobufUtils.durationToProto(settings.getCancelTimeout())); - } - if (settings.getReportCostInfo() != null) { - if (settings.getReportCostInfo()) { - builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.ENABLED); - } else { - builder.setReportCostInfo(CommonProtos.FeatureFlag.Status.DISABLED); - } - } - - builder.setOperationMode(settings.getMode().toProto()); - - return builder.build(); - } - - public static GrpcRequestSettings createGrpcRequestSettings(BaseRequestSettings settings) { - return GrpcRequestSettings.newBuilder() - .withDeadline(settings.getRequestTimeout()) - .build(); - } -} diff --git a/core/src/main/java/tech/ydb/core/operation/ReadyOperation.java b/core/src/main/java/tech/ydb/core/operation/ReadyOperation.java new file mode 100644 index 000000000..6c109aef6 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/ReadyOperation.java @@ -0,0 +1,66 @@ +package tech.ydb.core.operation; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +/** + * + * @author Aleksandr Gorshenin + */ +class ReadyOperation implements Operation { + static final Status ALREADY_DONE_STATUS = Status.of(StatusCode.CLIENT_INTERNAL_ERROR) + .withIssues(Issue.of("Operation is already done", Issue.Severity.ERROR)); + + private final String id; + private final T value; + + ReadyOperation(String id, T value) { + this.id = id; + this.value = value; + } + + @Override + public String getId() { + return id; + } + + @Override + public T getValue() { + return value; + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public CompletableFuture cancel() { + return CompletableFuture.completedFuture(ALREADY_DONE_STATUS); + } + + @Override + public CompletableFuture forget() { + return CompletableFuture.completedFuture(ALREADY_DONE_STATUS); + } + + @Override + public CompletableFuture> fetch() { + return CompletableFuture.completedFuture(Result.success(Boolean.TRUE)); + } + + @Override + public Operation transform(Function func) { + return new ReadyOperation<>(id, func.apply(value)); + } + + @Override + public String toString() { + return "ReadyOperation{id=" + id + ", value=" + value + "}"; + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/StatusExtractor.java b/core/src/main/java/tech/ydb/core/operation/StatusExtractor.java new file mode 100644 index 000000000..0213215a7 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/StatusExtractor.java @@ -0,0 +1,44 @@ +package tech.ydb.core.operation; + +import java.util.List; +import java.util.function.Function; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.proto.StatusCodesProtos.StatusIds; +import tech.ydb.proto.YdbIssueMessage.IssueMessage; + +public class StatusExtractor implements Function, Result> { + private final Function statusMethod; + private final Function> issuesMethod; + + private StatusExtractor(Function status, Function> issues) { + this.statusMethod = status; + this.issuesMethod = issues; + } + + @Override + public Result apply(Result result) { + if (!result.isSuccess()) { + return result; + } + + R resp = result.getValue(); + Status status = Status.of( + StatusCode.fromProto(statusMethod.apply(resp)), + result.getStatus().getConsumedRu(), + Issue.fromPb(issuesMethod.apply(resp)) + ); + + return status.isSuccess() ? Result.success(resp, status) : Result.fail(status); + } + + public static StatusExtractor of( + Function statusMethod, + Function> issuerMethod + ) { + return new StatusExtractor<>(statusMethod, issuerMethod); + } +} diff --git a/core/src/main/java/tech/ydb/core/operation/StatusMapper.java b/core/src/main/java/tech/ydb/core/operation/StatusMapper.java new file mode 100644 index 000000000..ee4008e66 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/operation/StatusMapper.java @@ -0,0 +1,42 @@ +package tech.ydb.core.operation; + +import java.util.List; +import java.util.function.Function; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.proto.StatusCodesProtos.StatusIds; +import tech.ydb.proto.YdbIssueMessage.IssueMessage; + +public class StatusMapper implements Function, Status> { + private final Function statusMethod; + private final Function> issuesMethod; + + private StatusMapper(Function status, Function> issues) { + this.statusMethod = status; + this.issuesMethod = issues; + } + + @Override + public Status apply(Result result) { + if (!result.isSuccess()) { + return result.getStatus(); + } + + R resp = result.getValue(); + return Status.of( + StatusCode.fromProto(statusMethod.apply(resp)), + result.getStatus().getConsumedRu(), + Issue.fromPb(issuesMethod.apply(resp)) + ); + } + + public static StatusMapper of( + Function statusMethod, + Function> issuesMethod + ) { + return new StatusMapper<>(statusMethod, issuesMethod); + } +} diff --git a/core/src/main/java/tech/ydb/core/settings/LongOperationSettings.java b/core/src/main/java/tech/ydb/core/settings/LongOperationSettings.java deleted file mode 100644 index f290727cf..000000000 --- a/core/src/main/java/tech/ydb/core/settings/LongOperationSettings.java +++ /dev/null @@ -1,36 +0,0 @@ -package tech.ydb.core.settings; - -/** - * @author Kirill Kurdyukov - */ -public class LongOperationSettings extends OperationSettings { - - private final Mode mode; - - public LongOperationSettings(LongOperationBuilder builder) { - super(builder); - - this.mode = builder.mode; - } - - @Override - public Mode getMode() { - return mode; - } - - public static class LongOperationBuilder> extends OperationBuilder { - - private Mode mode = Mode.ASYNC; - - public Self setOperationMode(Mode mode) { - this.mode = mode; - - return self(); - } - - @Override - public OperationSettings build() { - return new OperationSettings(this); - } - } -} diff --git a/core/src/main/java/tech/ydb/core/settings/OperationSettings.java b/core/src/main/java/tech/ydb/core/settings/OperationSettings.java index 721c8da9c..8face6671 100644 --- a/core/src/main/java/tech/ydb/core/settings/OperationSettings.java +++ b/core/src/main/java/tech/ydb/core/settings/OperationSettings.java @@ -3,8 +3,6 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; -import tech.ydb.proto.OperationProtos; - /** * * @author Aleksandr Gorshenin @@ -13,12 +11,14 @@ public class OperationSettings extends BaseRequestSettings { private final Duration operationTimeout; private final Duration cancelTimeout; private final Boolean reportCostInfo; + private final boolean isAsyncMode; protected OperationSettings(OperationBuilder builder) { super(builder); this.operationTimeout = builder.operationTimeout; this.cancelTimeout = builder.cancelTimeout; this.reportCostInfo = builder.reportCostInfo; + this.isAsyncMode = builder.isAsyncMode; } public Duration getOperationTimeout() { @@ -33,14 +33,15 @@ public Boolean getReportCostInfo() { return reportCostInfo; } - public Mode getMode() { - return Mode.SYNC; + public boolean isAsyncMode() { + return isAsyncMode; } public static class OperationBuilder> extends BaseBuilder { private Duration operationTimeout = null; private Duration cancelTimeout = null; private Boolean reportCostInfo = null; + private boolean isAsyncMode = false; public Self withOperationTimeout(Duration duration) { this.operationTimeout = duration; @@ -67,24 +68,14 @@ public Self withReportCostInfo(Boolean report) { return self(); } + public Self withAsyncMode(boolean isAsyncOperation) { + this.isAsyncMode = isAsyncOperation; + return self(); + } + @Override public OperationSettings build() { return new OperationSettings(this); } } - - public enum Mode { - SYNC, ASYNC; - - public OperationProtos.OperationParams.OperationMode toProto() { - switch (this) { - case SYNC: - return OperationProtos.OperationParams.OperationMode.SYNC; - case ASYNC: - return OperationProtos.OperationParams.OperationMode.ASYNC; - default: - throw new RuntimeException("Unsupported operation mode"); - } - } - } } diff --git a/core/src/test/java/tech/ydb/core/operation/OperationBinderTest.java b/core/src/test/java/tech/ydb/core/operation/OperationBinderTest.java new file mode 100644 index 000000000..aec722f9f --- /dev/null +++ b/core/src/test/java/tech/ydb/core/operation/OperationBinderTest.java @@ -0,0 +1,396 @@ +package tech.ydb.core.operation; + +import com.google.protobuf.Any; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.YdbIssueMessage.IssueMessage; +import tech.ydb.proto.common.CommonProtos; +import tech.ydb.proto.table.YdbTable; + +/** + * @author Kirill Kurdyukov + */ +public class OperationBinderTest { + private static final IssueMessage TEST_ISSUE_MESSAGE = IssueMessage.newBuilder() + .setIssueCode(12345) + .setSeverity(Issue.Severity.INFO.getCode()) + .setMessage("some-issue") + .build(); + + private static final Issue TEST_ISSUE = Issue.of(12345, "some-issue", Issue.Severity.INFO); + + private static final Status NOT_FOUND = Status.of(StatusCode.NOT_FOUND) + .withIssues(Issue.of("not-found", Issue.Severity.ERROR)); + + private final GrpcTransport mocked = Mockito.mock(GrpcTransport.class); + + @Test + public void syncStatusBinderTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setCostInfo(CommonProtos.CostInfo.newBuilder().setConsumedUnits(15d).build()) + .build()) + .build(); + + Status status = OperationBinder + .bindSync(YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertEquals(StatusCode.SUCCESS, status.getCode()); + Assert.assertTrue(status.hasConsumedRu()); + Assert.assertEquals(Double.valueOf(15d), status.getConsumedRu()); + Assert.assertEquals(0, status.getIssues().length); + } + + @Test + public void syncStatusBinderFailTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.NOT_FOUND) + .setId("errored-id") + .setReady(true) + .addIssues(TEST_ISSUE_MESSAGE) + .build()) + .build(); + + Status status = OperationBinder + .bindSync(YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertEquals(StatusCode.NOT_FOUND, status.getCode()); + Assert.assertFalse(status.hasConsumedRu()); + Assert.assertArrayEquals(new Issue[] { TEST_ISSUE }, status.getIssues()); + } + + @Test + public void syncStatusBinderErrorTest() { + Status failed = OperationBinder + .bindSync(YdbTable.AlterTableResponse::getOperation) + .apply(Result.fail(NOT_FOUND)); + Assert.assertEquals(NOT_FOUND, failed); + } + + @Test + public void syncStatusBinderNotReadyTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setId("not-ready-id") + .setReady(false) + .build() + ).build(); + + Status status = OperationBinder + .bindSync(YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertEquals(OperationBinder.UNEXPECTED_ASYNC, status); + } + + @Test + public void syncResultBinderTest() { + Any data = Any.pack(YdbTable.ExplainQueryResult.newBuilder().setQueryPlan("plan").build()); + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation(OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setCostInfo(CommonProtos.CostInfo.newBuilder().setConsumedUnits(15d).build()) + .setResult(data) + .build() + ).build(); + + Result result = OperationBinder + .bindSync(YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + + Assert.assertTrue(result.isSuccess()); + Assert.assertTrue(result.getStatus().hasConsumedRu()); + Assert.assertEquals(Double.valueOf(15d), result.getStatus().getConsumedRu()); + Assert.assertEquals(0, result.getStatus().getIssues().length); + } + + @Test + public void syncResultBinderFailTest() { + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.NOT_FOUND) + .setId("error-id") + .setReady(true) + .addIssues(TEST_ISSUE_MESSAGE) + .build()) + .build(); + + Result result = OperationBinder + .bindSync(YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + + Assert.assertEquals(StatusCode.NOT_FOUND, result.getStatus().getCode()); + Assert.assertFalse(result.getStatus().hasConsumedRu()); + Assert.assertArrayEquals(new Issue[] { TEST_ISSUE }, result.getStatus().getIssues()); + } + + @Test + public void syncResultBinderNotReadyTest() { + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setId("not-ready-id") + .setReady(false) + .build() + ).build(); + + Result result = OperationBinder + .bindSync(YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + + Assert.assertFalse(result.isSuccess()); + Assert.assertEquals(OperationBinder.UNEXPECTED_ASYNC, result.getStatus()); + } + + @Test + public void syncResultBinderErrorTest() { + Result result = OperationBinder + .bindSync(YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.fail(NOT_FOUND)); + + Assert.assertFalse(result.isSuccess()); + Assert.assertEquals(NOT_FOUND, result.getStatus()); + } + + @Test + public void syncResultBinderMixClassesTest() { + Any data = Any.pack(YdbTable.ExplainQueryResult.newBuilder().setQueryPlan("plan").build()); + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation(OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setCostInfo(CommonProtos.CostInfo.newBuilder().setConsumedUnits(15d).build()) + .setResult(data) + .build() + ).build(); + + final Result result = OperationBinder + .bindSync(YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExecuteQueryResult.class) + .apply(Result.success(response)); + + Assert.assertFalse(result.isSuccess()); + Assert.assertEquals(StatusCode.CLIENT_INTERNAL_ERROR, result.getStatus().getCode()); + UnexpectedResultException ex = Assert.assertThrows(UnexpectedResultException.class, () -> result.getValue()); + Assert.assertEquals( + "Can't unpack message tech.ydb.proto.table.YdbTable$ExecuteQueryResult, code: CLIENT_INTERNAL_ERROR", + ex.getMessage() + ); + } + + @Test + public void asyncStatusBinderTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setCostInfo(CommonProtos.CostInfo.newBuilder().setConsumedUnits(15d).build()) + .build()) + .build(); + + Operation operation = OperationBinder + .bindAsync(mocked, YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("ready-id", operation.getId()); + + Status status = operation.getValue(); + Assert.assertNotNull(status); + Assert.assertEquals(StatusCode.SUCCESS, status.getCode()); + Assert.assertTrue(status.hasConsumedRu()); + Assert.assertEquals(Double.valueOf(15d), status.getConsumedRu()); + Assert.assertEquals(0, status.getIssues().length); + + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.cancel().join()); + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.forget().join()); + Assert.assertTrue(operation.fetch().join().getValue()); + } + + @Test + public void asyncStatusBinderFailTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.NOT_FOUND) + .setId("error-id") + .setReady(true) + .addIssues(TEST_ISSUE_MESSAGE) + .build()) + .build(); + Status error = Status.of(StatusCode.NOT_FOUND, null, TEST_ISSUE); + + Operation operation = OperationBinder + .bindAsync(mocked, YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("error-id", operation.getId()); + + Status status = operation.getValue(); + Assert.assertNotNull(status); + Assert.assertEquals(error, status); + + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.cancel().join()); + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.forget().join()); + Assert.assertTrue(operation.fetch().join().getValue()); + } + + @Test + public void asyncStatusBinderErrorTest() { + Status error = Status.of(StatusCode.NOT_FOUND, null, TEST_ISSUE); + + Operation operation = OperationBinder + .bindAsync(mocked, YdbTable.AlterTableResponse::getOperation) + .apply(Result.fail(error)); + + Assert.assertTrue(operation.isReady()); + Assert.assertNull(operation.getId()); + + Assert.assertEquals(error, operation.getValue()); + + Assert.assertEquals(error, operation.cancel().join()); + Assert.assertEquals(error, operation.forget().join()); + Assert.assertEquals(error, operation.fetch().join().getStatus()); + } + + @Test + public void asyncStatusBinderNotReadyTest() { + YdbTable.AlterTableResponse response = YdbTable.AlterTableResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setId("not-ready-id") + .setReady(false) + .build() + ).build(); + + Operation operation = OperationBinder + .bindAsync(mocked, YdbTable.AlterTableResponse::getOperation) + .apply(Result.success(response)); + + Assert.assertFalse(operation.isReady()); + Assert.assertEquals("not-ready-id", operation.getId()); + } + + @Test + public void asyncResultBinderTest() { + Any data = Any.pack(YdbTable.ExplainQueryResult.newBuilder().setQueryPlan("plan").build()); + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation(OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setCostInfo(CommonProtos.CostInfo.newBuilder().setConsumedUnits(15d).build()) + .setResult(data) + .build() + ).build(); + + Operation> operation = OperationBinder + .bindAsync(mocked, YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("ready-id", operation.getId()); + + Result result = operation.getValue(); + Assert.assertNotNull(result); + Assert.assertEquals(StatusCode.SUCCESS, result.getStatus().getCode()); + Assert.assertTrue(result.getStatus().hasConsumedRu()); + Assert.assertEquals(Double.valueOf(15d), result.getStatus().getConsumedRu()); + Assert.assertEquals(0, result.getStatus().getIssues().length); + + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.cancel().join()); + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.forget().join()); + Assert.assertTrue(operation.fetch().join().getValue()); + } + + @Test + public void asyncResultBinderFailTest() { + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.NOT_FOUND) + .setId("error-id") + .setReady(true) + .addIssues(TEST_ISSUE_MESSAGE) + .build()) + .build(); + + Operation> operation = OperationBinder + .bindAsync(mocked, YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + Status error = Status.of(StatusCode.NOT_FOUND, null, TEST_ISSUE); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("error-id", operation.getId()); + + Result result = operation.getValue(); + Assert.assertNotNull(result); + Assert.assertEquals(error, result.getStatus()); + + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.cancel().join()); + Assert.assertEquals(ReadyOperation.ALREADY_DONE_STATUS, operation.forget().join()); + Assert.assertTrue(operation.fetch().join().getValue()); + } + + @Test + public void asyncResultBinderErrorTest() { + Status error = Status.of(StatusCode.BAD_SESSION, null, TEST_ISSUE, TEST_ISSUE); + Operation> operation = OperationBinder + .bindAsync(mocked, YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.fail(error)); + + Assert.assertTrue(operation.isReady()); + Assert.assertNull(operation.getId()); + + Result result = operation.getValue(); + Assert.assertNotNull(result); + Assert.assertEquals(error, result.getStatus()); + + Assert.assertEquals(error, operation.cancel().join()); + Assert.assertEquals(error, operation.forget().join()); + Assert.assertEquals(error, operation.fetch().join().getStatus()); + } + + @Test + public void asyncResultBinderNotReadyTest() { + YdbTable.ExplainDataQueryResponse response = YdbTable.ExplainDataQueryResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setId("not-ready-id") + .setReady(false) + .build() + ).build(); + + Operation> operation = OperationBinder + .bindAsync(mocked, YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class) + .apply(Result.success(response)); + + Assert.assertFalse(operation.isReady()); + Assert.assertEquals("not-ready-id", operation.getId()); + } +} diff --git a/core/src/test/java/tech/ydb/core/operation/OperationManagerTest.java b/core/src/test/java/tech/ydb/core/operation/OperationManagerTest.java deleted file mode 100644 index 9e0b09afe..000000000 --- a/core/src/test/java/tech/ydb/core/operation/OperationManagerTest.java +++ /dev/null @@ -1,357 +0,0 @@ -package tech.ydb.core.operation; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import com.google.protobuf.Any; -import org.apache.logging.log4j.core.config.CronScheduledFuture; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import tech.ydb.core.Issue; -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.proto.OperationProtos; -import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.proto.StatusCodesProtos.StatusIds; -import tech.ydb.proto.YdbIssueMessage.IssueMessage; -import tech.ydb.proto.operation.v1.OperationServiceGrpc; -import tech.ydb.proto.table.YdbTable; -import tech.ydb.proto.table.v1.TableServiceGrpc; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.when; - -/** - * @author Kirill Kurdyukov - */ -public class OperationManagerTest { - - private static final String OPERATION_ID = "123"; - - private final GrpcTransport transport = Mockito.mock(GrpcTransport.class); - private final ScheduledExecutorServiceTest scheduledExecutorServiceTest = new ScheduledExecutorServiceTest(); - - private OperationManager operationManager; - - @Before - public void before() { - when(transport.getScheduler()).thenReturn(scheduledExecutorServiceTest); - - operationManager = new OperationManager(transport); - } - - @Test - public void successWithoutIssues() { - Status s = OperationManager.status(OperationProtos.Operation.newBuilder() - .setStatus(StatusIds.StatusCode.SUCCESS) - .setId("some-id") - .setReady(true) - .build()); - - assertSame(Status.SUCCESS, s); - assertEquals(0, s.getIssues().length); - } - - @Test - public void successWithIssues() { - Status s = OperationManager.status(OperationProtos.Operation.newBuilder() - .setStatus(StatusIds.StatusCode.SUCCESS) - .setId("some-id") - .setReady(true) - .addIssues(IssueMessage.newBuilder() - .setIssueCode(12345) - .setSeverity(Issue.Severity.INFO.getCode()) - .setMessage("some-issue") - .build()) - .build()); - - assertTrue(s.isSuccess()); - assertArrayEquals(new Issue[]{ - Issue.of(12345, "some-issue", Issue.Severity.INFO) - }, s.getIssues()); - } - - @Test - public void completeSuccessOperation() { - mockExplainDataQueryMethodTransport(createResult(true, StatusCodesProtos.StatusIds.StatusCode.SUCCESS)); - - checkSuccessOperation(resultUnwrap()); - } - - @Test - public void completeFailOperation() { - mockExplainDataQueryMethodTransport(createResult(true, - StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION)); - - Result result = resultUnwrap().join(); - - Assert.assertEquals(Status.of(StatusCode.BAD_SESSION), result.getStatus()); - } - - @Test - public void failUnwrapOperation() { - mockExplainDataQueryMethodTransport(Result.fail(Status.of(StatusCode.BAD_SESSION))); - - Result result = resultUnwrap().join(); - - Assert.assertEquals(Status.of(StatusCode.BAD_SESSION), result.getStatus()); - } - - @Test - public void completeSuccessPollingOperation() { - mockExplainDataQueryMethodTransport(createResult(false, - StatusCodesProtos.StatusIds.StatusCode.SUCCESS)); - - CompletableFuture> resultCompletableFuture = resultUnwrap(); - - Assert.assertFalse(resultCompletableFuture.isDone()); - - mockGetOperationMethodTransport(StatusCodesProtos.StatusIds.StatusCode.SUCCESS); - scheduledExecutorServiceTest.execCommand(); - - checkSuccessOperation(resultCompletableFuture); - } - - @Test - public void completeFailPollingOperation() { - mockExplainDataQueryMethodTransport(createResult(false, - StatusCodesProtos.StatusIds.StatusCode.SUCCESS)); - - CompletableFuture> resultCompletableFuture = resultUnwrap(); - - Assert.assertFalse(resultCompletableFuture.isDone()); - - mockGetOperationMethodTransport(StatusCodesProtos.StatusIds.StatusCode.BAD_REQUEST); - scheduledExecutorServiceTest.execCommand(); - - Assert.assertEquals(Status.of(StatusCode.BAD_REQUEST), resultCompletableFuture.join().getStatus()); - } - - @Test - public void cancelPollingOperation() { - mockExplainDataQueryMethodTransport(createResult(false, StatusCodesProtos.StatusIds.StatusCode.SUCCESS)); - - Operation resultCompletableFuture = operationUnwrap(); - - mockCancelOperationMethodTransport( - Result.success( - OperationProtos.CancelOperationResponse - .getDefaultInstance() - ) - ); - resultCompletableFuture.cancel(); - - mockGetOperationMethodTransport(StatusCodesProtos.StatusIds.StatusCode.SUCCESS); - scheduledExecutorServiceTest.execCommand(); - - Result result = resultCompletableFuture.getResultFuture().join(); - - Assert.assertEquals(Status.of(StatusCode.CANCELLED), result.getStatus()); - } - - @Test - public void failCancelThenSuccessPollingOperation() { - mockExplainDataQueryMethodTransport(createResult(false, StatusCodesProtos.StatusIds.StatusCode.SUCCESS)); - - Operation resultCompletableFuture = operationUnwrap(); - - mockCancelOperationMethodTransport(Result.fail(Status.of(StatusCode.BAD_SESSION))); - resultCompletableFuture.cancel(); - - mockGetOperationMethodTransport(StatusCodesProtos.StatusIds.StatusCode.SUCCESS); - scheduledExecutorServiceTest.execCommand(); - - checkSuccessOperation(resultCompletableFuture.getResultFuture()); - } - - private Operation operationUnwrap() { - return transport - .unaryCall( - TableServiceGrpc.getExplainDataQueryMethod(), - GrpcRequestSettings.newBuilder().build(), - YdbTable.ExplainDataQueryRequest.getDefaultInstance() - ) - .thenApply(operationManager - .operationUnwrapper( - YdbTable.ExplainDataQueryResponse::getOperation, - YdbTable.ExplainQueryResult.class - ) - ).thenApply( - t -> t.transform(explainQueryResult -> new TestTransform(explainQueryResult.getQueryPlan())) - ).join(); - } - - private static class TestTransform { - - final String plan; - - public TestTransform(String plan) { - this.plan = plan; - } - } - - private static void checkSuccessOperation( - CompletableFuture> resultCompletableFuture - ) { - Result result = resultCompletableFuture.join(); - - Assert.assertTrue(result.isSuccess()); - Assert.assertEquals("Hello", result.getValue().plan); - } - - private void mockCancelOperationMethodTransport(Result responseResult) { - when(transport - .unaryCall( - eq(OperationServiceGrpc.getCancelOperationMethod()), - any(GrpcRequestSettings.class), - eq( - OperationProtos.CancelOperationRequest - .newBuilder() - .setId(OPERATION_ID) - .build() - ) - ) - ).thenReturn( - CompletableFuture - .completedFuture(responseResult) - ); - } - - private void mockGetOperationMethodTransport(StatusCodesProtos.StatusIds.StatusCode statusCode) { - when(transport - .unaryCall( - eq(OperationServiceGrpc.getGetOperationMethod()), - any(GrpcRequestSettings.class), - eq( - OperationProtos.GetOperationRequest - .newBuilder() - .setId(OPERATION_ID) - .build() - ) - ) - ).thenReturn(CompletableFuture - .completedFuture( - Result.success( - OperationProtos.GetOperationResponse.newBuilder() - .setOperation(createOperation(true, statusCode)) - .build() - ) - ) - ); - } - - private CompletableFuture> resultUnwrap() { - return transport - .unaryCall( - TableServiceGrpc.getExplainDataQueryMethod(), - GrpcRequestSettings.newBuilder().build(), - YdbTable.ExplainDataQueryRequest.getDefaultInstance() - ) - .thenApply( - operationManager - .operationUnwrapper( - YdbTable.ExplainDataQueryResponse::getOperation, - YdbTable.ExplainQueryResult.class - ) - ) - .thenApply(t -> t.transform(explainQueryResult -> new TestTransform(explainQueryResult.getQueryPlan()))) - .thenCompose(Operation::getResultFuture); - } - - private static Result createResult( - boolean ready, - StatusCodesProtos.StatusIds.StatusCode statusCode - ) { - return Result.success(YdbTable.ExplainDataQueryResponse - .newBuilder() - .setOperation( - createOperation( - ready, - statusCode - ) - ) - .build() - ); - } - - private static OperationProtos.Operation createOperation( - boolean ready, - StatusCodesProtos.StatusIds.StatusCode statusCode - ) { - return OperationProtos.Operation - .newBuilder() - .setReady(ready) - .setId(OPERATION_ID) - .setStatus(statusCode) - .setResult( - Any.pack(YdbTable.ExplainQueryResult.newBuilder() - .setQueryPlan("Hello") - .build()) - ) - .build(); - } - - private void mockExplainDataQueryMethodTransport(Result responseResult) { - when(transport - .unaryCall( - eq(TableServiceGrpc.getExplainDataQueryMethod()), - any(GrpcRequestSettings.class), - any(YdbTable.ExplainDataQueryRequest.class) - ) - ).thenReturn(CompletableFuture - .completedFuture(responseResult) - ); - } - - private static class ScheduledExecutorServiceTest extends ScheduledThreadPoolExecutor { - private Runnable command; - - public ScheduledExecutorServiceTest() { - super(0); - } - - public ScheduledExecutorServiceTest(int corePoolSize) { - super(corePoolSize); - } - - public ScheduledExecutorServiceTest(int corePoolSize, ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); - } - - public ScheduledExecutorServiceTest(int corePoolSize, RejectedExecutionHandler handler) { - super(corePoolSize, handler); - } - - public ScheduledExecutorServiceTest(int corePoolSize, ThreadFactory threadFactory, - RejectedExecutionHandler handler) { - super(corePoolSize, threadFactory, handler); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - this.command = command; - - // unused scheduled future - return new CronScheduledFuture<>(null, null); - } - - public void execCommand() { - command.run(); - } - } -} diff --git a/core/src/test/java/tech/ydb/core/operation/OperationSettingsTest.java b/core/src/test/java/tech/ydb/core/operation/OperationSettingsTest.java new file mode 100644 index 000000000..003580447 --- /dev/null +++ b/core/src/test/java/tech/ydb/core/operation/OperationSettingsTest.java @@ -0,0 +1,83 @@ +package tech.ydb.core.operation; + + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.core.settings.OperationSettings; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.common.CommonProtos; + +/** + * + * @author Aleksandr Gorshenin + */ +public class OperationSettingsTest { + private final static com.google.protobuf.Duration EMPTY = com.google.protobuf.Duration.newBuilder().build(); + + private com.google.protobuf.Duration duration(int seconds) { + return com.google.protobuf.Duration.newBuilder().setSeconds(seconds).build(); + } + + @Test + public void defaultSettingsTest() { + OperationSettings settings = new OperationSettings.OperationBuilder<>().build(); + OperationProtos.OperationParams proto = Operation.buildParams(settings); + + Assert.assertEquals(EMPTY, proto.getOperationTimeout()); + Assert.assertEquals(EMPTY, proto.getCancelAfter()); + Assert.assertEquals(CommonProtos.FeatureFlag.Status.STATUS_UNSPECIFIED, proto.getReportCostInfo()); + + Assert.assertEquals(OperationProtos.OperationParams.OperationMode.SYNC, proto.getOperationMode()); + } + + @Test + public void settingsTimeoutsTest() { + OperationSettings settings = new OperationSettings.OperationBuilder<>() + .withCancelTimeout(Duration.ofMinutes(1)) + .withOperationTimeout(10, TimeUnit.SECONDS) + .build(); + OperationProtos.OperationParams proto = Operation.buildParams(settings); + + Assert.assertEquals(duration(10), proto.getOperationTimeout()); + Assert.assertEquals(duration(60), proto.getCancelAfter()); + Assert.assertEquals(CommonProtos.FeatureFlag.Status.STATUS_UNSPECIFIED, proto.getReportCostInfo()); + + Assert.assertEquals(OperationProtos.OperationParams.OperationMode.SYNC, proto.getOperationMode()); + } + + @Test + public void costEnabledTest() { + OperationSettings settings = new OperationSettings.OperationBuilder<>() + .withAsyncMode(false) + .withReportCostInfo(Boolean.TRUE) + .build(); + OperationProtos.OperationParams proto = Operation.buildParams(settings); + + Assert.assertEquals(EMPTY, proto.getOperationTimeout()); + Assert.assertEquals(EMPTY, proto.getCancelAfter()); + Assert.assertEquals(CommonProtos.FeatureFlag.Status.ENABLED, proto.getReportCostInfo()); + + Assert.assertEquals(OperationProtos.OperationParams.OperationMode.SYNC, proto.getOperationMode()); + } + + @Test + public void allSettingsTest() { + OperationSettings settings = new OperationSettings.OperationBuilder<>() + .withAsyncMode(true) + .withCancelTimeout(3, TimeUnit.MINUTES) + .withOperationTimeout(Duration.ofSeconds(23)) + .withReportCostInfo(Boolean.FALSE) + .build(); + OperationProtos.OperationParams proto = Operation.buildParams(settings); + + Assert.assertEquals(duration(23), proto.getOperationTimeout()); + Assert.assertEquals(duration(180), proto.getCancelAfter()); + Assert.assertEquals(CommonProtos.FeatureFlag.Status.DISABLED, proto.getReportCostInfo()); + + Assert.assertEquals(OperationProtos.OperationParams.OperationMode.ASYNC, proto.getOperationMode()); + } +} diff --git a/core/src/test/java/tech/ydb/core/operation/OperationTrayTest.java b/core/src/test/java/tech/ydb/core/operation/OperationTrayTest.java new file mode 100644 index 000000000..3cfbc73ce --- /dev/null +++ b/core/src/test/java/tech/ydb/core/operation/OperationTrayTest.java @@ -0,0 +1,207 @@ +package tech.ydb.core.operation; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; + +import com.google.protobuf.Any; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.OperationProtos.GetOperationResponse; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.operation.v1.OperationServiceGrpc; +import tech.ydb.proto.table.YdbTable.ExplainDataQueryResponse; +import tech.ydb.proto.table.YdbTable.ExplainQueryResult; + + +/** + * + * @author Aleksandr Gorshenin + */ +public class OperationTrayTest { + private final GrpcTransport transport = Mockito.mock(GrpcTransport.class); + private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class); + + private final Function, Operation>> binder = + OperationBinder.bindAsync(transport, ExplainDataQueryResponse::getOperation, ExplainQueryResult.class); + + private final Function, Result> mapper = + r -> r.map(ExplainQueryResult::getQueryPlan); + + @Before + public void prepare() { + Mockito.when(transport.getScheduler()).thenReturn(scheduler); +// Mockito.when(scheduler.schedule(Mockito.any(Runnable.class), Mockito.anyInt(), Mockito.any())).thenAnswer( +// (InvocationOnMock iom) -> { +// scheduledActions.offer(iom.getArgument(0, Runnable.class)); +// return null; +// }); + } + + @Test + public void errorOperationTest() { + Status error = Status.of(StatusCode.UNAVAILABLE); + Operation> operation = binder.apply(Result.fail(error)).transform(mapper); + + Assert.assertTrue(operation.isReady()); + Assert.assertNull(operation.getId()); + + CompletableFuture> future = OperationTray.fetchOperation(operation, 100); + + Assert.assertTrue(future.isDone()); + Assert.assertEquals(Status.of(StatusCode.UNAVAILABLE), future.join().getStatus()); + + assertMockCalls(0, 0); + } + + @Test + public void failedOperationTest() { + ExplainDataQueryResponse response = ExplainDataQueryResponse.newBuilder() + .setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.NOT_FOUND) + .setId("error-id") + .setReady(true) + .build()) + .build(); + + Operation> operation = binder.apply(Result.success(response)).transform(mapper); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("error-id", operation.getId()); + + CompletableFuture> future = OperationTray.fetchOperation(operation, 100); + + Assert.assertTrue(future.isDone()); + Assert.assertEquals(Status.of(StatusCode.NOT_FOUND), future.join().getStatus()); + + assertMockCalls(0, 0); + } + + @Test + public void readyOperationTest() { + Any data = Any.pack(ExplainQueryResult.newBuilder().setQueryPlan("plan").build()); + ExplainDataQueryResponse response = ExplainDataQueryResponse.newBuilder() + .setOperation(OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId("ready-id") + .setReady(true) + .setResult(data) + .build() + ).build(); + + Operation> operation = binder.apply(Result.success(response)).transform(mapper); + + Assert.assertTrue(operation.isReady()); + Assert.assertEquals("ready-id", operation.getId()); + + CompletableFuture> future = OperationTray.fetchOperation(operation, 5); + + Assert.assertTrue(future.isDone()); + Assert.assertEquals(Status.SUCCESS, future.join().getStatus()); + Assert.assertEquals("plan", future.join().getValue()); + + assertMockCalls(0, 0); + } + + @Test + public void notReadyOperationTest() { + String id = "op1"; + + final Queue scheduled = new LinkedList<>(); + CompletableFuture> getOperation1 = new CompletableFuture<>(); + CompletableFuture> getOperation2 = new CompletableFuture<>(); + CompletableFuture> getOperation3 = new CompletableFuture<>(); + + Mockito.when(transport.unaryCall( + Mockito.eq(OperationServiceGrpc.getGetOperationMethod()), Mockito.any(), Mockito.any() + )).thenReturn(getOperation1).thenReturn(getOperation2).thenReturn(getOperation3); + + Mockito.when(scheduler.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any())) + .thenAnswer((InvocationOnMock iom) -> { + scheduled.offer(iom.getArgument(0, Runnable.class)); + return null; + }); + + ExplainDataQueryResponse notReady = ExplainDataQueryResponse.newBuilder() + .setOperation(OperationProtos.Operation.newBuilder() + .setId(id) + .setReady(false) + .build() + ).build(); + Operation> operation = binder.apply(Result.success(notReady)).transform(mapper); + + Assert.assertFalse(operation.isReady()); + Assert.assertEquals(id, operation.getId()); + assertMockCalls(0, 0); + + CompletableFuture> future = OperationTray.fetchOperation(operation, 5); + Assert.assertFalse(future.isDone()); + + assertMockCalls(1, 0); + completeNotReady(getOperation1, id); + + assertMockCalls(1, 1); + Assert.assertFalse(scheduled.isEmpty()); + scheduled.poll().run(); + Assert.assertTrue(scheduled.isEmpty()); + + assertMockCalls(2, 1); + completeNotReady(getOperation2, id); + + assertMockCalls(2, 2); + Assert.assertFalse(scheduled.isEmpty()); + scheduled.poll().run(); + Assert.assertTrue(scheduled.isEmpty()); + + assertMockCalls(3, 2); + completeReady(getOperation3, id, "hello_plan"); + assertMockCalls(3, 2); + Assert.assertTrue(scheduled.isEmpty()); + + Assert.assertTrue(future.isDone()); + Assert.assertEquals(Status.SUCCESS, future.join().getStatus()); + Assert.assertEquals("hello_plan", future.join().getValue()); + } + + private void assertMockCalls(int getOperationCount, int scheduleCount) { + Mockito.verify(transport, Mockito.times(getOperationCount)) + .unaryCall(Mockito.eq(OperationServiceGrpc.getGetOperationMethod()), Mockito.any(), Mockito.any()); + Mockito.verify(scheduler, Mockito.times(scheduleCount)) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any()); + } + + private void completeNotReady(CompletableFuture> future, String id) { + GetOperationResponse result = GetOperationResponse.newBuilder().setOperation( + OperationProtos.Operation.newBuilder() + .setId(id) + .setReady(false) + .build() + ).build(); + future.complete(Result.success(result)); + } + + private void completeReady(CompletableFuture> future, String id, String plan) { + Any data = Any.pack(ExplainQueryResult.newBuilder().setQueryPlan(plan).build()); + GetOperationResponse result = GetOperationResponse.newBuilder().setOperation( + OperationProtos.Operation.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setId(id) + .setReady(true) + .setResult(data) + .build() + ).build(); + future.complete(Result.success(result)); + } +} diff --git a/export/src/main/java/tech/ydb/export/ExportClient.java b/export/src/main/java/tech/ydb/export/ExportClient.java index c298cf3c0..1a3b1cb49 100644 --- a/export/src/main/java/tech/ydb/export/ExportClient.java +++ b/export/src/main/java/tech/ydb/export/ExportClient.java @@ -4,8 +4,10 @@ import javax.annotation.WillNotClose; +import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; import tech.ydb.export.impl.ExportClientImpl; import tech.ydb.export.impl.GrpcExportRpcImpl; import tech.ydb.export.result.ExportToS3Result; @@ -22,17 +24,26 @@ static ExportClient newClient(@WillNotClose GrpcTransport transport) { return new ExportClientImpl(GrpcExportRpcImpl.useTransport(transport)); } - CompletableFuture> exportS3( - String endpoint, - String bucket, - String accessKey, - String secretKey, - ExportToS3Settings exportToS3Settings + CompletableFuture>> startExportToS3( + String endpoint, String bucket, String accessKey, String secretKey, ExportToS3Settings settings ); - CompletableFuture> exportYt( - String host, - String token, - ExportToYtSettings exportToYtSettings + CompletableFuture>> startExportToYt( + String host, String token, ExportToYtSettings settings ); + + default CompletableFuture> exportToS3( + String endpoint, String bucket, String accessKey, String secretKey, ExportToS3Settings settings, + int updateRateSeconds + ) { + return startExportToS3(endpoint, bucket, accessKey, secretKey, settings) + .thenCompose(operation -> OperationTray.fetchOperation(operation, updateRateSeconds)); + } + + default CompletableFuture> startExportToYt( + String host, String token, ExportToYtSettings settings, int updateRateSeconds + ) { + return startExportToYt(host, token, settings) + .thenCompose(operation -> OperationTray.fetchOperation(operation, updateRateSeconds)); + } } diff --git a/export/src/main/java/tech/ydb/export/ExportRpc.java b/export/src/main/java/tech/ydb/export/ExportRpc.java index 6d7c815e1..cd322ec11 100644 --- a/export/src/main/java/tech/ydb/export/ExportRpc.java +++ b/export/src/main/java/tech/ydb/export/ExportRpc.java @@ -2,6 +2,7 @@ import java.util.concurrent.CompletableFuture; +import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; import tech.ydb.proto.export.YdbExport; @@ -11,13 +12,11 @@ */ public interface ExportRpc { - CompletableFuture> exportS3( - YdbExport.ExportToS3Request exportToS3Request, - GrpcRequestSettings grpcRequestSettings + CompletableFuture>> exportS3( + YdbExport.ExportToS3Request request, GrpcRequestSettings settings ); - CompletableFuture> exportYt( - YdbExport.ExportToYtRequest exportToYtRequest, - GrpcRequestSettings grpcRequestSettings + CompletableFuture>> exportYt( + YdbExport.ExportToYtRequest request, GrpcRequestSettings settings ); } diff --git a/export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java b/export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java index 8c942b31d..be9c69c69 100644 --- a/export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java +++ b/export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java @@ -2,8 +2,10 @@ import java.util.concurrent.CompletableFuture; +import tech.ydb.core.Result; +import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; -import tech.ydb.core.operation.OperationUtils; +import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.export.ExportClient; import tech.ydb.export.ExportRpc; import tech.ydb.export.result.ExportToS3Result; @@ -22,102 +24,103 @@ public class ExportClientImpl implements ExportClient { public ExportClientImpl(ExportRpc exportRpc) { this.exportRpc = exportRpc; } + private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + return GrpcRequestSettings.newBuilder() + .withDeadline(settings.getRequestTimeout()) + .build(); + } - public CompletableFuture> exportS3( - String endpoint, - String bucket, - String accessKey, - String secretKey, - ExportToS3Settings exportToS3Settings + @Override + public CompletableFuture>> startExportToS3( + String endpoint, String bucket, String accessKey, String secretKey, ExportToS3Settings settings ) { - YdbExport.ExportToS3Settings.Builder exportToYtSettingsBuilder = YdbExport.ExportToS3Settings.newBuilder() + YdbExport.ExportToS3Settings.Builder builder = YdbExport.ExportToS3Settings.newBuilder() .setEndpoint(endpoint) .setBucket(bucket) .setAccessKey(accessKey) .setSecretKey(secretKey); - if (exportToS3Settings.getSchema() != null) { - exportToYtSettingsBuilder.setScheme(exportToS3Settings.getSchema().toProto()); + if (settings.getSchema() != null) { + builder.setScheme(settings.getSchema().toProto()); + } + + if (settings.getNumberOfRetries() != null) { + builder.setNumberOfRetries(settings.getNumberOfRetries()); } - if (exportToS3Settings.getNumberOfRetries() != null) { - exportToYtSettingsBuilder.setNumberOfRetries(exportToS3Settings.getNumberOfRetries()); + if (settings.getStorageClass() != null) { + builder.setStorageClass(settings.getStorageClass().toProto()); } - if (exportToS3Settings.getStorageClass() != null) { - exportToYtSettingsBuilder.setStorageClass(exportToS3Settings.getStorageClass().toProto()); + if (settings.getCompression() != null) { + builder.setCompression(settings.getCompression()); } - if (exportToS3Settings.getCompression() != null) { - exportToYtSettingsBuilder.setCompression(exportToS3Settings.getCompression()); + if (settings.getRegion() != null) { + builder.setRegion(settings.getRegion()); } - if (exportToS3Settings.getRegion() != null) { - exportToYtSettingsBuilder.setRegion(exportToS3Settings.getRegion()); + if (settings.getDescription() != null) { + builder.setDescription(settings.getDescription()); } - if (exportToS3Settings.getDescription() != null) { - exportToYtSettingsBuilder.setDescription(exportToS3Settings.getDescription()); + for (ExportToS3Settings.Item item : settings.getItemList()) { + builder.addItems( + YdbExport.ExportToS3Settings.Item.newBuilder() + .setSourcePath(item.getSourcePath()) + .setDestinationPrefix(item.getDestinationPrefix()) + .build() + ); } - exportToS3Settings.getItemList().forEach(item -> exportToYtSettingsBuilder.addItems( - YdbExport.ExportToS3Settings.Item.newBuilder() - .setSourcePath(item.getSourcePath()) - .setDestinationPrefix(item.getDestinationPrefix()) - .build() - ) - ); - - return exportRpc.exportS3( - YdbExport.ExportToS3Request.newBuilder() - .setSettings(exportToYtSettingsBuilder.build()) - .setOperationParams( - OperationUtils.createParams(exportToS3Settings) - ) - .build(), - OperationUtils.createGrpcRequestSettings(exportToS3Settings) - ).thenApply(op -> op.transform(ExportToS3Result::new)); + YdbExport.ExportToS3Request request = YdbExport.ExportToS3Request.newBuilder() + .setSettings(builder.build()) + .setOperationParams(Operation.buildParams(settings)) + .build(); + + return exportRpc.exportS3(request, makeGrpcRequestSettings(settings)) + .thenApply(op -> op.transform(r -> r.map(ExportToS3Result::new))); } - public CompletableFuture> exportYt( - String host, - String token, - ExportToYtSettings exportToYtSettings + @Override + public CompletableFuture>> startExportToYt( + String host, String token, ExportToYtSettings settings ) { - YdbExport.ExportToYtSettings.Builder exportToYtSettingBuilder = YdbExport.ExportToYtSettings.newBuilder() + YdbExport.ExportToYtSettings.Builder builder = YdbExport.ExportToYtSettings.newBuilder() .setHost(host) .setToken(token); - if (exportToYtSettings.getPort() != null) { - exportToYtSettingBuilder.setPort(exportToYtSettings.getPort()); + if (settings.getPort() != null) { + builder.setPort(settings.getPort()); } - if (exportToYtSettings.getNumberOfRetries() != null) { - exportToYtSettingBuilder.setNumberOfRetries(exportToYtSettings.getNumberOfRetries()); + if (settings.getNumberOfRetries() != null) { + builder.setNumberOfRetries(settings.getNumberOfRetries()); } - if (exportToYtSettings.getUseTypeV3() != null) { - exportToYtSettingBuilder.setUseTypeV3(exportToYtSettings.getUseTypeV3()); + if (settings.getUseTypeV3() != null) { + builder.setUseTypeV3(settings.getUseTypeV3()); } - if (exportToYtSettings.getDescription() != null) { - exportToYtSettingBuilder.setDescription(exportToYtSettings.getDescription()); + if (settings.getDescription() != null) { + builder.setDescription(settings.getDescription()); } - exportToYtSettings.getItemList().forEach(item -> exportToYtSettingBuilder.addItems( - YdbExport.ExportToYtSettings.Item.newBuilder() - .setSourcePath(item.getSourcePath()) - .setDestinationPath(item.getDestinationPath()) - .build() - ) - ); - - return exportRpc.exportYt( - YdbExport.ExportToYtRequest.newBuilder() - .setSettings(exportToYtSettingBuilder.build()) - .setOperationParams(OperationUtils.createParams(exportToYtSettings)) - .build(), - OperationUtils.createGrpcRequestSettings(exportToYtSettings) - ).thenApply(op -> op.transform(ExportToYtResult::new)); + for (ExportToYtSettings.Item item : settings.getItemList()) { + builder.addItems( + YdbExport.ExportToYtSettings.Item.newBuilder() + .setSourcePath(item.getSourcePath()) + .setDestinationPath(item.getDestinationPath()) + .build() + ); + } + + YdbExport.ExportToYtRequest request = YdbExport.ExportToYtRequest.newBuilder() + .setSettings(builder.build()) + .setOperationParams(Operation.buildParams(settings)) + .build(); + + return exportRpc.exportYt(request, makeGrpcRequestSettings(settings)) + .thenApply(op -> op.transform(r -> r.map(ExportToYtResult::new))); } } diff --git a/export/src/main/java/tech/ydb/export/impl/GrpcExportRpcImpl.java b/export/src/main/java/tech/ydb/export/impl/GrpcExportRpcImpl.java index 3f5787648..102b42e5d 100644 --- a/export/src/main/java/tech/ydb/export/impl/GrpcExportRpcImpl.java +++ b/export/src/main/java/tech/ydb/export/impl/GrpcExportRpcImpl.java @@ -4,10 +4,11 @@ import javax.annotation.WillNotClose; +import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.operation.Operation; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.export.ExportRpc; import tech.ydb.proto.export.YdbExport; import tech.ydb.proto.export.v1.ExportServiceGrpc; @@ -16,13 +17,10 @@ * @author Kirill Kurdyukov */ public class GrpcExportRpcImpl implements ExportRpc { - - private final GrpcTransport grpcTransport; - private final OperationManager operationManager; + private final GrpcTransport transport; private GrpcExportRpcImpl(GrpcTransport grpcTransport) { - this.grpcTransport = grpcTransport; - this.operationManager = new OperationManager(grpcTransport); + this.transport = grpcTransport; } public static GrpcExportRpcImpl useTransport(@WillNotClose GrpcTransport grpcTransport) { @@ -30,35 +28,24 @@ public static GrpcExportRpcImpl useTransport(@WillNotClose GrpcTransport grpcTra } @Override - public CompletableFuture> exportS3( - YdbExport.ExportToS3Request exportToS3Request, - GrpcRequestSettings grpcRequestSettings + public CompletableFuture>> exportS3( + YdbExport.ExportToS3Request request, + GrpcRequestSettings settings ) { - return grpcTransport.unaryCall( - ExportServiceGrpc.getExportToS3Method(), - grpcRequestSettings, - exportToS3Request - ).thenApply( - operationManager.operationUnwrapper( - YdbExport.ExportToS3Response::getOperation, - YdbExport.ExportToS3Result.class - ) - ); + return transport.unaryCall(ExportServiceGrpc.getExportToS3Method(), settings, request) + .thenApply(OperationBinder.bindAsync( + transport, YdbExport.ExportToS3Response::getOperation, YdbExport.ExportToS3Result.class) + ); } @Override - public CompletableFuture> exportYt( - YdbExport.ExportToYtRequest exportToYtRequest, - GrpcRequestSettings grpcRequestSettings + public CompletableFuture>> exportYt( + YdbExport.ExportToYtRequest request, + GrpcRequestSettings settings ) { - return grpcTransport.unaryCall( - ExportServiceGrpc.getExportToYtMethod(), - grpcRequestSettings, - exportToYtRequest - ).thenApply( - operationManager.operationUnwrapper( - YdbExport.ExportToYtResponse::getOperation, - YdbExport.ExportToYtResult.class + return transport.unaryCall(ExportServiceGrpc.getExportToYtMethod(), settings, request) + .thenApply(OperationBinder.bindAsync( + transport, YdbExport.ExportToYtResponse::getOperation, YdbExport.ExportToYtResult.class ) ); } diff --git a/export/src/main/java/tech/ydb/export/settings/ExportToS3Settings.java b/export/src/main/java/tech/ydb/export/settings/ExportToS3Settings.java index be24f0c28..8f4634dd5 100644 --- a/export/src/main/java/tech/ydb/export/settings/ExportToS3Settings.java +++ b/export/src/main/java/tech/ydb/export/settings/ExportToS3Settings.java @@ -5,13 +5,13 @@ import com.google.common.base.Preconditions; -import tech.ydb.core.settings.LongOperationSettings; +import tech.ydb.core.settings.OperationSettings; import tech.ydb.proto.export.YdbExport; /** * @author Kirill Kurdyukov */ -public class ExportToS3Settings extends LongOperationSettings { +public class ExportToS3Settings extends OperationSettings { private final Schema schema; private final Integer numberOfRetries; @@ -63,10 +63,10 @@ public List getItemList() { } public static Builder newBuilder() { - return new Builder(); + return new Builder().withAsyncMode(true); } - public static class Builder extends LongOperationBuilder { + public static class Builder extends OperationSettings.OperationBuilder { private Schema schema = null; private Integer numberOfRetries = null; private StorageClass storageClass = null; diff --git a/export/src/main/java/tech/ydb/export/settings/ExportToYtSettings.java b/export/src/main/java/tech/ydb/export/settings/ExportToYtSettings.java index 2e2f85c5c..f6cf7cfbb 100644 --- a/export/src/main/java/tech/ydb/export/settings/ExportToYtSettings.java +++ b/export/src/main/java/tech/ydb/export/settings/ExportToYtSettings.java @@ -5,12 +5,12 @@ import com.google.common.base.Preconditions; -import tech.ydb.core.settings.LongOperationSettings; +import tech.ydb.core.settings.OperationSettings; /** * @author Kirill Kurdyukov */ -public class ExportToYtSettings extends LongOperationSettings { +public class ExportToYtSettings extends OperationSettings { private final Integer port; private final List itemList; @@ -50,10 +50,10 @@ public Boolean getUseTypeV3() { } public static Builder newBuilder() { - return new Builder(); + return new Builder().withAsyncMode(true); } - public static class Builder extends LongOperationBuilder { + public static class Builder extends OperationSettings.OperationBuilder { private Integer port = null; private final List itemList = new ArrayList<>(); private String description = null; diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index ec72ea5c4..25f0dfb0e 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,7 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.StatusExtractor; import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.query.v1.QueryServiceGrpc; @@ -15,32 +16,32 @@ * @author Aleksandr Gorshenin */ class QueryServiceRpc { - private static final StatusExtract CREATE_SESSION = StatusExtract.of( + private static final StatusExtractor CREATE_SESSION = StatusExtractor.of( YdbQuery.CreateSessionResponse::getStatus, YdbQuery.CreateSessionResponse::getIssuesList ); - private static final StatusExtract DELETE_SESSION = StatusExtract.of( + private static final StatusExtractor DELETE_SESSION = StatusExtractor.of( YdbQuery.DeleteSessionResponse::getStatus, YdbQuery.DeleteSessionResponse::getIssuesList ); - private static final StatusExtract BEGIN_TRANSACTION = StatusExtract.of( + private static final StatusExtractor BEGIN_TX = StatusExtractor.of( YdbQuery.BeginTransactionResponse::getStatus, YdbQuery.BeginTransactionResponse::getIssuesList ); - private static final StatusExtract COMMIT_TRANSACTION = StatusExtract.of( + private static final StatusExtractor COMMIT_TX = StatusExtractor.of( YdbQuery.CommitTransactionResponse::getStatus, YdbQuery.CommitTransactionResponse::getIssuesList ); - private static final StatusExtract ROLLBACK_TRANSACTION = StatusExtract.of( + private static final StatusExtractor ROLLBACK_TX = StatusExtractor.of( YdbQuery.RollbackTransactionResponse::getStatus, YdbQuery.RollbackTransactionResponse::getIssuesList ); - private static final StatusExtract FETCH_SCRIPT = StatusExtract.of( + private static final StatusExtractor FETCH_SCRIPT = StatusExtractor.of( YdbQuery.FetchScriptResultsResponse::getStatus, YdbQuery.FetchScriptResultsResponse::getIssuesList ); @@ -74,21 +75,21 @@ public CompletableFuture> beginTransac YdbQuery.BeginTransactionRequest request, GrpcRequestSettings settings) { return transport .unaryCall(QueryServiceGrpc.getBeginTransactionMethod(), settings, request) - .thenApply(BEGIN_TRANSACTION); + .thenApply(BEGIN_TX); } public CompletableFuture> commitTransaction( YdbQuery.CommitTransactionRequest request, GrpcRequestSettings settings) { return transport .unaryCall(QueryServiceGrpc.getCommitTransactionMethod(), settings, request) - .thenApply(COMMIT_TRANSACTION); + .thenApply(COMMIT_TX); } public CompletableFuture> rollbackTransaction( YdbQuery.RollbackTransactionRequest request, GrpcRequestSettings settings) { return transport .unaryCall(QueryServiceGrpc.getRollbackTransactionMethod(), settings, request) - .thenApply(ROLLBACK_TRANSACTION); + .thenApply(ROLLBACK_TX); } public GrpcReadStream executeQuery( diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 05a1d01b6..508a550af 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -22,6 +22,7 @@ import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.YdbHeaders; import tech.ydb.core.impl.call.ProxyReadStream; +import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; import tech.ydb.proto.query.YdbQuery; @@ -49,11 +50,11 @@ abstract class SessionImpl implements QuerySession { private static final Logger logger = LoggerFactory.getLogger(QuerySession.class); - private static final StatusExtract CREATE_SESSION = StatusExtract.of( + private static final StatusExtractor CREATE_SESSION = StatusExtractor.of( YdbQuery.CreateSessionResponse::getStatus, YdbQuery.CreateSessionResponse::getIssuesList ); - private static final StatusExtract DELETE_SESSION = StatusExtract.of( + private static final StatusExtractor DELETE_SESSION = StatusExtractor.of( YdbQuery.DeleteSessionResponse::getStatus, YdbQuery.DeleteSessionResponse::getIssuesList ); diff --git a/query/src/main/java/tech/ydb/query/impl/StatusExtract.java b/query/src/main/java/tech/ydb/query/impl/StatusExtract.java deleted file mode 100644 index 15570c7a8..000000000 --- a/query/src/main/java/tech/ydb/query/impl/StatusExtract.java +++ /dev/null @@ -1,50 +0,0 @@ -package tech.ydb.query.impl; - -import java.util.List; -import java.util.function.Function; - -import tech.ydb.core.Issue; -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.proto.YdbIssueMessage; - -/** - * - * @author Aleksandr Gorshenin - * @param result class - */ -public class StatusExtract implements Function, Result> { - private final Function statusFunctor; - private final Function> issuesFunctor; - - private StatusExtract( - Function statusFunctor, - Function> issuesFunctor) { - this.statusFunctor = statusFunctor; - this.issuesFunctor = issuesFunctor; - } - - @Override - public Result apply(Result result) { - if (!result.isSuccess()) { - return result; - } - - T resp = result.getValue(); - Status status = Status.of( - StatusCode.fromProto(statusFunctor.apply(resp)), - result.getStatus().getConsumedRu(), - Issue.fromPb(issuesFunctor.apply(resp)) - ); - - return status.isSuccess() ? Result.success(resp, status) : Result.fail(status); - } - - public static StatusExtract of( - Function statusFunctor, - Function> issuesFunctor) { - return new StatusExtract<>(statusFunctor, issuesFunctor); - } -} diff --git a/scheme/src/main/java/tech/ydb/scheme/impl/GrpcSchemeRpc.java b/scheme/src/main/java/tech/ydb/scheme/impl/GrpcSchemeRpc.java index 0e255c7b7..e586fe185 100644 --- a/scheme/src/main/java/tech/ydb/scheme/impl/GrpcSchemeRpc.java +++ b/scheme/src/main/java/tech/ydb/scheme/impl/GrpcSchemeRpc.java @@ -10,7 +10,7 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.proto.scheme.SchemeOperationProtos.DescribePathRequest; import tech.ydb.proto.scheme.SchemeOperationProtos.DescribePathResponse; import tech.ydb.proto.scheme.SchemeOperationProtos.DescribePathResult; @@ -50,7 +50,7 @@ public static GrpcSchemeRpc ownTransport(@WillClose GrpcTransport transport) { public CompletableFuture makeDirectory(MakeDirectoryRequest request, GrpcRequestSettings settings) { return transport .unaryCall(SchemeServiceGrpc.getMakeDirectoryMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(MakeDirectoryResponse::getOperation)); + .thenApply(OperationBinder.bindSync(MakeDirectoryResponse::getOperation)); } @@ -58,7 +58,7 @@ public CompletableFuture makeDirectory(MakeDirectoryRequest request, Grp public CompletableFuture removeDirectory(RemoveDirectoryRequest request, GrpcRequestSettings settings) { return transport .unaryCall(SchemeServiceGrpc.getRemoveDirectoryMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(RemoveDirectoryResponse::getOperation)); + .thenApply(OperationBinder.bindSync(RemoveDirectoryResponse::getOperation)); } @Override @@ -66,8 +66,7 @@ public CompletableFuture> describeDirectory(ListDire GrpcRequestSettings settings) { return transport .unaryCall(SchemeServiceGrpc.getListDirectoryMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - ListDirectoryResponse::getOperation, ListDirectoryResult.class)); + .thenApply(OperationBinder.bindSync(ListDirectoryResponse::getOperation, ListDirectoryResult.class)); } @Override @@ -75,8 +74,7 @@ public CompletableFuture> describePath(DescribePathRe GrpcRequestSettings settings) { return transport .unaryCall(SchemeServiceGrpc.getDescribePathMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - DescribePathResponse::getOperation, DescribePathResult.class)); + .thenApply(OperationBinder.bindSync(DescribePathResponse::getOperation, DescribePathResult.class)); } @Override diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 1c53ebd89..04d45891a 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -29,7 +29,7 @@ import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.YdbHeaders; import tech.ydb.core.impl.call.ProxyReadStream; -import tech.ydb.core.operation.OperationUtils; +import tech.ydb.core.operation.Operation; import tech.ydb.core.utils.URITools; import tech.ydb.proto.StatusCodesProtos.StatusIds; import tech.ydb.proto.ValueProtos; @@ -148,7 +148,7 @@ public static CompletableFuture> createSessionId(TableRpc tableRp CreateSessionSettings settings, boolean useServerBalancer) { YdbTable.CreateSessionRequest request = YdbTable.CreateSessionRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); Metadata headers = null; @@ -156,6 +156,7 @@ public static CompletableFuture> createSessionId(TableRpc tableRp headers = new Metadata(); headers.put(YdbHeaders.YDB_CLIENT_CAPABILITIES, SERVER_BALANCER_HINT); } + GrpcRequestSettings grpcSettings = GrpcRequestSettings.newBuilder() .withDeadline(settings.getTimeoutDuration()) .withExtraHeaders(headers) @@ -278,7 +279,7 @@ public CompletableFuture createTable( YdbTable.CreateTableRequest.Builder request = YdbTable.CreateTableRequest.newBuilder() .setSessionId(id) .setPath(path) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .addAllPrimaryKey(description.getPrimaryKeys()); for (ColumnFamily family: description.getColumnFamilies()) { @@ -394,7 +395,7 @@ public CompletableFuture dropTable(String path, DropTableSettings settin YdbTable.DropTableRequest request = YdbTable.DropTableRequest.newBuilder() .setSessionId(id) .setPath(path) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); @@ -406,7 +407,7 @@ public CompletableFuture alterTable(String path, AlterTableSettings sett YdbTable.AlterTableRequest.Builder builder = YdbTable.AlterTableRequest.newBuilder() .setSessionId(id) .setPath(path) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())); + .setOperationParams(Operation.buildParams(settings.toOperationSettings())); for (TableColumn addColumn: settings.getAddColumns()) { builder.addAddColumns(buildColumnMeta(addColumn)); @@ -450,7 +451,7 @@ public CompletableFuture copyTable(String src, String dst, CopyTableSett .setSessionId(id) .setSourcePath(src) .setDestinationPath(dst) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); @@ -522,7 +523,7 @@ public CompletableFuture> describeTable(String path, De YdbTable.DescribeTableRequest request = YdbTable.DescribeTableRequest.newBuilder() .setSessionId(id) .setPath(path) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setIncludeTableStats(settings.isIncludeTableStats()) .setIncludeShardKeyBounds(settings.isIncludeShardKeyBounds()) .setIncludePartitionStats(settings.isIncludePartitionStats()) @@ -669,7 +670,7 @@ private CompletableFuture> executeDataQueryInternal( String query, YdbTable.TransactionControl txControl, Params params, ExecuteDataQuerySettings settings) { YdbTable.ExecuteDataQueryRequest.Builder request = YdbTable.ExecuteDataQueryRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxControl(txControl) .setQuery(YdbTable.Query.newBuilder().setYqlText(query)) .setCollectStats(settings.collectStats().toPb()) @@ -735,7 +736,7 @@ CompletableFuture> executePreparedDataQuery(String query ExecuteDataQuerySettings settings) { YdbTable.ExecuteDataQueryRequest.Builder request = YdbTable.ExecuteDataQueryRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxControl(txControl.toPb()) .setCollectStats(settings.collectStats().toPb()); @@ -780,7 +781,7 @@ CompletableFuture> executePreparedDataQuery(String query public CompletableFuture> prepareDataQuery(String query, PrepareDataQuerySettings settings) { YdbTable.PrepareDataQueryRequest.Builder request = YdbTable.PrepareDataQueryRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setYqlText(query); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); @@ -796,7 +797,7 @@ public CompletableFuture> prepareDataQuery(String query, Prepa public CompletableFuture executeSchemeQuery(String query, ExecuteSchemeQuerySettings settings) { YdbTable.ExecuteSchemeQueryRequest request = YdbTable.ExecuteSchemeQueryRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setYqlText(query) .build(); @@ -809,7 +810,7 @@ public CompletableFuture> explainDataQuery(String ExplainDataQuerySettings settings) { YdbTable.ExplainDataQueryRequest request = YdbTable.ExplainDataQueryRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setYqlText(query) .build(); @@ -823,7 +824,7 @@ public CompletableFuture> beginTransaction(Transaction.Mode BeginTxSettings settings) { YdbTable.BeginTransactionRequest request = YdbTable.BeginTransactionRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxSettings(txSettings(transactionMode)) .build(); @@ -842,7 +843,7 @@ public TableTransaction createNewTransaction(TxMode txMode) { public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { YdbTable.BeginTransactionRequest request = YdbTable.BeginTransactionRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxSettings(TxControlToPb.txSettings(txMode)) .build(); @@ -946,7 +947,7 @@ public GrpcReadStream executeScanQuery(String query, Params par private CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) { YdbTable.CommitTransactionRequest request = YdbTable.CommitTransactionRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxId(txId) .build(); @@ -963,7 +964,7 @@ public CompletableFuture commitTransaction(String txId, CommitTxSettings private CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) { YdbTable.RollbackTransactionRequest request = YdbTable.RollbackTransactionRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .setTxId(txId) .build(); @@ -982,7 +983,7 @@ public CompletableFuture rollbackTransaction(String txId, RollbackTxSett public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) { YdbTable.KeepAliveRequest request = YdbTable.KeepAliveRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); @@ -1000,7 +1001,7 @@ public CompletableFuture executeBulkUpsert(String tablePath, ListValue r YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder() .setTable(tablePath) .setRows(typedRows) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); @@ -1025,7 +1026,7 @@ private static State mapSessionStatus(YdbTable.KeepAliveResult result) { public CompletableFuture delete(DeleteSessionSettings settings) { YdbTable.DeleteSessionRequest request = YdbTable.DeleteSessionRequest.newBuilder() .setSessionId(id) - .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); diff --git a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java index 03907d9af..76b0cc0f2 100644 --- a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java +++ b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java @@ -7,56 +7,14 @@ import javax.annotation.WillClose; import javax.annotation.WillNotClose; - import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.StatusExtractor; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; +import tech.ydb.core.operation.StatusExtractor; import tech.ydb.proto.table.YdbTable; -import tech.ydb.proto.table.YdbTable.AlterTableRequest; -import tech.ydb.proto.table.YdbTable.AlterTableResponse; -import tech.ydb.proto.table.YdbTable.BeginTransactionRequest; -import tech.ydb.proto.table.YdbTable.BeginTransactionResponse; -import tech.ydb.proto.table.YdbTable.BeginTransactionResult; -import tech.ydb.proto.table.YdbTable.BulkUpsertResponse; -import tech.ydb.proto.table.YdbTable.CommitTransactionRequest; -import tech.ydb.proto.table.YdbTable.CommitTransactionResponse; -import tech.ydb.proto.table.YdbTable.CopyTableRequest; -import tech.ydb.proto.table.YdbTable.CopyTableResponse; -import tech.ydb.proto.table.YdbTable.CopyTablesRequest; -import tech.ydb.proto.table.YdbTable.CopyTablesResponse; -import tech.ydb.proto.table.YdbTable.CreateTableRequest; -import tech.ydb.proto.table.YdbTable.CreateTableResponse; -import tech.ydb.proto.table.YdbTable.DeleteSessionRequest; -import tech.ydb.proto.table.YdbTable.DeleteSessionResponse; -import tech.ydb.proto.table.YdbTable.DescribeTableRequest; -import tech.ydb.proto.table.YdbTable.DescribeTableResponse; -import tech.ydb.proto.table.YdbTable.DescribeTableResult; -import tech.ydb.proto.table.YdbTable.DropTableRequest; -import tech.ydb.proto.table.YdbTable.DropTableResponse; -import tech.ydb.proto.table.YdbTable.ExecuteDataQueryRequest; -import tech.ydb.proto.table.YdbTable.ExecuteDataQueryResponse; -import tech.ydb.proto.table.YdbTable.ExecuteQueryResult; -import tech.ydb.proto.table.YdbTable.ExecuteSchemeQueryRequest; -import tech.ydb.proto.table.YdbTable.ExecuteSchemeQueryResponse; -import tech.ydb.proto.table.YdbTable.ExplainDataQueryRequest; -import tech.ydb.proto.table.YdbTable.ExplainDataQueryResponse; -import tech.ydb.proto.table.YdbTable.ExplainQueryResult; -import tech.ydb.proto.table.YdbTable.KeepAliveRequest; -import tech.ydb.proto.table.YdbTable.KeepAliveResponse; -import tech.ydb.proto.table.YdbTable.KeepAliveResult; -import tech.ydb.proto.table.YdbTable.PrepareDataQueryRequest; -import tech.ydb.proto.table.YdbTable.PrepareDataQueryResponse; -import tech.ydb.proto.table.YdbTable.PrepareQueryResult; -import tech.ydb.proto.table.YdbTable.ReadRowsRequest; -import tech.ydb.proto.table.YdbTable.ReadRowsResponse; -import tech.ydb.proto.table.YdbTable.RenameTablesRequest; -import tech.ydb.proto.table.YdbTable.RenameTablesResponse; -import tech.ydb.proto.table.YdbTable.RollbackTransactionRequest; -import tech.ydb.proto.table.YdbTable.RollbackTransactionResponse; import tech.ydb.proto.table.v1.TableServiceGrpc; import tech.ydb.table.rpc.TableRpc; @@ -68,8 +26,8 @@ public final class GrpcTableRpc implements TableRpc { private final GrpcTransport transport; private final boolean transportOwned; - private static final StatusExtractor READ_ROWS = new StatusExtractor<>( - ReadRowsResponse::getStatus, ReadRowsResponse::getIssuesList); + private static final StatusExtractor READ_ROWS = StatusExtractor.of( + YdbTable.ReadRowsResponse::getStatus, YdbTable.ReadRowsResponse::getIssuesList); private GrpcTableRpc(GrpcTransport transport, boolean transportOwned) { this.transport = transport; @@ -85,169 +43,179 @@ public static GrpcTableRpc ownTransport(@WillClose GrpcTransport transport) { } @Override - public CompletableFuture> createSession(YdbTable.CreateSessionRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> createSession( + YdbTable.CreateSessionRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getCreateSessionMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - YdbTable.CreateSessionResponse::getOperation, - YdbTable.CreateSessionResult.class)); + .thenApply(OperationBinder.bindSync( + YdbTable.CreateSessionResponse::getOperation, YdbTable.CreateSessionResult.class + )); } @Override - public CompletableFuture deleteSession(DeleteSessionRequest request, - GrpcRequestSettings settings) { + public CompletableFuture deleteSession( + YdbTable.DeleteSessionRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getDeleteSessionMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(DeleteSessionResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.DeleteSessionResponse::getOperation)); } @Override - public CompletableFuture> keepAlive(KeepAliveRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> keepAlive( + YdbTable.KeepAliveRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getKeepAliveMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - KeepAliveResponse::getOperation, KeepAliveResult.class)); + .thenApply(OperationBinder + .bindSync(YdbTable.KeepAliveResponse::getOperation, YdbTable.KeepAliveResult.class) + ); } @Override - public CompletableFuture createTable(CreateTableRequest request, - GrpcRequestSettings settings) { + public CompletableFuture createTable(YdbTable.CreateTableRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getCreateTableMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(CreateTableResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.CreateTableResponse::getOperation)); } @Override - public CompletableFuture dropTable(DropTableRequest request, GrpcRequestSettings settings) { + public CompletableFuture dropTable(YdbTable.DropTableRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getDropTableMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(DropTableResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.DropTableResponse::getOperation)); } @Override - public CompletableFuture alterTable(AlterTableRequest request, GrpcRequestSettings settings) { + public CompletableFuture alterTable(YdbTable.AlterTableRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getAlterTableMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(AlterTableResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.AlterTableResponse::getOperation)); } @Override - public CompletableFuture copyTable(CopyTableRequest request, - GrpcRequestSettings settings) { + public CompletableFuture copyTable(YdbTable.CopyTableRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getCopyTableMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(CopyTableResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.CopyTableResponse::getOperation)); } @Override - public CompletableFuture copyTables(CopyTablesRequest request, - GrpcRequestSettings settings) { + public CompletableFuture copyTables(YdbTable.CopyTablesRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getCopyTablesMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(CopyTablesResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.CopyTablesResponse::getOperation)); } @Override - public CompletableFuture renameTables(RenameTablesRequest request, - GrpcRequestSettings settings) { + public CompletableFuture renameTables(YdbTable.RenameTablesRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getRenameTablesMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(RenameTablesResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.RenameTablesResponse::getOperation)); } @Override - public CompletableFuture> describeTable(DescribeTableRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> describeTable( + YdbTable.DescribeTableRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getDescribeTableMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - DescribeTableResponse::getOperation, DescribeTableResult.class)); + .thenApply(OperationBinder.bindSync( + YdbTable.DescribeTableResponse::getOperation, YdbTable.DescribeTableResult.class + )); } @Override - public CompletableFuture> explainDataQuery(ExplainDataQueryRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> explainDataQuery( + YdbTable.ExplainDataQueryRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getExplainDataQueryMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - ExplainDataQueryResponse::getOperation, ExplainQueryResult.class) - ); + .thenApply(OperationBinder.bindSync( + YdbTable.ExplainDataQueryResponse::getOperation, YdbTable.ExplainQueryResult.class + )); } @Override - public CompletableFuture> prepareDataQuery( - PrepareDataQueryRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> prepareDataQuery( + YdbTable.PrepareDataQueryRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getPrepareDataQueryMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - PrepareDataQueryResponse::getOperation, PrepareQueryResult.class + .thenApply(OperationBinder.bindSync( + YdbTable.PrepareDataQueryResponse::getOperation, YdbTable.PrepareQueryResult.class )); } @Override - public CompletableFuture> executeDataQuery(ExecuteDataQueryRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> executeDataQuery( + YdbTable.ExecuteDataQueryRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getExecuteDataQueryMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - ExecuteDataQueryResponse::getOperation, ExecuteQueryResult.class + .thenApply(OperationBinder.bindSync( + YdbTable.ExecuteDataQueryResponse::getOperation, YdbTable.ExecuteQueryResult.class )); } @Override - public CompletableFuture> readRows(ReadRowsRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> readRows( + YdbTable.ReadRowsRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getReadRowsMethod(), settings, request) .thenApply(READ_ROWS); } @Override - public CompletableFuture executeSchemeQuery(ExecuteSchemeQueryRequest request, + public CompletableFuture executeSchemeQuery(YdbTable.ExecuteSchemeQueryRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getExecuteSchemeQueryMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(ExecuteSchemeQueryResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.ExecuteSchemeQueryResponse::getOperation)); } @Override - public CompletableFuture> beginTransaction(BeginTransactionRequest request, - GrpcRequestSettings settings) { + public CompletableFuture> beginTransaction( + YdbTable.BeginTransactionRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getBeginTransactionMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper( - BeginTransactionResponse::getOperation, BeginTransactionResult.class + .thenApply(OperationBinder.bindSync( + YdbTable.BeginTransactionResponse::getOperation, YdbTable.BeginTransactionResult.class )); } @Override - public CompletableFuture commitTransaction(CommitTransactionRequest request, - GrpcRequestSettings settings) { + public CompletableFuture commitTransaction( + YdbTable.CommitTransactionRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getCommitTransactionMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(CommitTransactionResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.CommitTransactionResponse::getOperation)); } @Override - public CompletableFuture rollbackTransaction(RollbackTransactionRequest request, - GrpcRequestSettings settings) { + public CompletableFuture rollbackTransaction( + YdbTable.RollbackTransactionRequest request, GrpcRequestSettings settings + ) { return transport .unaryCall(TableServiceGrpc.getRollbackTransactionMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(RollbackTransactionResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.RollbackTransactionResponse::getOperation)); } @Override public GrpcReadStream streamReadTable( - YdbTable.ReadTableRequest request, GrpcRequestSettings settings) { + YdbTable.ReadTableRequest request, GrpcRequestSettings settings + ) { return transport.readStreamCall(TableServiceGrpc.getStreamReadTableMethod(), settings, request); } @Override public GrpcReadStream streamExecuteScanQuery( - YdbTable.ExecuteScanQueryRequest request, GrpcRequestSettings settings) { + YdbTable.ExecuteScanQueryRequest request, GrpcRequestSettings settings + ) { return transport.readStreamCall(TableServiceGrpc.getStreamExecuteScanQueryMethod(), settings, request); } @@ -255,7 +223,7 @@ public GrpcReadStream streamExecuteSca public CompletableFuture bulkUpsert(YdbTable.BulkUpsertRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TableServiceGrpc.getBulkUpsertMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(BulkUpsertResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTable.BulkUpsertResponse::getOperation)); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java index 1d4eaa415..974223dcd 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java @@ -11,7 +11,7 @@ import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.operation.OperationManager; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.proto.topic.v1.TopicServiceGrpc; import tech.ydb.topic.TopicRpc; @@ -37,14 +37,14 @@ public static GrpcTopicRpc useTransport(@WillNotClose GrpcTransport transport) { public CompletableFuture createTopic(YdbTopic.CreateTopicRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getCreateTopicMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.CreateTopicResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTopic.CreateTopicResponse::getOperation)); } @Override public CompletableFuture alterTopic(YdbTopic.AlterTopicRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getAlterTopicMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.AlterTopicResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTopic.AlterTopicResponse::getOperation)); } @Override @@ -52,22 +52,23 @@ public CompletableFuture> describeTopic(Ydb GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getDescribeTopicMethod(), settings, request) - .thenApply(OperationManager.syncResultUnwrapper(YdbTopic.DescribeTopicResponse::getOperation, - YdbTopic.DescribeTopicResult.class)); + .thenApply(OperationBinder.bindSync( + YdbTopic.DescribeTopicResponse::getOperation, YdbTopic.DescribeTopicResult.class) + ); } @Override public CompletableFuture dropTopic(YdbTopic.DropTopicRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getDropTopicMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.DropTopicResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTopic.DropTopicResponse::getOperation)); } @Override public CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest request, GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getCommitOffsetMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.CommitOffsetResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTopic.CommitOffsetResponse::getOperation)); } @Override @@ -75,8 +76,7 @@ public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffse GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request) - .thenApply(OperationManager.syncStatusUnwrapper( - YdbTopic.UpdateOffsetsInTransactionResponse::getOperation)); + .thenApply(OperationBinder.bindSync(YdbTopic.UpdateOffsetsInTransactionResponse::getOperation)); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index f17583a81..bae9812b5 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -16,7 +16,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.core.operation.OperationUtils; +import tech.ydb.core.operation.Operation; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; @@ -86,7 +86,7 @@ private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings @Override public CompletableFuture createTopic(String path, CreateTopicSettings settings) { YdbTopic.CreateTopicRequest.Builder requestBuilder = YdbTopic.CreateTopicRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setPath(path) .setRetentionStorageMb(settings.getRetentionStorageMb()) .setPartitionWriteSpeedBytesPerSecond(settings.getPartitionWriteSpeedBytesPerSecond()) @@ -122,7 +122,7 @@ public CompletableFuture createTopic(String path, CreateTopicSettings se @Override public CompletableFuture alterTopic(String path, AlterTopicSettings settings) { YdbTopic.AlterTopicRequest.Builder requestBuilder = YdbTopic.AlterTopicRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setPath(path)/* .putAllAttributes(settings.getAttributes()) .setMeteringMode(toProto(settings.getMeteringMode()))*/; @@ -218,7 +218,7 @@ public CompletableFuture alterTopic(String path, AlterTopicSettings sett @Override public CompletableFuture dropTopic(String path, DropTopicSettings settings) { YdbTopic.DropTopicRequest request = YdbTopic.DropTopicRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setPath(path) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings); @@ -228,7 +228,7 @@ public CompletableFuture dropTopic(String path, DropTopicSettings settin @Override public CompletableFuture> describeTopic(String path, DescribeTopicSettings settings) { YdbTopic.DescribeTopicRequest request = YdbTopic.DescribeTopicRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setPath(path) .build(); final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings); @@ -308,7 +308,7 @@ public AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersS @Override public CompletableFuture commitOffset(String path, CommitOffsetSettings settings) { YdbTopic.CommitOffsetRequest request = YdbTopic.CommitOffsetRequest.newBuilder() - .setOperationParams(OperationUtils.createParams(settings)) + .setOperationParams(Operation.buildParams(settings)) .setOffset(settings.getOffset()) .setPath(path) .setConsumer(settings.getConsumer())