Skip to content

Commit

Permalink
Implement idempotency for sdk terminators. Fixes #1446
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Nov 14, 2023
1 parent 308995e commit 06924ae
Show file tree
Hide file tree
Showing 28 changed files with 815 additions and 275 deletions.
23 changes: 20 additions & 3 deletions controller/handler_edge_ctrl/create_terminator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/protobufs"
"github.com/openziti/ziti/controller/models"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/common"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/fields"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/models"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/persistence"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -104,6 +106,21 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV
self.returnError(ctx, edge_ctrl_pb.CreateTerminatorResult_FailedIdConflict, ctx.err, logger)
return
}

// if the precedence or cost has changed, update the terminator
if terminator.Precedence != ctx.req.GetXtPrecedence() || terminator.Cost != uint16(ctx.req.Cost) {
terminator.Precedence = ctx.req.GetXtPrecedence()
terminator.Cost = uint16(ctx.req.Cost)
err := self.appEnv.GetHostController().GetNetwork().Terminators.Update(terminator, fields.UpdatedFieldsMap{
db.FieldTerminatorPrecedence: struct{}{},
db.FieldTerminatorCost: struct{}{},
}, ctx.newChangeContext())

if err != nil {
self.returnError(ctx, edge_ctrl_pb.CreateTerminatorResult_FailedOther, err, logger)
return
}
}
} else {
terminator = &network.Terminator{
BaseEntity: models.BaseEntity{
Expand Down Expand Up @@ -161,7 +178,7 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV
logger.WithError(err).Error("failed to send CreateTunnelTerminatorResponse")
}

logger.Info("completed create tunnel terminator operation")
logger.Info("completed create terminator v2 operation")
}

func (self *createTerminatorV2Handler) returnError(ctx *CreateTerminatorV2RequestContext, resultType edge_ctrl_pb.CreateTerminatorResult, err error, logger *logrus.Entry) {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ require (
github.com/openziti/jwks v1.0.3
github.com/openziti/metrics v1.2.37
github.com/openziti/runzmd v1.0.33
github.com/openziti/sdk-golang v0.20.129
github.com/openziti/sdk-golang v0.15.8-0.20231108150135-bb13a840de4b
github.com/openziti/secretstream v0.1.13
github.com/openziti/storage v0.2.23
github.com/openziti/transport/v2 v2.0.113
Expand All @@ -66,7 +66,7 @@ require (
github.com/rabbitmq/amqp091-go v1.8.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/russross/blackfriday v1.6.0
github.com/shirou/gopsutil/v3 v3.23.9
github.com/shirou/gopsutil/v3 v3.23.10
github.com/sirupsen/logrus v1.9.3
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spf13/cobra v1.7.0
Expand All @@ -80,8 +80,8 @@ require (
golang.org/x/crypto v0.14.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
golang.org/x/text v0.13.0
golang.org/x/sys v0.14.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.31.0
gopkg.in/AlecAivazis/survey.v1 v1.8.7
gopkg.in/resty.v1 v1.12.0
Expand Down
15 changes: 8 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ github.com/openziti/metrics v1.2.37 h1:5yWvMwQT6X43LDlNVcUtqAPJQXfKtbWSYoCIiOfXz
github.com/openziti/metrics v1.2.37/go.mod h1:jIL9iilxby8tR98C18uZaSe6bRG15ItR8XF2hmMt8vs=
github.com/openziti/runzmd v1.0.33 h1:tOyjRoUuVXIo1z1pNU32jALWkMmhzsSaDrhLtuOn3Ts=
github.com/openziti/runzmd v1.0.33/go.mod h1:8c/uvZR/XWXQNllTq6LuTpfKL2DTNxfI2X2wYhgRwik=
github.com/openziti/sdk-golang v0.20.129 h1:FjvXsGFxEiHq89sNyLSvNymruACFW5tbhkgZ3VCg2pE=
github.com/openziti/sdk-golang v0.20.129/go.mod h1:ZpJ7HCcIQbp8XiSno3YXkfhoDIbgjCjS2ScK2bda8eo=
github.com/openziti/sdk-golang v0.15.8-0.20231108150135-bb13a840de4b h1:BTEVFnlXrjWoWsETq+Zg6VFBQu8u1er2REISsjWFs6o=
github.com/openziti/sdk-golang v0.15.8-0.20231108150135-bb13a840de4b/go.mod h1:asthIJ9bYebgp+DGXQwv4J2gdXnyRwIp8DtCRBaNyPY=
github.com/openziti/secretstream v0.1.13 h1:grp53Q5gCFPXv6okwWHDVvqBBk2BhD0ikHwfV3Adhnc=
github.com/openziti/secretstream v0.1.13/go.mod h1:M4DYavDc3TVF/eemNqp5Fa+zGuYTNa0HTGSz/GkgUzA=
github.com/openziti/storage v0.2.23 h1:R5ZBGDGC/LvOz3fE/GlevwbPZ3HL7VxYEvlhKuezvNU=
Expand Down Expand Up @@ -728,8 +728,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E=
github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA=
github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM=
github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
Expand Down Expand Up @@ -1155,9 +1155,9 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1178,8 +1178,9 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
6 changes: 4 additions & 2 deletions router/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package env

import (
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/foundation/v2/goroutines"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/identity"
"github.com/openziti/metrics"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
)

type RouterEnv interface {
Expand All @@ -38,4 +39,5 @@ type RouterEnv interface {
RenderJsonConfig() (string, error)
GetHeartbeatOptions() HeartbeatOptions
GetRateLimiterPool() goroutines.Pool
GetVersionInfo() versions.VersionProvider
}
10 changes: 8 additions & 2 deletions router/xgress_edge/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (

"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/ziti/common/logcontext"
"github.com/openziti/ziti/controller/xt"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/pkg/errors"
)

Expand All @@ -39,7 +39,13 @@ func (dialer *dialer) IsTerminatorValid(id string, destination string) bool {
terminatorAddress := strings.TrimPrefix(destination, "hosted:")
pfxlog.Logger().Debug("looking up hosted service conn")
terminator, found := dialer.factory.hostedServices.Get(terminatorAddress)
return found && terminator.terminatorId.Load() == id
if found && terminator.terminatorId.Load() == id {
terminator.state.CompareAndSwap(TerminatorStateEstablishing, TerminatorStateEstablished)
terminator.state.CompareAndSwap(TerminatorStatePendingEstablishment, TerminatorStateEstablished)
dialer.factory.hostedServices.notifyTerminatorCreated(id)
return true
}
return false
}

func newDialer(factory *Factory, options *Options) xgress.Dialer {
Expand Down
38 changes: 35 additions & 3 deletions router/xgress_edge/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package xgress_edge
import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/router/xgress_common"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xgress_common"
"github.com/pkg/errors"
"io"
"math"
Expand All @@ -38,15 +39,45 @@ var headersFromFabric = map[uint8]int32{
xgress_common.PayloadFlagsHeader: edge.FlagsHeader,
}

type terminatorState int

const (
TerminatorStatePendingEstablishment terminatorState = 0
TerminatorStateEstablishing terminatorState = 1
TerminatorStateEstablished terminatorState = 2
TerminatorStateDeleting terminatorState = 3
)

func (self terminatorState) String() string {
switch self {
case TerminatorStatePendingEstablishment:
return "pending-establishment"
case TerminatorStateEstablishing:
return "establishing"
case TerminatorStateEstablished:
return "established"
case TerminatorStateDeleting:
return "deleting"
default:
return "unknown"
}
}

type edgeTerminator struct {
edge.MsgChannel
edgeClientConn *edgeClientConn
terminatorId concurrenz.AtomicValue[string]
listenerId string
token string
instance string
terminatorId concurrenz.AtomicValue[string]
instanceSecret []byte
cost uint16
precedence edge_ctrl_pb.TerminatorPrecedence
hostData map[uint32][]byte
assignIds bool
onClose func()
v2 bool
state concurrenz.AtomicValue[terminatorState]
}

func (self *edgeTerminator) nextDialConnId() uint32 {
Expand Down Expand Up @@ -75,6 +106,7 @@ func (self *edgeTerminator) close(notify bool, reason string) {
if self.v2 {
if terminatorId := self.terminatorId.Load(); terminatorId != "" {
if self.terminatorId.CompareAndSwap(terminatorId, "") {
self.state.Store(TerminatorStateDeleting)
logger.Info("removing terminator on controller")
ctrlCh := self.edgeClientConn.listener.factory.ctrls.AnyCtrlChannel()
if ctrlCh == nil {
Expand Down
39 changes: 20 additions & 19 deletions router/xgress_edge/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/metrics"
"github.com/openziti/transport/v2"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/router"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/fabric"
"github.com/openziti/ziti/router/handler_edge_ctrl"
"github.com/openziti/ziti/router/internal/apiproxy"
"github.com/openziti/ziti/router/internal/edgerouter"
"github.com/openziti/ziti/router"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/identity"
"github.com/openziti/metrics"
"github.com/openziti/transport/v2"
"github.com/pkg/errors"
"strings"
"time"
)

type Factory struct {
id *identity.TokenId
ctrls env.NetworkControllers
enabled bool
routerConfig *router.Config
Expand All @@ -47,6 +46,7 @@ type Factory struct {
versionProvider versions.VersionProvider
certChecker *CertExpirationChecker
metricsRegistry metrics.Registry
env env.RouterEnv
}

func (factory *Factory) GetNetworkControllers() env.NetworkControllers {
Expand All @@ -70,14 +70,18 @@ func (factory *Factory) BindChannel(binding channel.Binding) error {
binding.AddTypedReceiveHandler(handler_edge_ctrl.NewApiSessionRemovedHandler(factory.stateManager))
binding.AddTypedReceiveHandler(handler_edge_ctrl.NewApiSessionUpdatedHandler(factory.stateManager))
binding.AddTypedReceiveHandler(handler_edge_ctrl.NewSigningCertAddedHandler(factory.stateManager))
binding.AddTypedReceiveHandler(handler_edge_ctrl.NewExtendEnrollmentCertsHandler(factory.routerConfig.Id, func() {
binding.AddTypedReceiveHandler(handler_edge_ctrl.NewExtendEnrollmentCertsHandler(factory.env.GetRouterId(), func() {
factory.certChecker.CertsUpdated()
}))
binding.AddReceiveHandlerF(int32(edge_ctrl_pb.ContentType_CreateTerminatorV2ResponseType), factory.hostedServices.HandleCreateTerminatorResponse)

return nil
}

func (factory *Factory) NotifyOfReconnect(ch channel.Channel) {
pfxlog.Logger().Info("control channel reconnected, re-establishing hosted services")
factory.hostedServices.HandleReconnect()

go factory.stateManager.ValidateSessions(ch, factory.edgeRouterConfig.SessionValidateChunkSize, factory.edgeRouterConfig.SessionValidateMinInterval, factory.edgeRouterConfig.SessionValidateMaxInterval)
}

Expand All @@ -86,11 +90,8 @@ func (factory *Factory) GetTraceDecoders() []channel.TraceMessageDecoder {
}

func (factory *Factory) Run(env env.RouterEnv) error {
factory.ctrls = env.GetNetworkControllers()

factory.stateManager.StartHeartbeat(env, factory.edgeRouterConfig.HeartbeatIntervalSeconds, env.GetCloseNotify())

factory.certChecker = NewCertExpirationChecker(factory.routerConfig.Id, factory.edgeRouterConfig, env.GetNetworkControllers(), env.GetCloseNotify())
factory.certChecker = NewCertExpirationChecker(factory.env.GetRouterId(), factory.edgeRouterConfig, env.GetNetworkControllers(), env.GetCloseNotify())

go func() {
if err := factory.certChecker.Run(); err != nil {
Expand Down Expand Up @@ -118,22 +119,22 @@ func (factory *Factory) LoadConfig(configMap map[interface{}]interface{}) error
}
config.Tcfg["protocol"] = append(config.Tcfg.Protocols(), "ziti-edge", "")

factory.id = config.RouterConfig.Id

factory.edgeRouterConfig = config
go apiproxy.Start(config)

return nil
}

// NewFactory constructs a new Edge Xgress Factory instance
func NewFactory(routerConfig *router.Config, versionProvider versions.VersionProvider, stateManager fabric.StateManager, metricsRegistry metrics.Registry) *Factory {
func NewFactory(routerConfig *router.Config, env env.RouterEnv, stateManager fabric.StateManager) *Factory {
factory := &Factory{
hostedServices: NewHostedServicesRegistry(),
ctrls: env.GetNetworkControllers(),
hostedServices: NewHostedServicesRegistry(env),
stateManager: stateManager,
versionProvider: versionProvider,
versionProvider: env.GetVersionInfo(),
routerConfig: routerConfig,
metricsRegistry: metricsRegistry,
metricsRegistry: env.GetMetricsRegistry(),
env: env,
}
return factory
}
Expand Down Expand Up @@ -162,7 +163,7 @@ func (factory *Factory) CreateListener(optionsData xgress.OptionsData) (xgress.L
channel.HelloVersionHeader: versionHeader,
}

return newListener(factory.id, factory, options, headers), nil
return newListener(factory.env.GetRouterId(), factory, options, headers), nil
}

// CreateDialer creates a new Edge Xgress dialer
Expand Down
Loading

0 comments on commit 06924ae

Please sign in to comment.