diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index e50e6093c35..49876e7a2f3 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -43,6 +43,7 @@
 import org.apache.celeborn.client.read.MetricsCallback;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.network.TransportContext;
 import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
@@ -81,6 +82,7 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private final int registerShuffleMaxRetries;
   private final long registerShuffleRetryWaitMs;
+  private final int rpcMaxRetries;
   private final int maxReviveTimes;
   private final boolean testRetryRevive;
   private final int pushBufferMaxSize;
@@ -179,6 +181,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
     this.userIdentifier = userIdentifier;
     registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry();
     registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs();
+    rpcMaxRetries = conf.clientRpcMaxRetries();
     maxReviveTimes = conf.clientPushMaxReviveTimes();
     testRetryRevive = conf.testRetryRevive();
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
@@ -534,6 +537,7 @@ private ConcurrentHashMap registerShuffle(
             lifecycleManagerRef.askSync(
                 RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions),
                 conf.clientRpcRegisterShuffleAskTimeout(),
+                rpcMaxRetries,
                 ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
   }
 
@@ -1707,6 +1711,7 @@ private void mapEndInternal(
       MapperEndResponse response =
           lifecycleManagerRef.askSync(
               new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
+              rpcMaxRetries,
               ClassTag$.MODULE$.apply(MapperEndResponse.class));
       if (response.status() != StatusCode.SUCCESS) {
         throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status());
@@ -1741,65 +1746,60 @@ public boolean cleanupShuffle(int shuffleId) {
 
   protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
       int shuffleId, boolean isSegmentGranularityVisible) {
-    {
-      long getReducerFileGroupStartTime = System.nanoTime();
-      String exceptionMsg = null;
-      try {
-        if (lifecycleManagerRef == null) {
-          exceptionMsg = "Driver endpoint is null!";
-          logger.warn(exceptionMsg);
-        } else {
-          GetReducerFileGroup getReducerFileGroup =
-              new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);
-
-          GetReducerFileGroupResponse response =
-              lifecycleManagerRef.askSync(
-                  getReducerFileGroup,
-                  conf.clientRpcGetReducerFileGroupAskTimeout(),
-                  ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
+    long getReducerFileGroupStartTime = System.nanoTime();
+    String exceptionMsg = null;
+    if (lifecycleManagerRef == null) {
+      exceptionMsg = "Driver endpoint is null!";
+      logger.warn(exceptionMsg);
+      return Tuple2.apply(null, exceptionMsg);
+    }
+    try {
+      GetReducerFileGroup getReducerFileGroup =
+          new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);
 
-          switch (response.status()) {
-            case SUCCESS:
-              logger.info(
-                  "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
-                  shuffleId,
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
-                  response.fileGroup().size());
-              return Tuple2.apply(
-                  new ReduceFileGroups(
-                      response.fileGroup(), response.attempts(), response.partitionIds()),
-                  null);
-            case SHUFFLE_NOT_REGISTERED:
-              logger.warn(
-                  "Request {} return {} for {}.",
-                  getReducerFileGroup,
-                  response.status(),
-                  shuffleId);
-              // return empty result
-              return Tuple2.apply(
-                  new ReduceFileGroups(
-                      response.fileGroup(), response.attempts(), response.partitionIds()),
-                  null);
-            case STAGE_END_TIME_OUT:
-            case SHUFFLE_DATA_LOST:
-              exceptionMsg =
-                  String.format(
-                      "Request %s return %s for %s.",
-                      getReducerFileGroup, response.status(), shuffleId);
-              logger.warn(exceptionMsg);
-              break;
-            default: // fall out
-          }
-        }
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
-        exceptionMsg = e.getMessage();
+      GetReducerFileGroupResponse response =
+          lifecycleManagerRef.askSync(
+              getReducerFileGroup,
+              conf.clientRpcGetReducerFileGroupAskTimeout(),
+              rpcMaxRetries,
+              ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
+      switch (response.status()) {
+        case SUCCESS:
+          logger.info(
+              "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
+              shuffleId,
+              TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
+              response.fileGroup().size());
+          return Tuple2.apply(
+              new ReduceFileGroups(
+                  response.fileGroup(), response.attempts(), response.partitionIds()),
+              null);
+        case SHUFFLE_NOT_REGISTERED:
+          logger.warn(
+              "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
+          // return empty result
+          return Tuple2.apply(
+              new ReduceFileGroups(
+                  response.fileGroup(), response.attempts(), response.partitionIds()),
+              null);
+        case STAGE_END_TIME_OUT:
+        case SHUFFLE_DATA_LOST:
+          exceptionMsg =
+              String.format(
+                  "Request %s return %s for %s.",
+                  getReducerFileGroup, response.status(), shuffleId);
+          logger.warn(exceptionMsg);
+          break;
+        default: // fall out
       }
-      return Tuple2.apply(null, exceptionMsg);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
+      exceptionMsg = e.getMessage();
     }
+    return Tuple2.apply(null, exceptionMsg);
   }
 
   @Override
@@ -1928,8 +1928,14 @@ public void shutdown() {
 
   @Override
   public void setupLifecycleManagerRef(String host, int port) {
     logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
-    lifecycleManagerRef =
-        rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP);
+    try {
+      lifecycleManagerRef =
+          rpcEnv.setupEndpointRef(
+              new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP, rpcMaxRetries);
+    } catch (Exception e) {
+      throw new CelebornRuntimeException("setupLifecycleManagerRef failed!", e);
+    }
+
     initDataClientFactoryIfNeeded();
   }
diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index 5256ae0fb0c..183878cbe1e 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -249,6 +249,12 @@ private CelebornConf setupEnv(
             RegisterShuffleResponse$.MODULE$.apply(
                 statusCode, new PartitionLocation[] {primaryLocation}));
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any()))
+        .thenAnswer(
+            t ->
+                RegisterShuffleResponse$.MODULE$.apply(
+                    statusCode, new PartitionLocation[] {primaryLocation}));
+
     shuffleClient.setupLifecycleManagerRef(endpointRef);
 
     ChannelFuture mockedFuture =
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 1efab142e82..3496d7cf933 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -520,6 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
 
   def rpcAskTimeout: RpcTimeout = new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
+  def rpcTimeoutRetryWaitMs: Long = get(RPC_TIMEOUT_RETRY_WAIT)
 
   def rpcInMemoryBoundedInboxCapacity(): Int = {
     get(RPC_INBOX_CAPACITY)
   }
@@ -4884,6 +4885,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("3s")
 
+  val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
+    buildConf("celeborn.rpc.retryWait")
+      .categories("network")
+      .version("0.6.0")
+      .doc("Wait time before next retry on RpcTimeoutException.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] =
     buildConf("celeborn.client.reserveSlots.maxRetries")
       .withAlternative("celeborn.slots.reserve.maxRetries")
@@ -5033,7 +5042,7 @@
     buildConf("celeborn.client.rpc.maxRetries")
       .categories("client")
       .version("0.3.2")
-      .doc("Max RPC retry times in LifecycleManager.")
+      .doc("Max RPC retry times in client.")
       .intConf
       .createWithDefault(3)
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
index edd7005e2e9..a608b97ad09 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
@@ -17,6 +17,9 @@ package org.apache.celeborn.common.rpc
 
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
 import scala.concurrent.Future
 import scala.reflect.ClassTag
 
@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf)
   extends Serializable with Logging {
 
   private[this] val defaultAskTimeout = conf.rpcAskTimeout
+  private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt
 
   /**
    * return the address for the [[RpcEndpointRef]]
   */
@@ -88,4 +92,58 @@ abstract class RpcEndpointRef(conf: CelebornConf)
     val future = ask[T](message, timeout)
     timeout.awaitResult(future, address)
   }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, retry if timeout, throw an exception if this still fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, retryCount: Int): T =
+    askSync(message, defaultAskTimeout, retryCount)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, retry if timeout, throw an exception if this still fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = {
+    var numRetries = retryCount
+    while (numRetries > 0) {
+      numRetries -= 1
+      try {
+        val future = ask[T](message, timeout)
+        return timeout.awaitResult(future, address)
+      } catch {
+        case e: RpcTimeoutException =>
+          if (numRetries > 0) {
+            val random = new Random
+            val retryWaitMs = random.nextInt(waitTimeBound)
+            try {
+              TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+            } catch {
+              case _: InterruptedException =>
+                throw e
+            }
+          } else {
+            throw e
+          }
+      }
+    }
+    // should never be here
+    val future = ask[T](message, timeout)
+    timeout.awaitResult(future, address)
+  }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 7a44d8b63aa..58d29a4fa3d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -18,6 +18,8 @@ package org.apache.celeborn.common.rpc
 
 import java.io.File
+import java.util.Random
+import java.util.concurrent.TimeUnit
 
 import scala.concurrent.Future
 
@@ -104,6 +106,7 @@ object RpcEnv {
 abstract class RpcEnv(config: RpcEnvConfig) {
 
   private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout
+  private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -142,6 +145,41 @@ abstract class RpcEnv(config: RpcEnvConfig) {
     setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
   }
 
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName` with timeout retry.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+      address: RpcAddress,
+      endpointName: String,
+      retryCount: Int): RpcEndpointRef = {
+    var numRetries = retryCount
+    while (numRetries > 0) {
+      numRetries -= 1
+      try {
+        return setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
+      } catch {
+        case e: RpcTimeoutException =>
+          if (numRetries > 0) {
+            val random = new Random
+            val retryWaitMs = random.nextInt(waitTimeBound)
+            try {
+              TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+            } catch {
+              case _: InterruptedException =>
+                throw e
+            }
+          } else {
+            throw e
+          }
+        case e: RpcEndpointNotFoundException =>
+          throw e
+      }
+    }
+    // should never be here
+    null
+  }
+
   /**
    * Stop [[RpcEndpoint]] specified by `endpoint`.
    */
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f035713eb90..479d16de778 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -80,7 +80,7 @@ license: |
 | celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
 | celeborn.client.rpc.commitFiles.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. | 0.4.1 | |
 | celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. | 0.2.0 | |
-| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | |
+| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in client. | 0.3.2 | |
 | celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
 | celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.2.0 | |
 | celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for LifecycleManager request reserve slots. | 0.3.0 | |
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index f690d205e25..14ec3431a23 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -56,6 +56,7 @@ license: |
 | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory bounded capacity. | 0.5.0 | |
 | celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | |
 | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | |
+| celeborn.rpc.retryWait | 1s | false | Wait time before next retry on RpcTimeoutException. | 0.6.0 | |
 | celeborn.rpc.slow.interval | <undefined> | false | min interval (ms) for RPC framework to log slow RPC | 0.6.0 | |
 | celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to log slow RPC | 0.6.0 | |
 | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | |
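
The retry loops added above to RpcEndpointRef.askSync and RpcEnv.setupEndpointRef share one shape: attempt the call, and on RpcTimeoutException sleep a random interval bounded by celeborn.rpc.retryWait before trying again, rethrowing once the attempts are exhausted or the sleep is interrupted. The self-contained Scala sketch below distills that shape for readers skimming the patch; RetrySketch, withTimeoutRetry, retryWaitBoundMs, and the local RpcTimeoutException stand-in are illustrative names, not part of this change.

    import java.util.Random
    import java.util.concurrent.TimeUnit

    object RetrySketch {

      // Stand-in for org.apache.celeborn.common.rpc.RpcTimeoutException; illustrative only.
      class RpcTimeoutException(msg: String) extends RuntimeException(msg)

      // Run op up to retryCount times; on RpcTimeoutException sleep a random
      // interval below retryWaitBoundMs (celeborn.rpc.retryWait) and try again,
      // rethrowing once attempts are exhausted or the sleep is interrupted.
      def withTimeoutRetry[T](retryCount: Int, retryWaitBoundMs: Int)(op: => T): T = {
        var numRetries = retryCount
        while (numRetries > 0) {
          numRetries -= 1
          try {
            return op
          } catch {
            case e: RpcTimeoutException =>
              if (numRetries > 0) {
                val retryWaitMs = new Random().nextInt(retryWaitBoundMs)
                try {
                  TimeUnit.MILLISECONDS.sleep(retryWaitMs)
                } catch {
                  case _: InterruptedException => throw e
                }
              } else {
                throw e
              }
          }
        }
        op // unreachable when retryCount > 0; kept to mirror the originals
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        val result = withTimeoutRetry(retryCount = 3, retryWaitBoundMs = 1000) {
          calls += 1
          if (calls < 3) throw new RpcTimeoutException("ask timed out") else "ok"
        }
        println(s"$result after $calls attempts") // prints: ok after 3 attempts
      }
    }

As in the patched methods, a retry count of 3 means up to three attempts in total, with the randomized wait applied only between attempts, and only timeouts are retried (RpcEndpointNotFoundException and other failures surface immediately).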
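
Two settings documented in the tables above drive this behavior: celeborn.client.rpc.maxRetries bounds how many attempts the client makes per RPC, and celeborn.rpc.retryWait bounds the randomized sleep between attempts. A minimal configuration sketch, assuming CelebornConf's usual no-arg constructor and string set(key, value) API; the values are illustrative:

    import org.apache.celeborn.common.CelebornConf

    object RetryConfExample {
      def main(args: Array[String]): Unit = {
        // Illustrative values: up to 5 attempts per client RPC, with a random
        // 0-2000 ms wait between attempts on RpcTimeoutException.
        val conf = new CelebornConf()
          .set("celeborn.client.rpc.maxRetries", "5")
          .set("celeborn.rpc.retryWait", "2s")

        // The patch reads these back through conf.clientRpcMaxRetries() (Java side)
        // and conf.rpcTimeoutRetryWaitMs, as the hunks above show.
        println(conf.rpcTimeoutRetryWaitMs) // 2000
      }
    }

With the defaults (3 retries, 1s wait), a single repeatedly timing-out ask can therefore take roughly up to three ask timeouts plus two randomized sub-second waits before the failure surfaces.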