Flexible File Filtering Feature for Blobfuse Mounting #1435
Merged
Changes from 26 commits
32 commits
b88dc62
Implemented 1st step that is parsing of filter string given by user
t-yashmahale 8d66bdb
added a dummy threadpool where 16 threads are accessing files parallely
t-yashmahale b88e380
added <= and >= features in size filter
t-yashmahale a3bd1d8
Added all functions for checking the file against filters
t-yashmahale 05e6869
solved bug of deadlock if a file does not returns true for any compar…
t-yashmahale 5107413
created modules for code and added meaningfull names
t-yashmahale f4b9ccf
added comments for complex functions
t-yashmahale 99d3458
added regex filter
t-yashmahale 9f54faa
added modification time filter (currently supporting ust only)
t-yashmahale c817cba
made some changes to make code more efficient
t-yashmahale 448e473
increased modularity
t-yashmahale 02c70d5
more optimised prev commit
t-yashmahale 7af33a5
included reading from output channel
t-yashmahale 1f290e4
passed all variables by refrence and other optimisations
t-yashmahale df32e9e
started integration ,added input flag blobFilter
t-yashmahale 024a9e3
updated filter.go
t-yashmahale aec9db8
integrated filters with blobfuse
t-yashmahale e19c97a
filter integrated with prints
t-yashmahale 27eca4f
minor changes
t-yashmahale 057f84e
_
t-yashmahale c48ef28
invalid input filter handeling done
t-yashmahale 861a4a0
GetAttr done and error handling in case of wrong filter
t-yashmahale 0555121
added comments
t-yashmahale 733bad0
made some more changes to clean code
t-yashmahale 40dc026
made changes
t-yashmahale 8d5cdb0
solved bug of infinite waiting when a list containing no file is pass…
t-yashmahale 2532fa0
resolved issue of infinite waiting in recieveOutput function due to a…
t-yashmahale 08c01f1
added access tier filter
t-yashmahale cd8e469
added directory functionality
t-yashmahale 7f98d4a
added blobtag filter and some other changes
t-yashmahale 61450e3
Enable read-only mode when user specifies filters
t-yashmahale a5228d4
changed name of flag to blob-filter
t-yashmahale
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ import ( | |
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" | ||
"github.com/Azure/azure-storage-fuse/v2/common/config" | ||
"github.com/Azure/azure-storage-fuse/v2/common/log" | ||
"github.com/Azure/azure-storage-fuse/v2/internal/filter" | ||
|
||
"github.com/JeffreyRichter/enum/enum" | ||
) | ||
|
@@ -186,6 +187,7 @@ type AzStorageOptions struct { | |
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"` | ||
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"` | ||
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"` | ||
BlobFilter string `config:"blobFilter" yaml:"blobFilter"` | ||
Review comment: Move this new config towards the end of the structure so that the diff does not show existing items. |
||
|
||
// v1 support | ||
UseAdls bool `config:"use-adls" yaml:"-"` | ||
|
@@ -386,6 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error { | |
|
||
az.stConfig.telemetry = opt.Telemetry | ||
|
||
//if blobFilter is provided, parse string and setup filters | ||
if len(opt.BlobFilter) > 0 { | ||
log.Info("ParseAndValidateConfig : provided filter is %s", opt.BlobFilter) | ||
az.stConfig.filters = &filter.UserInputFilters{} | ||
erro := az.stConfig.filters.ParseInp(&opt.BlobFilter) | ||
log.Info("ParseAndValidateConfig : number of OR separated filters is %d", len(az.stConfig.filters.FilterArr)) | |
if erro != nil { | ||
log.Err("ParseAndValidateConfig : mount failed due to an error encountered while parsing") | ||
return erro | ||
} | ||
} | ||
|
||
httpProxyProvided := opt.HttpProxyAddress != "" | ||
httpsProxyProvided := opt.HttpsProxyAddress != "" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package filter | ||
|
||
import ( | ||
"sync/atomic" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
func (fl *UserInputFilters) ApplyFilterOnBlobs(fileInfos []*internal.ObjAttr) []*internal.ObjAttr { //function called from azstorage.go streamDir func | ||
if len(fileInfos) == 0 { | ||
return fileInfos | ||
} | ||
fv := &FileValidator{ | ||
workers: 16, | ||
atomicflag: 0, | ||
fileCnt: 0, | ||
FilterArr: fl.FilterArr, | ||
} | ||
fv.wgo.Add(1) //kept outside thread | ||
fv.outputChan = make(chan *opdata, fv.workers) | ||
fv.fileInpQueue = make(chan *internal.ObjAttr, fv.workers) | ||
|
||
go fv.RecieveOutput() //goroutine reading from the output channel in parallel | |
|
||
for w := 1; w <= fv.workers; w++ { | ||
// fv.wgi.Add(1) | ||
go fv.ChkFile() //go routines for each worker (thread) are called | ||
} | ||
for _, fileinfo := range fileInfos { | ||
// fmt.Println("passedFile: ", *fileinfo) | ||
fv.fileInpQueue <- fileinfo //push all files one by one in channel , if channel is full , it will wait | ||
fv.fileCnt++ //incrementing filecount, this will be used to close output channel | ||
} | ||
|
||
atomic.StoreInt32(&fv.atomicflag, 1) | ||
close(fv.fileInpQueue) //close channel once all files have been processed | ||
// fv.wgi.Wait() | ||
fv.wgo.Wait() //wait for completion of all threads | ||
// fmt.Println("All workers stopped ") //exit | ||
|
||
return fv.finalFiles | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
// formatFilter and its attributes | ||
type FormatFilter struct { | ||
ext_type string | ||
} | ||
|
||
// Apply function for format filter: checks whether a file passes the constraints | |
func (filter FormatFilter) Apply(fileInfo *internal.ObjAttr) bool { | ||
// fmt.Println("Format Filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
fileExt := filepath.Ext((*fileInfo).Name) | ||
chkstr := "." + filter.ext_type | ||
// fmt.Println(fileExt, " For file :", fileInfo.Name) | ||
return chkstr == fileExt | ||
} | ||
|
||
// used for dynamic creation of formatFilter using map | ||
func newFormatFilter(args ...interface{}) Filter { | ||
return FormatFilter{ | ||
ext_type: args[0].(string), | ||
} | ||
} | ||
|
||
func giveFormatFilterObj(singleFilter *string) (Filter, error) { | ||
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) | ||
erro := errors.New("invalid filter, no files passed") | ||
if (len((*singleFilter)) <= 7) || ((*singleFilter)[6] != '=') || (!((*singleFilter)[7] >= 'a' && (*singleFilter)[7] <= 'z')) { //since len("format") = 6, index 6 must be "=" and the extension is assumed to start with a lowercase letter | |
return nil, erro | ||
} | ||
value := (*singleFilter)[7:] //7 is used because len(format) = 6 + 1 | ||
return newFormatFilter(value), nil | ||
} |
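The index arithmetic in `giveFormatFilterObj` (positions 6 and 7) can be avoided by splitting on the first `=`. The sketch below is an illustration under assumptions, not the PR's code: `parseFormatFilter` and `hasExt` are hypothetical names, the behaviour of the PR's `StringConv` helper is not reproduced, and the case-insensitive extension comparison is a choice of this sketch. The diff's `Apply` compares extensions case-sensitively.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

var errInvalidFormatFilter = errors.New("invalid format filter")

// parseFormatFilter extracts the extension from a filter such as "format=txt".
func parseFormatFilter(s string) (string, error) {
	key, ext, found := strings.Cut(strings.TrimSpace(s), "=")
	if !found || strings.ToLower(strings.TrimSpace(key)) != "format" {
		return "", errInvalidFormatFilter
	}
	ext = strings.ToLower(strings.TrimSpace(ext))
	// Same assumption as the diff: the extension starts with a letter.
	if ext == "" || ext[0] < 'a' || ext[0] > 'z' {
		return "", errInvalidFormatFilter
	}
	return ext, nil
}

// hasExt reports whether name ends in ".<ext>", ignoring case.
func hasExt(name, ext string) bool {
	return strings.EqualFold(filepath.Ext(name), "."+ext)
}

func main() {
	ext, err := parseFormatFilter("format = TXT")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ext, hasExt("dir/report.txt", ext), hasExt("image.png", ext))
}
```

Note that `filepath.Ext` returns only the final suffix, so a name like `archive.tar.gz` matches `gz` but not `tar.gz`.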
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"strings" | ||
"time" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
// modTimeFilter and its attributes | ||
type modTimeFilter struct { | ||
opr string | ||
value time.Time | ||
} | ||
|
||
// Apply function for modTime filter: checks whether a file passes the constraints | |
func (filter modTimeFilter) Apply(fileInfo *internal.ObjAttr) bool { | ||
// fmt.Println("modTime Filter ", filter.opr, " ", filter.value, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
fileModTimestr := (*fileInfo).Mtime.UTC().Format(time.RFC1123) | ||
fileModTime, _ := time.Parse(time.RFC1123, fileModTimestr) | ||
// fmt.Println(fileModTime, "this is file mod time") | ||
|
||
if (filter.opr == "<=") && (fileModTime.Before(filter.value) || fileModTime.Equal(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">=") && (fileModTime.After(filter.value) || fileModTime.Equal(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">") && (fileModTime.After(filter.value)) { | ||
return true | ||
} else if (filter.opr == "<") && (fileModTime.Before(filter.value)) { | ||
return true | ||
} else if (filter.opr == "=") && (fileModTime.Equal(filter.value)) { | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
// used for dynamic creation of modTimeFilter using map | ||
func newModTimeFilter(args ...interface{}) Filter { | ||
return modTimeFilter{ | ||
opr: args[0].(string), | ||
value: args[1].(time.Time), | ||
} | ||
} | ||
|
||
func giveModtimeFilterObj(singleFilter *string) (Filter, error) { | ||
erro := errors.New("invalid filter, no files passed") | ||
if strings.Contains((*singleFilter), "<=") { | ||
splitedParts := strings.Split((*singleFilter), "<=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("<=", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), ">=") { | ||
splitedParts := strings.Split((*singleFilter), ">=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter(">=", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), "<") { | ||
splitedParts := strings.Split((*singleFilter), "<") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("<", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), ">") { | ||
splitedParts := strings.Split((*singleFilter), ">") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter(">", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), "=") { | ||
splitedParts := strings.Split((*singleFilter), "=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("=", timeRFC1123), nil | ||
} else { | ||
return nil, erro | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"regexp" | ||
"strings" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
// RegexFilter and its attributes | ||
type regexFilter struct { | ||
regex_inp *regexp.Regexp | ||
} | ||
|
||
// Apply function for regex filter: checks whether a file passes the constraints | |
func (filter regexFilter) Apply(fileInfo *internal.ObjAttr) bool { | ||
// fmt.Println("regex filter ", filter.regex_inp, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
return filter.regex_inp.MatchString((*fileInfo).Name) | ||
} | ||
|
||
// used for dynamic creation of regexFilter | ||
func newRegexFilter(args ...interface{}) Filter { | ||
return regexFilter{ | ||
regex_inp: args[0].(*regexp.Regexp), | ||
} | ||
} | ||
|
||
func giveRegexFilterObj(singleFilter *string) (Filter, error) { | ||
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) | ||
erro := errors.New("invalid filter, no files passed") | ||
if (len((*singleFilter)) <= 6) || ((*singleFilter)[5] != '=') { //since len("regex") = 5, index 5 must be "=" | |
return nil, erro | ||
} | ||
value := (*singleFilter)[6:] //6 is used because len(regex) = 5 + 1 | ||
pattern, err := regexp.Compile(value) | ||
if err != nil { | ||
return nil, erro | ||
} | ||
return newRegexFilter(pattern), nil | ||
} |
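Two details of the regex filter are worth illustrating: `regexp.Compile` returns a descriptive error that `giveRegexFilterObj` replaces with a generic one, and `MatchString` matches anywhere in the name unless the pattern is anchored. The sketch below uses a hypothetical `parseRegexFilter` helper and does not reproduce the PR's `StringConv` preprocessing.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parseRegexFilter compiles the pattern from a filter such as "regex=^a.*\.txt$".
// Only the first "=" is treated as the separator, so the pattern may contain "=".
func parseRegexFilter(s string) (*regexp.Regexp, error) {
	key, pattern, found := strings.Cut(s, "=")
	if !found || strings.TrimSpace(key) != "regex" || pattern == "" {
		return nil, fmt.Errorf("invalid regex filter %q", s)
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		// Wrap the compile error so the user sees why the pattern was rejected.
		return nil, fmt.Errorf("invalid regex filter %q: %w", s, err)
	}
	return re, nil
}

func main() {
	loose, _ := parseRegexFilter(`regex=txt`)
	strict, _ := parseRegexFilter(`regex=^mine[0-9]+\.txt$`)

	for _, name := range []string{"mine12.txt", "notes.txt.bak"} {
		fmt.Println(name, loose.MatchString(name), strict.MatchString(name))
	}

	if _, err := parseRegexFilter(`regex=(`); err != nil {
		fmt.Println("error:", err)
	}
}
```

Users who want whole-name matching need to supply `^` and `$` themselves, since the filter applies the pattern as given.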
Review comment: Better to return here instead of copying over the slice again.