Commit
Merge branch 'blobfuse/2.4.1' into syeleti/rename
syeleti-msft authored Jan 31, 2025
2 parents 877fa2b + c71ea55 commit 04abe1e
Showing 13 changed files with 215 additions and 70 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -11,7 +11,8 @@

**Features**
- Mount a container or directory while restricting which blobs are visible. This feature is available only for read-only mounts.
- To protect against accidental overwrites on data stored by block-cache on temp path, md5 sums will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To protect against accidental overwrites on data stored by block-cache on temp path, crc64 hash will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
- To provide a strong consistency check, the ETag of the file will be preserved on open. For any subsequent block download with block-cache, the ETag will be verified; if the blob has changed in the container, the download will be declared a failure, resulting in a read failure.
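
A minimal sketch of the flow these two entries describe, assuming the extended `ReadInBuffer(name, offset, len, data, etag)` signature this commit introduces. The `openETag` bookkeeping, the stub storage, and the error text are illustrative, not taken from the repository:

```go
package main

import (
	"errors"
	"fmt"
)

// readFn mirrors the shape of the extended ReadInBuffer in this commit.
type readFn func(name string, offset, length int64, data []byte, etag *string) error

// readBlockConsistently fails the read when the blob's ETag no longer matches
// the one captured at open time, i.e. the blob changed in the container.
func readBlockConsistently(read readFn, name string, offset int64, buf []byte, openETag string) error {
	var etag string
	if err := read(name, offset, int64(len(buf)), buf, &etag); err != nil {
		return err
	}
	if openETag != "" && etag != openETag {
		return errors.New("blob changed in container since open: declaring read failure")
	}
	return nil
}

func main() {
	// Stub storage that always reports a fixed ETag.
	stub := func(name string, offset, length int64, data []byte, etag *string) error {
		*etag = "0xABC"
		return nil
	}
	buf := make([]byte, 4)
	fmt.Println(readBlockConsistently(stub, "a.txt", 0, buf, "0xABC")) // <nil>
	fmt.Println(readBlockConsistently(stub, "a.txt", 0, buf, "0xDEF")) // mismatch error
}
```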

## 2.4.0 (2024-12-03)
**Features**
5 changes: 3 additions & 2 deletions component/azstorage/azstorage.go
@@ -453,7 +453,8 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
return 0, nil
}

err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data)
err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data, options.Etag)

if err != nil {
log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
}
@@ -555,7 +556,7 @@ func (az *AzStorage) StageData(opt internal.StageDataOptions) error {
}

func (az *AzStorage) CommitData(opt internal.CommitDataOptions) error {
return az.storage.CommitBlocks(opt.Name, opt.List)
return az.storage.CommitBlocks(opt.Name, opt.List, opt.NewETag)
}

// TODO : Below methods are pending to be implemented
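A hedged caller-side sketch of the `CommitData` change above: capture the ETag the service returns on commit so a handle's cached ETag stays valid after a flush. The narrowed `blockCommitter` interface and `cachedETag` bookkeeping are assumptions for illustration, not code from the repository:

```go
// blockCommitter is an assumed, narrowed view of the AzConnection interface
// covering only the method this sketch needs.
type blockCommitter interface {
	CommitBlocks(name string, blockList []string, newEtag *string) error
}

// commitAndRefresh commits the staged block list and, on success, refreshes
// the caller's cached ETag so later consistency checks compare against the
// just-committed blob version rather than the pre-flush one.
func commitAndRefresh(c blockCommitter, name string, blocks []string, cachedETag *string) error {
	var newETag string
	if err := c.CommitBlocks(name, blocks, &newETag); err != nil {
		return err
	}
	if newETag != "" {
		*cachedETag = newETag
	}
	return nil
}
```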
73 changes: 54 additions & 19 deletions component/azstorage/block_blob.go
@@ -39,6 +39,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
@@ -448,10 +449,10 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
})

if err != nil {
e := storeBlobErrToErr(err)
if e == ErrFileNotFound {
serr := storeBlobErrToErr(err)
if serr == ErrFileNotFound {
return attr, syscall.ENOENT
} else if e == InvalidPermission {
} else if serr == InvalidPermission {
log.Err("BlockBlob::getAttrUsingRest : Insufficient permissions for %s [%s]", name, err.Error())
return attr, syscall.EACCES
} else {
@@ -472,6 +473,7 @@
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
ETag: strings.Trim(string(*prop.ETag), `"`),
}

parseMetadata(attr, prop.Metadata)
@@ -680,6 +682,7 @@ func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAtt
Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
ETag: strings.Trim((string)(*blobInfo.Properties.ETag), `"`),
}

parseMetadata(attr, blobInfo.Metadata)
@@ -905,20 +908,26 @@ func (bb *BlockBlob) ReadBuffer(name string, offset int64, len int64) ([]byte, e
}

// ReadInBuffer : Download specific range from a file to a user provided buffer
func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
// log.Trace("BlockBlob::ReadInBuffer : name %s", name)
blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))
opt := (blob.DownloadBufferOptions)(*bb.downloadOptions)
opt.BlockSize = len
opt.Range = blob.HTTPRange{
Offset: offset,
Count: len,
}
if etag != nil {
*etag = ""
}

blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))

ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
defer cancel()

_, err := blobClient.DownloadBuffer(ctx, data, &opt)
opt := &blob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: offset,
Count: len,
},
CPKInfo: bb.blobCPKOpt,
}

downloadResponse, err := blobClient.DownloadStream(ctx, opt)

if err != nil {
e := storeBlobErrToErr(err)
@@ -928,10 +937,32 @@ func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []b
return syscall.ERANGE
}

log.Err("BlockBlob::ReadInBuffer : Failed to download blob %s [%s]", name, err.Error())
log.Err("BlockBlob::ReadInBufferWithETag : Failed to download blob %s [%s]", name, err.Error())
return err
}

var streamBody io.ReadCloser = downloadResponse.NewRetryReader(ctx, nil)
dataRead, err := io.ReadFull(streamBody, data)

if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s [%s]", name, err.Error())
return err
}

if dataRead < 0 {
log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s", name)
return errors.New("failed to copy data from body to buffer")
}

err = streamBody.Close()
if err != nil {
log.Err("BlockBlob::ReadInBuffer : Failed to close body for blob %s [%s]", name, err.Error())
}

if etag != nil {
*etag = strings.Trim(string(*downloadResponse.ETag), `"`)
}

return nil
}

@@ -1182,7 +1213,7 @@ func (bb *BlockBlob) removeBlocks(blockList *common.BlockOffsetList, size int64,
blk.Data = make([]byte, blk.EndIndex-blk.StartIndex)
blk.Flags.Set(common.DirtyBlock)

err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data)
err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data, nil)
if err != nil {
log.Err("BlockBlob::removeBlocks : Failed to remove blocks %s [%s]", name, err.Error())
}
@@ -1238,7 +1269,7 @@ func (bb *BlockBlob) TruncateFile(name string, size int64) error {
size -= blkSize
}

err = bb.CommitBlocks(blobName, blkList)
err = bb.CommitBlocks(blobName, blkList, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error())
return err
@@ -1306,12 +1337,12 @@ func (bb *BlockBlob) HandleSmallFile(name string, size int64, originalSize int64
var data = make([]byte, size)
var err error
if size > originalSize {
err = bb.ReadInBuffer(name, 0, 0, data)
err = bb.ReadInBuffer(name, 0, 0, data, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
}
} else {
err = bb.ReadInBuffer(name, 0, size, data)
err = bb.ReadInBuffer(name, 0, size, data, nil)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
}
@@ -1386,7 +1417,7 @@ func (bb *BlockBlob) Write(options internal.WriteFileOptions) error {
oldDataBuffer := make([]byte, oldDataSize+newBufferSize)
if !appendOnly {
// fetch the blocks that will be impacted by the new changes so we can overwrite them
err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer)
err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer, nil)
if err != nil {
log.Err("BlockBlob::Write : Failed to read data in buffer %s [%s]", name, err.Error())
}
@@ -1577,14 +1608,14 @@ func (bb *BlockBlob) StageBlock(name string, data []byte, id string) error {
}

// CommitBlocks : persists the block list
func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
func (bb *BlockBlob) CommitBlocks(name string, blockList []string, newEtag *string) error {
log.Trace("BlockBlob::CommitBlocks : name %s", name)

ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
defer cancel()

blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name))
_, err := blobClient.CommitBlockList(ctx,
resp, err := blobClient.CommitBlockList(ctx,
blockList,
&blockblob.CommitBlockListOptions{
HTTPHeaders: &blob.HTTPHeaders{
@@ -1599,6 +1630,10 @@ func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
return err
}

if newEtag != nil {
*newEtag = strings.Trim(string(*resp.ETag), `"`)
}

return nil
}

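For reference, a standalone sketch of the download pattern the rewritten `ReadInBuffer` above uses with the Azure Blob SDK: `DownloadStream` a byte range, wrap the body in a retry reader, fill the caller's buffer with `io.ReadFull`, and surface the trimmed ETag. Client construction and blobfuse's error mapping are omitted; treat this as an approximation of the commit's logic, not a drop-in replacement:

```go
package blobread

import (
	"context"
	"io"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
)

// readRange downloads [offset, offset+len(buf)) into buf and, if etag is
// non-nil, records the blob's ETag (quotes trimmed) observed by the download.
func readRange(ctx context.Context, bc *blob.Client, offset int64, buf []byte, etag *string) error {
	resp, err := bc.DownloadStream(ctx, &blob.DownloadStreamOptions{
		Range: blob.HTTPRange{Offset: offset, Count: int64(len(buf))},
	})
	if err != nil {
		return err
	}

	body := resp.NewRetryReader(ctx, nil) // retries mid-stream network failures
	defer body.Close()

	// io.ReadFull tolerates a short final range via ErrUnexpectedEOF.
	if _, err := io.ReadFull(body, buf); err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return err
	}

	if etag != nil && resp.ETag != nil {
		*etag = strings.Trim(string(*resp.ETag), `"`)
	}
	return nil
}
```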
70 changes: 67 additions & 3 deletions component/azstorage/block_blob_test.go
@@ -1191,6 +1191,70 @@ func (s *blockBlobTestSuite) TestReadInBuffer() {
s.assert.EqualValues(testData[:5], output)
}

func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAG() {
defer bbTestSuite.cleanupTest()
// Setup
name := generateFileName()
handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})

output := make([]byte, 5)
var etag string
len, err := bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
bbTestSuite.assert.EqualValues(5, len)
bbTestSuite.assert.EqualValues(testData[:5], output)
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
}

func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAGMismatch() {
defer bbTestSuite.cleanupTest()
// Setup
name := generateFileName()
handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data 12345678910"
data := []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})

attr, err := bbTestSuite.az.GetAttr(internal.GetAttrOptions{Name: name})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotNil(attr)
bbTestSuite.assert.NotEqual("", attr.ETag)
bbTestSuite.assert.Equal(int64(len(data)), attr.Size)

output := make([]byte, 5)
var etag string

handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
etag = strings.Trim(etag, `"`)
bbTestSuite.assert.Equal(etag, attr.ETag)

// Update the file in parallel using another handle
handle1, err := bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
bbTestSuite.assert.Nil(err)
testData = "test data 12345678910 123123123123123123123"
data = []byte(testData)
bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle1, Offset: 0, Data: data})
_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle1})

// Read data back using older handle
_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 5, Data: output, Etag: &etag})
bbTestSuite.assert.Nil(err)
bbTestSuite.assert.NotEqual(etag, "")
etag = strings.Trim(etag, `"`)
bbTestSuite.assert.NotEqual(etag, attr.ETag)

_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
}

func (s *blockBlobTestSuite) TestReadInBufferLargeBuffer() {
defer s.cleanupTest()
// Setup
@@ -2376,7 +2440,7 @@ func (s *blockBlobTestSuite) TestFlushFileUpdateChunkedFile() {
updatedBlock := make([]byte, 2*MB)
rand.Read(updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
copy(h.CacheObj.BlockOffsetList.BlockList[1].Data[MB:2*MB+MB], updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)

@@ -2413,7 +2477,7 @@ func (s *blockBlobTestSuite) TestFlushFileTruncateUpdateChunkedFile() {
// truncate block
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize/2)
h.CacheObj.BlockOffsetList.BlockList[1].EndIndex = int64(blockSize + blockSize/2)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)

// remove 2 blocks
@@ -3258,7 +3322,7 @@ func (s *blockBlobTestSuite) TestDownloadBlobWithCPKEnabled() {
s.assert.EqualValues(data, fileData)

buf := make([]byte, len(data))
err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf)
err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf, nil)
s.assert.Nil(err)
s.assert.EqualValues(data, buf)

4 changes: 2 additions & 2 deletions component/azstorage/connection.go
@@ -120,7 +120,7 @@ type AzConnection interface {

ReadToFile(name string, offset int64, count int64, fi *os.File) error
ReadBuffer(name string, offset int64, len int64) ([]byte, error)
ReadInBuffer(name string, offset int64, len int64, data []byte) error
ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error

WriteFromFile(name string, metadata map[string]*string, fi *os.File) error
WriteFromBuffer(name string, metadata map[string]*string, data []byte) error
@@ -134,7 +134,7 @@ type AzConnection interface {

GetCommittedBlockList(string) (*internal.CommittedBlockList, error)
StageBlock(string, []byte, string) error
CommitBlocks(string, []string) error
CommitBlocks(string, []string, *string) error

UpdateServiceClient(_, _ string) error

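Because `AzConnection` widened, every implementation and test double must thread the new parameters even when they go unused. A minimal stub sketch for updating mocks; the type name and the fake ETag value are assumptions, not repository code:

```go
// stubConn implements just the two widened methods for tests.
type stubConn struct {
	etag string
}

func (s *stubConn) ReadInBuffer(name string, offset, length int64, data []byte, etag *string) error {
	if etag != nil {
		*etag = s.etag // report the ETag the stub last "committed"
	}
	return nil
}

func (s *stubConn) CommitBlocks(name string, blockList []string, newEtag *string) error {
	s.etag = "0x" + name // pretend the service minted a fresh ETag
	if newEtag != nil {
		*newEtag = s.etag
	}
	return nil
}
```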
9 changes: 5 additions & 4 deletions component/azstorage/datalake.go
@@ -400,6 +400,7 @@ func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error)
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
ETag: (string)(*prop.ETag),
}
parseMetadata(blobAttr, prop.Metadata)

@@ -455,8 +456,8 @@ func (dl *Datalake) ReadBuffer(name string, offset int64, len int64) ([]byte, er
}

// ReadInBuffer : Download specific range from a file to a user provided buffer
func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
return dl.BlockBlob.ReadInBuffer(name, offset, len, data)
func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
return dl.BlockBlob.ReadInBuffer(name, offset, len, data, etag)
}

// WriteFromFile : Upload local file to file
@@ -594,8 +595,8 @@ func (dl *Datalake) StageBlock(name string, data []byte, id string) error {
}

// CommitBlocks : persists the block list
func (dl *Datalake) CommitBlocks(name string, blockList []string) error {
return dl.BlockBlob.CommitBlocks(name, blockList)
func (dl *Datalake) CommitBlocks(name string, blockList []string, newEtag *string) error {
return dl.BlockBlob.CommitBlocks(name, blockList, newEtag)
}

func (dl *Datalake) SetFilter(filter string) error {