Flexible File Filtering Feature for Blobfuse Mounting (#1435)
* Implemented the first step: parsing of the filter string provided by the user
t-yashmahale authored Dec 9, 2024
1 parent a68cace commit 8fb1141
Showing 17 changed files with 670 additions and 4 deletions.
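Only part of the feature is visible in the loaded hunks below: a new `--blob-filter` mount flag, plumbing into the azstorage component, and two concrete filters (access tier and blob tag). The commit does not spell out the full filter-string grammar, so the values sketched here are only inferred from the parsers added below and should be treated as assumptions:

```go
// Hypothetical examples of --blob-filter values, inferred from the tier/tag
// parsers in this commit; the real grammar (including any AND/OR separators)
// may differ.
const (
	exampleTierFilter = "tier=hot"             // keep only hot-tier blobs
	exampleTagFilter  = "tag=project:blobfuse" // keep blobs tagged project=blobfuse
)
```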
2 changes: 1 addition & 1 deletion .gitignore
@@ -20,4 +20,4 @@ test/manual_scripts/cachetest.go
lint.log
azure-storage-fuse
bfusemon
test/scripts/dirIterate.go
test/scripts/dirIterate.go
13 changes: 13 additions & 0 deletions cmd/mount.go
@@ -55,6 +55,7 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"

"github.com/sevlyar/go-daemon"
"github.com/spf13/cobra"
@@ -88,6 +89,7 @@ type mountOptions struct {
MonitorOpt monitorOptions `config:"health_monitor"`
WaitForMount time.Duration `config:"wait-for-mount"`
LazyWrite bool `config:"lazy-write"`
blobFilter string `config:"blob-filter"`

// v1 support
Streaming bool `config:"streaming"`
@@ -242,6 +244,13 @@ var mountCmd = &cobra.Command{

configFileExists := true

if config.IsSet("blob-filter") {
if len(options.blobFilter) > 0 {
filter.ProvidedFilter = options.blobFilter
config.Set("read-only", "true") //set read-only mode if filter is provided
}
}

if options.ConfigFile == "" {
// Config file is not set in cli parameters
// Blobfuse2 defaults to config.yaml in current directory
@@ -707,6 +716,10 @@ func init() {
config.BindPFlag("pre-mount-validate", mountCmd.Flags().Lookup("pre-mount-validate"))
mountCmd.Flags().Lookup("pre-mount-validate").Hidden = true

// register the blob-filter flag and bind it to the config
mountCmd.PersistentFlags().StringVar(&options.blobFilter, "blob-filter", "", "Filter string for blob filtering.")
config.BindPFlag("blob-filter", mountCmd.PersistentFlags().Lookup("blob-filter"))

mountCmd.Flags().Bool("basic-remount-check", true, "Validate blobfuse2 is mounted by reading /etc/mtab.")
config.BindPFlag("basic-remount-check", mountCmd.Flags().Lookup("basic-remount-check"))
mountCmd.Flags().Lookup("basic-remount-check").Hidden = true
27 changes: 25 additions & 2 deletions component/azstorage/azstorage.go
@@ -35,6 +35,7 @@ package azstorage

import (
"context"
"errors"
"fmt"
"sync/atomic"
"syscall"
@@ -45,9 +46,9 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
"github.com/Azure/azure-storage-fuse/v2/internal/handlemap"
"github.com/Azure/azure-storage-fuse/v2/internal/stats_manager"

"github.com/spf13/cobra"
)

@@ -298,6 +299,7 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
path := formatListDirName(options.Name)

new_list, new_marker, err := az.storage.List(path, &options.Token, options.Count)

if err != nil {
log.Err("AzStorage::StreamDir : Failed to read dir [%s]", err)
return new_list, "", err
@@ -331,6 +333,11 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
// increment streamdir call count
azStatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1))

// apply the blob filters, if the user provided any
if az.stConfig.filters != nil {
filtered_list := az.stConfig.filters.ApplyFilterOnBlobs(new_list)
return filtered_list, *new_marker, nil
}
return new_list, *new_marker, nil
}
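`UserInputFilters.ApplyFilterOnBlobs` lives in `internal/filter` and is not part of the loaded hunks. A minimal sketch of how it could behave is given below, under the assumption (suggested by the "OR separated filters" log message later in this commit) that `FilterArr` holds OR groups of AND-ed filters, each implementing the `Apply(*internal.ObjAttr) bool` method seen in `accesstier.go`/`blobtag.go`:

```go
// Hypothetical sketch, not code from this commit: a plausible shape for the
// internal/filter package, assuming FilterArr is an OR-list of AND-groups.
package filter

import "github.com/Azure/azure-storage-fuse/v2/internal"

// Filter is the per-condition interface implied by accesstier.go / blobtag.go.
type Filter interface {
	Apply(fileInfo *internal.ObjAttr) bool
}

// UserInputFilters is assumed to keep OR groups of AND-ed filters.
type UserInputFilters struct {
	FilterArr [][]Filter
	TierChk   bool // true if any access-tier filter is present
	TagChk    bool // true if any blob-tag filter is present
}

// ApplyFilterOnBlobs keeps a blob if it satisfies every filter in at least one OR group.
func (fl *UserInputFilters) ApplyFilterOnBlobs(fileInfos []*internal.ObjAttr) []*internal.ObjAttr {
	filtered := make([]*internal.ObjAttr, 0, len(fileInfos))
	for _, fileInfo := range fileInfos {
		for _, group := range fl.FilterArr { // groups are OR-ed together
			passes := true
			for _, f := range group { // filters within a group are AND-ed
				if !f.Apply(fileInfo) {
					passes = false
					break
				}
			}
			if passes {
				filtered = append(filtered, fileInfo)
				break
			}
		}
	}
	return filtered
}
```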

@@ -521,7 +528,23 @@ func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error)
// Attribute operations
func (az *AzStorage) GetAttr(options internal.GetAttrOptions) (attr *internal.ObjAttr, err error) {
//log.Trace("AzStorage::GetAttr : Get attributes of file %s", name)
return az.storage.GetAttr(options.Name)
// return az.storage.GetAttr(options.Name)
resp, err := az.storage.GetAttr(options.Name)
if err != nil {
return resp, err
}

if az.stConfig.filters != nil {
fileValidatorObj := &filter.FileValidator{
FilterArr: az.stConfig.filters.FilterArr,
}
if fileValidatorObj.CheckFileWithFilters(resp) { //if this particular file passes all filters, return it
return resp, nil
} else {
return nil, errors.New("the file does not pass the provided filters")
}
}
return resp, err
}
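`filter.FileValidator` is referenced here but defined elsewhere in `internal/filter`; under the same assumptions as the sketch above, it could look roughly like this:

```go
// Hypothetical sketch, not code from this commit; reuses the Filter type and
// the OR-of-ANDs FilterArr assumption from the previous sketch.
type FileValidator struct {
	FilterArr [][]Filter
}

// CheckFileWithFilters reports whether a single attribute satisfies every
// filter in at least one OR group.
func (fv *FileValidator) CheckFileWithFilters(fileInfo *internal.ObjAttr) bool {
	for _, group := range fv.FilterArr {
		passes := true
		for _, f := range group {
			if !f.Apply(fileInfo) {
				passes = false
				break
			}
		}
		if passes {
			return true
		}
	}
	return false
}
```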

func (az *AzStorage) Chmod(options internal.ChmodOptions) error {
13 changes: 13 additions & 0 deletions component/azstorage/block_blob.go
@@ -106,6 +106,10 @@ func (bb *BlockBlob) Configure(cfg AzStorageConfig) error {
Snapshots: false,
}

// if a filter is provided and it includes a blob tag filter, request blob tag details while listing
if bb.AzStorageConnection.Config.filters != nil && bb.AzStorageConnection.Config.filters.TagChk {
bb.listDetails.Tags = true
}
return nil
}

@@ -446,6 +450,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
Tier: *prop.AccessTier,
}

parseMetadata(attr, prop.Metadata)
@@ -593,11 +598,15 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
Crtime: dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
Tier: string(*blobInfo.Properties.AccessTier),
}
parseMetadata(attr, blobInfo.Metadata)
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
}
if bb.listDetails.Tags { // include blob tags when they were requested for filtering
attr.Tags = parseBlobTags(blobInfo.BlobTags)
}
blobList = append(blobList, attr)

if attr.IsDir() {
@@ -636,6 +645,10 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
attr.Ctime = attr.Mtime
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
attr.Tier = ""
if bb.Config.defaultTier != nil { // if a default access tier is configured, use it
attr.Tier = string(*bb.Config.defaultTier)
}
blobList = append(blobList, attr)
}
}
14 changes: 14 additions & 0 deletions component/azstorage/config.go
@@ -42,6 +42,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"

"github.com/JeffreyRichter/enum/enum"
)
@@ -186,6 +187,7 @@ type AzStorageOptions struct {
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"`
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"`
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"`
// BlobFilter string `config:"blobFilter" yaml:"blobFilter"`

// v1 support
UseAdls bool `config:"use-adls" yaml:"-"`
@@ -386,6 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error {

az.stConfig.telemetry = opt.Telemetry

// if a blob filter was provided, parse the string and set up the filters
if len(filter.ProvidedFilter) > 0 {
log.Info("ParseAndValidateConfig : provided filter is %s", filter.ProvidedFilter)
az.stConfig.filters = &filter.UserInputFilters{}
erro := az.stConfig.filters.ParseInp(&filter.ProvidedFilter)
if erro != nil {
log.Err("ParseAndValidateConfig : mount failed due to an error encountered while parsing the filter")
return erro
}
log.Info("ParseAndValidateConfig : number of OR separated filters is %d", len(az.stConfig.filters.FilterArr))
}

httpProxyProvided := opt.HttpProxyAddress != ""
httpsProxyProvided := opt.HttpsProxyAddress != ""

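`ParseInp` is the parsing entry point the commit message refers to, but its body is not in the loaded hunks. A rough illustration of a parser that would fit the constructors added below is given here; the `||`/`&&` separators and the `tier`/`tag` prefix constants are assumptions, not taken from the commit:

```go
// Hypothetical sketch, not code from this commit (package filter; needs the
// "errors" and "strings" imports). Splits the user string into OR groups,
// then AND terms, and dispatches each term to the matching constructor.
func (fl *UserInputFilters) ParseInp(str *string) error {
	for _, orTerm := range strings.Split(*str, "||") {
		var group []Filter
		for _, andTerm := range strings.Split(orTerm, "&&") {
			andTerm = strings.TrimSpace(andTerm)
			var (
				f   Filter
				err error
			)
			switch {
			case strings.HasPrefix(andTerm, tier): // assumed constant: "tier"
				f, err = giveAccessTierFilterObj(&andTerm)
				fl.TierChk = true
			case strings.HasPrefix(andTerm, tag): // assumed constant: "tag"
				f, err = giveBlobTagFilterObj(&andTerm)
				fl.TagChk = true
			default:
				err = errors.New("unsupported filter: " + andTerm)
			}
			if err != nil {
				return err
			}
			group = append(group, f)
		}
		fl.FilterArr = append(fl.FilterArr, group)
	}
	return nil
}
```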
4 changes: 4 additions & 0 deletions component/azstorage/connection.go
@@ -40,6 +40,7 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
)

// Example for azblob usage : https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob#pkg-examples
@@ -81,6 +82,9 @@ type AzStorageConfig struct {
cpkEnabled bool
cpkEncryptionKey string
cpkEncryptionKeySha256 string

// Filter related config
filters *filter.UserInputFilters
}

type AzStorageConnection struct {
4 changes: 3 additions & 1 deletion component/azstorage/datalake.go
@@ -394,6 +394,7 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) {
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
Tier: *prop.AccessTier, // access tier of the path
}
parseMetadata(attr, prop.Metadata)

@@ -471,7 +472,8 @@ func (dl *Datalake) List(prefix string, marker *string, count int32) ([]*interna
for _, pathInfo := range listPath.Paths {
var attr *internal.ObjAttr
var lastModifiedTime time.Time
if dl.Config.disableSymlink {
// if the user provided a tier filter, fall through to the else branch so the access tier gets populated
if dl.Config.disableSymlink && (dl.Config.filters == nil || !dl.Config.filters.TierChk) {
var mode fs.FileMode
if pathInfo.Permissions != nil {
mode, err = getFileMode(*pathInfo.Permissions)
19 changes: 19 additions & 0 deletions component/azstorage/utils.go
@@ -53,6 +53,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
serviceBfs "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-storage-fuse/v2/common"
@@ -304,6 +305,24 @@ func parseMetadata(attr *internal.ObjAttr, metadata map[string]*string) {
}
}

// ----------- BlobTags handling ---------------
func parseBlobTags(tags *container.BlobTags) map[string]string {
blobtags := make(map[string]string) // store blob tags in a map for fast lookup during filtering
if tags != nil {
for _, tag := range tags.BlobTagSet {
if tag != nil && tag.Key != nil {
if tag.Value != nil {
blobtags[*tag.Key] = *tag.Value
} else {
blobtags[*tag.Key] = ""
}
}
}
}
return blobtags
}
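For reference, a small hypothetical illustration of what this helper produces for a blob with a single tag (`to.Ptr` is the pointer helper from the SDK's `azcore/to` package; `fmt` import assumed):

```go
// Hypothetical usage illustration, not code from this commit.
tags := &container.BlobTags{
	BlobTagSet: []*container.BlobTag{
		{Key: to.Ptr("project"), Value: to.Ptr("blobfuse")},
	},
}
fmt.Println(parseBlobTags(tags)) // prints: map[project:blobfuse]
```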

// ----------- Content-type handling ---------------

// ContentTypeMap : Store file extension to content-type mapping
2 changes: 2 additions & 0 deletions internal/attribute.go
Expand Up @@ -81,6 +81,8 @@ type ObjAttr struct {
Name string // base name of the path
MD5 []byte
Metadata map[string]*string // extra information to preserve
Tier string // access tier of the blob
Tags map[string]string // blob tags (key-value pairs)
}

// IsDir : Test blob is a directory or not
47 changes: 47 additions & 0 deletions internal/filter/accesstier.go
@@ -0,0 +1,47 @@
package filter

import (
"errors"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

const lenTier = len(tier)

type AccessTierFilter struct {
opr bool // true means "equal to", false means "not equal to"
tier string
}

func (filter AccessTierFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("AccessTier filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
return (filter.opr == (filter.tier == strings.ToLower(fileInfo.Tier))) // true when the comparison outcome matches the operator (= vs !=)
}

// used for dynamic creation of AccessTierFilter
func newAccessTierFilter(args ...interface{}) Filter {
return AccessTierFilter{
opr: args[0].(bool),
tier: args[1].(string),
}
}

func giveAccessTierFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) // remove all spaces and convert to lowercase
erro := errors.New("invalid accesstier filter, no files passed")
if len(*singleFilter) < lenTier+2 { // shortest valid form is "tier=" followed by a value
return nil, erro
}
sinChk := (*singleFilter)[lenTier : lenTier+1] // single char after "tier" (e.g. for "tier=hot", sinChk is "=")
doubChk := (*singleFilter)[lenTier : lenTier+2] // two chars after "tier" (e.g. for "tier!=cold", doubChk is "!=")
if !((sinChk == "=") || (doubChk == "!=")) {
return nil, erro
}
if (doubChk == "!=") && (len(*singleFilter) > lenTier+2) {
value := (*singleFilter)[lenTier+2:] // value after "tier!="
return newAccessTierFilter(false, value), nil
} else if (sinChk == "=") && (len(*singleFilter) > lenTier+1) {
value := (*singleFilter)[lenTier+1:] // value after "tier="
return newAccessTierFilter(true, value), nil
}
return nil, erro
}
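A short hypothetical usage example for the access tier filter (in-package snippet; `fmt` and `internal` imports assumed):

```go
// Hypothetical usage illustration, not code from this commit.
s := "tier != archive"
f, err := giveAccessTierFilterObj(&s) // s becomes "tier!=archive"
if err == nil {
	attr := &internal.ObjAttr{Name: "a.txt", Tier: "Hot"}
	fmt.Println(f.Apply(attr)) // prints true: "hot" is not "archive"
}
```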
53 changes: 53 additions & 0 deletions internal/filter/blobtag.go
@@ -0,0 +1,53 @@
package filter

import (
"errors"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

const lenTag = len(tag)

type BlobTagFilter struct {
key string
value string
}

func (filter BlobTagFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("BlobTag filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
if val, ok := fileInfo.Tags[filter.key]; ok {
return (filter.value == strings.ToLower(val))
}
return false
}

// used for dynamic creation of BlobTagFilter
func newBlobTagFilter(args ...interface{}) Filter {
return BlobTagFilter{
key: args[0].(string),
value: args[1].(string),
}
}

func giveBlobTagFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) // remove all spaces and convert to lowercase
erro := errors.New("invalid blobtag filter, no files passed")
if len(*singleFilter) <= lenTag { // nothing after "tag"
return nil, erro
}
sinChk := (*singleFilter)[lenTag : lenTag+1] // single char after "tag" (e.g. for "tag=key:value", sinChk is "=")
if sinChk != "=" {
return nil, erro
}
splitEq := strings.Split(*singleFilter, "=")
if len(splitEq) != 2 {
return nil, erro
}
splitCol := strings.Split(splitEq[1], ":")
if len(splitCol) != 2 {
return nil, erro
}
return newBlobTagFilter(splitCol[0], splitCol[1]), nil
}
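And a matching hypothetical example for the blob tag filter:

```go
// Hypothetical usage illustration, not code from this commit.
s := "tag = project:blobfuse"
f, err := giveBlobTagFilterObj(&s) // s becomes "tag=project:blobfuse"
if err == nil {
	attr := &internal.ObjAttr{
		Name: "a.txt",
		Tags: map[string]string{"project": "blobfuse"},
	}
	fmt.Println(f.Apply(attr)) // prints true: key and value both match
}
```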