Skip to content

Commit

Permalink
feat: add advanced message filtering options
Browse files Browse the repository at this point in the history
- Introduced filtering options to include or exclude messages based on patterns.
- Added `max-message-size` to limit the size of messages processed.
- Implemented regex filtering using `regexp` for flexible pattern matching.
- Integrated JSON filtering with `gojq` to enable powerful filtering with JSON expressions.
- Updated command-line flags to support new filtering features:
    - `--include-patterns` to specify inclusion patterns.
    - `--exclude-patterns` to exclude messages matching specified patterns.
    - `--regex-filter` for regex-based filtering.
    - `--json-filter` for JSON-based filtering.
    - `--max-message-size` to set a maximum message size in bytes.
- Refactored message processing pipeline in `Consumer` to support layered filters.
- Updated `PrintConfig` to include the new filter configuration.
  • Loading branch information
marianozunino committed Nov 21, 2024
1 parent ce1f731 commit 3442bee
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 3 deletions.
5 changes: 5 additions & 0 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ Messages can be captured from an AMQP or AMQPS RabbitMQ server, with flexible TL
config.WithQueue(queue),
config.WithStopAfterConsume(stopAfterConsume),
config.WithAutoAck(autoAck),

config.WithIncludePatterns(viper.GetStringSlice("include-patterns")),
config.WithExcludePatterns(viper.GetStringSlice("exclude-patterns")),
config.WithMaxMessageSize(viper.GetInt("max-message-size")),
config.WithRegexFilter(viper.GetString("regex-filter")),
)

return app.Dump(cfg)
Expand Down
6 changes: 6 additions & 0 deletions cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ This command captures and dumps the received messages to a file for analysis or
config.WithFileMode(viper.GetString("file-mode")),
config.WithPrettyPrint(viper.GetBool("pretty-print")),
config.WithRoutingKeys(routingKeys),

config.WithIncludePatterns(viper.GetStringSlice("include-patterns")),
config.WithExcludePatterns(viper.GetStringSlice("exclude-patterns")),
config.WithJSONFilter(viper.GetString("json-filter")),
config.WithMaxMessageSize(viper.GetInt("max-message-size")),
config.WithRegexFilter(viper.GetString("regex-filter")),
)

return app.Monitor(cfg)
Expand Down
6 changes: 6 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func init() {
rootCmd.PersistentFlags().BoolP("pretty-print", "p", false, "Pretty print JSON messages")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", xdg.ConfigHome+"/goq/goq.yaml", "config file")

rootCmd.PersistentFlags().StringSliceP("include-patterns", "i", []string{}, "Include messages containing these patterns")
rootCmd.PersistentFlags().StringSliceP("exclude-patterns", "x", []string{}, "Exclude messages containing these patterns")
rootCmd.PersistentFlags().StringP("json-filter", "j", "", "JSON filter expression")
rootCmd.PersistentFlags().IntP("max-message-size", "z", -1, "Maximum message size in bytes")
rootCmd.PersistentFlags().StringP("regex-filter", "R", "", "Regex pattern to filter messages")

rootCmd.AddGroup(&cobra.Group{
ID: "available-commands",
Title: "Available Commands:",
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23.0

require (
github.com/Masterminds/semver/v3 v3.3.0
github.com/itchyny/gojq v0.12.16
github.com/minio/selfupdate v0.6.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand All @@ -12,6 +13,7 @@ require (

require (
aead.dev/minisign v0.2.0 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
golang.org/x/crypto v0.21.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g=
github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down
Binary file added goq
Binary file not shown.
87 changes: 86 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package config
import (
"encoding/json"
"fmt"
"regexp"

"github.com/itchyny/gojq"
"github.com/spf13/viper"
)

Expand All @@ -20,6 +22,15 @@ type Config struct {
StopAfterConsume bool
RoutingKeys []string
PrettyPrint bool

FilterConfig struct {
IncludePatterns []string
ExcludePatterns []string
JSONFilter *gojq.Query
MaxMessageSize int
RegexPattern string
CompileRegex *regexp.Regexp
}
}

type Option func(*Config)
Expand Down Expand Up @@ -96,6 +107,48 @@ func WithPrettyPrint(prettyPrint bool) Option {
}
}

// Add new Option functions
func WithIncludePatterns(patterns []string) Option {
return func(c *Config) {
c.FilterConfig.IncludePatterns = patterns
}
}

func WithExcludePatterns(patterns []string) Option {
return func(c *Config) {
c.FilterConfig.ExcludePatterns = patterns
}
}

// Modify the WithJSONFilter function
func WithJSONFilter(jsonFilter string) Option {
return func(c *Config) {
// Parse the query during configuration
query, err := gojq.Parse(jsonFilter)
if err != nil {
// Handle error - you might want to log or handle this differently
fmt.Printf("Error parsing JSON filter: %v\n", err)
return
}
c.FilterConfig.JSONFilter = query
}
}

func WithMaxMessageSize(size int) Option {
return func(c *Config) {
c.FilterConfig.MaxMessageSize = size
}
}

func WithRegexFilter(pattern string) Option {
return func(c *Config) {
c.FilterConfig.RegexPattern = pattern
if regex, err := regexp.Compile(pattern); err == nil {
c.FilterConfig.CompileRegex = regex
}
}
}

func New(options ...Option) *Config {
c := &Config{
RabbitMQURL: fmt.Sprintf("%s://%s/%s", getProtocol(), viper.GetString("url"), viper.GetString("virtualhost")),
Expand All @@ -109,6 +162,17 @@ func New(options ...Option) *Config {
FileMode: viper.GetString("file-mode"),
StopAfterConsume: viper.GetBool("stop-after-consume"),
RoutingKeys: viper.GetStringSlice("routing-keys"),

FilterConfig: struct {
IncludePatterns []string
ExcludePatterns []string
JSONFilter *gojq.Query
MaxMessageSize int
RegexPattern string
CompileRegex *regexp.Regexp
}{
MaxMessageSize: -1, // Default: no size limit
},
}

for _, option := range options {
Expand All @@ -119,10 +183,31 @@ func New(options ...Option) *Config {
}

func (c *Config) PrintConfig() (string, error) {
configJSON, err := json.MarshalIndent(c, "", " ")
configJSON, err := json.MarshalIndent(map[string]interface{}{
"RabbitMQURL": c.RabbitMQURL,
"Exchange": c.Exchange,
"Queue": c.Queue,
"OutputFile": c.OutputFile,
"UseAMQPS": c.UseAMQPS,
"VirtualHost": c.VirtualHost,
"SkipTLSVerification": c.SkipTLSVerification,
"AutoAck": c.AutoAck,
"FileMode": c.FileMode,
"StopAfterConsume": c.StopAfterConsume,
"RoutingKeys": c.RoutingKeys,
"PrettyPrint": c.PrettyPrint,
"FilterConfig": map[string]interface{}{
"IncludePatterns": c.FilterConfig.IncludePatterns,
"ExcludePatterns": c.FilterConfig.ExcludePatterns,
"JSONFilter": c.FilterConfig.JSONFilter.String(),
"MaxMessageSize": c.FilterConfig.MaxMessageSize,
"RegexPattern": c.FilterConfig.RegexPattern,
},
}, "", " ")
if err != nil {
return "", fmt.Errorf("failed to marshal config: %v", err)
}

return string(configJSON), nil
}

Expand Down
107 changes: 105 additions & 2 deletions internal/rmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package rmq

import (
"crypto/tls"
"encoding/json"
"fmt"
"strings"

"github.com/itchyny/gojq"
"github.com/marianozunino/goq/internal/config"
"github.com/streadway/amqp"
)
Expand Down Expand Up @@ -80,7 +83,19 @@ func (c *Consumer) ConsumeMessages() (<-chan amqp.Delivery, error) {
return nil, fmt.Errorf("failed to register a consumer: %v", err)
}

return msgs, nil
filteredMsgs := make(chan amqp.Delivery)

go func() {
defer close(filteredMsgs)

for msg := range msgs {
if c.shouldProcessMessage(&msg) {
filteredMsgs <- msg
}
}
}()

return filteredMsgs, nil
}

func (c *Consumer) GetQueueInfo() (int, error) {
Expand Down Expand Up @@ -128,5 +143,93 @@ func (c *Consumer) ConsumeMessagesFromQueue(queueName string) (<-chan amqp.Deliv
if err != nil {
return nil, fmt.Errorf("failed to register a consumer for queue %s: %v", queueName, err)
}
return msgs, nil

filteredMsgs := make(chan amqp.Delivery)

go func() {
defer close(filteredMsgs)

for msg := range msgs {
if c.shouldProcessMessage(&msg) {
filteredMsgs <- msg
}
}
}()

return filteredMsgs, nil
}

func (c *Consumer) shouldProcessMessage(msg *Message) bool {
config := c.config.FilterConfig

// Size filtering
if config.MaxMessageSize > 0 && len(msg.Body) > config.MaxMessageSize {
return false
}

// Regex filtering
if config.CompileRegex != nil && !config.CompileRegex.Match(msg.Body) {
return false
}

body := string(msg.Body)

// Include patterns
if len(config.IncludePatterns) > 0 {
if !sliceContainsAny(config.IncludePatterns, body) {
return false
}
}

// Exclude patterns
if sliceContainsAny(config.ExcludePatterns, body) {
return false
}

// JSON filter
if config.JSONFilter != nil {
return matchJSONFilter(msg.Body, config.JSONFilter)
}

return true
}

func sliceContainsAny(patterns []string, body string) bool {
for _, pattern := range patterns {
if strings.Contains(body, pattern) {
return true
}
}
return false
}

func matchJSONFilter(body []byte, query *gojq.Query) bool {
var data interface{}
if err := json.Unmarshal(body, &data); err != nil {
return false
}

iter := query.Run(data)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, isErr := v.(error); isErr {
fmt.Printf("Error applying jq filter: %v\n", err)
return false
}

// Check if result is truthy
switch val := v.(type) {
case bool:
return val
case nil:
continue
default:
result, _ := json.MarshalIndent(v, "", " ")
return strings.TrimSpace(string(result)) != ""
}
}
return false
}

0 comments on commit 3442bee

Please sign in to comment.