Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Consistent hashing for balancer #258

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@
[submodule "subprojects/json"]
path = subprojects/json
url = https://github.com/nlohmann/json.git
[submodule "subprojects/chash"]
path = subprojects/chash
url = https://github.com/yanet-platform/chash.git
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved
branch = master
3 changes: 3 additions & 0 deletions autotest/autotest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ bool readTimeLimited(int fd, u_char* buf, ssize_t len, std::chrono::system_clock
switch (ret)
{
case 0:
YANET_LOG_ERROR("Failed to read packets: end of file\n");
return false;
case -1:
if ((errno == EAGAIN) || (errno = EWOULDBLOCK))
Expand All @@ -299,6 +300,7 @@ bool readTimeLimited(int fd, u_char* buf, ssize_t len, std::chrono::system_clock
}
else
{
YANET_LOG_ERROR("Failed to read packets: %s\n", strerror(errno));
return false;
}
break;
Expand All @@ -319,6 +321,7 @@ static bool readPacket(int fd, pcap_pkthdr* header, u_char* data, Duration timel
struct packHeader hdr;
if (!readTimeLimited(fd, hdr, time_to_give_up))
{
YANET_LOG_ERROR("Failed to read packet header\n");
return false;
}

Expand Down
Binary file modified autotest/units/001_one_port/055_balancer_wlc/001-expect.pcap
Binary file not shown.
Binary file modified autotest/units/001_one_port/055_balancer_wlc/002-expect.pcap
Binary file not shown.
Binary file modified autotest/units/001_one_port/055_balancer_wlc/003-expect.pcap
Binary file not shown.
14 changes: 7 additions & 7 deletions autotest/units/001_one_port/055_balancer_wlc/autotest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ steps:
YANET_FORMAT_COLUMNS=module,virtual_ip,proto,virtual_port,scheduler,real_ip,real_port,enabled,weight,connections,packets,bytes balancer real any
module virtual_ip proto virtual_port scheduler real_ip real_port enabled weight connections packets bytes
--------- ---------- ----- ------------ --------- ------- --------- ------- ------ ----------- ------- -----
balancer0 10.1.0.55 tcp 443 wlc 2443::1 443 true 2 16 16 1568
balancer0 10.1.0.55 tcp 443 wlc 2443::1 443 true 2 15 15 1470
balancer0 10.1.0.55 tcp 443 wlc 2443::2 443 true 1 8 8 784
balancer0 10.1.0.55 tcp 443 wlc 2443::3 443 true 1 8 8 784
balancer0 10.1.0.55 tcp 443 wlc 2443::3 443 true 1 9 9 882
balancer0 10.1.0.55 tcp 443 wlc 2443::4 443 false 4 0 0 0

- cli:
Expand All @@ -30,10 +30,10 @@ steps:
YANET_FORMAT_COLUMNS=module,virtual_ip,proto,virtual_port,scheduler,real_ip,real_port,enabled,weight,connections,packets,bytes balancer real any
module virtual_ip proto virtual_port scheduler real_ip real_port enabled weight connections packets bytes
--------- ---------- ----- ------------ --------- ------- --------- ------- ------ ----------- ------- -----
balancer0 10.1.0.55 tcp 443 wlc 2443::1 443 true 2 17 17 1666
balancer0 10.1.0.55 tcp 443 wlc 2443::2 443 true 1 10 10 980
balancer0 10.1.0.55 tcp 443 wlc 2443::1 443 true 2 16 16 1568
balancer0 10.1.0.55 tcp 443 wlc 2443::2 443 true 1 8 8 784
balancer0 10.1.0.55 tcp 443 wlc 2443::3 443 true 1 9 9 882
balancer0 10.1.0.55 tcp 443 wlc 2443::4 443 true 4 28 28 2744
balancer0 10.1.0.55 tcp 443 wlc 2443::4 443 true 4 31 31 3038

- sleep: 2
- sendPackets:
Expand All @@ -45,6 +45,6 @@ steps:
module virtual_ip proto virtual_port scheduler real_ip real_port enabled weight connections packets bytes
--------- ---------- ----- ------------ --------- ------- --------- ------- ------ ----------- ------- -----
balancer0 10.1.0.55 tcp 443 wlc 2443::1 443 true 2 24 24 2352
balancer0 10.1.0.55 tcp 443 wlc 2443::2 443 true 1 11 11 1078
balancer0 10.1.0.55 tcp 443 wlc 2443::2 443 true 1 12 12 1176
balancer0 10.1.0.55 tcp 443 wlc 2443::3 443 true 1 12 12 1176
balancer0 10.1.0.55 tcp 443 wlc 2443::4 443 true 4 49 49 4802
balancer0 10.1.0.55 tcp 443 wlc 2443::4 443 true 4 48 48 4704
2 changes: 1 addition & 1 deletion common/controlplaneconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ using service_t = std::tuple<balancer_service_id_t,
std::optional<uint16_t>, ///< vport
std::optional<std::string>, ///< version
::balancer::scheduler,
::balancer::scheduler_params,
uint32_t, ///< wlc power
::balancer::forwarding_method,
uint8_t, ///< flags: mss_fix|ops
std::optional<common::ipv4_prefix_t>, ///< ipv4_outer_source_network
Expand Down
23 changes: 23 additions & 0 deletions common/deferer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace utils
{

template<typename F>
class Deferer
{
F action_;

public:
Deferer(F&& action) :
action_{std::move(action)}
{}
Deferer(const Deferer& other) = delete;
Deferer(Deferer&& other) = delete;
Deferer& operator=(const Deferer& other) = delete;
Deferer& operator=(Deferer&& other) = delete;
~Deferer()
{
action_();
}
};

} // namespace utils
1 change: 0 additions & 1 deletion common/idp.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ using service = std::tuple<balancer_service_id_t, /// service id
tCounterId, ///< size 4
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved
balancer::scheduler,
balancer::forwarding_method, // tunneling method (default ipip)
uint32_t, /// default_wlc_power
uint32_t, ///< real_start
uint32_t, ///< real_size
std::optional<common::ipv4_prefix_t>, ///< ipv4_outer_source_network
Expand Down
21 changes: 6 additions & 15 deletions common/scheduler.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <variant>

#include <cstdint>
namespace balancer
Expand All @@ -9,34 +10,24 @@ enum class scheduler : uint8_t
rr,
wrr,
wlc,
};

class scheduler_params
{
public:
scheduler_params() = default;
uint32_t wlc_power;
chash
};

[[maybe_unused]] constexpr const char* to_string(const scheduler& scheduler)
{
switch (scheduler)
{
case scheduler::rr:
{
return "rr";
}
case scheduler::wrr:
{
return "wrr";
}
case scheduler::wlc:
{
return "wlc";
}
case scheduler::chash:
return "chash";
default:
return "unknown";
}

return "unknown";
}

}
157 changes: 153 additions & 4 deletions controlplane/balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,14 @@ void balancer_t::compile(common::idp::updateGlobalBase::request& globalbase,
virtual_port,
version,
scheduler,
scheduler_params,
wlc_power,
forwarding_method,
flags,
ipv4_outer_source_network,
ipv6_outer_source_network,
reals] : balancer.services)
{
YANET_GCC_BUG_UNUSED(scheduler_params);
YANET_GCC_BUG_UNUSED(wlc_power);
YANET_GCC_BUG_UNUSED(version);

if (service_id >= YANET_CONFIG_BALANCER_SERVICES_SIZE)
Expand Down Expand Up @@ -808,7 +808,6 @@ void balancer_t::compile(common::idp::updateGlobalBase::request& globalbase,
counter_id,
scheduler,
forwarding_method,
balancer.default_wlc_power, // todo use scheduler_params.wlc_power when other services will be able to set it
(uint32_t)real_start,
(uint32_t)(req_reals.size() - real_start),
ipv4_outer_source_network,
Expand Down Expand Up @@ -883,6 +882,12 @@ void balancer_t::flush_reals(common::idp::updateGlobalBaseBalancer::request& bal
}
}

auto reals_wlc = reals_wlc_weight.find(key);
if (reals_wlc != reals_wlc_weight.end() && reals_wlc->second.has_value())
{
effective_weight = reals_wlc->second.value();
}
Comment on lines +885 to +889
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add comments that I've left at Levon's pr, since you will be merging his commit now:

We will have code duplication here with the following lines:

auto it = reals_enabled.find(key);
if (it != reals_enabled.end())
{
	if (it->second.has_value())
	{
		effective_weight = it->second.value();
	}
}

Mayve we can add a private method in balancer_t, smth like

 template <typename Map>
    std::optional<typename Map::mapped_type::value_type> get_effective_weight(const Map& map, const balancer::real_key_global_t& key) const {
        auto it = map.find(key);
        if (it != map.end() && it->second.has_value()) {
            return it->second.value();
        }
        return std::nullopt;
    }

And then use it, for example, with compound initialization:

if (auto found_weight = get_effective_weight(reals_wlc_weight, key); found_weight) {
    effective_weight = *found_weight;
}


uint32_t real_unordered_id = 0;
{
std::lock_guard<std::mutex> guard(reals_unordered_mutex);
Expand Down Expand Up @@ -926,8 +931,152 @@ void balancer_t::reconfigure_wlc_thread()
{
while (!flagStop)
{
balancer_real_flush();
if (balancer_t::reconfigure_wlc())
{
balancer_real_flush();
}

std::this_thread::sleep_for(std::chrono::seconds(YANET_CONFIG_BALANCER_WLC_RECONFIGURE));
}
}

bool balancer_t::reconfigure_wlc()
{
bool wlc_weight_changed = false;

common::idp::updateGlobalBaseBalancer::request balancer;
const auto balancer_real_connections = dataplane.balancer_real_connections();

std::lock_guard<std::mutex> guard(reals_enabled_mutex);

for (const auto& [module_name, balancer] : generations_config.current().config_balancers)
{

for (const auto& [service_id,
virtual_ip,
proto,
virtual_port,
version,
scheduler,
requested_wlc_power,
forwarding_method,
flags,
ipv4_outer_source_network,
ipv6_outer_source_network,
reals] : balancer.services)
{
(void)flags;
(void)version;
(void)forwarding_method;
(void)ipv4_outer_source_network;
(void)ipv6_outer_source_network;
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved

if (scheduler != ::balancer::scheduler::wlc)
{
continue;
}

if (service_id >= YANET_CONFIG_BALANCER_SERVICES_SIZE)
{
continue;
}

std::vector<std::tuple<balancer::real_key_global_t, uint32_t, uint32_t>> service_reals_usage_info;
uint32_t connection_sum = 0;
uint32_t weight_sum = 0;

for (const auto& [real_ip, real_port, weight] : reals)
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved
{
balancer::real_key_global_t key = {module_name, {virtual_ip, proto, virtual_port}, {real_ip, real_port}};
uint32_t effective_weight = weight;
{
auto it = reals_enabled.find(key);
if (it != reals_enabled.end())
{
if (it->second.has_value())
{
effective_weight = it->second.value();
}
}
}
Comment on lines +992 to +1002
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto found_weight = get_effective_weight(reals_enabled, key);
uint32_t effective_weight = found_weight ? *found_weight : weight;


weight_sum += effective_weight;

// don`t count connections for disabled reals - it can make other reals "feel" underloaded
if (effective_weight == 0)
{
continue;
}

common::idp::balancer_real_connections::real_key_t real_connections_key = {balancer.balancer_id,
virtual_ip,
proto,
virtual_port.value(),
real_ip,
real_port.value()};
uint32_t connections = 0;
for (auto& [socket_id, real_connections] : balancer_real_connections)
{
(void)socket_id;
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved

auto it = real_connections.find(real_connections_key);
if (it == real_connections.end())
{
continue;
}
connections += it->second;
TheRandomCharacter marked this conversation as resolved.
Show resolved Hide resolved
}

connection_sum += connections;

service_reals_usage_info.emplace_back(key,
effective_weight,
connections);
}

for (auto [key,
effective_weight,
connections] : service_reals_usage_info)
{
uint32_t wlc_power = requested_wlc_power;
if (wlc_power < 1 || wlc_power > 100)
{
wlc_power = YANET_CONFIG_BALANCER_WLC_DEFAULT_POWER;
}

effective_weight = calculate_wlc_weight(effective_weight, connections, weight_sum, connection_sum, wlc_power);

if (reals_wlc_weight[key] != effective_weight)
{
reals_wlc_weight[key] = effective_weight;
real_updates.insert(key);
if (in_reload)
{
real_reload_updates.insert(key);
}
wlc_weight_changed = true;
}
}
}
}

return wlc_weight_changed;
}

uint32_t balancer_t::calculate_wlc_weight(uint32_t weight, uint32_t connections, uint32_t weight_sum, uint32_t connection_sum, uint32_t wlc_power)
{
if (weight == 0 || weight_sum == 0 || connection_sum < weight_sum)
{
return weight;
}

auto wlc_ratio = std::max(1.0, wlc_power * (1 - 1.0 * connections * weight_sum / connection_sum / weight));
auto wlc_weight = (uint32_t)(weight * wlc_ratio);
Comment on lines +1074 to +1075
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's obscure. I suggest:

    auto scaled_connections = static_cast<double>(connections * weight_sum);
    auto scaled_weight = static_cast<double>(connection_sum * weight);
    double connection_ratio = scaled_connections / scaled_weight;

    double wlc_ratio = std::max(min_ratio, wlc_power * (1.0 - connection_ratio));

    auto new_weight = static_cast<uint32_t>(std::round(weight * wlc_ratio));


if (wlc_weight > YANET_CONFIG_BALANCER_REAL_WEIGHT_MAX)
{
wlc_weight = YANET_CONFIG_BALANCER_REAL_WEIGHT_MAX;
}

return wlc_weight;
}
6 changes: 6 additions & 0 deletions controlplane/balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class balancer_t : public module_t, common::icp_proto::BalancerService

std::map<balancer::real_key_global_t, std::optional<uint32_t>> reals_enabled;

std::map<balancer::real_key_global_t, std::optional<uint32_t>> reals_wlc_weight;

// The set contains all reals touched after the last one flush operation
std::set<balancer::real_key_global_t> real_updates;

Expand All @@ -133,7 +135,11 @@ class balancer_t : public module_t, common::icp_proto::BalancerService
friend class telegraf_t;
counter_t<balancer::service_counter_key_t, (size_t)balancer::service_counter::size> service_counters;
counter_t<balancer::real_counter_key_t, (size_t)balancer::real_counter::size> real_counters;

void RealFind(google::protobuf::RpcController* controller, const common::icp_proto::BalancerRealFindRequest* request, common::icp_proto::BalancerRealFindResponse* response, google::protobuf::Closure* done) override;
void Real(google::protobuf::RpcController* controller, const ::common::icp_proto::BalancerRealRequest* request, ::common::icp_proto::Empty* response, ::google::protobuf::Closure* done) override;
void RealFlush(google::protobuf::RpcController* controller, const ::common::icp_proto::Empty* request, ::common::icp_proto::Empty* response, ::google::protobuf::Closure* done) override;

bool reconfigure_wlc();
uint32_t calculate_wlc_weight(uint32_t weight, uint32_t connections, uint32_t weight_sum, uint32_t connection_sum, uint32_t wlc_power);
};
Loading
Loading