-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flexible File Filtering Feature for Blobfuse Mounting #1435
Changes from 24 commits
b88dc62
8d66bdb
b88e380
a3bd1d8
05e6869
5107413
f4b9ccf
99d3458
9f54faa
c817cba
448e473
02c70d5
7af33a5
1f290e4
df32e9e
024a9e3
aec9db8
e19c97a
27eca4f
057f84e
c48ef28
861a4a0
0555121
733bad0
40dc026
8d5cdb0
2532fa0
08c01f1
cd8e469
7f98d4a
61450e3
a5228d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,4 @@ lint.log | |
azure-storage-fuse | ||
bfusemon | ||
test/scripts/dirIterate.go | ||
filterbin | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ package azstorage | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync/atomic" | ||
"syscall" | ||
|
@@ -45,9 +46,9 @@ import ( | |
"github.com/Azure/azure-storage-fuse/v2/common/config" | ||
"github.com/Azure/azure-storage-fuse/v2/common/log" | ||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
"github.com/Azure/azure-storage-fuse/v2/internal/filter" | ||
"github.com/Azure/azure-storage-fuse/v2/internal/handlemap" | ||
"github.com/Azure/azure-storage-fuse/v2/internal/stats_manager" | ||
|
||
"github.com/spf13/cobra" | ||
) | ||
|
||
|
@@ -298,6 +299,7 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O | |
path := formatListDirName(options.Name) | ||
|
||
new_list, new_marker, err := az.storage.List(path, &options.Token, options.Count) | ||
// fmt.Println(filter.GlbFilterArr) | ||
if err != nil { | ||
log.Err("AzStorage::StreamDir : Failed to read dir [%s]", err) | ||
return new_list, "", err | ||
|
@@ -331,6 +333,10 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O | |
// increment streamdir call count | ||
azStatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1)) | ||
|
||
//check for filters provided | ||
if len(az.stConfig.blobFilter) > 0 { //only apply filter if user has given | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After parsing we might have created a filter object so instead of checking the string length here rather we can check if that object is nil or not, will be a faster check. |
||
new_list = filter.ApplyFilterOnBlobs(new_list) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we keep the input and output variables same ? just think it might not be a safer way to do conversion. |
||
} | ||
return new_list, *new_marker, nil | ||
} | ||
|
||
|
@@ -521,7 +527,23 @@ func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error) | |
// Attribute operations | ||
func (az *AzStorage) GetAttr(options internal.GetAttrOptions) (attr *internal.ObjAttr, err error) { | ||
//log.Trace("AzStorage::GetAttr : Get attributes of file %s", name) | ||
return az.storage.GetAttr(options.Name) | ||
resp, err := az.storage.GetAttr(options.Name) | ||
if err != nil { | ||
return resp, err | ||
} | ||
|
||
if len(az.stConfig.blobFilter) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of checking length try to check object is nil or not. |
||
fv1 := &filter.FileValidator{ | ||
FilterArr: filter.GlbFilterArr, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we can name the variables better than fv1 and GlbFilterArr. |
||
} | ||
if fv1.CheckFileWithFilters(resp) { | ||
return resp, nil | ||
} else { | ||
return nil, errors.New("the file does not pass the provided filters") //debug | ||
} | ||
} | ||
return resp, err | ||
// return az.storage.GetAttr(options.Name) | ||
} | ||
|
||
func (az *AzStorage) Chmod(options internal.ChmodOptions) error { | ||
|
@@ -654,6 +676,9 @@ func init() { | |
telemetry := config.AddStringFlag("telemetry", "", "Additional telemetry information.") | ||
config.BindPFlag(compName+".telemetry", telemetry) | ||
telemetry.Hidden = true | ||
//filter | ||
blobFilter := config.AddStringFlag("blobFilter", "", "Filter to apply on blobs.") | ||
config.BindPFlag(compName+".blobFilter", blobFilter) | ||
|
||
honourACL := config.AddBoolFlag("honour-acl", false, "Match ObjectID in ACL against the one used for authentication.") | ||
config.BindPFlag(compName+".honour-acl", honourACL) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ import ( | |
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" | ||
"github.com/Azure/azure-storage-fuse/v2/common/config" | ||
"github.com/Azure/azure-storage-fuse/v2/common/log" | ||
"github.com/Azure/azure-storage-fuse/v2/internal/filter" | ||
|
||
"github.com/JeffreyRichter/enum/enum" | ||
) | ||
|
@@ -182,10 +183,12 @@ type AzStorageOptions struct { | |
MaxResultsForList int32 `config:"max-results-for-list" yaml:"max-results-for-list"` | ||
DisableCompression bool `config:"disable-compression" yaml:"disable-compression"` | ||
Telemetry string `config:"telemetry" yaml:"telemetry"` | ||
HonourACL bool `config:"honour-acl" yaml:"honour-acl"` | ||
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"` | ||
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"` | ||
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"` | ||
BlobFilter string `config:"blobFilter" yaml:"blobFilter"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this new config towards the end of the structure so that diff does not show existing items. |
||
//filter | ||
HonourACL bool `config:"honour-acl" yaml:"honour-acl"` | ||
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"` | ||
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"` | ||
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"` | ||
|
||
// v1 support | ||
UseAdls bool `config:"use-adls" yaml:"-"` | ||
|
@@ -385,7 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error { | |
az.stConfig.cancelListForSeconds = opt.CancelListForSeconds | ||
|
||
az.stConfig.telemetry = opt.Telemetry | ||
|
||
az.stConfig.blobFilter = opt.BlobFilter | ||
if len(opt.BlobFilter) > 0 { | ||
fmt.Println(opt.BlobFilter) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can move fmt.Println to log.Debug as blobfuse will have no console attached to print, |
||
erro := filter.ParseInp(&opt.BlobFilter) | ||
fmt.Println(len(filter.GlbFilterArr)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These shall be converted to log.Debug or log.Info |
||
fmt.Println(len(opt.BlobFilter)) | ||
if erro != nil { | ||
log.Err("no filters applied, mount failed") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For log.Debug/Err we are following a format which is consistent across all our logs. Just check other existing logs in this file and modify accordingly. |
||
return erro | ||
} | ||
} | ||
//blobfilter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deadcode can be removed. |
||
httpProxyProvided := opt.HttpProxyAddress != "" | ||
httpsProxyProvided := opt.HttpsProxyAddress != "" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package filter | ||
|
||
import ( | ||
"sync/atomic" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
var GlbFilterArr [][]Filter //it will store the fliters, outer array splitted by ||, inner array splitted by && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As part of design we shall move these filters to be a class so that there can be multiple instances of such filters across the blobfuse code. Current design of having it as a global variable restricts that only one such filter can exists in scope of blobfuse instance. |
||
|
||
func ApplyFilterOnBlobs(fileInfos []*internal.ObjAttr) []*internal.ObjAttr { //function called from azstorage.go streamDir func | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From outside it shall be an object based call.
|
||
fv := &FileValidator{ | ||
workers: 16, | ||
atomicflag: 0, | ||
fileCnt: 0, | ||
FilterArr: GlbFilterArr, | ||
} | ||
fv.wgo.Add(1) //kept outside thread | ||
fv.outputChan = make(chan *opdata, fv.workers) | ||
fv.fileInpQueue = make(chan *internal.ObjAttr, fv.workers) | ||
|
||
go fv.RecieveOutput() //thread parellely reading from ouput channel | ||
|
||
for w := 1; w <= fv.workers; w++ { | ||
// fv.wgi.Add(1) | ||
go fv.ChkFile() //go routines for each worker (thread) are called | ||
} | ||
for _, fileinfo := range fileInfos { | ||
// fmt.Println("passedFile: ", *fileinfo) | ||
fv.fileInpQueue <- fileinfo //push all files one by one in channel , if channel is full , it will wait | ||
fv.fileCnt++ //incrementing filecount, this will be used to close output channel | ||
} | ||
|
||
atomic.StoreInt32(&fv.atomicflag, 1) | ||
close(fv.fileInpQueue) //close channel once all files have been processed | ||
// fv.wgi.Wait() | ||
fv.wgo.Wait() //wait for completion of all threads | ||
// fmt.Println("All workers stopped ") //exit | ||
|
||
return fv.finalFiles | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
type FormatFilter struct { //formatFilter and its attributes | ||
ext_type string | ||
} | ||
|
||
func (filter FormatFilter) Apply(fileInfo *internal.ObjAttr) bool { //Apply fucntion for format filter , check wheather a file passes the constraints | ||
// fmt.Println("Format Filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
fileExt := filepath.Ext((*fileInfo).Name) | ||
chkstr := "." + filter.ext_type | ||
// fmt.Println(fileExt, " For file :", fileInfo.Name) | ||
return chkstr == fileExt | ||
} | ||
|
||
func newFormatFilter(args ...interface{}) Filter { // used for dynamic creation of formatFilter using map | ||
return FormatFilter{ | ||
ext_type: args[0].(string), | ||
} | ||
} | ||
|
||
func giveFormatFilterObj(singleFilter *string) (Filter, error) { | ||
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) | ||
erro := errors.New("invalid filter, no files passed") | ||
if (len((*singleFilter)) <= 7) || ((*singleFilter)[6] != '=') || (!((*singleFilter)[7] >= 'a' && (*singleFilter)[7] <= 'z')) { //since len(format) = 6, at next position (ie index 6) there should be "=" only and assuming extention type starts from an alphabet | ||
return nil, erro | ||
} | ||
value := (*singleFilter)[7:] //7 is used because len(format) = 6 + 1 | ||
return newFormatFilter(value), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"strings" | ||
"time" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
type modTimeFilter struct { //modTimeFilter and its attributes | ||
opr string | ||
value time.Time | ||
} | ||
|
||
func (filter modTimeFilter) Apply(fileInfo *internal.ObjAttr) bool { //Apply fucntion for modTime filter , check wheather a file passes the constraints | ||
// fmt.Println("modTime Filter ", filter.opr, " ", filter.value, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
fileModTimestr := (*fileInfo).Mtime.UTC().Format(time.RFC1123) | ||
fileModTime, _ := time.Parse(time.RFC1123, fileModTimestr) | ||
// fmt.Println(fileModTime, "this is file mod time") | ||
|
||
if (filter.opr == "<=") && (fileModTime.Before(filter.value) || fileModTime.Equal(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">=") && (fileModTime.After(filter.value) || fileModTime.Equal(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">") && (fileModTime.After(filter.value)) { | ||
return true | ||
} else if (filter.opr == "<") && (fileModTime.Before(filter.value)) { | ||
return true | ||
} else if (filter.opr == "=") && (fileModTime.Equal(filter.value)) { | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
func newModTimeFilter(args ...interface{}) Filter { // used for dynamic creation of modTimeFilter using map | ||
return modTimeFilter{ | ||
opr: args[0].(string), | ||
value: args[1].(time.Time), | ||
} | ||
} | ||
|
||
func giveModtimeFilterObj(singleFilter *string) (Filter, error) { | ||
erro := errors.New("invalid filter, no files passed") | ||
if strings.Contains((*singleFilter), "<=") { | ||
splitedParts := strings.Split((*singleFilter), "<=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("<=", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), ">=") { | ||
splitedParts := strings.Split((*singleFilter), ">=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter(">=", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), "<") { | ||
splitedParts := strings.Split((*singleFilter), "<") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("<", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), ">") { | ||
splitedParts := strings.Split((*singleFilter), ">") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter(">", timeRFC1123), nil | ||
} else if strings.Contains((*singleFilter), "=") { | ||
splitedParts := strings.Split((*singleFilter), "=") | ||
timeRFC1123str := strings.TrimSpace(splitedParts[1]) | ||
timeRFC1123, _ := time.Parse(time.RFC1123, timeRFC1123str) | ||
return newModTimeFilter("=", timeRFC1123), nil | ||
} else { | ||
return nil, erro | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"regexp" | ||
"strings" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
type regexFilter struct { //RegexFilter and its attributes | ||
regex_inp *regexp.Regexp | ||
} | ||
|
||
func (filter regexFilter) Apply(fileInfo *internal.ObjAttr) bool { //Apply fucntion for regex filter , check wheather a file passes the constraints | ||
// fmt.Println("regex filter ", filter.regex_inp, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
return filter.regex_inp.MatchString((*fileInfo).Name) | ||
} | ||
|
||
func newRegexFilter(args ...interface{}) Filter { // used for dynamic creation of regexFilter | ||
return regexFilter{ | ||
regex_inp: args[0].(*regexp.Regexp), | ||
} | ||
} | ||
|
||
func giveRegexFilterObj(singleFilter *string) (Filter, error) { | ||
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) | ||
erro := errors.New("invalid filter, no files passed") | ||
if (len((*singleFilter)) <= 6) || ((*singleFilter)[5] != '=') { //since len(regex) = 5, at next position (ie index 5) there should be "=" pnly | ||
return nil, erro | ||
} | ||
value := (*singleFilter)[6:] //6 is used because len(regex) = 5 + 1 | ||
pattern, err := regexp.Compile(value) | ||
if err != nil { | ||
return nil, erro | ||
} | ||
return newRegexFilter(pattern), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package filter | ||
|
||
import ( | ||
"errors" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/Azure/azure-storage-fuse/v2/internal" | ||
) | ||
|
||
type SizeFilter struct { //SizeFilter and its attributes | ||
opr string | ||
value float64 | ||
} | ||
|
||
func (filter SizeFilter) Apply(fileInfo *internal.ObjAttr) bool { //Apply fucntion for size filter , check wheather a file passes the constraints | ||
// fmt.Println("size filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT | ||
|
||
if (filter.opr == "<=") && ((*fileInfo).Size <= int64(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">=") && ((*fileInfo).Size >= int64(filter.value)) { | ||
return true | ||
} else if (filter.opr == ">") && ((*fileInfo).Size > int64(filter.value)) { | ||
return true | ||
} else if (filter.opr == "<") && ((*fileInfo).Size < int64(filter.value)) { | ||
return true | ||
} else if (filter.opr == "=") && ((*fileInfo).Size == int64(filter.value)) { | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
func newSizeFilter(args ...interface{}) Filter { // used for dynamic creation of sizeFilter | ||
return SizeFilter{ | ||
opr: args[0].(string), | ||
value: args[1].(float64), | ||
} | ||
} | ||
|
||
func giveSizeFilterObj(singleFilter *string) (Filter, error) { | ||
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase | ||
sinChk := (*singleFilter)[4:5] //single char after size (ex- size=7888 , here sinChk will be "=") | ||
doubChk := (*singleFilter)[4:6] //2 chars after size (ex- size>=8908 , here doubChk will be ">=") | ||
erro := errors.New("invalid filter, no files passed") | ||
if !((sinChk == "=") || (sinChk == ">") || (sinChk == "<") || (doubChk == ">=") || (doubChk == "<=")) { | ||
return nil, erro | ||
} | ||
value := (*singleFilter)[5:] // 5 is used since len(size) = 4 and + 1 | ||
floatVal, err := strconv.ParseFloat(value, 64) | ||
if err != nil { | ||
if (*singleFilter)[5] != '=' { | ||
return nil, erro | ||
} else { | ||
value := (*singleFilter)[6:] // 5 is used since len(size) = 4 and + 2 | ||
floatVal, err = strconv.ParseFloat(value, 64) | ||
if err != nil { | ||
return nil, erro | ||
} | ||
return newSizeFilter((*singleFilter)[4:6], floatVal), nil // 4 to 6 will give operator ex "<=" | ||
} | ||
} else { | ||
return newSizeFilter((*singleFilter)[4:5], floatVal), nil | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is this binary file _debug* which can be removed from the PR.