Skip to content

Commit

Permalink
Separate leader updates from controller cluster member updates
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 9, 2025
1 parent 727a21d commit 5429ce5
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 176 deletions.
394 changes: 231 additions & 163 deletions common/pb/ctrl_pb/ctrl.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/pb/ctrl_pb/ctrl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum ContentType {
DecommissionRouterRequestType = 1043;

PeerStateChangeRequestType = 1050;
UpdateClusterLeaderRequestType = 1051;
}

enum ControlHeaders {
Expand Down Expand Up @@ -268,6 +269,10 @@ message UpdateCtrlAddresses {
bool isLeader = 3;
}

// UpdateClusterLeader is the body of an UpdateClusterLeaderRequestType message.
// It carries a single index value, which the receiver compares against the
// version it is tracking to decide whether the update is newer.
message UpdateClusterLeader {
// index is the version number attached to this leader update.
uint64 index = 1;
}

enum PeerState {
Healthy = 0;
Unhealthy = 1;
Expand Down
4 changes: 4 additions & 0 deletions common/pb/ctrl_pb/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,7 @@ func (request *UpdateCtrlAddresses) GetContentType() int32 {
// GetContentType returns the channel content type under which a
// PeerStateChanges message is sent: ContentType_PeerStateChangeRequestType.
func (*PeerStateChanges) GetContentType() int32 {
	return int32(ContentType_PeerStateChangeRequestType)
}

// GetContentType returns the channel content type under which an
// UpdateClusterLeader message is sent: ContentType_UpdateClusterLeaderRequestType.
func (*UpdateClusterLeader) GetContentType() int32 {
	return int32(ContentType_UpdateClusterLeaderRequestType)
}
2 changes: 1 addition & 1 deletion controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
DefaultTlsHandshakeRateLimiterMaxWindow = 1000

DefaultRouterDataModelEnabled = true
DefaultRouterDataModelLogSize = 10_1000
DefaultRouterDataModelLogSize = 10_000
DefaultRouterDataModelListenerBufferSize = 1000
)

Expand Down
27 changes: 24 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,28 +552,49 @@ func (c *Controller) GetEventDispatcher() event.Dispatcher {
}

func (c *Controller) routerDispatchCallback(evt *event.ClusterEvent) {
if evt.EventType == event.ClusterMembersChanged || evt.EventType == event.ClusterLeadershipGained {
if evt.EventType == event.ClusterLeadershipGained {
req := &ctrl_pb.UpdateClusterLeader{
Index: evt.Index,
}

for _, r := range c.network.AllConnectedRouters() {
log := pfxlog.Logger().WithFields(map[string]interface{}{
"index": evt.Index,
})

if err := protobufs.MarshalTyped(req).Send(r.Control); err != nil {

pfxlog.Logger().WithError(err).WithField("routerId", r.Id).Error("unable to update cluster leader on router")
} else {
log.WithField("routerId", r.Id).WithField("routerName", r.Name).Info("router updated with info on new leader")
}
}
}

if evt.EventType == event.ClusterMembersChanged {
var endpoints []string
for _, peer := range evt.Peers {
endpoints = append(endpoints, peer.Addr)
}

updMsg := &ctrl_pb.UpdateCtrlAddresses{
Addresses: endpoints,
IsLeader: c.raftController.IsLeader(),
Index: evt.Index,
}

log := pfxlog.Logger().WithFields(map[string]interface{}{
"event": evt.EventType,
"addresses": endpoints,
"index": evt.Index,
})

log.Info("router connected, syncing ctrl addresses")
log.Info("syncing updated ctrl addresses to connected routers")

for _, r := range c.network.AllConnectedRouters() {
if err := protobufs.MarshalTyped(updMsg).Send(r.Control); err != nil {
pfxlog.Logger().WithError(err).WithField("routerId", r.Id).Error("unable to update controller endpoints on router")
} else {
log.WithField("routerId", r.Id).WithField("routerName", r.Name).Info("router updated with latest ctrl addresses")
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ import (
"time"
)

type RouterDispatchCallback func(*raft.Configuration) error

type ClusterEvent uint32

func (self ClusterEvent) String() string {
Expand Down
4 changes: 2 additions & 2 deletions controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (strategy *InstantStrategy) AddPublicKey(cert *tls.Certificate) {
// Initialize implements RouterDataModelCache
func (strategy *InstantStrategy) Initialize(logSize uint64, bufferSize uint) error {
strategy.RouterDataModel = common.NewSenderRouterDataModel(logSize, bufferSize)

pfxlog.Logger().WithField("logSize", logSize).WithField("listenerBufferSizes", bufferSize).
Info("initialized controller router data model")
if strategy.ae.HostController.IsRaftEnabled() {
strategy.indexProvider = &RaftIndexProvider{
index: strategy.ae.GetHostController().GetRaftIndex(),
Expand Down Expand Up @@ -264,7 +265,6 @@ func NewInstantStrategy(ae *env.AppEnv, options InstantStrategyOptions) *Instant
ae: ae,
routerConnectedQueue: make(chan *RouterSender, options.MaxQueuedRouterConnects),
receivedClientHelloQueue: make(chan *RouterSender, options.MaxQueuedClientHellos),
RouterDataModel: common.NewSenderRouterDataModel(10000, 10000),
stopNotify: make(chan struct{}),
changeSets: map[uint64]*edge_ctrl_pb.DataState_ChangeSet{},
}
Expand Down
36 changes: 35 additions & 1 deletion quickstart/test/ha-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ declare -a INSTANCE_NAMES=(inst001 inst002 inst003)
# initialize a map of name=pid
declare -A PIDS

# Record the exact "quickstart ha" command line for the first instance in
# /tmp/ha-test.cmds. The single '>' truncates the file, so each test run
# starts with a fresh command log.
echo "${BUILD_DIR}/ziti" edge quickstart ha \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
--home="${ziti_home}" \
--trust-domain="${trust_domain}" \
--instance-id="${INSTANCE_NAMES[0]}" \
--ctrl-port="${ctrl_ports[0]}" \
--router-port="${router_ports[0]}" \
> /tmp/ha-test.cmds

nohup "${BUILD_DIR}/ziti" edge quickstart ha \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
Expand All @@ -84,6 +94,17 @@ _wait_for_controller "${ctrl_ports[0]}"
sleep 5
echo "controller online"

# Append the "quickstart join" command line for the second instance to
# /tmp/ha-test.cmds ('>>' appends rather than truncating).
echo "${BUILD_DIR}/ziti" edge quickstart join \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
--home="${ziti_home}" \
--trust-domain="${trust_domain}" \
--ctrl-port="${ctrl_ports[1]}" \
--router-port="${router_ports[1]}" \
--instance-id="${INSTANCE_NAMES[1]}" \
--cluster-member="tls:127.0.0.1:${ctrl_ports[0]}" \
>> /tmp/ha-test.cmds

nohup "${BUILD_DIR}/ziti" edge quickstart join \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
Expand All @@ -96,6 +117,17 @@ nohup "${BUILD_DIR}/ziti" edge quickstart join \
&> "${ziti_home}/${INSTANCE_NAMES[1]}.log" &
PIDS["${INSTANCE_NAMES[1]}"]=$!

# Append the "quickstart join" command line for the third instance to
# /tmp/ha-test.cmds ('>>' appends rather than truncating).
echo "${BUILD_DIR}/ziti" edge quickstart join \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
--home="${ziti_home}" \
--trust-domain="${trust_domain}" \
--ctrl-port="${ctrl_ports[2]}" \
--router-port="${router_ports[2]}" \
--instance-id="${INSTANCE_NAMES[2]}" \
--cluster-member="tls:127.0.0.1:${ctrl_ports[0]}" \
>> /tmp/ha-test.cmds

nohup "${BUILD_DIR}/ziti" edge quickstart join \
--ctrl-address="127.0.0.1" \
--router-address="127.0.0.1" \
Expand Down Expand Up @@ -132,6 +164,8 @@ while [[ ${count} -lt 3 ]]; do
((elapsed+=6))

if [[ ${elapsed} -ge ${timeout} ]]; then
"${BUILD_DIR}/ziti" fabric list routers
"${BUILD_DIR}/ziti" fabric list links
echo "Timeout reached; not all connections are 'Connected'."
exit 1
fi
Expand Down Expand Up @@ -167,4 +201,4 @@ fi
echo "Test passed: One leader found and all instances are connected"
trap - EXIT
_term_background_pids
_term_background_pids
1 change: 1 addition & 0 deletions router/handler_ctrl/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error {
binding.AddTypedReceiveHandler(newSettingsHandler(self.ctrlAddressUpdater))
binding.AddTypedReceiveHandler(newFaultHandler(self.env.GetXlinkRegistry()))
binding.AddTypedReceiveHandler(newUpdateCtrlAddressesHandler(self.ctrlAddressUpdater))
binding.AddTypedReceiveHandler(newUpdateClusterLeaderHandler(self.ctrlAddressUpdater))

binding.AddPeekHandler(trace.NewChannelPeekHandler(self.env.GetRouterId().Token, binding.GetChannel(), self.forwarder.TraceController()))

Expand Down
51 changes: 51 additions & 0 deletions router/handler_ctrl/update_cluster_leader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package handler_ctrl

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v3"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

// updateClusterLeaderHandlerInstance is the package-level handler that
// newUpdateClusterLeaderHandler creates on its first call and returns on every
// call after that, so all callers share one handler and one version counter.
var updateClusterLeaderHandlerInstance *updateClusterLeaderHandler

// updateClusterLeaderHandler receives UpdateClusterLeader messages on a control
// channel and forwards the sender's channel id to the CtrlAddressUpdater as the
// new leader.
type updateClusterLeaderHandler struct {
// callback is notified via UpdateLeader when a leader update is accepted.
callback CtrlAddressUpdater
// currentVersion is compared against the Index of each incoming message;
// an update is accepted when this is zero or lower than the incoming Index.
currentVersion uint64
}

// ContentType reports the message content type this handler is registered for:
// ctrl_pb.ContentType_UpdateClusterLeaderRequestType.
func (*updateClusterLeaderHandler) ContentType() int32 {
	return int32(ctrl_pb.ContentType_UpdateClusterLeaderRequestType)
}

// HandleReceive processes an UpdateClusterLeader message received on ch.
//
// The message body is unmarshalled into a ctrl_pb.UpdateClusterLeader. If that
// fails, the error is logged and the message is dropped. Otherwise the update
// is applied when no version has been recorded yet (currentVersion == 0) or
// when the message's Index is strictly newer than the recorded version:
// the callback is told that the controller behind ch is the leader, and the
// Index is recorded so that later messages with an equal or older Index are
// ignored.
//
// NOTE(review): the handler is a shared package-level instance; if channels can
// deliver messages concurrently, currentVersion is accessed without
// synchronization. Confirm the channel dispatch model.
func (handler *updateClusterLeaderHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
	log := pfxlog.ContextLogger(ch.Label()).Entry
	upd := &ctrl_pb.UpdateClusterLeader{}
	if err := proto.Unmarshal(msg.Body, upd); err != nil {
		log.WithError(err).Error("error unmarshalling")
		return
	}

	log = log.WithFields(logrus.Fields{
		"localVersion":  handler.currentVersion,
		"remoteVersion": upd.Index,
		"ctrlId":        ch.Id(),
	})

	// Reject anything that is not strictly newer than what was already applied.
	if handler.currentVersion != 0 && handler.currentVersion >= upd.Index {
		log.Info("ignoring outdated update cluster leader message")
		return
	}

	log.Info("handling update of cluster leader")
	handler.callback.UpdateLeader(ch.Id())
	// Record the applied index. Without this the version stays at zero and the
	// staleness check above can never reject an out-of-date message.
	handler.currentVersion = upd.Index
}

// newUpdateClusterLeaderHandler returns the shared updateClusterLeaderHandler,
// creating it with the given callback on the first call. On later calls the
// existing instance is returned and the callback argument is not used.
func newUpdateClusterLeaderHandler(callback CtrlAddressUpdater) channel.TypedReceiveHandler {
	if updateClusterLeaderHandlerInstance != nil {
		return updateClusterLeaderHandlerInstance
	}

	created := &updateClusterLeaderHandler{callback: callback}
	updateClusterLeaderHandlerInstance = created
	return created
}
14 changes: 10 additions & 4 deletions router/handler_ctrl/update_ctrl_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ func (handler *updateCtrlAddressesHandler) HandleReceive(msg *channel.Message, c
"isLeader": upd.IsLeader,
})

if upd.IsLeader {
handler.callback.UpdateLeader(ch.Id())
} else if handler.currentVersion == 0 || handler.currentVersion < upd.Index {
log.Info("updating to controller endpoints to version")
log.Info("update ctrl endpoints message received")

if handler.currentVersion == 0 || handler.currentVersion < upd.Index {
log.Info("updating to newer controller endpoints")
handler.callback.UpdateCtrlEndpoints(upd.Addresses)
handler.currentVersion = upd.Index

if upd.IsLeader {
handler.callback.UpdateLeader(ch.Id())
}
} else {
log.Info("ignoring outdated controller endpoints")
}
}

Expand Down

0 comments on commit 5429ce5

Please sign in to comment.