Skip to content

Commit

Permalink
fix(slimfaas): queue lock (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet authored Jan 14, 2025
1 parent a286bbf commit 557b02b
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 98 deletions.
7 changes: 4 additions & 3 deletions src/SlimData/QueueElementExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@ public static List<QueueElement> GetQueueAvailableElement(this List<QueueElement
var runningWaitingForRetryElements = elements.GetQueueWaitingForRetryElement(nowTicks);
var finishedElements = elements.GetQueueFinishedElement(nowTicks);

Check warning on line 125 in src/SlimData/QueueElementExtensions.cs

View workflow job for this annotation

GitHub Actions / SonarCloud

Argument of type 'List<QueueElement>' cannot be used for parameter 'element' of type 'IList<QueueElement?>' in 'IList<QueueElement> QueueElementExtensions.GetQueueFinishedElement(IList<QueueElement?> element, long nowTicks)' due to differences in the nullability of reference types.
var availableElements = new List<QueueElement>();
var currentCount = runningElements.Count + runningWaitingForRetryElements.Count;
//var currentCount = runningElements.Count + runningWaitingForRetryElements.Count;
var currentElements = elements.Except(runningElements).Except(runningWaitingForRetryElements).Except(finishedElements);

if (currentCount >= maximum)
/*if (currentCount >= maximum)
{
return availableElements;
}
}*/
var currentCount = 0;

foreach (var queueElement in currentElements)
{
Expand Down
14 changes: 1 addition & 13 deletions src/SlimFaas/Database/DatabaseMockService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,7 @@ public Task ListLeftPushAsync(string key, byte[] field, RetryInformation retryIn
return Task.FromResult<IList<QueueData>?>(new List<QueueData>());
}

public Task<long> ListCountAvailableElementAsync(string key, int maximum = int.MaxValue)
{
if (!queue.ContainsKey(key))
{
return Task.FromResult<long>(0);
}

var list = queue[key];

return Task.FromResult<long>(list.Count);
}

public Task<long> ListCountElementAsync(string key, int maximum = Int32.MaxValue)
public Task<long> ListCountElementAsync(string key, IList<CountType> countTypes, int maximum = Int32.MaxValue)
{
if (!queue.ContainsKey(key))
{
Expand Down
12 changes: 10 additions & 2 deletions src/SlimFaas/Database/IDatabaseService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
using SlimData;
using SlimFaas.Database;

namespace SlimFaas;

/// <summary>
/// Selects which categories of queue elements are included when counting
/// the elements of a queue via <c>ListCountElementAsync</c> / <c>CountElementAsync</c>.
/// Several values can be combined by passing them together in the
/// <c>countTypes</c> list; the resulting count is the sum of every requested category.
/// </summary>
public enum CountType
{
/// <summary>Elements returned by <c>GetQueueAvailableElement</c> (counted up to the caller-supplied <c>maximum</c>).</summary>
Available,
/// <summary>Elements returned by <c>GetQueueRunningElement</c>.</summary>
Running,
/// <summary>Elements returned by <c>GetQueueWaitingForRetryElement</c>.</summary>
WaitingForRetry
}


public interface IDatabaseService
{
Task<byte[]?> GetAsync(string key);
Expand All @@ -10,7 +19,6 @@ public interface IDatabaseService
Task<IDictionary<string, string>> HashGetAllAsync(string key);
Task ListLeftPushAsync(string key, byte[] field, RetryInformation retryInformation);
Task<IList<QueueData>?> ListRightPopAsync(string key, int count = 1);
Task<long> ListCountAvailableElementAsync(string key, int maximum = int.MaxValue);
Task<long> ListCountElementAsync(string key, int maximum = int.MaxValue);
Task<long> ListCountElementAsync(string key, IList<CountType> countTypes, int maximum = int.MaxValue);
Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus);
}
3 changes: 1 addition & 2 deletions src/SlimFaas/Database/ISlimFaasQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ public interface ISlimFaasQueue
Task EnqueueAsync(string key, byte[] message, RetryInformation retryInformation);
Task<IList<QueueData>?> DequeueAsync(string key, int count = 1);
Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus);
public Task<long> CountAvailableElementAsync(string key, int maximum = int.MaxValue);
public Task<long> CountElementAsync(string key, int maximum = int.MaxValue);
public Task<long> CountElementAsync(string key, IList<CountType> countTypes, int maximum = int.MaxValue);
}
62 changes: 33 additions & 29 deletions src/SlimFaas/Database/SlimDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
using SlimData;
using SlimData.Commands;



namespace SlimFaas.Database;
#pragma warning disable CA2252



public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvider serviceProvider, IRaftCluster cluster, ILogger<SlimDataService> logger)
: IDatabaseService
{
Expand Down Expand Up @@ -167,12 +172,13 @@ private async Task DoListLeftPushAsync(string key, byte[] field, RetryInformatio
}
}

public Task<long> ListCountElementAsync(string key, int maximum = Int32.MaxValue)
public Task<long> ListCountElementAsync(string key, IList<CountType> countTypes, int maximum = Int32.MaxValue)
{
return Retry.DoAsync(() => DoListCountElementAsync(key, maximum), logger, _retryInterval);

return Retry.DoAsync(() => DoListCountElementAsync(key, countTypes, maximum), logger, _retryInterval);
}

private async Task<long> DoListCountElementAsync(string key, int maximum)
private async Task<long> DoListCountElementAsync(string key, IList<CountType> countTypes, int maximum)
{
await GetAndWaitForLeader();
await MasterWaitForleaseToken();
Expand All @@ -182,10 +188,30 @@ private async Task<long> DoListCountElementAsync(string key, int maximum)
if (data.Queues.TryGetValue(key, out List<QueueElement>? value))
{
var nowTicks = DateTime.UtcNow.Ticks;
var elements = value.GetQueueAvailableElement(nowTicks, maximum);
var runningElements = value.GetQueueRunningElement(nowTicks);
var runningWaitingForRetryElements = value.GetQueueWaitingForRetryElement(nowTicks);
return elements.Count + runningElements.Count + runningWaitingForRetryElements.Count;
if (countTypes.Count == 0)
{
return 0L;
}

var availableElements = new List<QueueElement>();
if (countTypes.Contains(CountType.Available))
{
availableElements = value.GetQueueAvailableElement(nowTicks, maximum);
}

var runningElements = new List<QueueElement>();
if (countTypes.Contains(CountType.Running))
{
runningElements = value.GetQueueRunningElement(nowTicks);
}

var runningWaitingForRetryElements = new List<QueueElement>();

if (countTypes.Contains(CountType.WaitingForRetry))
{
runningWaitingForRetryElements = value.GetQueueWaitingForRetryElement(nowTicks);
}
return availableElements.Count + runningElements.Count + runningWaitingForRetryElements.Count;
}

return 0L;
Expand Down Expand Up @@ -218,28 +244,6 @@ private async Task DoListCallbackAsync(string key, ListQueueItemStatus queueItem
}
}

public async Task<long> ListCountAvailableElementAsync(string key, int maximum)
{
return await Retry.DoAsync(() => DoListCountAvailableElementAsync(key, maximum), logger, _retryInterval);
}

private async Task<long> DoListCountAvailableElementAsync(string key, int maximum)
{
await GetAndWaitForLeader();
await MasterWaitForleaseToken();

SlimDataPayload data = SimplePersistentState.Invoke();

if (data.Queues.TryGetValue(key, out List<QueueElement>? value))
{
var elements = value.GetQueueAvailableElement(DateTime.UtcNow.Ticks, maximum);
var number = elements.Count;
return number;
}

return 0L;
}

private async Task MasterWaitForleaseToken()
{
while (cluster.TryGetLeaseToken(out var leaseToken) && leaseToken.IsCancellationRequested)
Expand Down
4 changes: 1 addition & 3 deletions src/SlimFaas/Database/SlimFaasQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ public async Task EnqueueAsync(string key, byte[] data, RetryInformation retryIn

public async Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) => await databaseService.ListCallbackAsync($"{KeyPrefix}{key}", queueItemStatus);

public async Task<long> CountAvailableElementAsync(string key, int maximum = int.MaxValue) => await databaseService.ListCountAvailableElementAsync($"{KeyPrefix}{key}", maximum);

public async Task<long> CountElementAsync(string key, int maximum = int.MaxValue) => await databaseService.ListCountElementAsync($"{KeyPrefix}{key}", maximum);
public async Task<long> CountElementAsync(string key, IList<CountType> countTypes, int maximum = int.MaxValue) => await databaseService.ListCountElementAsync($"{KeyPrefix}{key}", countTypes, maximum);


}
2 changes: 1 addition & 1 deletion src/SlimFaas/HealthWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace SlimFaas;

public class HealthWorker(IHostApplicationLifetime hostApplicationLifetime, IRaftCluster raftCluster, ISlimDataStatus slimDataStatus,
public class HealthWorker(IHostApplicationLifetime hostApplicationLifetime, IRaftCluster raftCluster,
ILogger<HealthWorker> logger,
int delay = EnvironmentVariables.HealthWorkerDelayMillisecondsDefault,
int delayToExitSeconds = EnvironmentVariables.HealthWorkerDelayToExitSecondsDefault,
Expand Down
25 changes: 20 additions & 5 deletions src/SlimFaas/MetricsWorker.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using SlimFaas.Database;
using DotNext.Net.Cluster.Consensus.Raft;
using SlimFaas.Database;

namespace SlimFaas;

public class MetricsWorker(IReplicasService replicasService, ISlimFaasQueue slimFaasQueue, DynamicGaugeService dynamicGaugeService,
public class MetricsWorker(IReplicasService replicasService, ISlimFaasQueue slimFaasQueue, DynamicGaugeService dynamicGaugeService, IRaftCluster raftCluster,
ILogger<MetricsWorker> logger,
int delay = EnvironmentVariables.ScaleReplicasWorkerDelayMillisecondsDefault)
: BackgroundService
Expand All @@ -17,13 +18,27 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
try
{
await Task.Delay(_delay, stoppingToken);
if (raftCluster.Leader == null)
{
continue;
}
var deployments = replicasService.Deployments;
foreach (var deployment in deployments.Functions)
{
var numberElement = await slimFaasQueue.CountElementAsync(deployment.Deployment);
var numberElementAvailable = await slimFaasQueue.CountElementAsync(deployment.Deployment, new List<CountType>() { CountType.Available });
dynamicGaugeService.SetGaugeValue(
$"slimfaas_queue_available_{deployment.Deployment.ToLowerInvariant()}_length",
numberElementAvailable, "Current number of elements available in the queue");

var numberElementProcessing = await slimFaasQueue.CountElementAsync(deployment.Deployment, new List<CountType>() { CountType.Running });
dynamicGaugeService.SetGaugeValue(
$"slimfaas_queue_processing_{deployment.Deployment.ToLowerInvariant()}_length",
numberElementProcessing, "Current number of elements processing in the queue");

var numberElementWaitingForRetry = await slimFaasQueue.CountElementAsync(deployment.Deployment, new List<CountType>() { CountType.WaitingForRetry });
dynamicGaugeService.SetGaugeValue(
$"slimfaas_queue_{deployment.Deployment.ToLowerInvariant()}_length",
numberElement, "Current number of elements in the queue");
$"slimfaas_queue_waiting_for_retry_{deployment.Deployment.ToLowerInvariant()}_length",
numberElementWaitingForRetry, "Current number of elements waiting for retry in the queue");
}
}
catch (Exception e)
Expand Down
19 changes: 14 additions & 5 deletions src/SlimFaas/SlimWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private async Task DoOneCycle(CancellationToken stoppingToken,
string functionDeployment = function.Deployment;
setTickLastCallCounterDictionary.TryAdd(functionDeployment, 0);
int numberProcessingTasks = await ManageProcessingTasksAsync(slimFaasQueue, processingTasks, functionDeployment);
int numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(slimFaas, function);
int numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(masterService, slimFaas, function);
setTickLastCallCounterDictionary[functionDeployment]++;
int functionReplicas = function.Replicas;
long queueLength = await UpdateTickLastCallIfRequestStillInProgress(
Expand Down Expand Up @@ -121,7 +121,12 @@ private async Task<long> UpdateTickLastCallIfRequestStillInProgress(IMasterServi
if (masterService.IsMaster)
{
int counterLimit = functionReplicas == 0 ? 10 : 40;
long queueLength = await slimFaasQueue.CountElementAsync(functionDeployment);
long queueLength = await slimFaasQueue.CountElementAsync(functionDeployment, new List<CountType>()
{
CountType.Available,
CountType.Running,
CountType.WaitingForRetry
} );
if (setTickLastCallCounterDictionnary[functionDeployment] > counterLimit)
{
setTickLastCallCounterDictionnary[functionDeployment] = 0;
Expand All @@ -138,22 +143,26 @@ private async Task<long> UpdateTickLastCallIfRequestStillInProgress(IMasterServi
}
}

return await slimFaasQueue.CountAvailableElementAsync(functionDeployment, numberLimitProcessingTasks);
return await slimFaasQueue.CountElementAsync(functionDeployment, new List<CountType>() { CountType.Available }, numberLimitProcessingTasks);
}

private static int ComputeNumberLimitProcessingTasks(SlimFaasDeploymentInformation slimFaas,
private static int ComputeNumberLimitProcessingTasks(IMasterService masterService, SlimFaasDeploymentInformation slimFaas,
DeploymentInformation function)
{
int numberLimitProcessingTasks;
int numberReplicas = slimFaas.Replicas;

if (function.NumberParallelRequest < numberReplicas || numberReplicas == 0)
{
numberLimitProcessingTasks = 1;
numberLimitProcessingTasks = masterService.IsMaster ? function.NumberParallelRequest : 0;
}
else
{
numberLimitProcessingTasks = function.NumberParallelRequest / slimFaas.Replicas;
if(masterService.IsMaster)
{
numberLimitProcessingTasks = function.NumberParallelRequest - numberLimitProcessingTasks * (numberReplicas - 1);
}
}

return numberLimitProcessingTasks;
Expand Down
29 changes: 1 addition & 28 deletions tests/SlimData.Tests/QueueElementExtentionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void QueueElementExtensionsGetQueueRunningElement()

var availableElements = queueElements.GetQueueAvailableElement(nowTicks, 3);

Assert.Equal(2, availableElements.Count);
Assert.Equal(3, availableElements.Count);
Assert.Equal("2", availableElements[0].Id);
Assert.Equal("3", availableElements[1].Id);

Expand All @@ -73,31 +73,4 @@ public static void QueueElementExtensionsGetQueueRunningElement()
Assert.Equal(4, finishedElements.Count);
}

/* [Fact]
public static void QueueElementExtensionsGetQueueRunningElement2()
{
// I want a test that exercises my extension method
var nowTicks = DateTime.UtcNow.Ticks;
List<QueueElement> queueElements = new();
queueElements.Add(new QueueElement(new ReadOnlyMemory<byte>([1]), "1", nowTicks, new List<QueueHttpTryElement>()));
queueElements.Add(new QueueElement(new ReadOnlyMemory<byte>([1]), "2", nowTicks, new List<QueueHttpTryElement>()));
queueElements.Add(new QueueElement(new ReadOnlyMemory<byte>([1]), "3", nowTicks, new List<QueueHttpTryElement>()));
var availableElements = queueElements.GetQueueAvailableElement(SlimDataInterpreter.TimeoutRetries, nowTicks, 1, 30);
foreach (QueueElement queueElement in queueElements)
{
Assert.Equal(2, availableElements.Count);
Assert.Equal("2", availableElements[0].Id);
Assert.Equal("3", availableElements[1].Id);
}
var runningElements = queueElements.GetQueueRunningElement(nowTicks);
Assert.Equal(1, runningElements.Count);
Assert.Equal("1", runningElements[0].Id);
var finishedElements = queueElements.GetQueueFinishedElement(nowTicks, SlimDataInterpreter.TimeoutRetries);
Assert.Equal(4, finishedElements.Count);
}*/
}
7 changes: 5 additions & 2 deletions tests/SlimData.Tests/RaftClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ await databaseServiceSlave.HashSetAsync("hashsetKey1",

await databaseServiceSlave.ListLeftPushAsync("listKey1", MemoryPackSerializer.Serialize("value1"), new RetryInformation([], 30, []));
await GetLocalClusterView(host1).ForceReplicationAsync();
long listLength = await databaseServiceSlave.ListCountAvailableElementAsync("listKey1");
long listLength = await databaseServiceSlave.ListCountElementAsync("listKey1" , new List<CountType>()
{
CountType.Available
});
Assert.Equal(1, listLength);

IList<QueueData>? listRightPop = await databaseServiceSlave.ListRightPopAsync("listKey1");
Expand All @@ -272,7 +275,7 @@ await databaseServiceSlave.HashSetAsync("hashsetKey1",
await databaseServiceSlave.ListCallbackAsync("listKey1", queueItemStatus);

await GetLocalClusterView(host1).ForceReplicationAsync();
var listLength2 = await databaseServiceSlave.ListCountAvailableElementAsync("listKey1");
var listLength2 = await databaseServiceSlave.ListCountElementAsync("listKey1", new List<CountType>() { CountType.Available });

Assert.Equal(0, listLength2);

Expand Down
13 changes: 10 additions & 3 deletions tests/SlimFaas.Tests/MetricsWorkerShould.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MemoryPack;
using DotNext.Net.Cluster.Consensus.Raft;
using MemoryPack;
using Microsoft.Extensions.Logging;
using Moq;
using SlimData;
Expand Down Expand Up @@ -35,6 +36,12 @@ public async Task AddQueueMetrics()
masterService.Setup(ms => ms.IsMaster).Returns(true);
kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny<string>(), It.IsAny<DeploymentsInformations>())).ReturnsAsync(deploymentsInformations);

Mock<IRaftClusterMember> raftClusterMember = new();


Mock<IRaftCluster> raftCluster = new();
raftCluster.Setup(rc => rc.Leader).Returns(raftClusterMember.Object);

await replicasService.SyncDeploymentsAsync("default");

SlimFaasQueue slimFaasQueue = new(new DatabaseMockService());
Expand All @@ -45,12 +52,12 @@ public async Task AddQueueMetrics()
var retryInformation = new RetryInformation([], 30, []);
await slimFaasQueue.EnqueueAsync("fibonacci1", jsonCustomRequest, retryInformation);
var dynamicGaugeService = new DynamicGaugeService();
MetricsWorker service = new(replicasService, slimFaasQueue, dynamicGaugeService, logger.Object, 100);
MetricsWorker service = new(replicasService, slimFaasQueue, dynamicGaugeService, raftCluster.Object, logger.Object, 100);
Task task = service.StartAsync(CancellationToken.None);
await Task.Delay(3000);


Assert.True(task.IsCompleted);
//Assert.True(task.IsCompleted);
}

}
3 changes: 1 addition & 2 deletions tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ internal class MemorySlimFaasQueue : ISlimFaasQueue
{
public Task<IList<QueueData>?> DequeueAsync(string key, int count = 1) => throw new NotImplementedException();

public Task<long> CountAvailableElementAsync(string key, int maximum) => throw new NotImplementedException();
public Task<long> CountElementAsync(string key, int maximum) => throw new NotImplementedException();
public Task<long> CountElementAsync(string key, IList<CountType> countTypes, int maximum) => throw new NotImplementedException();

public Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) => throw new NotImplementedException();

Expand Down

0 comments on commit 557b02b

Please sign in to comment.