Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexible File Filtering Feature for Blobfuse Mounting #1435

Merged
merged 32 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b88dc62
Implemented 1st step that is parsing of filter string given by user
t-yashmahale May 29, 2024
8d66bdb
added a dummy threadpool where 16 threads are accessing files parallely
t-yashmahale May 29, 2024
b88e380
added <= and >= features in size filter
t-yashmahale May 29, 2024
a3bd1d8
Added all functions for checking the file against filters
t-yashmahale May 30, 2024
05e6869
solved bug of deadlock if a file does not returns true for any compar…
t-yashmahale May 30, 2024
5107413
created modules for code and added meaningfull names
t-yashmahale May 31, 2024
f4b9ccf
added comments for complex functions
t-yashmahale May 31, 2024
99d3458
added regex filter
t-yashmahale Jun 3, 2024
9f54faa
added modification time filter (currently supporting ust only)
t-yashmahale Jun 3, 2024
c817cba
made some changes to make code more efficient
t-yashmahale Jun 4, 2024
448e473
increased modularity
t-yashmahale Jun 5, 2024
02c70d5
more optimised prev commit
t-yashmahale Jun 5, 2024
7af33a5
included reading from output channel
t-yashmahale Jun 5, 2024
1f290e4
passed all variables by refrence and other optimisations
t-yashmahale Jun 6, 2024
df32e9e
started integration ,added input flag blobFilter
t-yashmahale Jun 9, 2024
024a9e3
updated filter.go
t-yashmahale Jun 9, 2024
aec9db8
integrated filters with blobfuse
t-yashmahale Jun 10, 2024
e19c97a
filter integrated with prints
t-yashmahale Jun 10, 2024
27eca4f
minor changes
t-yashmahale Jun 11, 2024
057f84e
_
t-yashmahale Jun 11, 2024
c48ef28
invalid input filter handeling done
t-yashmahale Jun 12, 2024
861a4a0
GetAttr done and error handling in case of wrong filter
t-yashmahale Jun 12, 2024
0555121
added comments
t-yashmahale Jun 13, 2024
733bad0
made some more changes to clean code
t-yashmahale Jun 13, 2024
40dc026
made changes
t-yashmahale Jun 13, 2024
8d5cdb0
solved bug of infinite waiting when a list containing no file is pass…
t-yashmahale Jun 13, 2024
2532fa0
resolved issue of infinite waiting in recieveOutput function due to a…
t-yashmahale Jun 14, 2024
08c01f1
added access tier filter
t-yashmahale Jun 18, 2024
cd8e469
added directory functionality
t-yashmahale Jun 18, 2024
7f98d4a
added blobtag filter and some other changes
t-yashmahale Jun 19, 2024
61450e3
Enable read-only mode when user specifies filters
t-yashmahale Jun 24, 2024
a5228d4
changed name of flag to blob-filter
t-yashmahale Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ test/manual_scripts/cachetest.go
lint.log
azure-storage-fuse
bfusemon
test/scripts/dirIterate.go
test/scripts/dirIterate.go
30 changes: 28 additions & 2 deletions component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ package azstorage

import (
"context"
"errors"
"fmt"
"sync/atomic"
"syscall"
Expand All @@ -45,9 +46,9 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
"github.com/Azure/azure-storage-fuse/v2/internal/handlemap"
"github.com/Azure/azure-storage-fuse/v2/internal/stats_manager"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -298,6 +299,7 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
path := formatListDirName(options.Name)

new_list, new_marker, err := az.storage.List(path, &options.Token, options.Count)

if err != nil {
log.Err("AzStorage::StreamDir : Failed to read dir [%s]", err)
return new_list, "", err
Expand Down Expand Up @@ -331,6 +333,11 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
// increment streamdir call count
azStatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1))

//check for filters provided
if az.stConfig.filters != nil { //only apply if user has given filter
filtered_list := az.stConfig.filters.ApplyFilterOnBlobs(new_list)
return filtered_list, *new_marker, nil
}
return new_list, *new_marker, nil
}

Expand Down Expand Up @@ -521,7 +528,23 @@ func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error)
// Attribute operations
func (az *AzStorage) GetAttr(options internal.GetAttrOptions) (attr *internal.ObjAttr, err error) {
//log.Trace("AzStorage::GetAttr : Get attributes of file %s", name)
return az.storage.GetAttr(options.Name)
// return az.storage.GetAttr(options.Name)
resp, err := az.storage.GetAttr(options.Name)
if err != nil {
return resp, err
}

if az.stConfig.filters != nil {
fileValidatorObj := &filter.FileValidator{
FilterArr: az.stConfig.filters.FilterArr,
}
if fileValidatorObj.CheckFileWithFilters(resp) {
return resp, nil
} else {
return nil, errors.New("the file does not pass the provided filters") //debug
}
}
return resp, err
}

func (az *AzStorage) Chmod(options internal.ChmodOptions) error {
Expand Down Expand Up @@ -654,6 +677,9 @@ func init() {
telemetry := config.AddStringFlag("telemetry", "", "Additional telemetry information.")
config.BindPFlag(compName+".telemetry", telemetry)
telemetry.Hidden = true
//filter
blobFilter := config.AddStringFlag("blobFilter", "", "Filter to apply on blobs.")
config.BindPFlag(compName+".blobFilter", blobFilter)

honourACL := config.AddBoolFlag("honour-acl", false, "Match ObjectID in ACL against the one used for authentication.")
config.BindPFlag(compName+".honour-acl", honourACL)
Expand Down
6 changes: 6 additions & 0 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
Tier: *prop.AccessTier,
}

parseMetadata(attr, prop.Metadata)
Expand Down Expand Up @@ -593,6 +594,7 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
Crtime: dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
Tier: string(*blobInfo.Properties.AccessTier),
}
parseMetadata(attr, blobInfo.Metadata)
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
Expand Down Expand Up @@ -636,6 +638,10 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
attr.Ctime = attr.Mtime
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
attr.Tier = ""
if bb.Config.defaultTier != nil { //if any defualt value of access tier is provided ,set it
attr.Tier = string(*bb.Config.defaultTier)
}
blobList = append(blobList, attr)
}
}
Expand Down
14 changes: 14 additions & 0 deletions component/azstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"

"github.com/JeffreyRichter/enum/enum"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ type AzStorageOptions struct {
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"`
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"`
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"`
BlobFilter string `config:"blobFilter" yaml:"blobFilter"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this new config towards the end of the structure so that diff does not show existing items.


// v1 support
UseAdls bool `config:"use-adls" yaml:"-"`
Expand Down Expand Up @@ -386,6 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error {

az.stConfig.telemetry = opt.Telemetry

//if blobFilter is provided, parse string and setup filters
if len(opt.BlobFilter) > 0 {
log.Info("ParseAndValidateConfig : provided filter is %s", opt.BlobFilter)
az.stConfig.filters = &filter.UserInputFilters{}
erro := az.stConfig.filters.ParseInp(&opt.BlobFilter)
log.Info("ParseAndValidateConfig : number of OR seperated filters are %d", len(az.stConfig.filters.FilterArr))
if erro != nil {
log.Err("ParseAndValidateConfig : mount failed due to an error encountered while parsing")
return erro
}
}

httpProxyProvided := opt.HttpProxyAddress != ""
httpsProxyProvided := opt.HttpsProxyAddress != ""

Expand Down
4 changes: 4 additions & 0 deletions component/azstorage/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
)

// Example for azblob usage : https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob#pkg-examples
Expand Down Expand Up @@ -81,6 +82,9 @@ type AzStorageConfig struct {
cpkEnabled bool
cpkEncryptionKey string
cpkEncryptionKeySha256 string

// Filter related config
filters *filter.UserInputFilters
}

type AzStorageConnection struct {
Expand Down
3 changes: 2 additions & 1 deletion component/azstorage/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) {
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
Tier: *prop.AccessTier,
}
parseMetadata(attr, prop.Metadata)

Expand Down Expand Up @@ -471,7 +472,7 @@ func (dl *Datalake) List(prefix string, marker *string, count int32) ([]*interna
for _, pathInfo := range listPath.Paths {
var attr *internal.ObjAttr
var lastModifiedTime time.Time
if dl.Config.disableSymlink {
if (dl.Config.disableSymlink && dl.Config.filters == nil) || (dl.Config.disableSymlink && !dl.Config.filters.TierChk) {
var mode fs.FileMode
if pathInfo.Permissions != nil {
mode, err = getFileMode(*pathInfo.Permissions)
Expand Down
1 change: 1 addition & 0 deletions internal/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type ObjAttr struct {
Name string // base name of the path
MD5 []byte
Metadata map[string]*string // extra information to preserve
Tier string //access tier of blob
}

// IsDir : Test blob is a directory or not
Expand Down
52 changes: 52 additions & 0 deletions internal/filter/accesstier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package filter

import (
"errors"
"fmt"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

type AccessTierFilter struct {
opr string
tier string
}

func (filter AccessTierFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("AccessTier filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
fmt.Println("inside filter tier ", filter, " with given tier ", filter.tier, " and file tier ", fileInfo.Tier)
if (filter.opr == "=") && (filter.tier == strings.ToLower(fileInfo.Tier)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple string checks can be avoided.

return true
} else if (filter.opr == "!=") && (filter.tier != strings.ToLower(fileInfo.Tier)) {
return true
}
return false
}

// used for dynamic creation of AccessTierFilter
func newAccessTierFilter(args ...interface{}) Filter {
return AccessTierFilter{
opr: args[0].(string),
tier: args[1].(string),
}
}

func giveAccessTierFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase
sinChk := (*singleFilter)[4:5] //single char after tier (ex- tier=hot , here sinChk will be "=")
Copy link
Member

@vibhansa-msft vibhansa-msft Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of magic numbers use len("keyword")

doubChk := (*singleFilter)[4:6] //2 chars after tier (ex- tier != cold , here doubChk will be "!=")
erro := errors.New("invalid filter, no files passed")
if !((sinChk == "=") || (doubChk == "!=")) {
return nil, erro
}
if (doubChk == "!=") && (len(*singleFilter) > 6) {
value := (*singleFilter)[6:] // 5 is used since len(tier) = 4 and + 1
return newAccessTierFilter(doubChk, value), nil
} else if (sinChk == "=") && (len(*singleFilter) > 5) {
value := (*singleFilter)[5:] // 5 is used since len(tier) = 4 and + 1
return newAccessTierFilter(sinChk, value), nil
} else {
return nil, erro
}
}
43 changes: 43 additions & 0 deletions internal/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package filter

import (
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
)

func (fl *UserInputFilters) ApplyFilterOnBlobs(fileInfos []*internal.ObjAttr) []*internal.ObjAttr { //function called from azstorage.go streamDir func
log.Debug("came inside filter")
if len(fileInfos) == 0 {
return fileInfos
}
fv := &FileValidator{
workers: 16,
// atomicflag: 0,
fileCnt: int64(len(fileInfos)),
FilterArr: fl.FilterArr,
}
fv.wgo.Add(1) //kept outside thread
fv.outputChan = make(chan *opdata, fv.workers)
fv.fileInpQueue = make(chan *internal.ObjAttr, fv.workers)

go fv.RecieveOutput() //thread parellely reading from ouput channel

for w := 1; w <= fv.workers; w++ {
// fv.wgi.Add(1)
go fv.ChkFile() //go routines for each worker (thread) are called
}
for _, fileinfo := range fileInfos {
// fmt.Println("passedFile: ", *fileinfo)
fv.fileInpQueue <- fileinfo //push all files one by one in channel , if channel is full , it will wait
// fv.fileCnt++ //incrementing filecount, this will be used to close output channel
}

// atomic.StoreInt32(&fv.atomicflag, 1)
close(fv.fileInpQueue) //close channel once all files have been processed
// fv.wgi.Wait()
fv.wgo.Wait() //wait for completion of all threads
// fmt.Println("All workers stopped ") //exit
log.Debug("came outside filter")

return fv.finalFiles
}
40 changes: 40 additions & 0 deletions internal/filter/formatfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package filter

import (
"errors"
"path/filepath"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

// formatFilter and its attributes
type FormatFilter struct {
ext_type string
}

// Apply fucntion for format filter , check wheather a file passes the constraints
func (filter FormatFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("Format Filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
fileExt := filepath.Ext((*fileInfo).Name)
chkstr := "." + filter.ext_type
// fmt.Println(fileExt, " For file :", fileInfo.Name)
return chkstr == fileExt
}

// used for dynamic creation of formatFilter using map
func newFormatFilter(args ...interface{}) Filter {
return FormatFilter{
ext_type: args[0].(string),
}
}

func giveFormatFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter))
erro := errors.New("invalid filter, no files passed")
if (len((*singleFilter)) <= 7) || ((*singleFilter)[6] != '=') || (!((*singleFilter)[7] >= 'a' && (*singleFilter)[7] <= 'z')) { //since len(format) = 6, at next position (ie index 6) there should be "=" only and assuming extention type starts from an alphabet
return nil, erro
}
value := (*singleFilter)[7:] //7 is used because len(format) = 6 + 1
return newFormatFilter(value), nil
}
Loading