diff --git a/pkg/injector/injector.go b/pkg/injector/injector.go index b0fe5ac..1ababd4 100644 --- a/pkg/injector/injector.go +++ b/pkg/injector/injector.go @@ -154,7 +154,6 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) { } } - klog.Infof("ready to write response ...") respBytes, err := json.Marshal(admissionReview) if err != nil { http.Error( @@ -163,13 +162,17 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) { http.StatusInternalServerError, ) - klog.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err) + klog.Errorf("Injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err) } w.Header().Set("Content-Type", runtime.ContentTypeJSON) if _, err := w.Write(respBytes); err != nil { klog.Error(err) } else { - klog.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID) + if len(patchOps) == 0 { + klog.Infof("Injector ignored app '%s'", diagAppID) + } else { + klog.Infof("Injector succeeded injection for app '%s'", diagAppID) + } } } diff --git a/pkg/messaging/extractors.go b/pkg/messaging/extractors.go new file mode 100644 index 0000000..7f13643 --- /dev/null +++ b/pkg/messaging/extractors.go @@ -0,0 +1,5 @@ +package messaging + +func ExtractTopicFromEnvelope(msg *MessageEnvelope) string { + return "" +} diff --git a/pkg/messaging/nats/nats.go b/pkg/messaging/nats/nats.go index e0d96ff..dd36c41 100644 --- a/pkg/messaging/nats/nats.go +++ b/pkg/messaging/nats/nats.go @@ -245,9 +245,9 @@ func (n *natsStreamingPubSub) Subscribe(topic string, handler messaging.Handler, if err == nil { // we only send a successful ACK if there is no error natsMsg.Ack() - klog.V(4).InfoS("Message manually acknowledged in NATS") + klog.V(4).InfoS("Manual ack", "topic", natsMsg.Subject, "Id", msg.Id) } else { - klog.ErrorS(err, "Error running subscriber pipeline, message was not ACK") + klog.ErrorS(err, "Error running subscriber pipeline, message was not ACK", "topic", natsMsg.Subject) } }() } diff --git a/pkg/middleware/metrics.go b/pkg/middleware/metrics.go index c6a412d..b47c6f0 100644 --- a/pkg/middleware/metrics.go +++ b/pkg/middleware/metrics.go @@ -12,7 +12,12 @@ func SubscriberMetricsMiddleware() messaging.Middleware { return func(ctx context.Context, msg *messaging.MessageEnvelope) error { start := time.Now() err := next(ctx, msg) - metrics.DefaultPubSubMetrics().RecordSubscriberProcessingTime(ctx, msg.Subject, err == nil, time.Since(start)) + topic := msg.Subject + if topic == "" { + topic = ctx.Value(messaging.TopicKey).(string) + } + + metrics.DefaultPubSubMetrics().RecordSubscriberProcessingTime(ctx, topic, err == nil, time.Since(start)) return err } }