Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client metrics for http and ws transports #344

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func startClient(t *testing.T, settings types.StartSettings, client OpAMPClient)
// createNoServerSettings returns StartSettings whose server URL is a "ws://"
// URL for an available local address (presumably with nothing listening on
// it) and whose Metrics field is a fresh object from types.NewClientMetrics(64).
func createNoServerSettings() types.StartSettings {
	var settings types.StartSettings
	settings.OpAMPServerURL = "ws://" + testhelpers.GetAvailableLocalAddress()
	settings.Metrics = types.NewClientMetrics(64)
	return settings
}

Expand Down
2 changes: 2 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func NewHTTP(logger types.Logger) *httpClient {

// Start implements OpAMPClient.Start.
func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) error {
c.sender.SetMetrics(settings.Metrics)

if err := c.common.PrepareStart(ctx, settings); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func TestHTTPClientStartWithHeartbeatInterval(t *testing.T) {
settings := types.StartSettings{
OpAMPServerURL: "http://" + srv.Endpoint,
HeartbeatInterval: &heartbeat,
Metrics: types.NewClientMetrics(64),
}
if tt.enableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
Expand Down
56 changes: 53 additions & 3 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
type requestWrapper struct {
*http.Request

// dataLen is the length in bytes of the request body. prepareRequest sets it
// to the compressed length when compression is enabled, otherwise to the
// length of the marshaled message.
dataLen int
// msg is the AgentToServer message this request was built from; it is read
// when building the TxMessageInfo metrics record.
msg *protobufs.AgentToServer
// bodyReader returns a reader over the request body.
// NOTE(review): presumably used by rewind to reset the body before each
// send attempt — confirm.
bodyReader func() io.ReadCloser
}

Expand Down Expand Up @@ -62,7 +64,17 @@
getHeader func() http.Header

// Processor to handle received messages.
receiveProcessor receivedProcessor
receiveProcessor rcvProcessor

metrics *types.ClientMetrics
}

// SetMetrics replaces the sender's metrics object. It exists because the
// metrics object is not available until the StartSettings are available,
// which is after the sender has been constructed.
//
// A nil metrics argument is ignored, so the object installed by
// NewHTTPSender stays in place and h.metrics is never set to nil here.
func (h *HTTPSender) SetMetrics(metrics *types.ClientMetrics) {
	if metrics == nil {
		return
	}
	h.metrics = metrics
}

// NewHTTPSender creates a new Sender that uses HTTP to send messages
Expand All @@ -73,6 +85,7 @@
logger: logger,
client: http.DefaultClient,
pollingIntervalMs: defaultPollingIntervalMs,
metrics: types.NewClientMetrics(1),
}
// initialize the headers with no additional headers
h.SetRequestHeader(nil, nil)
Expand Down Expand Up @@ -167,6 +180,15 @@
h.receiveResponse(ctx, resp)
}

// httpTxMessageInfo builds the TxMessageInfo metrics record for the message
// carried by req. The instance UID, capabilities and sequence number are
// copied from the message; the attributes come from txMessageAttrs passed
// through httpMessageAttrs. TxLatency is left at its zero value for the
// caller to fill in.
func httpTxMessageInfo(req *requestWrapper) types.TxMessageInfo {
	msg := req.msg

	var info types.TxMessageInfo
	info.InstanceUID = msg.InstanceUid
	info.Capabilities = msg.Capabilities
	info.SequenceNum = msg.SequenceNum
	info.Attrs = httpMessageAttrs(txMessageAttrs(msg))
	return info
}

func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response, error) {
req, err := h.prepareRequest(ctx)
if err != nil {
Expand Down Expand Up @@ -197,11 +219,18 @@
case <-timer.C:
{
req.rewind(ctx)
startSend := time.Now()
resp, err := h.client.Do(req.Request)
latency := time.Since(startSend)
h.metrics.TxBytes.Add(int64(req.dataLen))
h.metrics.TxMessages.Add(1)
if err == nil {
switch resp.StatusCode {
case http.StatusOK:
// We consider it connected if we receive 200 status from the Server.
messageInfo := httpTxMessageInfo(req)
messageInfo.TxLatency = latency
h.metrics.TxMessageInfo.Insert(messageInfo)
h.callbacks.OnConnect(ctx)
return resp, nil

Expand All @@ -210,6 +239,7 @@
err = fmt.Errorf("server response code=%d", resp.StatusCode)

default:
h.metrics.TxErrors.Add(1)

Check warning on line 242 in client/internal/httpsender.go

View check run for this annotation

Codecov / codecov/patch

client/internal/httpsender.go#L242

Added line #L242 was not covered by tests
return nil, fmt.Errorf("invalid response from server: %d", resp.StatusCode)
}
} else if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -256,7 +286,7 @@
if err != nil {
return nil, err
}
req := requestWrapper{Request: r}
req := requestWrapper{Request: r, dataLen: len(data), msg: msgToSend}

if h.compressionEnabled {
var buf bytes.Buffer
Expand All @@ -269,8 +299,11 @@
h.logger.Errorf(ctx, "Failed to close the writer: %v", err)
return nil, err
}
req.bodyReader = bodyReader(buf.Bytes())
bufBytes := buf.Bytes()
req.dataLen = len(bufBytes)
req.bodyReader = bodyReader(bufBytes)
} else {
req.dataLen = len(data)
req.bodyReader = bodyReader(data)
}
if err != nil {
Expand All @@ -281,21 +314,38 @@
return &req, nil
}

// httpMessageAttrs returns the given attributes with types.HTTPTransportAttr
// additionally set.
func httpMessageAttrs(attrs types.MessageAttrs) types.MessageAttrs {
	tagged := attrs
	tagged.Set(types.HTTPTransportAttr)
	return tagged
}

func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
startReceive := time.Now()
msgBytes, err := io.ReadAll(resp.Body)
h.metrics.RxBytes.Add(int64(len(msgBytes)))
if err != nil {
h.metrics.RxErrors.Add(1)

Check warning on line 327 in client/internal/httpsender.go

View check run for this annotation

Codecov / codecov/patch

client/internal/httpsender.go#L327

Added line #L327 was not covered by tests
_ = resp.Body.Close()
h.logger.Errorf(ctx, "cannot read response body: %v", err)
return
}
_ = resp.Body.Close()
latency := time.Since(startReceive)
h.metrics.RxMessages.Add(1)

var response protobufs.ServerToAgent
if err := proto.Unmarshal(msgBytes, &response); err != nil {
h.logger.Errorf(ctx, "cannot unmarshal response: %v", err)
return
}

h.metrics.RxMessageInfo.Insert(types.RxMessageInfo{
InstanceUID: response.InstanceUid,
Capabilities: response.Capabilities,
Attrs: httpMessageAttrs(rxMessageAttrs(&response)),
RxLatency: latency,
})

h.receiveProcessor.ProcessReceivedMessage(ctx, &response)
}

Expand Down
95 changes: 93 additions & 2 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/types"
sharedinternal "github.com/open-telemetry/opamp-go/internal"
Expand Down Expand Up @@ -218,7 +220,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
})

// Then the RequestInstanceUid flag stays intact.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
assert.Equal(t, sender.receiveProcessor.(*receivedProcessor).clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)

// If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
Expand All @@ -227,7 +229,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
})

// Then the flag is reset so we don't request a new instance uid yet again.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
assert.Equal(t, sender.receiveProcessor.(*receivedProcessor).clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}

Expand Down Expand Up @@ -354,3 +356,92 @@ func TestPackageUpdatesWithError(t *testing.T) {

cancel()
}

// fakeReciveProcessor is a stateless stand-in for a receive processor that
// discards every message handed to it.
type fakeReciveProcessor struct{}

// ProcessReceivedMessage does nothing.
func (fakeReciveProcessor) ProcessReceivedMessage(_ context.Context, _ *protobufs.ServerToAgent) {}

// TestHTTPSendMessageMetrics sends one message through an HTTPSender against a
// mock server and checks that the Tx counters and TxMessageInfo are recorded,
// then feeds the response to receiveResponse and checks the Rx counters and
// RxMessageInfo.
func TestHTTPSendMessageMetrics(t *testing.T) {
	sendMsg := &protobufs.AgentToServer{
		InstanceUid:  []byte("abcd"),
		Capabilities: 2,
		SequenceNum:  1,
	}

	recvMsg := &protobufs.ServerToAgent{
		InstanceUid:  []byte("dcba"),
		Capabilities: 4,
	}
	// Marshal up front so a failure is reported from the test goroutine
	// instead of being silently dropped inside the HTTP handler.
	recvBytes, err := proto.Marshal(recvMsg)
	if err != nil {
		t.Fatalf("cannot marshal response message: %v", err)
	}

	sender := NewHTTPSender(TestLogger{T: t})
	metrics := types.NewClientMetrics(64)
	sender.SetMetrics(metrics)
	sender.receiveProcessor = fakeReciveProcessor{}

	srv := StartMockServer(t)
	defer srv.Close()

	sender.url = "http://" + srv.Endpoint

	srv.OnRequest = func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write(recvBytes)
	}

	sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
		*msg = *sendMsg
	})
	sender.callbacks = types.Callbacks{
		OnConnect: func(ctx context.Context) {
		},
		OnConnectFailed: func(ctx context.Context, _ error) {
		},
	}
	resp, err := sender.sendRequestWithRetries(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if status := resp.StatusCode; status < 200 || status >= 300 {
		t.Fatalf("unexpected http status: %d", status)
	}
	if got, want := metrics.TxMessages.Read(), int64(1); got != want {
		t.Errorf("wrong number of messages: got %d, want %d", got, want)
	}
	if metrics.TxBytes.Read() == 0 {
		t.Error("TxBytes not recorded")
	}

	// Guard the index: an empty result must fail the test, not panic it.
	txInfos := metrics.TxMessageInfo.Drain()
	if len(txInfos) == 0 {
		t.Fatal("no TxMessageInfo recorded")
	}
	txm := txInfos[0]
	if got, want := txm.InstanceUID, []byte("abcd"); !cmp.Equal(want, got) {
		t.Errorf("instance uid does not match: %s", cmp.Diff(want, got))
	}
	if got, want := txm.Capabilities, uint64(2); got != want {
		t.Errorf("incorrect capabilities: got %d, want %d", got, want)
	}
	if got, want := txm.SequenceNum, uint64(1); got != want {
		t.Errorf("incorrect sequence num: got %d, want %d", got, want)
	}
	if txm.TxLatency == 0 {
		t.Error("latency was not measured")
	}

	sender.receiveResponse(context.Background(), resp)

	if got, want := metrics.RxMessages.Read(), int64(1); got != want {
		t.Errorf("wrong number of messages: got %d, want %d", got, want)
	}
	if metrics.RxBytes.Read() == 0 {
		t.Error("rx bytes not recorded")
	}

	rxInfos := metrics.RxMessageInfo.Drain()
	if len(rxInfos) == 0 {
		t.Fatal("no RxMessageInfo recorded")
	}
	rxm := rxInfos[0]
	if got, want := rxm.InstanceUID, []byte("dcba"); !cmp.Equal(want, got) {
		t.Errorf("instance uid does not match: %s", cmp.Diff(want, got))
	}
	if got, want := rxm.Capabilities, uint64(4); got != want {
		t.Errorf("incorrect capabilities: got %d, want %d", got, want)
	}
	if rxm.RxLatency == 0 {
		t.Error("latency was not measured")
	}
}
9 changes: 9 additions & 0 deletions client/internal/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package internal

// Counter is an interface with a single Add method that takes an int64.
// NOTE(review): presumably satisfied by the counter fields of
// types.ClientMetrics (e.g. TxBytes, RxErrors) — confirm against that type.
type Counter interface {
// Add applies the given int64 amount to the counter.
Add(int64)
}

// RingBuffer is a generic interface with a single Insert method that accepts
// a value of type T.
// NOTE(review): presumably satisfied by the message-info buffers of
// types.ClientMetrics; capacity and eviction behavior are not visible here —
// confirm against the implementation.
type RingBuffer[T any] interface {
// Insert adds one value to the buffer.
Insert(T)
}
8 changes: 6 additions & 2 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

// rcvProcessor is the interface through which a decoded ServerToAgent message
// is handed on for processing. Declaring the dependency as an interface lets
// tests substitute a stub for *receivedProcessor.
type rcvProcessor interface {
// ProcessReceivedMessage is called with each message to be processed.
ProcessReceivedMessage(context.Context, *protobufs.ServerToAgent)
}

// receivedProcessor handles the processing of messages received from the Server.
type receivedProcessor struct {
logger types.Logger
Expand Down Expand Up @@ -41,8 +45,8 @@ func newReceivedProcessor(
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) receivedProcessor {
return receivedProcessor{
) *receivedProcessor {
return &receivedProcessor{
logger: logger,
callbacks: callbacks,
sender: sender,
Expand Down
29 changes: 25 additions & 4 deletions client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,30 @@

// wsReceiver implements the WebSocket client's receiving portion of OpAMP protocol.
type wsReceiver struct {
conn *websocket.Conn
conn internal.WebsocketConn
logger types.Logger
sender *WSSender
callbacks types.Callbacks
processor receivedProcessor
processor rcvProcessor

// Indicates that the receiver has fully stopped.
stopped chan struct{}

metrics *types.ClientMetrics
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
// messages from the server.
func NewWSReceiver(
logger types.Logger,
callbacks types.Callbacks,
conn *websocket.Conn,
conn internal.WebsocketConn,
sender *WSSender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
metrics *types.ClientMetrics,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
Expand All @@ -42,6 +45,7 @@
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
stopped: make(chan struct{}),
metrics: metrics,
}

return w
Expand Down Expand Up @@ -97,14 +101,31 @@
}
}

func rxMessageAttrs(msg *protobufs.ServerToAgent) types.MessageAttrs {
attrs := types.MessageAttrs(types.RxMessageAttr | types.ServerToAgentMessageAttr)
if msg.ErrorResponse != nil {
attrs.Set(types.ErrorMessageAttr)
}

Check warning on line 108 in client/internal/wsreceiver.go

View check run for this annotation

Codecov / codecov/patch

client/internal/wsreceiver.go#L107-L108

Added lines #L107 - L108 were not covered by tests
return attrs
}

// receiveMessage reads one message from the connection and decodes it into
// msg, updating the receive metrics along the way.
//
// RxBytes is increased by the number of bytes returned by the read, and
// RxMessages is incremented for every successful read. A read failure is
// counted in RxErrors and returned unchanged. A decode failure is also
// counted in RxErrors and returned wrapped. On success an RxMessageInfo
// record is inserted, with attributes taken from rxMessageAttrs passed
// through wsMessageAttrs, and nil is returned.
func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error {
	// Named data rather than bytes to avoid shadowing the standard package name.
	_, data, err := r.conn.ReadMessage()
	r.metrics.RxBytes.Add(int64(len(data)))
	if err != nil {
		r.metrics.RxErrors.Add(1)
		return err
	}
	r.metrics.RxMessages.Add(1)

	if err := internal.DecodeWSMessage(data, msg); err != nil {
		r.metrics.RxErrors.Add(1)
		return fmt.Errorf("cannot decode received message: %w", err)
	}

	r.metrics.RxMessageInfo.Insert(types.RxMessageInfo{
		InstanceUID:  msg.InstanceUid,
		Capabilities: msg.Capabilities,
		Attrs:        wsMessageAttrs(rxMessageAttrs(msg)),
	})
	return nil
}
Loading
Loading