Skip to content

Commit

Permalink
feat(slimfaas): add queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet committed Jan 3, 2025
1 parent f1f05f3 commit e158bd4
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/SlimFaas/DynamicGaugeService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace SlimFaas;

using Prometheus;
using System.Collections.Concurrent;

public class DynamicGaugeService
{
private readonly ConcurrentDictionary<string, Gauge> _gauges = new();

private Gauge GetOrCreateGauge(string name, string help)
{
return _gauges.GetOrAdd(name, _ => Metrics.CreateGauge(name, help));
}

public void SetGaugeValue(string name, double value, string help = "")
{
var gauge = GetOrCreateGauge(name, help);
gauge.Set(value);
}

public double GetGaugeValue(string name, string help = "")
{
var gauge = GetOrCreateGauge(name, help);
return gauge.Value;
}

}
35 changes: 35 additions & 0 deletions src/SlimFaas/MetricsWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using SlimFaas.Database;

namespace SlimFaas;

public class MetricsWorker(IReplicasService replicasService, ISlimFaasQueue slimFaasQueue, DynamicGaugeService dynamicGaugeService,
ILogger<MetricsWorker> logger,
int delay = EnvironmentVariables.ScaleReplicasWorkerDelayMillisecondsDefault)
: BackgroundService
{
private readonly int _delay =
EnvironmentVariables.ReadInteger(logger, EnvironmentVariables.ScaleReplicasWorkerDelayMilliseconds, delay);

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (stoppingToken.IsCancellationRequested == false)
{
try
{
await Task.Delay(_delay, stoppingToken);
var deployments = replicasService.Deployments;
foreach (var deployment in deployments.Functions)
{
var numberElement = await slimFaasQueue.CountElementAsync(deployment.Deployment);
dynamicGaugeService.SetGaugeValue(
$"slimfaas_queue_{deployment.Deployment.ToLowerInvariant()}_length",
numberElement, "Current number of elements in the queue");
}
}
catch (Exception e)
{
logger.LogError(e, "Global Error in MetricsWorker");
}
}
}
}
2 changes: 2 additions & 0 deletions src/SlimFaas/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@
serviceCollectionSlimFaas.AddHostedService<ScaleReplicasWorker>();
serviceCollectionSlimFaas.AddHostedService<ReplicasSynchronizationWorker>();
serviceCollectionSlimFaas.AddHostedService<HistorySynchronizationWorker>();
serviceCollectionSlimFaas.AddHostedService<MetricsWorker>();
serviceCollectionSlimFaas.AddHostedService<HealthWorker>();
serviceCollectionSlimFaas.AddHttpClient();
serviceCollectionSlimFaas.AddSingleton<ISlimFaasQueue, SlimFaasQueue>();
serviceCollectionSlimFaas.AddSingleton<DynamicGaugeService>();
serviceCollectionSlimFaas.AddSingleton<ISlimDataStatus, SlimDataStatus>();
serviceCollectionSlimFaas.AddSingleton<IReplicasService, ReplicasService>(sp =>
(ReplicasService)serviceProviderStarter.GetService<IReplicasService>()!);
Expand Down
56 changes: 56 additions & 0 deletions tests/SlimFaas.Tests/MetricsWorkerShould.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using MemoryPack;
using Microsoft.Extensions.Logging;
using Moq;
using SlimData;
using SlimFaas.Database;
using SlimFaas.Kubernetes;

namespace SlimFaas.Tests;

public class MetricsWorkerShould
{
[Fact]
public async Task AddQueueMetrics()
{
var deploymentsInformations = new DeploymentsInformations(
new List<DeploymentInformation>
{
new("fibonacci1", "default", Replicas: 1, Pods: new List<PodInformation>(),
Configuration: new SlimFaasConfiguration()),
new("fibonacci2", "default", Replicas: 0, Pods: new List<PodInformation>(),
Configuration: new SlimFaasConfiguration())
},
new SlimFaasDeploymentInformation(1, new List<PodInformation>()),
new List<PodInformation>()
);
Mock<ILogger<MetricsWorker>> logger = new();
Mock<IKubernetesService> kubernetesService = new();
Mock<IMasterService> masterService = new();
HistoryHttpMemoryService historyHttpService = new();
Mock<ILogger<ReplicasService>> loggerReplicasService = new();
ReplicasService replicasService =
new(kubernetesService.Object,
historyHttpService,
loggerReplicasService.Object);
masterService.Setup(ms => ms.IsMaster).Returns(true);
kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny<string>(), It.IsAny<DeploymentsInformations>())).ReturnsAsync(deploymentsInformations);

await replicasService.SyncDeploymentsAsync("default");

SlimFaasQueue slimFaasQueue = new(new DatabaseMockService());
CustomRequest customRequest =
new(new List<CustomHeader> { new() { Key = "key", Values = new[] { "value1" } } },
new byte[1], "fibonacci1", "/download", "GET", "");
var jsonCustomRequest = MemoryPackSerializer.Serialize(customRequest);
var retryInformation = new RetryInformation([], 30, []);
await slimFaasQueue.EnqueueAsync("fibonacci1", jsonCustomRequest, retryInformation);
var dynamicGaugeService = new DynamicGaugeService();
MetricsWorker service = new(replicasService, slimFaasQueue, dynamicGaugeService, logger.Object, 100);
Task task = service.StartAsync(CancellationToken.None);
await Task.Delay(3000);


Assert.True(task.IsCompleted);
}

}

0 comments on commit e158bd4

Please sign in to comment.