Skip to content

Commit

Permalink
Added deadline check of discovery waitReady
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Apr 25, 2024
1 parent 63c8f7e commit b90e642
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
15 changes: 13 additions & 2 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
Expand All @@ -28,7 +29,6 @@
import tech.ydb.core.impl.call.ReadWriteStreamCall;
import tech.ydb.core.impl.call.UnaryCall;
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.utils.Async;

/**
*
Expand Down Expand Up @@ -92,9 +92,12 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
}

return new UnaryCall<>(call, handler).startCall(request, makeMetadataFromSettings(settings));
} catch (UnexpectedResultException ex) {
logger.error("unary call with traceId {} unexprected status {}", settings.getTraceId(), ex.getStatus());
return CompletableFuture.completedFuture(Result.fail(ex));
} catch (RuntimeException ex) {
logger.error("unary call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
return Async.failedFuture(ex);
return CompletableFuture.completedFuture(Result.error(ex.getMessage(), ex));
}
}

Expand Down Expand Up @@ -131,6 +134,10 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
}

return new ReadStreamCall<>(call, request, makeMetadataFromSettings(settings), handler);
} catch (UnexpectedResultException ex) {
logger.error("server stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server stream call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
Expand Down Expand Up @@ -170,6 +177,10 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
}

return new ReadWriteStreamCall<>(call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler);
} catch (UnexpectedResultException ex) {
logger.error("server bidirectional stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server bidirectional stream call with traceId {} problem {}", settings.getTraceId(),
ex.getMessage());
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void stop() {
}
}

public void waitReady() throws IllegalStateException {
public void waitReady(long millis) throws IllegalStateException {
if (isStarted) {
return;
}
Expand All @@ -94,7 +94,8 @@ public void waitReady() throws IllegalStateException {
return;
}

readyObj.wait(discoveryTimeout.toMillis());
long timeout = millis > 0 ? millis : discoveryTimeout.toMillis();
readyObj.wait(timeout);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
lastException = new IllegalStateException("Discovery waiting interrupted", ex);
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void start(GrpcTransportBuilder.InitMode mode) {
discovery.start();

if (mode == GrpcTransportBuilder.InitMode.SYNC) {
discovery.waitReady();
discovery.waitReady(-1);
}
}

Expand All @@ -82,7 +82,7 @@ public void startAsync(Runnable readyWatcher) {
discovery.start();
if (readyWatcher != null) {
scheduler.execute(() -> {
discovery.waitReady();
discovery.waitReady(-1);
readyWatcher.run();
});
}
Expand Down Expand Up @@ -157,7 +157,11 @@ public AuthCallOptions getAuthCallOptions() {
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
EndpointRecord endpoint = endpointPool.getEndpoint(settings.getPreferredNodeID());
if (endpoint == null) {
discovery.waitReady();
long timeout = -1;
if (settings.getDeadlineAfter() != 0) {
timeout = settings.getDeadlineAfter() - System.nanoTime();
}
discovery.waitReady(timeout);
endpoint = endpointPool.getEndpoint(settings.getPreferredNodeID());
}
return channelPool.getChannel(endpoint);
Expand Down
9 changes: 5 additions & 4 deletions core/src/test/java/tech/ydb/core/impl/YdbDiscoveryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private <T extends Throwable> T checkFutureException(CompletableFuture<Boolean>

private CompletableFuture<Boolean> createWaitingFuture(YdbDiscovery discovery) {
return CompletableFuture.supplyAsync(() -> {
discovery.waitReady();
discovery.waitReady(100);
return Boolean.TRUE;
});
}
Expand All @@ -87,7 +87,7 @@ public void baseTest() {
scheduler.hasTasksCount(1).runNextTask();
verifyDiscoveryCount(1);

discovery.waitReady();
discovery.waitReady(-1);
discovery.stop();

// stop is imdepotent operation
Expand Down Expand Up @@ -237,8 +237,9 @@ public void failedDiscoveryTest() {
scheduler.hasTasksCount(1).runNextTask();
verifyDiscoveryCount(4);

RuntimeException ex5 = checkFutureException(req5, "Discovery failed", RuntimeException.class);
Assert.assertEquals("Test io problem", ex5.getMessage());
UnexpectedResultException ex5 = checkFutureException(req5, "Discovery failed", UnexpectedResultException.class);
Assert.assertEquals(StatusCode.CLIENT_INTERNAL_ERROR, ex5.getStatus().getCode());
Assert.assertEquals("Test io problem, code: CLIENT_INTERNAL_ERROR", ex5.getMessage());

discovery.stop();
}
Expand Down

0 comments on commit b90e642

Please sign in to comment.