Skip to content

Commit

Permalink
[Fix][GCS] Implement reconnection for RedisContext
Browse files Browse the repository at this point in the history
Closes: #47419
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Dec 5, 2024
1 parent 69f558b commit b6d2d83
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 30 deletions.
5 changes: 0 additions & 5 deletions src/ray/gcs/redis_async_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ redisAsyncContext *RedisAsyncContext::GetRawRedisAsyncContext() {
return redis_async_context_.get();
}

// Give up ownership of the raw hiredis async context.
void RedisAsyncContext::ResetRawRedisAsyncContext() {
  // hiredis has already released this context, so ownership is dropped via
  // release() and the returned pointer is deliberately discarded.
  static_cast<void>(redis_async_context_.release());
}

Status RedisAsyncContext::RedisAsyncCommand(redisCallbackFn *fn,
void *privdata,
const char *format,
Expand Down
3 changes: 0 additions & 3 deletions src/ray/gcs/redis_async_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ class RedisAsyncContext {
/// \return redisAsyncContext *
redisAsyncContext *GetRawRedisAsyncContext();

/// Reset the raw 'redisAsyncContext' pointer to nullptr.
void ResetRawRedisAsyncContext();

/// Perform command 'redisvAsyncCommand'. Thread-safe.
///
/// \param fn Callback that will be called after the command finishes.
Expand Down
92 changes: 70 additions & 22 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ extern "C" {
#include "ray/common/ray_config.h"

namespace ray {

namespace gcs {

CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply->type) {
RAY_CHECK(nullptr != redis_reply);

Expand Down Expand Up @@ -277,9 +275,13 @@ RedisContext::~RedisContext() {
}
}

// Drop the synchronous connection handle, leaving `context_` null.
void RedisContext::ResetSyncContext() {
  context_ = nullptr;
}

// Drop the asynchronous connection wrapper, leaving `redis_async_context_` null.
void RedisContext::ResetAsyncContext() {
  redis_async_context_ = nullptr;
}

// Disconnect from the server by releasing both the sync and the async context.
// Calling this while already disconnected is harmless: resetting an empty
// handle does nothing.
void RedisContext::Disconnect() {
  // Reset each context exactly once, through the dedicated helpers. Resetting
  // the members directly as well would only repeat the same work.
  ResetSyncContext();
  ResetAsyncContext();
}

Status AuthenticateRedis(redisContext *context,
Expand Down Expand Up @@ -324,14 +326,15 @@ Status AuthenticateRedis(redisAsyncContext *context,

// Disconnect callback registered with hiredis for the async context.
//
// \param context The raw async context that got disconnected. Its `data` field
// holds the RedisContext that registered this callback (see
// SetDisconnectCallback).
// \param status Status code reported by hiredis; it is only logged.
void RedisAsyncContextDisconnectCallback(const redisAsyncContext *context, int status) {
  RAY_LOG(DEBUG) << "Redis async context disconnected. Status: " << status;
  // `data` must be interpreted as exactly one type. It stores the owning
  // RedisContext, so casting it to anything else would be type confusion.
  auto *redis_context = static_cast<RedisContext *>(context->data);
  // Reset 'RedisAsyncContext' to nullptr because hiredis will release the raw async
  // context.
  redis_context->ResetAsyncContext();
}

void SetDisconnectCallback(RedisAsyncContext *redis_async_context) {
void SetDisconnectCallback(RedisContext *redis_context) {
redisAsyncContext *raw_redis_async_context =
redis_async_context->GetRawRedisAsyncContext();
raw_redis_async_context->data = redis_async_context;
redis_context->async_context().GetRawRedisAsyncContext();
raw_redis_async_context->data = redis_context;
redisAsyncSetDisconnectCallback(raw_redis_async_context,
RedisAsyncContextDisconnectCallback);
}
Expand Down Expand Up @@ -595,6 +598,14 @@ Status RedisContext::Connect(const std::string &address,

RAY_CHECK(!context_);
RAY_CHECK(!redis_async_context_);

// Remember function arguments for reconnection
address_ = address;
port_ = port;
username_ = username;
password_ = password;
enable_ssl_ = enable_ssl;

// Fetch the ip address from the address. It might return multiple
// addresses and only the first one will be used.
auto ip_addresses = ResolveDNS(address, port);
Expand All @@ -616,7 +627,6 @@ Status RedisContext::Connect(const std::string &address,
}
RAY_CHECK_OK(AuthenticateRedis(context_.get(), username, password));

// Connect to async context
std::unique_ptr<redisAsyncContext, RedisContextDeleter> async_context;
{
auto resp =
Expand All @@ -632,7 +642,11 @@ Status RedisContext::Connect(const std::string &address,
RAY_CHECK_OK(AuthenticateRedis(async_context.get(), username, password));
redis_async_context_.reset(
new RedisAsyncContext(io_service_, std::move(async_context)));
SetDisconnectCallback(redis_async_context_.get());
SetDisconnectCallback(this);

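// Check all contexts are connected before proceeding.
// NOTE(review): the condition below joins the two checks with ||, so it passes
// when only one context is set; && would match the intent stated here.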
RAY_CHECK(context_ != nullptr || redis_async_context_ != nullptr)
<< "Failed to connect to Redis.";

// handle validation and primary connection for different types of redis
if (isRedisSentinel(*this)) {
Expand All @@ -646,6 +660,12 @@ Status RedisContext::Connect(const std::string &address,
}
}

// Drop the current connections and connect again with the arguments remembered
// from the last call to Connect.
//
// \return Status returned by Connect.
Status RedisContext::Reconnect() {
  RAY_LOG(INFO) << "Try to reconnect to Redis server.";
  // Connect requires both contexts to be empty, so tear them down first.
  Disconnect();
  Status reconnect_status = Connect(address_, port_, username_, password_, enable_ssl_);
  return reconnect_status;
}

std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
const std::vector<std::string> &args) {
RAY_CHECK(context_);
Expand All @@ -656,27 +676,55 @@ std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
argv.push_back(arg.data());
argc.push_back(arg.size());
}
auto redis_reply = reinterpret_cast<redisReply *>(
::redisCommandArgv(context_.get(), args.size(), argv.data(), argc.data()));
if (redis_reply == nullptr) {
RAY_LOG(ERROR) << "Failed to send redis command (sync): " << context_->errstr;
return nullptr;
// Run the command. We try to reconnect if the connection is lost, and we retry with
// exponential backoff using the same connection if error happens.
auto exp_back_off = ExponentialBackOff(RayConfig::instance().redis_retry_base_ms(),
RayConfig::instance().redis_retry_multiplier(),
RayConfig::instance().redis_retry_max_ms());
size_t pending_retries = RayConfig::instance().num_redis_request_retries() + 1;
std::unique_ptr<redisReply, decltype(freeReplyObject) *> redis_reply(nullptr,
freeReplyObject);
while (pending_retries > 0) {
redis_reply.reset(reinterpret_cast<redisReply *>(
::redisCommandArgv(context_.get(), args.size(), argv.data(), argc.data())));
// Disconnected. Try to reconnect first.
if (redis_reply == nullptr) {
RAY_CHECK_OK(this->Reconnect());
continue;
}
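// Not an error reply: wrap it and return it to the caller. Error replies fall
// through to the backoff-and-retry path below, which reuses the same connection.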
if (redis_reply->type != REDIS_REPLY_ERROR) {
std::unique_ptr<CallbackReply> callback_reply(new CallbackReply(redis_reply.get()));
return callback_reply;
}

auto error_msg = redis_reply ? redis_reply->str : context_->errstr;
RAY_LOG(ERROR) << "Redis request [" << absl::StrJoin(args, " ") << "]"
<< " failed due to error " << error_msg << ". " << pending_retries
<< " retries left.";
auto delay = exp_back_off.Current();
exp_back_off.Next();
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
--pending_retries;
}
std::unique_ptr<CallbackReply> callback_reply(new CallbackReply(redis_reply));
freeReplyObject(redis_reply);
return callback_reply;

RAY_LOG(ERROR) << "Failed to send redis command (sync): " << context_->errstr;
return nullptr;
}

// Run a Redis command on the async connection, reconnecting first if the
// connection has been lost.
//
// \param args The command and its arguments; moved into the request context.
// \param redis_callback Callback handed to the request context.
void RedisContext::RunArgvAsync(std::vector<std::string> args,
                                RedisCallback redis_callback) {
  // redis_async_context_ is nullptr means the connection is lost because it is reset in
  // the disconnect callback. If the connection is lost, we need to reconnect before
  // sending the request. Do not assert non-null before this point: that would abort
  // on exactly the state this branch is meant to recover from.
  if (redis_async_context_ == nullptr) {
    RAY_CHECK_OK(this->Reconnect());
  }
  // NOTE(review): the request context is not owned by this function; presumably it
  // frees itself once the request completes. Confirm against RedisRequestContext.
  auto *request_context = new RedisRequestContext(io_service_,
                                                  std::move(redis_callback),
                                                  redis_async_context_.get(),
                                                  std::move(args));
  request_context->Run();
}

} // namespace gcs

} // namespace ray
12 changes: 12 additions & 0 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class RedisContext {
const std::string &password,
bool enable_ssl = false);

/// Disconnect, then connect again using the arguments remembered from the
/// last call to Connect.
///
/// \return Status of the new connection attempt.
Status Reconnect();

/// Disconnect from the server.
void Disconnect();

Expand All @@ -162,6 +164,9 @@ class RedisContext {
void RunArgvAsync(std::vector<std::string> args,
RedisCallback redis_callback = nullptr);

/// Reset the sync context, leaving it null.
void ResetSyncContext();
/// Reset the async context, leaving it null. Also invoked from the async
/// disconnect callback.
void ResetAsyncContext();

redisContext *sync_context() {
RAY_CHECK(context_);
return context_.get();
Expand All @@ -180,6 +185,13 @@ class RedisContext {
std::unique_ptr<redisContext, RedisContextDeleter> context_;
redisSSLContext *ssl_context_;
std::unique_ptr<RedisAsyncContext> redis_async_context_;

// Remember Connect function arguments for reconnection. They hold meaningful
// values only after Connect has been called; the scalar members get explicit
// defaults so they are never read uninitialized before that.
std::string address_;
int port_ = 0;
std::string username_;
std::string password_;
bool enable_ssl_ = false;
};

} // namespace gcs
Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/store_client/redis_store_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ int RedisStoreClient::GetNextJobID() {

auto cxt = redis_client_->GetPrimaryContext();
auto reply = cxt->RunArgvSync(command.ToRedisArgs());
RAY_CHECK(reply && !reply->IsNil()) << "Failed to get next job";
return static_cast<int>(reply->ReadAsInteger());
}

Expand Down Expand Up @@ -511,6 +512,7 @@ bool RedisDelKeyPrefixSync(const std::string &host,
std::vector<std::string> cmd{"KEYS",
RedisMatchPattern::Prefix(redis_key.ToString()).escaped};
auto reply = context->RunArgvSync(cmd);
RAY_CHECK(reply && !reply->IsNil()) << "Failed to delete keys";
const auto &keys = reply->ReadAsStringArray();
if (keys.empty()) {
RAY_LOG(INFO) << "No keys found for external storage namespace "
Expand All @@ -520,6 +522,7 @@ bool RedisDelKeyPrefixSync(const std::string &host,
auto delete_one_sync = [context](const std::string &key) {
auto del_cmd = std::vector<std::string>{"DEL", key};
auto del_reply = context->RunArgvSync(del_cmd);
RAY_CHECK(del_reply && !del_reply->IsNil()) << "Failed to delete key";
return del_reply->ReadAsInteger() > 0;
};
size_t num_deleted = 0;
Expand Down

0 comments on commit b6d2d83

Please sign in to comment.