Enable ETAG based validation on every block download to provide higher consistency #1608

Open
wants to merge 25 commits into base: blobfuse/2.4.1

Commits
b189ca0
Add strong consistency check for data on disk
vibhansa-msft Jan 6, 2025
ae4abc0
Updating changelog
vibhansa-msft Jan 6, 2025
5f167cf
Updating readme
vibhansa-msft Jan 8, 2025
1799048
Adding etag support
vibhansa-msft Jan 9, 2025
7611be6
Adding parsing logic for ETAG in list and getAttr api
vibhansa-msft Jan 9, 2025
2572cc2
Correcting etag validation check
vibhansa-msft Jan 9, 2025
49edfa3
Migrating from md5 to crc64
vibhansa-msft Jan 13, 2025
7649ed3
Merge branch 'vibhansa/diskcrc' into vibhansa/etagvalidation
vibhansa-msft Jan 15, 2025
b6e4195
Adding etag based validation on strong consistency check only
vibhansa-msft Jan 15, 2025
3492ddd
Sync with feature branch
vibhansa-msft Jan 15, 2025
46a3b1b
Merge branch 'blobfuse/2.4.1' into vibhansa/etagvalidation
vibhansa-msft Jan 15, 2025
c1941c3
Sync with feature branch
vibhansa-msft Jan 16, 2025
df864c9
Adding UT for readInBufferWithEtag
vibhansa-msft Jan 17, 2025
37410a2
renamed variable
jainakanksha-msft Jan 20, 2025
813f2f5
renamed variables
jainakanksha-msft Jan 20, 2025
dcbf997
Merge branch 'blobfuse/2.4.1' into vibhansa/etagvalidation
vibhansa-msft Jan 20, 2025
791e16c
Adding UT for etag mismatch case
vibhansa-msft Jan 20, 2025
db4ccf3
Update as per review comments
vibhansa-msft Jan 20, 2025
63c6425
Renamed variables
jainakanksha-msft Jan 20, 2025
4f746b0
Make ReadInStream default option for ReadInBuffer api
vibhansa-msft Jan 21, 2025
161906b
Make etag validation a default option
vibhansa-msft Jan 21, 2025
96fb025
Remove quotes from etag before passing it down as some api return bac…
vibhansa-msft Jan 21, 2025
5e5bd2f
Update etag when data is flushed
vibhansa-msft Jan 21, 2025
51f625f
Correcting changelog
vibhansa-msft Jan 21, 2025
ebaa7d5
Correcting etag population
vibhansa-msft Jan 21, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,8 @@

**Features**
- Mount container or directory but restrict the view of blobs that you can see. This feature is available only in read-only mount.
- To protect against accidental overwrites on data stored by block-cache on temp path, md5 sums will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To protect against accidental overwrites on data stored by block-cache on temp path, crc64 hash will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To provide a strong consistency check, the ETag of the file is preserved on open. For any subsequent block download with block-cache, the ETag is verified, and if the blob has changed in the container, the download is declared a failure, resulting in a read failure.

## 2.4.0 (2024-12-03)
**Features**
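The consistency flow described in the new changelog entry, as a minimal standalone sketch: the ETag is captured when the file is opened, and every later block download is validated against it. All names here (`fileHandle`, `downloadBlock`, `ErrBlobChanged`) are illustrative, not blobfuse identifiers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBlobChanged signals that the blob was overwritten after open.
var ErrBlobChanged = errors.New("blob changed in container, failing the read")

type fileHandle struct {
	path       string
	pinnedETag string // ETag captured when the file was opened
}

// downloadBlock stands in for a ranged download that also returns the
// blob's current ETag (real clients surface it via response headers).
func downloadBlock(path string, offset, size int64) ([]byte, string, error) {
	return make([]byte, size), "0x1A2B3C", nil
}

// readBlock downloads one block and fails the read on an ETag mismatch,
// so a concurrently modified blob never yields mixed old/new data.
func (h *fileHandle) readBlock(offset, size int64) ([]byte, error) {
	data, etag, err := downloadBlock(h.path, offset, size)
	if err != nil {
		return nil, err
	}
	if etag != h.pinnedETag {
		return nil, ErrBlobChanged
	}
	return data, nil
}

func main() {
	h := &fileHandle{path: "container/blob.bin", pinnedETag: "0x1A2B3C"}
	if _, err := h.readBlock(0, 4096); err != nil {
		fmt.Println("read failed:", err)
	}
}
```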
5 changes: 3 additions & 2 deletions component/azstorage/azstorage.go
@@ -453,7 +453,8 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
return 0, nil
}

err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data)
err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data, options.Etag)

if err != nil {
log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
}
@@ -555,7 +556,7 @@ func (az *AzStorage) StageData(opt internal.StageDataOptions) error {
}

func (az *AzStorage) CommitData(opt internal.CommitDataOptions) error {
return az.storage.CommitBlocks(opt.Name, opt.List)
return az.storage.CommitBlocks(opt.Name, opt.List, opt.NewETag)
}

// TODO : Below methods are pending to be implemented
73 changes: 54 additions & 19 deletions component/azstorage/block_blob.go
@@ -39,6 +39,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
@@ -431,10 +432,10 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
})

if err != nil {
e := storeBlobErrToErr(err)
if e == ErrFileNotFound {
serr := storeBlobErrToErr(err)
if serr == ErrFileNotFound {
return attr, syscall.ENOENT
} else if e == InvalidPermission {
} else if serr == InvalidPermission {
log.Err("BlockBlob::getAttrUsingRest : Insufficient permissions for %s [%s]", name, err.Error())
return attr, syscall.EACCES
} else {
@@ -455,6 +456,7 @@
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
ETag: strings.Trim(string(*prop.ETag), `"`),
}

parseMetadata(attr, prop.Metadata)
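The `strings.Trim` above matters because the service returns ETags wrapped in double quotes (and, per the commit history, some APIs return them back quoted), so the stored value is normalized before any equality check. A self-contained illustration with a made-up ETag value:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical quoted ETag as returned by the service.
	raw := `"0x8DD2E3F1A4B5C6D"`
	etag := strings.Trim(raw, `"`)
	fmt.Println(etag) // prints: 0x8DD2E3F1A4B5C6D
}
```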
@@ -663,6 +665,7 @@ func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAtt
Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
ETag: strings.Trim((string)(*blobInfo.Properties.ETag), `"`),
}

parseMetadata(attr, blobInfo.Metadata)
@@ -888,20 +891,26 @@ func (bb *BlockBlob) ReadBuffer(name string, offset int64, len int64) ([]byte, e
}

// ReadInBuffer : Download specific range from a file to a user provided buffer
func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
// log.Trace("BlockBlob::ReadInBuffer : name %s", name)
blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))
opt := (blob.DownloadBufferOptions)(*bb.downloadOptions)
opt.BlockSize = len
opt.Range = blob.HTTPRange{
Offset: offset,
Count: len,
if etag != nil {
*etag = ""
}

Review comment (Member): No need to check the nil value, as we are passing a pointer to a zero value.

blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))

ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
defer cancel()

_, err := blobClient.DownloadBuffer(ctx, data, &opt)
opt := &blob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: offset,
Count: len,
},
CPKInfo: bb.blobCPKOpt,
}

downloadResponse, err := blobClient.DownloadStream(ctx, opt)

if err != nil {
e := storeBlobErrToErr(err)
@@ -911,10 +920,32 @@ func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []b
return syscall.ERANGE
}

log.Err("BlockBlob::ReadInBuffer : Failed to download blob %s [%s]", name, err.Error())
log.Err("BlockBlob::ReadInBufferWithETag : Failed to download blob %s [%s]", name, err.Error())
return err
}

var streamBody io.ReadCloser = downloadResponse.NewRetryReader(ctx, nil)
dataRead, err := io.ReadFull(streamBody, data)

if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s [%s]", name, err.Error())
return err
}

if dataRead < 0 {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s", name)
return errors.New("failed to copy data from body to buffer")
}

err = streamBody.Close()
if err != nil {
log.Err("BlockBlob::ReadInBuffer : Failed to close body for blob %s [%s]", name, err.Error())
}

if etag != nil {
*etag = strings.Trim(string(*downloadResponse.ETag), `"`)
}

return nil
}

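A sketch of how a caller might use the new out-parameter to detect a concurrent overwrite across two ranged reads. `readTwice`, the path, and the sizes are illustrative, not repository code; `bb` is assumed to be an initialized `*BlockBlob`, with `fmt` imported at package level.

```go
// readTwice is an illustrative helper: it reads two adjacent 4 KiB ranges
// and fails if the blob changed in between.
func readTwice(bb *BlockBlob) error {
	buf := make([]byte, 4096)
	var first, second string

	// Each call now reports the ETag of the blob version it actually served.
	if err := bb.ReadInBuffer("dir/file.txt", 0, 4096, buf, &first); err != nil {
		return err
	}
	if err := bb.ReadInBuffer("dir/file.txt", 4096, 4096, buf, &second); err != nil {
		return err
	}

	if first != second {
		// Blob was overwritten between the downloads; surface a read failure.
		return fmt.Errorf("blob modified during read: etag %s != %s", first, second)
	}
	return nil
}
```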
@@ -1165,7 +1196,7 @@ func (bb *BlockBlob) removeBlocks(blockList *common.BlockOffsetList, size int64,
blk.Data = make([]byte, blk.EndIndex-blk.StartIndex)
blk.Flags.Set(common.DirtyBlock)

err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data)
err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data, nil)
if err != nil {
log.Err("BlockBlob::removeBlocks : Failed to remove blocks %s [%s]", name, err.Error())
}
@@ -1221,7 +1252,7 @@ func (bb *BlockBlob) TruncateFile(name string, size int64) error {
size -= blkSize
}

err = bb.CommitBlocks(blobName, blkList)
err = bb.CommitBlocks(blobName, blkList, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error())
return err
@@ -1289,12 +1320,12 @@ func (bb *BlockBlob) HandleSmallFile(name string, size int64, originalSize int64
var data = make([]byte, size)
var err error
if size > originalSize {
err = bb.ReadInBuffer(name, 0, 0, data)
err = bb.ReadInBuffer(name, 0, 0, data, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
}
} else {
err = bb.ReadInBuffer(name, 0, size, data)
err = bb.ReadInBuffer(name, 0, size, data, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
}
@@ -1369,7 +1400,7 @@ func (bb *BlockBlob) Write(options internal.WriteFileOptions) error {
oldDataBuffer := make([]byte, oldDataSize+newBufferSize)
if !appendOnly {
// fetch the blocks that will be impacted by the new changes so we can overwrite them
err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer)
err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer, nil)
if err != nil {
log.Err("BlockBlob::Write : Failed to read data in buffer %s [%s]", name, err.Error())
}
@@ -1560,14 +1591,14 @@ func (bb *BlockBlob) StageBlock(name string, data []byte, id string) error {
}

// CommitBlocks : persists the block list
func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
func (bb *BlockBlob) CommitBlocks(name string, blockList []string, newEtag *string) error {
log.Trace("BlockBlob::CommitBlocks : name %s", name)

ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
defer cancel()

blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name))
_, err := blobClient.CommitBlockList(ctx,
resp, err := blobClient.CommitBlockList(ctx,
blockList,
&blockblob.CommitBlockListOptions{
HTTPHeaders: &blob.HTTPHeaders{
@@ -1582,6 +1613,10 @@ func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
return err
}

if newEtag != nil {
*newEtag = strings.Trim(string(*resp.ETag), `"`)
}

return nil
}

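With the matching out-parameter on commit, a flush path can refresh the ETag it pinned at open, so later validated downloads accept the blob it just wrote. A minimal sketch; `blockIDs` and `handle.pinnedETag` are illustrative names, not repository identifiers.

```go
// After a flush, capture the ETag of the newly committed blob so that
// subsequent block downloads validate against the post-flush version.
var newEtag string
if err := bb.CommitBlocks("dir/file.txt", blockIDs, &newEtag); err != nil {
	return err
}
handle.pinnedETag = newEtag
```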
70 changes: 67 additions & 3 deletions component/azstorage/block_blob_test.go
@@ -1191,6 +1191,70 @@ func (s *blockBlobTestSuite) TestReadInBuffer() {
s.assert.EqualValues(testData[:5], output)
}

func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAG() {
defer bbTestSuite.cleanupTest()
// Setup
name := generateFileName()
handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})

output := make([]byte, 5)
var etag string
len, err := bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
bbTestSuite.assert.EqualValues(5, len)
bbTestSuite.assert.EqualValues(testData[:5], output)
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
}

func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAGMismatch() {
defer bbTestSuite.cleanupTest()
// Setup
name := generateFileName()
handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data 12345678910"
data := []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})

attr, err := bbTestSuite.az.GetAttr(internal.GetAttrOptions{Name: name})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotNil(attr)
bbTestSuite.assert.NotEqual("", attr.ETag)
bbTestSuite.assert.Equal(int64(len(data)), attr.Size)

output := make([]byte, 5)
var etag string

handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
etag = strings.Trim(etag, `"`)
bbTestSuite.assert.Equal(etag, attr.ETag)

// Update the file in parallel using another handle
handle1, err := bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
bbTestSuite.assert.Nil(err)
testData = "test data 12345678910 123123123123123123123"
data = []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle1, Offset: 0, Data: data})
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle1})

// Read data back using older handle
_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 5, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
etag = strings.Trim(etag, `"`)
bbTestSuite.assert.NotEqual(etag, attr.ETag)

_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
}

func (s *blockBlobTestSuite) TestReadInBufferLargeBuffer() {
defer s.cleanupTest()
// Setup
@@ -2376,7 +2440,7 @@ func (s *blockBlobTestSuite) TestFlushFileUpdateChunkedFile() {
updatedBlock := make([]byte, 2*MB)
rand.Read(updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
copy(h.CacheObj.BlockOffsetList.BlockList[1].Data[MB:2*MB+MB], updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)

@@ -2413,7 +2477,7 @@ func (s *blockBlobTestSuite) TestFlushFileTruncateUpdateChunkedFile() {
// truncate block
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize/2)
h.CacheObj.BlockOffsetList.BlockList[1].EndIndex = int64(blockSize + blockSize/2)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)

// remove 2 blocks
@@ -3258,7 +3322,7 @@ func (s *blockBlobTestSuite) TestDownloadBlobWithCPKEnabled() {
s.assert.EqualValues(data, fileData)

buf := make([]byte, len(data))
err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf)
err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf, nil)
s.assert.Nil(err)
s.assert.EqualValues(data, buf)

4 changes: 2 additions & 2 deletions component/azstorage/connection.go
@@ -120,7 +120,7 @@

ReadToFile(name string, offset int64, count int64, fi *os.File) error
ReadBuffer(name string, offset int64, len int64) ([]byte, error)
ReadInBuffer(name string, offset int64, len int64, data []byte) error
ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error

WriteFromFile(name string, metadata map[string]*string, fi *os.File) error
WriteFromBuffer(name string, metadata map[string]*string, data []byte) error
@@ -134,7 +134,7 @@ type AzConnection interface {

GetCommittedBlockList(string) (*internal.CommittedBlockList, error)
StageBlock(string, []byte, string) error
CommitBlocks(string, []string) error
CommitBlocks(string, []string, *string) error

UpdateServiceClient(_, _ string) error

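Because the interface gained parameters, every implementation (including any test double) must be updated. A minimal sketch of a fake, assuming it lives in a test package; `fakeConn` and its ETag strings are illustrative, not repository code.

```go
// fakeConn overrides only the two changed methods; embedding the interface
// leaves the remaining methods unimplemented, which is fine for a focused test.
type fakeConn struct {
	AzConnection
}

func (f *fakeConn) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
	if etag != nil {
		*etag = "fake-etag" // stable value for assertions
	}
	return nil
}

func (f *fakeConn) CommitBlocks(name string, blockList []string, newEtag *string) error {
	if newEtag != nil {
		*newEtag = "fake-etag-after-commit"
	}
	return nil
}
```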
9 changes: 5 additions & 4 deletions component/azstorage/datalake.go
@@ -399,6 +399,7 @@ func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error)
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
ETag: (string)(*prop.ETag),
}
parseMetadata(blobAttr, prop.Metadata)

@@ -454,8 +455,8 @@ func (dl *Datalake) ReadBuffer(name string, offset int64, len int64) ([]byte, er
}

// ReadInBuffer : Download specific range from a file to a user provided buffer
func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
return dl.BlockBlob.ReadInBuffer(name, offset, len, data)
func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
return dl.BlockBlob.ReadInBuffer(name, offset, len, data, etag)
}

// WriteFromFile : Upload local file to file
@@ -593,8 +594,8 @@ func (dl *Datalake) StageBlock(name string, data []byte, id string) error {
}

// CommitBlocks : persists the block list
func (dl *Datalake) CommitBlocks(name string, blockList []string) error {
return dl.BlockBlob.CommitBlocks(name, blockList)
func (dl *Datalake) CommitBlocks(name string, blockList []string, newEtag *string) error {
return dl.BlockBlob.CommitBlocks(name, blockList, newEtag)
}

func (dl *Datalake) SetFilter(filter string) error {