Skip to content

Commit

Permalink
Merge pull request #264 from alex268/master
Browse files Browse the repository at this point in the history
Added option for async transport building
  • Loading branch information
alex268 authored Apr 25, 2024
2 parents 4bfda68 + b90e642 commit 273c93f
Show file tree
Hide file tree
Showing 26 changed files with 1,755 additions and 919 deletions.
1 change: 0 additions & 1 deletion core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
* @author Nikolay Perfilov
*/
public interface GrpcTransport extends AutoCloseable {

<ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
MethodDescriptor<ReqT, RespT> method,
GrpcRequestSettings settings,
Expand Down
58 changes: 54 additions & 4 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,37 @@
* @author Aleksandr Gorshenin
*/
public class GrpcTransportBuilder {
/**
* The initialization mode defines the behavior of {@link tech.ydb.core.grpc.GrpcTransportBuilder#build() }
* method.
*/
public enum InitMode {
/**
* In synchronous mode, transport creation will wait for successful discovery of current database nodes. Any
* errors on discovery execution like an authentication error or a network issue will be thrown as
* RuntimeException. It allows to catch configuration problems and stops the transport creating.
*/
SYNC,
/**
* In asynchronous mode, transport creation will not be blocked while waiting for discovery response and will
* not throw any exceptions in case of configuration problems. But any request with the transport will wait for
* the discovery and may throw an exception if it will not be completed. This mode allows the application not
* to be blocked during the transport initialization. Any user request on this transport will wait for
* initialization completion before being sent to the server
*/
ASYNC,

/**
* In fallback asynchronous mode, neither transport creation nor user request execution will be blocked while
* initial discovery is in progress. In this case if the discovery is not completed, all requests will be sent
* to the discovery endpoint. Any discovery problems will be ignored. This mode allows to start working with the
* database without waiting for discovery to complete, but after its completion, existing long-running
* operations (like grpc streams) will be interrupted.
* Thus this mode is not recommended for long-running streams such as topic reading/writing.
*/
ASYNC_FALLBACK
}

private final String endpoint;
private final HostAndPort host;
private final String database;
Expand All @@ -49,6 +80,7 @@ public class GrpcTransportBuilder {
private long discoveryTimeoutMillis = 60_000;
private boolean useDefaultGrpcResolver = false;
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
private InitMode initMode = InitMode.SYNC;

/**
* can cause leaks https://github.com/grpc/grpc-java/issues/9340
Expand Down Expand Up @@ -200,6 +232,18 @@ public GrpcTransportBuilder withBalancingSettings(BalancingSettings balancingSet
return this;
}

/**
* Set GrpcTransport's init mode.
* See {@link tech.ydb.core.grpc.GrpcTransportBuilder.InitMode } for details
*
* @param initMode mode of transport initialization
* @return GrpcTransportBuilder with the given initMode
*/
public GrpcTransportBuilder withInitMode(InitMode initMode) {
this.initMode = initMode;
return this;
}

public GrpcTransportBuilder withAuthProvider(AuthRpcProvider<? super GrpcAuthRpc> authProvider) {
this.authProvider = Objects.requireNonNull(authProvider);
return this;
Expand All @@ -209,7 +253,7 @@ public GrpcTransportBuilder withAuthProvider(AuthRpcProvider<? super GrpcAuthRpc
* Sets the compression to use for the calls. See {@link io.grpc.CallOptions#withCompression(java.lang.String) }
* for details
* @param compression the compression value
* @return return GrpcTransportBuilder with the given compression
* @return GrpcTransportBuilder with the given compression
*/
public GrpcTransportBuilder withGrpcCompression(@Nonnull GrpcCompression compression) {
this.compression = Objects.requireNonNull(compression, "compression is null");
Expand Down Expand Up @@ -328,17 +372,23 @@ public GrpcTransportBuilder disableRetry() {
public GrpcTransport build() {
YdbTransportImpl impl = new YdbTransportImpl(this);
try {
impl.init();
impl.start(initMode);
return impl;
} catch (RuntimeException ex) {
impl.close();
throw ex;
}
}

@Deprecated
public GrpcTransport buildAsync(Runnable ready) {
YdbTransportImpl impl = new YdbTransportImpl(this);
impl.initAsync(ready);
return impl;
try {
impl.startAsync(ready);
return impl;
} catch (RuntimeException ex) {
impl.close();
throw ex;
}
}
}
36 changes: 27 additions & 9 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand All @@ -14,6 +15,7 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
Expand All @@ -27,7 +29,6 @@
import tech.ydb.core.impl.call.ReadWriteStreamCall;
import tech.ydb.core.impl.call.UnaryCall;
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.utils.Async;

/**
*
Expand All @@ -41,15 +42,21 @@ public abstract class BaseGrpcTransport implements GrpcTransport {
.withIssues(Issue.of("Request was not sent: transport is shutting down", Issue.Severity.ERROR)
));

protected volatile boolean shutdown;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

abstract AuthCallOptions getAuthCallOptions();
protected abstract AuthCallOptions getAuthCallOptions();
protected abstract GrpcChannel getChannel(GrpcRequestSettings settings);
abstract void updateChannelStatus(GrpcChannel channel, io.grpc.Status status);
protected abstract void updateChannelStatus(GrpcChannel channel, io.grpc.Status status);

protected void shutdown() {
// nothing to shutdown
}

@Override
public void close() {
this.shutdown = true;
if (isClosed.compareAndSet(false, true)) {
shutdown();
}
}

@Override
Expand All @@ -58,7 +65,7 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
GrpcRequestSettings settings,
ReqT request
) {
if (shutdown) {
if (isClosed.get()) {
return CompletableFuture.completedFuture(SHUTDOWN_RESULT.map(null));
}

Expand All @@ -85,9 +92,12 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
}

return new UnaryCall<>(call, handler).startCall(request, makeMetadataFromSettings(settings));
} catch (UnexpectedResultException ex) {
logger.error("unary call with traceId {} unexprected status {}", settings.getTraceId(), ex.getStatus());
return CompletableFuture.completedFuture(Result.fail(ex));
} catch (RuntimeException ex) {
logger.error("unary call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
return Async.failedFuture(ex);
return CompletableFuture.completedFuture(Result.error(ex.getMessage(), ex));
}
}

Expand All @@ -97,7 +107,7 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
GrpcRequestSettings settings,
ReqT request
) {
if (shutdown) {
if (isClosed.get()) {
return new EmptyStream<>(SHUTDOWN_RESULT.getStatus());
}

Expand All @@ -124,6 +134,10 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
}

return new ReadStreamCall<>(call, request, makeMetadataFromSettings(settings), handler);
} catch (UnexpectedResultException ex) {
logger.error("server stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server stream call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
Expand All @@ -137,7 +151,7 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
MethodDescriptor<ReqT, RespT> method,
GrpcRequestSettings settings
) {
if (shutdown) {
if (isClosed.get()) {
return new EmptyStream<>(SHUTDOWN_RESULT.getStatus());
}

Expand All @@ -163,6 +177,10 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
}

return new ReadWriteStreamCall<>(call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler);
} catch (UnexpectedResultException ex) {
logger.error("server bidirectional stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server bidirectional stream call with traceId {} problem {}", settings.getTraceId(),
ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public String getDatabase() {
}

@Override
public void close() {
super.close();
protected void shutdown() {
channel.shutdown();
}

Expand All @@ -63,7 +62,7 @@ protected GrpcChannel getChannel(GrpcRequestSettings settings) {
}

@Override
void updateChannelStatus(GrpcChannel channel, Status status) {
protected void updateChannelStatus(GrpcChannel channel, Status status) {
if (!status.isOk()) {
logger.warn("grpc error {}[{}] on fixed channel {}",
status.getCode(),
Expand Down
40 changes: 11 additions & 29 deletions core/src/main/java/tech/ydb/core/impl/MultiChannelTransport.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package tech.ydb.core.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import com.google.common.base.Strings;
import com.google.common.net.HostAndPort;
Expand All @@ -20,7 +20,6 @@
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.impl.pool.GrpcChannelPool;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.proto.discovery.DiscoveryProtos;

/**
*
Expand All @@ -32,7 +31,7 @@ public class MultiChannelTransport extends BaseGrpcTransport {

private final String database;
private final AuthCallOptions callOptions;
private final DiscoveryProtos.ListEndpointsResult discoveryResult;
private final List<EndpointRecord> endpoints;
private final EndpointPool endpointPool;
private final GrpcChannelPool channelPool;
private final ScheduledExecutorService scheduler;
Expand All @@ -45,28 +44,14 @@ public MultiChannelTransport(GrpcTransportBuilder builder, List<HostAndPort> hos
this.database = Strings.nullToEmpty(builder.getDatabase());
this.scheduler = builder.getSchedulerFactory().get();

List<EndpointRecord> endpoints = new ArrayList<>();
DiscoveryProtos.ListEndpointsResult.Builder discoveryBuilder = DiscoveryProtos.ListEndpointsResult.newBuilder();
hosts.forEach(host -> {
endpoints.add(new EndpointRecord(host.getHost(), host.getPortOrDefault(YdbTransportImpl.DEFAULT_PORT), 0));
discoveryBuilder.addEndpointsBuilder()
.setAddress(host.getHost())
.setPort(host.getPortOrDefault(YdbTransportImpl.DEFAULT_PORT))
.build();
});

this.discoveryResult = discoveryBuilder.build();
this.callOptions = new AuthCallOptions(scheduler,
database,
endpoints,
channelFactory,
builder
);
this.endpoints = hosts.stream()
.map(host -> new EndpointRecord(host.getHost(), host.getPortOrDefault(YdbTransportImpl.DEFAULT_PORT)))
.collect(Collectors.toList());

this.callOptions = new AuthCallOptions(scheduler, endpoints, channelFactory, builder);
this.channelPool = new GrpcChannelPool(channelFactory, scheduler);
this.endpointPool = new EndpointPool(null, BalancingSettings.defaultInstance());

this.endpointPool.setNewState(discoveryResult);
this.endpointPool = new EndpointPool(BalancingSettings.defaultInstance());
this.endpointPool.setNewState(null, endpoints);
}

@Override
Expand All @@ -80,12 +65,9 @@ public String getDatabase() {
}

@Override
public void close() {
super.close();

protected void shutdown() {
channelPool.shutdown();
callOptions.close();

YdbSchedulerFactory.shutdownScheduler(scheduler);
}

Expand All @@ -101,12 +83,12 @@ protected GrpcChannel getChannel(GrpcRequestSettings settings) {
}

@Override
void updateChannelStatus(GrpcChannel channel, Status status) {
protected void updateChannelStatus(GrpcChannel channel, Status status) {
if (!status.isOk()) {
endpointPool.pessimizeEndpoint(channel.getEndpoint());

if (endpointPool.needToRunDiscovery()) {
endpointPool.setNewState(discoveryResult);
endpointPool.setNewState(null, endpoints);
}
}
}
Expand Down
16 changes: 4 additions & 12 deletions core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tech.ydb.core.impl;

import java.util.Collections;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -36,12 +36,7 @@ public SingleChannelTransport(GrpcTransportBuilder builder) {
this.channel = new GrpcChannel(endpoint, channelFactory, true);

this.scheduler = builder.getSchedulerFactory().get();
this.callOptions = new AuthCallOptions(scheduler,
database,
Collections.singletonList(endpoint),
channelFactory,
builder
);
this.callOptions = new AuthCallOptions(scheduler, Arrays.asList(endpoint), channelFactory, builder);
}

@Override
Expand All @@ -55,12 +50,9 @@ public String getDatabase() {
}

@Override
public void close() {
super.close();

protected void shutdown() {
channel.shutdown();
callOptions.close();

YdbSchedulerFactory.shutdownScheduler(scheduler);
}

Expand All @@ -75,7 +67,7 @@ protected GrpcChannel getChannel(GrpcRequestSettings settings) {
}

@Override
void updateChannelStatus(GrpcChannel channel, Status status) {
protected void updateChannelStatus(GrpcChannel channel, Status status) {
if (!status.isOk()) {
logger.warn("grpc error {}[{}] on single channel {}",
status.getCode(),
Expand Down
Loading

0 comments on commit 273c93f

Please sign in to comment.