core/rawdb: support freezer batch read with no size limit (ethereum#27687)

This change adds the ability to perform reads from the freezer without a size limit. This can be useful when the caller is certain that an out-of-memory condition cannot occur (for example, when reading only a few elements).

The previous API was designed to behave both optimally and securely while servicing a request from a peer; this new unlimited mode should _not_ be used when an untrusted peer can influence the query size.
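A minimal usage sketch of the new mode (a hedged example, not part of the commit): it assumes a freezer-backed database opened via rawdb.NewLevelDBDatabaseWithFreezer, with placeholder paths. Passing maxBytes = 0 disables the byte cap, so up to 'count' items are returned regardless of their cumulative size.

package main

import (
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	// Open a LevelDB key-value store with an attached freezer (read-only).
	// Both paths are placeholders.
	db, err := rawdb.NewLevelDBDatabaseWithFreezer(
		"/tmp/chaindata", 16, 16, "/tmp/chaindata/ancient", "", true)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// maxBytes = 0: no byte limit, so up to 10 hashes are returned,
	// however large they are in aggregate.
	hashes, err := db.AncientRange(rawdb.ChainFreezerHashTable, 0, 10, 0)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("read", len(hashes), "ancient hashes")
}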
rjl493456442 authored Jul 12, 2023
1 parent cecd221 commit 0b1f97e
Showing 5 changed files with 86 additions and 23 deletions.
7 changes: 4 additions & 3 deletions core/rawdb/freezer.go
@@ -197,9 +197,10 @@ func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) {

// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
-//   - at most 'max' items,
-//   - at least 1 item (even if exceeding the maxByteSize), but will otherwise
-//     return as many items as fit into maxByteSize.
+//   - at most 'count' items,
+//   - if maxBytes is specified: at least 1 item (even if exceeding maxBytes),
+//     but will otherwise return as many items as fit into maxBytes.
+//   - if maxBytes is not specified, 'count' items will be returned if they are present.
func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
	if table := f.tables[kind]; table != nil {
		return table.RetrieveItems(start, count, maxBytes)
33 changes: 16 additions & 17 deletions core/rawdb/freezer_table.go
@@ -712,7 +712,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
		if !t.noCompression {
			decompressedSize, _ = snappy.DecodedLen(item)
		}
-		if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
+		if i > 0 && maxBytes != 0 && uint64(outputSize+decompressedSize) > maxBytes {
			break
		}
		if !t.noCompression {
@@ -730,8 +730,10 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
}

// retrieveItems reads up to 'count' items from the table. It reads at least
-// one item, but otherwise avoids reading more than maxBytes bytes.
-// It returns the (potentially compressed) data, and the sizes.
+// one item, but otherwise avoids reading more than maxBytes bytes. Freezer
+// will ignore the size limitation and continuously allocate memory to store
+// data if maxBytes is 0. It returns the (potentially compressed) data, and
+// the sizes.
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()
@@ -752,25 +752,22 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
	if start+count > items {
		count = items - start
	}
-	var (
-		output     = make([]byte, maxBytes) // Buffer to read data into
-		outputSize int                      // Used size of that buffer
-	)
+	var output []byte // Buffer to read data into
+	if maxBytes != 0 {
+		output = make([]byte, 0, maxBytes)
+	} else {
+		output = make([]byte, 0, 1024) // initial buffer cap
+	}
	// readData is a helper method to read a single data item from disk.
	readData := func(fileId, start uint32, length int) error {
-		// In case a small limit is used, and the elements are large, may need to
-		// realloc the read-buffer when reading the first (and only) item.
-		if len(output) < length {
-			output = make([]byte, length)
-		}
+		output = grow(output, length)
		dataFile, exist := t.files[fileId]
		if !exist {
			return fmt.Errorf("missing data file %d", fileId)
		}
-		if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
+		if _, err := dataFile.ReadAt(output[len(output)-length:], int64(start)); err != nil {
			return err
		}
-		outputSize += length
		return nil
	}
	// Read all the indexes in one go
@@ -801,7 +800,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
			}
			readStart = 0
		}
-		if i > 0 && uint64(totalSize+size) > maxBytes {
+		if i > 0 && uint64(totalSize+size) > maxBytes && maxBytes != 0 {
			// About to break out due to byte limit being exceeded. We don't
			// read this last item, but we need to do the deferred reads now.
			if unreadSize > 0 {
@@ -815,7 +814,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
		unreadSize += size
		totalSize += size
		sizes = append(sizes, size)
-		if i == len(indices)-2 || uint64(totalSize) > maxBytes {
+		if i == len(indices)-2 || (uint64(totalSize) > maxBytes && maxBytes != 0) {
			// Last item, need to do the read now
			if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
				return nil, nil, err
@@ -826,7 +825,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {

	// Update metrics.
	t.readMeter.Mark(int64(totalSize))
-	return output[:outputSize], sizes, nil
+	return output, sizes, nil
}

// has returns an indicator whether the specified number data is still accessible
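For orientation: retrieveItems returns one concatenated buffer plus a parallel list of per-item lengths, and RetrieveItems slices (and, if needed, decompresses) the individual items back out of it. A standalone sketch of that slicing follows; splitItems is a hypothetical helper for illustration, not code from this diff.

package main

import "fmt"

// splitItems cuts a concatenated payload back into per-item slices using
// the parallel sizes list, mirroring how the (output, sizes) pair is consumed.
func splitItems(data []byte, sizes []int) [][]byte {
	items := make([][]byte, 0, len(sizes))
	for _, size := range sizes {
		items = append(items, data[:size])
		data = data[size:]
	}
	return items
}

func main() {
	data := []byte("aaabbcccc")
	for _, item := range splitItems(data, []int{3, 2, 4}) {
		fmt.Printf("%s\n", item) // prints: aaa, bb, cccc
	}
}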
46 changes: 46 additions & 0 deletions core/rawdb/freezer_table_test.go
@@ -994,6 +994,52 @@ func TestSequentialReadByteLimit(t *testing.T) {
	}
}

// TestSequentialReadNoByteLimit tests the batch-read if maxBytes is not specified.
// Freezer should return the requested items regardless of the size limitation.
func TestSequentialReadNoByteLimit(t *testing.T) {
	rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
	fname := fmt.Sprintf("batchread-3-%d", rand.Uint64())
	{ // Fill table
		f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false)
		if err != nil {
			t.Fatal(err)
		}
		// Write 10 bytes 30 times,
		// splitting it at every 100 bytes (10 items)
		writeChunks(t, f, 30, 10)
		f.Close()
	}
	for i, tc := range []struct {
		items uint64
		want  int
	}{
		{1, 1},
		{30, 30},
		{31, 30},
	} {
		{
			f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false)
			if err != nil {
				t.Fatal(err)
			}
			items, err := f.RetrieveItems(0, tc.items, 0)
			if err != nil {
				t.Fatal(err)
			}
			if have, want := len(items), tc.want; have != want {
				t.Fatalf("test %d: want %d items, have %d ", i, want, have)
			}
			for ii, have := range items {
				want := getChunk(10, ii)
				if !bytes.Equal(want, have) {
					t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want)
				}
			}
			f.Close()
		}
	}
}

func TestFreezerReadonly(t *testing.T) {
	tmpdir := os.TempDir()
	// Case 1: Check it fails on non-existent file.
16 changes: 16 additions & 0 deletions core/rawdb/freezer_utils.go
@@ -117,3 +117,19 @@ func truncateFreezerFile(file *os.File, size int64) error {
	}
	return nil
}

// grow prepares the slice space for a new item, and doubles the slice capacity
// if there is not enough space.
func grow(buf []byte, n int) []byte {
	if cap(buf)-len(buf) < n {
		newcap := 2 * cap(buf)
		if newcap-len(buf) < n {
			newcap = len(buf) + n
		}
		nbuf := make([]byte, len(buf), newcap)
		copy(nbuf, buf)
		buf = nbuf
	}
	buf = buf[:len(buf)+n]
	return buf
}
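To see the amortized doubling in action, here is a standalone sketch (grow is copied verbatim from the hunk above) that prints the buffer's length and capacity after three successive 10-byte reservations; the capacities come out as 10, 20 and 40.

package main

import "fmt"

// grow reserves n more bytes at the tail of buf, doubling the capacity
// whenever the spare room is insufficient.
func grow(buf []byte, n int) []byte {
	if cap(buf)-len(buf) < n {
		newcap := 2 * cap(buf)
		if newcap-len(buf) < n {
			newcap = len(buf) + n
		}
		nbuf := make([]byte, len(buf), newcap)
		copy(nbuf, buf)
		buf = nbuf
	}
	return buf[:len(buf)+n]
}

func main() {
	var buf []byte
	for i := 0; i < 3; i++ {
		buf = grow(buf, 10)
		fmt.Printf("after reservation %d: len=%d cap=%d\n", i+1, len(buf), cap(buf))
	}
	// The tail buf[len(buf)-10:] is the freshly reserved space that
	// readData fills via dataFile.ReadAt.
}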
7 changes: 4 additions & 3 deletions ethdb/database.go
@@ -79,9 +79,10 @@ type AncientReaderOp interface {

// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
-//   - at most 'count' items,
-//   - at least 1 item (even if exceeding the maxBytes), but will otherwise
-//     return as many items as fit into maxBytes.
+//   - at most 'count' items,
+//   - if maxBytes is specified: at least 1 item (even if exceeding maxBytes),
+//     but will otherwise return as many items as fit into maxBytes.
+//   - if maxBytes is not specified, 'count' items will be returned if they are present.
	AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error)

	// Ancients returns the ancient item numbers in the ancient store.
