From 32eddd61cae562aba777d74dd3f5535fda8c0b96 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Wed, 29 May 2024 09:04:56 -0400 Subject: [PATCH] HA SDK terminators test. Fixes #2217. Fixes #2533 --- common/pb/cmd_pb/cmd.pb.go | 16 +- common/pb/cmd_pb/cmd.proto | 1 + common/pb/ctrl_pb/impl.go | 31 +- common/router_data_model.go | 50 +++- controller/command/command.go | 5 + controller/config/config.go | 12 +- controller/config/config_raft.go | 6 +- controller/controller.go | 2 +- controller/db/controller_store.go | 16 +- controller/db/terminator_store.go | 11 +- controller/env/appenv.go | 4 + controller/env/broker.go | 54 +--- controller/env/sync.go | 4 +- controller/handler_ctrl/create_terminator.go | 1 + .../handler_edge_ctrl/create_terminator.go | 1 + .../handler_edge_ctrl/create_terminator_v2.go | 2 +- .../create_tunnel_terminator.go | 1 + .../create_tunnel_terminator_v2.go | 1 + controller/handler_peer_ctrl/command.go | 9 +- controller/handler_peer_ctrl/inspect.go | 40 +-- .../internal/routes/controller_api_model.go | 5 +- controller/model/base_manager.go | 9 + controller/model/controller_manager.go | 147 ++++----- controller/model/controller_model.go | 52 +++- controller/model/env.go | 2 + controller/model/terminator_manager.go | 2 + controller/model/terminator_model.go | 7 + controller/model/testing.go | 7 + controller/models/base_model.go | 2 + controller/models/errors.go | 2 +- controller/network/router_messaging.go | 213 ++++++++----- controller/raft/mesh/mesh.go | 280 +++++++++++++----- controller/raft/mesh/peerconn.go | 17 +- controller/raft/raft.go | 41 ++- controller/xt/xt.go | 1 + controller/xt_common/failure_test.go | 4 + router/env/ctrls.go | 29 +- router/handler_ctrl/settings.go | 2 +- ...lAddresses.go => update_ctrl_addresses.go} | 8 +- router/router.go | 6 +- router/router_test.go | 4 +- router/state/manager.go | 12 + router/xgress_edge/certchecker_test.go | 2 +- router/xgress_edge/hosted.go | 4 +- zitirest/clients.go | 2 +- .../sdk-hosting-test/configs/ctrl.yml.tmpl | 12 +- .../sdk-hosting-test/configs/router.yml.tmpl | 11 + zititest/models/sdk-hosting-test/main.go | 90 ++++-- .../models/sdk-hosting-test/validation.go | 47 +-- zititest/models/sdk-status-test/main.go | 3 +- zititest/models/smoke/configs/router.yml.tmpl | 6 + zititest/zitilab/actions/edge/raft_join.go | 26 +- 52 files changed, 847 insertions(+), 475 deletions(-) rename router/handler_ctrl/{updateCtrlAddresses.go => update_ctrl_addresses.go} (88%) diff --git a/common/pb/cmd_pb/cmd.pb.go b/common/pb/cmd_pb/cmd.pb.go index 8f17c8487..a58259c31 100644 --- a/common/pb/cmd_pb/cmd.pb.go +++ b/common/pb/cmd_pb/cmd.pb.go @@ -1001,6 +1001,7 @@ type Terminator struct { HostId string `protobuf:"bytes,12,opt,name=hostId,proto3" json:"hostId,omitempty"` IsSystem bool `protobuf:"varint,13,opt,name=isSystem,proto3" json:"isSystem,omitempty"` SavedPrecedence uint32 `protobuf:"varint,14,opt,name=savedPrecedence,proto3" json:"savedPrecedence,omitempty"` + SourceCtrl string `protobuf:"bytes,15,opt,name=sourceCtrl,proto3" json:"sourceCtrl,omitempty"` } func (x *Terminator) Reset() { @@ -1133,6 +1134,13 @@ func (x *Terminator) GetSavedPrecedence() uint32 { return 0 } +func (x *Terminator) GetSourceCtrl() string { + if x != nil { + return x.SourceCtrl + } + return "" +} + var File_cmd_proto protoreflect.FileDescriptor var file_cmd_proto_rawDesc = []byte{ @@ -1253,8 +1261,8 @@ var file_cmd_proto_rawDesc = []byte{ 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2b, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x7a, 0x69, 0x74, 0x69, 0x2e, 0x63, 0x6d, 0x64, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x67, 0x56, 0x61, - 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xeb, - 0x04, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x0e, 0x0a, + 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8b, + 0x05, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x72, @@ -1283,7 +1291,9 @@ var file_cmd_proto_rawDesc = []byte{ 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x53, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x28, 0x0a, 0x0f, 0x73, 0x61, 0x76, 0x65, 0x64, 0x50, 0x72, 0x65, 0x63, 0x65, 0x64, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, - 0x73, 0x61, 0x76, 0x65, 0x64, 0x50, 0x72, 0x65, 0x63, 0x65, 0x64, 0x65, 0x6e, 0x63, 0x65, 0x1a, + 0x73, 0x61, 0x76, 0x65, 0x64, 0x50, 0x72, 0x65, 0x63, 0x65, 0x64, 0x65, 0x6e, 0x63, 0x65, 0x12, + 0x1e, 0x0a, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x74, 0x72, 0x6c, 0x18, 0x0f, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x74, 0x72, 0x6c, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x65, 0x65, 0x72, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, diff --git a/common/pb/cmd_pb/cmd.proto b/common/pb/cmd_pb/cmd.proto index b86323cbd..6a2f4f50e 100644 --- a/common/pb/cmd_pb/cmd.proto +++ b/common/pb/cmd_pb/cmd.proto @@ -123,4 +123,5 @@ message Terminator { string hostId = 12; bool isSystem = 13; uint32 savedPrecedence = 14; + string sourceCtrl = 15; } diff --git a/common/pb/ctrl_pb/impl.go b/common/pb/ctrl_pb/impl.go index 0b08aee27..33505afe7 100644 --- a/common/pb/ctrl_pb/impl.go +++ b/common/pb/ctrl_pb/impl.go @@ -16,7 +16,10 @@ package ctrl_pb -import "github.com/openziti/ziti/controller/xt" +import ( + "github.com/openziti/channel/v3/protobufs" + "github.com/openziti/ziti/controller/xt" +) func (request *CircuitConfirmation) GetContentType() int32 { return int32(ContentType_CircuitConfirmationType) @@ -107,3 +110,29 @@ func (request *UpdateCtrlAddresses) GetContentType() int32 { func (request *PeerStateChanges) GetContentType() int32 { return int32(ContentType_PeerStateChangeRequestType) } + +type FilterableValidateTerminatorsRequest interface { + protobufs.TypedMessage + FilterTerminators(f func(terminator *Terminator) bool) + GetTerminators() []*Terminator +} + +func (request *ValidateTerminatorsRequest) FilterTerminators(f func(terminator *Terminator) bool) { + var terminators []*Terminator + for _, terminator := range request.Terminators { + if f(terminator) { + terminators = append(terminators, terminator) + } + } + request.Terminators = terminators +} + +func (request *ValidateTerminatorsV2Request) FilterTerminators(f func(terminator *Terminator) bool) { + var terminators []*Terminator + for _, terminator := range request.Terminators { + if f(terminator) { + terminators = append(terminators, terminator) + } + } + request.Terminators = terminators +} diff --git a/common/router_data_model.go b/common/router_data_model.go index f9446e345..280e3f55c 100644 --- a/common/router_data_model.go +++ b/common/router_data_model.go @@ -18,9 +18,12 @@ package common import ( "compress/gzip" + "crypto" + "crypto/x509" "encoding/json" "fmt" "github.com/michaelquigley/pfxlog" + "github.com/openziti/foundation/v2/concurrenz" "github.com/openziti/ziti/common/pb/edge_ctrl_pb" cmap "github.com/orcaman/concurrent-map/v2" "github.com/sirupsen/logrus" @@ -93,14 +96,15 @@ type RouterDataModel struct { EventCache listeners map[chan *edge_ctrl_pb.DataState_ChangeSet]struct{} - ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"` - Configs cmap.ConcurrentMap[string, *Config] `json:"configs"` - Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"` - Services cmap.ConcurrentMap[string, *Service] `json:"services"` - ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"` - PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"` - PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"` - Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"` + ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"` + Configs cmap.ConcurrentMap[string, *Config] `json:"configs"` + Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"` + Services cmap.ConcurrentMap[string, *Service] `json:"services"` + ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"` + PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"` + PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"` + Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"` + CachedPublicKeys concurrenz.AtomicValue[map[string]crypto.PublicKey] listenerBufferSize uint lastSaveIndex *uint64 @@ -433,6 +437,7 @@ func (rdm *RouterDataModel) HandlePublicKeyEvent(event *edge_ctrl_pb.DataState_E } else { rdm.PublicKeys.Set(model.PublicKey.Kid, model.PublicKey) } + rdm.recalculateCachedPublicKeys() } // HandleRevocationEvent will apply the delta event to the router data model. It is not restricted by index calculations. @@ -507,8 +512,33 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_ }) } -func (rdm *RouterDataModel) GetPublicKeys() map[string]*edge_ctrl_pb.DataState_PublicKey { - return rdm.PublicKeys.Items() +func (rdm *RouterDataModel) GetPublicKeys() map[string]crypto.PublicKey { + return rdm.CachedPublicKeys.Load() +} + +func (rdm *RouterDataModel) recalculateCachedPublicKeys() { + publicKeys := map[string]crypto.PublicKey{} + rdm.PublicKeys.IterCb(func(kid string, pubKey *edge_ctrl_pb.DataState_PublicKey) { + log := pfxlog.Logger().WithField("format", pubKey.Format).WithField("kid", kid) + + switch pubKey.Format { + case edge_ctrl_pb.DataState_PublicKey_X509CertDer: + if cert, err := x509.ParseCertificate(pubKey.GetData()); err != nil { + log.WithError(err).Error("error parsing x509 certificate DER") + } else { + publicKeys[kid] = cert.PublicKey + } + case edge_ctrl_pb.DataState_PublicKey_PKIXPublicKey: + if pub, err := x509.ParsePKIXPublicKey(pubKey.GetData()); err != nil { + log.WithError(err).Error("error parsing PKIX public key DER") + } else { + publicKeys[kid] = pub + } + default: + log.Error("unknown public key format") + } + }) + rdm.CachedPublicKeys.Store(publicKeys) } func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState { diff --git a/controller/command/command.go b/controller/command/command.go index 0004b08fd..9c7b16b33 100644 --- a/controller/command/command.go +++ b/controller/command/command.go @@ -52,6 +52,7 @@ type Dispatcher interface { Dispatch(command Command) error IsLeaderOrLeaderless() bool IsLeaderless() bool + IsLeader() bool GetPeers() map[string]channel.Channel GetRateLimiter() rate.RateLimiter Bootstrap() error @@ -67,6 +68,10 @@ func (self *LocalDispatcher) Bootstrap() error { return nil } +func (self *LocalDispatcher) IsLeader() bool { + return true +} + func (self *LocalDispatcher) IsLeaderOrLeaderless() bool { return true } diff --git a/controller/config/config.go b/controller/config/config.go index bb36b8869..02e1a2b56 100644 --- a/controller/config/config.go +++ b/controller/config/config.go @@ -51,7 +51,7 @@ const ( DefaultHealthChecksBoltCheckTimeout = 20 * time.Second DefaultHealthChecksBoltCheckInitialDelay = 30 * time.Second - DefaultRaftCommandHandlerMaxQueueSize = 1000 + DefaultRaftCommandHandlerMaxQueueSize = 250 // DefaultTlsHandshakeRateLimiterEnabled is whether the tls handshake rate limiter is enabled by default DefaultTlsHandshakeRateLimiterEnabled = false @@ -204,6 +204,10 @@ func LoadConfig(path string) (*Config, error) { if value, found := cfgmap["raft"]; found { if submap, ok := value.(map[interface{}]interface{}); ok { controllerConfig.Raft = &RaftConfig{} + + controllerConfig.Raft.ElectionTimeout = 5 * time.Second + controllerConfig.Raft.HeartbeatTimeout = 3 * time.Second + controllerConfig.Raft.LeaderLeaseTimeout = 3 * time.Second controllerConfig.Raft.CommandHandlerOptions.MaxQueueSize = DefaultRaftCommandHandlerMaxQueueSize if value, found := submap["dataDir"]; found { @@ -243,7 +247,7 @@ func LoadConfig(path string) (*Config, error) { if value, found := submap["electionTimeout"]; found { if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil { - controllerConfig.Raft.ElectionTimeout = &val + controllerConfig.Raft.ElectionTimeout = val } else { return nil, errors.Wrapf(err, "failed to parse raft.electionTimeout value '%v", value) } @@ -251,7 +255,7 @@ func LoadConfig(path string) (*Config, error) { if value, found := submap["heartbeatTimeout"]; found { if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil { - controllerConfig.Raft.HeartbeatTimeout = &val + controllerConfig.Raft.HeartbeatTimeout = val } else { return nil, errors.Wrapf(err, "failed to parse raft.heartbeatTimeout value '%v", value) } @@ -259,7 +263,7 @@ func LoadConfig(path string) (*Config, error) { if value, found := submap["leaderLeaseTimeout"]; found { if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil { - controllerConfig.Raft.LeaderLeaseTimeout = &val + controllerConfig.Raft.LeaderLeaseTimeout = val } else { return nil, errors.Wrapf(err, "failed to parse raft.leaderLeaseTimeout value '%v", value) } diff --git a/controller/config/config_raft.go b/controller/config/config_raft.go index ed1304f64..ad9278e05 100644 --- a/controller/config/config_raft.go +++ b/controller/config/config_raft.go @@ -20,10 +20,10 @@ type RaftConfig struct { TrailingLogs *uint32 MaxAppendEntries *uint32 - ElectionTimeout *time.Duration + ElectionTimeout time.Duration CommitTimeout *time.Duration - HeartbeatTimeout *time.Duration - LeaderLeaseTimeout *time.Duration + HeartbeatTimeout time.Duration + LeaderLeaseTimeout time.Duration LogLevel *string Logger hclog.Logger diff --git a/controller/controller.go b/controller/controller.go index c0c835659..1f9a3a455 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -540,7 +540,7 @@ func (c *Controller) GetEventDispatcher() event.Dispatcher { } func (c *Controller) routerDispatchCallback(evt *event.ClusterEvent) { - if evt.EventType == event.ClusterMembersChanged { + if evt.EventType == event.ClusterMembersChanged || evt.EventType == event.ClusterLeadershipGained { var endpoints []string for _, peer := range evt.Peers { endpoints = append(endpoints, peer.Addr) diff --git a/controller/db/controller_store.go b/controller/db/controller_store.go index 8f7b9b15e..f6250c941 100644 --- a/controller/db/controller_store.go +++ b/controller/db/controller_store.go @@ -36,12 +36,12 @@ const ( type Controller struct { boltz.BaseExtEntity - Name string `json:"name"` - CtrlAddress string `json:"address"` - CertPem string `json:"certPem"` - Fingerprint string `json:"fingerprint"` - IsOnline bool `json:"isOnline"` - LastJoinedAt *time.Time `json:"lastJoinedAt"` + Name string `json:"name"` + CtrlAddress string `json:"address"` + CertPem string `json:"certPem"` + Fingerprint string `json:"fingerprint"` + IsOnline bool `json:"isOnline"` + LastJoinedAt time.Time `json:"lastJoinedAt"` ApiAddresses map[string][]ApiAddress } @@ -105,7 +105,7 @@ func (store *controllerStoreImpl) FillEntity(entity *Controller, bucket *boltz.T entity.CertPem = bucket.GetStringOrError(FieldControllerCertPem) entity.Fingerprint = bucket.GetStringOrError(FieldControllerFingerprint) entity.IsOnline = bucket.GetBoolWithDefault(FieldControllerIsOnline, false) - entity.LastJoinedAt = bucket.GetTime(FieldControllerLastJoinedAt) + entity.LastJoinedAt = bucket.GetTimeOrError(FieldControllerLastJoinedAt) entity.ApiAddresses = map[string][]ApiAddress{} apiListBucket := bucket.GetBucket(FieldControllerApiAddresses) @@ -142,7 +142,7 @@ func (store *controllerStoreImpl) PersistEntity(entity *Controller, ctx *boltz.P ctx.SetString(FieldControllerCertPem, entity.CertPem) ctx.SetString(FieldControllerFingerprint, entity.Fingerprint) ctx.SetBool(FieldControllerIsOnline, entity.IsOnline) - ctx.SetTimeP(FieldControllerLastJoinedAt, entity.LastJoinedAt) + ctx.SetTimeP(FieldControllerLastJoinedAt, &entity.LastJoinedAt) apiListBucket := ctx.Bucket.GetOrCreateBucket(FieldControllerApiAddresses) diff --git a/controller/db/terminator_store.go b/controller/db/terminator_store.go index f4fef3b4f..2ce7d2596 100644 --- a/controller/db/terminator_store.go +++ b/controller/db/terminator_store.go @@ -19,10 +19,10 @@ package db import ( "encoding/binary" "github.com/michaelquigley/pfxlog" - "github.com/openziti/ziti/controller/xt" "github.com/openziti/foundation/v2/sequence" "github.com/openziti/storage/ast" "github.com/openziti/storage/boltz" + "github.com/openziti/ziti/controller/xt" "go.etcd.io/bbolt" ) @@ -39,6 +39,7 @@ const ( FieldServerPeerData = "peerData" FieldTerminatorHostId = "hostId" FieldTerminatorSavedPrecedence = "savedPrecedence" + FieldTerminatorsSourceCtrl = "sourceCtrl" ) type Terminator struct { @@ -54,6 +55,7 @@ type Terminator struct { PeerData xt.PeerData `json:"peerData"` HostId string `json:"hostId"` SavedPrecedence *string `json:"savedPrecedence"` + SourceCtrl string `json:"sourceCtrl"` } func (entity *Terminator) GetCost() uint16 { @@ -100,6 +102,10 @@ func (entity *Terminator) GetEntityType() string { return EntityTypeTerminators } +func (entity *Terminator) GetSourceCtrl() string { + return entity.SourceCtrl +} + type TerminatorStore interface { boltz.EntityStore[*Terminator] GetTerminatorsInIdentityGroup(tx *bbolt.Tx, terminatorId string) ([]*Terminator, error) @@ -133,6 +139,7 @@ func (store *terminatorStoreImpl) initializeLocal() { store.AddSymbol(FieldTerminatorAddress, ast.NodeTypeString) store.AddSymbol(FieldTerminatorInstanceId, ast.NodeTypeString) store.AddSymbol(FieldTerminatorHostId, ast.NodeTypeString) + store.AddSymbol(FieldTerminatorsSourceCtrl, ast.NodeTypeString) store.serviceSymbol = store.AddFkSymbol(FieldTerminatorService, store.stores.service) store.routerSymbol = store.AddFkSymbol(FieldTerminatorRouter, store.stores.router) @@ -164,6 +171,7 @@ func (store *terminatorStoreImpl) FillEntity(entity *Terminator, bucket *boltz.T entity.Precedence = bucket.GetStringWithDefault(FieldTerminatorPrecedence, xt.Precedences.Default.String()) entity.HostId = bucket.GetStringWithDefault(FieldTerminatorHostId, "") entity.SavedPrecedence = bucket.GetString(FieldTerminatorSavedPrecedence) + entity.SourceCtrl = bucket.GetStringWithDefault(FieldTerminatorsSourceCtrl, "") data := bucket.GetBucket(FieldServerPeerData) if data != nil { @@ -201,6 +209,7 @@ func (store *terminatorStoreImpl) PersistEntity(entity *Terminator, ctx *boltz.P ctx.SetRequiredString(FieldTerminatorPrecedence, entity.Precedence) ctx.SetString(FieldTerminatorHostId, entity.HostId) ctx.SetStringP(FieldTerminatorSavedPrecedence, entity.SavedPrecedence) + ctx.SetString(FieldTerminatorsSourceCtrl, entity.SourceCtrl) if ctx.ProceedWithSet(FieldServerPeerData) { _ = ctx.Bucket.DeleteBucket([]byte(FieldServerPeerData)) diff --git a/controller/env/appenv.go b/controller/env/appenv.go index 7a5d04c7b..7903d3895 100644 --- a/controller/env/appenv.go +++ b/controller/env/appenv.go @@ -413,6 +413,10 @@ func (ae *AppEnv) AddRouterPresenceHandler(h model.RouterPresenceHandler) { ae.HostController.GetNetwork().AddRouterPresenceHandler(h) } +func (ae *AppEnv) GetId() string { + return ae.HostController.GetNetwork().GetAppId() +} + type HostController interface { GetConfig() *config.Config GetEnv() *AppEnv diff --git a/controller/env/broker.go b/controller/env/broker.go index bff4a0c9e..6673ebe5e 100644 --- a/controller/env/broker.go +++ b/controller/env/broker.go @@ -18,7 +18,6 @@ package env import ( "crypto" - "crypto/x509" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v3" "github.com/openziti/storage/boltz" @@ -27,7 +26,6 @@ import ( "github.com/openziti/ziti/controller/event" "github.com/openziti/ziti/controller/model" "go.etcd.io/bbolt" - "sync" ) const ( @@ -60,9 +58,6 @@ type Broker struct { apiSessionChunkSize int routerMsgBufferSize int routerSyncStrategy RouterSyncStrategy - - publicKeyLock sync.Mutex - publicKeys map[string]crypto.PublicKey } func NewBroker(ae *AppEnv, synchronizer RouterSyncStrategy) *Broker { @@ -93,9 +88,13 @@ func (broker *Broker) ValidateRouterDataModel() []error { } func (broker *Broker) AcceptClusterEvent(clusterEvent *event.ClusterEvent) { + if clusterEvent.EventType == event.ClusterLeadershipGained { + broker.ae.Managers.Controller.PeersConnected(clusterEvent.Peers, false) + } + if broker.ae.HostController.IsRaftLeader() { if clusterEvent.EventType == event.ClusterPeerConnected { - broker.ae.Managers.Controller.PeersConnected(clusterEvent.Peers) + broker.ae.Managers.Controller.PeersConnected(clusterEvent.Peers, true) } if clusterEvent.EventType == event.ClusterPeerDisconnected { @@ -196,46 +195,5 @@ func (broker *Broker) Stop() { } func (broker *Broker) GetPublicKeys() map[string]crypto.PublicKey { - broker.publicKeyLock.Lock() - defer broker.publicKeyLock.Unlock() - - if broker.publicKeys == nil { - broker.publicKeys = map[string]crypto.PublicKey{} - } - - for kid, pubKey := range broker.routerSyncStrategy.GetPublicKeys() { - //don't reprocess the same kid - if _, exists := broker.publicKeys[kid]; exists { - continue - } - log := pfxlog.Logger().WithField("format", pubKey.Format).WithField("kid", kid) - - switch pubKey.Format { - case edge_ctrl_pb.DataState_PublicKey_X509CertDer: - cert, err := x509.ParseCertificate(pubKey.GetData()) - if err != nil { - log.WithError(err).Error("error parsing x509 certificate DER") - continue - } - - broker.publicKeys[kid] = cert.PublicKey - case edge_ctrl_pb.DataState_PublicKey_PKIXPublicKey: - pub, err := x509.ParsePKIXPublicKey(pubKey.GetData()) - if err != nil { - log.WithError(err).Error("error parsing PKIX public key DER") - continue - } - broker.publicKeys[kid] = pub - default: - log.Error("unknown public key format") - } - } - - result := map[string]crypto.PublicKey{} - - for k, v := range broker.publicKeys { - result[k] = v - } - - return result + return broker.routerSyncStrategy.GetPublicKeys() } diff --git a/controller/env/sync.go b/controller/env/sync.go index 16afa9624..d40707860 100644 --- a/controller/env/sync.go +++ b/controller/env/sync.go @@ -17,9 +17,9 @@ package env import ( + "crypto" "github.com/openziti/channel/v3" "github.com/openziti/foundation/v2/versions" - "github.com/openziti/ziti/common/pb/edge_ctrl_pb" "github.com/openziti/ziti/controller/db" "github.com/openziti/ziti/controller/model" "sync" @@ -58,7 +58,7 @@ type RouterSyncStrategy interface { Type() RouterSyncStrategyType GetEdgeRouterState(id string) RouterStateValues Stop() - GetPublicKeys() map[string]*edge_ctrl_pb.DataState_PublicKey + GetPublicKeys() map[string]crypto.PublicKey RouterConnectionHandler RouterSynchronizerEventHandler Validate() []error diff --git a/controller/handler_ctrl/create_terminator.go b/controller/handler_ctrl/create_terminator.go index 148f99439..e2bd632b1 100644 --- a/controller/handler_ctrl/create_terminator.go +++ b/controller/handler_ctrl/create_terminator.go @@ -71,6 +71,7 @@ func (self *createTerminatorHandler) handleCreateTerminator(msg *channel.Message PeerData: request.PeerData, Precedence: request.GetXtPrecedence(), Cost: uint16(request.Cost), + SourceCtrl: self.network.GetAppId(), } if err := self.network.Terminator.Create(terminator, self.newChangeContext(ch, "fabric.create.terminator")); err == nil { diff --git a/controller/handler_edge_ctrl/create_terminator.go b/controller/handler_edge_ctrl/create_terminator.go index 7e73ae78e..662f3c0c0 100644 --- a/controller/handler_edge_ctrl/create_terminator.go +++ b/controller/handler_edge_ctrl/create_terminator.go @@ -108,6 +108,7 @@ func (self *createTerminatorHandler) CreateTerminator(ctx *CreateTerminatorReque Precedence: ctx.req.GetXtPrecedence(), Cost: uint16(ctx.req.Cost), HostId: ctx.session.IdentityId, + SourceCtrl: self.appEnv.GetId(), } cmd := &model.CreateEdgeTerminatorCmd{ diff --git a/controller/handler_edge_ctrl/create_terminator_v2.go b/controller/handler_edge_ctrl/create_terminator_v2.go index cff6f12f6..f992af3ff 100644 --- a/controller/handler_edge_ctrl/create_terminator_v2.go +++ b/controller/handler_edge_ctrl/create_terminator_v2.go @@ -76,7 +76,6 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV start := time.Now() logger := pfxlog.ContextLogger(self.ch.Label()). WithField("routerId", self.ch.Id()). - WithField("token", ctx.req.SessionToken). WithField("terminatorId", ctx.req.Address) if !ctx.loadRouter() { @@ -143,6 +142,7 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV Precedence: ctx.req.GetXtPrecedence(), Cost: uint16(ctx.req.Cost), HostId: ctx.session.IdentityId, + SourceCtrl: self.appEnv.GetId(), } cmd := &model.CreateEdgeTerminatorCmd{ diff --git a/controller/handler_edge_ctrl/create_tunnel_terminator.go b/controller/handler_edge_ctrl/create_tunnel_terminator.go index e130ed50f..34fb90433 100644 --- a/controller/handler_edge_ctrl/create_tunnel_terminator.go +++ b/controller/handler_edge_ctrl/create_tunnel_terminator.go @@ -125,6 +125,7 @@ func (self *createTunnelTerminatorHandler) CreateTerminator(ctx *CreateTunnelTer Precedence: ctx.req.GetXtPrecedence(), Cost: uint16(ctx.req.Cost), HostId: ctx.session.IdentityId, + SourceCtrl: self.appEnv.GetId(), } if err := self.appEnv.Managers.Terminator.Create(terminator, ctx.newTunnelChangeContext()); err != nil { diff --git a/controller/handler_edge_ctrl/create_tunnel_terminator_v2.go b/controller/handler_edge_ctrl/create_tunnel_terminator_v2.go index 1f9fa199a..d9dfaf82b 100644 --- a/controller/handler_edge_ctrl/create_tunnel_terminator_v2.go +++ b/controller/handler_edge_ctrl/create_tunnel_terminator_v2.go @@ -116,6 +116,7 @@ func (self *createTunnelTerminatorV2Handler) CreateTerminator(ctx *createTunnelT Precedence: ctx.req.GetXtPrecedence(), Cost: uint16(ctx.req.Cost), HostId: ctx.identity.Id, + SourceCtrl: self.appEnv.GetId(), } if err := self.appEnv.Managers.Terminator.Create(terminator, ctx.newTunnelChangeContext()); err != nil { diff --git a/controller/handler_peer_ctrl/command.go b/controller/handler_peer_ctrl/command.go index a7cd53bc7..c551e4224 100644 --- a/controller/handler_peer_ctrl/command.go +++ b/controller/handler_peer_ctrl/command.go @@ -22,7 +22,9 @@ import ( "github.com/openziti/foundation/v2/goroutines" "github.com/openziti/ziti/common/metrics" "github.com/openziti/ziti/common/pb/cmd_pb" + "github.com/openziti/ziti/controller/apierror" "github.com/openziti/ziti/controller/raft" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "time" ) @@ -61,7 +63,7 @@ func (self *commandHandler) ContentType() int32 { func (self *commandHandler) HandleReceive(m *channel.Message, ch channel.Channel) { log := pfxlog.ContextLogger(ch.Label()) - err := self.pool.Queue(func() { + err := self.pool.QueueOrError(func() { if idx, err := self.controller.ApplyEncodedCommand(m.Body); err != nil { sendErrorResponseCalculateType(m, ch, err) return @@ -70,7 +72,12 @@ func (self *commandHandler) HandleReceive(m *channel.Message, ch channel.Channel } }) + if errors.Is(err, goroutines.QueueFullError) { + err = apierror.NewTooManyUpdatesError() + } + if err != nil { log.WithError(err).Error("unable to queue command for processing") + go sendErrorResponseCalculateType(m, ch, err) } } diff --git a/controller/handler_peer_ctrl/inspect.go b/controller/handler_peer_ctrl/inspect.go index 0faefa361..9e1a3b525 100644 --- a/controller/handler_peer_ctrl/inspect.go +++ b/controller/handler_peer_ctrl/inspect.go @@ -39,26 +39,28 @@ func (*inspectHandler) ContentType() int32 { } func (handler *inspectHandler) HandleReceive(msg *channel.Message, ch channel.Channel) { - context := &inspectRequestContext{ - handler: handler, - msg: msg, - ch: ch, - request: &ctrl_pb.InspectRequest{}, - response: &ctrl_pb.InspectResponse{Success: true}, - } - - var err error - if err = proto.Unmarshal(msg.Body, context.request); err != nil { - context.appendError(err.Error()) - } - - if !context.response.Success { + go func() { + context := &inspectRequestContext{ + handler: handler, + msg: msg, + ch: ch, + request: &ctrl_pb.InspectRequest{}, + response: &ctrl_pb.InspectResponse{Success: true}, + } + + var err error + if err = proto.Unmarshal(msg.Body, context.request); err != nil { + context.appendError(err.Error()) + } + + if !context.response.Success { + context.sendResponse() + return + } + + context.processLocal() context.sendResponse() - return - } - - context.processLocal() - context.sendResponse() + }() } type inspectRequestContext struct { diff --git a/controller/internal/routes/controller_api_model.go b/controller/internal/routes/controller_api_model.go index 6c3e3f510..65785cfe2 100644 --- a/controller/internal/routes/controller_api_model.go +++ b/controller/internal/routes/controller_api_model.go @@ -45,10 +45,7 @@ func MapControllerToManagementRestModel(controller *model.Controller) (*rest_mod CertPem: &controller.CertPem, Fingerprint: &controller.Fingerprint, IsOnline: &controller.IsOnline, - } - - if controller.LastJoinedAt != nil { - ret.LastJoinedAt = toStrFmtDateTimeP(*controller.LastJoinedAt) + LastJoinedAt: toStrFmtDateTimeP(controller.LastJoinedAt), } for apiKey, instances := range controller.ApiAddresses { diff --git a/controller/model/base_manager.go b/controller/model/base_manager.go index d23af8b25..d519953ed 100644 --- a/controller/model/base_manager.go +++ b/controller/model/base_manager.go @@ -244,6 +244,15 @@ func (self *baseEntityManager[ME, PE]) readInTx(tx *bbolt.Tx, id string) (ME, er return modelEntity, nil } +func (self *baseEntityManager[ME, PE]) IsEntityPresent(id string) (bool, error) { + result := false + err := self.GetDb().View(func(tx *bbolt.Tx) error { + result = self.Store.IsEntityPresent(tx, id) + return nil + }) + return result, err +} + func (self *baseEntityManager[ME, PE]) readEntity(id string, modelEntity ME) error { return self.GetDb().View(func(tx *bbolt.Tx) error { return self.readEntityInTx(tx, id, modelEntity) diff --git a/controller/model/controller_manager.go b/controller/model/controller_manager.go index 919b44c2e..78aa94b1d 100644 --- a/controller/model/controller_manager.go +++ b/controller/model/controller_manager.go @@ -20,7 +20,6 @@ import ( "crypto/x509" "github.com/michaelquigley/pfxlog" nfpem "github.com/openziti/foundation/v2/pem" - "github.com/openziti/foundation/v2/stringz" "github.com/openziti/storage/boltz" "github.com/openziti/ziti/common/pb/edge_cmd_pb" "github.com/openziti/ziti/controller/change" @@ -103,7 +102,7 @@ func (self *ControllerManager) Marshall(entity *Controller) ([]byte, error) { CertPem: entity.CertPem, Fingerprint: entity.Fingerprint, IsOnline: entity.IsOnline, - LastJoinedAt: timePtrToPb(entity.LastJoinedAt), + LastJoinedAt: timePtrToPb(&entity.LastJoinedAt), ApiAddresses: map[string]*edge_cmd_pb.ApiAddressList{}, } @@ -137,7 +136,7 @@ func (self *ControllerManager) Unmarshall(bytes []byte) (*Controller, error) { CertPem: msg.CertPem, Fingerprint: msg.Fingerprint, IsOnline: msg.IsOnline, - LastJoinedAt: pbTimeToTimePtr(msg.LastJoinedAt), + LastJoinedAt: *pbTimeToTimePtr(msg.LastJoinedAt), ApiAddresses: map[string][]ApiAddress{}, } @@ -162,9 +161,7 @@ func (self *ControllerManager) getCurrentAsClusterPeer() *event.ClusterPeer { var leaderCerts []*x509.Certificate for _, certBytes := range tlsConfig.Certificate { - cert, err := x509.ParseCertificate(certBytes) - - if err == nil { + if cert, err := x509.ParseCertificate(certBytes); err == nil { leaderCerts = append(leaderCerts, cert) } } @@ -180,35 +177,28 @@ func (self *ControllerManager) getCurrentAsClusterPeer() *event.ClusterPeer { } } -func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer) { - var controllerIds []string - err := self.ListWithHandler("", func(tx *bbolt.Tx, ids []string, qmd *models.QueryMetaData) error { - controllerIds = ids - return nil - }) - - changeCtx := change.New() - changeCtx.SetSourceType("raft.peers.connected"). - SetChangeAuthorType(change.AuthorTypeController) +func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer, peerConnectedEvent bool) { + controllers := map[string]*Controller{} + result, err := self.BaseList("true limit none") if err != nil { - pfxlog.Logger().WithError(err).Error("could not list controllers to handle new peer(s) connection") + pfxlog.Logger().WithError(err).Error("failed to list controllers") return + } else { + for _, ctrl := range result.Entities { + controllers[ctrl.Id] = ctrl + } } - connectFields := fields.UpdatedFieldsMap{ - db.FieldControllerLastJoinedAt: struct{}{}, - db.FieldControllerCertPem: struct{}{}, - db.FieldControllerFingerprint: struct{}{}, - db.FieldControllerIsOnline: struct{}{}, - db.FieldControllerCtrlAddress: struct{}{}, - db.FieldControllerApiAddresses: struct{}{}, - db.FieldControllerApiAddressUrl: struct{}{}, - db.FieldControllerApiAddressVersion: struct{}{}, + changeCtx := change.New() + if peerConnectedEvent { + changeCtx.SetSourceType("raft.peers.connected"). + SetChangeAuthorType(change.AuthorTypeController) + } else { + changeCtx.SetSourceType("raft.leadership.gained"). + SetChangeAuthorType(change.AuthorTypeController) } - now := time.Now() - selfAsPeer := self.getCurrentAsClusterPeer() peerFingerprints := "" for _, peer := range peers { @@ -225,75 +215,51 @@ func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer) { pfxlog.Logger().Infof("acting as leader, updating controllers with peers, self: %s, peers: %s", nfpem.FingerprintFromCertificate(selfAsPeer.ServerCert[0]), peerFingerprints) - // always as this controller as a "peer" to add or update on the controller list - peers = append(peers, selfAsPeer) + if !peerConnectedEvent { + // add this controller as a "peer" when leadership is gained + peers = append(peers, selfAsPeer) + } for _, peer := range peers { - if stringz.Contains(controllerIds, peer.Id) { - existing, err := self.Read(peer.Id) - if err != nil { - pfxlog.Logger().WithError(err).Error("could not handle new peer(s) connection, existing controller could not be read") - continue - } - - existing.CtrlAddress = peer.Addr - existing.IsOnline = true - existing.LastJoinedAt = &now - existing.ApiAddresses = map[string][]ApiAddress{} - - for apiKey, instances := range peer.ApiAddresses { - existing.ApiAddresses[apiKey] = nil - - for _, instance := range instances { - existing.ApiAddresses[apiKey] = append(existing.ApiAddresses[apiKey], ApiAddress{ - Url: instance.Url, - Version: instance.Version, - }) - } - } + if len(peer.ServerCert) < 1 { + pfxlog.Logger().Errorf("peer %s has no certificate", peer.Id) + continue + } - if len(peer.ServerCert) > 0 { - existing.CertPem = nfpem.EncodeToString(peer.ServerCert[0]) - existing.Fingerprint = nfpem.FingerprintFromCertificate(peer.ServerCert[0]) - } + newController := &Controller{ + BaseEntity: models.BaseEntity{ + Id: peer.Id, + }, + Name: peer.ServerCert[0].Subject.CommonName, + CertPem: nfpem.EncodeToString(peer.ServerCert[0]), + Fingerprint: nfpem.FingerprintFromCertificate(peer.ServerCert[0]), + CtrlAddress: peer.Addr, + IsOnline: true, + LastJoinedAt: time.Now(), + ApiAddresses: map[string][]ApiAddress{}, + } - if err := self.Update(existing, connectFields, changeCtx); err != nil { - pfxlog.Logger().WithError(err).Error("could not update controller during peer(s) connection") - } - } else { - if len(peer.ServerCert) == 0 { - pfxlog.Logger().Error("could not create controller during peer(s) connection, no server certificate provided") - continue - } + for apiKey, instances := range peer.ApiAddresses { + newController.ApiAddresses[apiKey] = nil - newController := &Controller{ - BaseEntity: models.BaseEntity{ - Id: peer.Id, - }, - CtrlAddress: peer.Addr, - IsOnline: true, - LastJoinedAt: &now, - ApiAddresses: map[string][]ApiAddress{}, + for _, instance := range instances { + newController.ApiAddresses[apiKey] = append(newController.ApiAddresses[apiKey], ApiAddress{ + Url: instance.Url, + Version: instance.Version, + }) } + } - for apiKey, instances := range peer.ApiAddresses { - newController.ApiAddresses[apiKey] = nil - - for _, instance := range instances { - newController.ApiAddresses[apiKey] = append(newController.ApiAddresses[apiKey], ApiAddress{ - Url: instance.Url, - Version: instance.Version, - }) - } + existing := controllers[peer.Id] + if existing == nil { + if err = self.Create(newController, changeCtx); err != nil { + pfxlog.Logger().WithError(err).WithField("ctrlId", peer.Id). + Error("could not create controller during peer(s) connection") } - - newController.Name = peer.ServerCert[0].Subject.CommonName - newController.CertPem = nfpem.EncodeToString(peer.ServerCert[0]) - newController.Fingerprint = nfpem.FingerprintFromCertificate(peer.ServerCert[0]) - - if err := self.Create(newController, changeCtx); err != nil { - pfxlog.Logger().WithError(err).Error("could not create controller during peer(s) connection") - continue + } else if peerConnectedEvent || existing.IsChanged(newController) { + if err = self.Update(existing, nil, changeCtx); err != nil { + pfxlog.Logger().WithError(err).WithField("ctrlId", peer.Id). + Error("could not update controller during peer(s) connection") } } } @@ -305,8 +271,7 @@ func (self *ControllerManager) PeersDisconnected(peers []*event.ClusterPeer) { SetChangeAuthorType(change.AuthorTypeController) disconnectFields := fields.UpdatedFieldsMap{ - db.FieldControllerLastJoinedAt: struct{}{}, - db.FieldControllerIsOnline: struct{}{}, + db.FieldControllerIsOnline: struct{}{}, } for _, peer := range peers { controller := &Controller{ diff --git a/controller/model/controller_model.go b/controller/model/controller_model.go index 16f42c293..61eb1aae2 100644 --- a/controller/model/controller_model.go +++ b/controller/model/controller_model.go @@ -21,6 +21,7 @@ import ( "github.com/openziti/ziti/controller/db" "github.com/openziti/ziti/controller/models" "go.etcd.io/bbolt" + "sort" "time" ) @@ -31,10 +32,59 @@ type Controller struct { CertPem string Fingerprint string IsOnline bool - LastJoinedAt *time.Time + LastJoinedAt time.Time ApiAddresses map[string][]ApiAddress } +func (entity *Controller) sortApiAddresses() { + for _, v := range entity.ApiAddresses { + sort.Slice(v, func(i, j int) bool { + if v[i].Version < v[j].Version { + return true + } + if v[i].Version > v[j].Version { + return false + } + return v[i].Url < v[j].Url + }) + } +} + +func (entity *Controller) IsChanged(other *Controller) bool { + if entity.Name != other.Name || + entity.CtrlAddress != other.CtrlAddress || + entity.CertPem != other.CertPem || + entity.Fingerprint != other.Fingerprint || + entity.IsOnline != other.IsOnline { + return true + } + + if len(entity.ApiAddresses) != len(other.ApiAddresses) { + return true + } + + entity.sortApiAddresses() + other.sortApiAddresses() + + for k, v := range entity.ApiAddresses { + v2, ok := other.ApiAddresses[k] + if !ok { + return true + } + if len(v) != len(v2) { + return true + } + for idx, addr := range v { + addr2 := v2[idx] + if addr.Version != addr2.Version || addr.Url != addr2.Url { + return true + } + } + } + + return false +} + type ApiAddress struct { Url string `json:"url"` Version string `json:"version"` diff --git a/controller/model/env.go b/controller/model/env.go index 070f890ec..344a65f52 100644 --- a/controller/model/env.go +++ b/controller/model/env.go @@ -66,4 +66,6 @@ type Env interface { GetCloseNotifyChannel() <-chan struct{} GetPeerSigners() []*x509.Certificate AddRouterPresenceHandler(h RouterPresenceHandler) + + GetId() string } diff --git a/controller/model/terminator_manager.go b/controller/model/terminator_manager.go index dada189b5..6c2672288 100644 --- a/controller/model/terminator_manager.go +++ b/controller/model/terminator_manager.go @@ -207,6 +207,7 @@ func (self *TerminatorManager) Marshall(entity *Terminator) ([]byte, error) { HostId: entity.HostId, IsSystem: entity.IsSystem, SavedPrecedence: savedPrecedence, + SourceCtrl: entity.SourceCtrl, } return proto.Marshal(msg) @@ -251,6 +252,7 @@ func (self *TerminatorManager) Unmarshall(bytes []byte) (*Terminator, error) { PeerData: msg.PeerData, HostId: msg.HostId, SavedPrecedence: savedPrecedence, + SourceCtrl: msg.SourceCtrl, } return result, nil diff --git a/controller/model/terminator_model.go b/controller/model/terminator_model.go index 7cb710384..c763a99b2 100644 --- a/controller/model/terminator_model.go +++ b/controller/model/terminator_model.go @@ -21,6 +21,7 @@ type Terminator struct { PeerData map[uint32][]byte HostId string SavedPrecedence xt.Precedence + SourceCtrl string } func (entity *Terminator) GetServiceId() string { @@ -63,6 +64,10 @@ func (entity *Terminator) GetHostId() string { return entity.HostId } +func (entity *Terminator) GetSourceCtrl() string { + return entity.SourceCtrl +} + func (entity *Terminator) toBoltEntityForUpdate(tx *bbolt.Tx, env Env, _ boltz.FieldChecker) (*db.Terminator, error) { return entity.toBoltEntityForCreate(tx, env) } @@ -92,6 +97,7 @@ func (entity *Terminator) toBoltEntityForCreate(*bbolt.Tx, Env) (*db.Terminator, PeerData: entity.PeerData, HostId: entity.HostId, SavedPrecedence: savedPrecedence, + SourceCtrl: entity.SourceCtrl, }, nil } @@ -106,6 +112,7 @@ func (entity *Terminator) fillFrom(_ Env, _ *bbolt.Tx, boltTerminator *db.Termin entity.Cost = boltTerminator.Cost entity.Precedence = xt.GetPrecedenceForName(boltTerminator.Precedence) entity.HostId = boltTerminator.HostId + entity.SourceCtrl = boltTerminator.SourceCtrl entity.FillCommon(boltTerminator) if boltTerminator.SavedPrecedence != nil { diff --git a/controller/model/testing.go b/controller/model/testing.go index a3305350c..3cdcf7c90 100644 --- a/controller/model/testing.go +++ b/controller/model/testing.go @@ -52,6 +52,10 @@ type TestContext struct { eventDispatcher event.Dispatcher } +func (ctx *TestContext) GetId() string { + return ctx.config.Id.Token +} + func (ctx *TestContext) GetEnrollmentJwtSigner() (jwtsigner.Signer, error) { return ctx, nil } @@ -197,6 +201,9 @@ func NewTestContext(t testing.TB) *TestContext { ctx.TestContext.Init() ctx.config = &config.Config{ + Id: &identity.TokenId{ + Token: "test", + }, Network: config.DefaultNetworkConfig(), Edge: &config.EdgeConfig{ Enrollment: config.Enrollment{ diff --git a/controller/models/base_model.go b/controller/models/base_model.go index 5044fb181..ecf8a4d61 100644 --- a/controller/models/base_model.go +++ b/controller/models/base_model.go @@ -52,6 +52,8 @@ type EntityRetriever[T Entity] interface { // as fabric and edge services, and fabric and edge routers. However, they should have distinct entity type // ids, so we can figure out to which controller to route commands GetEntityTypeId() string + + IsEntityPresent(id string) (bool, error) } type Entity interface { diff --git a/controller/models/errors.go b/controller/models/errors.go index 3761b09bd..a27a211c8 100644 --- a/controller/models/errors.go +++ b/controller/models/errors.go @@ -1,9 +1,9 @@ package models import ( - "github.com/openziti/ziti/controller/apierror" "github.com/openziti/foundation/v2/errorz" "github.com/openziti/storage/boltz" + "github.com/openziti/ziti/controller/apierror" ) func ToApiError(err error) *errorz.ApiError { diff --git a/controller/network/router_messaging.go b/controller/network/router_messaging.go index 3ab40afb0..4ae14d5de 100644 --- a/controller/network/router_messaging.go +++ b/controller/network/router_messaging.go @@ -21,6 +21,7 @@ import ( "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v3" "github.com/openziti/channel/v3/protobufs" + "github.com/openziti/foundation/v2/concurrenz" "github.com/openziti/foundation/v2/goroutines" "github.com/openziti/storage/boltz" "github.com/openziti/ziti/common/inspect" @@ -48,6 +49,7 @@ type terminatorInfo struct { xt.Terminator marker uint64 } + type terminatorValidations struct { terminators map[string]terminatorInfo checkInProgress atomic.Bool @@ -60,12 +62,13 @@ type routerEvent interface { func NewRouterMessaging(env model.Env, routerCommPool goroutines.Pool) *RouterMessaging { result := &RouterMessaging{ - env: env, - managers: env.GetManagers(), - eventsC: make(chan routerEvent, 16), - routerUpdates: map[string]*routerUpdates{}, - terminatorValidations: map[string]*terminatorValidations{}, - routerCommPool: routerCommPool, + env: env, + managers: env.GetManagers(), + eventsC: make(chan routerEvent, 16), + routerUpdates: map[string]*routerUpdates{}, + terminatorValidations: map[string]*terminatorValidations{}, + routerCommPool: routerCommPool, + queuedTerminatorDeletes: map[string]struct{}{}, } env.GetManagers().Terminator.GetStore().AddEntityEventListenerF(result.TerminatorCreated, boltz.EntityCreated) @@ -74,13 +77,16 @@ func NewRouterMessaging(env model.Env, routerCommPool goroutines.Pool) *RouterMe } type RouterMessaging struct { - env model.Env - managers *model.Managers - eventsC chan routerEvent - routerUpdates map[string]*routerUpdates - terminatorValidations map[string]*terminatorValidations - routerCommPool goroutines.Pool - markerCounter atomic.Uint64 + env model.Env + managers *model.Managers + eventsC chan routerEvent + routerUpdates map[string]*routerUpdates + terminatorValidations map[string]*terminatorValidations + queuedTerminatorDeletes map[string]struct{} + routerCommPool goroutines.Pool + markerCounter atomic.Uint64 + deleteInProgress atomic.Bool + deleteStarted concurrenz.AtomicValue[time.Time] } func (self *RouterMessaging) getNextMarker() uint64 { @@ -150,8 +156,11 @@ func (self *RouterMessaging) run() { self.syncStates() } - if !self.env.GetManagers().Dispatcher.IsLeaderless() && len(self.terminatorValidations) > 0 { - self.sendTerminatorValidationRequests() + if !self.env.GetManagers().Dispatcher.IsLeaderless() { + if len(self.terminatorValidations) > 0 { + self.sendTerminatorValidationRequests() + } + self.processQueuedDeletes() } } } @@ -293,8 +302,13 @@ func (self *RouterMessaging) sendTerminatorValidationRequest(routerId string, up var terminators []*ctrl_pb.Terminator + localCtrlId := self.env.GetId() + + var toRemove []string for _, terminator := range updates.terminators { - if time.Since(terminator.GetCreatedAt()) > 5*time.Second { + if localCtrlId != terminator.GetSourceCtrl() && !self.managers.Dispatcher.IsLeader() { + toRemove = append(toRemove, terminator.GetId()) + } else if time.Since(terminator.GetCreatedAt()) > 5*time.Second { pfxlog.Logger().WithField("terminatorId", terminator.GetId()).Info("queuing validate of terminator") terminators = append(terminators, &ctrl_pb.Terminator{ Id: terminator.GetId(), @@ -305,15 +319,17 @@ func (self *RouterMessaging) sendTerminatorValidationRequest(routerId string, up } } + for _, terminatorId := range toRemove { + delete(updates.terminators, terminatorId) + } + if len(terminators) == 0 || !updates.checkInProgress.CompareAndSwap(false, true) { return } - var req protobufs.TypedMessage + var req ctrl_pb.FilterableValidateTerminatorsRequest if !supportsVerifyV2 { - req = &ctrl_pb.ValidateTerminatorsRequest{ - Terminators: terminators, - } + req = &ctrl_pb.ValidateTerminatorsRequest{Terminators: terminators} } else { req = &ctrl_pb.ValidateTerminatorsV2Request{ Terminators: terminators, @@ -323,23 +339,22 @@ func (self *RouterMessaging) sendTerminatorValidationRequest(routerId string, up queueErr := self.routerCommPool.QueueOrError(func() { ch := notifyRouter.Control - if ch == nil { + if ch == nil || self.managers.Dispatcher.IsLeaderless() { + updates.checkInProgress.Store(false) return } - if self.managers.Dispatcher.IsLeaderOrLeaderless() { - if err := protobufs.MarshalTyped(req).WithTimeout(time.Second * 1).SendAndWaitForWire(ch); err != nil { - pfxlog.Logger().WithError(err).WithField("routerId", notifyRouter.Id).Error("failed to send validate terminators request to router") - } else if !supportsVerifyV2 { - // V1 doesn't send responses, it will just send deletes if the terminator is invalid. - // we're going to mark these ok. If they're not, we should get a delete message. Older - // routers can still fail to delete, if the delete gets lost for some reason. - self.generateMockTerminatorValidationResponse(notifyRouter, updates) - } - } else if !self.managers.Dispatcher.IsLeaderless() { - // If there's a leader, and we're not it, let the leader worry about sending the validation requests - // otherwise we're generating a bunch of extra load on the routers - self.generateMockTerminatorValidationResponse(notifyRouter, updates) + for _, terminator := range req.GetTerminators() { + pfxlog.Logger().WithField("terminatorId", terminator.GetId()).Debug("queuing validate of terminator") + } + + if err = protobufs.MarshalTyped(req).WithTimeout(time.Second * 1).SendAndWaitForWire(ch); err != nil { + pfxlog.Logger().WithError(err).WithField("routerId", notifyRouter.Id).Error("failed to send validate terminators request to router") + } else if !supportsVerifyV2 { + // V1 doesn't send responses, it will just send deletes if the terminator is invalid. + // we're going to mark these ok. If they're not, we should get a delete message. Older + // routers can still fail to delete, if the delete gets lost for some reason. + self.generateMockTerminatorValidationResponse(notifyRouter, updates, false) } }) @@ -350,25 +365,87 @@ func (self *RouterMessaging) sendTerminatorValidationRequest(routerId string, up } } -func (self *RouterMessaging) generateMockTerminatorValidationResponse(r *model.Router, validations *terminatorValidations) { +func (self *RouterMessaging) generateMockTerminatorValidationResponse(r *model.Router, validations *terminatorValidations, onlyNonLocal bool) { handler := &terminatorValidationRespReceived{ - router: r, - changeCtx: change.New(), // won't be used since we're marking things valid + router: r, resp: &ctrl_pb.ValidateTerminatorsV2Response{ States: map[string]*ctrl_pb.RouterTerminatorState{}, }, } + localCtrlId := self.env.GetId() + for id, t := range validations.terminators { - handler.resp.States[id] = &ctrl_pb.RouterTerminatorState{ - Valid: true, - Marker: t.marker, + if !onlyNonLocal || t.GetSourceCtrl() != localCtrlId { + handler.resp.States[id] = &ctrl_pb.RouterTerminatorState{ + Valid: true, + Marker: t.marker, + } } } self.queueEvent(handler) } +func (self *RouterMessaging) processQueuedDeletes() { + if len(self.queuedTerminatorDeletes) == 0 { + return + } + + if self.deleteInProgress.Load() { + if time.Since(self.deleteStarted.Load()) < 30*time.Second { + return + } + } + + self.deleteInProgress.Store(false) + + log := pfxlog.Logger() + + var toDelete []string + count := 0 + for terminatorId := range self.queuedTerminatorDeletes { + if present, _ := self.env.GetManagers().Terminator.IsEntityPresent(terminatorId); present { + toDelete = append(toDelete, terminatorId) + count++ + if count >= 100 { + break + } + } else { + delete(self.queuedTerminatorDeletes, terminatorId) + } + } + + if len(toDelete) == 0 { + return + } + + self.deleteInProgress.Store(true) + self.deleteStarted.Store(time.Now()) + + err := self.routerCommPool.QueueOrError(func() { + changeCtx := change.New().SetChangeAuthorName(self.env.GetId()). + SetChangeAuthorType(change.AuthorTypeController) + if err := self.env.GetManagers().Terminator.DeleteBatch(toDelete, changeCtx); err != nil { + for _, terminatorId := range toDelete { + log.WithField("terminatorId", terminatorId). + WithError(err). + Info("batch delete failed") + } + } else { + self.queueEvent(&terminatorBatchDeleteCompleted{ + deletedTerminatorIds: toDelete, + }) + } + self.deleteInProgress.Store(false) + }) + + if err != nil { + log.WithError(err).Error("unable to queue terminator deletes") + self.deleteInProgress.Store(false) + } +} + func (self *RouterMessaging) NewValidationResponseHandler(n *Network, r *model.Router) channel.ReceiveHandlerF { return func(m *channel.Message, ch channel.Channel) { log := pfxlog.Logger().WithField("routerId", r.Id) @@ -378,14 +455,11 @@ func (self *RouterMessaging) NewValidationResponseHandler(n *Network, r *model.R return } - changeCtx := change.NewControlChannelChange(r.Id, r.Name, "fabric.validate.terminator", ch) - handler := &terminatorValidationRespReceived{ - router: r, - changeCtx: changeCtx, - resp: resp, + router: r, + resp: resp, } - handler.DeleteInvalid(n) + self.queueEvent(handler) } } @@ -498,36 +572,8 @@ func (self *validateTerminators) handle(c *RouterMessaging) { } type terminatorValidationRespReceived struct { - router *model.Router - changeCtx *change.Context - resp *ctrl_pb.ValidateTerminatorsV2Response - success bool -} - -func (self *terminatorValidationRespReceived) DeleteInvalid(n *Network) { - log := pfxlog.Logger().WithField("routerId", self.router.Id) - - var toDelete []string - for terminatorId, state := range self.resp.States { - if !state.Valid { - toDelete = append(toDelete, terminatorId) - log.WithField("terminatorId", terminatorId). - WithField("reason", state.Reason.String()). - Info("queuing terminator for delete") - } - } - - if len(toDelete) > 0 { - if err := n.Managers.Terminator.DeleteBatch(toDelete, self.changeCtx); err != nil { - for _, terminatorId := range toDelete { - log.WithField("terminatorId", terminatorId). - WithError(err). - Info("batch delete failed") - } - } else { - self.success = true - } - } + router *model.Router + resp *ctrl_pb.ValidateTerminatorsV2Response } func (self *terminatorValidationRespReceived) handle(c *RouterMessaging) { @@ -536,7 +582,10 @@ func (self *terminatorValidationRespReceived) handle(c *RouterMessaging) { for terminatorId, state := range self.resp.States { if terminator, ok := states.terminators[terminatorId]; ok { - if (state.Valid && (state.Marker == 0 || state.Marker == terminator.marker)) || self.success { + if state.Marker == 0 || state.Marker == terminator.marker { + if !state.Valid { + c.queuedTerminatorDeletes[terminatorId] = struct{}{} + } delete(states.terminators, terminatorId) } } @@ -547,6 +596,16 @@ func (self *terminatorValidationRespReceived) handle(c *RouterMessaging) { } } +type terminatorBatchDeleteCompleted struct { + deletedTerminatorIds []string +} + +func (self *terminatorBatchDeleteCompleted) handle(c *RouterMessaging) { + for _, terminatorId := range self.deletedTerminatorIds { + delete(c.queuedTerminatorDeletes, terminatorId) + } +} + type routerMessagingInspectEvent struct { resultC chan *inspect.RouterMessagingState } diff --git a/controller/raft/mesh/mesh.go b/controller/raft/mesh/mesh.go index 52c08da3e..59d1e6043 100644 --- a/controller/raft/mesh/mesh.go +++ b/controller/raft/mesh/mesh.go @@ -47,70 +47,109 @@ const ( SigningCertHeader = 2050 ApiAddressesHeader = 2051 RaftDisconnectType = 2052 + RaftConnId = 2053 ChannelTypeMesh = "ctrl.mesh" ) type Peer struct { - mesh *impl - Id raft.ServerID - Address string - Channel channel.Channel - RaftConn atomic.Pointer[raftPeerConn] - Version *versions.VersionInfo - SigningCerts []*x509.Certificate - ApiAddresses map[string][]event.ApiAddress -} - -func (self *Peer) initRaftConn() *raftPeerConn { - self.mesh.lock.Lock() - defer self.mesh.lock.Unlock() - conn := self.RaftConn.Load() - if conn == nil { - conn = newRaftPeerConn(self, self.mesh.netAddr) - self.RaftConn.Store(conn) - } - return conn + mesh *impl + Id raft.ServerID + Address string + Channel channel.Channel + RaftConns concurrenz.CopyOnWriteMap[uint32, *raftPeerConn] + Version *versions.VersionInfo + SigningCerts []*x509.Certificate + ApiAddresses map[string][]event.ApiAddress + raftPeerIdGen uint32 +} + +func (self *Peer) nextRaftPeerId() uint32 { + // dialing peers use odd ids, dialed peers use even, so we + // shouldn't get any conflict + return atomic.AddUint32(&self.raftPeerIdGen, 2) +} + +func (self *Peer) newRaftPeerConn(id uint32) *raftPeerConn { + result := &raftPeerConn{ + id: id, + peer: self, + localAddr: self.mesh.netAddr, + readTimeout: newDeadline(), + readC: make(chan []byte, 16), + closeNotify: make(chan struct{}), + } + self.RaftConns.Put(id, result) + return result } func (self *Peer) HandleClose(channel.Channel) { - self.mesh.lock.Lock() - conn := self.RaftConn.Swap(nil) - if conn != nil { - conn.close() + conns := self.RaftConns.AsMap() + self.RaftConns.Clear() + for _, v := range conns { + v.close() } - self.mesh.lock.Unlock() - self.mesh.PeerDisconnected(self) } -func (self *Peer) ContentType() int32 { - return RaftConnectType -} - -func (self *Peer) HandleReceive(m *channel.Message, _ channel.Channel) { +func (self *Peer) handleReceiveConnect(m *channel.Message, ch channel.Channel) { go func() { + log := pfxlog.Logger().WithField("peerId", ch.Id()) + log.Info("received connect request from raft peer") + + id, ok := m.GetUint32Header(RaftConnId) + if !ok { + response := channel.NewResult(false, "no conn id in connect request") + response.ReplyTo(m) + + if err := response.WithTimeout(5 * time.Second).Send(self.Channel); err != nil { + log.WithError(err).Error("failed to send raft peer connect error response") + } + return + } + + if peerConn := self.RaftConns.Get(id); peerConn != nil { + response := channel.NewResult(false, "duplicate conn id in connect request") + response.ReplyTo(m) + + if err := response.WithTimeout(5 * time.Second).Send(self.Channel); err != nil { + log.WithError(err).Error("failed to send raft peer connect error response") + } + return + } + response := channel.NewResult(true, "") response.ReplyTo(m) if err := response.WithTimeout(5 * time.Second).Send(self.Channel); err != nil { - logrus.WithError(err).Error("failed to send connect response") + log.WithError(err).Error("failed to send raft peer connect response") } else { - conn := self.initRaftConn() + conn := self.newRaftPeerConn(id) select { case self.mesh.raftAccepts <- conn: + log.Info("raft peer connection sent to listener") case <-self.mesh.closeNotify: + log.Info("unable to send raft peer connection to listener, listener closed") } } }() } -func (self *Peer) handleReceiveDisconnect(m *channel.Message, _ channel.Channel) { +func (self *Peer) handleReceiveDisconnect(m *channel.Message, ch channel.Channel) { go func() { - self.mesh.lock.Lock() - conn := self.RaftConn.Swap(nil) - self.mesh.lock.Unlock() + log := pfxlog.ContextLogger(ch.Label()) + + id, ok := m.GetUint32Header(RaftConnId) + if !ok { + response := channel.NewResult(false, "no conn id in disconnect request") + response.ReplyTo(m) - if conn != nil { + if err := response.WithTimeout(5 * time.Second).Send(self.Channel); err != nil { + log.WithError(err).Error("failed to send raft peer connect error response") + } + return + } + + if conn := self.RaftConns.Get(id); conn != nil { conn.close() } @@ -118,51 +157,96 @@ func (self *Peer) handleReceiveDisconnect(m *channel.Message, _ channel.Channel) response.ReplyTo(m) if err := response.WithTimeout(5 * time.Second).Send(self.Channel); err != nil { - logrus.WithError(err).Error("failed to send close response") + log.WithError(err).Error("failed to send close response, closing channel") + if closeErr := self.Channel.Close(); closeErr != nil { + pfxlog.Logger().WithError(closeErr).WithField("ch", self.Channel.Label()).Error("failed to close channel") + } } + + log.Infof("received disconnect, disconnected peer %v at %v", self.Id, self.Address) }() } func (self *Peer) handleReceiveData(m *channel.Message, ch channel.Channel) { - if conn := self.RaftConn.Load(); conn != nil { - conn.HandleReceive(m, ch) + id, ok := m.GetUint32Header(RaftConnId) + if !ok { + pfxlog.Logger().WithField("peerId", ch.Id()).Error("no conn id in data request") + return + } + + conn := self.RaftConns.Get(id) + if conn == nil { + pfxlog.Logger().WithField("peerId", ch.Id()). + WithField("connId", id).Error("invalid conn id in data request") + return } + + conn.HandleReceive(m, ch) } func (self *Peer) Connect(timeout time.Duration) (net.Conn, error) { + log := pfxlog.Logger().WithField("peerId", string(self.Id)).WithField("address", self.Address) + log.Info("sending connect msg to raft peer") + + id := self.nextRaftPeerId() msg := channel.NewMessage(RaftConnectType, nil) + msg.Headers.PutUint32Header(RaftConnId, id) + response, err := msg.WithTimeout(timeout).SendForReply(self.Channel) if err != nil { + log.WithError(err).Error("failed to send connect message to raft peer, closing channel") + if closeErr := self.Channel.Close(); closeErr != nil { + log.WithError(closeErr).Error("failed to close raft peer channel") + } return nil, err } result := channel.UnmarshalResult(response) if !result.Success { - return nil, errors.Errorf("connect failed: %v", result.Message) + log.WithError(err).Error("non-success response to raft peer connect message, closing channel") + if closeErr := self.Channel.Close(); closeErr != nil { + log.WithError(closeErr).Error("failed to close raft peer channel") + } + return nil, errors.Errorf("raft peer connect failed: %v", result.Message) } - logrus.Infof("connected peer %v at %v", self.Id, self.Address) + log.Info("raft peer connected") - return self.initRaftConn(), nil + return self.newRaftPeerConn(id), nil } -func (self *Peer) closeRaftConn(timeout time.Duration) error { - self.mesh.lock.Lock() - conn := self.RaftConn.Swap(nil) - defer self.mesh.lock.Unlock() - if conn == nil { +func (self *Peer) closeRaftConn(peerConn *raftPeerConn, timeout time.Duration) error { + isCurrentPeer := self.RaftConns.DeleteIf(func(key uint32, val *raftPeerConn) bool { + return key == peerConn.id && val == peerConn + }) + + peerConn.close() + + log := pfxlog.Logger().WithField("peerId", self.Id) + if !isCurrentPeer { + log.Info("closed peer connection is not current connection, not sending disconnect message") return nil } - conn.close() + log.Info("closed peer connection is current connection, sending disconnect message") msg := channel.NewMessage(RaftDisconnectType, nil) + msg.Headers.PutUint32Header(RaftConnId, peerConn.id) + response, err := msg.WithTimeout(timeout).SendForReply(self.Channel) if err != nil { + log.WithError(err).Error("failed to send disconnect msg response, closing channel") + if closeErr := self.Channel.Close(); closeErr != nil { + log.WithError(closeErr).Error("failed to close channel") + } return err } result := channel.UnmarshalResult(response) if !result.Success { - return errors.Errorf("connect failed: %v", result.Message) + log.WithError(err).Error("result from disconnect was not success, closing channel") + if closeErr := self.Channel.Close(); closeErr != nil { + log.WithError(closeErr).Error("failed to close channel") + } + return errors.Errorf("close failed: %v", result.Message) } logrus.Infof("disconnected peer %v at %v", self.Id, self.Address) @@ -213,6 +297,7 @@ type Mesh interface { RegisterClusterStateHandler(f func(state ClusterState)) Init(bindHandler channel.BindHandler) + GetAllPeersForEvent() []*event.ClusterPeer } func New(env Env, raftAddr raft.ServerAddress, helloHeaderProviders []HeaderProvider) Mesh { @@ -230,7 +315,7 @@ func New(env Env, raftAddr raft.ServerAddress, helloHeaderProviders []HeaderProv }, Peers: map[string]*Peer{}, closeNotify: make(chan struct{}), - raftAccepts: make(chan net.Conn), + raftAccepts: make(chan *raftPeerConn), version: env.GetVersionProvider(), versionEncoded: versionEncoded, eventDispatcher: env.GetEventDispatcher(), @@ -246,7 +331,7 @@ type impl struct { lock sync.RWMutex closeNotify chan struct{} closed atomic.Bool - raftAccepts chan net.Conn + raftAccepts chan *raftPeerConn bindHandler concurrenz.AtomicValue[channel.BindHandler] version versions.VersionProvider versionEncoded []byte @@ -284,39 +369,44 @@ func (self *impl) Addr() net.Addr { func (self *impl) Accept() (net.Conn, error) { select { case conn := <-self.raftAccepts: + pfxlog.Logger().WithField("peerId", conn.peer.Id).Info("new raft peer connection return to raft layer") return conn, nil case <-self.closeNotify: - return nil, errors.New("closed") + pfxlog.Logger().Error("return error from raft peer mesh listener accept, listener closed") + return nil, errors.New("raft peer listener closed") } } func (self *impl) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) { - logrus.Infof("dialing %v", address) + log := pfxlog.Logger().WithField("address", address) + log.Info("dialing raft peer channel") peer, err := self.GetOrConnectPeer(string(address), timeout) if err != nil { + log.WithError(err).Error("unable to get or connect raft peer channel") return nil, err } - if peerConn := peer.RaftConn.Load(); peerConn != nil { - return peerConn, nil - } + log.WithField("peerId", peer.Id).Info("invoking raft connect on established peer channel") return peer.Connect(timeout) } func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer, error) { + log := pfxlog.Logger().WithField("address", address) + if address == "" { - return nil, errors.New("cannot get peer for empty address") + return nil, errors.New("cannot get raft peer for empty address") } + if peer := self.GetPeer(raft.ServerAddress(address)); peer != nil { - logrus.Debugf("existing peer found for %v, returning", address) + log.Debug("existing new raft peer channel found for address") return peer, nil } - logrus.Infof("creating new peer for %v, returning", address) + + log.Info("establishing new raft peer channel") addr, err := transport.ParseAddress(address) if err != nil { - logrus.WithError(err).WithField("address", address).Error("failed to parse address") return nil, err } @@ -349,8 +439,9 @@ func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer dialOptions.ConnectOptions.ConnectTimeout = timeout peer := &Peer{ - mesh: self, - Address: address, + mesh: self, + Address: address, + raftPeerIdGen: 1, } bindHandler := channel.BindHandlerF(func(binding channel.Binding) error { @@ -390,8 +481,8 @@ func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer peer.Version = versionInfo peer.SigningCerts = []*x509.Certificate{underlay.Certificates()[0]} - binding.AddTypedReceiveHandler(peer) binding.AddReceiveHandlerF(RaftDataType, peer.handleReceiveData) + binding.AddReceiveHandlerF(RaftConnectType, peer.handleReceiveConnect) binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect) binding.AddCloseHandler(peer) @@ -404,6 +495,8 @@ func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer return nil, errors.Wrapf(err, "error dialing peer %v", address) } + log.WithField("peerId", peer.Id).Info("established new raft peer channel") + return peer, nil } @@ -488,25 +581,21 @@ func ExtractSpiffeId(certs []*x509.Certificate) (string, error) { func (self *impl) PeerConnected(peer *Peer, dial bool) error { self.lock.Lock() - defer self.lock.Unlock() if self.Peers[peer.Address] != nil { + defer self.lock.Unlock() return fmt.Errorf("connection from peer %v @ %v already present", peer.Id, peer.Address) } self.Peers[peer.Address] = peer self.updateClusterState() + self.lock.Unlock() + pfxlog.Logger().WithField("peerId", peer.Id). WithField("peerAddr", peer.Address). Info("peer connected") evt := event.NewClusterEvent(event.ClusterPeerConnected) - evt.Peers = append(evt.Peers, &event.ClusterPeer{ - Id: string(peer.Id), - Addr: peer.Address, - Version: peer.Version.Version, - ServerCert: peer.SigningCerts, - ApiAddresses: peer.ApiAddresses, - }) + evt.Peers = self.GetEventPeerList(peer) self.eventDispatcher.AcceptClusterEvent(evt) @@ -534,6 +623,38 @@ func (self *impl) PeerConnected(peer *Peer, dial bool) error { return nil } +func (self *impl) GetEventPeerList(peers ...*Peer) []*event.ClusterPeer { + if len(peers) == 0 { + return nil + } + var result []*event.ClusterPeer + for _, peer := range peers { + result = append(result, &event.ClusterPeer{ + Id: string(peer.Id), + Addr: peer.Address, + Version: peer.Version.Version, + ServerCert: peer.SigningCerts, + ApiAddresses: peer.ApiAddresses, + }) + } + return result +} + +func (self *impl) GetAllPeersForEvent() []*event.ClusterPeer { + peers := self.GetPeers() + var peerList []*Peer + for _, peer := range peers { + peerList = append(peerList, peer) + } + peerList = append(peerList, &Peer{ + mesh: self, + Id: raft.ServerID(self.id.Token), + Address: string(self.raftAddr), + Version: self.version.AsVersionInfo(), + }) + return self.GetEventPeerList(peerList...) +} + func (self *impl) GetPeer(addr raft.ServerAddress) *Peer { self.lock.RLock() defer self.lock.RUnlock() @@ -542,25 +663,22 @@ func (self *impl) GetPeer(addr raft.ServerAddress) *Peer { func (self *impl) PeerDisconnected(peer *Peer) { self.lock.Lock() - defer self.lock.Unlock() currentPeer := self.Peers[peer.Address] if currentPeer == nil || currentPeer != peer { + self.lock.Unlock() return } delete(self.Peers, peer.Address) self.updateClusterState() + self.lock.Unlock() pfxlog.Logger().WithField("peerId", peer.Id). WithField("peerAddr", peer.Address). Info("peer disconnected") evt := event.NewClusterEvent(event.ClusterPeerDisconnected) - evt.Peers = append(evt.Peers, &event.ClusterPeer{ - Id: string(peer.Id), - Addr: peer.Address, - Version: peer.Version.Version, - }) + evt.Peers = self.GetEventPeerList(peer) self.eventDispatcher.AcceptClusterEvent(evt) } @@ -655,8 +773,8 @@ func (self *impl) AcceptUnderlay(underlay channel.Underlay) error { peer.Version = versionInfo - binding.AddTypedReceiveHandler(peer) binding.AddReceiveHandlerF(RaftDataType, peer.handleReceiveData) + binding.AddReceiveHandlerF(RaftConnectType, peer.handleReceiveConnect) binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect) binding.AddCloseHandler(peer) return self.PeerConnected(peer, false) @@ -676,8 +794,8 @@ func (self *impl) AcceptUnderlay(underlay channel.Underlay) error { } func (self *impl) GetPeers() map[string]*Peer { - self.lock.Lock() - defer self.lock.Unlock() + self.lock.RLock() + defer self.lock.RUnlock() result := map[string]*Peer{} diff --git a/controller/raft/mesh/peerconn.go b/controller/raft/mesh/peerconn.go index b26125f6f..5995e5bdd 100644 --- a/controller/raft/mesh/peerconn.go +++ b/controller/raft/mesh/peerconn.go @@ -17,6 +17,7 @@ package mesh import ( + "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v3" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -26,19 +27,10 @@ import ( "time" ) -func newRaftPeerConn(peer *Peer, localAddr net.Addr) *raftPeerConn { - return &raftPeerConn{ - peer: peer, - localAddr: localAddr, - readTimeout: newDeadline(), - readC: make(chan []byte, 16), - closeNotify: make(chan struct{}), - } -} - // raftPeerConn presents a net.Conn API over a channel. This allows us to multiplex raft traffic as well // as our own traffic (such as command forwarding) over the same network connection type raftPeerConn struct { + id uint32 peer *Peer localAddr net.Addr readTimeout *deadline @@ -118,6 +110,7 @@ func (self *raftPeerConn) Write(b []byte) (n int, err error) { } // logrus.Infof("writing %v bytes to raft peer %v", len(b), self.peer.Id) msg := channel.NewMessage(RaftDataType, b) + msg.Headers.PutUint32Header(RaftConnId, self.id) if deadline := self.writeDeadline; !deadline.IsZero() { now := time.Now() if deadline.After(now) { @@ -129,11 +122,13 @@ func (self *raftPeerConn) Write(b []byte) (n int, err error) { } func (self *raftPeerConn) Close() error { - return self.peer.closeRaftConn(5 * time.Second) + pfxlog.Logger().WithField("peerId", self.peer.Id).Info("close called on peer connection") + return self.peer.closeRaftConn(self, 5*time.Second) } func (self *raftPeerConn) close() bool { if self.closed.CompareAndSwap(false, true) { + pfxlog.Logger().WithField("peerId", self.peer.Channel.Id()).Info("closing raft peer connection") close(self.closeNotify) return true } diff --git a/controller/raft/raft.go b/controller/raft/raft.go index e70f1946a..ae97f3bc4 100644 --- a/controller/raft/raft.go +++ b/controller/raft/raft.go @@ -254,7 +254,7 @@ func (self *Controller) Dispatch(cmd command.Command) error { return errors.New("unable to execute command, cluster has no leader") } - log.WithField("cmd", reflect.TypeOf(cmd)).WithField("dest", self.GetLeaderAddr()).Info("forwarding command") + log.WithField("cmd", reflect.TypeOf(cmd)).WithField("dest", self.GetLeaderAddr()).Debug("forwarding command") peer, err := self.GetMesh().GetOrConnectPeer(self.GetLeaderAddr(), 5*time.Second) if err != nil { @@ -335,7 +335,7 @@ func (self *Controller) decodeApiError(data []byte) error { return errors.New(string(data)) } - if cause, ok := m["cause"]; ok { + if cause, ok := m["cause"]; ok && cause != nil { if strCause, ok := cause.(string); ok { apiErr.Cause = errors.New(strCause) } else if objCause, ok := cause.(map[string]any); ok { @@ -587,17 +587,9 @@ func (self *Controller) Configure(ctrlConfig *config.RaftConfig, conf *raft.Conf conf.CommitTimeout = *ctrlConfig.CommitTimeout } - if ctrlConfig.ElectionTimeout != nil { - conf.ElectionTimeout = *ctrlConfig.ElectionTimeout - } - - if ctrlConfig.HeartbeatTimeout != nil { - conf.HeartbeatTimeout = *ctrlConfig.HeartbeatTimeout - } - - if ctrlConfig.LeaderLeaseTimeout != nil { - conf.LeaderLeaseTimeout = *ctrlConfig.LeaderLeaseTimeout - } + conf.ElectionTimeout = ctrlConfig.ElectionTimeout + conf.HeartbeatTimeout = ctrlConfig.HeartbeatTimeout + conf.LeaderLeaseTimeout = ctrlConfig.LeaderLeaseTimeout if ctrlConfig.LogLevel != nil { conf.LogLevel = *ctrlConfig.LogLevel @@ -619,13 +611,8 @@ func (self *Controller) ConfigureReloadable(ctrlConfig *config.RaftConfig, conf conf.TrailingLogs = uint64(*ctrlConfig.TrailingLogs) } - if ctrlConfig.ElectionTimeout != nil { - conf.ElectionTimeout = *ctrlConfig.ElectionTimeout - } - - if ctrlConfig.HeartbeatTimeout != nil { - conf.HeartbeatTimeout = *ctrlConfig.HeartbeatTimeout - } + conf.ElectionTimeout = ctrlConfig.ElectionTimeout + conf.HeartbeatTimeout = ctrlConfig.HeartbeatTimeout } func (self *Controller) validateCert() { @@ -830,19 +817,25 @@ func (self *Controller) addEventsHandlers() { self.RegisterClusterEventHandler(func(evt ClusterEvent, state ClusterState) { switch evt { case ClusterEventLeadershipGained: - self.env.GetEventDispatcher().AcceptClusterEvent(event.NewClusterEvent(event.ClusterLeadershipGained)) + self.newClusterEvent(event.ClusterLeadershipGained, self.Mesh.GetAllPeersForEvent()) case ClusterEventLeadershipLost: - self.env.GetEventDispatcher().AcceptClusterEvent(event.NewClusterEvent(event.ClusterLeadershipLost)) + self.newClusterEvent(event.ClusterLeadershipLost, nil) case ClusterEventReadOnly: - self.env.GetEventDispatcher().AcceptClusterEvent(event.NewClusterEvent(event.ClusterStateReadOnly)) + self.newClusterEvent(event.ClusterStateReadOnly, nil) case ClusterEventReadWrite: - self.env.GetEventDispatcher().AcceptClusterEvent(event.NewClusterEvent(event.ClusterStateReadWrite)) + self.newClusterEvent(event.ClusterStateReadWrite, nil) default: pfxlog.Logger().Errorf("unhandled cluster event type: %v", evt) } }) } +func (self *Controller) newClusterEvent(eventType event.ClusterEventType, peers []*event.ClusterPeer) { + evt := event.NewClusterEvent(eventType) + evt.Peers = peers + self.env.GetEventDispatcher().AcceptClusterEvent(evt) +} + type MigrationManager interface { ValidateMigrationEnvironment() error TryInitializeRaftFromBoltDb() error diff --git a/controller/xt/xt.go b/controller/xt/xt.go index 730bea678..e8f27ee72 100644 --- a/controller/xt/xt.go +++ b/controller/xt/xt.go @@ -45,6 +45,7 @@ type Terminator interface { GetPeerData() PeerData GetCreatedAt() time.Time GetHostId() string + GetSourceCtrl() string } type PeerData map[uint32][]byte diff --git a/controller/xt_common/failure_test.go b/controller/xt_common/failure_test.go index 0af231f2b..43c7ae649 100644 --- a/controller/xt_common/failure_test.go +++ b/controller/xt_common/failure_test.go @@ -56,6 +56,10 @@ func (m mockTerminator) GetHostId() string { panic("implement me") } +func (m mockTerminator) GetSourceCtrl() string { + panic("implement me") +} + func TestFailures(t *testing.T) { //t.SkipNow() costVisitor := &CostVisitor{ diff --git a/router/env/ctrls.go b/router/env/ctrls.go index 5a03e6129..3f4b8744c 100644 --- a/router/env/ctrls.go +++ b/router/env/ctrls.go @@ -34,10 +34,11 @@ import ( ) type NetworkControllers interface { - UpdateControllerEndpoints(endpoints []string) bool + UpdateControllerEndpoints(endpoints []string, leaderId string) bool GetAll() map[string]NetworkController GetNetworkController(ctrlId string) NetworkController AnyCtrlChannel() channel.Channel + GetModelUpdateCtrlChannel() channel.Channel GetIfResponsive(ctrlId string) (channel.Channel, bool) AllResponsiveCtrlChannels() []channel.Channel AnyValidCtrlChannel() channel.Channel @@ -66,9 +67,10 @@ type networkControllers struct { defaultRequestTimeout time.Duration ctrlEndpoints cmap.ConcurrentMap[string, struct{}] ctrls concurrenz.CopyOnWriteMap[string, NetworkController] + leaderId concurrenz.AtomicValue[string] } -func (self *networkControllers) UpdateControllerEndpoints(addresses []string) bool { +func (self *networkControllers) UpdateControllerEndpoints(addresses []string, leaderId string) bool { self.lock.Lock() defer self.lock.Unlock() @@ -98,6 +100,10 @@ func (self *networkControllers) UpdateControllerEndpoints(addresses []string) bo self.connectToControllerWithBackoff(endpoint) } + if leaderId != "" { + self.leaderId.Store(leaderId) + } + return changed } @@ -186,6 +192,25 @@ func (self *networkControllers) AnyCtrlChannel() channel.Channel { return current.Channel() } +func (self *networkControllers) isLeader(controller NetworkController) bool { + return self.leaderId.Load() == controller.Channel().Id() +} + +func (self *networkControllers) GetModelUpdateCtrlChannel() channel.Channel { + var current NetworkController + for _, ctrl := range self.ctrls.AsMap() { + if current == nil || + (ctrl.isMoreResponsive(current) && !self.isLeader(current)) || + (!ctrl.IsUnresponsive() && self.isLeader(ctrl)) { + current = ctrl + } + } + if current == nil { + return nil + } + return current.Channel() +} + func (self *networkControllers) AllResponsiveCtrlChannels() []channel.Channel { var channels []channel.Channel for _, ctrl := range self.ctrls.AsMap() { diff --git a/router/handler_ctrl/settings.go b/router/handler_ctrl/settings.go index 81a0d7212..fcf823af0 100644 --- a/router/handler_ctrl/settings.go +++ b/router/handler_ctrl/settings.go @@ -44,7 +44,7 @@ func (handler *settingsHandler) HandleReceive(msg *channel.Message, ch channel.C switch settingType { case int32(ctrl_pb.SettingTypes_NewCtrlAddress): newAddress := string(settingValue) - handler.updater.UpdateCtrlEndpoints([]string{newAddress}) + handler.updater.UpdateCtrlEndpoints([]string{newAddress}, "") default: log.Error("unknown setting type, ignored") } diff --git a/router/handler_ctrl/updateCtrlAddresses.go b/router/handler_ctrl/update_ctrl_addresses.go similarity index 88% rename from router/handler_ctrl/updateCtrlAddresses.go rename to router/handler_ctrl/update_ctrl_addresses.go index 9baf30c09..8db0ec614 100644 --- a/router/handler_ctrl/updateCtrlAddresses.go +++ b/router/handler_ctrl/update_ctrl_addresses.go @@ -11,7 +11,7 @@ import ( var updateCtrlAddressesHandlerInstance *updateCtrlAddressesHandler type CtrlAddressUpdater interface { - UpdateCtrlEndpoints(endpoints []string) + UpdateCtrlEndpoints(endpoints []string, leaderId string) } type updateCtrlAddressesHandler struct { @@ -38,7 +38,11 @@ func (handler *updateCtrlAddressesHandler) HandleReceive(msg *channel.Message, c if upd.IsLeader || handler.currentVersion == 0 || handler.currentVersion < upd.Index { log.Info("updating to controller endpoints to version") - handler.callback.UpdateCtrlEndpoints(upd.Addresses) + leaderId := "" + if upd.IsLeader { + leaderId = ch.Id() + } + handler.callback.UpdateCtrlEndpoints(upd.Addresses, leaderId) handler.currentVersion = upd.Index } } diff --git a/router/router.go b/router/router.go index ca8b2cbe8..07af30597 100644 --- a/router/router.go +++ b/router/router.go @@ -558,7 +558,7 @@ func (self *Router) startControlPlane() error { log := pfxlog.Logger() log.Infof("router configured with %v controller endpoints", len(endpoints)) - self.ctrls.UpdateControllerEndpoints(endpoints) + self.ctrls.UpdateControllerEndpoints(endpoints, "") self.metricsReporter = fabricMetrics.NewControllersReporter(self.ctrls) self.metricsRegistry.StartReporting(self.metricsReporter, self.config.Metrics.ReportInterval, self.config.Metrics.MessageQueueSize) @@ -758,9 +758,9 @@ func (self *Router) getInitialCtrlEndpoints() ([]string, error) { return endpoints, nil } -func (self *Router) UpdateCtrlEndpoints(endpoints []string) { +func (self *Router) UpdateCtrlEndpoints(endpoints []string, leaderId string) { log := pfxlog.Logger().WithField("endpoints", endpoints).WithField("filepath", self.config.Ctrl.DataDir) - if changed := self.ctrls.UpdateControllerEndpoints(endpoints); changed { + if changed := self.ctrls.UpdateControllerEndpoints(endpoints, leaderId); changed { log.Info("Attempting to save file") endpointsFile := path.Join(self.config.Ctrl.DataDir, "endpoints") diff --git a/router/router_test.go b/router/router_test.go index 215ffac73..5add3ded3 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -104,9 +104,9 @@ func Test_updateCtrlEndpoints(t *testing.T) { endpoints, err := r.getInitialCtrlEndpoints() req.NoError(err) - r.UpdateCtrlEndpoints(endpoints) + r.UpdateCtrlEndpoints(endpoints, "") - r.UpdateCtrlEndpoints([]string{"tls:localhost:6565"}) + r.UpdateCtrlEndpoints([]string{"tls:localhost:6565"}, "") req.FileExists(path.Join(tmpDir, "endpoints")) b, err := os.ReadFile(path.Join(tmpDir, "endpoints")) diff --git a/router/state/manager.go b/router/state/manager.go index ed83c1b70..7922eac47 100644 --- a/router/state/manager.go +++ b/router/state/manager.go @@ -471,6 +471,18 @@ func (a *ApiSession) SelectCtrlCh(ctrls env.NetworkControllers) channel.Channel return ctrls.AnyCtrlChannel() } +func (a *ApiSession) SelectModelUpdateCtrlCh(ctrls env.NetworkControllers) channel.Channel { + if a == nil { + return nil + } + + if a.ControllerId != "" { + return ctrls.GetCtrlChannel(a.ControllerId) + } + + return ctrls.GetModelUpdateCtrlChannel() +} + func NewApiSessionFromToken(jwtToken *jwt.Token, accessClaims *common.AccessClaims) (*ApiSession, error) { subj, err := jwtToken.Claims.GetSubject() if err != nil { diff --git a/router/xgress_edge/certchecker_test.go b/router/xgress_edge/certchecker_test.go index 9bb3129ed..633bfced0 100644 --- a/router/xgress_edge/certchecker_test.go +++ b/router/xgress_edge/certchecker_test.go @@ -525,7 +525,7 @@ func newCertChecker() (*CertExpirationChecker, func()) { return testChannel.Bind(bindHandler) }) ctrls := env.NewNetworkControllers(time.Second, ctrlDialer, env.NewDefaultHeartbeatOptions()) - ctrls.UpdateControllerEndpoints([]string{"tls:localhost:6262"}) + ctrls.UpdateControllerEndpoints([]string{"tls:localhost:6262"}, "") start := time.Now() for { if ctrls.AnyCtrlChannel() != nil { diff --git a/router/xgress_edge/hosted.go b/router/xgress_edge/hosted.go index 74b494610..17a5b418c 100644 --- a/router/xgress_edge/hosted.go +++ b/router/xgress_edge/hosted.go @@ -295,7 +295,7 @@ func (self *hostedServiceRegistry) RemoveTerminators(terminatorIds []string) err request := &ctrl_pb.RemoveTerminatorsRequest{ TerminatorIds: terminatorIds, } - + ctrls := self.env.GetNetworkControllers() ctrlCh := ctrls.AnyValidCtrlChannel() if ctrlCh == nil { @@ -560,7 +560,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato request.ApiSessionToken = apiSession.Token } - ctrlCh := terminator.edgeClientConn.apiSession.SelectCtrlCh(factory.ctrls) + ctrlCh := terminator.edgeClientConn.apiSession.SelectModelUpdateCtrlCh(factory.ctrls) if ctrlCh == nil { errStr := "no controller available, cannot create terminator" diff --git a/zitirest/clients.go b/zitirest/clients.go index 80a851ea6..d87635d4f 100644 --- a/zitirest/clients.go +++ b/zitirest/clients.go @@ -83,7 +83,7 @@ func (self *Clients) Authenticate(user, password string) error { } return err } - pfxlog.Logger().WithField("token", self.token).Info("authenticated successfully") + pfxlog.Logger().WithField("token", self.token).Debug("authenticated successfully") self.SetSessionToken(*result.Payload.Data.Token) return nil } diff --git a/zititest/models/sdk-hosting-test/configs/ctrl.yml.tmpl b/zititest/models/sdk-hosting-test/configs/ctrl.yml.tmpl index 19a6da43d..4e9464ee9 100644 --- a/zititest/models/sdk-hosting-test/configs/ctrl.yml.tmpl +++ b/zititest/models/sdk-hosting-test/configs/ctrl.yml.tmpl @@ -9,9 +9,9 @@ db: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/ctrl.db {{end}} identity: - cert: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/fablab/pki/{{ .Component.Id }}/certs/{{ .Component.Id }}-server.cert + cert: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/fablab/pki/{{ .Component.Id }}/certs/{{ .Component.Id }}-server.chain.pem key: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/fablab/pki/{{ .Component.Id }}/keys/{{ .Component.Id }}-server.key - ca: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/fablab/pki/{{ .Component.Id }}/certs/{{ .Component.Id }}-server.chain.pem + ca: /home/{{ .Model.MustVariable "credentials.ssh.username" }}/fablab/pki/{{ .Component.Id }}/certs/{{ .Component.Id }}.chain.pem trustDomain: sdk-hosting-test @@ -195,13 +195,7 @@ web: # - edge-client # - fabric-management - binding: health-checks - options: {} - binding: fabric - binding: edge-management - # options - variable optional/required - # This section is used to define values that are specified by the API they are associated with. - # These settings are per API. The example below is for the `edge-api` and contains both optional values and - # required values. - options: {} - binding: edge-client - options: {} + - binding: edge-oidc \ No newline at end of file diff --git a/zititest/models/sdk-hosting-test/configs/router.yml.tmpl b/zititest/models/sdk-hosting-test/configs/router.yml.tmpl index e9d33f99b..8e42c4598 100644 --- a/zititest/models/sdk-hosting-test/configs/router.yml.tmpl +++ b/zititest/models/sdk-hosting-test/configs/router.yml.tmpl @@ -6,15 +6,24 @@ v: 3 enableDebugOps: true +{{if .Component.GetFlag "ha"}} +ha: + enabled: true +{{end}} + identity: cert: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}-client.cert server_cert: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}-server.cert key: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}.key ca: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}-server.chain.pem +tls: + handshakeTimeout: 30s + ctrl: endpoints: {{ range $host := .Model.MustSelectHosts "component.ctrl" 1 }} - tls:{{ $host.PublicIp }}:6262{{end}} + startupTimeout: 5m healthChecks: ctrlPingCheck: @@ -36,6 +45,8 @@ link: advertise: tls:{{$router_ip}}:6000 dialers: - binding: transport + options: + connectTimeout: 30s listeners: {{if .Component.HasTag "tunneler"}} diff --git a/zititest/models/sdk-hosting-test/main.go b/zititest/models/sdk-hosting-test/main.go index d75ccd9e2..01596e19f 100644 --- a/zititest/models/sdk-hosting-test/main.go +++ b/zititest/models/sdk-hosting-test/main.go @@ -17,13 +17,13 @@ import ( "github.com/openziti/fablab/kernel/lib/runlevel/0_infrastructure/terraform" distribution "github.com/openziti/fablab/kernel/lib/runlevel/3_distribution" "github.com/openziti/fablab/kernel/lib/runlevel/3_distribution/rsync" - aws_ssh_key2 "github.com/openziti/fablab/kernel/lib/runlevel/6_disposal/aws_ssh_key" + awsSshKeyDispose "github.com/openziti/fablab/kernel/lib/runlevel/6_disposal/aws_ssh_key" "github.com/openziti/fablab/kernel/lib/runlevel/6_disposal/terraform" "github.com/openziti/fablab/kernel/model" "github.com/openziti/fablab/resources" "github.com/openziti/ziti/zititest/models/test_resources" "github.com/openziti/ziti/zititest/zitilab" - zitilib_actions "github.com/openziti/ziti/zititest/zitilab/actions" + zitilibActions "github.com/openziti/ziti/zititest/zitilab/actions" "github.com/openziti/ziti/zititest/zitilab/actions/edge" "github.com/openziti/ziti/zititest/zitilab/chaos" "github.com/openziti/ziti/zititest/zitilab/cli" @@ -33,14 +33,7 @@ import ( "time" ) -// const TargetZitiVersion = "v0.31.0" - const TargetZitiVersion = "" -const TargetZitiEdgeTunnelVersion = "" - -//const TargetZitiEdgeTunnelVersion = "0.22.12" - -var TunnelType = "!zet" //go:embed configs var configResource embed.FS @@ -100,7 +93,16 @@ var m = &model.Model{ }, StructureFactories: []model.Factory{ model.FactoryFunc(func(m *model.Model) error { - err := m.ForEachHost("component.router", 1, func(host *model.Host) error { + err := m.ForEachHost("component.ctrl", 1, func(host *model.Host) error { + host.InstanceType = "c5.xlarge" + return nil + }) + + if err != nil { + return err + } + + err = m.ForEachHost("component.router", 1, func(host *model.Host) error { host.InstanceType = "c5.xlarge" return nil }) @@ -123,6 +125,20 @@ var m = &model.Model{ return nil }) }), + model.FactoryFunc(func(m *model.Model) error { + if val, _ := m.GetBoolVariable("ha"); !val { + for _, host := range m.SelectHosts("component.ha") { + delete(host.Region.Hosts, host.Id) + } + } else { + for _, component := range m.SelectComponents("*") { + if ztType, ok := component.Type.(*zitilab.ZitiTunnelType); ok { + ztType.HA = true + } + } + } + return nil + }), model.NewScaleFactoryWithDefaultEntityFactory(&scaleStrategy{}), }, Resources: model.Resources{ @@ -136,7 +152,6 @@ var m = &model.Model{ Site: "us-east-1a", Hosts: model.Hosts{ "ctrl1": { - InstanceType: "c5.xlarge", Components: model.Components{ "ctrl1": { Scope: model.Scope{Tags: model.Tags{"ctrl"}}, @@ -171,9 +186,19 @@ var m = &model.Model{ }, }, "eu-west-2": { - Region: "us-west-2", - Site: "us-west-2a", + Region: "eu-west-2", + Site: "eu-west-2a", Hosts: model.Hosts{ + "ctrl2": { + Components: model.Components{ + "ctrl2": { + Scope: model.Scope{Tags: model.Tags{"ctrl", "ha"}}, + Type: &zitilab.ControllerType{ + Version: TargetZitiVersion, + }, + }, + }, + }, "router-eu-{{.ScaleIndex}}": { Scope: model.Scope{Tags: model.Tags{"router"}}, Components: model.Components{ @@ -202,6 +227,16 @@ var m = &model.Model{ Region: "ap-southeast-2", Site: "ap-southeast-2a", Hosts: model.Hosts{ + "ctrl3": { + Components: model.Components{ + "ctrl3": { + Scope: model.Scope{Tags: model.Tags{"ctrl", "ha"}}, + Type: &zitilab.ControllerType{ + Version: TargetZitiVersion, + }, + }, + }, + }, "router-ap-{{.ScaleIndex}}": { Scope: model.Scope{Tags: model.Tags{"router", "scaled"}}, Components: model.Components{ @@ -220,6 +255,7 @@ var m = &model.Model{ Scope: model.Scope{Tags: model.Tags{"host"}}, Type: &zitilab.ZitiTunnelType{ Version: TargetZitiVersion, + HA: true, }, }, }, @@ -245,8 +281,6 @@ var m = &model.Model{ workflow.AddAction(component.Start(".ctrl")) if isHA { - workflow.AddAction(semaphore.Sleep(2 * time.Second)) - workflow.AddAction(edge.RaftJoin("ctrl1", ".ctrl")) workflow.AddAction(semaphore.Sleep(2 * time.Second)) workflow.AddAction(edge.InitRaftController("#ctrl1")) } @@ -258,10 +292,10 @@ var m = &model.Model{ workflow.AddAction(edge.InitEdgeRouters(models.RouterTag, 25)) workflow.AddAction(edge.InitIdentities(".host", 25)) - workflow.AddAction(zitilib_actions.Edge("create", "edge-router-policy", "all", "--edge-router-roles", "#all", "--identity-roles", "#all")) - workflow.AddAction(zitilib_actions.Edge("create", "service-edge-router-policy", "all", "--service-roles", "#all", "--edge-router-roles", "#all")) + workflow.AddAction(zitilibActions.Edge("create", "edge-router-policy", "all", "--edge-router-roles", "#all", "--identity-roles", "#all")) + workflow.AddAction(zitilibActions.Edge("create", "service-edge-router-policy", "all", "--service-roles", "#all", "--edge-router-roles", "#all")) - workflow.AddAction(zitilib_actions.Edge("create", "config", "host-config", "host.v1", ` + workflow.AddAction(zitilibActions.Edge("create", "config", "host-config", "host.v1", ` { "address" : "localhost", "port" : 8080, @@ -273,7 +307,7 @@ var m = &model.Model{ for i := 0; i < 2000; i++ { name := fmt.Sprintf("service-%04d", i) task := func() error { - _, err := cli.Exec(run.GetModel(), "edge", "create", "service", name, "-c", "host-config") + _, err := cli.Exec(run.GetModel(), "edge", "create", "service", name, "-c", "host-config", "--timeout", "15") return err } tasks = append(tasks, task) @@ -299,7 +333,7 @@ var m = &model.Model{ } tasks = append(tasks, func() error { _, err := cli.Exec(run.GetModel(), "edge", "create", "service-policy", name, "Bind", - "--identity-roles", identityRoles, "--service-roles", servicesRoles) + "--identity-roles", identityRoles, "--service-roles", servicesRoles, "--timeout", "15") return err }) } @@ -307,6 +341,9 @@ var m = &model.Model{ })) workflow.AddAction(semaphore.Sleep(2 * time.Second)) + workflow.AddAction(edge.RaftJoin("ctrl1", ".ctrl")) + workflow.AddAction(semaphore.Sleep(5 * time.Second)) + workflow.AddAction(component.StartInParallel(".router", 10)) workflow.AddAction(semaphore.Sleep(2 * time.Second)) workflow.AddAction(component.StartInParallel(".host", 50)) @@ -318,7 +355,9 @@ var m = &model.Model{ component.StopInParallelHostExclusive("*", 15), host.GroupExec("*", 25, "rm -f logs/*"), )), - "login": model.Bind(edge.Login("#ctrl1")), + "login": model.Bind(edge.Login("#ctrl1")), + "login2": model.Bind(edge.Login("#ctrl2")), + "login3": model.Bind(edge.Login("#ctrl3")), "restart": model.ActionBinder(func(run *model.Model) model.Action { workflow := actions.Workflow() workflow.AddAction(component.StopInParallel("*", 100)) @@ -348,6 +387,13 @@ var m = &model.Model{ return nil })), "validate": model.Bind(model.ActionFunc(validateTerminators)), + "testIteration": model.Bind(model.ActionFunc(func(run model.Run) error { + return run.GetModel().Exec(run, + "sowChaos", + "validateUp", + "validate", + ) + })), }, Infrastructure: model.Stages{ @@ -367,7 +413,7 @@ var m = &model.Model{ Disposal: model.Stages{ terraform.Dispose(), - aws_ssh_key2.Dispose(), + awsSshKeyDispose.Dispose(), }, } diff --git a/zititest/models/sdk-hosting-test/validation.go b/zititest/models/sdk-hosting-test/validation.go index 8def5e8bc..010be59d2 100644 --- a/zititest/models/sdk-hosting-test/validation.go +++ b/zititest/models/sdk-hosting-test/validation.go @@ -80,7 +80,7 @@ func validateTerminators(run model.Run) error { deadline := time.Now().Add(15 * time.Minute) for _, ctrl := range ctrls { ctrlComponent := ctrl - go validateTerminatorsForCtrlWithChan(ctrlComponent, deadline, errC) + go validateTerminatorsForCtrlWithChan(run, ctrlComponent, deadline, errC) } for i := 0; i < len(ctrls); i++ { @@ -93,39 +93,15 @@ func validateTerminators(run model.Run) error { return nil } -func validateTerminatorsForCtrlWithChan(c *model.Component, deadline time.Time, errC chan<- error) { - errC <- validateTerminatorsForCtrl(c, deadline) +func validateTerminatorsForCtrlWithChan(run model.Run, c *model.Component, deadline time.Time, errC chan<- error) { + errC <- validateTerminatorsForCtrl(run, c, deadline) } -func validateTerminatorsForCtrl(c *model.Component, deadline time.Time) error { +func validateTerminatorsForCtrl(run model.Run, c *model.Component, deadline time.Time) error { expectedTerminatorCount := int64(6000) - username := c.MustStringVariable("credentials.edge.username") - password := c.MustStringVariable("credentials.edge.password") - edgeApiBaseUrl := c.Host.PublicIp + ":1280" - - var clients *zitirest.Clients - loginStart := time.Now() - for { - var err error - clients, err = zitirest.NewManagementClients(edgeApiBaseUrl) - if err != nil { - if time.Since(loginStart) > time.Minute { - return err - } - pfxlog.Logger().WithField("ctrlId", c.Id).WithError(err).Info("failed to initialize mgmt client, trying again in 1s") - time.Sleep(time.Second) - continue - } - - if err = clients.Authenticate(username, password); err != nil { - if time.Since(loginStart) > time.Minute { - return err - } - pfxlog.Logger().WithField("ctrlId", c.Id).WithError(err).Info("failed to authenticate, trying again in 1s") - time.Sleep(time.Second) - } else { - break - } + clients, err := chaos.EnsureLoggedIntoCtrl(run, c, time.Minute) + if err != nil { + return err } terminatorsPresent := false @@ -142,6 +118,10 @@ func validateTerminatorsForCtrl(c *model.Component, deadline time.Time) error { terminatorsPresent = true } else { time.Sleep(5 * time.Second) + clients, err = chaos.EnsureLoggedIntoCtrl(run, c, time.Minute) + if err != nil { + return err + } } if time.Since(lastLog) > time.Minute { logger.Infof("current terminator count: %v, elapsed time: %v", terminatorCount, time.Since(start)) @@ -167,6 +147,11 @@ func validateTerminatorsForCtrl(c *model.Component, deadline time.Time) error { logger.Infof("current count of invalid sdk terminators: %v, elapsed time: %v", count, time.Since(start)) time.Sleep(15 * time.Second) + + clients, err = chaos.EnsureLoggedIntoCtrl(run, c, time.Minute) + if err != nil { + return err + } } } diff --git a/zititest/models/sdk-status-test/main.go b/zititest/models/sdk-status-test/main.go index b127d645a..a26298452 100644 --- a/zititest/models/sdk-status-test/main.go +++ b/zititest/models/sdk-status-test/main.go @@ -280,9 +280,8 @@ var m = &model.Model{ workflow.AddAction(semaphore.Sleep(2 * time.Second)) workflow.AddAction(edge.RaftJoin("ctrl1", ".ctrl")) - workflow.AddAction(semaphore.Sleep(2 * time.Second)) + workflow.AddAction(semaphore.Sleep(5 * time.Second)) - workflow.AddAction(semaphore.Sleep(2 * time.Second)) workflow.AddAction(component.StartInParallel(".router", 10)) workflow.AddAction(semaphore.Sleep(2 * time.Second)) workflow.AddAction(component.StartInParallel(".host", 50)) diff --git a/zititest/models/smoke/configs/router.yml.tmpl b/zititest/models/smoke/configs/router.yml.tmpl index 3fdcfc53e..c390b4c78 100644 --- a/zititest/models/smoke/configs/router.yml.tmpl +++ b/zititest/models/smoke/configs/router.yml.tmpl @@ -17,9 +17,13 @@ identity: key: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}.key ca: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}-server.chain.pem +tls: + handshakeTimeout: 30s + ctrl: endpoints: {{ range $host := .Model.MustSelectHosts "component.ctrl" 1 }} - tls:{{ $host.PublicIp }}:6262{{end}} + startupTimeout: 5m healthChecks: ctrlPingCheck: @@ -41,6 +45,8 @@ link: advertise: tls:{{$router_ip}}:6000 dialers: - binding: transport + options: + connectTimeout: 30s listeners: {{if .Component.HasTag "tunneler"}} diff --git a/zititest/zitilab/actions/edge/raft_join.go b/zititest/zitilab/actions/edge/raft_join.go index 65a229cf6..88939f5de 100644 --- a/zititest/zitilab/actions/edge/raft_join.go +++ b/zititest/zitilab/actions/edge/raft_join.go @@ -6,18 +6,25 @@ import ( "github.com/openziti/fablab/kernel/model" "github.com/openziti/ziti/zititest/zitilab" "github.com/pkg/errors" + "time" ) func RaftJoin(primaryId string, componentSpec string) model.Action { + return RaftJoinWithTimeout(primaryId, componentSpec, time.Minute) +} + +func RaftJoinWithTimeout(primaryId string, componentSpec string, timeout time.Duration) model.Action { return &raftJoin{ primaryId: primaryId, componentSpec: componentSpec, + timeout: timeout, } } type raftJoin struct { primaryId string componentSpec string + timeout time.Duration } func (self *raftJoin) Execute(run model.Run) error { @@ -35,15 +42,24 @@ func (self *raftJoin) Execute(run model.Run) error { return errors.Errorf("component %s is not a controller", primary.Id) } log := pfxlog.Logger().WithField("component", primary.Id) + start := time.Now() for _, c := range ctrls { if c.Id == primary.Id { continue } - tmpl := "%s agent cluster add %v --id %v" - cmd := fmt.Sprintf(tmpl, ctrlType.GetBinaryPath(primary), "tls:"+c.Host.PublicIp+":6262", c.Id) - log.Info(cmd) - if err = primary.GetHost().ExecLogOnlyOnError(cmd); err != nil { - return err + + for { + tmpl := "%s agent cluster add %v --id %v" + cmd := fmt.Sprintf(tmpl, ctrlType.GetBinaryPath(primary), "tls:"+c.Host.PublicIp+":6262", c.Id) + log.Info(cmd) + if err = primary.GetHost().ExecLogOnlyOnError(cmd); err != nil { + if time.Since(start) > self.timeout { + return err + } + time.Sleep(2 * time.Second) + } else { + break + } } }