Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Closes: #47419
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Dec 16, 2024
1 parent 8627c65 commit 2db1939
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 93 deletions.
179 changes: 97 additions & 82 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,22 @@ RedisRequestContext::RedisRequestContext(instrumented_io_context &io_service,
}
}

namespace {
std::optional<std::pair<std::string, int>> ParseIffMovedError(
const std::string &error_msg) {
// MOVED error message format:
// MOVED 14946 10.xx.xx.xx:7001
std::vector<std::string> parts = absl::StrSplit(error_msg, " ");
if (parts[0] != "MOVED") {
return std::nullopt;
}
RAY_CHECK_EQ(parts.size(), 3u);
std::vector<std::string> ip_port = absl::StrSplit(parts[2], ":");
RAY_CHECK_EQ(ip_port.size(), 2u);
return std::make_pair(ip_port[0], std::stoi(ip_port[1]));
}
} // namespace

void RedisRequestContext::RedisResponseFn(redisAsyncContext *async_context,
void *raw_reply,
void *privdata) {
Expand All @@ -181,10 +197,28 @@ void RedisRequestContext::RedisResponseFn(redisAsyncContext *async_context,
<< "]"
<< " failed due to error " << error_msg << ". "
<< request_cxt->pending_retries_ << " retries left.";
// Reconnect if connection is lost or the error is a MOVED error.
if (redis_reply == nullptr ||
std::string(redis_reply->str).find("MOVED") != std::string::npos) {
RAY_CHECK_OK(request_cxt->redis_context_.Reconnect());
// Reconnect if connection is lost
if (redis_reply == nullptr) {
RAY_CHECK(request_cxt->redis_context_.Reconnect().ok())
<< "Connection lost and failed to reconnect while executing async redis "
"command.";
} else {
// Reconnect if the error message is a MOVED error.
const std::string redis_error_msg(redis_reply->str, redis_reply->len);
auto maybe_ip_port = ParseIffMovedError(redis_error_msg);
if (maybe_ip_port.has_value()) {
const auto &[ip, port] = maybe_ip_port.value();
RAY_LOG(INFO) << "Redis cluster leader is " << ip << ":" << port
<< ". Reconnect to it.";
auto status = request_cxt->redis_context_.ConnectToIPAddress(ip, port);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconnect to the new leader: " << status.ToString()
<< ". Try to reconnect to the original address.";
RAY_CHECK(request_cxt->redis_context_.Reconnect().ok())
<< "Connection lost and failed to reconnect while executing async redis "
"command.";
}
}
}
auto delay = request_cxt->exp_back_off_.Current();
request_cxt->exp_back_off_.Next();
Expand Down Expand Up @@ -402,22 +436,6 @@ ConnectWithRetries(const std::string &address,
return resp;
}

namespace {
std::optional<std::pair<std::string, int>> ParseIffMovedError(
const std::string &error_msg) {
// MOVED error message format:
// MOVED 14946 10.xx.xx.xx:7001
std::vector<std::string> parts = absl::StrSplit(error_msg, " ");
if (parts[0] != "MOVED") {
return std::nullopt;
}
RAY_CHECK_EQ(parts.size(), 3u);
std::vector<std::string> ip_port = absl::StrSplit(parts[2], ":");
RAY_CHECK_EQ(ip_port.size(), 2u);
return std::make_pair(ip_port[0], std::stoi(ip_port[1]));
}
} // namespace

void ValidateRedisDB(RedisContext &context) {
auto reply = context.RunArgvSync(std::vector<std::string>{"INFO", "CLUSTER"});
// cluster_state:ok
Expand Down Expand Up @@ -463,13 +481,11 @@ void ValidateRedisDB(RedisContext &context) {

bool RedisContext::IsRedisSentinel() {
auto reply = RunArgvSync(std::vector<std::string>{"INFO", "SENTINEL"});
RAY_CHECK(reply && !reply->IsNil()) << "Failed to get Redis sentinel info";
RAY_CHECK(reply) << "Failed to get Redis sentinel info";
return !reply->IsNil() && !reply->IsError() && !reply->ReadAsString().empty();
}

Status RedisContext::ConnectRedisCluster(const std::string &username,
const std::string &password,
bool enable_ssl) {
Status RedisContext::ConnectRedisCluster() {
RAY_LOG(INFO) << "Connect to Redis Cluster";
// Ray has some restrictions for RedisDB. Validate it here.
ValidateRedisDB(*this);
Expand Down Expand Up @@ -506,7 +522,10 @@ Status RedisContext::ConnectRedisCluster(const std::string &username,
const auto &[ip, port] = maybe_ip_port.value();
RAY_LOG(INFO) << "Redis cluster leader is " << ip << ":" << port
<< ". Reconnect to it.";
if (!ConnectToIPAddress(ip, port, username, password, enable_ssl)) {
auto status = ConnectToIPAddress(ip, port);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconnect to the new leader: " << status.ToString()
<< ". Try to reconnect to the original address.";
return Reconnect();
}
}
Expand All @@ -523,9 +542,7 @@ Status RedisContext::ConnectRedisCluster(const std::string &username,
"Failed to find the redis master node. Maybe the cluster is down.");
}

Status RedisContext::ConnectRedisSentinel(const std::string &username,
const std::string &password,
bool enable_ssl) {
Status RedisContext::ConnectRedisSentinel() {
RAY_LOG(INFO) << "Connect to Redis sentinel";

std::vector<const char *> argv;
Expand Down Expand Up @@ -577,7 +594,13 @@ Status RedisContext::ConnectRedisSentinel(const std::string &username,
RAY_LOG(INFO) << "Connecting to the Redis primary node behind sentinel: " << actual_ip
<< ":" << actual_port;
Disconnect();
return Connect(actual_ip, std::stoi(actual_port), username, password, enable_ssl);
auto status = ConnectToIPAddress(actual_ip, std::stoi(actual_port));
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to the Redis primary node behind sentinel: "
<< status.ToString() << ". Try to reconnect to the original address.";
return Reconnect();
}
return Status::OK();
}

std::vector<std::string> ResolveDNS(instrumented_io_context &io_service,
Expand All @@ -595,15 +618,11 @@ std::vector<std::string> ResolveDNS(instrumented_io_context &io_service,
return ip_addresses;
}

bool RedisContext::ConnectToIPAddress(const std::string &ip_address,
int port,
const std::string &username,
const std::string &password,
bool enable_ssl) {
Status RedisContext::ConnectToIPAddress(const std::string &ip_address, int port) {
{
auto resp = ConnectWithRetries<redisContext>(ip_address, port, redisConnect);
if (!resp.first.ok()) {
return false;
return Status::RedisError("Failed to connect to Redis for sync context");
}
context_ = std::move(resp.second /* redisContext */);
}
Expand All @@ -613,40 +632,32 @@ bool RedisContext::ConnectToIPAddress(const std::string &ip_address,
auto resp =
ConnectWithRetries<redisAsyncContext>(ip_address, port, redisAsyncConnect);
if (!resp.first.ok()) {
return false;
return Status::RedisError("Failed to connect to Redis for async context");
}
async_context = std::move(resp.second);
}

if (enable_ssl) {
if (enable_ssl_) {
RAY_CHECK(ssl_context_ != nullptr);
if (redisInitiateSSLWithContext(context_.get(), ssl_context_) != REDIS_OK) {
RAY_LOG(ERROR) << "Failed to setup encrypted redis for sync context: "
<< context_->errstr;
return false;
return Status::RedisError("Failed to setup encrypted redis for sync context");
}
if (redisInitiateSSLWithContext(&async_context->c, ssl_context_) != REDIS_OK) {
RAY_LOG(ERROR) << "Failed to setup encrypted redis for async context: "
<< async_context->errstr;
return false;
return Status::RedisError("Failed to setup encrypted redis for async context");
}
}
if (!AuthenticateRedis(context_.get(), username, password).ok()) {
RAY_LOG(ERROR) << "Failed to authenticate redis for sync context: "
<< context_->errstr;
return false;
if (!AuthenticateRedis(context_.get(), username_, password_).ok()) {
return Status::RedisError("Failed to authenticate redis for sync context");
}
if (!AuthenticateRedis(async_context.get(), username, password).ok()) {
RAY_LOG(ERROR) << "Failed to authenticate redis for async context: "
<< async_context->errstr;
return false;
if (!AuthenticateRedis(async_context.get(), username_, password_).ok()) {
return Status::RedisError("Failed to authenticate redis for async context");
}

redis_async_context_.reset(
new RedisAsyncContext(io_service_, std::move(async_context)));
SetDisconnectCallback(this);

return true;
return Status::OK();
}

Status RedisContext::Connect(const std::string &address,
Expand All @@ -668,55 +679,49 @@ Status RedisContext::Connect(const std::string &address,
// address from the error message. Re-run this function with the
// right leader address.

RAY_CHECK(!context_);
RAY_CHECK(!redis_async_context_);
// Remember function arguments for reconnection
address_ = address;
port_ = port;
username_ = username;
password_ = password;
enable_ssl_ = enable_ssl;

bool should_reconnect = true;
return Connect();
}

// Remember function arguments for reconnection
if (is_first_connect_) {
is_first_connect_ = false;
address_ = address;
port_ = port;
username_ = username;
password_ = password;
enable_ssl_ = enable_ssl;
// Don't try to reconnect for the first time.
should_reconnect = false;
}
Status RedisContext::Connect() {
RAY_CHECK(!context_);
RAY_CHECK(!redis_async_context_);

// Fetch the ip address from the address. It might return multiple
// addresses and only the first one will be used.
RAY_CHECK(!address.empty());
auto ip_addresses = ResolveDNS(io_service_, address, port);
RAY_CHECK(!address_.empty());
auto ip_addresses = ResolveDNS(io_service_, address_, port_);
RAY_CHECK(!ip_addresses.empty())
<< "Failed to resolve DNS for " << address << ":" << port;
<< "Failed to resolve DNS for " << address_ << ":" << port_;

RAY_LOG(INFO) << "Resolve Redis address to " << absl::StrJoin(ip_addresses, ", ");
const auto &ip_address = ip_addresses[0];

if (!ConnectToIPAddress(ip_address, port, username, password, enable_ssl)) {
if (!should_reconnect) {
RAY_LOG(FATAL) << "Failed to connect to Redis.";
}
return Reconnect();
}
// If we failed to connect to the saved address RAY_REDIS_ADDRESS, then it's a fatal
// error.
RAY_CHECK_OK(ConnectToIPAddress(ip_address, port_));

// Check all contexts are connected before proceeding.
RAY_CHECK(context_ != nullptr || redis_async_context_ != nullptr)
<< "Failed to connect to Redis.";

// handle validation and primary connection for different types of redis
if (IsRedisSentinel()) {
return ConnectRedisSentinel(username, password, enable_ssl);
return ConnectRedisSentinel();
}
return ConnectRedisCluster(username, password, enable_ssl);
return ConnectRedisCluster();
}

Status RedisContext::Reconnect() {
RAY_LOG(INFO) << "Try to reconnect to Redis server.";
Disconnect();
return Connect(address_, port_, username_, password_, enable_ssl_);
return Connect();
}

std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
Expand Down Expand Up @@ -751,13 +756,23 @@ std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
}

// Reconnect if the error message is MOVED.
if (std::string(redis_reply->str).find("MOVED") != std::string::npos) {
RAY_CHECK_OK(Reconnect());
continue;
std::string error_msg(redis_reply->str, redis_reply->len);
auto maybe_ip_port = ParseIffMovedError(error_msg);
if (maybe_ip_port.has_value()) {
const auto &[ip, port] = maybe_ip_port.value();
RAY_LOG(INFO) << "Redis cluster leader is " << ip << ":" << port
<< ". Reconnect to it.";
auto status = ConnectToIPAddress(ip, port);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconnect to the new leader: " << status.ToString()
<< ". Try to reconnect to the original address.";
RAY_CHECK(Reconnect().ok()) << "Connection lost and failed to reconnect while "
"executing sync redis command.";
}
}

// Error happened, retry with same connection.
auto error_msg = redis_reply ? redis_reply->str : context_->errstr;
error_msg = redis_reply ? redis_reply->str : context_->errstr;
RAY_LOG(ERROR) << "Redis request [" << absl::StrJoin(args, " ") << "]"
<< " failed due to error " << error_msg << ". " << pending_retries
<< " retries left.";
Expand Down
21 changes: 10 additions & 11 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,14 @@ class RedisContext {
private:
bool IsRedisSentinel();

bool ConnectToIPAddress(const std::string &ip_address,
int port,
const std::string &username,
const std::string &password,
bool enable_ssl);
// Connect using saved arguments.
Status Connect();

Status ConnectRedisCluster(const std::string &username,
const std::string &password,
bool enable_ssl);
Status ConnectToIPAddress(const std::string &ip_address, int port);

Status ConnectRedisSentinel(const std::string &username,
const std::string &password,
bool enable_ssl);
Status ConnectRedisCluster();

Status ConnectRedisSentinel();

instrumented_io_context &io_service_;

Expand All @@ -207,6 +202,10 @@ class RedisContext {
std::string username_;
std::string password_;
bool enable_ssl_;

friend void RedisRequestContext::RedisResponseFn(redisAsyncContext *async_context,
void *raw_reply,
void *privdata);
};

} // namespace gcs
Expand Down

0 comments on commit 2db1939

Please sign in to comment.