Skip to content

Commit

Permalink
[INLONG-9228][SDK] CPP SDK supports dynamic load balancing (apache#9235)
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Nov 8, 2023
1 parent 003715e commit b5d92d1
Show file tree
Hide file tree
Showing 10 changed files with 591 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class SdkConfig {
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
uint32_t dispatch_interval_zip_; // Compression thread distribution interval
uint32_t dispatch_interval_send_; // sending thread sending interval
uint32_t load_balance_interval_;
uint32_t heart_beat_interval_;

// Packaging parameters
bool enable_pack_;
Expand All @@ -77,14 +79,16 @@ class SdkConfig {
std::string manager_cluster_url_;
uint32_t manager_update_interval_; // Automatic update interval, minutes
uint32_t manager_url_timeout_; // URL parsing timeout, seconds
uint32_t max_proxy_num_;
uint64_t max_proxy_num_;
uint64_t reserve_proxy_num_;
uint32_t msg_type_;
bool enable_isolation_;

// Network parameters
bool enable_tcp_nagle_;
uint64_t tcp_idle_time_; // The time when tcpclient did not send data
uint32_t tcp_detection_interval_; // tcp-client detection interval
bool enable_balance_;

// auth settings
bool need_auth_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,34 @@ class Stat {

uint64_t send_failed_pack_num_;
uint64_t send_failed_msg_num_;
uint64_t time_cost_;

public:
Stat()
: send_success_pack_num_(0), send_success_msg_num_(0),
send_failed_pack_num_(0), send_failed_msg_num_(0) {}
send_failed_pack_num_(0), send_failed_msg_num_(0) ,time_cost_(0) {}

void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; }
void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; }
void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; }
void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; }
void AddTimeCost(uint64_t time_cost) { time_cost_ += time_cost; }

void ResetStat() {
send_success_pack_num_ = 0;
send_success_msg_num_ = 0;
send_failed_pack_num_ = 0;
send_failed_msg_num_ = 0;
time_cost_ = 0;
}
std::string ToString() {
std::stringstream stat;
stat << "success-pack[" << send_success_pack_num_ << "]";
stat << " success-msg[" << send_success_msg_num_ << "]";
stat << "msg[" << send_success_msg_num_ << "]";
stat << " failed-pack[" << send_failed_pack_num_ << "]";
stat << " failed-msg[" << send_failed_msg_num_ << "]";
stat << "msg[" << send_failed_msg_num_ << "]";
uint64_t pack_num = send_success_pack_num_ + send_failed_msg_num_ + 1;
stat << " trans[" << time_cost_ / pack_num << "]";
return stat.str();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port)
wait_timer_(std::make_shared<asio::steady_timer>(io_context)),
keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)),
ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), port),
status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false) {
;
status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false),
proxy_loads_(30), wait_heart_beat_(false), reset_client_(false),
heart_beat_index_(0), only_heart_heat_(false) {
client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";

tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_;
Expand Down Expand Up @@ -82,7 +83,6 @@ void TcpClient::AsyncConnect() {
asio::error_code error;
socket_->close(error);
if (asio::error::operation_aborted == error) {
// operation aborted
return;
}
}
Expand All @@ -102,7 +102,6 @@ void TcpClient::DoAsyncConnect(asio::error_code error) {
}
if (error) {
if (asio::error::operation_aborted == error) {
// operation aborted必
return;
}
}
Expand All @@ -122,7 +121,6 @@ void TcpClient::OnConnected(asio::error_code error) {
return;
}
if (asio::error::operation_aborted == error) {
// operation aborted
return;
}
status_ = kConnectFailed;
Expand Down Expand Up @@ -164,23 +162,22 @@ void TcpClient::OnWroten(const asio::error_code error,
}
if (error) {
if (asio::error::operation_aborted == error) {
// operation aborted
return;
}
status_ = kWriting;
LOG_ERROR("write error:" << error.message() << CLIENT_INFO);
status_ = kWriting;
HandleFail();
return;
}

if (0 == bytes_transferred) {
status_ = kWaiting;
LOG_ERROR("transferred 0 bytes." << CLIENT_INFO);
status_ = kWaiting;
HandleFail();
return;
}

status_ = CLIENT_RESPONSE;
status_ = kClientResponse;
asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)),
std::bind(&TcpClient::OnReturn, this, std::placeholders::_1,
std::placeholders::_2));
Expand All @@ -191,17 +188,16 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
}
if (error) {
if (asio::error::operation_aborted == error) {
// operation aborted
return;
}
LOG_ERROR("OnReturn error:" << error.message() << CLIENT_INFO);
status_ = kWaiting;
std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
HandleFail();
return;
}
if (len != sizeof(uint32_t)) {
status_ = kWaiting;
LOG_ERROR("len :" << len <<" != sizeof(uint32_t):" <<sizeof(uint32_t)<< CLIENT_INFO);
HandleFail();
return;
}
Expand All @@ -210,11 +206,11 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) {

if (resp_len > recv_buf_->m_max_size) {
status_ = kWaiting;
LOG_ERROR("invalid resp_len :" << resp_len << CLIENT_INFO);
HandleFail();
return;
}
asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, resp_len),
asio::async_read(*socket_,
asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len),
std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
std::placeholders::_2));
}
Expand All @@ -226,20 +222,34 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {

if (error) {
if (asio::error::operation_aborted == error) {
// operation aborted
return;
}
LOG_ERROR("OnBody error:" << error.message() << CLIENT_INFO);
status_ = kWaiting;
HandleFail();
return;
}
uint32_t parse_index = sizeof(uint32_t);
uint8_t msg_type =
*reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);

if (sendBuffer_ != nullptr) {
stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
stat_.AddSendSuccessPackNum(1);
switch (msg_type) {
case 8:
ParseHeartBeat(bytesTransferred);
break;
default:
ParseGenericResponse();
break;
}
if (wait_heart_beat_) {
HeartBeat();
wait_heart_beat_ = false;
return;
}

sendBuffer_->releaseBuf();
if (reset_client_) {
RestClient();
reset_client_ = false;
}

status_ = kFree;
Expand All @@ -255,6 +265,8 @@ void TcpClient::HandleFail() {
stat_.AddSendFailMsgNum(sendBuffer_->msgCnt());
stat_.AddSendFailPackNum(1);

stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);

sendBuffer_->doUserCallBack();
sendBuffer_->releaseBuf();
}
Expand All @@ -269,12 +281,14 @@ void TcpClient::DetectStatus(const asio::error_code error) {
if (error) {
return;
}

LOG_INFO(stat_.ToString() << CLIENT_INFO);
stat_.ResetStat();
if (!only_heart_heat_) {
LOG_INFO(stat_.ToString() << CLIENT_INFO);
stat_.ResetStat();
}

if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
status_ != kConnecting) {
std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
LOG_INFO("reconnect because it has idle "
<< tcp_idle_time_ << " ms."
<< "last send time:" << last_update_time_ << CLIENT_INFO);
Expand All @@ -287,4 +301,142 @@ void TcpClient::DetectStatus(const asio::error_code error) {
std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
}

void TcpClient::HeartBeat(bool only_heart_heat) {
if (kStopped == status_ || exit_) {
return;
}
only_heart_heat_ = only_heart_heat;
status_ = kHeartBeat;
last_update_time_ = Utils::getCurrentMsTime();
// status_ = kWriting;

bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4);
bin_hb_.msg_type = 8;
bin_hb_.data_time =
htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
bin_hb_.body_ver = 1;
bin_hb_.body_len = 0;
bin_hb_.attr_len = 0;
bin_hb_.magic = htons(constants::kBinaryMagic);
char *hb = (char *)&bin_hb_;
uint32_t hb_len = sizeof(bin_hb_);

asio::async_write(*socket_, asio::buffer(hb, hb_len),
std::bind(&TcpClient::OnWroten, this, std::placeholders::_1,
std::placeholders::_2));
}

void TcpClient::ParseHeartBeat(size_t total_length) {
// | total length(4) | msg type(1) | data time(4) | body version(1) | body
// length (4) | body | attr length(2) | attr | magic (2) | skip total length
uint32_t parse_index = sizeof(uint32_t);
// skip msg type
parse_index += sizeof(uint8_t);
// skip data time
// uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
// *>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint32_t);

// 3、parse body version
uint32_t body_version =
*reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
parse_index += sizeof(uint8_t);

// 4、parse body length
uint32_t body_length = ntohl(
*reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint32_t);

// 5 parse load
int16_t load = ntohs(
*reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index));
parse_index += sizeof(int16_t);

// 7 parse attr length
uint16_t attr_length = ntohs(
*reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint16_t);

// 8 skip attr
parse_index += attr_length;

// 9 parse magic
uint16_t magic = ntohs(
*reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint16_t);

if (magic != constants::kBinaryMagic) {
LOG_ERROR("failed to parse heartbeat ack! error magic "
<< magic << " !=" << constants::kBinaryMagic << CLIENT_INFO);
return;
}

if (total_length + 4 != parse_index) {
LOG_ERROR("failed to parse heartbeat ack! total_length "
<< total_length << " +4 !=" << parse_index << CLIENT_INFO);
return;
}
if (heart_beat_index_ > constants::MAX_STAT) {
heart_beat_index_ = 0;
}
if (body_version == 1 && body_length == 2) {
proxy_loads_[heart_beat_index_++ % 30] = load;
} else {
proxy_loads_[heart_beat_index_++ % 30] = 0;
}
LOG_INFO("current load is " << load << CLIENT_INFO);
}

void TcpClient::ParseGenericResponse() {
if (sendBuffer_ != nullptr) {
stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
stat_.AddSendSuccessPackNum(1);

stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);

sendBuffer_->releaseBuf();
}
}

int32_t TcpClient::GetAvgLoad() {
int32_t numerator = 0;
int32_t denominator = 0;
for (int i = 0; i < proxy_loads_.size(); i++) {
if (proxy_loads_[i] > 0) {
numerator += proxy_loads_[i] * constants::kWeight[i];
denominator += constants::kWeight[i];
}
}
int32_t avg_load = 0;
if (0 == denominator) {
return avg_load;
}
avg_load = numerator / denominator;
LOG_INFO("average load is " << avg_load << CLIENT_INFO);
return avg_load;
}

void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; }

void TcpClient::UpdateClient(const std::string ip, const uint32_t port) {
LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port
<< "] replace" << CLIENT_INFO);
ip_ = ip;
port_ = port;
reset_client_ = true;
}
void TcpClient::RestClient() {
std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip_), port_);
endpoint_ = endpoint;
client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";

LOG_INFO("RestClient[" << only_heart_heat_ << "]" << CLIENT_INFO);

AsyncConnect();
}
const std::string &TcpClient::getIp() const { return ip_; }
const std::string &TcpClient::getClientInfo() const { return client_info_; }
uint32_t TcpClient::getPort() const { return port_; }

} // namespace inlong
Loading

0 comments on commit b5d92d1

Please sign in to comment.