Skip to content

Commit

Permalink
feat: filter ordinals
Browse files Browse the repository at this point in the history
Implements #1384

Signed-off-by: Zoltán Reegn <[email protected]>
  • Loading branch information
reegnz committed Oct 25, 2024
1 parent d33b950 commit 625b63e
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 112 deletions.
99 changes: 54 additions & 45 deletions apis/fluentbit/v1alpha2/clusterfilter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type FilterSpec struct {
LogLevel string `json:"logLevel,omitempty"`
// A set of filter plugins in order.
FilterItems []FilterItem `json:"filters,omitempty"`
// An ordinal to influence filter ordering
Ordinal int32 `json:"ordinal,omitempty"`
}

type FilterItem struct {
Expand Down Expand Up @@ -101,19 +103,45 @@ type ClusterFilterList struct {

// +kubebuilder:object:generate:=false

// FilterByName implements sort.Interface for []ClusterFilter based on the Name field.
type FilterByName []ClusterFilter

func (a FilterByName) Len() int { return len(a) }
func (a FilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
// FilterByOrdinalAndName implements sort.Interface for []ClusterFilter based on the Ordinal and Name field.
type FilterByOrdinalAndName []ClusterFilter

func (a FilterByOrdinalAndName) Len() int { return len(a) }
func (a FilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FilterByOrdinalAndName) Less(i, j int) bool {
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
return true
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
return a[i].Name < a[j].Name
} else {
return false
}
}

func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
predicate := func(filter ClusterFilter) bool { return true }
return list.LoadWithPredicate(sl, predicate)
}

func (list ClusterFilterList) LoadBefore(sl plugins.SecretLoader) (string, error) {
predicate := func(filter ClusterFilter) bool { return filter.Spec.Ordinal <= 0 }
return list.LoadWithPredicate(sl, predicate)
}

func (list ClusterFilterList) LoadAfter(sl plugins.SecretLoader) (string, error) {
predicate := func(filter ClusterFilter) bool { return filter.Spec.Ordinal > 0 }
return list.LoadWithPredicate(sl, predicate)
}

func (list ClusterFilterList) LoadWithPredicate(sl plugins.SecretLoader, predicate func(filter ClusterFilter) bool) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))

for _, item := range list.Items {
if !predicate(item) {
continue
}
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
Expand Down Expand Up @@ -154,16 +182,34 @@ func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
}

func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
predicate := func(filter ClusterFilter) bool { return true }
return list.LoadAsYamlWithPredicate(sl, depth, predicate)
}

func (list ClusterFilterList) LoadAsYamlBefore(sl plugins.SecretLoader, depth int) (string, error) {
predicate := func(filter ClusterFilter) bool { return filter.Spec.Ordinal <= 0 }
return list.LoadAsYamlWithPredicate(sl, depth, predicate)
}

func (list ClusterFilterList) LoadAsYamlAfter(sl plugins.SecretLoader, depth int) (string, error) {
predicate := func(filter ClusterFilter) bool { return filter.Spec.Ordinal > 0 }
return list.LoadAsYamlWithPredicate(sl, depth, predicate)
}

func (list ClusterFilterList) LoadAsYamlWithPredicate(sl plugins.SecretLoader, depth int, predicate func(filter ClusterFilter) bool) (string, error) {
var buf bytes.Buffer

sort.Sort(FilterByName(list.Items))
sort.Sort(FilterByOrdinalAndName(list.Items))
if len(list.Items) == 0 {
return "", nil
}
buf.WriteString(fmt.Sprintf("%sfilters:\n", utils.YamlIndent(depth)))
padding := utils.YamlIndent(depth + 2)

for _, item := range list.Items {
if !predicate(item) {
continue
}
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
Expand Down Expand Up @@ -202,43 +248,6 @@ func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (st
return buf.String(), nil
}

func (clusterFilter ClusterFilter) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
var buf bytes.Buffer
padding := utils.YamlIndent(depth + 2)
merge := func(p plugins.Plugin) error {
if p == nil || reflect.ValueOf(p).IsNil() {
return nil
}

if p.Name() != "" {
buf.WriteString(fmt.Sprintf("%s- name: %s\n", utils.YamlIndent(depth+1), p.Name()))
}
if clusterFilter.Spec.LogLevel != "" {
buf.WriteString(fmt.Sprintf("%slog_level: %s\n", padding, clusterFilter.Spec.LogLevel))
}
if clusterFilter.Spec.Match != "" {
buf.WriteString(fmt.Sprintf("%smatch: \"%s\"\n", padding, clusterFilter.Spec.Match))
}
if clusterFilter.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf("%smatch_regex: %s\n", padding, clusterFilter.Spec.MatchRegex))
}
kvs, err := p.Params(sl)
if err != nil {
return err
}
buf.WriteString(kvs.YamlString(depth + 2))
return nil
}
for _, elem := range clusterFilter.Spec.FilterItems {
for i := 0; i < reflect.ValueOf(elem).NumField(); i++ {
p, _ := reflect.ValueOf(elem).Field(i).Interface().(plugins.Plugin)
if err := merge(p); err != nil {
return "", err
}
}
}
return buf.String(), nil
}
func init() {
SchemeBuilder.Register(&ClusterFilter{}, &ClusterFilterList{})
}
206 changes: 202 additions & 4 deletions apis/fluentbit/v1alpha2/clusterfilter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestClusterFilterList_Load(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name modify
Match logs.foo.bar
Condition Key_value_equals kve0 kvev0
Expand Down Expand Up @@ -185,8 +185,206 @@ func TestClusterFilterList_Load(t *testing.T) {
}
}

func TestClusterFilterList_Load_Before(t *testing.T) {
filtersExpected := `[Filter]
Name grep
Match *
Alias third
Regex ^.*$
[Filter]
Name grep
Match *
Alias first
Regex ^.*$
`

g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, "testnamespace")

filterObj1 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "first",
},
Spec: FilterSpec{
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "first",
},
Regex: "^.*$",
},
},
},
},
}

filterObj2 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "second",
},
Spec: FilterSpec{
Ordinal: 10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "second",
},
Regex: "^.*$",
},
},
},
},
}

filterObj3 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "third",
},
Spec: FilterSpec{
Ordinal: -10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "third",
},
Regex: "^.*$",
},
},
},
},
}

filters := ClusterFilterList{
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
}

i := 0
for i < 5 {
clusterFilters, err := filters.LoadBefore(sl)
g.Expect(err).NotTo(HaveOccurred())

g.Expect(clusterFilters).To(Equal(filtersExpected))

i++
}
}

func TestClusterFilterList_Load_After(t *testing.T) {
filtersExpected := `[Filter]
Name grep
Match *
Alias second
Regex ^.*$
`

g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, "testnamespace")

filterObj1 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "first",
},
Spec: FilterSpec{
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "first",
},
Regex: "^.*$",
},
},
},
},
}

filterObj2 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "second",
},
Spec: FilterSpec{
Ordinal: 10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "second",
},
Regex: "^.*$",
},
},
},
},
}

filterObj3 := &ClusterFilter{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterFilter",
},
ObjectMeta: metav1.ObjectMeta{
Name: "third",
},
Spec: FilterSpec{
Ordinal: -10,
Match: "*",
FilterItems: []FilterItem{
{
Grep: &filter.Grep{
CommonParams: plugins.CommonParams{
Alias: "third",
},
Regex: "^.*$",
},
},
},
},
}

filters := ClusterFilterList{
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
}

i := 0
for i < 5 {
clusterFilters, err := filters.LoadAfter(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(clusterFilters).To(Equal(filtersExpected))

i++
}
}

func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
var filtersExpected = `[Filter]
filtersExpected := `[Filter]
Name record_modifier
Match logs.foo.bar
Record hostname ${HOSTNAME}
Expand Down Expand Up @@ -260,7 +458,7 @@ func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
}

func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: modify
match: "logs.foo.bar"
condition:
Expand Down Expand Up @@ -438,7 +636,7 @@ func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
}

func TestClusterFilter_RecordModifier_Generated_Load_As_Yaml(t *testing.T) {
var filtersExpected = `filters:
filtersExpected := `filters:
- name: record_modifier
match: "logs.foo.bar"
record:
Expand Down
Loading

0 comments on commit 625b63e

Please sign in to comment.