
Enable ETAG based validation on every block download to provide higher consistency #1608

Open

wants to merge 25 commits into base: blobfuse/2.4.1

Changes from 14 commits (25 commits total)
b189ca0
Add strong consistency check for data on disk
vibhansa-msft Jan 6, 2025
ae4abc0
Updating changelog
vibhansa-msft Jan 6, 2025
5f167cf
Updating readme
vibhansa-msft Jan 8, 2025
1799048
Adding etag support
vibhansa-msft Jan 9, 2025
7611be6
Adding parsing logic for ETAG in list and getAttr api
vibhansa-msft Jan 9, 2025
2572cc2
Correcting etag validation check
vibhansa-msft Jan 9, 2025
49edfa3
Migrating from md5 to crc64
vibhansa-msft Jan 13, 2025
7649ed3
Merge branch 'vibhansa/diskcrc' into vibhansa/etagvalidation
vibhansa-msft Jan 15, 2025
b6e4195
Adding etag based validation on strong consistency check only
vibhansa-msft Jan 15, 2025
3492ddd
Sync with feature branch
vibhansa-msft Jan 15, 2025
46a3b1b
Merge branch 'blobfuse/2.4.1' into vibhansa/etagvalidation
vibhansa-msft Jan 15, 2025
c1941c3
Sync with feature branch
vibhansa-msft Jan 16, 2025
df864c9
Adding UT for readInBufferWithEtag
vibhansa-msft Jan 17, 2025
37410a2
renamed variable
jainakanksha-msft Jan 20, 2025
813f2f5
renamed variables
jainakanksha-msft Jan 20, 2025
dcbf997
Merge branch 'blobfuse/2.4.1' into vibhansa/etagvalidation
vibhansa-msft Jan 20, 2025
791e16c
Adding UT for etag mismatch case
vibhansa-msft Jan 20, 2025
db4ccf3
Update as per review comments'
vibhansa-msft Jan 20, 2025
63c6425
Renamed variables
jainakanksha-msft Jan 20, 2025
4f746b0
Make ReadInStream default option for readin buffer api
vibhansa-msft Jan 21, 2025
161906b
Make etag validation a default option
vibhansa-msft Jan 21, 2025
96fb025
Remove quotes from etag before passing it down as some api return bac…
vibhansa-msft Jan 21, 2025
5e5bd2f
Update etag when data is flushed
vibhansa-msft Jan 21, 2025
51f625f
Correcting changelog
vibhansa-msft Jan 21, 2025
ebaa7d5
Correcting etag population
vibhansa-msft Jan 21, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,8 @@

**Features**
- Mount container or directory but restrict the view of blobs that you can see. This feature is available only in read-only mount.
- To protect against accidental overwrites on data stored by block-cache on temp path, md5 sums will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To protect against accidental overwrites on data stored by block-cache on temp path, crc64 hash will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To provide a strong consistency check, the ETag of the file is preserved on open. For any subsequent block download with block-cache, the ETag is verified, and if the blob has changed in the container the download is declared a failure, resulting in a read failure. This feature is enabled with the above CLI option. Disable `attr_cache` when you want this feature to work with the latest contents of the blob.

## 2.4.0 (2024-12-03)
**Features**
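The open-time ETag capture and per-download verification described in this changelog entry can be sketched as follows. The `blob`, `etag`, and `readBlock` names are hypothetical stand-ins (a real client gets ETags from the service response headers, not by hashing locally); this is an illustration of the idea, not blobfuse's API:

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// blob models a remote object whose ETag changes on every write.
type blob struct{ data []byte }

func (b *blob) etag() string {
	h := fnv.New64a()
	h.Write(b.data)
	return fmt.Sprintf("0x%x", h.Sum64())
}

var errStale = errors.New("etag mismatch: blob changed since open")

// readBlock verifies the ETag captured at open time before serving a block.
func readBlock(b *blob, openETag string, off, n int) ([]byte, error) {
	if b.etag() != openETag {
		return nil, errStale
	}
	return b.data[off : off+n], nil
}

func main() {
	b := &blob{data: []byte("hello world")}
	tag := b.etag() // preserved on open

	if _, err := readBlock(b, tag, 0, 5); err != nil {
		panic(err)
	}
	b.data = []byte("HELLO world") // blob overwritten in the container
	if _, err := readBlock(b, tag, 0, 5); !errors.Is(err, errStale) {
		panic("expected stale read to fail")
	}
	fmt.Println("stale read rejected")
}
```

This is why the entry recommends disabling `attr_cache`: a cached ETag from a stale attribute lookup would defeat the check.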
14 changes: 11 additions & 3 deletions component/azstorage/azstorage.go
@@ -453,9 +453,17 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
return 0, nil
}

err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data)
if err != nil {
log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
if options.Etag != nil {
etag, err := az.storage.ReadInBufferWithETag(options.Handle.Path, options.Offset, dataLen, options.Data)
if err != nil {
log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
}
*(options.Etag) = etag
} else {
err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data)
if err != nil {
log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
}
}

length = int(dataLen)
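The branch added above selects the ETag-returning path only when the caller supplies a non-nil `Etag` pointer. A minimal sketch of this optional out-parameter pattern, with hypothetical `readOptions`/`readInBuffer` stand-ins for the real types:

```go
package main

import "fmt"

// readOptions mirrors the shape of ReadInBufferOptions: a nil Etag means
// the caller does not want the tag; a non-nil pointer receives it.
type readOptions struct {
	Data []byte
	Etag *string
}

func readInBuffer(src []byte, opt readOptions) int {
	n := copy(opt.Data, src)
	if opt.Etag != nil {
		// In the real code this value comes from the service response.
		*opt.Etag = "0x1CAFE"
	}
	return n
}

func main() {
	buf := make([]byte, 4)

	// Plain read: no ETag requested, cheaper path can be used.
	n := readInBuffer([]byte("data"), readOptions{Data: buf})
	fmt.Println(n, string(buf)) // 4 data

	// Read that also captures the ETag for later validation.
	var etag string
	readInBuffer([]byte("data"), readOptions{Data: buf, Etag: &etag})
	fmt.Println(etag != "") // true
}
```

Keeping the pointer optional lets existing callers stay on the original `ReadInBuffer` path with no behavior change.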
53 changes: 53 additions & 0 deletions component/azstorage/block_blob.go
@@ -39,6 +39,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
@@ -455,6 +456,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
ETag: string(*prop.ETag),
}

parseMetadata(attr, prop.Metadata)
@@ -663,6 +665,7 @@ func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAtt
Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
ETag: (string)(*blobInfo.Properties.ETag),
}

parseMetadata(attr, blobInfo.Metadata)
@@ -918,6 +921,56 @@ func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []b
return nil
}

// ReadInBufferWithETag : Download a specific range of a file into a user-provided buffer and return the current ETag along with the data
func (bb *BlockBlob) ReadInBufferWithETag(name string, offset int64, len int64, data []byte) (string, error) {
// log.Trace("BlockBlob::ReadInBuffer : name %s", name)
blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))
ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
defer cancel()

opt := &blob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: offset,
Count: len,
},
CPKInfo: bb.blobCPKOpt,
}

dr, err := blobClient.DownloadStream(ctx, opt)

if err != nil {
e := storeBlobErrToErr(err)
if e == ErrFileNotFound {
return "", syscall.ENOENT
} else if e == InvalidRange {
return "", syscall.ERANGE
}

log.Err("BlockBlob::ReadInBuffer : Failed to download blob %s [%s]", name, err.Error())
return "", err
}

var streamBody io.ReadCloser = dr.NewRetryReader(ctx, nil)
dataRead, err := io.ReadFull(streamBody, data)

if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s [%s]", name, err.Error())
return "", err
}

if dataRead < 0 {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s", name)
return "", errors.New("failed to copy data from body to buffer")
}

err = streamBody.Close()
if err != nil {
log.Err("BlockBlob::ReadInBuffer : Failed to close body for blob %s [%s]", name, err.Error())
}

return string(*dr.ETag), nil
}

func (bb *BlockBlob) calculateBlockSize(name string, fileSize int64) (blockSize int64, err error) {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if fileSize > MaxBlobSize {
19 changes: 19 additions & 0 deletions component/azstorage/block_blob_test.go
@@ -1191,6 +1191,25 @@ func (s *blockBlobTestSuite) TestReadInBuffer() {
s.assert.EqualValues(testData[:5], output)
}

func (s *blockBlobTestSuite) TestReadInBufferWithETAG() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})

output := make([]byte, 5)
var etag string
len, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: output, Etag: &etag})
s.assert.Nil(err)
s.assert.NotEqual(etag, "")
s.assert.EqualValues(5, len)
s.assert.EqualValues(testData[:5], output)
}

func (s *blockBlobTestSuite) TestReadInBufferLargeBuffer() {
defer s.cleanupTest()
// Setup
1 change: 1 addition & 0 deletions component/azstorage/connection.go
@@ -121,6 +121,7 @@ type AzConnection interface {
ReadToFile(name string, offset int64, count int64, fi *os.File) error
ReadBuffer(name string, offset int64, len int64) ([]byte, error)
ReadInBuffer(name string, offset int64, len int64, data []byte) error
ReadInBufferWithETag(name string, offset int64, len int64, data []byte) (string, error)

WriteFromFile(name string, metadata map[string]*string, fi *os.File) error
WriteFromBuffer(name string, metadata map[string]*string, data []byte) error
6 changes: 6 additions & 0 deletions component/azstorage/datalake.go
@@ -399,6 +399,7 @@ func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error)
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
ETag: (string)(*prop.ETag),
}
parseMetadata(blobAttr, prop.Metadata)

@@ -458,6 +459,11 @@ func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []by
return dl.BlockBlob.ReadInBuffer(name, offset, len, data)
}

// ReadInBufferWithETag : Download a specific range of a file into a user-provided buffer and return the current ETag along with the data
func (dl *Datalake) ReadInBufferWithETag(name string, offset int64, len int64, data []byte) (string, error) {
return dl.BlockBlob.ReadInBufferWithETag(name, offset, len, data)
}

// WriteFromFile : Upload local file to file
func (dl *Datalake) WriteFromFile(name string, metadata map[string]*string, fi *os.File) (err error) {
// File in DataLake may have permissions and ACL set. Just uploading the file will override them.
19 changes: 19 additions & 0 deletions component/azstorage/datalake_test.go
@@ -1402,6 +1402,25 @@ func (s *datalakeTestSuite) TestReadInBuffer() {
s.assert.EqualValues(testData[:5], output)
}

func (s *datalakeTestSuite) TestReadInBufferWithETAG() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})

output := make([]byte, 5)
var etag string
len, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: output, Etag: &etag})
s.assert.Nil(err)
s.assert.NotEqual(etag, "")
s.assert.EqualValues(5, len)
s.assert.EqualValues(testData[:5], output)
}

func (s *datalakeTestSuite) TestReadInBufferLargeBuffer() {
defer s.cleanupTest()
// Setup
51 changes: 34 additions & 17 deletions component/block_cache/block_cache.go
@@ -400,6 +400,10 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
handle.Mtime = attr.Mtime
handle.Size = attr.Size

if bc.consistency && attr.ETag != "" {
handle.SetValue("ETAG", attr.ETag)
}

log.Debug("BlockCache::OpenFile : Size of file handle.Size %v", handle.Size)
bc.prepareHandleForBlockCache(handle)

Expand Down Expand Up @@ -952,31 +956,31 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
}

// download : Method to download the given amount of data
func (blockCache *BlockCache) download(item *workItem) {
func (bc *BlockCache) download(item *workItem) {
fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)

// filename_blockindex is the key for the lock
// this ensure that at a given time a block from a file is downloaded only once across all open handles
flock := blockCache.fileLocks.Get(fileName)
flock := bc.fileLocks.Get(fileName)
flock.Lock()
defer flock.Unlock()

var diskNode any
found := false
localPath := ""

if blockCache.tmpPath != "" {
if bc.tmpPath != "" {
// Update diskpolicy to reflect the new file
diskNode, found = blockCache.fileNodeMap.Load(fileName)
diskNode, found = bc.fileNodeMap.Load(fileName)
if !found {
diskNode = blockCache.diskPolicy.Add(fileName)
blockCache.fileNodeMap.Store(fileName, diskNode)
diskNode = bc.diskPolicy.Add(fileName)
bc.fileNodeMap.Store(fileName, diskNode)
} else {
blockCache.diskPolicy.Refresh(diskNode.(*list.Element))
bc.diskPolicy.Refresh(diskNode.(*list.Element))
}

// Check local file exists for this offset and file combination or not
localPath = filepath.Join(blockCache.tmpPath, fileName)
localPath = filepath.Join(bc.tmpPath, fileName)
_, err := os.Stat(localPath)

if err == nil {
@@ -995,8 +999,8 @@ func (blockCache *BlockCache) download(item *workItem) {
_ = os.Remove(localPath)
}

if numberOfBytes != int(blockCache.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", blockCache.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
if numberOfBytes != int(bc.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
successfulRead = false
_ = os.Remove(localPath)
}
@@ -1005,7 +1009,7 @@

if successfulRead {
// If user has enabled consistency check then compute the md5sum and match it in xattr
successfulRead = checkBlockConsistency(blockCache, item, numberOfBytes, localPath, fileName)
successfulRead = checkBlockConsistency(bc, item, numberOfBytes, localPath, fileName)

// We have read the data from disk so there is no need to go over network
// Just mark the block that download is complete
@@ -1018,11 +1022,13 @@ }
}
}

var etag string
// If file does not exists then download the block from the container
n, err := blockCache.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
Handle: item.handle,
Offset: int64(item.block.offset),
Data: item.block.data,
Etag: &etag,
})

if item.failCnt > MAX_FAIL_CNT {
@@ -1037,17 +1043,28 @@ func (blockCache *BlockCache) download(item *workItem) {
// Fail to read the data so just reschedule this request
log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
item.failCnt++
blockCache.threadPool.Schedule(false, item)
bc.threadPool.Schedule(false, item)
return
} else if n == 0 {
// No data read so just reschedule this request
log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.id)
item.failCnt++
blockCache.threadPool.Schedule(false, item)
bc.threadPool.Schedule(false, item)
return
}

if blockCache.tmpPath != "" {
// Compare the ETAG value and fail download if blob has changed
if bc.consistency && etag != "" {
etagVal, found := item.handle.GetValue("ETAG")
if found && etagVal != etag {
log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
item.block.Failed()
item.block.Ready(BlockStatusDownloadFailed)
return
}
}

if bc.tmpPath != "" {
err := os.MkdirAll(filepath.Dir(localPath), 0777)
if err != nil {
log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error())
@@ -1064,10 +1081,10 @@ }
}

f.Close()
blockCache.diskPolicy.Refresh(diskNode.(*list.Element))
bc.diskPolicy.Refresh(diskNode.(*list.Element))

// If user has enabled consistency check then compute the md5sum and save it in xattr
if blockCache.consistency {
if bc.consistency {
hash := common.GetCRC64(item.block.data, n)
err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
if err != nil {
3 changes: 2 additions & 1 deletion internal/attribute.go
@@ -78,7 +78,8 @@ type ObjAttr struct {
Flags common.BitMap16 // flags
Path string // full path
Name string // base name of the path
MD5 []byte // MD5 hash of the blob
MD5 []byte // MD5 of the blob as per last GetAttr
ETag string // ETag of the blob as per last GetAttr
Metadata map[string]*string // extra information to preserve
}

1 change: 1 addition & 0 deletions internal/component_options.go
@@ -107,6 +107,7 @@ type ReadFileOptions struct {
type ReadInBufferOptions struct {
Handle *handlemap.Handle
Offset int64
Etag *string
Data []byte
}
