Skip to content

Commit

Permalink
Merge pull request #2579 from openziti/router-data-model-test
Browse files Browse the repository at this point in the history
router data model test
  • Loading branch information
plorenz authored Jan 9, 2025
2 parents 1834455 + 5429ce5 commit 7b4ca1d
Show file tree
Hide file tree
Showing 89 changed files with 5,526 additions and 1,336 deletions.
55 changes: 45 additions & 10 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,61 @@
# Release 1.2.3
# Release 1.3.0

## What's New

* Router Data Model enabled by default
* Bug fixes

## Router Data Model

As part of the controller HA work, a stripped down version of the data model can now be distributed to the routers,
allowing routers to make some authorization/authentication decisions. This code has existed for some time, but
after testing and validation, is now enabled by default.

It can still be disabled at the controller level using new configuration. Note that the router data model is required
for HA functionality, so if the controller is running in HA mode, it cannot be disabled.

```yaml
routerDataModel:
# Controls whether routers are told to enable functionality dependent on the router data model
# Defaults to true
enabled: true

# How many model changes to buffer so that routers can be updated iteratively. If a router requests
# data that's no longer available, it will receive the full data model
logSize: 10000
```
## HA Changes
Routers no longer require the `ha: enabled` flag be set in the configuration. Routers should work correctly
whether connecting to HA or non-HA controllers.

NOTE: If the controller a router is connected changes modes, specifically if the controller goes from
supporting the router data model to not, or vice-versa, the router will shutdown so that it can
restart with the correct mode.

## Component Updates and Bug Fixes

* github.com/openziti/agent: [v1.0.20 -> v1.0.22](https://github.com/openziti/agent/compare/v1.0.20...v1.0.22)
* github.com/openziti/channel/v3: [v3.0.16 -> v3.0.22](https://github.com/openziti/channel/compare/v3.0.16...v3.0.22)
* github.com/openziti/agent: [v1.0.20 -> v1.0.23](https://github.com/openziti/agent/compare/v1.0.20...v1.0.23)
* github.com/openziti/channel/v3: [v3.0.16 -> v3.0.25](https://github.com/openziti/channel/compare/v3.0.16...v3.0.25)
* github.com/openziti/edge-api: [v0.26.35 -> v0.26.36](https://github.com/openziti/edge-api/compare/v0.26.35...v0.26.36)
* [Issue #138](https://github.com/openziti/edge-api/issues/138) - management api deletes were generally not mapping 404 properly

* github.com/openziti/foundation/v2: [v2.0.52 -> v2.0.55](https://github.com/openziti/foundation/compare/v2.0.52...v2.0.55)
* github.com/openziti/identity: [v1.0.90 -> v1.0.93](https://github.com/openziti/identity/compare/v1.0.90...v1.0.93)
* github.com/openziti/metrics: [v1.2.61 -> v1.2.64](https://github.com/openziti/metrics/compare/v1.2.61...v1.2.64)
* github.com/openziti/runzmd: [v1.0.55 -> v1.0.58](https://github.com/openziti/runzmd/compare/v1.0.55...v1.0.58)
* github.com/openziti/secretstream: [v0.1.26 -> v0.1.27](https://github.com/openziti/secretstream/compare/v0.1.26...v0.1.27)
* github.com/openziti/storage: [v0.3.8 -> v0.3.13](https://github.com/openziti/storage/compare/v0.3.8...v0.3.13)
* github.com/openziti/foundation/v2: [v2.0.52 -> v2.0.56](https://github.com/openziti/foundation/compare/v2.0.52...v2.0.56)
* github.com/openziti/identity: [v1.0.90 -> v1.0.94](https://github.com/openziti/identity/compare/v1.0.90...v1.0.94)
* github.com/openziti/metrics: [v1.2.61 -> v1.2.65](https://github.com/openziti/metrics/compare/v1.2.61...v1.2.65)
* github.com/openziti/runzmd: [v1.0.55 -> v1.0.59](https://github.com/openziti/runzmd/compare/v1.0.55...v1.0.59)
* github.com/openziti/secretstream: [v0.1.26 -> v0.1.28](https://github.com/openziti/secretstream/compare/v0.1.26...v0.1.28)
* github.com/openziti/storage: [v0.3.8 -> v0.3.14](https://github.com/openziti/storage/compare/v0.3.8...v0.3.14)
* [Issue #91](https://github.com/openziti/storage/issues/91) - Support dashes in identifier segments after the first dot

* github.com/openziti/transport/v2: [v2.0.153 -> v2.0.157](https://github.com/openziti/transport/compare/v2.0.153...v2.0.157)
* github.com/openziti/transport/v2: [v2.0.153 -> v2.0.159](https://github.com/openziti/transport/compare/v2.0.153...v2.0.159)
* github.com/openziti/ziti: [v1.2.2 -> v1.2.3](https://github.com/openziti/ziti/compare/v1.2.2...v1.2.3)
* [Issue #2596](https://github.com/openziti/ziti/issues/2596) - Add DisableRouterDataModel config flag to controller
* [Issue #2599](https://github.com/openziti/ziti/issues/2599) - Routers should only stream model data from one controller
* [Issue #2232](https://github.com/openziti/ziti/issues/2232) - Standardized REST API Error For Mutation on Non-Consensus Controller
* [Issue #2566](https://github.com/openziti/ziti/issues/2566) - Remove HA config flag from router
* [Issue #2550](https://github.com/openziti/ziti/issues/2550) - Router Data Model Chaos Test

# Release 1.2.2

Expand Down
1 change: 1 addition & 0 deletions common/capabilities/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ const (
ControllerCreateTerminatorV2 int = 1
ControllerSingleRouterLinkSource int = 2
ControllerCreateCircuitV2 int = 3
RouterDataModel int = 4
)
91 changes: 91 additions & 0 deletions common/config/value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"github.com/openziti/foundation/v2/concurrenz"
"sync"
)

type Listener[T any] interface {
NotifyChanged(init bool, old T, new T)
}

type ListenerFunc[T any] func(init bool, old T, new T)

func (f ListenerFunc[T]) NotifyChanged(init bool, old T, new T) {
f(init, old, new)
}

func NewConfigValue[T comparable]() *Value[T] {
return &Value[T]{
notifyInitialized: make(chan struct{}),
}
}

type Value[T comparable] struct {
lock sync.Mutex
initialized bool
notifyInitialized chan struct{}
value concurrenz.AtomicValue[T]
listeners concurrenz.CopyOnWriteSlice[Listener[T]]
}

func (self *Value[T]) Store(value T) {
self.lock.Lock()
defer self.lock.Unlock()

first := !self.initialized
old := self.value.Swap(value)

if first || old != value {
for _, l := range self.listeners.Value() {
l.NotifyChanged(first, old, value)
}
}

if first {
self.initialized = true
close(self.notifyInitialized)
}
}

func (self *Value[T]) Load() T {
return self.value.Load()
}

func (self *Value[T]) AddListener(listener Listener[T]) {
self.lock.Lock()
defer self.lock.Unlock()

self.listeners.Append(listener)

if self.initialized {
listener.NotifyChanged(true, self.Load(), self.Load())
}
}

func (self *Value[T]) RemoveListener(listener Listener[T]) {
self.lock.Lock()
defer self.lock.Unlock()

self.listeners.Delete(listener)
}

func (self *Value[T]) GetInitNotifyChannel() <-chan struct{} {
return self.notifyInitialized
}
49 changes: 20 additions & 29 deletions common/event_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"sync"
"sync/atomic"
)

type OnStoreSuccess func(index uint64, event *edge_ctrl_pb.DataState_ChangeSet)
Expand All @@ -33,7 +34,7 @@ type EventCache interface {
CurrentIndex() (uint64, bool)

// ReplayFrom returns an array of events from startIndex and true if the replay may be facilitated.
// An empty slice and true is returned in cases where the requested startIndex is the current index.
// An empty slice and true is returned in cases where the requested startIndex is greater than the current index.
// An empty slice and false is returned in cases where the replay cannot be facilitated.
// This function is blocking.
ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_ChangeSet, bool)
Expand All @@ -51,17 +52,15 @@ type EventCache interface {
// when replaying events is not expected (i.e. in routers)
type ForgetfulEventCache struct {
lock sync.Mutex
index *uint64
index uint64
}

func NewForgetfulEventCache() *ForgetfulEventCache {
return &ForgetfulEventCache{}
}

func (cache *ForgetfulEventCache) SetCurrentIndex(index uint64) {
cache.lock.Lock()
defer cache.lock.Unlock()
cache.index = &index
cache.index = index
}

func (cache *ForgetfulEventCache) WhileLocked(callback func(uint64, bool)) {
Expand All @@ -82,16 +81,14 @@ func (cache *ForgetfulEventCache) Store(event *edge_ctrl_pb.DataState_ChangeSet,
return nil
}

if cache.index != nil {
if *cache.index >= event.Index {
return fmt.Errorf("out of order event detected, currentIndex: %d, receivedIndex: %d, type :%T", *cache.index, event.Index, cache)
}
if cache.index > 0 && cache.index >= event.Index {
return fmt.Errorf("out of order event detected, currentIndex: %d, receivedIndex: %d, type :%T", cache.index, event.Index, cache)
}

cache.index = &event.Index
cache.index = event.Index

if onSuccess != nil {
onSuccess(*cache.index, event)
onSuccess(cache.index, event)
}

return nil
Expand All @@ -102,18 +99,11 @@ func (cache *ForgetfulEventCache) ReplayFrom(_ uint64) ([]*edge_ctrl_pb.DataStat
}

func (cache *ForgetfulEventCache) CurrentIndex() (uint64, bool) {
cache.lock.Lock()
defer cache.lock.Unlock()

return cache.currentIndex()
}

func (cache *ForgetfulEventCache) currentIndex() (uint64, bool) {
if cache.index == nil {
return 0, false
}

return *cache.index, true
return atomic.LoadUint64(&cache.index), true
}

// LoggingEventCache stores events in order to support replaying (i.e. in controllers).
Expand Down Expand Up @@ -211,6 +201,12 @@ func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.D
_, eventFound := cache.Events[startIndex]

if !eventFound {
// if we're asked to replay an index we haven't reached yet, return an empty list
headIndex := cache.Log[cache.HeadLogIndex]
if headIndex < startIndex {
return nil, true
}

return nil, false
}

Expand All @@ -228,27 +224,22 @@ func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.D
return nil, false
}

// no replay
if *startLogIndex == cache.HeadLogIndex {
return nil, true
}

// ez replay
if *startLogIndex < cache.HeadLogIndex {
// replay, no loop required
if *startLogIndex <= cache.HeadLogIndex {
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:cache.HeadLogIndex] {
for _, key := range cache.Log[*startLogIndex : cache.HeadLogIndex+1] {
result = append(result, cache.Events[key])
}
return result, true
}

//looping replay
// looping replay
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:] {
result = append(result, cache.Events[key])
}

for _, key := range cache.Log[0:cache.HeadLogIndex] {
for _, key := range cache.Log[:cache.HeadLogIndex+1] {
result = append(result, cache.Events[key])
}

Expand Down
Loading

0 comments on commit 7b4ca1d

Please sign in to comment.