diff --git a/CHANGELOG.md b/CHANGELOG.md
index c07aaa402..176ac61d1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,8 @@
 **Features**
 - Mount container or directory but restrict the view of blobs that you can see. This feature is available only in read-only mount.
-- To protect against accidental overwrites on data stored by block-cache on temp path, md5 sums will be validated on read. This feature can be enabled by using `--block-cache-strong-consistency` cli flag.
+- To protect against accidental overwrites of data stored by block-cache on the temp path, a CRC64 hash is validated on read. This feature can be enabled using the `--block-cache-strong-consistency` CLI flag.
+- To provide a strong consistency check, the ETag of the file is preserved on open. For every subsequent block download through block-cache, the ETag is verified; if the blob has changed in the container, the download is declared a failure, resulting in a read failure.
 
 ## 2.4.0 (2024-12-03)
 **Features**
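Reviewer note: the semantics promised by the new changelog entry are easiest to see in isolation. Below is a minimal, hypothetical Go sketch of the model — an ETag captured at open time, compared on every later block download — using stand-in types rather than the blobfuse2 ones.

```go
package main

import (
	"errors"
	"fmt"
)

// openHandle is a stand-in type: it remembers the ETag observed when the
// file was opened (not the real blobfuse2 handle).
type openHandle struct {
	path string
	etag string
}

// errStaleBlob mirrors the "declared a failure" outcome in the changelog.
var errStaleBlob = errors.New("blob changed since open: failing the read")

// readBlock simulates one block download that also reports the blob's
// current ETag; the comparison is the whole consistency check.
func readBlock(h *openHandle, currentETag string) error {
	if h.etag != "" && currentETag != h.etag {
		return errStaleBlob
	}
	return nil
}

func main() {
	h := &openHandle{path: "container/file.bin", etag: "0x8DC0001"}
	fmt.Println(readBlock(h, "0x8DC0001")) // <nil>: blob unchanged
	fmt.Println(readBlock(h, "0x8DC0002")) // stale: blob was overwritten
}
```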
diff --git a/component/azstorage/azstorage.go b/component/azstorage/azstorage.go
index 2c267c5f5..c8bba18f8 100644
--- a/component/azstorage/azstorage.go
+++ b/component/azstorage/azstorage.go
@@ -453,7 +453,8 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
 		return 0, nil
 	}
 
-	err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data)
+	err = az.storage.ReadInBuffer(options.Handle.Path, options.Offset, dataLen, options.Data, options.Etag)
+
 	if err != nil {
 		log.Err("AzStorage::ReadInBuffer : Failed to read %s [%s]", options.Handle.Path, err.Error())
 	}
@@ -555,7 +556,7 @@ func (az *AzStorage) StageData(opt internal.StageDataOptions) error {
 }
 
 func (az *AzStorage) CommitData(opt internal.CommitDataOptions) error {
-	return az.storage.CommitBlocks(opt.Name, opt.List)
+	return az.storage.CommitBlocks(opt.Name, opt.List, opt.NewETag)
 }
 
 // TODO : Below methods are pending to be implemented
diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go
index 5e5563e98..acdffb3d0 100644
--- a/component/azstorage/block_blob.go
+++ b/component/azstorage/block_blob.go
@@ -39,6 +39,7 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
+	"io"
 	"math"
 	"os"
 	"path/filepath"
@@ -431,10 +432,10 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
 	})
 
 	if err != nil {
-		e := storeBlobErrToErr(err)
-		if e == ErrFileNotFound {
+		serr := storeBlobErrToErr(err)
+		if serr == ErrFileNotFound {
 			return attr, syscall.ENOENT
-		} else if e == InvalidPermission {
+		} else if serr == InvalidPermission {
 			log.Err("BlockBlob::getAttrUsingRest : Insufficient permissions for %s [%s]", name, err.Error())
 			return attr, syscall.EACCES
 		} else {
@@ -455,6 +456,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
 		Crtime: *prop.CreationTime,
 		Flags:  internal.NewFileBitMap(),
 		MD5:    prop.ContentMD5,
+		ETag:   strings.Trim(string(*prop.ETag), `"`),
 	}
 
 	parseMetadata(attr, prop.Metadata)
@@ -663,6 +665,7 @@ func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAtt
 		Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
 		Flags:  internal.NewFileBitMap(),
 		MD5:    blobInfo.Properties.ContentMD5,
+		ETag:   strings.Trim((string)(*blobInfo.Properties.ETag), `"`),
 	}
 
 	parseMetadata(attr, blobInfo.Metadata)
@@ -888,20 +891,26 @@ func (bb *BlockBlob) ReadBuffer(name string, offset int64, len int64) ([]byte, e
 }
 
 // ReadInBuffer : Download specific range from a file to a user provided buffer
-func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
+func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
 	// log.Trace("BlockBlob::ReadInBuffer : name %s", name)
-	blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))
-	opt := (blob.DownloadBufferOptions)(*bb.downloadOptions)
-	opt.BlockSize = len
-	opt.Range = blob.HTTPRange{
-		Offset: offset,
-		Count:  len,
+	if etag != nil {
+		*etag = ""
 	}
+
+	blobClient := bb.Container.NewBlobClient(filepath.Join(bb.Config.prefixPath, name))
+
 	ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
 	defer cancel()
 
-	_, err := blobClient.DownloadBuffer(ctx, data, &opt)
+	opt := &blob.DownloadStreamOptions{
+		Range: blob.HTTPRange{
+			Offset: offset,
+			Count:  len,
+		},
+		CPKInfo: bb.blobCPKOpt,
+	}
+
+	downloadResponse, err := blobClient.DownloadStream(ctx, opt)
 
 	if err != nil {
 		e := storeBlobErrToErr(err)
@@ -911,10 +920,32 @@ func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []b
 			return syscall.ERANGE
 		}
 
 		log.Err("BlockBlob::ReadInBuffer : Failed to download blob %s [%s]", name, err.Error())
 		return err
 	}
 
+	var streamBody io.ReadCloser = downloadResponse.NewRetryReader(ctx, nil)
+	dataRead, err := io.ReadFull(streamBody, data)
+
+	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+		log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s [%s]", name, err.Error())
+		return err
+	}
+
+	if dataRead < 0 {
+		log.Err("BlockBlob::ReadInBuffer : Failed to copy data from body to buffer for blob %s", name)
+		return errors.New("failed to copy data from body to buffer")
+	}
+
+	err = streamBody.Close()
+	if err != nil {
+		log.Err("BlockBlob::ReadInBuffer : Failed to close body for blob %s [%s]", name, err.Error())
+	}
+
+	if etag != nil {
+		*etag = strings.Trim(string(*downloadResponse.ETag), `"`)
+	}
+
 	return nil
 }
@@ -1165,7 +1196,7 @@ func (bb *BlockBlob) removeBlocks(blockList *common.BlockOffsetList, size int64,
 		blk.Data = make([]byte, blk.EndIndex-blk.StartIndex)
 		blk.Flags.Set(common.DirtyBlock)
 
-		err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data)
+		err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data, nil)
 		if err != nil {
 			log.Err("BlockBlob::removeBlocks : Failed to remove blocks %s [%s]", name, err.Error())
 		}
@@ -1221,7 +1252,7 @@ func (bb *BlockBlob) TruncateFile(name string, size int64) error {
 			size -= blkSize
 		}
 
-		err = bb.CommitBlocks(blobName, blkList)
+		err = bb.CommitBlocks(blobName, blkList, nil)
 		if err != nil {
 			log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error())
 			return err
@@ -1289,12 +1320,12 @@ func (bb *BlockBlob) HandleSmallFile(name string, size int64, originalSize int64
 	var data = make([]byte, size)
 	var err error
 	if size > originalSize {
-		err = bb.ReadInBuffer(name, 0, 0, data)
+		err = bb.ReadInBuffer(name, 0, 0, data, nil)
 		if err != nil {
 			log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
 		}
 	} else {
-		err = bb.ReadInBuffer(name, 0, size, data)
+		err = bb.ReadInBuffer(name, 0, size, data, nil)
 		if err != nil {
 			log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
 		}
@@ -1369,7 +1400,7 @@ func (bb *BlockBlob) Write(options internal.WriteFileOptions) error {
 	oldDataBuffer := make([]byte, oldDataSize+newBufferSize)
 	if !appendOnly {
 		// fetch the blocks that will be impacted by the new changes so we can overwrite them
-		err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer)
+		err = bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer, nil)
 		if err != nil {
 			log.Err("BlockBlob::Write : Failed to read data in buffer %s [%s]", name, err.Error())
 		}
@@ -1560,14 +1591,14 @@ func (bb *BlockBlob) StageBlock(name string, data []byte, id string) error {
 }
 
 // CommitBlocks : persists the block list
-func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
+func (bb *BlockBlob) CommitBlocks(name string, blockList []string, newEtag *string) error {
 	log.Trace("BlockBlob::CommitBlocks : name %s", name)
 	ctx, cancel := context.WithTimeout(context.Background(), max_context_timeout*time.Minute)
 	defer cancel()
 
 	blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name))
-	_, err := blobClient.CommitBlockList(ctx,
+	resp, err := blobClient.CommitBlockList(ctx,
 		blockList,
 		&blockblob.CommitBlockListOptions{
 			HTTPHeaders: &blob.HTTPHeaders{
@@ -1582,6 +1613,10 @@ func (bb *BlockBlob) CommitBlocks(name string, blockList []string) error {
 		return err
 	}
 
+	if newEtag != nil {
+		*newEtag = strings.Trim(string(*resp.ETag), `"`)
+	}
+
 	return nil
 }
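Reviewer note: the signature change above ripples through every caller, so the two calling conventions are worth spelling out. A small compilable sketch follows; the `reader` interface is a stand-in for the updated connection surface, not a real blobfuse2 type.

```go
package sketch

// reader is a stand-in for the updated connection surface: the new trailing
// *string is an optional out-parameter for the blob's ETag.
type reader interface {
	ReadInBuffer(name string, offset, length int64, data []byte, etag *string) error
}

// readWithETag captures the ETag the service returned for this ranged
// download, so the caller can later detect concurrent overwrites.
func readWithETag(r reader, name string, buf []byte) (string, error) {
	var etag string
	if err := r.ReadInBuffer(name, 0, int64(len(buf)), buf, &etag); err != nil {
		return "", err
	}
	return etag, nil
}

// readDontCare opts out by passing nil, as the internal truncate and
// write-back paths in this PR do.
func readDontCare(r reader, name string, buf []byte) error {
	return r.ReadInBuffer(name, 0, int64(len(buf)), buf, nil)
}
```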
diff --git a/component/azstorage/block_blob_test.go b/component/azstorage/block_blob_test.go
index 411f79a25..710ec2c46 100644
--- a/component/azstorage/block_blob_test.go
+++ b/component/azstorage/block_blob_test.go
@@ -1191,6 +1191,70 @@ func (s *blockBlobTestSuite) TestReadInBuffer() {
 	s.assert.EqualValues(testData[:5], output)
 }
 
+func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAG() {
+	defer bbTestSuite.cleanupTest()
+	// Setup
+	name := generateFileName()
+	handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
+	testData := "test data"
+	data := []byte(testData)
+	bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
+	handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
+
+	output := make([]byte, 5)
+	var etag string
+	len, err := bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
+	bbTestSuite.assert.Nil(err)
+	bbTestSuite.assert.NotEqual(etag, "")
+	bbTestSuite.assert.EqualValues(5, len)
+	bbTestSuite.assert.EqualValues(testData[:5], output)
+	_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
+}
+
+func (bbTestSuite *blockBlobTestSuite) TestReadInBufferWithETAGMismatch() {
+	defer bbTestSuite.cleanupTest()
+	// Setup
+	name := generateFileName()
+	handle, _ := bbTestSuite.az.CreateFile(internal.CreateFileOptions{Name: name})
+	testData := "test data 12345678910"
+	data := []byte(testData)
+	bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
+	_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
+
+	attr, err := bbTestSuite.az.GetAttr(internal.GetAttrOptions{Name: name})
+	bbTestSuite.assert.Nil(err)
+	bbTestSuite.assert.NotNil(attr)
+	bbTestSuite.assert.NotEqual("", attr.ETag)
+	bbTestSuite.assert.Equal(int64(len(data)), attr.Size)
+
+	output := make([]byte, 5)
+	var etag string
+
+	handle, _ = bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
+	_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output, Etag: &etag})
+	bbTestSuite.assert.Nil(err)
+	bbTestSuite.assert.NotEqual(etag, "")
+	etag = strings.Trim(etag, `"`)
+	bbTestSuite.assert.Equal(etag, attr.ETag)
+
+	// Update the file in parallel using another handle
+	handle1, err := bbTestSuite.az.OpenFile(internal.OpenFileOptions{Name: name})
+	bbTestSuite.assert.Nil(err)
+	testData = "test data 12345678910 123123123123123123123"
+	data = []byte(testData)
+	bbTestSuite.az.WriteFile(internal.WriteFileOptions{Handle: handle1, Offset: 0, Data: data})
+	_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle1})
+
+	// Read data back using older handle
+	_, err = bbTestSuite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 5, Data: output, Etag: &etag})
+	bbTestSuite.assert.Nil(err)
+	bbTestSuite.assert.NotEqual(etag, "")
+	etag = strings.Trim(etag, `"`)
+	bbTestSuite.assert.NotEqual(etag, attr.ETag)
+
+	_ = bbTestSuite.az.CloseFile(internal.CloseFileOptions{Handle: handle})
+}
+
 func (s *blockBlobTestSuite) TestReadInBufferLargeBuffer() {
 	defer s.cleanupTest()
 	// Setup
@@ -2376,7 +2440,7 @@ func (s *blockBlobTestSuite) TestFlushFileUpdateChunkedFile() {
 	updatedBlock := make([]byte, 2*MB)
 	rand.Read(updatedBlock)
 	h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize)
-	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data)
+	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
 	copy(h.CacheObj.BlockOffsetList.BlockList[1].Data[MB:2*MB+MB], updatedBlock)
 	h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
 
@@ -2413,7 +2477,7 @@ func (s *blockBlobTestSuite) TestFlushFileTruncateUpdateChunkedFile() {
 	// truncate block
 	h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize/2)
 	h.CacheObj.BlockOffsetList.BlockList[1].EndIndex = int64(blockSize + blockSize/2)
-	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data)
+	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
 	h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
 
 	// remove 2 blocks
@@ -3258,7 +3322,7 @@ func (s *blockBlobTestSuite) TestDownloadBlobWithCPKEnabled() {
 	s.assert.EqualValues(data, fileData)
 
 	buf := make([]byte, len(data))
-	err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf)
+	err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf, nil)
 	s.assert.Nil(err)
 	s.assert.EqualValues(data, buf)
diff --git a/component/azstorage/connection.go b/component/azstorage/connection.go
index cdc177896..54c4fb463 100644
--- a/component/azstorage/connection.go
+++ b/component/azstorage/connection.go
@@ -120,7 +120,7 @@ type AzConnection interface {
 	ReadToFile(name string, offset int64, count int64, fi *os.File) error
 	ReadBuffer(name string, offset int64, len int64) ([]byte, error)
-	ReadInBuffer(name string, offset int64, len int64, data []byte) error
+	ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error
 
 	WriteFromFile(name string, metadata map[string]*string, fi *os.File) error
 	WriteFromBuffer(name string, metadata map[string]*string, data []byte) error
@@ -134,7 +134,7 @@ type AzConnection interface {
 	GetCommittedBlockList(string) (*internal.CommittedBlockList, error)
 	StageBlock(string, []byte, string) error
-	CommitBlocks(string, []string) error
+	CommitBlocks(string, []string, *string) error
 
 	UpdateServiceClient(_, _ string) error
diff --git a/component/azstorage/datalake.go b/component/azstorage/datalake.go
index 8a37c4ee5..ac3d1d4f7 100644
--- a/component/azstorage/datalake.go
+++ b/component/azstorage/datalake.go
@@ -399,6 +399,7 @@ func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error)
 		Ctime:  *prop.LastModified,
 		Crtime: *prop.LastModified,
 		Flags:  internal.NewFileBitMap(),
+		ETag:   (string)(*prop.ETag),
 	}
 	parseMetadata(blobAttr, prop.Metadata)
@@ -454,8 +455,8 @@ func (dl *Datalake) ReadBuffer(name string, offset int64, len int64) ([]byte, er
 }
 
 // ReadInBuffer : Download specific range from a file to a user provided buffer
-func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte) error {
-	return dl.BlockBlob.ReadInBuffer(name, offset, len, data)
+func (dl *Datalake) ReadInBuffer(name string, offset int64, len int64, data []byte, etag *string) error {
+	return dl.BlockBlob.ReadInBuffer(name, offset, len, data, etag)
 }
 
 // WriteFromFile : Upload local file to file
@@ -593,8 +594,8 @@ func (dl *Datalake) StageBlock(name string, data []byte, id string) error {
 }
 
 // CommitBlocks : persists the block list
-func (dl *Datalake) CommitBlocks(name string, blockList []string) error {
-	return dl.BlockBlob.CommitBlocks(name, blockList)
+func (dl *Datalake) CommitBlocks(name string, blockList []string, newEtag *string) error {
+	return dl.BlockBlob.CommitBlocks(name, blockList, newEtag)
 }
 
 func (dl *Datalake) SetFilter(filter string) error {
diff --git a/component/azstorage/datalake_test.go b/component/azstorage/datalake_test.go
index 462ca60bd..236efd3a8 100644
--- a/component/azstorage/datalake_test.go
+++ b/component/azstorage/datalake_test.go
@@ -1402,6 +1402,26 @@ func (s *datalakeTestSuite) TestReadInBuffer() {
 	s.assert.EqualValues(testData[:5], output)
 }
 
+func (suite *datalakeTestSuite) TestReadInBufferWithETAG() {
+	defer suite.cleanupTest()
+	// Setup
+	name := generateFileName()
+	fileHandle, _ := suite.az.CreateFile(internal.CreateFileOptions{Name: name})
+	testData := "test data"
+	data := []byte(testData)
+	suite.az.WriteFile(internal.WriteFileOptions{Handle: fileHandle, Offset: 0, Data: data})
+	fileHandle, _ = suite.az.OpenFile(internal.OpenFileOptions{Name: name})
+
+	output := make([]byte, 5)
+	var etag string
+	len, err := suite.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: fileHandle, Offset: 0, Data: output, Etag: &etag})
+	suite.assert.Nil(err)
+	suite.assert.NotEqual(etag, "")
+	suite.assert.EqualValues(5, len)
+	suite.assert.EqualValues(testData[:5], output)
+	_ = suite.az.CloseFile(internal.CloseFileOptions{Handle: fileHandle})
+}
+
 func (s *datalakeTestSuite) TestReadInBufferLargeBuffer() {
 	defer s.cleanupTest()
 	// Setup
@@ -2035,7 +2055,7 @@ func (s *datalakeTestSuite) TestFlushFileUpdateChunkedFile() {
 	updatedBlock := make([]byte, 2*MB)
 	rand.Read(updatedBlock)
 	h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize)
-	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data)
+	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
 	copy(h.CacheObj.BlockOffsetList.BlockList[1].Data[MB:2*MB+MB], updatedBlock)
 	h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
 
@@ -2073,7 +2093,7 @@ func (s *datalakeTestSuite) TestFlushFileTruncateUpdateChunkedFile() {
 	// truncate block
 	h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize/2)
 	h.CacheObj.BlockOffsetList.BlockList[1].EndIndex = int64(blockSize + blockSize/2)
-	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data)
+	s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data, nil)
 	h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
 
 	// remove 2 blocks
@@ -2490,7 +2510,7 @@ func (s *datalakeTestSuite) TestDownloadWithCPKEnabled() {
 	s.assert.EqualValues(data, fileData)
 
 	buf := make([]byte, len(data))
-	err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf)
+	err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf, nil)
 	s.assert.Nil(err)
 	s.assert.EqualValues(data, buf)
diff --git a/component/block_cache/block_cache.go b/component/block_cache/block_cache.go
index 44b83a2fd..858140b70 100644
--- a/component/block_cache/block_cache.go
+++ b/component/block_cache/block_cache.go
@@ -400,6 +400,10 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
 	handle.Mtime = attr.Mtime
 	handle.Size = attr.Size
 
+	if attr.ETag != "" {
+		handle.SetValue("ETAG", attr.ETag)
+	}
+
 	log.Debug("BlockCache::OpenFile : Size of file handle.Size %v", handle.Size)
 
 	bc.prepareHandleForBlockCache(handle)
@@ -952,12 +956,12 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
 }
 
 // download : Method to download the given amount of data
-func (blockCache *BlockCache) download(item *workItem) {
+func (bc *BlockCache) download(item *workItem) {
 	fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)
 
 	// filename_blockindex is the key for the lock
 	// this ensure that at a given time a block from a file is downloaded only once across all open handles
-	flock := blockCache.fileLocks.Get(fileName)
+	flock := bc.fileLocks.Get(fileName)
 	flock.Lock()
 	defer flock.Unlock()
@@ -965,18 +969,18 @@ func (blockCache *BlockCache) download(item *workItem) {
 	found := false
 	localPath := ""
 
-	if blockCache.tmpPath != "" {
+	if bc.tmpPath != "" {
 		// Update diskpolicy to reflect the new file
-		diskNode, found = blockCache.fileNodeMap.Load(fileName)
+		diskNode, found = bc.fileNodeMap.Load(fileName)
 		if !found {
-			diskNode = blockCache.diskPolicy.Add(fileName)
-			blockCache.fileNodeMap.Store(fileName, diskNode)
+			diskNode = bc.diskPolicy.Add(fileName)
+			bc.fileNodeMap.Store(fileName, diskNode)
 		} else {
-			blockCache.diskPolicy.Refresh(diskNode.(*list.Element))
+			bc.diskPolicy.Refresh(diskNode.(*list.Element))
 		}
 
 		// Check local file exists for this offset and file combination or not
-		localPath = filepath.Join(blockCache.tmpPath, fileName)
+		localPath = filepath.Join(bc.tmpPath, fileName)
 		_, err := os.Stat(localPath)
 
 		if err == nil {
@@ -995,8 +999,8 @@
 				_ = os.Remove(localPath)
 			}
 
-			if numberOfBytes != int(blockCache.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
-				log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", blockCache.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
+			if numberOfBytes != int(bc.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
+				log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
 				successfulRead = false
 				_ = os.Remove(localPath)
 			}
@@ -1005,7 +1009,7 @@
 
 			if successfulRead {
 				// If user has enabled consistency check then compute the md5sum and match it in xattr
-				successfulRead = checkBlockConsistency(blockCache, item, numberOfBytes, localPath, fileName)
+				successfulRead = checkBlockConsistency(bc, item, numberOfBytes, localPath, fileName)
 
 				// We have read the data from disk so there is no need to go over network
 				// Just mark the block that download is complete
@@ -1018,11 +1022,13 @@
 		}
 	}
 
+	var etag string
 	// If file does not exists then download the block from the container
-	n, err := blockCache.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
+	n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
 		Handle: item.handle,
 		Offset: int64(item.block.offset),
 		Data:   item.block.data,
+		Etag:   &etag,
 	})
 
 	if item.failCnt > MAX_FAIL_CNT {
@@ -1037,17 +1043,28 @@
 		// Fail to read the data so just reschedule this request
 		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
 		item.failCnt++
-		blockCache.threadPool.Schedule(false, item)
+		bc.threadPool.Schedule(false, item)
 		return
 	} else if n == 0 {
 		// No data read so just reschedule this request
 		log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.id)
 		item.failCnt++
-		blockCache.threadPool.Schedule(false, item)
+		bc.threadPool.Schedule(false, item)
 		return
 	}
 
-	if blockCache.tmpPath != "" {
+	// Compare the ETag value and fail the download if the blob has changed
+	if etag != "" {
+		etagVal, found := item.handle.GetValue("ETAG")
+		if found && etagVal != etag {
+			log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
+			item.block.Failed()
+			item.block.Ready(BlockStatusDownloadFailed)
+			return
+		}
+	}
+
+	if bc.tmpPath != "" {
 		err := os.MkdirAll(filepath.Dir(localPath), 0777)
 		if err != nil {
 			log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error())
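Reviewer note: the hunk above hinges on the handle's key-value store. Here is a compilable sketch of just the decision logic, with a stand-in interface for `handlemap.Handle` (only the two methods the check relies on are modelled; this is not the real type).

```go
package sketch

// kvHandle stands in for handlemap.Handle's key-value store.
type kvHandle interface {
	GetValue(key string) (interface{}, bool)
	SetValue(key string, value interface{})
}

// blobUnchanged reproduces the download-side check: an empty etag means the
// layer below reported none, so the check is skipped; a stored "ETAG" that
// differs from the one just downloaded means the blob was overwritten after
// open and the block must be failed.
func blobUnchanged(h kvHandle, etag string) bool {
	if etag == "" {
		return true // nothing to compare against
	}
	stored, found := h.GetValue("ETAG")
	return !found || stored == etag
}
```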
@@ -1064,10 +1081,10 @@
 		}
 		f.Close()
-		blockCache.diskPolicy.Refresh(diskNode.(*list.Element))
+		bc.diskPolicy.Refresh(diskNode.(*list.Element))
 
 		// If user has enabled consistency check then compute the md5sum and save it in xattr
-		if blockCache.consistency {
+		if bc.consistency {
 			hash := common.GetCRC64(item.block.data, n)
 			err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
 			if err != nil {
@@ -1556,12 +1573,17 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
 	log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path)
 
 	// Commit the block list now
-	err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize})
+	var newEtag string = ""
+	err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize, NewETag: &newEtag})
 	if err != nil {
 		log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error())
 		return err
 	}
 
+	if newEtag != "" {
+		handle.SetValue("ETAG", newEtag)
+	}
+
 	// set all the blocks as committed
 	list, _ := handle.GetValue("blockList")
 	listMap := list.(map[int64]*blockInfo)
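Reviewer note: the write-back in commitBlocks is the other half of the scheme — after a flush the blob has legitimately changed, so the open-time ETag must be replaced, or every later read through the same handle would be failed as stale. A sketch under the same kind of stand-in types as above (none of these are blobfuse2 APIs):

```go
package sketch

// handleStore stands in for the handle's key-value store.
type handleStore interface {
	SetValue(key string, value interface{})
}

// committer mirrors the CommitData path: on success the layer below fills
// newEtag with the ETag of the freshly committed blob.
type committer func(name string, blocks []string, newEtag *string) error

// commitAndRefresh commits the block list and, when a new ETag is reported,
// stores it on the handle so later downloads validate against the new
// generation of the blob rather than the open-time one.
func commitAndRefresh(commit committer, h handleStore, name string, blocks []string) error {
	var newEtag string
	if err := commit(name, blocks, &newEtag); err != nil {
		return err
	}
	if newEtag != "" {
		h.SetValue("ETAG", newEtag)
	}
	return nil
}
```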
diff --git a/internal/attribute.go b/internal/attribute.go
index f43280947..f566be45d 100644
--- a/internal/attribute.go
+++ b/internal/attribute.go
@@ -78,7 +78,8 @@ type ObjAttr struct {
 	Flags    common.BitMap16    // flags
 	Path     string             // full path
 	Name     string             // base name of the path
-	MD5      []byte             // MD5 hash of the blob
+	MD5      []byte             // MD5 of the blob as per last GetAttr
+	ETag     string             // ETag of the blob as per last GetAttr
 	Metadata map[string]*string // extra information to preserve
 }
 
diff --git a/internal/component_options.go b/internal/component_options.go
index dc53ff2ba..609404aa3 100644
--- a/internal/component_options.go
+++ b/internal/component_options.go
@@ -107,6 +107,7 @@ type ReadFileOptions struct {
 type ReadInBufferOptions struct {
 	Handle *handlemap.Handle
 	Offset int64
+	Etag   *string
 	Data   []byte
}
 
@@ -202,6 +203,7 @@ type CommitDataOptions struct {
 	Name      string
 	List      []string
 	BlockSize uint64
+	NewETag   *string
 }
 
 type CommittedBlock struct {
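Reviewer note: both new fields are pointers so that `nil` keeps the old behaviour for callers that don't opt in. A minimal sketch of the wiring, using local copies of the two option structs extended above (reduced to the relevant fields, so not the real `internal` package types):

```go
package sketch

// ReadInBufferOptions is a local copy reduced to the fields relevant here.
type ReadInBufferOptions struct {
	Offset int64
	Etag   *string // out-parameter: filled with the download's ETag when non-nil
	Data   []byte
}

// CommitDataOptions is likewise a reduced local copy.
type CommitDataOptions struct {
	Name    string
	List    []string
	NewETag *string // out-parameter: filled with the post-commit ETag when non-nil
}

// wire shows the convention: a nil pointer means "don't care", a non-nil
// pointer is filled in by the storage layer.
func wire() (ReadInBufferOptions, CommitDataOptions) {
	var etag, newEtag string
	r := ReadInBufferOptions{Offset: 0, Data: make([]byte, 4096), Etag: &etag}
	c := CommitDataOptions{Name: "file.bin", List: []string{"YmxvY2sw"}, NewETag: &newEtag}
	return r, c
}
```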