This repository has been archived by the owner on Sep 8, 2023. It is now read-only.

Merge pull request #5 from e-conomic/server-status-code-fix
Server interceptor - fix for returned status code, client interceptor - fix for received status code on server streaming
michalderdak authored Mar 13, 2018
2 parents 53c8a78 + b632f09 commit 9f8be04
Showing 11 changed files with 388 additions and 278 deletions.
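For context, the client interceptor previously recovered the status code by matching the exception message against every StatusCode value; this commit catches RpcException instead and reads the code directly from e.Status.StatusCode. The following is a minimal sketch of that pattern, not code from this repository; CallAndCount and recordOutcome are illustrative stand-ins.

    using System;
    using Grpc.Core;

    internal static class StatusCodeSketch
    {
        // Sketch of the status-code handling this diff moves to: RpcException already
        // carries the canonical gRPC status, so there is no need to match StatusCode
        // names against the exception message.
        internal static TResponse CallAndCount<TRequest, TResponse>(
            TRequest request,
            Func<TRequest, TResponse> continuation,
            Action<StatusCode> recordOutcome)
        {
            try
            {
                TResponse response = continuation(request);
                recordOutcome(StatusCode.OK);
                return response;
            }
            catch (RpcException e)
            {
                recordOutcome(e.Status.StatusCode); // e.g. StatusCode.Unavailable
                throw;                              // re-throw so the caller still observes the failure
            }
        }
    }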
136 changes: 85 additions & 51 deletions NetGrpcPrometheus/ClientInterceptor.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Grpc.Core;
using Grpc.Core.Interceptors;
using NetGrpcPrometheus.Helpers;
@@ -24,8 +23,7 @@ public bool EnableLatencyMetrics
get => _metrics.EnableLatencyMetrics;
set => _metrics.EnableLatencyMetrics = value;
}

private readonly StatusCode[] _statusCodes;

private readonly MetricsBase _metrics;

/// <summary>
@@ -35,7 +33,9 @@ public bool EnableLatencyMetrics
/// <param name="hostname">Host name for Prometheus metrics server - e.g. localhost</param>
/// <param name="port">Port for Prometheus server</param>
/// <param name="defaultMetrics">Indicates if Prometheus metrics server should record default metrics</param>
public ClientInterceptor(string hostname, int port, bool defaultMetrics = true)
/// <param name="enableLatencyMetrics">Enable recording of latency for responses. By default it's set to false</param>
public ClientInterceptor(string hostname, int port, bool defaultMetrics = true,
bool enableLatencyMetrics = false)
{
MetricServer metricServer = new MetricServer(hostname, port);
metricServer.Start();
@@ -46,7 +46,8 @@ public ClientInterceptor(string hostname, int port, bool defaultMetrics = true)
}

_metrics = new ClientMetrics();
_statusCodes = Enum.GetValues(typeof(StatusCode)).Cast<StatusCode>().ToArray();
EnableLatencyMetrics = enableLatencyMetrics;
//_statusCodes = Enum.GetValues(typeof(StatusCode)).Cast<StatusCode>().ToArray();
}

/// <summary>
@@ -56,25 +57,30 @@ public ClientInterceptor(string hostname, int port, bool defaultMetrics = true)
/// <param name="endpoint">Endpoint for pushgateway - e.g. http://pushgateway.example.org:9091/metrics</param>
/// <param name="job"></param>
/// <param name="defaultMetrics"></param>
/// <param name="enableLatencyMetrics">Enable recording of latency for responses. By default it's set to false</param>
/// <param name="instance"></param>
/// <param name="intervalMilliseconds"></param>
/// <param name="additionalLabels"></param>
/// <param name="registry"></param>
public ClientInterceptor(string endpoint, string job, bool defaultMetrics = true, string instance = null, ulong intervalMilliseconds = 1000, IEnumerable<Tuple<string,string>> additionalLabels = null, ICollectorRegistry registry = null)
public ClientInterceptor(string endpoint, string job, bool defaultMetrics = true, bool enableLatencyMetrics = false,
string instance = null, ulong intervalMilliseconds = 1000,
IEnumerable<Tuple<string, string>> additionalLabels = null, ICollectorRegistry registry = null)
{
MetricPusher metricServer = new MetricPusher(endpoint, job, instance, (long) intervalMilliseconds, additionalLabels, registry);
MetricPusher metricServer = new MetricPusher(endpoint, job, instance, (long) intervalMilliseconds,
additionalLabels, registry);
metricServer.Start();

if (!defaultMetrics)
{
DefaultCollectorRegistry.Instance.Clear();
}

_metrics = new ClientMetrics();
_statusCodes = Enum.GetValues(typeof(StatusCode)).Cast<StatusCode>().ToArray();
EnableLatencyMetrics = enableLatencyMetrics;
}

public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
{
GrpcMethodInfo method = new GrpcMethodInfo(context.Method.FullName, context.Method.Type);
@@ -85,25 +91,22 @@ public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest reques
watch.Start();

TResponse result;
StatusCode code = StatusCode.OK;

try
{
result = continuation(request, context);
}
catch (Exception e)
catch (RpcException e)
{
code = _statusCodes.DefaultIfEmpty(StatusCode.Unknown).FirstOrDefault(s => e.Message.Contains(s.ToString()));

_metrics.ResponseCounterInc(method, e.Status.StatusCode);
throw;
}
finally
{
watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);
_metrics.ResponseCounterInc(method, code);
}

return result;
}

@@ -119,17 +122,21 @@ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TR
watch.Start();

AsyncUnaryCall<TResponse> result = continuation(request, context);

result.ResponseAsync.ContinueWith(task =>
{
watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);

StatusCode code = task.Exception == null
? StatusCode.OK
: _statusCodes.FirstOrDefault(s => task.Exception.Message.Contains(s.ToString()));

_metrics.ResponseCounterInc(method, code);
if (task.Exception == null)
{
_metrics.ResponseCounterInc(method, StatusCode.OK);
}
else
{
RpcException exception = (RpcException) task.Exception.InnerException;
_metrics.ResponseCounterInc(method, exception.Status.StatusCode);
}
});

return result;
@@ -159,11 +166,15 @@ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreami
watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);

StatusCode code = task.Exception == null
? StatusCode.OK
: _statusCodes.FirstOrDefault(s => task.Exception.Message.Contains(s.ToString()));

_metrics.ResponseCounterInc(method, code);
if (task.Exception == null)
{
_metrics.ResponseCounterInc(method, StatusCode.OK);
}
else
{
RpcException exception = (RpcException)task.Exception.InnerException;
_metrics.ResponseCounterInc(method, exception.Status.StatusCode);
}
});

return result;
@@ -181,18 +192,29 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
Stopwatch watch = new Stopwatch();
watch.Start();

AsyncServerStreamingCall<TResponse> streamingCall = continuation(request, context);
AsyncServerStreamingCall<TResponse> result;

watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);
// TODO: Investigate whether or not any response code should be returned on server streaming
_metrics.ResponseCounterInc(method, StatusCode.OK);
try
{
AsyncServerStreamingCall<TResponse> streamingCall = continuation(request, context);

AsyncServerStreamingCall<TResponse> result = new AsyncServerStreamingCall<TResponse>(
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); }),
streamingCall.ResponseHeadersAsync, streamingCall.GetStatus, streamingCall.GetTrailers,
streamingCall.Dispose);
result = new AsyncServerStreamingCall<TResponse>(
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); }), streamingCall.ResponseHeadersAsync,
streamingCall.GetStatus, streamingCall.GetTrailers, streamingCall.Dispose);

_metrics.ResponseCounterInc(method, StatusCode.OK);
}
catch (RpcException e)
{
_metrics.ResponseCounterInc(method, e.Status.StatusCode);
throw;
}
finally
{
watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);
}

return result;
}
@@ -208,24 +230,36 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
Stopwatch watch = new Stopwatch();
watch.Start();

AsyncDuplexStreamingCall<TRequest, TResponse> streamingCall = continuation(context);
AsyncDuplexStreamingCall<TRequest, TResponse> result;

watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);
// TODO: Investigate whether or not any response code should be returned on server streaming
_metrics.ResponseCounterInc(method, StatusCode.OK);
try
{
AsyncDuplexStreamingCall<TRequest, TResponse> streamingCall = continuation(context);

WrapperStreamReader<TResponse> responseStream =
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); });
AsyncDuplexStreamingCall<TRequest, TResponse> result = new AsyncDuplexStreamingCall<TRequest, TResponse>(
new WrapperClientStreamWriter<TRequest>(streamingCall.RequestStream,
() => { _metrics.StreamSentCounterInc(method); }), responseStream,
streamingCall.ResponseHeadersAsync, streamingCall.GetStatus, streamingCall.GetTrailers,
streamingCall.Dispose);
WrapperStreamReader<TResponse> responseStream =
new WrapperStreamReader<TResponse>(streamingCall.ResponseStream,
() => { _metrics.StreamReceivedCounterInc(method); });

result = new AsyncDuplexStreamingCall<TRequest, TResponse>(
new WrapperClientStreamWriter<TRequest>(streamingCall.RequestStream,
() => { _metrics.StreamSentCounterInc(method); }), responseStream,
streamingCall.ResponseHeadersAsync, streamingCall.GetStatus, streamingCall.GetTrailers,
streamingCall.Dispose);

_metrics.ResponseCounterInc(method, StatusCode.OK);
}
catch (RpcException e)
{
_metrics.ResponseCounterInc(method, e.Status.StatusCode);
throw;
}
finally
{
watch.Stop();
_metrics.RecordLatency(method, watch.Elapsed.TotalSeconds);
}

return result;
}

}
}
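On the asynchronous client paths the same RpcException arrives wrapped in the continuation task's AggregateException, which is why the diff casts task.Exception.InnerException. A hedged sketch of that unwrapping follows; CountWhenComplete and recordOutcome are illustrative names, not part of the library.

    using System;
    using System.Threading.Tasks;
    using Grpc.Core;

    internal static class AsyncStatusSketch
    {
        // Sketch of the continuation used for the async unary and client-streaming calls:
        // a faulted Task exposes an AggregateException whose InnerException is the
        // RpcException thrown by the call, so the status code is read from there.
        internal static void CountWhenComplete<TResponse>(
            Task<TResponse> responseTask, Action<StatusCode> recordOutcome)
        {
            responseTask.ContinueWith(task =>
            {
                if (task.Exception == null)
                {
                    recordOutcome(StatusCode.OK);
                }
                else if (task.Exception.InnerException is RpcException rpc)
                {
                    recordOutcome(rpc.Status.StatusCode);
                }
                else
                {
                    // The diff casts unconditionally; this fallback exists only to keep
                    // the sketch total.
                    recordOutcome(StatusCode.Unknown);
                }
            });
        }
    }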
2 changes: 0 additions & 2 deletions NetGrpcPrometheus/Models/ClientMetrics.cs
@@ -18,8 +18,6 @@ public sealed class ClientMetrics : MetricsBase

public ClientMetrics()
{
EnableLatencyMetrics = false;

RequestCounter = Metrics.CreateCounter("grpc_client_started_total",
"Total number of RPCs started on the client", "grpc_type", "grpc_service", "grpc_method");

2 changes: 0 additions & 2 deletions NetGrpcPrometheus/Models/ServerMetrics.cs
@@ -18,8 +18,6 @@ public sealed class ServerMetrics : MetricsBase

public ServerMetrics()
{
EnableLatencyMetrics = false;

RequestCounter = Metrics.CreateCounter("grpc_server_started_total",
"Total number of RPCs started on the server", "grpc_type", "grpc_service", "grpc_method");

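With the hard-coded EnableLatencyMetrics = false removed from the metric classes, latency recording is now controlled by the enableLatencyMetrics constructor parameter added above. A usage sketch under that assumption; the host, port, and channel target are placeholders, and the NetGrpcPrometheus namespace is assumed from the project name.

    using Grpc.Core;
    using Grpc.Core.Interceptors;
    using NetGrpcPrometheus;

    // Usage sketch for the new constructor signature: latency histograms are opt-in.
    ClientInterceptor interceptor = new ClientInterceptor(
        hostname: "localhost",       // placeholder Prometheus metrics host
        port: 9001,                  // placeholder metrics port
        defaultMetrics: true,
        enableLatencyMetrics: true); // defaults to false

    Channel channel = new Channel("my-grpc-service:5000", ChannelCredentials.Insecure); // placeholder target
    CallInvoker invoker = channel.Intercept(interceptor);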
4 changes: 2 additions & 2 deletions NetGrpcPrometheus/NetGrpcPrometheus.csproj
@@ -10,8 +10,8 @@
<PackageLicenseUrl>https://github.com/e-conomic/csharp-grpc-prometheus/blob/master/LICENSE</PackageLicenseUrl>
<PackageProjectUrl>https://github.com/e-conomic/csharp-grpc-prometheus</PackageProjectUrl>
<RepositoryUrl>https://github.com/e-conomic/csharp-grpc-prometheus</RepositoryUrl>
<PackageTags>Prometheus gRPC Interceptor Metrics</PackageTags>
<Copyright>2018 Visma e-conomic</Copyright>
<PackageTags>Prometheus gRPC Interceptor Metrics Intercept</PackageTags>
<Copyright>Copyright © 2018 Visma e-conomic</Copyright>
</PropertyGroup>

<ItemGroup>