Skip to content

Commit

Permalink
go/runtime/registry: Cleanup bundles for versions lower then active
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Jan 17, 2025
1 parent ce3f6fc commit c405417
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 1 deletion.
1 change: 1 addition & 0 deletions .changelog/5737.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime/bundle: Cleanup bundles on runtime upgrade
66 changes: 66 additions & 0 deletions go/runtime/bundle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/config"
)

Expand Down Expand Up @@ -45,6 +46,13 @@ type ManifestStore interface {
// AddManifest adds the provided manifest, whose components were extracted
// to the specified directory, to the store.
AddManifest(manifest *Manifest, dir string) error

// RemoveManifests removes all regular manifests for the specified runtimeID
// whose versions are lower than the provided active version.
//
// It returns a list of directories for the removed exploded bundles or an error
// if the operation fails.
RemoveManifests(runtimeID common.Namespace, active version.Version) ([]string, error)
}

// Manager is responsible for managing bundles.
Expand All @@ -63,6 +71,8 @@ type Manager struct {

downloadCh chan struct{}
downloadQueue map[common.Namespace][]hash.Hash
cleanupCh chan struct{}
cleanupQueue map[common.Namespace]version.Version

client *http.Client
store ManifestStore
Expand Down Expand Up @@ -120,6 +130,8 @@ func NewManager(dataDir string, runtimeIDs []common.Namespace, store ManifestSto
runtimeBaseURLs: runtimeBaseURLs,
downloadCh: make(chan struct{}, 1),
downloadQueue: make(map[common.Namespace][]hash.Hash),
cleanupCh: make(chan struct{}, 1),
cleanupQueue: make(map[common.Namespace]version.Version),
client: &client,
store: store,
logger: *logger,
Expand Down Expand Up @@ -192,6 +204,9 @@ func (m *Manager) run(ctx context.Context) {
select {
case <-ticker.C:
case <-m.downloadCh:
case <-m.cleanupCh:
m.CleanStaleBundles()
continue
case <-ctx.Done():
m.logger.Info("stopping")
return
Expand Down Expand Up @@ -242,13 +257,38 @@ func (m *Manager) Queue(runtimeID common.Namespace, manifestHashes []hash.Hash)
}
}

// QueueCleanup updates the active version for the runtime ID,
// triggering clean-up of regular bundles for version lower than active.
func (m *Manager) QueueCleanup(runtimeID common.Namespace, active version.Version) {
// Update the active versions.
m.mu.Lock()
defer m.mu.Unlock()

m.cleanupQueue[runtimeID] = active

// Trigger immediate clean-up of stale bundles.
select {
case m.cleanupCh <- struct{}{}:
default:
}
}

// Download tries to download bundles in the queue.
func (m *Manager) Download() {
for runtimeID := range m.runtimeIDs {
m.downloadBundles(runtimeID)
}
}

// CleanStaleBundles removes outdated manifest hashes and deletes corresponding
// exploded bundles for runtimes in the clean-up queue.
func (m *Manager) CleanStaleBundles() {
m.logger.Info("removing regular bundles with version less than active")
for runtimeID := range m.runtimeIDs {
m.cleanStaleBundles(runtimeID)
}
}

func (m *Manager) downloadBundles(runtimeID common.Namespace) {
// Try to download queued bundles.
m.mu.RLock()
Expand Down Expand Up @@ -436,6 +476,32 @@ func (m *Manager) fetchBundle(url string) (string, error) {
return file.Name(), nil
}

func (m *Manager) cleanStaleBundles(runtimeID common.Namespace) {
m.mu.Lock()
active, ok := m.cleanupQueue[runtimeID]
m.mu.Unlock()

if !ok {
return
}

// TODO should you also remove dirs for manifests successfully removed,
// even if error?
dirs, err := m.store.RemoveManifests(runtimeID, active)
if err != nil {
m.logger.Error("failed to remove regular manifests from the registry",
"runtimeID", runtimeID,
"version", active,
"err", err,
)
return
}

for _, dir := range dirs {
m.removeBundle(dir)
}
}

func (m *Manager) loadManifests() (map[string]*Manifest, error) {
m.logger.Info("loading manifests")

Expand Down
6 changes: 6 additions & 0 deletions go/runtime/bundle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/version"
)

type mockStore struct {
Expand All @@ -28,6 +30,10 @@ func (r *mockStore) AddManifest(manifest *Manifest, _ string) error {
return nil
}

func (r *mockStore) RemoveManifests(_ common.Namespace, _ version.Version) ([]string, error) {
panic("RemoveManifests not implemented")
}

func TestRegisterManifest(t *testing.T) {
store := newMockStore()
manager, err := NewManager("", nil, store)
Expand Down
56 changes: 56 additions & 0 deletions go/runtime/bundle/registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bundle

import (
"errors"
"fmt"
"maps"
"slices"
Expand Down Expand Up @@ -152,6 +153,61 @@ func (r *Registry) AddManifest(manifest *Manifest, dir string) error {
return nil
}

// RemoveManifests removes all regular manifests for the specified runtimeID
// whose versions are lower than the provided active version.
//
// It returns a list of directories for the removed exploded bundles or an error
// if the operation fails.
func (r *Registry) RemoveManifests(runtimeID common.Namespace, active version.Version) ([]string, error) {
r.logger.Info("removing manifests with version lower then active",
"id", runtimeID,
"active", active,
)

var stale []string
for _, version := range r.GetVersions(runtimeID) {
if !version.Less(active) {
continue
}

dir, err := r.removeManifest(runtimeID, version)
if err != nil {
return nil, err
}
stale = append(stale, dir)
}
return stale, nil
}

func (r *Registry) removeManifest(runtimeID common.Namespace, version version.Version) (string, error) {
r.mu.Lock()
defer r.mu.Unlock()
var explDir string
manifest, ok := r.regularManifests[runtimeID][version]
if !ok {
return explDir, fmt.Errorf("missing regular manifest for runtime ID %v with %s", runtimeID, version.String())
}
explComp := r.components[runtimeID][component.ID_RONL][version]
if explComp == nil {
return explDir, errors.New("components missing RONL component")
}
explDir = explComp.ExplodedDataDir
manifestHash := manifest.Hash()

r.logger.Info("Removing (regular) manifest from registry",
"id", runtimeID,
"version", version,
"manifest_hash", manifestHash,
)

delete(r.regularManifests[runtimeID], version)
delete(r.manifestHashes, manifestHash)
for _, c := range manifest.Components {
delete(r.components[runtimeID][c.ID()], c.Version)
}
return explDir, nil
}

// GetVersions returns versions for the given runtime, sorted in ascending
// order.
func (r *Registry) GetVersions(runtimeID common.Namespace) []version.Version {
Expand Down
17 changes: 16 additions & 1 deletion go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,26 @@ func (r *runtime) run(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-epoCh:
case epoch := <-epoCh:
if up := r.updateActiveDescriptor(ctx); up && !activeInitialized {
close(r.activeDescriptorCh)
activeInitialized = true
}

// Trigger clean-up for bundles less than active version.
r.RLock()
rt := r.activeDescriptor
r.RUnlock()
if rt == nil {
continue
}

active := rt.ActiveDeployment(epoch)
if active == nil {
continue
}

r.bundleManager.QueueCleanup(rt.ID, active.Version)
case rt := <-regCh:
if !rt.ID.Equal(&r.id) {
continue
Expand Down

0 comments on commit c405417

Please sign in to comment.