Skip to content

Commit

Permalink
Consolidate fabric and edge persistence code. Fixes #1555
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Dec 6, 2023
1 parent 2fae394 commit 104eebc
Show file tree
Hide file tree
Showing 172 changed files with 1,693 additions and 1,873 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.DS_Store
release/
db/
/db/
# Binaries for programs and plugins
*.exe
*.exe~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/openziti/storage/ast"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/kataras/go-events"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/change"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/change"
"go.etcd.io/bbolt"
"strings"
"time"
Expand Down Expand Up @@ -132,10 +131,10 @@ func (store *apiSessionStoreImpl) GetEventsEmitter() events.EventEmmiter {
return store.eventsEmitter
}

func (store *apiSessionStoreImpl) onEventualDelete(name string, apiSessionId []byte) {
func (store *apiSessionStoreImpl) onEventualDelete(db boltz.Db, name string, apiSessionId []byte) {
idCollector := &sessionIdCollector{}
indexPath := []string{db.RootBucket, boltz.IndexesBucket, EntityTypeApiSessions, EntityTypeSessions}
err := store.stores.DbProvider.GetDb().View(func(tx *bbolt.Tx) error {
indexPath := []string{RootBucket, boltz.IndexesBucket, EntityTypeApiSessions, EntityTypeSessions}
err := db.View(func(tx *bbolt.Tx) error {
path := append(indexPath, string(apiSessionId))
if bucket := boltz.Path(tx, path...); bucket != nil {
boltz.Traverse(bucket.Bucket, "/"+strings.Join(path, "/"), idCollector)
Expand All @@ -152,7 +151,7 @@ func (store *apiSessionStoreImpl) onEventualDelete(name string, apiSessionId []b

for _, id := range idCollector.ids {
changeContext := change.New().SetSourceType("events.emitter").SetChangeAuthorType(change.AuthorTypeController)
err = store.stores.DbProvider.GetDb().Update(changeContext.NewMutateContext(), func(ctx boltz.MutateContext) error {
err = db.Update(changeContext.NewMutateContext(), func(ctx boltz.MutateContext) error {
if err := store.stores.session.DeleteById(ctx, id); err != nil {
if boltz.IsErrNotFoundErr(err) {
return nil
Expand All @@ -172,7 +171,7 @@ func (store *apiSessionStoreImpl) onEventualDelete(name string, apiSessionId []b
}

changeContext := change.New().SetSourceType("events.emitter").SetChangeAuthorType(change.AuthorTypeController)
err = store.stores.DbProvider.GetDb().Update(changeContext.NewMutateContext(), func(ctx boltz.MutateContext) error {
err = db.Update(changeContext.NewMutateContext(), func(ctx boltz.MutateContext) error {
if bucket := boltz.Path(ctx.Tx(), indexPath...); bucket != nil {
if err := bucket.DeleteBucket(apiSessionId); err != nil {
if err != bbolt.ErrBucketNotFound {
Expand Down Expand Up @@ -264,7 +263,7 @@ func (store *apiSessionStoreImpl) LoadOneByToken(tx *bbolt.Tx, token string) (*A

func (store *apiSessionStoreImpl) GetCachedSessionId(tx *bbolt.Tx, apiSessionId, sessionType, serviceId string) *string {
bucket := boltz.Path(tx,
db.RootBucket, boltz.IndexesBucket,
RootBucket, boltz.IndexesBucket,
EntityTypeApiSessions, EntityTypeSessions,
apiSessionId, sessionType,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
limitations under the License.
*/

package persistence
package db

import (
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/change"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/storage/boltz"
"github.com/openziti/storage/boltztest"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/change"
"go.etcd.io/bbolt"
"testing"
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/openziti/storage/ast"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package persistence
package db

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/openziti/storage/boltz"
Expand Down
125 changes: 120 additions & 5 deletions controller/db/base_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,129 @@
package db

import (
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"go.etcd.io/bbolt"
"strings"
)

const (
FieldName = "name"
)
type initializableStore interface {
boltz.Store
initializeLocal()
initializeLinked()
initializeIndexes(tx *bbolt.Tx, errorHolder errorz.ErrorHolder)
}

type Store[E boltz.ExtEntity] interface {
boltz.EntityStore[E]
initializableStore
LoadOneById(tx *bbolt.Tx, id string) (E, error)
}

type baseStore[T boltz.ExtEntity] struct {
type baseStore[E boltz.ExtEntity] struct {
stores *stores
*boltz.BaseStore[T]
*boltz.BaseStore[E]
}

func (store *baseStore[E]) addUniqueNameField() boltz.ReadIndex {
symbolName := store.AddSymbol(FieldName, ast.NodeTypeString)
return store.AddUniqueIndex(symbolName)
}

func (store *baseStore[E]) initializeIndexes(tx *bbolt.Tx, errorHolder errorz.ErrorHolder) {
store.InitializeIndexes(tx, errorHolder)
}

func (store *baseStore[E]) LoadOneById(tx *bbolt.Tx, id string) (E, error) {
entity := store.NewStoreEntity()
if err := store.baseLoadOneById(tx, id, entity); err != nil {
return *new(E), err
}
return entity, nil
}

func (store *baseStore[E]) baseLoadOneById(tx *bbolt.Tx, id string, entity E) error {
found, err := store.LoadEntity(tx, id, entity)
if err != nil {
return err
}
if !found {
return boltz.NewNotFoundError(store.GetSingularEntityType(), "id", id)
}
return nil
}

func (store *baseStore[E]) deleteEntityReferences(tx *bbolt.Tx, entity boltz.NamedExtEntity, rolesSymbol boltz.EntitySetSymbol) error {
idRef := entityRef(entity.GetId())

for _, policyHolderId := range store.GetRelatedEntitiesIdList(tx, entity.GetId(), rolesSymbol.GetStore().GetEntityType()) {
err := rolesSymbol.Map(tx, []byte(policyHolderId), func(ctx *boltz.MapContext) {
if ctx.ValueS() == idRef {
ctx.Delete()
}
})
if err != nil {
return err
}
}
return nil
}

func (store *baseStore[E]) getParentBucket(entity boltz.Entity, childBucket *boltz.TypedBucket) *boltz.TypedBucket {
parentBucket := store.GetParentStore().GetEntityBucket(childBucket.Tx(), []byte(entity.GetId()))
parentBucket.ErrorHolderImpl = childBucket.ErrorHolderImpl
return parentBucket
}

type NameIndexed interface {
GetNameIndex() boltz.ReadIndex
}

func (store *baseStore[E]) GetName(tx *bbolt.Tx, id string) *string {
symbol := store.GetSymbol(FieldName)
if symbol == nil {
return nil
}
_, val := symbol.Eval(tx, []byte(id))
if val != nil {
result := string(val)
return &result
}
return nil
}

func (store *baseStore[E]) getRoleAttributesCursorProvider(index boltz.SetReadIndex, values []string, semantic string) (ast.SetCursorProvider, error) {
if semantic == "" {
semantic = SemanticAllOf
}

if !isSemanticValid(semantic) {
return nil, errorz.NewFieldError("invalid semantic", FieldSemantic, semantic)
}

roles, ids, err := splitRolesAndIds(values)
if err != nil {
return nil, err
}

return func(tx *bbolt.Tx, forward bool) ast.SetCursor {
validIds := ast.NewTreeSet(forward)
for _, id := range ids {
if store.IsEntityPresent(tx, id) {
validIds.Add([]byte(id))
}
}

var rolesCursor ast.SetCursor
if strings.EqualFold(semantic, SemanticAllOf) {
rolesCursor = store.IteratorMatchingAllOf(index, roles)(tx, forward)
} else {
rolesCursor = store.IteratorMatchingAnyOf(index, roles)(tx, forward)
}
if validIds.Size() == 0 {
return rolesCursor
}
return ast.NewUnionSetCursor(rolesCursor, validIds.ToCursor(), forward)
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/openziti/storage/ast"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
limitations under the License.
*/

package persistence
package db

import (
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/eid"
"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -88,7 +87,7 @@ func (store *configStoreImpl) initializeLocal() {
store.indexName = store.addUniqueNameField()
store.symbolType = store.AddFkSymbol(FieldConfigType, store.stores.configType)
store.AddMapSymbol(FieldConfigData, ast.NodeTypeAnyType, FieldConfigData)
store.symbolServices = store.AddFkSetSymbol(db.EntityTypeServices, store.stores.edgeService)
store.symbolServices = store.AddFkSetSymbol(EntityTypeServices, store.stores.edgeService)
store.symbolIdentityServices = store.AddSetSymbol(FieldConfigIdentityService, ast.NodeTypeOther)
store.identityServicesLinks = &boltz.LinkedSetSymbol{EntitySymbol: store.symbolIdentityServices}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
limitations under the License.
*/

package persistence
package db

import (
"encoding/json"
"fmt"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/storage/boltztest"
"github.com/openziti/ziti/common/eid"
"go.etcd.io/bbolt"
"testing"
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
limitations under the License.
*/

package persistence
package db

import (
"encoding/json"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/eid"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
limitations under the License.
*/

package persistence
package db

import (
"fmt"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/storage/boltztest"
"github.com/openziti/ziti/common/eid"
"go.etcd.io/bbolt"
"testing"
"time"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package persistence
package db

import (
"fmt"
"github.com/openziti/ziti/common/eid"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/eid"
"sort"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func (store *edgeRouterPolicyStoreImpl) initializeLocal() {
store.symbolIdentityRoles = store.AddPublicSetSymbol(FieldIdentityRoles, ast.NodeTypeString)
store.symbolEdgeRouterRoles = store.AddPublicSetSymbol(FieldEdgeRouterRoles, ast.NodeTypeString)
store.symbolIdentities = store.AddFkSetSymbol(EntityTypeIdentities, store.stores.identity)
store.symbolEdgeRouters = store.AddFkSetSymbol(db.EntityTypeRouters, store.stores.edgeRouter)
store.symbolEdgeRouters = store.AddFkSetSymbol(EntityTypeRouters, store.stores.edgeRouter)

store.AddConstraint(boltz.NewSystemEntityEnforcementConstraint(store))
}
Expand Down
Loading

0 comments on commit 104eebc

Please sign in to comment.