Skip to content

Commit

Permalink
Updated metrics with aws sdk middelware
Browse files Browse the repository at this point in the history
  • Loading branch information
anurag4DSB committed Dec 24, 2024
1 parent cf1d211 commit 28034c0
Showing 1 changed file with 46 additions and 25 deletions.
71 changes: 46 additions & 25 deletions pkg/clients/s3/s3_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,89 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go/logging"
"github.com/aws/smithy-go/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/scality/cosi-driver/pkg/metrics"
"github.com/scality/cosi-driver/pkg/util"
"k8s.io/klog/v2"
)

// S3API defines the methods the S3 client must implement.
type S3API interface {
CreateBucket(ctx context.Context, input *s3.CreateBucketInput, opts ...func(*s3.Options)) (*s3.CreateBucketOutput, error)
DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput, opts ...func(*s3.Options)) (*s3.DeleteBucketOutput, error)
}

// S3Client wraps the S3 service client for custom operations and middleware integration.
type S3Client struct {
S3Service S3API
}

// LoadAWSConfig is a wrapper for AWS SDK's default configuration loader.
var LoadAWSConfig = config.LoadDefaultConfig

// AttachPrometheusMiddleware attaches middleware to track metrics using Prometheus.
func AttachPrometheusMiddleware(stack *middleware.Stack) error {
// Define the middleware logic
middlewareFunc := middleware.FinalizeMiddlewareFunc("PrometheusMetrics", func(
ctx context.Context, in middleware.FinalizeInput, next middleware.FinalizeHandler,
) (out middleware.FinalizeOutput, metadata middleware.Metadata, err error) {
operationName := middleware.GetOperationName(ctx)

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(duration float64) {
status := "success"
if err != nil {
status = "error"
}
metrics.S3RequestDuration.WithLabelValues(operationName, status).Observe(duration)
metrics.S3RequestsTotal.WithLabelValues(operationName, status).Inc()

Check warning on line 50 in pkg/clients/s3/s3_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/clients/s3/s3_client.go#L37-L50

Added lines #L37 - L50 were not covered by tests
}))
defer timer.ObserveDuration()

out, metadata, err = next.HandleFinalize(ctx, in)
if err != nil {
klog.ErrorS(err, "AWS SDK operation failed", "operation", operationName)
}
return out, metadata, err

Check warning on line 58 in pkg/clients/s3/s3_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/clients/s3/s3_client.go#L52-L58

Added lines #L52 - L58 were not covered by tests
})

// Add the middleware to the Finalize step
return stack.Finalize.Add(middlewareFunc, middleware.After)

Check warning on line 62 in pkg/clients/s3/s3_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/clients/s3/s3_client.go#L62

Added line #L62 was not covered by tests
}

// InitS3Client initializes the S3 client with Prometheus middleware and custom configuration.
var InitS3Client = func(ctx context.Context, params util.StorageClientParameters) (*S3Client, error) {
// Configure a logger
var logger logging.Logger
if params.Debug {
logger = logging.NewStandardLogger(os.Stdout)
} else {
logger = nil
}

// Configure HTTP client with TLS support if needed
httpClient := &http.Client{
Timeout: util.DefaultRequestTimeout,
}

if strings.HasPrefix(params.Endpoint, "https://") {
httpClient.Transport = util.ConfigureTLSTransport(params.TLSCert)
}

// Load AWS configuration with middleware
awsCfg, err := LoadAWSConfig(ctx,
config.WithRegion(params.Region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(params.AccessKeyID, params.SecretAccessKey, "")),
config.WithHTTPClient(httpClient),
config.WithLogger(logger),
config.WithAPIOptions([]func(*middleware.Stack) error{
AttachPrometheusMiddleware,
}),
)
if err != nil {
return nil, err
}

// Create the S3 client
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
o.BaseEndpoint = aws.String(params.Endpoint)
Expand All @@ -64,14 +105,8 @@ var InitS3Client = func(ctx context.Context, params util.StorageClientParameters
}, nil
}

// CreateBucket creates a new bucket in the S3 service.
func (client *S3Client) CreateBucket(ctx context.Context, bucketName string, params util.StorageClientParameters) error {
metricStatus := "success"

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(duration float64) {
metrics.S3RequestDuration.WithLabelValues("CreateBucket", metricStatus).Observe(duration)
}))
defer timer.ObserveDuration()

input := &s3.CreateBucketInput{Bucket: &bucketName}
if params.Region != util.DefaultRegion {
input.CreateBucketConfiguration = &types.CreateBucketConfiguration{
Expand All @@ -80,25 +115,11 @@ func (client *S3Client) CreateBucket(ctx context.Context, bucketName string, par
}

_, err := client.S3Service.CreateBucket(ctx, input)

if err != nil {
metricStatus = "error"
}
metrics.S3RequestsTotal.WithLabelValues("CreateBucket", metricStatus).Inc()
return err
return err // Metrics are handled by middleware
}

// DeleteBucket deletes a bucket in the S3 service.
func (client *S3Client) DeleteBucket(ctx context.Context, bucketName string) error {
metricStatus := "success"
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(duration float64) {
metrics.S3RequestDuration.WithLabelValues("DeleteBucket", metricStatus).Observe(duration)
}))
defer timer.ObserveDuration()

_, err := client.S3Service.DeleteBucket(ctx, &s3.DeleteBucketInput{Bucket: &bucketName})
if err != nil {
metricStatus = "error"
}
metrics.S3RequestsTotal.WithLabelValues("DeleteBucket", metricStatus).Inc()
return err
return err // Metrics are handled by middleware
}

0 comments on commit 28034c0

Please sign in to comment.