Skip to content

Commit

Permalink
Merge pull request #583 from SiaFoundation/nate/increase-migrate-concurrency
Browse files Browse the repository at this point in the history

Increase volume concurrency
  • Loading branch information
n8maninger authored Jan 23, 2025
2 parents be43369 + 8d8d193 commit a92e0ca
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
default: patch
---

# Fixed a concurrency issue when migrating data from large volumes
4 changes: 2 additions & 2 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (s *Store) ExpireContractSectors(height uint64) error {
return nil
}
log.Debug("removed sectors", zap.Int("expired", expired), zap.Int("batch", i))
jitterSleep(time.Millisecond) // allow other transactions to run
jitterSleep(50 * time.Millisecond) // allow other transactions to run
}
}

Expand All @@ -537,7 +537,7 @@ func (s *Store) ExpireV2ContractSectors(height uint64) error {
return nil
}
log.Debug("removed sectors", zap.Int("expired", expired), zap.Int("batch", i))
jitterSleep(time.Millisecond) // allow other transactions to run
jitterSleep(50 * time.Millisecond) // allow other transactions to run
}
}

Expand Down
2 changes: 1 addition & 1 deletion persist/sqlite/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *Store) ExpireTempSectors(height uint64) error {
return nil
}
log.Debug("expired temp sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i))
jitterSleep(time.Millisecond) // allow other transactions to run
jitterSleep(50 * time.Millisecond) // allow other transactions to run
}
}

Expand Down
10 changes: 4 additions & 6 deletions persist/sqlite/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,13 @@ LIMIT 1;`
return nil
})
if err != nil {
err = fmt.Errorf("failed to migrated sector: %w", err)
err = fmt.Errorf("failed to migrate sector: %w", err)
return
} else if done {
return
}

if index%256 == 0 {
jitterSleep(time.Millisecond) // allow other transactions to run
}
// allow other transactions to run
jitterSleep(50 * time.Millisecond) // maximum of 48000 sectors per hour
}
}

Expand Down Expand Up @@ -338,7 +336,7 @@ func (s *Store) RemoveVolume(id int64, force bool) error {
} else if removed == 0 {
break
}
jitterSleep(time.Millisecond)
jitterSleep(50 * time.Millisecond)
}

return s.transaction(func(tx *txn) error {
Expand Down
82 changes: 82 additions & 0 deletions persist/sqlite/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"path/filepath"
"reflect"
"sync"
"testing"
"time"

"go.sia.tech/core/types"
"go.sia.tech/hostd/host/contracts"
"go.sia.tech/hostd/host/storage"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)
Expand Down Expand Up @@ -414,6 +416,86 @@ func TestShrinkVolume(t *testing.T) {
}
}

// TestMigrateConcurrency checks that sectors can still be stored while a
// volume migration is running in the background. It fills one volume, marks
// it read-only, starts migrating its sectors in a goroutine, concurrently
// stores another initialSectors sectors, and finally verifies that at least
// one sector left the original volume.
func TestMigrateConcurrency(t *testing.T) {
const initialSectors = 256 * 100 // 100GiB
log := zap.NewNop()
db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log)
if err != nil {
t.Fatal(err)
}
defer db.Close()

v1, err := addTestVolume(db, "test", initialSectors)
if err != nil {
t.Fatal(err)
}

// fill the volume
for i := 0; i < initialSectors; i++ {
root := frand.Entropy256()
// the write callback is a no-op: only the database bookkeeping is exercised
if err := db.StoreSector(root, func(_ storage.SectorLocation) error { return nil }); err != nil {
t.Fatal(err)
} else if err := db.AddTempSector(root, 100); err != nil { // NOTE(review): 100 is presumably the expiration height — confirm against AddTempSector
t.Fatal(err)
}
}

// re-read the volume so the usage counters reflect the sectors stored above
v1, err = db.Volume(v1.ID)
if err != nil {
t.Fatal(err)
} else if v1.TotalSectors != v1.UsedSectors {
t.Fatalf("expected v1 to be full")
}

// add a secondary volume to accept the sectors
// (twice the size of v1: it receives both the migrated sectors and the
// initialSectors new sectors stored below)
_, err = addTestVolume(db, "test2", initialSectors*2)
if err != nil {
t.Fatal(err)
}

// NOTE(review): presumably prevents new sectors from landing on v1 while it is migrated — confirm against SetReadOnly
if err := db.SetReadOnly(v1.ID, true); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// prevent database close panic by waiting until migration has stopped or completed
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, _, err := db.MigrateSectors(ctx, v1.ID, 0, func(from, to storage.SectorLocation) error {
time.Sleep(100 * time.Millisecond)
return nil
}) // simulate disk i/o
// cancellation is the expected way for the migration to end (see cancel
// below); any other error aborts the test binary. This likely panics
// instead of calling t.Fatal because it runs outside the test goroutine.
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
}
}()

// fill the second volume
// (this runs concurrently with the migration goroutine started above)
for i := 0; i < initialSectors; i++ {
root := types.Hash256(frand.Entropy256())
if err := db.StoreSector(root, func(_ storage.SectorLocation) error { return nil }); err != nil {
t.Fatal(err)
} else if err := db.AddTempSector(root, 100); err != nil {
t.Fatal(err)

Check failure on line 483 in persist/sqlite/volumes_test.go

View workflow job for this annotation

GitHub Actions / publish / Docker

Test go.sia.tech/hostd/persist/sqlite/TestMigrateConcurrency failed in 35.35s

volumes_test.go:483: transaction failed (attempt 10): failed to update metric: failed to insert stat: database is locked

Check failure on line 483 in persist/sqlite/volumes_test.go

View workflow job for this annotation

GitHub Actions / publish / Build macOS (amd64)

Test go.sia.tech/hostd/persist/sqlite/TestMigrateConcurrency failed in 27.7s

volumes_test.go:483: transaction failed (attempt 10): failed to update metric: failed to insert stat: database is locked
}
log.Debug("stored sector", zap.Stringer("root", root))
}

// stop the migration and wait for its goroutine to exit before the
// deferred db.Close runs
cancel()
wg.Wait()

// if v1 is still full, no sector was migrated off it during the test
v1, err = db.Volume(v1.ID)
if err != nil {
t.Fatal(err)
} else if v1.TotalSectors == v1.UsedSectors {
t.Fatalf("expected v1 to have migrated sectors")
}
}

func TestRemoveVolume(t *testing.T) {
const initialSectors = 128
log := zaptest.NewLogger(t)
Expand Down

0 comments on commit a92e0ca

Please sign in to comment.