Skip to content

Commit

Permalink
Fixup: Bundle clean-up
Browse files Browse the repository at this point in the history
Personally, I think it would be simpler to use `GetVersions` from
the registry.

Bundle registry is a dependency for both runtime registry and
manager (which will be later made a worker?). I find this
unnecessary abstraction (store interface), but maybe there is
a good reason for it?
  • Loading branch information
martintomazic committed Jan 23, 2025
1 parent 4135478 commit 7f1fb9b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 81 deletions.
81 changes: 51 additions & 30 deletions go/runtime/bundle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/config"
"github.com/oasisprotocol/oasis-core/go/runtime/bundle/component"
)

const (
Expand Down Expand Up @@ -48,11 +49,11 @@ type ManifestStore interface {
// to the specified directory, to the store.
AddManifest(manifest *Manifest, dir string) error

// RemoveManifests removes all regular manifests for the specified runtimeID
// whose versions are lower than the provided version.
//
// Returns a list of directories for the removed exploded bundles.
RemoveManifests(runtimeID common.Namespace, version version.Version) ([]string, error)
// RemoveManifest removes a manifest with provided hash.
RemoveManifest(hash hash.Hash) bool

// Manifests returns all known manifests (regular and detached).
Manifests() []ExplodedManifest
}

// Manager is responsible for managing bundles.
Expand All @@ -69,9 +70,9 @@ type Manager struct {
runtimeBaseURLs map[common.Namespace][]string
globalBaseURLs []string

downloadAndCleanCh chan struct{}
downloadQueue map[common.Namespace][]hash.Hash
cleanupQueue map[common.Namespace]version.Version
triggerCh chan struct{}
downloadQueue map[common.Namespace][]hash.Hash
cleanupQueue map[common.Namespace]version.Version

client *http.Client
store ManifestStore
Expand Down Expand Up @@ -127,7 +128,7 @@ func NewManager(dataDir string, runtimeIDs []common.Namespace, store ManifestSto
runtimeIDs: runtimes,
globalBaseURLs: globalBaseURLs,
runtimeBaseURLs: runtimeBaseURLs,
downloadAndCleanCh: make(chan struct{}, 1),
triggerCh: make(chan struct{}, 1),
downloadQueue: make(map[common.Namespace][]hash.Hash),
cleanupQueue: make(map[common.Namespace]version.Version),
client: &client,
Expand Down Expand Up @@ -201,7 +202,7 @@ func (m *Manager) run(ctx context.Context) {
for {
select {
case <-ticker.C:
case <-m.downloadAndCleanCh:
case <-m.triggerCh:
case <-ctx.Done():
m.logger.Info("stopping")
return
Expand Down Expand Up @@ -248,23 +249,22 @@ func (m *Manager) Download(runtimeID common.Namespace, manifestHashes []hash.Has

// Trigger immediate download and clean-up of stale bundles.
select {
case m.downloadAndCleanCh <- struct{}{}:
case m.triggerCh <- struct{}{}:
default:
}
}

// Cleanup initiates the clean-up of regular bundles for the specified runtime,
// removing versions lower than the specified one.
func (m *Manager) Cleanup(runtimeID common.Namespace, active version.Version) {
// Update the active versions.
func (m *Manager) Cleanup(runtimeID common.Namespace, version version.Version) {
m.mu.Lock()
defer m.mu.Unlock()

m.cleanupQueue[runtimeID] = active
m.cleanupQueue[runtimeID] = version

// Trigger immediate download and clean-up of stale bundles.
select {
case m.downloadAndCleanCh <- struct{}{}:
case m.triggerCh <- struct{}{}:
default:
}
}
Expand All @@ -276,10 +276,10 @@ func (m *Manager) download() {
}
}

// Clean removes outdated manifest hashes and deletes corresponding
// Clean removes outdated manifest and deletes corresponding
// exploded bundles for runtimes in the clean-up queue.
func (m *Manager) Clean() {
m.logger.Info("removing regular bundles")
m.logger.Info("cleaning regular bundles")
for runtimeID := range m.runtimeIDs {
m.cleanStaleBundles(runtimeID)
}
Expand Down Expand Up @@ -474,28 +474,49 @@ func (m *Manager) fetchBundle(url string) (string, error) {

func (m *Manager) cleanStaleBundles(runtimeID common.Namespace) {
m.mu.Lock()
active, ok := m.cleanupQueue[runtimeID]
maxVersion, ok := m.cleanupQueue[runtimeID]
m.mu.Unlock()

if !ok {
return
}

// TODO should you also remove dirs for manifests successfully removed,
// even if error?
dirs, err := m.store.RemoveManifests(runtimeID, active)
if err != nil {
m.logger.Error("failed to remove regular manifests from the registry",
"runtimeID", runtimeID,
"version", active,
"err", err,
)
return
m.logger.Info("starting clean-up of stale bundles",
"id", runtimeID,
"max_version", maxVersion,
)

for _, manifest := range m.store.Manifests() {
if manifest.ID != runtimeID {
continue
}

if manifest.IsDetached() {
continue
}

if ronl := manifest.GetComponentByID(component.ID_RONL); ronl.Version.Less(maxVersion) {
continue
}

err := m.cleanStaleBundle(manifest)
if err != nil {
m.logger.Error("failed to clean stale bundle",
"err", err,
)
}
}
}

func (m *Manager) cleanStaleBundle(manifest ExplodedManifest) error {
if ok := m.store.RemoveManifest(manifest.Hash()); !ok {
return fmt.Errorf("failed to remove manifest with hash %s from the store", manifest.Hash().Hex())
}

for _, dir := range dirs {
_ = m.removeBundle(dir)
if err := m.removeBundle(manifest.ExplodedDataDir); err != nil {
return err
}
return nil
}

func (m *Manager) loadManifests() (map[string]*Manifest, error) {
Expand Down
10 changes: 6 additions & 4 deletions go/runtime/bundle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/version"
)

type mockStore struct {
Expand All @@ -30,8 +28,12 @@ func (r *mockStore) AddManifest(manifest *Manifest, _ string) error {
return nil
}

func (r *mockStore) RemoveManifests(_ common.Namespace, _ version.Version) ([]string, error) {
panic("RemoveManifests not implemented")
func (r *mockStore) Manifests() []ExplodedManifest {
panic("not implemented")
}

func (r *mockStore) RemoveManifest(_ hash.Hash) bool {
panic("not implemented")
}

func TestRegisterManifest(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions go/runtime/bundle/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ const (
manifestName = manifestPath + "/MANIFEST.MF"
)

// ExplodedManifest is manifest with corresponding exploded bundle dir.
type ExplodedManifest struct {
*Manifest

// ExplodedDataDir is the path to the data directory where the bundle
// represented by manifest has been extracted.
ExplodedDataDir string
}

// Manifest is a deserialized runtime bundle manifest.
type Manifest struct {
// Name is the optional human readable runtime name.
Expand Down
70 changes: 23 additions & 47 deletions go/runtime/bundle/registry.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bundle

import (
"errors"
"fmt"
"maps"
"slices"
Expand All @@ -24,7 +23,7 @@ import (
type Registry struct {
mu sync.RWMutex

manifestHashes map[hash.Hash]struct{}
manifests map[hash.Hash]ExplodedManifest
regularManifests map[common.Namespace]map[version.Version]*Manifest
components map[common.Namespace]map[component.ID]map[version.Version]*ExplodedComponent
notifiers map[common.Namespace]*pubsub.Broker
Expand All @@ -37,7 +36,7 @@ func NewRegistry() *Registry {
logger := logging.GetLogger("runtime/bundle/registry")

return &Registry{
manifestHashes: make(map[hash.Hash]struct{}),
manifests: make(map[hash.Hash]ExplodedManifest),
regularManifests: make(map[common.Namespace]map[version.Version]*Manifest),
components: make(map[common.Namespace]map[component.ID]map[version.Version]*ExplodedComponent),
notifiers: make(map[common.Namespace]*pubsub.Broker),
Expand All @@ -51,7 +50,7 @@ func (r *Registry) HasManifest(hash hash.Hash) bool {
r.mu.RLock()
defer r.mu.RUnlock()

_, ok := r.manifestHashes[hash]
_, ok := r.manifests[hash]
return ok
}

Expand All @@ -69,7 +68,7 @@ func (r *Registry) AddManifest(manifest *Manifest, dir string) error {
)

// Skip already processed manifests.
if _, ok := r.manifestHashes[manifestHash]; ok {
if _, ok := r.manifests[manifestHash]; ok {
return nil
}

Expand Down Expand Up @@ -143,7 +142,7 @@ func (r *Registry) AddManifest(manifest *Manifest, dir string) error {
}

// Remember which manifests were added.
r.manifestHashes[manifestHash] = struct{}{}
r.manifests[manifestHash] = ExplodedManifest{Manifest: manifest, ExplodedDataDir: dir}

r.logger.Info("manifest added",
"name", manifest.Name,
Expand All @@ -153,58 +152,35 @@ func (r *Registry) AddManifest(manifest *Manifest, dir string) error {
return nil
}

// RemoveManifests removes all regular manifests for the specified runtime ID
// whose versions are lower than the provided version.
//
// Returns a list of directories for the removed exploded bundles.
func (r *Registry) RemoveManifests(runtimeID common.Namespace, version version.Version) ([]string, error) {
r.logger.Info("removing manifests with versions lower then provided",
"id", runtimeID,
"version", version,
)

var stale []string
for _, v := range r.GetVersions(runtimeID) {
if !v.Less(version) {
continue
}

dir, err := r.removeManifest(runtimeID, v)
if err != nil {
return nil, err
}
stale = append(stale, dir)
}
return stale, nil
// Manifests returns all known manifests (regular and detached).
func (r *Registry) Manifests() []ExplodedManifest {
r.mu.RLock()
defer r.mu.RUnlock()
return slices.Collect(maps.Values(r.manifests))
}

func (r *Registry) removeManifest(runtimeID common.Namespace, version version.Version) (string, error) {
// RemoveManifest removes a manifest with provided hash.
func (r *Registry) RemoveManifest(hash hash.Hash) bool {
r.mu.Lock()
defer r.mu.Unlock()
var explDir string
manifest, ok := r.regularManifests[runtimeID][version]
explManifest, ok := r.manifests[hash]
if !ok {
return explDir, fmt.Errorf("missing regular manifest for runtime ID %v with %s", runtimeID, version.String())
}
explComp := r.components[runtimeID][component.ID_RONL][version]
if explComp == nil {
return explDir, errors.New("components missing RONL component")
return false
}
explDir = explComp.ExplodedDataDir
manifestHash := manifest.Hash()

r.logger.Info("Removing (regular) manifest from registry",
runtimeID := explManifest.Manifest.ID
r.logger.Info("Removing manifest from registry",
"id", runtimeID,
"version", version,
"manifest_hash", manifestHash,
"manifest_hash", hash,
)

delete(r.regularManifests[runtimeID], version)
delete(r.manifestHashes, manifestHash)
for _, c := range manifest.Components {
if ronl := explManifest.GetComponentByID(component.ID_RONL); ronl != nil {
delete(r.regularManifests[runtimeID], ronl.Version)
}
delete(r.manifests, hash)
for _, c := range explManifest.Manifest.Components {
delete(r.components[runtimeID][c.ID()], c.Version)
}
return explDir, nil
return true
}

// GetVersions returns versions for the given runtime, sorted in ascending
Expand Down

0 comments on commit 7f1fb9b

Please sign in to comment.