Skip to content

Commit

Permalink
Merge pull request #1614 from Azure/syeleti/rename
Browse files Browse the repository at this point in the history
Reduce the number of REST calls while doing Rename Operation
  • Loading branch information
syeleti-msft authored Jan 31, 2025
2 parents c71ea55 + 04abe1e commit 70890da
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

**Other Changes**
- Optimized listing operation on HNS account to support symlinks.
- Optimized Rename operation to do less number of REST calls.

**Features**
- Mount container or directory but restrict the view of blobs that you can see. This feature is available only in read-only mount.
Expand Down
22 changes: 19 additions & 3 deletions component/attr_cache/attr_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,21 @@ func (ac *AttrCache) invalidateDirectory(path string) {
ac.invalidatePath(path)
}

// Copies the attr to the given path.
func (ac *AttrCache) updateCacheEntry(path string, attr *internal.ObjAttr) {
cacheEntry, found := ac.cacheMap[path]
if found {
// Copy the attr
cacheEntry.attr = attr
// Update the path inside the attr
cacheEntry.attr.Path = path
// Update the Existence of the entry
cacheEntry.attrFlag.Set(AttrFlagExists)
// Refresh the cache entry
cacheEntry.cachedAt = time.Now()
}
}

// invalidatePath: invalidates a path
func (ac *AttrCache) invalidatePath(path string) {
// Keys in the cache map do not contain trailing /, truncate the path before referencing a key in the map.
Expand Down Expand Up @@ -360,14 +375,15 @@ func (ac *AttrCache) DeleteFile(options internal.DeleteFileOptions) error {
// RenameFile : Mark the source file deleted. Invalidate the destination file.
func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error {
log.Trace("AttrCache::RenameFile : %s -> %s", options.Src, options.Dst)

srcAttr := options.SrcAttr
err := ac.NextComponent().RenameFile(options)
if err == nil {
// Copy source attribute to destination.
// LMT of Source will be modified by next component if the copy is success.
ac.cacheLock.RLock()
defer ac.cacheLock.RUnlock()

ac.updateCacheEntry(options.Dst, srcAttr)
ac.deletePath(options.Src, time.Now())
ac.invalidatePath(options.Dst)
}

return err
Expand Down
49 changes: 47 additions & 2 deletions component/attr_cache/attr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ func assertUntouched(suite *attrCacheTestSuite, path string) {
suite.assert.True(suite.attrCache.cacheMap[path].exists())
}

// This method is used when we transfer the attributes from the src to dst, and mark src as invalid
func assertAttributesTransferred(suite *attrCacheTestSuite, srcAttr *internal.ObjAttr, dstAttr *internal.ObjAttr) {
suite.assert.EqualValues(srcAttr.Size, dstAttr.Size)
suite.assert.EqualValues(srcAttr.Path, dstAttr.Path)
suite.assert.EqualValues(srcAttr.Mode, dstAttr.Mode)
suite.assert.EqualValues(srcAttr.Atime, dstAttr.Atime)
suite.assert.EqualValues(srcAttr.Mtime, dstAttr.Mtime)
suite.assert.EqualValues(srcAttr.Ctime, dstAttr.Ctime)
suite.assert.True(suite.attrCache.cacheMap[dstAttr.Path].exists())
suite.assert.True(suite.attrCache.cacheMap[dstAttr.Path].valid())
}

// If next component changes the times of the attribute.
func assertSrcAttributeTimeChanged(suite *attrCacheTestSuite, srcAttr *internal.ObjAttr, srcAttrCopy internal.ObjAttr) {
suite.assert.NotEqualValues(suite, srcAttr.Atime, srcAttrCopy.Atime)
suite.assert.NotEqualValues(suite, srcAttr.Mtime, srcAttrCopy.Mtime)
suite.assert.NotEqualValues(suite, srcAttr.Ctime, srcAttrCopy.Ctime)
}

// Directory structure
// a/
//
Expand Down Expand Up @@ -676,15 +695,41 @@ func (suite *attrCacheTestSuite) TestRenameFile() {
suite.assert.NotContains(suite.attrCache.cacheMap, src)
suite.assert.NotContains(suite.attrCache.cacheMap, dst)

// Entry Already Exists
// Src, Dst Entry Already Exists
addPathToCache(suite.assert, suite.attrCache, src, false)
addPathToCache(suite.assert, suite.attrCache, dst, false)
options.SrcAttr = suite.attrCache.cacheMap[src].attr
options.SrcAttr.Size = 1
options.SrcAttr.Mode = 2
options.DstAttr = suite.attrCache.cacheMap[dst].attr
options.DstAttr.Size = 3
options.DstAttr.Mode = 4
srcAttrCopy := *options.SrcAttr

suite.mock.EXPECT().RenameFile(options).Return(nil)
err = suite.attrCache.RenameFile(options)
suite.assert.Nil(err)
assertDeleted(suite, src)
modifiedDstAttr := suite.attrCache.cacheMap[dst].attr
assertSrcAttributeTimeChanged(suite, options.SrcAttr, srcAttrCopy)
// Check the attributes of the dst are same as the src.
assertAttributesTransferred(suite, options.SrcAttr, modifiedDstAttr)

// Src Entry Exist and Dst Entry Don't Exist
addPathToCache(suite.assert, suite.attrCache, src, false)
// Add negative entry to cache for Dst
suite.attrCache.cacheMap[dst] = newAttrCacheItem(&internal.ObjAttr{}, false, time.Now())
options.SrcAttr = suite.attrCache.cacheMap[src].attr
options.DstAttr = suite.attrCache.cacheMap[dst].attr
options.SrcAttr.Size = 1
options.SrcAttr.Mode = 2
suite.mock.EXPECT().RenameFile(options).Return(nil)
err = suite.attrCache.RenameFile(options)
suite.assert.Nil(err)
assertDeleted(suite, src)
assertInvalid(suite, dst)
modifiedDstAttr = suite.attrCache.cacheMap[dst].attr
assertSrcAttributeTimeChanged(suite, options.SrcAttr, srcAttrCopy)
assertAttributesTransferred(suite, options.SrcAttr, modifiedDstAttr)
}

// Tests Write File
Expand Down
2 changes: 1 addition & 1 deletion component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (az *AzStorage) DeleteFile(options internal.DeleteFileOptions) error {
func (az *AzStorage) RenameFile(options internal.RenameFileOptions) error {
log.Trace("AzStorage::RenameFile : %s to %s", options.Src, options.Dst)

err := az.storage.RenameFile(options.Src, options.Dst)
err := az.storage.RenameFile(options.Src, options.Dst, options.SrcAttr)

if err == nil {
azStatsCollector.PushEvents(renameFile, options.Src, map[string]interface{}{src: options.Src, dest: options.Dst})
Expand Down
29 changes: 23 additions & 6 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,19 @@ func (bb *BlockBlob) DeleteDirectory(name string) (err error) {

// RenameFile : Rename the file
// Source file must exist in storage account before calling this method.
func (bb *BlockBlob) RenameFile(source string, target string) error {
// When the rename is success, Data, metadata, of the blob will be copied to the destination.
// Creation time and LMT is not preserved for copyBlob API.
// Copy the LMT to the src attr if the copy is success.
// https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id
func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
log.Trace("BlockBlob::RenameFile : %s -> %s", source, target)

blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source))
newBlobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, target))

// not specifying source blob metadata, since passing empty metadata headers copies
// the source blob metadata to destination blob
startCopy, err := newBlobClient.StartCopyFromURL(context.Background(), blobClient.URL(), &blob.StartCopyFromURLOptions{
copyResponse, err := newBlobClient.StartCopyFromURL(context.Background(), blobClient.URL(), &blob.StartCopyFromURLOptions{
Tier: bb.Config.defaultTier,
})

Expand All @@ -341,10 +345,15 @@ func (bb *BlockBlob) RenameFile(source string, target string) error {
return err
}

copyStatus := startCopy.CopyStatus
var dstLMT *time.Time = copyResponse.LastModified

copyStatus := copyResponse.CopyStatus
var prop blob.GetPropertiesResponse
pollCnt := 0
for copyStatus != nil && *copyStatus == blob.CopyStatusTypePending {
time.Sleep(time.Second * 1)
prop, err := newBlobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{
pollCnt++
prop, err = newBlobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{
CPKInfo: bb.blobCPKOpt,
})
if err != nil {
Expand All @@ -353,6 +362,14 @@ func (bb *BlockBlob) RenameFile(source string, target string) error {
copyStatus = prop.CopyStatus
}

if pollCnt > 0 {
dstLMT = prop.LastModified
}

if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess {
modifyLMT(srcAttr, dstLMT)
}

log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target)

// Copy of the file is done so now delete the older file
Expand Down Expand Up @@ -394,7 +411,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error {
for _, blobInfo := range listBlobResp.Segment.BlobItems {
srcDirPresent = true
srcPath := split(bb.Config.prefixPath, *blobInfo.Name)
err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1))
err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1), nil)
if err != nil {
log.Err("BlockBlob::RenameDirectory : Failed to rename file %s [%s]", srcPath, err.Error)
}
Expand All @@ -420,7 +437,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error {
}
}

return bb.RenameFile(source, target)
return bb.RenameFile(source, target, nil)
}

func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err error) {
Expand Down
2 changes: 1 addition & 1 deletion component/azstorage/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type AzConnection interface {
DeleteFile(name string) error
DeleteDirectory(name string) error

RenameFile(string, string) error
RenameFile(string, string, *internal.ObjAttr) error
RenameDirectory(string, string) error

GetAttr(name string) (attr *internal.ObjAttr, err error)
Expand Down
7 changes: 4 additions & 3 deletions component/azstorage/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func (dl *Datalake) DeleteDirectory(name string) (err error) {
}

// RenameFile : Rename the file
func (dl *Datalake) RenameFile(source string, target string) error {
// While renaming the file, Creation time is preserved but LMT is changed for the destination blob.
func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
log.Trace("Datalake::RenameFile : %s -> %s", source, target)

fileClient := dl.Filesystem.NewFileClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source)))

_, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{
renameResponse, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{
CPKInfo: dl.datalakeCPKOpt,
})
if err != nil {
Expand All @@ -336,7 +337,7 @@ func (dl *Datalake) RenameFile(source string, target string) error {
return err
}
}

modifyLMT(srcAttr, renameResponse.LastModified)
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions component/azstorage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,14 @@ func removeLeadingSlashes(s string) string {
return s
}

func modifyLMT(attr *internal.ObjAttr, lmt *time.Time) {
if attr != nil {
attr.Atime = *lmt
attr.Mtime = *lmt
attr.Ctime = *lmt
}
}

// func parseBlobTags(tags *container.BlobTags) map[string]string {

// if tags == nil {
Expand Down
12 changes: 10 additions & 2 deletions component/libfuse/libfuse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,10 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int {
}
}

err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{Src: srcPath, Dst: dstPath})
err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{
Src: srcPath,
Dst: dstPath,
})
if err != nil {
log.Err("Libfuse::libfuse_rename : error renaming directory %s -> %s [%s]", srcPath, dstPath, err.Error())
return -C.EIO
Expand All @@ -989,7 +992,12 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int {
libfuseStatsCollector.UpdateStats(stats_manager.Increment, renameDir, (int64)(1))

} else {
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{Src: srcPath, Dst: dstPath})
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{
Src: srcPath,
Dst: dstPath,
SrcAttr: srcAttr,
DstAttr: dstAttr,
})
if err != nil {
log.Err("Libfuse::libfuse_rename : error renaming file %s -> %s [%s]", srcPath, dstPath, err.Error())
return -C.EIO
Expand Down
6 changes: 4 additions & 2 deletions internal/component_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ type CloseFileOptions struct {
}

type RenameFileOptions struct {
Src string
Dst string
Src string
Dst string
SrcAttr *ObjAttr
DstAttr *ObjAttr
}

type ReadFileOptions struct {
Expand Down
6 changes: 6 additions & 0 deletions internal/mock_component.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 70890da

Please sign in to comment.