Skip to content

Commit

Permalink
change topic location
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Nov 9, 2021
1 parent e7ba7eb commit 849453a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
9 changes: 6 additions & 3 deletions pkg/injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
}
}

klog.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
http.Error(
Expand All @@ -163,13 +162,17 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
http.StatusInternalServerError,
)

klog.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
klog.Errorf("Injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
if _, err := w.Write(respBytes); err != nil {
klog.Error(err)
} else {
klog.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
if len(patchOps) == 0 {
klog.Infof("Injector ignored app '%s'", diagAppID)
} else {
klog.Infof("Injector succeeded injection for app '%s'", diagAppID)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/messaging/extractors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package messaging

func ExtractTopicFromEnvelope(msg *MessageEnvelope) string {
return ""
}
4 changes: 2 additions & 2 deletions pkg/messaging/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func (n *natsStreamingPubSub) Subscribe(topic string, handler messaging.Handler,
if err == nil {
// we only send a successful ACK if there is no error
natsMsg.Ack()
klog.V(4).InfoS("Message manually acknowledged in NATS")
klog.V(4).InfoS("Manual ack", "topic", natsMsg.Subject, "Id", msg.Id)
} else {
klog.ErrorS(err, "Error running subscriber pipeline, message was not ACK")
klog.ErrorS(err, "Error running subscriber pipeline, message was not ACK", "topic", natsMsg.Subject)
}
}()
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ func SubscriberMetricsMiddleware() messaging.Middleware {
return func(ctx context.Context, msg *messaging.MessageEnvelope) error {
start := time.Now()
err := next(ctx, msg)
metrics.DefaultPubSubMetrics().RecordSubscriberProcessingTime(ctx, msg.Subject, err == nil, time.Since(start))
topic := msg.Subject
if topic == "" {
topic = ctx.Value(messaging.TopicKey).(string)
}

metrics.DefaultPubSubMetrics().RecordSubscriberProcessingTime(ctx, topic, err == nil, time.Since(start))
return err
}
}
Expand Down

0 comments on commit 849453a

Please sign in to comment.