diff --git a/README.md b/README.md index 6a154fd2..dbbf1cd3 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,14 @@ Why use SlimFaas? - Scale to 0 after a period of inactivity (work with deployment and statefulset) +- Scale up : compatible with HPA (Horizontal Auto Scaler) and Keda (SlimFaas integrated autonomous Scale Up coming soon) - Synchronous HTTP calls - Asynchronous HTTP calls - Allows you to limit the number of parallel HTTP requests for each underlying function -- Retry: 3 times with graduation: 2 seconds, 4 seconds, 8 seconds +- Retry Pattern configurable - Private and Public functions - Private functions can be accessed only by internal namespace http call from pods -- Synchronous Publish/Subscribe internal events via HTTP calls to every replicas via HTTP without any use of specific drivers/libraries (**Couple you application with SlimFaas**) +- Synchronous Publish/Subscribe internal events via HTTP calls to every replicas via HTTP without any use of specific drivers/libraries (**Couple your application with SlimFaas**) - Mind Changer: REST API that show the status of your functions and allow to wake up your infrastructure (**Couple your application with Slimfaas**) - Very useful to inform end users that your infrastructure is starting - Plug and Play: just deploy a standard pod @@ -27,7 +28,8 @@ To test SlimFaas on your local machine by using kubernetes with Docker Desktop, ```bash git clone https://github.com/AxaFrance/slimfaas.git -cd slimfaas/demo +cd slimfaas +cd demo # Create slimfaas service account and pods kubectl apply -f deployment-slimfaas.yml # Expose SlimFaaS service as NodePort or Ingress @@ -40,7 +42,8 @@ kubectl apply -f deployment-functions.yml # Install MySql kubectl apply -f deployment-mysql.yml # to run Single Page webapp demo (optional) on http://localhost:8000 -docker run -p 8000:8000 --rm axaguildev/fibonacci-webapp:latest +docker run -d -p 8000:8000 --rm axaguildev/fibonacci-webapp:latest +kubectl port-forward svc/slimfaas-nodeport 30021:5000 -n slimfaas-demo ``` Now, you can access your pod via SlimFaas proxy: @@ -372,6 +375,28 @@ spec: - **SlimFaas/ExcludeDeploymentsFromVisibilityPrivate** : "" - Comma separated list of deployment names or statefulset names - Message from that pods will be considered as public. It is useful if you want to exclude some pods from the private visibility, for example for a backend for frontend. +- **SlimFaas/Configuration** : json configuration default values displayed below + - Allows you to define a configuration for your functions. For example, you can define a timeout for HTTP calls, a retry pattern for timeouts and HTTP status codes. + +````bash +{ + "DefaultSync":{ + "HttpTimeout": 120, # Timeout in seconds + "TimeoutRetries": [2,4,8] # Retry pattern in seconds + "HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes + } + "DefaultAsync":{ + "HttpTimeout": 120, # Timeout in seconds + "TimeoutRetries": [2,4,8] # Retry pattern in seconds + "HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes + }, + "DefaultPublish":{ + "HttpTimeout": 120, # Timeout in seconds + "TimeoutRetries": [2,4,8] # Retry pattern in seconds + "HttpStatusRetries": [500,502,503] # Retry only for 500,502,503 HTTP status codes + } +} +```` - **SlimFaas/Schedule** : json configuration - Allows you to define a schedule for your functions. If you want to wake up your infrastructure at 07:00 or for example scale down after 60 seconds of inactivity after 07:00 and scale down after 10 seconds of inactivity after 21:00. Time zones are defined as IANA time zones. The full list is available [here](https://nodatime.org/TimeZones) @@ -380,10 +405,10 @@ spec: { "TimeZoneID":"Europe/Paris", # Time Zone ID can be found here: https://nodatime.org/TimeZones "Default":{ - "WakeUp":["07:00"], // Wake up your infrastructure at 07:00 + "WakeUp":["07:00"], # Wake up your infrastructure at 07:00 "ScaleDownTimeout":[ - {"Time":"07:00","Value":20}, // Scale down after 20 seconds of inactivity after 07:00 - {"Time":"21:00","Value":10} // Scale down after 10 seconds of inactivity after 21:00 + {"Time":"07:00","Value":20}, # Scale down after 20 seconds of inactivity after 07:00 + {"Time":"21:00","Value":10} # Scale down after 10 seconds of inactivity after 21:00 ] } } diff --git a/demo/deployment-functions.yml b/demo/deployment-functions.yml index 0c24187a..a77f0e7e 100644 --- a/demo/deployment-functions.yml +++ b/demo/deployment-functions.yml @@ -21,7 +21,7 @@ spec: automountServiceAccountToken: false containers: - name: fibonacci1 - image: axaguildev/fibonacci:0.24.18 + image: axaguildev/fibonacci:latest livenessProbe: httpGet: path: /health @@ -29,9 +29,20 @@ spec: initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 5000 + initialDelaySeconds: 3 + periodSeconds: 1 + timeoutSeconds: 1 + successThreshold: 1 + failureThreshold: 3 env: - name: ASPNETCORE_URLS value: http://+:5000 + - name: Logging__LogLevel__Default + value: Debug resources: limits: memory: "96Mi" @@ -65,7 +76,7 @@ spec: automountServiceAccountToken: false containers: - name: fibonacci2 - image: axaguildev/fibonacci:0.24.18 + image: axaguildev/fibonacci:latest livenessProbe: httpGet: path: /health @@ -73,6 +84,15 @@ spec: initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 5000 + initialDelaySeconds: 3 + periodSeconds: 1 + timeoutSeconds: 1 + successThreshold: 1 + failureThreshold: 3 env: - name: ASPNETCORE_URLS value: http://+:5000 @@ -109,7 +129,7 @@ spec: automountServiceAccountToken: false containers: - name: fibonacci3 - image: axaguildev/fibonacci:0.24.18 + image: axaguildev/fibonacci:latest livenessProbe: httpGet: path: /health @@ -117,6 +137,15 @@ spec: initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 5000 + initialDelaySeconds: 3 + periodSeconds: 1 + timeoutSeconds: 1 + successThreshold: 1 + failureThreshold: 3 env: - name: ASPNETCORE_URLS value: http://+:5000 @@ -158,7 +187,7 @@ spec: automountServiceAccountToken: false containers: - name: fibonacci4 - image: axaguildev/fibonacci:0.24.18 + image: axaguildev/fibonacci:latest livenessProbe: httpGet: path: /health @@ -166,6 +195,15 @@ spec: initialDelaySeconds: 5 periodSeconds: 5 timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 5000 + initialDelaySeconds: 3 + periodSeconds: 1 + timeoutSeconds: 1 + successThreshold: 1 + failureThreshold: 3 env: - name: ASPNETCORE_URLS value: http://+:5000 diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index f5999190..88f6eedd 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -60,6 +60,31 @@ roleRef: name: deployment-statefulset-manager apiGroup: rbac.authorization.k8s.io --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: endpoints-viewer + namespace: slimfaas-demo +rules: + - apiGroups: [""] + resources: ["endpoints"] + verbs: ["get", "list", "watch"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: slimfaas-endpoints-viewer + namespace: slimfaas-demo +subjects: + - kind: ServiceAccount + name: slimfaas + namespace: slimfaas-demo +roleRef: + kind: Role + name: endpoints-viewer + apiGroup: rbac.authorization.k8s.io +--- apiVersion: apps/v1 kind: StatefulSet metadata: @@ -84,7 +109,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:0.24.18 + image: axaguildev/slimfaas:latest livenessProbe: httpGet: path: /health @@ -114,6 +139,9 @@ spec: value: "Debug" #- name: SLIMDATA_CONFIGURATION # value: | + # {"coldStart":"true"} + #- name: SLIMDATA_CONFIGURATION + # value: | # {"lowerElectionTimeout":"500","upperElectionTimeout":"1000","requestTimeout":"00:01:20.0000000","rpcTimeout":"00:00:40.0000000","heartbeatThreshold":"0.5"} #- name: SLIMDATA_SOCKETS_HTTP_HANDLER_TIMEOUT # value: "500" diff --git a/src/Fibonacci/Program.cs b/src/Fibonacci/Program.cs index 987ead94..9e879ea8 100644 --- a/src/Fibonacci/Program.cs +++ b/src/Fibonacci/Program.cs @@ -28,6 +28,12 @@ app.MapGet("/health", () => "OK"); +app.MapGet("/error", async () => +{ + await Task.Delay(100); + throw new Exception("Error"); +}); + app.MapGet("/hello/{name}", ([FromServices] ILogger logger, string name) => { logger.LogInformation("Hello Called with name: {Name}", name); @@ -46,7 +52,7 @@ [FromServices] Fibonacci fibonacci, FibonacciInput input) => { - logger.LogInformation("Fibonacci Called"); + logger.LogInformation("Fibonacci Called with input: {Input}", input.Input); var output = new FibonacciOutput(); output.Result = fibonacci.Run(input.Input); logger.LogInformation("Fibonacci output: {Output}", output.Result); diff --git a/src/SlimData/Commands/AddKeyValueCommand.cs b/src/SlimData/Commands/AddKeyValueCommand.cs index 6bb877df..c1d0af1f 100644 --- a/src/SlimData/Commands/AddKeyValueCommand.cs +++ b/src/SlimData/Commands/AddKeyValueCommand.cs @@ -12,7 +12,7 @@ public struct AddKeyValueCommand : ISerializable public string Key { get; set; } public ReadOnlyMemory Value { get; set; } - long? IDataTransferObject.Length => sizeof(int) + Value.Length; + long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Key) + Value.Length; public async ValueTask WriteToAsync(TWriter writer, CancellationToken token) where TWriter : notnull, IAsyncBinaryWriter diff --git a/src/SlimData/Commands/ListCallbackCommand.cs b/src/SlimData/Commands/ListCallbackCommand.cs new file mode 100644 index 00000000..99534c23 --- /dev/null +++ b/src/SlimData/Commands/ListCallbackCommand.cs @@ -0,0 +1,45 @@ +using System.Text; +using DotNext.IO; +using DotNext.Runtime.Serialization; +using DotNext.Text; + +namespace SlimData.Commands; + +public struct ListCallbackCommand : ISerializable +{ + public const int Id = 15; + + public string Identifier { get; set; } + public string Key { get; set; } + + public int HttpCode { get; set; } + + public long NowTicks { get; set; } + + public async ValueTask WriteToAsync(TWriter writer, CancellationToken token) where TWriter : notnull, IAsyncBinaryWriter + { + var command = this; + await writer.EncodeAsync(command.Identifier.AsMemory(), new EncodingContext(Encoding.UTF8, false), + LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.EncodeAsync(command.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false), + LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(HttpCode, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(NowTicks, token).ConfigureAwait(false); + } + + long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Identifier) + sizeof(int) + Encoding.UTF8.GetByteCount(Key) + sizeof(long); + + public static async ValueTask ReadFromAsync(TReader reader, CancellationToken token) where TReader : notnull, IAsyncBinaryReader + { + var identifier = await reader.DecodeAsync( new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false); + var key = await reader.DecodeAsync( new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false); + + return new ListCallbackCommand + { + Identifier = identifier.ToString(), + Key = key.ToString(), + HttpCode = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false), + NowTicks = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false) + }; + } +} \ No newline at end of file diff --git a/src/SlimData/Commands/ListLeftPushCommand.cs b/src/SlimData/Commands/ListLeftPushCommand.cs index 8fa9a60f..faeaac3f 100644 --- a/src/SlimData/Commands/ListLeftPushCommand.cs +++ b/src/SlimData/Commands/ListLeftPushCommand.cs @@ -10,9 +10,19 @@ public struct ListLeftPushCommand : ISerializable public const int Id = 13; public string Key { get; set; } + + public string Identifier { get; set; } + public long NowTicks { get; set; } + + public int RetryTimeout { get; set; } + + public List Retries { get; set; } + + public List HttpStatusCodesWorthRetrying { get; set; } + public ReadOnlyMemory Value { get; set; } - long? IDataTransferObject.Length => sizeof(int) + Value.Length; + long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Key) + Value.Length + Encoding.UTF8.GetByteCount(Identifier) + sizeof(long) + sizeof(int) + Retries.Count * sizeof(int) + sizeof(int) + sizeof(int) + HttpStatusCodesWorthRetrying.Count * sizeof(int); public async ValueTask WriteToAsync(TWriter writer, CancellationToken token) where TWriter : notnull, IAsyncBinaryWriter @@ -20,7 +30,22 @@ public async ValueTask WriteToAsync(TWriter writer, CancellationToken t var command = this; await writer.EncodeAsync(command.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.EncodeAsync(command.Identifier.AsMemory(), new EncodingContext(Encoding.UTF8, false), + LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(NowTicks, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(RetryTimeout, token).ConfigureAwait(false); await writer.WriteAsync(command.Value, LengthFormat.Compressed, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(Retries.Count, token).ConfigureAwait(false); + foreach (var retry in Retries) + { + await writer.WriteLittleEndianAsync(retry, token).ConfigureAwait(false); + } + await writer.WriteLittleEndianAsync(HttpStatusCodesWorthRetrying.Count, token).ConfigureAwait(false); + foreach (var httpStatus in HttpStatusCodesWorthRetrying) + { + await writer.WriteLittleEndianAsync(httpStatus, token).ConfigureAwait(false); + } + } #pragma warning disable CA2252 @@ -29,11 +54,32 @@ public static async ValueTask ReadFromAsync(TReade where TReader : notnull, IAsyncBinaryReader { var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false); + var identifier = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token).ConfigureAwait(false); + var nowTicks = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false); + var timeout = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false); using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); + var retriesCount = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false); + var retries = new List(retriesCount); + while (retriesCount-- > 0) + { + retries.Add(await reader.ReadLittleEndianAsync(token).ConfigureAwait(false)); + } + var httpStatusCodesWorthRetryingCount = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false); + var httpStatusCodesWorthRetrying = new List(httpStatusCodesWorthRetryingCount); + while (httpStatusCodesWorthRetryingCount-- > 0) + { + httpStatusCodesWorthRetrying.Add(await reader.ReadLittleEndianAsync(token).ConfigureAwait(false)); + } + return new ListLeftPushCommand { Key = key.ToString(), - Value = value.Memory.ToArray() + Identifier = identifier.ToString(), + NowTicks = nowTicks, + RetryTimeout = timeout, + Retries = retries, + Value = value.Memory.ToArray(), + HttpStatusCodesWorthRetrying = httpStatusCodesWorthRetrying }; } } \ No newline at end of file diff --git a/src/SlimData/Commands/ListRightPopCommand.cs b/src/SlimData/Commands/ListRightPopCommand.cs index c78956ef..d32c36da 100644 --- a/src/SlimData/Commands/ListRightPopCommand.cs +++ b/src/SlimData/Commands/ListRightPopCommand.cs @@ -11,8 +11,9 @@ public struct ListRightPopCommand : ISerializable public string Key { get; set; } public int Count { get; set; } + public long NowTicks { get; set; } - long? IDataTransferObject.Length => sizeof(int); + long? IDataTransferObject.Length => Encoding.UTF8.GetByteCount(Key) + sizeof(int) + sizeof(long); public async ValueTask WriteToAsync(TWriter writer, CancellationToken token) where TWriter : notnull, IAsyncBinaryWriter @@ -21,6 +22,7 @@ public async ValueTask WriteToAsync(TWriter writer, CancellationToken t await writer.EncodeAsync(command.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token).ConfigureAwait(false); await writer.WriteLittleEndianAsync(Count, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(NowTicks, token).ConfigureAwait(false); } #pragma warning disable CA2252 @@ -32,7 +34,8 @@ public static async ValueTask ReadFromAsync(TReade return new ListRightPopCommand { Key = key.ToString(), - Count = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false) + Count = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false), + NowTicks = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false) }; } } \ No newline at end of file diff --git a/src/SlimData/Commands/LogSnapshotCommand.cs b/src/SlimData/Commands/LogSnapshotCommand.cs index f111003d..050effe2 100644 --- a/src/SlimData/Commands/LogSnapshotCommand.cs +++ b/src/SlimData/Commands/LogSnapshotCommand.cs @@ -6,14 +6,14 @@ namespace SlimData.Commands; public readonly struct LogSnapshotCommand(Dictionary> keysValues, - Dictionary> hashsets, Dictionary>> queues) + Dictionary> hashsets, Dictionary> queues) : ISerializable { public const int Id = 5; public readonly Dictionary> keysValues = keysValues; public readonly Dictionary> hashsets = hashsets; - public readonly Dictionary>> queues = queues; + public readonly Dictionary> queues = queues; long? IDataTransferObject.Length // optional implementation, may return null @@ -31,7 +31,22 @@ public readonly struct LogSnapshotCommand(Dictionary result += x.Length); + queue.Value.ForEach(x => + { + result += x.Value.Length + Encoding.UTF8.GetByteCount(x.Id) + sizeof(Int64); + result += sizeof(Int32); // 4 bytes for timeout + result += sizeof(Int32); // 4 bytes for retries count + result += x.TimeoutRetries.Count * sizeof(Int32); + + result += sizeof(Int32); // 4 bytes for hashset count + foreach (var retryQueueElement in x.RetryQueueElements) + { + result += sizeof(Int64) * 2 + sizeof(Int32); + } + + result += sizeof(Int32); // 4 bytes for HttpStatusRetries count + result += x.HttpStatusRetries.Count * sizeof(Int32); + }); } // compute length of the serialized data, in bytes @@ -68,8 +83,31 @@ public async ValueTask WriteToAsync(TWriter writer, CancellationToken t { await writer.EncodeAsync(queue.Key.AsMemory(), new EncodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token).ConfigureAwait(false); await writer.WriteLittleEndianAsync(queue.Value.Count, token).ConfigureAwait(false); - foreach (var value in queue.Value) - await writer.WriteAsync(value, LengthFormat.Compressed, token).ConfigureAwait(false); + foreach (var value in queue.Value){ + await writer.WriteAsync(value.Value, LengthFormat.Compressed, token).ConfigureAwait(false); + await writer.EncodeAsync(value.Id.AsMemory(), new EncodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token).ConfigureAwait(false); + await writer.WriteBigEndianAsync(value.InsertTimeStamp, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(value.HttpTimeout, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(value.TimeoutRetries.Count, token).ConfigureAwait(false); + foreach (var valueRetry in value.TimeoutRetries) + { + await writer.WriteLittleEndianAsync(valueRetry, token).ConfigureAwait(false); + } + + await writer.WriteLittleEndianAsync(value.RetryQueueElements.Count, token).ConfigureAwait(false); + foreach (var retryQueueElement in value.RetryQueueElements) + { + await writer.WriteBigEndianAsync(retryQueueElement.StartTimeStamp, token).ConfigureAwait(false); + await writer.WriteBigEndianAsync(retryQueueElement.EndTimeStamp, token).ConfigureAwait(false); + await writer.WriteLittleEndianAsync(retryQueueElement.HttpCode, token).ConfigureAwait(false); + } + + await writer.WriteLittleEndianAsync(value.HttpStatusRetries.Count, token).ConfigureAwait(false); + foreach (var httpStatusCode in value.HttpStatusRetries) + { + await writer.WriteLittleEndianAsync(httpStatusCode, token).ConfigureAwait(false); + } + } } // write the number of entries @@ -105,18 +143,47 @@ public static async ValueTask ReadFromAsync(TReader } var countQueues = await reader.ReadLittleEndianAsync(token).ConfigureAwait(false); - var queues = new Dictionary>>(countQueues); + var queues = new Dictionary>(countQueues); // deserialize entries while (countQueues-- > 0) { var key = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token) .ConfigureAwait(false); var countQueue = await reader.ReadLittleEndianAsync(token); - var queue = new List>(countQueue); + var queue = new List(countQueue); while (countQueue-- > 0) { using var value = await reader.ReadAsync(LengthFormat.Compressed, token: token).ConfigureAwait(false); - queue.Add(value.Memory); + var id = await reader.DecodeAsync(new DecodingContext(Encoding.UTF8, false), LengthFormat.LittleEndian, token: token) + .ConfigureAwait(false); + var insertTimeStamp = await reader.ReadBigEndianAsync(token); + var timeout = await reader.ReadLittleEndianAsync(token); + var countRetries = await reader.ReadLittleEndianAsync(token); + var retries = new List(countRetries); + while (countRetries-- > 0) + { + retries.Add(await reader.ReadLittleEndianAsync(token)); + } + + + var countRetryQueueElements = await reader.ReadLittleEndianAsync(token); + var retryQueueElements = new List(countRetryQueueElements); + while (countRetryQueueElements-- > 0) + { + var startTimestamp = await reader.ReadBigEndianAsync(token); + var endTimestamp = await reader.ReadBigEndianAsync(token); + var httpCode = await reader.ReadLittleEndianAsync(token); + retryQueueElements.Add(new QueueHttpTryElement(startTimestamp, endTimestamp, httpCode)); + } + + var countHttpStatusCodesWorthRetrying = await reader.ReadLittleEndianAsync(token); + var httpStatusCodesWorthRetrying = new List(countHttpStatusCodesWorthRetrying); + while (countHttpStatusCodesWorthRetrying-- > 0) + { + httpStatusCodesWorthRetrying.Add(await reader.ReadLittleEndianAsync(token)); + } + + queue.Add(new QueueElement(value.Memory, id.ToString(), insertTimeStamp, timeout, retries, retryQueueElements, httpStatusCodesWorthRetrying)); } queues.Add(key.ToString(), queue); diff --git a/src/SlimData/Commands/SlimDataPayload.cs b/src/SlimData/Commands/SlimDataPayload.cs index beb0efc3..ef504787 100644 --- a/src/SlimData/Commands/SlimDataPayload.cs +++ b/src/SlimData/Commands/SlimDataPayload.cs @@ -4,6 +4,6 @@ public struct SlimDataPayload { public IDictionary> KeyValues { get; set; } - public Dictionary>> Queues { get; set; } + public Dictionary> Queues { get; set; } public IDictionary> Hashsets { get; set; } } \ No newline at end of file diff --git a/src/SlimData/Endpoints.cs b/src/SlimData/Endpoints.cs index 448cf7b1..c3fd0a63 100644 --- a/src/SlimData/Endpoints.cs +++ b/src/SlimData/Endpoints.cs @@ -5,6 +5,21 @@ namespace SlimData; +[MemoryPackable] +public partial record ListLeftPushInput(byte[] Value, byte[] RetryInformation); + +[MemoryPackable] +public partial record RetryInformation(List Retries, int RetryTimeoutSeconds, List HttpStatusRetries); + +[MemoryPackable] +public partial record QueueItemStatus(string Id="", int HttpCode=0); + +[MemoryPackable] +public partial record ListQueueItemStatus +{ + public List? Items { get; set; } +} + public class Endpoints { public delegate Task RespondDelegate(IRaftCluster cluster, SlimPersistentState provider, @@ -46,7 +61,7 @@ public static async Task DoAsync(HttpContext context, RespondDelegate respondDel } } - public static Task AddHashSet(HttpContext context) + public static Task AddHashSetAsync(HttpContext context) { return DoAsync(context, async (cluster, provider, source) => { @@ -80,7 +95,7 @@ public static async Task AddHashSetCommand(SlimPersistentState provider, string await cluster.ReplicateAsync(logEntry, source.Token); } - public static Task ListRightPop(HttpContext context) + public static Task ListRightPopAsync(HttpContext context) { return DoAsync(context, async (cluster, provider, source) => { @@ -102,11 +117,11 @@ public static Task ListRightPop(HttpContext context) } private static readonly IDictionary SemaphoreSlims = new Dictionary(); - public static async Task ListRightPopCommand(SlimPersistentState provider, string key, int count, IRaftCluster cluster, + public static async Task ListRightPopCommand(SlimPersistentState provider, string key, int count, IRaftCluster cluster, CancellationTokenSource source) { - var values = new ListString(); - values.Items = new List(); + var values = new ListItems(); + values.Items = new List(); if(SemaphoreSlims.TryGetValue(key, out var semaphoreSlim)) { @@ -124,18 +139,19 @@ public static async Task ListRightPopCommand(SlimPersistentState pro Console.WriteLine("Master node is waiting for lease token"); await Task.Delay(10); } + var nowTicks = DateTime.UtcNow.Ticks; var queues = ((ISupplier)provider).Invoke().Queues; if (queues.TryGetValue(key, out var queue)) { - for (var i = 0; i < count; i++) + var queueElements = queue.GetQueueAvailableElement(nowTicks, count); + foreach (var queueElement in queueElements) { - if (queue.Count <= i) break; - values.Items.Add(queue[i].ToArray()); + values.Items.Add(new QueueData(queueElement.Id ,queueElement.Value.ToArray())); } var logEntry = provider.Interpreter.CreateLogEntry( - new ListRightPopCommand { Key = key, Count = count }, + new ListRightPopCommand { Key = key, Count = count, NowTicks = nowTicks }, cluster.Term); await cluster.ReplicateAsync(logEntry, source.Token); } @@ -148,7 +164,8 @@ public static async Task ListRightPopCommand(SlimPersistentState pro } - public static Task ListLeftPush(HttpContext context) + + public static Task ListLeftPushAsync(HttpContext context) { return DoAsync(context, async (cluster, provider, source) => { @@ -167,15 +184,67 @@ public static Task ListLeftPush(HttpContext context) await ListLeftPushCommand(provider, key, value, cluster, source); }); } - + public static async Task ListLeftPushCommand(SlimPersistentState provider, string key, byte[] value, IRaftCluster cluster, CancellationTokenSource source) { + ListLeftPushInput input = MemoryPackSerializer.Deserialize(value); + RetryInformation retryInformation = MemoryPackSerializer.Deserialize(input.RetryInformation); var logEntry = - provider.Interpreter.CreateLogEntry(new ListLeftPushCommand { Key = key, Value = value }, + provider.Interpreter.CreateLogEntry(new ListLeftPushCommand { Key = key, + Identifier = Guid.NewGuid().ToString(), + Value = input.Value, + NowTicks = DateTime.UtcNow.Ticks, + Retries = retryInformation.Retries, + RetryTimeout = retryInformation.RetryTimeoutSeconds, + HttpStatusCodesWorthRetrying = retryInformation.HttpStatusRetries + }, cluster.Term); await cluster.ReplicateAsync(logEntry, source.Token); } + + public static Task ListCallbackAsync(HttpContext context) + { + return DoAsync(context, async (cluster, provider, source) => + { + context.Request.Query.TryGetValue("key", out var key); + if (string.IsNullOrEmpty(key)) + { + context.Response.StatusCode = StatusCodes.Status400BadRequest; + await context.Response.WriteAsync("not data found", context.RequestAborted); + return; + } + + var inputStream = context.Request.Body; + await using var memoryStream = new MemoryStream(); + await inputStream.CopyToAsync(memoryStream, source.Token); + var value = memoryStream.ToArray(); + var list = MemoryPackSerializer.Deserialize(value); + await ListCallbackCommandAsync(provider, key, list, cluster, source); + }); + } + + public static async Task ListCallbackCommandAsync(SlimPersistentState provider, string key, ListQueueItemStatus list, IRaftCluster cluster, CancellationTokenSource source) + { + if (list.Items == null) + { + return; + } + + foreach (var queueItemStatus in list.Items) + { + var logEntry = + provider.Interpreter.CreateLogEntry(new ListCallbackCommand + { + Identifier = queueItemStatus.Id, + Key = key, + HttpCode = queueItemStatus.HttpCode, + NowTicks = DateTime.UtcNow.Ticks + }, + cluster.Term); + await cluster.ReplicateAsync(logEntry, source.Token); + } + } private static (string key, string value) GetKeyValue(IFormCollection form) { @@ -192,7 +261,7 @@ private static (string key, string value) GetKeyValue(IFormCollection form) return (key, value); } - public static Task AddKeyValue(HttpContext context) + public static Task AddKeyValueAsync(HttpContext context) { return DoAsync(context, async (cluster, provider, source) => { diff --git a/src/SlimData/ListItems.cs b/src/SlimData/ListItems.cs new file mode 100644 index 00000000..a4b43129 --- /dev/null +++ b/src/SlimData/ListItems.cs @@ -0,0 +1,12 @@ +using MemoryPack; + +namespace SlimData; + +[MemoryPackable] +public partial record QueueData(string Id, byte[] Data); + +[MemoryPackable] +public partial record ListItems +{ + public List? Items { get; set; } +} \ No newline at end of file diff --git a/src/SlimData/ListString.cs b/src/SlimData/ListString.cs deleted file mode 100644 index cbeeb232..00000000 --- a/src/SlimData/ListString.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.Text.Json.Serialization; -using MemoryPack; - -namespace SlimData; - -[MemoryPackable] -public partial class ListString -{ - public List Items { get; set; } -} \ No newline at end of file diff --git a/src/SlimData/QueueElementExtensions.cs b/src/SlimData/QueueElementExtensions.cs new file mode 100644 index 00000000..4a3db50f --- /dev/null +++ b/src/SlimData/QueueElementExtensions.cs @@ -0,0 +1,161 @@ +namespace SlimData; + +public static class QueueElementExtensions +{ + + + public static bool IsTimeout(this QueueElement element, long nowTicks) + { + if (element.RetryQueueElements.Count <= 0) return false; + int timeout=element.HttpTimeout; + var retryQueueElement = element.RetryQueueElements[^1]; + if (retryQueueElement.EndTimeStamp == 0 && + retryQueueElement.StartTimeStamp + TimeSpan.FromSeconds(timeout).Ticks <= nowTicks) + { + return true; + } + return false; + } + + public static bool IsWaitingForRetry(this QueueElement element, long nowTicks) + { + List retries= element.TimeoutRetries; + var count = element.RetryQueueElements.Count; + if(count == 0 || count > retries.Count ) return false; + + if(element.IsFinished(nowTicks)) return false; + + if(element.IsRunning(nowTicks)) return false; + + var retryQueueElement = element.RetryQueueElements[^1]; + var retryTimeout = retries[count - 1]; + if(element.IsTimeout(nowTicks) && retryQueueElement.StartTimeStamp + TimeSpan.FromSeconds(retryTimeout).Ticks <= nowTicks) + { + return true; + } + + if (retryQueueElement.EndTimeStamp != 0 && + (retryQueueElement.EndTimeStamp + TimeSpan.FromSeconds(retryTimeout).Ticks > nowTicks)) + { + return true; + } + + return false; + } + + public static bool IsFinished(this QueueElement queueElement, long nowTicks) + { + var count = queueElement.RetryQueueElements.Count; + if (count <= 0) return false; + List retries= queueElement.TimeoutRetries; + var retryQueueElement = queueElement.RetryQueueElements[^1]; + if (retryQueueElement.EndTimeStamp > 0 && + !queueElement.HttpStatusRetries.Contains(retryQueueElement.HttpCode)) + { + return true; + } + + if (retries.Count < count) + { + if (queueElement.IsTimeout(nowTicks) || retryQueueElement.EndTimeStamp > 0) + { + return true; + } + } + + return false; + } + + public static bool IsRunning(this QueueElement queueElement, long nowTicks) + { + if (queueElement.RetryQueueElements.Count <= 0) return false; + var retryQueueElement = queueElement.RetryQueueElements[^1]; + if (retryQueueElement.EndTimeStamp == 0 && + !queueElement.IsTimeout(nowTicks)) + { + return true; + } + return false; + } + + public static List GetQueueTimeoutElement(this List element, long nowTicks) + { + var timeoutElements = new List(); + foreach (var queueElement in element) + { + if(queueElement.IsTimeout(nowTicks)) + { + timeoutElements.Add(queueElement); + } + + } + return timeoutElements; + } + + public static List GetQueueRunningElement(this List element, long nowTicks) + { + var runningElement = new List(); + foreach (var queueElement in element) + { + if(queueElement.IsRunning(nowTicks)) + { + runningElement.Add(queueElement); + } + } + return runningElement; + } + + public static List GetQueueWaitingForRetryElement(this List element, long nowTicks) + { + var waitingForRetry = new List(); + foreach (var queueElement in element) + { + if (queueElement.IsWaitingForRetry(nowTicks)) + { + waitingForRetry.Add(queueElement); + } + } + return waitingForRetry; + } + + public static List GetQueueAvailableElement(this List elements, long nowTicks, int maximum) + { + var runningElements = elements.GetQueueRunningElement(nowTicks); + var runningWaitingForRetryElements = elements.GetQueueWaitingForRetryElement(nowTicks); + var finishedElements = elements.GetQueueFinishedElement(nowTicks); + var availableElements = new List(); + var currentCount = runningElements.Count + runningWaitingForRetryElements.Count; + var currentElements = elements.Except(runningElements).Except(runningWaitingForRetryElements).Except(finishedElements); + + if (currentCount >= maximum) + { + return availableElements; + } + + foreach (var queueElement in currentElements) + { + if (currentCount == maximum) + { + return availableElements; + } + availableElements.Add(queueElement); + currentCount++; + } + return availableElements; + } + + public static IList GetQueueFinishedElement(this IList element, long nowTicks) + { + var queueFinishedElements = new List(); + foreach (var queueElement in element) + { + if (queueElement.IsFinished(nowTicks)) + { + queueFinishedElements.Add(queueElement); + } + + } + return queueFinishedElements; + } + +} \ No newline at end of file diff --git a/src/SlimData/SlimData.csproj b/src/SlimData/SlimData.csproj index a841141d..9fd072f5 100644 --- a/src/SlimData/SlimData.csproj +++ b/src/SlimData/SlimData.csproj @@ -5,7 +5,6 @@ net8.0 enable true - true true full false @@ -14,7 +13,7 @@ - + diff --git a/src/SlimData/SlimDataInterpreter.cs b/src/SlimData/SlimDataInterpreter.cs index 7cbe2f3c..d69b0ae7 100644 --- a/src/SlimData/SlimDataInterpreter.cs +++ b/src/SlimData/SlimDataInterpreter.cs @@ -3,56 +3,131 @@ namespace SlimData; + + public record SlimDataState( - Dictionary> hashsets, - Dictionary> keyValues, - Dictionary>> queues); + Dictionary> Hashsets, + Dictionary> KeyValues, + Dictionary> Queues); + +public class QueueElement( + ReadOnlyMemory value, + string id, + long insertTimeStamp, + int httpTimeout, + List timeoutRetries, + IList retryQueueElements, + List httpStatusRetries + ) +{ + public ReadOnlyMemory Value { get; } = value; + public string Id { get; } = id; + public long InsertTimeStamp { get; } = insertTimeStamp; + + public List TimeoutRetries { get; } = timeoutRetries; + + public int HttpTimeout { get; } = httpTimeout; + public IList RetryQueueElements { get; } = retryQueueElements; + + public List HttpStatusRetries { get; } = httpStatusRetries; +} +public class QueueHttpTryElement(long startTimeStamp=0, long endTimeStamp=0, int httpCode=0) +{ + public long StartTimeStamp { get; set; } = startTimeStamp; + public long EndTimeStamp { get;set; } = endTimeStamp; + public int HttpCode { get;set; } = httpCode; +} #pragma warning restore CA2252 public class SlimDataInterpreter : CommandInterpreter { - public SlimDataState SlimDataState = new(new Dictionary>(), new Dictionary>(), new Dictionary>>()); + public SlimDataState SlimDataState = new(new Dictionary>(), new Dictionary>(), new Dictionary>()); [CommandHandler] public ValueTask ListRightPopAsync(ListRightPopCommand addHashSetCommand, CancellationToken token) { - return DoListRightPopAsync(addHashSetCommand, SlimDataState.queues); + return DoListRightPopAsync(addHashSetCommand, SlimDataState.Queues); } + - internal static ValueTask DoListRightPopAsync(ListRightPopCommand addHashSetCommand, Dictionary>> queues) + internal static ValueTask DoListRightPopAsync(ListRightPopCommand addHashSetCommand, Dictionary> queues) { if (queues.TryGetValue(addHashSetCommand.Key, out var queue)) - for (var i = 0; i < addHashSetCommand.Count; i++) - if (queue.Count > 0) - queue.RemoveAt(0); - else - break; + { + var nowTicks =addHashSetCommand.NowTicks; + var queueTimeoutElements = queue.GetQueueTimeoutElement(nowTicks); + foreach (var queueTimeoutElement in queueTimeoutElements) + { + var retryQueueElement = queueTimeoutElement.RetryQueueElements[^1]; + retryQueueElement.EndTimeStamp = nowTicks; + retryQueueElement.HttpCode = 504; + } + + var queueFinishedElements = queue.GetQueueFinishedElement(nowTicks); + foreach (var queueFinishedElement in queueFinishedElements) + { + queue.Remove(queueFinishedElement); + } + + var queueAvailableElements = queue.GetQueueAvailableElement(nowTicks, addHashSetCommand.Count); + foreach (var queueAvailableElement in queueAvailableElements) + { + queueAvailableElement.RetryQueueElements.Add(new QueueHttpTryElement(nowTicks)); + } + } + return default; } [CommandHandler] public ValueTask ListLeftPushAsync(ListLeftPushCommand addHashSetCommand, CancellationToken token) { - return DoListLeftPushAsync(addHashSetCommand, SlimDataState.queues); + return DoListLeftPushAsync(addHashSetCommand, SlimDataState.Queues); } - internal static ValueTask DoListLeftPushAsync(ListLeftPushCommand addHashSetCommand, Dictionary>> queues) + internal static ValueTask DoListLeftPushAsync(ListLeftPushCommand listLeftPushCommand, Dictionary> queues) { - if (queues.TryGetValue(addHashSetCommand.Key, out List>? value)) - value.Add(addHashSetCommand.Value); + if (queues.TryGetValue(listLeftPushCommand.Key, out List? value)) + value.Add(new QueueElement(listLeftPushCommand.Value, listLeftPushCommand.Identifier, listLeftPushCommand.NowTicks, listLeftPushCommand.RetryTimeout, listLeftPushCommand.Retries,new List(), listLeftPushCommand.HttpStatusCodesWorthRetrying)); else - queues.Add(addHashSetCommand.Key, new List> { addHashSetCommand.Value }); + queues.Add(listLeftPushCommand.Key, new List() {new(listLeftPushCommand.Value,listLeftPushCommand.Identifier, listLeftPushCommand.NowTicks, listLeftPushCommand.RetryTimeout, listLeftPushCommand.Retries,new List(), listLeftPushCommand.HttpStatusCodesWorthRetrying)}); + return default; + } + + [CommandHandler] + public ValueTask ListCallbackAsync(ListCallbackCommand addHashSetCommand, CancellationToken token) + { + return DoListCallbackAsync(addHashSetCommand, SlimDataState.Queues); + } + + internal static ValueTask DoListCallbackAsync(ListCallbackCommand listCallbackCommand, Dictionary> queues) + { + if (!queues.TryGetValue(listCallbackCommand.Key, out List? value)) return default; + + var queueElement = value.FirstOrDefault(x => x.Id == listCallbackCommand.Identifier); + if (queueElement == null) + { + return default; + } + var retryQueueElement = queueElement.RetryQueueElements[^1]; + retryQueueElement.EndTimeStamp = listCallbackCommand.NowTicks; + retryQueueElement.HttpCode = listCallbackCommand.HttpCode; + if (queueElement.IsFinished(listCallbackCommand.NowTicks)) + { + value.Remove(queueElement); + } + return default; } [CommandHandler] public ValueTask AddHashSetAsync(AddHashSetCommand addHashSetCommand, CancellationToken token) { - return DoAddHashSetAsync(addHashSetCommand, SlimDataState.hashsets); + return DoAddHashSetAsync(addHashSetCommand, SlimDataState.Hashsets); } internal static ValueTask DoAddHashSetAsync(AddHashSetCommand addHashSetCommand, Dictionary> hashsets) @@ -64,7 +139,7 @@ internal static ValueTask DoAddHashSetAsync(AddHashSetCommand addHashSetCommand, [CommandHandler] public ValueTask AddKeyValueAsync(AddKeyValueCommand valueCommand, CancellationToken token) { - return DoAddKeyValueAsync(valueCommand, SlimDataState.keyValues); + return DoAddKeyValueAsync(valueCommand, SlimDataState.KeyValues); } internal static ValueTask DoAddKeyValueAsync(AddKeyValueCommand valueCommand, Dictionary> keyValues) @@ -76,20 +151,18 @@ internal static ValueTask DoAddKeyValueAsync(AddKeyValueCommand valueCommand, Di [CommandHandler(IsSnapshotHandler = true)] public ValueTask HandleSnapshotAsync(LogSnapshotCommand command, CancellationToken token) { - SlimDataState = SlimDataState with { keyValues = command.keysValues }; - SlimDataState = SlimDataState with { queues = command.queues }; - SlimDataState = SlimDataState with { hashsets = command.hashsets }; + DoHandleSnapshotAsync(command, SlimDataState.KeyValues, SlimDataState.Hashsets, SlimDataState.Queues); return default; } - internal static ValueTask DoHandleSnapshotAsync(LogSnapshotCommand command, Dictionary> keyValues, Dictionary> hashsets, Dictionary>> queues) + internal static ValueTask DoHandleSnapshotAsync(LogSnapshotCommand command, Dictionary> keyValues, Dictionary> hashsets, Dictionary> queues) { keyValues.Clear(); foreach (var keyValue in command.keysValues) { keyValues[keyValue.Key] = keyValue.Value; } - + queues.Clear(); foreach (var queue in command.queues) { @@ -106,17 +179,19 @@ internal static ValueTask DoHandleSnapshotAsync(LogSnapshotCommand command, Dict public static CommandInterpreter InitInterpreter(SlimDataState state) { - ValueTask ListRightPopHandler(ListRightPopCommand command, CancellationToken token) => DoListRightPopAsync(command, state.queues); - ValueTask ListLeftPushHandler(ListLeftPushCommand command, CancellationToken token) => DoListLeftPushAsync(command, state.queues); - ValueTask AddHashSetHandler(AddHashSetCommand command, CancellationToken token) => DoAddHashSetAsync(command, state.hashsets); - ValueTask AddKeyValueHandler(AddKeyValueCommand command, CancellationToken token) => DoAddKeyValueAsync(command, state.keyValues); - ValueTask SnapshotHandler(LogSnapshotCommand command, CancellationToken token) => DoHandleSnapshotAsync(command, state.keyValues, state.hashsets, state.queues); + ValueTask ListRightPopHandler(ListRightPopCommand command, CancellationToken token) => DoListRightPopAsync(command, state.Queues); + ValueTask ListLeftPushHandler(ListLeftPushCommand command, CancellationToken token) => DoListLeftPushAsync(command, state.Queues); + ValueTask AddHashSetHandler(AddHashSetCommand command, CancellationToken token) => DoAddHashSetAsync(command, state.Hashsets); + ValueTask AddKeyValueHandler(AddKeyValueCommand command, CancellationToken token) => DoAddKeyValueAsync(command, state.KeyValues); + ValueTask ListSetQueueItemStatusAsync(ListCallbackCommand command, CancellationToken token) => DoListCallbackAsync(command, state.Queues); + ValueTask SnapshotHandler(LogSnapshotCommand command, CancellationToken token) => DoHandleSnapshotAsync(command, state.KeyValues, state.Hashsets, state.Queues); var interpreter = new Builder() .Add(ListRightPopCommand.Id, (Func)ListRightPopHandler) .Add(ListLeftPushCommand.Id, (Func)ListLeftPushHandler) .Add(AddHashSetCommand.Id, (Func)AddHashSetHandler) .Add(AddKeyValueCommand.Id, (Func)AddKeyValueHandler) + .Add(ListCallbackCommand.Id, (Func)ListSetQueueItemStatusAsync) .Add(LogSnapshotCommand.Id, (Func)SnapshotHandler, true) .Build(); diff --git a/src/SlimData/SlimPersistentState.cs b/src/SlimData/SlimPersistentState.cs index ec5ad58f..42cb7f16 100644 --- a/src/SlimData/SlimPersistentState.cs +++ b/src/SlimData/SlimPersistentState.cs @@ -12,7 +12,7 @@ public sealed class SlimPersistentState : MemoryBasedStateMachine, ISupplier>(), new Dictionary>(), - new Dictionary>>() + new Dictionary>() ); public CommandInterpreter Interpreter { get; } @@ -42,9 +42,9 @@ SlimDataPayload ISupplier.Invoke() { return new SlimDataPayload() { - KeyValues = _state.keyValues, - Hashsets = _state.hashsets, - Queues = _state.queues + KeyValues = _state.KeyValues, + Hashsets = _state.Hashsets, + Queues = _state.Queues }; } @@ -67,7 +67,7 @@ private sealed class SimpleSnapshotBuilder : IncrementalSnapshotBuilder { private readonly SlimDataState _state = new(new Dictionary>(), new Dictionary>(), - new Dictionary>>() + new Dictionary>() ); private readonly CommandInterpreter _interpreter; @@ -84,9 +84,9 @@ protected override async ValueTask ApplyAsync(LogEntry entry) public override async ValueTask WriteToAsync(TWriter writer, CancellationToken token) { - var keysValues = _state.keyValues; - var queues = _state.queues; - var hashsets = _state.hashsets; + var keysValues = _state.KeyValues; + var queues = _state.Queues; + var hashsets = _state.Hashsets; LogSnapshotCommand command = new(keysValues, hashsets, queues); await command.WriteToAsync(writer, token).ConfigureAwait(false); diff --git a/src/SlimData/Startup.cs b/src/SlimData/Startup.cs index 7841256e..70794123 100644 --- a/src/SlimData/Startup.cs +++ b/src/SlimData/Startup.cs @@ -23,6 +23,7 @@ public void Configure(IApplicationBuilder app) const string ListLeftPushResource = "/SlimData/ListLeftPush"; const string AddKeyValueResource = "/SlimData/AddKeyValue"; const string ListLengthResource = "/SlimData/ListLength"; + const string ListSetQueueItemStatus = "/SlimData/ListCallback"; const string HealthResource = "/health"; app.UseConsensusProtocolHandler() @@ -32,15 +33,17 @@ public void Configure(IApplicationBuilder app) .RedirectToLeader(ListRightPopResource) .RedirectToLeader(AddKeyValueResource) .RedirectToLeader(AddHashSetResource) + .RedirectToLeader(ListSetQueueItemStatus) .UseRouting() .UseEndpoints(static endpoints => { endpoints.MapGet(LeaderResource, Endpoints.RedirectToLeaderAsync); endpoints.MapGet(HealthResource, async context => { await context.Response.WriteAsync("OK"); }); - endpoints.MapPost(ListLeftPushResource, Endpoints.ListLeftPush); - endpoints.MapPost(ListRightPopResource, Endpoints.ListRightPop); - endpoints.MapPost(AddHashSetResource, Endpoints.AddHashSet); - endpoints.MapPost(AddKeyValueResource, Endpoints.AddKeyValue); + endpoints.MapPost(ListLeftPushResource, Endpoints.ListLeftPushAsync); + endpoints.MapPost(ListRightPopResource, Endpoints.ListRightPopAsync); + endpoints.MapPost(AddHashSetResource, Endpoints.AddHashSetAsync); + endpoints.MapPost(AddKeyValueResource, Endpoints.AddKeyValueAsync); + endpoints.MapPost(ListSetQueueItemStatus, Endpoints.ListCallbackAsync); }); } diff --git a/src/SlimFaas/Database/DatabaseMockService.cs b/src/SlimFaas/Database/DatabaseMockService.cs index c236760b..3b877b5b 100644 --- a/src/SlimFaas/Database/DatabaseMockService.cs +++ b/src/SlimFaas/Database/DatabaseMockService.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using SlimData; namespace SlimFaas; @@ -7,7 +8,7 @@ public class DatabaseMockService : IDatabaseService private readonly ConcurrentDictionary> hashSet = new(); private readonly ConcurrentDictionary keys = new(); - private readonly ConcurrentDictionary> queue = new(); + private readonly ConcurrentDictionary> queue = new(); public Task GetAsync(string key) { @@ -57,51 +58,67 @@ public Task> HashGetAllAsync(string key) return Task.FromResult>(new Dictionary()); } - public Task ListLeftPushAsync(string key, byte[] field) + public Task ListLeftPushAsync(string key, byte[] field, RetryInformation retryInformation) { - List list; + List list; if (queue.ContainsKey(key)) { list = queue[key]; } else { - list = new List(); + list = new List(); queue.TryAdd(key, list); } - list.Add(field); + list.Add(new QueueData(Guid.NewGuid().ToString(), field)); return Task.CompletedTask; } - public Task> ListRightPopAsync(string key, int count = 1) + public Task?> ListRightPopAsync(string key, int count = 1) { if (!queue.ContainsKey(key)) { - return Task.FromResult>(new List()); + return Task.FromResult?>(new List()); } - List list = queue[key]; - - List listToReturn = list.TakeLast((int)count).ToList(); + var list = queue[key]; + var listToReturn = list.TakeLast(count).ToList(); if (listToReturn.Count > 0) { list.RemoveRange(listToReturn.Count - 1, listToReturn.Count); - return Task.FromResult>(listToReturn); + return Task.FromResult?>(listToReturn); + } + + return Task.FromResult?>(new List()); + } + + public Task ListCountAvailableElementAsync(string key, int maximum = int.MaxValue) + { + if (!queue.ContainsKey(key)) + { + return Task.FromResult(0); } - return Task.FromResult>(new List()); + var list = queue[key]; + + return Task.FromResult(list.Count); } - public Task ListLengthAsync(string key) + public Task ListCountElementAsync(string key, int maximum = Int32.MaxValue) { if (!queue.ContainsKey(key)) { return Task.FromResult(0); } - List list = queue[key]; + var list = queue[key]; return Task.FromResult(list.Count); } + + public async Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) + { + await Task.Delay(100); + } } diff --git a/src/SlimFaas/Database/IDatabaseService.cs b/src/SlimFaas/Database/IDatabaseService.cs index 73df2733..3da0e754 100644 --- a/src/SlimFaas/Database/IDatabaseService.cs +++ b/src/SlimFaas/Database/IDatabaseService.cs @@ -1,4 +1,6 @@ -namespace SlimFaas; +using SlimData; + +namespace SlimFaas; public interface IDatabaseService { @@ -6,7 +8,9 @@ public interface IDatabaseService Task SetAsync(string key, byte[] value); Task HashSetAsync(string key, IDictionary values); Task> HashGetAllAsync(string key); - Task ListLeftPushAsync(string key, byte[] field); - Task> ListRightPopAsync(string key, int count = 1); - Task ListLengthAsync(string key); + Task ListLeftPushAsync(string key, byte[] field, RetryInformation retryInformation); + Task?> ListRightPopAsync(string key, int count = 1); + Task ListCountAvailableElementAsync(string key, int maximum = int.MaxValue); + Task ListCountElementAsync(string key, int maximum = int.MaxValue); + Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus); } diff --git a/src/SlimFaas/Database/ISlimFaasQueue.cs b/src/SlimFaas/Database/ISlimFaasQueue.cs index c181247f..5db699db 100644 --- a/src/SlimFaas/Database/ISlimFaasQueue.cs +++ b/src/SlimFaas/Database/ISlimFaasQueue.cs @@ -1,9 +1,12 @@ -namespace SlimFaas.Database; +using SlimData; + +namespace SlimFaas.Database; public interface ISlimFaasQueue { - Task EnqueueAsync(string key, byte[] message); - Task> DequeueAsync(string key, long count = 1); - - public Task CountAsync(string key); + Task EnqueueAsync(string key, byte[] message, RetryInformation retryInformation); + Task?> DequeueAsync(string key, int count = 1); + Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus); + public Task CountAvailableElementAsync(string key, int maximum = int.MaxValue); + public Task CountElementAsync(string key, int maximum = int.MaxValue); } diff --git a/src/SlimFaas/Database/SlimDataService.cs b/src/SlimFaas/Database/SlimDataService.cs index b7ae2c91..a374f65a 100644 --- a/src/SlimFaas/Database/SlimDataService.cs +++ b/src/SlimFaas/Database/SlimDataService.cs @@ -12,8 +12,7 @@ public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvi : IDatabaseService { public const string HttpClientName = "SlimDataHttpClient"; - private const int MaxAttemptCount = 3; - private readonly TimeSpan _retryInterval = TimeSpan.FromSeconds(1); + private readonly IList _retryInterval = new List { 1, 1, 1 }; private readonly TimeSpan _timeMaxToWaitForLeader = TimeSpan.FromMilliseconds(3000); private ISupplier SimplePersistentState => @@ -21,7 +20,7 @@ public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvi public async Task GetAsync(string key) { - return await Retry.Do(() => DoGetAsync(key), _retryInterval, logger, MaxAttemptCount); + return await Retry.DoAsync(() => DoGetAsync(key), logger, _retryInterval); } private async Task DoGetAsync(string key) @@ -34,7 +33,7 @@ public class SlimDataService(IHttpClientFactory httpClientFactory, IServiceProvi public async Task SetAsync(string key, byte[] value) { - await Retry.Do(() =>DoSetAsync(key, value), _retryInterval, logger, MaxAttemptCount); + await Retry.DoAsync(() => DoSetAsync(key, value), logger, _retryInterval); } private async Task DoSetAsync(string key, byte[] value) @@ -61,7 +60,7 @@ private async Task DoSetAsync(string key, byte[] value) public async Task HashSetAsync(string key, IDictionary values) { - await Retry.Do(() =>DoHashSetAsync(key, values), _retryInterval, logger, MaxAttemptCount); + await Retry.DoAsync(() => DoHashSetAsync(key, values), logger, _retryInterval); } private async Task DoHashSetAsync(string key, IDictionary values) @@ -92,7 +91,7 @@ private async Task DoHashSetAsync(string key, IDictionary values public async Task> HashGetAllAsync(string key) { - return await Retry.Do(() =>DoHashGetAllAsync(key), _retryInterval, logger, MaxAttemptCount); + return await Retry.DoAsync(() =>DoHashGetAllAsync(key), logger, _retryInterval); } private async Task> DoHashGetAllAsync(string key) @@ -106,23 +105,25 @@ private async Task> DoHashGetAllAsync(string key) : new Dictionary(); } - public async Task ListLeftPushAsync(string key, byte[] field) + public async Task ListLeftPushAsync(string key, byte[] field, RetryInformation retryInformation) { - await Retry.Do(() =>DoListLeftPushAsync(key, field), _retryInterval, logger, MaxAttemptCount); + await Retry.DoAsync(() =>DoListLeftPushAsync(key, field, retryInformation), logger, _retryInterval); } - private async Task DoListLeftPushAsync(string key, byte[] field) + private async Task DoListLeftPushAsync(string key, byte[] field, RetryInformation retryInformation) { EndPoint endpoint = await GetAndWaitForLeader(); + ListLeftPushInput listLeftPushInput = new(field, MemoryPackSerializer.Serialize(retryInformation)); + byte[] serialize = MemoryPackSerializer.Serialize(listLeftPushInput); if (!cluster.LeadershipToken.IsCancellationRequested) { var simplePersistentState = serviceProvider.GetRequiredService(); - await Endpoints.ListLeftPushCommand(simplePersistentState, key, field, cluster, new CancellationTokenSource()); + await Endpoints.ListLeftPushCommand(simplePersistentState, key, serialize, cluster, new CancellationTokenSource()); } else { using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListLeftPush?key={key}")); - request.Content = new ByteArrayContent(field); + request.Content = new ByteArrayContent(serialize); using var httpClient = httpClientFactory.CreateClient(HttpClientName); HttpResponseMessage response = await httpClient.SendAsync(request); if ((int)response.StatusCode >= 500) @@ -132,12 +133,12 @@ private async Task DoListLeftPushAsync(string key, byte[] field) } } - public async Task> ListRightPopAsync(string key, int count = 1) + public async Task?> ListRightPopAsync(string key, int count = 1) { - return await Retry.Do(() =>DoListRightPopAsync(key, count), _retryInterval, logger, MaxAttemptCount); + return await Retry.DoAsync(() => DoListRightPopAsync(key, count), logger, _retryInterval); } - private async Task> DoListRightPopAsync(string key, int count = 1) + private async Task?> DoListRightPopAsync(string key, int count = 1) { EndPoint endpoint = await GetAndWaitForLeader(); if (!cluster.LeadershipToken.IsCancellationRequested) @@ -161,24 +162,82 @@ private async Task> DoListRightPopAsync(string key, int count = 1) } var bin = await response.Content.ReadAsByteArrayAsync(); - ListString? result = MemoryPackSerializer.Deserialize(bin); - return result?.Items ?? new List(); + ListItems? result = MemoryPackSerializer.Deserialize(bin); + return result?.Items ?? new List(); } } - public async Task ListLengthAsync(string key) + public Task ListCountElementAsync(string key, int maximum = Int32.MaxValue) { - return await Retry.Do(() =>DoListLengthAsync(key), _retryInterval, logger, MaxAttemptCount); + return Retry.DoAsync(() => DoListCountElementAsync(key, maximum), logger, _retryInterval); } - private async Task DoListLengthAsync(string key) + private async Task DoListCountElementAsync(string key, int maximum) { await GetAndWaitForLeader(); await MasterWaitForleaseToken(); SlimDataPayload data = SimplePersistentState.Invoke(); - long result = data.Queues.TryGetValue(key, out List>? value) ? value.Count : 0L; - return result; + + if (data.Queues.TryGetValue(key, out List? value)) + { + var nowTicks = DateTime.UtcNow.Ticks; + var elements = value.GetQueueAvailableElement(nowTicks, maximum); + var runningElements = value.GetQueueRunningElement(nowTicks); + var runningWaitingForRetryElements = value.GetQueueWaitingForRetryElement(nowTicks); + return elements.Count + runningElements.Count + runningWaitingForRetryElements.Count; + } + + return 0L; + } + + public async Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) + { + await Retry.DoAsync(() => DoListCallbackAsync(key, queueItemStatus), logger, _retryInterval); + } + + private async Task DoListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) + { + EndPoint endpoint = await GetAndWaitForLeader(); + if (!cluster.LeadershipToken.IsCancellationRequested) + { + var simplePersistentState = serviceProvider.GetRequiredService(); + await Endpoints.ListCallbackCommandAsync(simplePersistentState, key, queueItemStatus, cluster, new CancellationTokenSource()); + } + else + { + using HttpRequestMessage request = new(HttpMethod.Post, new Uri($"{endpoint}SlimData/ListCallback?key={key}")); + var field = MemoryPackSerializer.Serialize(queueItemStatus); + request.Content = new ByteArrayContent(field); + using var httpClient = httpClientFactory.CreateClient(HttpClientName); + using HttpResponseMessage response = await httpClient.SendAsync(request); + if ((int)response.StatusCode >= 500) + { + throw new DataException("Error in calling SlimData HTTP Service"); + } + } + } + + public async Task ListCountAvailableElementAsync(string key, int maximum) + { + return await Retry.DoAsync(() => DoListCountAvailableElementAsync(key, maximum), logger, _retryInterval); + } + + private async Task DoListCountAvailableElementAsync(string key, int maximum) + { + await GetAndWaitForLeader(); + await MasterWaitForleaseToken(); + + SlimDataPayload data = SimplePersistentState.Invoke(); + + if (data.Queues.TryGetValue(key, out List? value)) + { + var elements = value.GetQueueAvailableElement(DateTime.UtcNow.Ticks, maximum); + var number = elements.Count; + return number; + } + + return 0L; } private async Task MasterWaitForleaseToken() @@ -208,34 +267,3 @@ private async Task GetAndWaitForLeader() } } #pragma warning restore CA2252 -public static class Retry -{ - - - public static T Do( - Func action, - TimeSpan retryInterval, - ILogger logger, - int maxAttemptCount = 3) - { - var exceptions = new List(); - - for (int attempted = 0; attempted < maxAttemptCount; attempted++) - { - try - { - if (attempted > 0) - { - Task.Delay(retryInterval).Wait(); - logger.LogWarning("SlimDataService Retry number {RetryInterval}", retryInterval); - } - return action(); - } - catch (Exception ex) - { - exceptions.Add(ex); - } - } - throw new AggregateException(exceptions); - } -} diff --git a/src/SlimFaas/Database/SlimFaasQueue.cs b/src/SlimFaas/Database/SlimFaasQueue.cs new file mode 100644 index 00000000..9c998190 --- /dev/null +++ b/src/SlimFaas/Database/SlimFaasQueue.cs @@ -0,0 +1,27 @@ +using SlimData; + +namespace SlimFaas.Database; + + + +public class SlimFaasQueue(IDatabaseService databaseService) : ISlimFaasQueue +{ + private const string KeyPrefix = "Queue:"; + + public async Task EnqueueAsync(string key, byte[] data, RetryInformation retryInformation) => + await databaseService.ListLeftPushAsync($"{KeyPrefix}{key}", data, retryInformation); + + public async Task?> DequeueAsync(string key, int count = 1) + { + var data = await databaseService.ListRightPopAsync($"{KeyPrefix}{key}", count); + return data; + } + + public async Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) => await databaseService.ListCallbackAsync($"{KeyPrefix}{key}", queueItemStatus); + + public async Task CountAvailableElementAsync(string key, int maximum = int.MaxValue) => await databaseService.ListCountAvailableElementAsync($"{KeyPrefix}{key}", maximum); + + public async Task CountElementAsync(string key, int maximum = int.MaxValue) => await databaseService.ListCountElementAsync($"{KeyPrefix}{key}", maximum); + + +} diff --git a/src/SlimFaas/Database/SlimFaasSlimFaasQueue.cs b/src/SlimFaas/Database/SlimFaasSlimFaasQueue.cs deleted file mode 100644 index 3e4460ac..00000000 --- a/src/SlimFaas/Database/SlimFaasSlimFaasQueue.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace SlimFaas.Database; - -public class SlimFaasSlimFaasQueue(IDatabaseService databaseService) : ISlimFaasQueue -{ - private const string KeyPrefix = "Queue:"; - - public async Task EnqueueAsync(string key, byte[] data) => - await databaseService.ListLeftPushAsync($"{KeyPrefix}{key}", data); - - public async Task> DequeueAsync(string key, long count = 1) - { - IList data = await databaseService.ListRightPopAsync($"{KeyPrefix}{key}"); - return data; - } - - public async Task CountAsync(string key) => await databaseService.ListLengthAsync($"{KeyPrefix}{key}"); -} diff --git a/src/SlimFaas/EnvironmentVariables.cs b/src/SlimFaas/EnvironmentVariables.cs index 3c536d85..3f39c03f 100644 --- a/src/SlimFaas/EnvironmentVariables.cs +++ b/src/SlimFaas/EnvironmentVariables.cs @@ -22,7 +22,7 @@ public static class EnvironmentVariables public const string BaseSlimDataUrlDefault = "http://{pod_name}.slimfaas.default.svc.cluster.local:3262/"; - public const int SlimProxyMiddlewareTimeoutWaitWakeSyncFunctionMilliSecondsDefault = 10000; + public const int SlimProxyMiddlewareTimeoutWaitWakeSyncFunctionMilliSecondsDefault = 30000; public const string TimeMaximumWaitForAtLeastOnePodStartedForSyncFunction = "TIME_MAXIMUM_WAIT_FOR_AT_LEAST_ONE_POD_STARTED_FOR_SYNC_FUNCTION"; diff --git a/src/SlimFaas/Kubernetes/IKubernetesService.cs b/src/SlimFaas/Kubernetes/IKubernetesService.cs index e1b22718..c2218f98 100644 --- a/src/SlimFaas/Kubernetes/IKubernetesService.cs +++ b/src/SlimFaas/Kubernetes/IKubernetesService.cs @@ -5,5 +5,5 @@ namespace SlimFaas; public interface IKubernetesService { Task ScaleAsync(ReplicaRequest request); - Task ListFunctionsAsync(string kubeNamespace); + Task ListFunctionsAsync(string kubeNamespace, DeploymentsInformations previousDeployments); } diff --git a/src/SlimFaas/Kubernetes/KubernetesService.cs b/src/SlimFaas/Kubernetes/KubernetesService.cs index 86bb5d06..d9351372 100644 --- a/src/SlimFaas/Kubernetes/KubernetesService.cs +++ b/src/SlimFaas/Kubernetes/KubernetesService.cs @@ -1,4 +1,5 @@ using System.Diagnostics.CodeAnalysis; +using System.Globalization; using System.Net; using System.Text; using System.Text.Json; @@ -31,6 +32,25 @@ public record ScaleDownTimeout [JsonSourceGenerationOptions(WriteIndented = false, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] public partial class ScheduleConfigSerializerContext : JsonSerializerContext; +public record SlimFaasConfiguration { + + public SlimFaasDefaultConfiguration DefaultSync { get; init; } = new(); + + public SlimFaasDefaultConfiguration DefaultAsync { get; init; } = new(); + public SlimFaasDefaultConfiguration DefaultPublish { get; init; } = new(); + +} + +public record SlimFaasDefaultConfiguration { + + public int HttpTimeout { get; init; } = 120; + public List TimeoutRetries { get; init; } = [2, 4, 8]; + public List HttpStatusRetries { get; init; } = [500, 502, 503]; +} + +[JsonSerializable(typeof(SlimFaasConfiguration))] +[JsonSourceGenerationOptions(WriteIndented = false, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] +public partial class SlimFaasConfigurationSerializerContext : JsonSerializerContext; public enum FunctionVisibility { @@ -55,7 +75,11 @@ public record DeploymentsInformations(IList Functions, [JsonSourceGenerationOptions(WriteIndented = false, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] public partial class DeploymentsInformationsSerializerContext : JsonSerializerContext; -public record DeploymentInformation(string Deployment, string Namespace, IList Pods, int Replicas, +public record DeploymentInformation(string Deployment, + string Namespace, + IList Pods, + SlimFaasConfiguration Configuration, + int Replicas, int ReplicasAtStart = 1, int ReplicasMin = 0, int TimeoutSecondBeforeSetReplicasMin = 300, @@ -68,7 +92,8 @@ public record DeploymentInformation(string Deployment, string Namespace, IList

? PathsStartWithVisibility = null, IList? ExcludeDeploymentsFromVisibilityPrivate = null, - string ResourceVersion = "" + string ResourceVersion = "", + bool EndpointReady = false ); public record PodInformation(string Name, bool? Started, bool? Ready, string Ip, string DeploymentName); @@ -78,6 +103,7 @@ public class KubernetesService : IKubernetesService { private const string ReplicasMin = "SlimFaas/ReplicasMin"; private const string Schedule = "SlimFaas/Schedule"; + private const string Configuration = "SlimFaas/Configuration"; private const string Function = "SlimFaas/Function"; private const string ReplicasAtStart = "SlimFaas/ReplicasAtStart"; private const string DependsOn = "SlimFaas/DependsOn"; @@ -91,6 +117,10 @@ public class KubernetesService : IKubernetesService private const string TimeoutSecondBeforeSetReplicasMin = "SlimFaas/TimeoutSecondBeforeSetReplicasMin"; private const string NumberParallelRequest = "SlimFaas/NumberParallelRequest"; + private const string SynchrounousRetry = "SlimFaas/SynchrounousRetry"; + private const string AsynchrounousRetry = "SlimFaas/AsynchrounousRetry"; + + private const string SlimfaasDeploymentName = "slimfaas"; private readonly ILogger _logger; private readonly k8s.Kubernetes _client; @@ -163,7 +193,7 @@ public KubernetesService(ILogger logger, bool useKubeConfig) } - public async Task ListFunctionsAsync(string kubeNamespace) + public async Task ListFunctionsAsync(string kubeNamespace, DeploymentsInformations previousDeployments) { try { @@ -176,7 +206,7 @@ public async Task ListFunctionsAsync(string kubeNamespa await Task.WhenAll(deploymentListTask, podListTask, statefulSetListTask); V1DeploymentList? deploymentList = deploymentListTask.Result; - IEnumerable podList = MapPodInformations(podListTask.Result); + IEnumerable podList = await MapPodInformations(podListTask.Result); V1StatefulSetList? statefulSetList = statefulSetListTask.Result; SlimFaasDeploymentInformation? slimFaasDeploymentInformation = statefulSetList.Items @@ -187,8 +217,8 @@ public async Task ListFunctionsAsync(string kubeNamespa .FirstOrDefault(); IEnumerable podInformations = podList.ToArray(); - AddDeployments(kubeNamespace, deploymentList, podInformations, deploymentInformationList, _logger); - AddStatefulSets(kubeNamespace, statefulSetList, podInformations, deploymentInformationList, _logger); + await AddDeployments(kubeNamespace, deploymentList, podInformations, deploymentInformationList, _logger, client, previousDeployments.Functions); + await AddStatefulSets(kubeNamespace, statefulSetList, podInformations, deploymentInformationList, _logger, client, previousDeployments.Functions); return new DeploymentsInformations(deploymentInformationList, slimFaasDeploymentInformation ?? new SlimFaasDeploymentInformation(1, new List()), podInformations); @@ -201,8 +231,25 @@ public async Task ListFunctionsAsync(string kubeNamespa } } - private static void AddDeployments(string kubeNamespace, V1DeploymentList deploymentList, IEnumerable podList, - IList deploymentInformationList, ILogger logger) + private static IList ReadRetryAnnotation( IDictionary? annotations) + { + if (annotations != null && annotations.TryGetValue(AsynchrounousRetry, out string? valueAsynchrounousRetry)) + { + var result = new List(); + var array = valueAsynchrounousRetry.Split(';'); + foreach (string s in array) + { + result.Add( int.Parse(s, NumberStyles.Integer)); + } + + return result; + } + + return new List(){2, 4, 8}; + } + + private static async Task AddDeployments(string kubeNamespace, V1DeploymentList deploymentList, IEnumerable podList, + IList deploymentInformationList, ILogger logger, k8s.Kubernetes client , IList previousDeploymentInformationList) { foreach (V1Deployment? deploymentListItem in deploymentList.Items) { @@ -216,44 +263,55 @@ private static void AddDeployments(string kubeNamespace, V1DeploymentList deploy } var name = deploymentListItem.Metadata.Name; + var pods = podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(); ScheduleConfig? scheduleConfig = GetScheduleConfig(annotations, name, logger); - - DeploymentInformation deploymentInformation = new( - name, - kubeNamespace, - podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(), - deploymentListItem.Spec.Replicas ?? 0, - annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart) - ? int.Parse(annotationReplicasAtStart) - : 1, annotations.TryGetValue(ReplicasMin, out string? annotationReplicaMin) - ? int.Parse(annotationReplicaMin) - : 0, annotations.TryGetValue(TimeoutSecondBeforeSetReplicasMin, - out string? annotationTimeoutSecondBeforeSetReplicasMin) - ? int.Parse(annotationTimeoutSecondBeforeSetReplicasMin) - : 300, annotations.TryGetValue(NumberParallelRequest, - out string? annotationNumberParallelRequest) - ? int.Parse(annotationNumberParallelRequest) - : 10, annotations.ContainsKey( - ReplicasStartAsSoonAsOneFunctionRetrieveARequest) && - annotations[ReplicasStartAsSoonAsOneFunctionRetrieveARequest].ToLower() == "true", - PodType.Deployment, - annotations.TryGetValue(DependsOn, out string? value) - ? value.Split(',').ToList() - : new List(), - scheduleConfig, - annotations.TryGetValue(SubscribeEvents, out string? valueSubscribeEvents) - ? valueSubscribeEvents.Split(',').ToList() - : new List(), - annotations.TryGetValue(DefaultVisibility, out string? visibility) - ? Enum.Parse(visibility) - : FunctionVisibility.Public, - annotations.TryGetValue(PathsStartWithVisibility, out string? valueUrlsStartWithVisibility) - ? valueUrlsStartWithVisibility.Split(',').ToList() - : new List(), - annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, out string? valueExcludeDeploymentsFromVisibilityPrivate) ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() : new List(), - deploymentListItem.Metadata.ResourceVersion + SlimFaasConfiguration configuration = GetConfiguration(annotations, name, logger); + var previousDeployment = previousDeploymentInformationList.FirstOrDefault(d => d.Deployment == name); + bool endpointReady = await GetEndpointReady(kubeNamespace, client, previousDeployment, name, pods); + var resourceVersion = $"{deploymentListItem.Metadata.ResourceVersion}-{endpointReady}"; + if (previousDeployment != null && previousDeployment.ResourceVersion == resourceVersion) + { + deploymentInformationList.Add(previousDeployment); + } else { + DeploymentInformation deploymentInformation = new( + name, + kubeNamespace, + pods, + configuration, + deploymentListItem.Spec.Replicas ?? 0, + annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart) + ? int.Parse(annotationReplicasAtStart) + : 1, annotations.TryGetValue(ReplicasMin, out string? annotationReplicaMin) + ? int.Parse(annotationReplicaMin) + : 0, annotations.TryGetValue(TimeoutSecondBeforeSetReplicasMin, + out string? annotationTimeoutSecondBeforeSetReplicasMin) + ? int.Parse(annotationTimeoutSecondBeforeSetReplicasMin) + : 300, annotations.TryGetValue(NumberParallelRequest, + out string? annotationNumberParallelRequest) + ? int.Parse(annotationNumberParallelRequest) + : 10, annotations.ContainsKey( + ReplicasStartAsSoonAsOneFunctionRetrieveARequest) && + annotations[ReplicasStartAsSoonAsOneFunctionRetrieveARequest].ToLower() == "true", + PodType.Deployment, + annotations.TryGetValue(DependsOn, out string? value) + ? value.Split(',').ToList() + : new List(), + scheduleConfig, + annotations.TryGetValue(SubscribeEvents, out string? valueSubscribeEvents) + ? valueSubscribeEvents.Split(',').ToList() + : new List(), + annotations.TryGetValue(DefaultVisibility, out string? visibility) + ? Enum.Parse(visibility) + : FunctionVisibility.Public, + annotations.TryGetValue(PathsStartWithVisibility, out string? valueUrlsStartWithVisibility) + ? valueUrlsStartWithVisibility.Split(',').ToList() + : new List(), + annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, out string? valueExcludeDeploymentsFromVisibilityPrivate) ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() : new List(), + resourceVersion, + EndpointReady: endpointReady ); - deploymentInformationList.Add(deploymentInformation); + deploymentInformationList.Add(deploymentInformation); + } } catch (Exception e) { @@ -262,6 +320,40 @@ private static void AddDeployments(string kubeNamespace, V1DeploymentList deploy } } + private static async Task GetEndpointReady(string kubeNamespace, k8s.Kubernetes client, + DeploymentInformation? previousDeployment, string name, List pods) + { + try + { + if (pods.Count == 0) + { + return false; + } + + if (previousDeployment is not { EndpointReady: false } || pods.Count != 1) + { + return previousDeployment is { EndpointReady: true }; + } + + var endpoints = await client.CoreV1.ReadNamespacedEndpointsAsync(name, kubeNamespace); + if (endpoints is not { Subsets: not null }) + { + return previousDeployment is { EndpointReady: true }; + } + + var readyAddresses = endpoints.Subsets + .Where(s => s.Addresses != null) + .SelectMany(s => s.Addresses) + .ToList(); + return readyAddresses.Count > 0; + } + catch (Exception e) + { + Console.WriteLine(e); + } + return false; + } + private static ScheduleConfig? GetScheduleConfig(IDictionary annotations, string name, ILogger logger) { try @@ -279,8 +371,25 @@ private static void AddDeployments(string kubeNamespace, V1DeploymentList deploy return new ScheduleConfig(); } - private static void AddStatefulSets(string kubeNamespace, V1StatefulSetList deploymentList, IEnumerable podList, - IList deploymentInformationList, ILogger logger) + private static SlimFaasConfiguration GetConfiguration(IDictionary annotations, string name, ILogger logger) + { + try + { + if (annotations.TryGetValue(Configuration, out string? annotation) && !string.IsNullOrEmpty(annotation.Trim())) + { + return JsonSerializer.Deserialize(annotation, SlimFaasConfigurationSerializerContext.Default.SlimFaasConfiguration) ?? new SlimFaasConfiguration(); + } + } + catch (Exception e) + { + logger.LogError( e, "name: {Name}\\n annotations[Configuration]: {Configuration}", name, annotations[Configuration]); + } + + return new SlimFaasConfiguration(); + } + + private static async Task AddStatefulSets(string kubeNamespace, V1StatefulSetList deploymentList, IEnumerable podList, + IList deploymentInformationList, ILogger logger, k8s.Kubernetes client , IList previousDeploymentInformationList) { foreach (V1StatefulSet? deploymentListItem in deploymentList.Items) { @@ -294,44 +403,60 @@ private static void AddStatefulSets(string kubeNamespace, V1StatefulSetList depl } var name = deploymentListItem.Metadata.Name; + var pods = podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(); ScheduleConfig? scheduleConfig = GetScheduleConfig(annotations, name, logger); - - DeploymentInformation deploymentInformation = new( - name, - kubeNamespace, - podList.Where(p => p.DeploymentName.StartsWith(name)).ToList(), - deploymentListItem.Spec.Replicas ?? 0, - annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart) - ? int.Parse(annotationReplicasAtStart) - : 1, annotations.TryGetValue(ReplicasMin, out string? annotationReplicasMin) - ? int.Parse(annotationReplicasMin) - : 0, annotations.TryGetValue(TimeoutSecondBeforeSetReplicasMin, - out string? annotationTimeoutSecondBeforeSetReplicasMin) - ? int.Parse(annotationTimeoutSecondBeforeSetReplicasMin) - : 300, annotations.TryGetValue(NumberParallelRequest, - out string? annotationNumberParallelRequest) - ? int.Parse(annotationNumberParallelRequest) - : 10, annotations.ContainsKey( - ReplicasStartAsSoonAsOneFunctionRetrieveARequest) && - annotations[ReplicasStartAsSoonAsOneFunctionRetrieveARequest].ToLower() == "true", - PodType.StatefulSet, - annotations.TryGetValue(DependsOn, out string? value) - ? value.Split(',').ToList() - : new List(), - scheduleConfig, - annotations.TryGetValue(SubscribeEvents, out string? valueSubscribeEvents) - ? valueSubscribeEvents.Split(',').ToList() - : new List(), - annotations.TryGetValue(DefaultVisibility, out string? visibility) - ? Enum.Parse(visibility) - : FunctionVisibility.Public, - annotations.TryGetValue(PathsStartWithVisibility, out string? valueUrlsStartWithVisibility) - ? valueUrlsStartWithVisibility.Split(',').ToList() - : new List(), - annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, out string? valueExcludeDeploymentsFromVisibilityPrivate) ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() : new List(), - deploymentListItem.Metadata.ResourceVersion); - - deploymentInformationList.Add(deploymentInformation); + SlimFaasConfiguration configuration = GetConfiguration(annotations, name, logger); + var previousDeployment = previousDeploymentInformationList.FirstOrDefault(d => d.Deployment == name); + bool endpointReady = await GetEndpointReady(kubeNamespace, client, previousDeployment, name, pods); + var resourceVersion = $"{deploymentListItem.Metadata.ResourceVersion}-{endpointReady}"; + if (previousDeployment != null && previousDeployment.ResourceVersion == resourceVersion) + { + deploymentInformationList.Add(previousDeployment); + } + else + { + DeploymentInformation deploymentInformation = new( + name, + kubeNamespace, + pods, + configuration, + deploymentListItem.Spec.Replicas ?? 0, + annotations.TryGetValue(ReplicasAtStart, out string? annotationReplicasAtStart) + ? int.Parse(annotationReplicasAtStart) + : 1, annotations.TryGetValue(ReplicasMin, out string? annotationReplicasMin) + ? int.Parse(annotationReplicasMin) + : 0, annotations.TryGetValue(TimeoutSecondBeforeSetReplicasMin, + out string? annotationTimeoutSecondBeforeSetReplicasMin) + ? int.Parse(annotationTimeoutSecondBeforeSetReplicasMin) + : 300, annotations.TryGetValue(NumberParallelRequest, + out string? annotationNumberParallelRequest) + ? int.Parse(annotationNumberParallelRequest) + : 10, annotations.ContainsKey( + ReplicasStartAsSoonAsOneFunctionRetrieveARequest) && + annotations[ReplicasStartAsSoonAsOneFunctionRetrieveARequest].ToLower() == "true", + PodType.StatefulSet, + annotations.TryGetValue(DependsOn, out string? value) + ? value.Split(',').ToList() + : new List(), + scheduleConfig, + annotations.TryGetValue(SubscribeEvents, out string? valueSubscribeEvents) + ? valueSubscribeEvents.Split(',').ToList() + : new List(), + annotations.TryGetValue(DefaultVisibility, out string? visibility) + ? Enum.Parse(visibility) + : FunctionVisibility.Public, + annotations.TryGetValue(PathsStartWithVisibility, out string? valueUrlsStartWithVisibility) + ? valueUrlsStartWithVisibility.Split(',').ToList() + : new List(), + annotations.TryGetValue(ExcludeDeploymentsFromVisibilityPrivate, + out string? valueExcludeDeploymentsFromVisibilityPrivate) + ? valueExcludeDeploymentsFromVisibilityPrivate.Split(',').ToList() + : new List(), + resourceVersion, + EndpointReady: endpointReady); + + deploymentInformationList.Add(deploymentInformation); + } } catch (Exception e) { @@ -340,8 +465,9 @@ private static void AddStatefulSets(string kubeNamespace, V1StatefulSetList depl } } - private static IEnumerable MapPodInformations(V1PodList v1PodList) + private static async Task> MapPodInformations(V1PodList v1PodList) { + var result = new List(); foreach (V1Pod? item in v1PodList.Items) { string? podIp = item.Status.PodIP; @@ -351,13 +477,17 @@ private static IEnumerable MapPodInformations(V1PodList v1PodLis } V1ContainerStatus? containerStatus = item.Status.ContainerStatuses.FirstOrDefault(); - bool ready = containerStatus?.Ready ?? false; bool started = containerStatus?.Started ?? false; + bool containerReady = item.Status.Conditions.FirstOrDefault(c => c.Type == "ContainersReady")?.Status == "True"; + bool podReady = item.Status.Conditions.FirstOrDefault(c => c.Type == "Ready")?.Status == "True"; string? podName = item.Metadata.Name; string deploymentName = item.Metadata.OwnerReferences[0].Name; - PodInformation podInformation = new(podName, started, ready, podIp, deploymentName); - yield return podInformation; + + PodInformation podInformation = new(podName, started, started && containerReady && podReady, podIp, deploymentName); + result.Add(podInformation); } + return result; } + } diff --git a/src/SlimFaas/Kubernetes/MockKubernetesService.cs b/src/SlimFaas/Kubernetes/MockKubernetesService.cs index b5a688d0..252224ca 100644 --- a/src/SlimFaas/Kubernetes/MockKubernetesService.cs +++ b/src/SlimFaas/Kubernetes/MockKubernetesService.cs @@ -1,4 +1,5 @@ -using System.Diagnostics.CodeAnalysis; +using System.Configuration; +using System.Diagnostics.CodeAnalysis; using System.Text.Json; using System.Text.Json.Serialization; using SlimFaas.Kubernetes; @@ -64,7 +65,8 @@ public MockKubernetesService() Namespace: "default", ReplicasStartAsSoonAsOneFunctionRetrieveARequest: false, NumberParallelRequest: function.NumberParallelRequest, - Pods: new List() { new("", true, true, "", "") } + Pods: new List() { new("", true, true, "", "") }, + Configuration : new SlimFaasConfiguration() ); _deploymentInformations.Functions.Add(deploymentInformation); } @@ -74,7 +76,7 @@ public MockKubernetesService() return Task.FromResult(request); } - public Task ListFunctionsAsync(string kubeNamespace) + public Task ListFunctionsAsync(string kubeNamespace, DeploymentsInformations previousDeployments) { return Task.FromResult(_deploymentInformations); } diff --git a/src/SlimFaas/Program.cs b/src/SlimFaas/Program.cs index ed1a434d..e61cd5df 100644 --- a/src/SlimFaas/Program.cs +++ b/src/SlimFaas/Program.cs @@ -2,8 +2,6 @@ using System.Text.Json; using DotNext.Net.Cluster.Consensus.Raft.Http; using Microsoft.AspNetCore.Server.Kestrel.Core; -using Polly; -using Polly.Extensions.Http; using Prometheus; using SlimData; using SlimFaas; @@ -76,7 +74,7 @@ serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddHostedService(); serviceCollectionSlimFaas.AddHttpClient(); -serviceCollectionSlimFaas.AddSingleton(); +serviceCollectionSlimFaas.AddSingleton(); serviceCollectionSlimFaas.AddSingleton(); serviceCollectionSlimFaas.AddSingleton(sp => (ReplicasService)serviceProviderStarter.GetService()!); @@ -198,8 +196,8 @@ } return httpClientHandler; - }) - .AddPolicyHandler(GetRetryPolicy()); + }); + //.AddPolicyHandler(GetRetryPolicy()); if (!string.IsNullOrEmpty(podDataDirectoryPersistantStorage)) { @@ -315,25 +313,6 @@ serviceProviderStarter.Dispose(); -static IAsyncPolicy GetRetryPolicy() -{ - return HttpPolicyExtensions - .HandleTransientHttpError() - .OrResult(msg => - { - HttpStatusCode[] httpStatusCodesWorthRetrying = - { - HttpStatusCode.RequestTimeout, // 408 - HttpStatusCode.InternalServerError, // 500 - HttpStatusCode.BadGateway, // 502 - HttpStatusCode.ServiceUnavailable, // 503 - HttpStatusCode.GatewayTimeout // 504 - }; - return httpStatusCodesWorthRetrying.Contains(msg.StatusCode); - }) - .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, - retryAttempt))); -} public partial class Program diff --git a/src/SlimFaas/ReplicasService.cs b/src/SlimFaas/ReplicasService.cs index d0a3de87..9b2bf7c6 100644 --- a/src/SlimFaas/ReplicasService.cs +++ b/src/SlimFaas/ReplicasService.cs @@ -1,4 +1,5 @@ -using SlimFaas.Kubernetes; +using System.Text.Json; +using SlimFaas.Kubernetes; using NodaTime; using NodaTime.TimeZones; @@ -49,7 +50,7 @@ public async Task SyncDeploymentsFromSlimData(DeploymentsInformations deployment public async Task SyncDeploymentsAsync(string kubeNamespace) { - DeploymentsInformations deployments = await kubernetesService.ListFunctionsAsync(kubeNamespace); + DeploymentsInformations deployments = await kubernetesService.ListFunctionsAsync(kubeNamespace, Deployments); lock (Lock) { if (logger.IsEnabled(LogLevel.Information)) @@ -61,6 +62,12 @@ public async Task SyncDeploymentsAsync(string kubeNames f.ResourceVersion == deploymentInformation.ResourceVersion); if (currentDeployment == null) { + + string podsInformationString = ""; + foreach (PodInformation deploymentInformationPod in deploymentInformation.Pods) + { + podsInformationString += deploymentInformationPod.Name + " " + deploymentInformationPod.Ready + " " + deploymentInformationPod.Started + " " + deploymentInformationPod.Ip + " " + deploymentInformationPod.DeploymentName + "\n"; + } // Un log information avec toutes les informations de toutes les propriété de la fonction logger.LogInformation("New deployment {Deployment} \n" + "with {Replicas} replicas \n" + @@ -70,18 +77,21 @@ public async Task SyncDeploymentsAsync(string kubeNames "with {TimeoutSecondBeforeSetReplicasMin} timeout second before set replicas min \n" + "with {PodType} pod type \n" + "with {ResourceVersion} resource version \n"+ - "with {NumberParallelRequest} number parallel request \n", - "with dependOn {DependsOn} \n", + "with {NumberParallelRequest} number parallel request \n" + + "with dependOn {DependsOn} \n" + + "with {EndpointReady} endpoint ready \n" + + "with {Configuration} configuration \n" + + "with pods {Pods}", deploymentInformation.Deployment, deploymentInformation.Replicas, deploymentInformation.ReplicasAtStart, deploymentInformation.ReplicasMin, deploymentInformation.ReplicasStartAsSoonAsOneFunctionRetrieveARequest, deploymentInformation.TimeoutSecondBeforeSetReplicasMin, - deploymentInformation.PodType, deploymentInformation.ResourceVersion, deploymentInformation.NumberParallelRequest, deploymentInformation.DependsOn); - + deploymentInformation.PodType, deploymentInformation.ResourceVersion, deploymentInformation.NumberParallelRequest, + deploymentInformation.DependsOn, + deploymentInformation.EndpointReady, + JsonSerializer.Serialize(deploymentInformation.Configuration, SlimFaasConfigurationSerializerContext.Default.SlimFaasConfiguration), + podsInformationString); } } - } - - _deployments = deployments; } return deployments; @@ -145,7 +155,9 @@ public async Task CheckScaleAsync(string kubeNamespace) if (currentScale == deploymentInformation.ReplicasMin) { continue; - } else if(currentScale < deploymentInformation.ReplicasMin) + } + + if(currentScale < deploymentInformation.ReplicasMin) { logger.LogInformation("Scale up {Deployment} from {CurrentScale} to {ReplicaAtStart}", deploymentInformation.Deployment, currentScale, deploymentInformation.ReplicasAtStart); } diff --git a/src/SlimFaas/Retry.cs b/src/SlimFaas/Retry.cs new file mode 100644 index 00000000..ecdbf1a2 --- /dev/null +++ b/src/SlimFaas/Retry.cs @@ -0,0 +1,123 @@ +namespace SlimFaas; + +public static class Retry +{ + + public static async Task DoAsync( + Func> action, + ILogger logger, + IList delays + ) + { + var exceptions = new List(); + + for (int attempt = -1; attempt < delays.Count; attempt++) + { + try + { + if (attempt >= 0) + { + var delay = delays[attempt]; + logger.LogWarning("Try {Attempt} : wait number {Delay} second", attempt, delay); + await Task.Delay(delay * 1000); + } + + return await action(); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + + throw new AggregateException(exceptions); + } + + public static async Task DoAsync( + Func action, + ILogger logger, + IList delays + ) + { + var exceptions = new List(); + + for (int attempt = -1; attempt < delays.Count; attempt++) + { + try + { + if (attempt >= 0) + { + var delay = delays[attempt]; + logger.LogWarning("Try {Attempt} : wait numnber {Delay} second", attempt, delay); + await Task.Delay(delay * 1000); + } + await action(); + return; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + + throw new AggregateException(exceptions); + } + + public static async Task DoRequestAsync( + Func> action, + ILogger logger, + IList delays, + IList httpStatusRetries + ) + { + var exceptions = new List(); + + for (int attempt = -1; attempt < delays.Count; attempt++) + { + if (attempt >= 0) + { + var delay = delays[attempt]; + logger.LogWarning("DoRequestAsync Try {Attempt} : wait number {Delay} second", attempt+1, delay); + await Task.Delay(delay * 1000); + } + + var responseMessage = await WrapRequestAction(action); + var statusCode = (int)responseMessage.StatusCode; + if (!httpStatusRetries.Contains(statusCode)) + { + return responseMessage; + } + responseMessage.Dispose(); + exceptions.Add(new Exception($"DoRequestAsync received code Http {statusCode}")); + } + + throw new AggregateException(exceptions); + } + + private static async Task WrapRequestAction(Func> action) + { + try + { + return await action(); + } + catch (TaskCanceledException ex) when (ex.InnerException is TimeoutException) + { + var fallbackResponse = new HttpResponseMessage(System.Net.HttpStatusCode.GatewayTimeout) + { + Content = new StringContent("Error 504 simulated due to a timeout") + }; + return fallbackResponse; + } + catch (HttpRequestException ex) + { + Console.WriteLine($"Network exception : {ex.Message}"); + + var fallbackResponse = new HttpResponseMessage(System.Net.HttpStatusCode.InternalServerError) + { + Content = new StringContent("Error 500 simulated due to a network problem") + }; + return fallbackResponse; + } + } + +} diff --git a/src/SlimFaas/SendClient.cs b/src/SlimFaas/SendClient.cs index 3cf5d1a6..85d570a9 100644 --- a/src/SlimFaas/SendClient.cs +++ b/src/SlimFaas/SendClient.cs @@ -1,13 +1,14 @@ using Microsoft.Extensions.Primitives; +using SlimFaas.Kubernetes; namespace SlimFaas; public interface ISendClient { - Task SendHttpRequestAsync(CustomRequest customRequest, HttpContext? context = null, string? baseUrl = null); + Task SendHttpRequestAsync(CustomRequest customRequest, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null, CancellationTokenSource? cancellationToken = null); Task SendHttpRequestSync(HttpContext httpContext, string functionName, string functionPath, - string functionQuery, string? baseUrl = null); + string functionQuery, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null); } public class SendClient(HttpClient httpClient, ILogger logger) : ISendClient @@ -19,7 +20,7 @@ public class SendClient(HttpClient httpClient, ILogger logger) : ISe Environment.GetEnvironmentVariable(EnvironmentVariables.Namespace) ?? EnvironmentVariables.NamespaceDefault; public async Task SendHttpRequestAsync(CustomRequest customRequest, - HttpContext? context = null, string? baseUrl = null) + SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null, CancellationTokenSource? cancellationToken = null) { try { @@ -30,33 +31,41 @@ public async Task SendHttpRequestAsync(CustomRequest custom string targetUrl = ComputeTargetUrl(functionUrl, customRequestFunctionName, customRequestPath, customRequestQuery, _namespaceSlimFaas); logger.LogDebug("Sending async request to {TargetUrl}", targetUrl); - HttpRequestMessage targetRequestMessage = CreateTargetMessage(customRequest, new Uri(targetUrl)); - if (context != null) - { - return await httpClient.SendAsync(targetRequestMessage, - HttpCompletionOption.ResponseHeadersRead, context.RequestAborted); - } - return await httpClient.SendAsync(targetRequestMessage, - HttpCompletionOption.ResponseHeadersRead); + + httpClient.Timeout = TimeSpan.FromSeconds(slimFaasDefaultConfiguration.HttpTimeout); + return await Retry.DoRequestAsync(() => + { + HttpRequestMessage targetRequestMessage = CreateTargetMessage(customRequest, new Uri(targetUrl)); + return httpClient.SendAsync(targetRequestMessage, + HttpCompletionOption.ResponseHeadersRead, + cancellationToken?.Token ?? CancellationToken.None); + }, + logger, slimFaasDefaultConfiguration.TimeoutRetries, slimFaasDefaultConfiguration.HttpStatusRetries) + .ConfigureAwait(false); } catch (Exception e) { - logger.LogError(e, "Error in SendHttpRequestAsync to {FunctionName} to {FunctionPath} ", customRequest.FunctionName, customRequest.FunctionName); + logger.LogError(e, "Error in SendHttpRequestAsync to {FunctionName} to {FunctionPath} ", customRequest.FunctionName, customRequest.Path); throw; } } public async Task SendHttpRequestSync(HttpContext context, string functionName, - string functionPath, string functionQuery, string? baseUrl = null) + string functionPath, string functionQuery, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null) { try { string targetUrl = ComputeTargetUrl(baseUrl ?? _baseFunctionUrl, functionName, functionPath, functionQuery, _namespaceSlimFaas); logger.LogDebug("Sending sync request to {TargetUrl}", targetUrl); - HttpRequestMessage targetRequestMessage = CreateTargetMessage(context, new Uri(targetUrl)); - HttpResponseMessage responseMessage = await httpClient.SendAsync(targetRequestMessage, - HttpCompletionOption.ResponseHeadersRead, context.RequestAborted); + httpClient.Timeout = TimeSpan.FromSeconds(slimFaasDefaultConfiguration.HttpTimeout); + HttpResponseMessage responseMessage = await Retry.DoRequestAsync(() => + { + HttpRequestMessage targetRequestMessage = CreateTargetMessage(context, new Uri(targetUrl)); + return httpClient.SendAsync(targetRequestMessage, + HttpCompletionOption.ResponseHeadersRead, context.RequestAborted); + }, + logger, slimFaasDefaultConfiguration.TimeoutRetries, slimFaasDefaultConfiguration.HttpStatusRetries).ConfigureAwait(false); return responseMessage; } catch (Exception e) diff --git a/src/SlimFaas/SlimFaas.csproj b/src/SlimFaas/SlimFaas.csproj index 0abedd3c..217a7eee 100644 --- a/src/SlimFaas/SlimFaas.csproj +++ b/src/SlimFaas/SlimFaas.csproj @@ -31,8 +31,7 @@ - - + diff --git a/src/SlimFaas/SlimProxyMiddleware.cs b/src/SlimFaas/SlimProxyMiddleware.cs index 6ce7b481..ef99504e 100644 --- a/src/SlimFaas/SlimProxyMiddleware.cs +++ b/src/SlimFaas/SlimProxyMiddleware.cs @@ -1,5 +1,6 @@ using System.Text.Json.Serialization; using MemoryPack; +using SlimData; using SlimFaas.Database; using SlimFaas.Kubernetes; @@ -286,7 +287,8 @@ private async Task BuildAsyncResponseAsync(ILogger logger, await InitCustomRequest(context, context.Request, functionName, functionPath); var bin = MemoryPackSerializer.Serialize(customRequest); - await slimFaasQueue.EnqueueAsync(functionName, bin); + var defaultAsync = function.Configuration.DefaultAsync; + await slimFaasQueue.EnqueueAsync(functionName, bin, new RetryInformation(defaultAsync.TimeoutRetries, defaultAsync.HttpTimeout, defaultAsync.HttpStatusRetries)); context.Response.StatusCode = 202; } @@ -310,6 +312,7 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem await InitCustomRequest(context, context.Request, "", functionPath); List tasks = new(); + var queryString = context.Request.QueryString.ToUriComponent(); foreach (DeploymentInformation function in functions) { foreach (var pod in function.Pods) @@ -332,7 +335,7 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem var baseUrl = SlimDataEndpoint.Get(pod, baseFunctionPodUrl); logger.LogDebug("Sending event {EventName} to {FunctionDeployment} at {BaseUrl} with path {FunctionPath} and query {UriComponent}", eventName, function.Deployment, baseUrl, functionPath, context.Request.QueryString.ToUriComponent()); - Task task = SendRequest(context, sendClient, customRequest with {FunctionName = function.Deployment}, baseUrl, logger, eventName); + Task task = SendRequest(queryString, sendClient, customRequest with {FunctionName = function.Deployment}, baseUrl, logger, eventName, function.Configuration.DefaultPublish); tasks.Add(task); } } @@ -342,7 +345,7 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem foreach (string baseUrl in slimFaasSubscribeEvent.Value) { logger.LogDebug("Sending event {EventName} to {BaseUrl} with path {FunctionPath} and query {UriComponent}", eventName, baseUrl, functionPath, context.Request.QueryString.ToUriComponent()); - Task task = SendRequest(context, sendClient, customRequest with {FunctionName = ""}, baseUrl, logger, eventName); + Task task = SendRequest(queryString, sendClient, customRequest with {FunctionName = ""}, baseUrl, logger, eventName, new SlimFaasDefaultConfiguration()); tasks.Add(task); } } @@ -366,20 +369,20 @@ private async Task BuildPublishResponseAsync(HttpContext context, HistoryHttpMem context.Response.StatusCode = 204; } - private static async Task SendRequest(HttpContext context, ISendClient sendClient, CustomRequest customRequest, string baseUrl, ILogger logger, string eventName) + private static async Task SendRequest(string queryString, ISendClient sendClient, CustomRequest customRequest, string baseUrl, ILogger logger, string eventName, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration) { try { - using HttpResponseMessage responseMessage = await sendClient.SendHttpRequestAsync(customRequest, context, baseUrl); + using HttpResponseMessage responseMessage = await sendClient.SendHttpRequestAsync(customRequest, slimFaasDefaultConfiguration, baseUrl); logger.LogDebug( "Response from event {EventName} to {FunctionDeployment} at {BaseUrl} with path {FunctionPath} and query {UriComponent} is {StatusCode}", - eventName, customRequest.FunctionName, baseUrl, customRequest.Path, context.Request.QueryString.ToUriComponent(), + eventName, customRequest.FunctionName, baseUrl, customRequest.Path, queryString, responseMessage.StatusCode); } catch (Exception e) { logger.LogError(e, "Error in sending event {EventName} to {FunctionDeployment} at {BaseUrl} with path {FunctionPath} and query {UriComponent}", - eventName, customRequest.FunctionName, baseUrl, customRequest.Path, context.Request.QueryString.ToUriComponent()); + eventName, customRequest.FunctionName, baseUrl, customRequest.Path, queryString); } } @@ -401,10 +404,10 @@ private async Task BuildSyncResponseAsync(HttpContext context, HistoryHttpMemory return; } - await WaitForAnyPodStartedAsync(context, historyHttpService, replicasService, functionName); + await WaitForAnyPodStartedAsync(logger, context, historyHttpService, replicasService, functionName); Task responseMessagePromise = sendClient.SendHttpRequestSync(context, functionName, - functionPath, context.Request.QueryString.ToUriComponent()); + functionPath, context.Request.QueryString.ToUriComponent(), function.Configuration.DefaultSync); long lastSetTicks = DateTime.UtcNow.Ticks; historyHttpService.SetTickLastCall(functionName, lastSetTicks); @@ -428,7 +431,7 @@ private async Task BuildSyncResponseAsync(HttpContext context, HistoryHttpMemory await responseMessage.Content.CopyToAsync(context.Response.Body); } - private async Task WaitForAnyPodStartedAsync(HttpContext context, HistoryHttpMemoryService historyHttpService, + private async Task WaitForAnyPodStartedAsync(ILogger logger, HttpContext context, HistoryHttpMemoryService historyHttpService, IReplicasService replicasService, string functionName) { int numberLoop = _timeoutMaximumWaitWakeSyncFunctionMilliSecond / 10; @@ -436,9 +439,14 @@ private async Task WaitForAnyPodStartedAsync(HttpContext context, HistoryHttpMem historyHttpService.SetTickLastCall(functionName, lastSetTicks); while (numberLoop > 0) { - bool isAnyContainerStarted = replicasService.Deployments.Functions.Any(f => - f is { Replicas: > 0, Pods: not null } && f.Pods.Any(p => p.Ready.HasValue && p.Ready.Value)); - if (!isAnyContainerStarted && !context.RequestAborted.IsCancellationRequested) + DeploymentInformation? function = SearchFunction(replicasService, functionName); + if(function == null) + { + continue; + } + bool? isAnyContainerStarted = function.Pods.Any(p => p.Ready.HasValue && p.Ready.Value); + bool isReady = isAnyContainerStarted.Value && function.EndpointReady; + if (!isReady && !context.RequestAborted.IsCancellationRequested) { numberLoop--; await Task.Delay(10, context.RequestAborted); diff --git a/src/SlimFaas/SlimWorker.cs b/src/SlimFaas/SlimWorker.cs index 1045a3bb..6af6e4b0 100644 --- a/src/SlimFaas/SlimWorker.cs +++ b/src/SlimFaas/SlimWorker.cs @@ -1,14 +1,17 @@ using MemoryPack; +using SlimData; using SlimFaas.Database; using SlimFaas.Kubernetes; namespace SlimFaas; -internal record struct RequestToWait(Task Task, CustomRequest CustomRequest); +internal record struct RequestToWait(Task Task, CustomRequest CustomRequest, string Id); public class SlimWorker(ISlimFaasQueue slimFaasQueue, IReplicasService replicasService, - HistoryHttpMemoryService historyHttpService, ILogger logger, IServiceProvider serviceProvider, + HistoryHttpMemoryService historyHttpService, ILogger logger, + IServiceProvider serviceProvider, ISlimDataStatus slimDataStatus, + IMasterService masterService, int delay = EnvironmentVariables.SlimWorkerDelayMillisecondsDefault) : BackgroundService { @@ -40,20 +43,25 @@ private async Task DoOneCycle(CancellationToken stoppingToken, { string functionDeployment = function.Deployment; setTickLastCallCounterDictionary.TryAdd(functionDeployment, 0); - int numberProcessingTasks = ManageProcessingTasks(processingTasks, functionDeployment); - int? numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(slimFaas, function); + int numberProcessingTasks = await ManageProcessingTasksAsync(slimFaasQueue, processingTasks, functionDeployment); + int numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(slimFaas, function); setTickLastCallCounterDictionary[functionDeployment]++; int functionReplicas = function.Replicas; - long queueLength = await UpdateTickLastCallIfRequestStillInProgress(functionReplicas, + long queueLength = await UpdateTickLastCallIfRequestStillInProgress( + masterService, + functionReplicas, setTickLastCallCounterDictionary, - functionDeployment, numberProcessingTasks); + functionDeployment, + numberProcessingTasks, + numberLimitProcessingTasks); + if (functionReplicas == 0 || queueLength <= 0) { continue; } bool? isAnyContainerStarted = function.Pods?.Any(p => p.Ready.HasValue && p.Ready.Value); - if (!isAnyContainerStarted.HasValue || !isAnyContainerStarted.Value) + if (!isAnyContainerStarted.HasValue || !isAnyContainerStarted.Value || !function.EndpointReady) { continue; } @@ -63,8 +71,8 @@ private async Task DoOneCycle(CancellationToken stoppingToken, continue; } - await SendHttpRequestToFunction(processingTasks, numberLimitProcessingTasks, numberProcessingTasks, - functionDeployment); + await SendHttpRequestToFunction(processingTasks, numberLimitProcessingTasks, + function); } } catch (Exception e) @@ -74,49 +82,69 @@ await SendHttpRequestToFunction(processingTasks, numberLimitProcessingTasks, num } private async Task SendHttpRequestToFunction(Dictionary> processingTasks, - int? numberLimitProcessingTasks, int numberProcessingTasks, - string functionDeployment) + int numberLimitProcessingTasks, + DeploymentInformation function) { - int? numberTasksToDequeue = numberLimitProcessingTasks - numberProcessingTasks; - IList jsons = await slimFaasQueue.DequeueAsync(functionDeployment, - numberTasksToDequeue.HasValue ? (long)numberTasksToDequeue : 1); + string functionDeployment = function.Deployment; + var jsons = await slimFaasQueue.DequeueAsync(functionDeployment, numberLimitProcessingTasks); + if (jsons == null) + { + return; + } foreach (var requestJson in jsons) { - CustomRequest customRequest = MemoryPackSerializer.Deserialize(requestJson); + CustomRequest customRequest = MemoryPackSerializer.Deserialize(requestJson.Data); logger.LogDebug("{CustomRequestMethod}: {CustomRequestPath}{CustomRequestQuery} Sending", customRequest.Method, customRequest.Path, customRequest.Query); logger.LogDebug("{RequestJson}", requestJson); historyHttpService.SetTickLastCall(functionDeployment, DateTime.UtcNow.Ticks); using IServiceScope scope = serviceProvider.CreateScope(); + var slimfaasDefaultConfiguration = new SlimFaasDefaultConfiguration() + { + HttpTimeout = function.Configuration.DefaultAsync.HttpTimeout, + TimeoutRetries = [], + HttpStatusRetries = [] + }; Task taskResponse = scope.ServiceProvider.GetRequiredService() - .SendHttpRequestAsync(customRequest); - processingTasks[functionDeployment].Add(new RequestToWait(taskResponse, customRequest)); + .SendHttpRequestAsync(customRequest, slimfaasDefaultConfiguration); + processingTasks[functionDeployment].Add(new RequestToWait(taskResponse, customRequest, requestJson.Id)); } } - private async Task UpdateTickLastCallIfRequestStillInProgress(int? functionReplicas, - Dictionary setTickLastCallCounterDictionnary, string functionDeployment, int numberProcessingTasks) + private async Task UpdateTickLastCallIfRequestStillInProgress(IMasterService masterService, int? functionReplicas, + Dictionary setTickLastCallCounterDictionnary, + string functionDeployment, + int numberProcessingTasks, + int numberLimitProcessingTasks) { - int counterLimit = functionReplicas == 0 ? 10 : 300; - long queueLength = await slimFaasQueue.CountAsync(functionDeployment); - if (setTickLastCallCounterDictionnary[functionDeployment] > counterLimit) + if (masterService.IsMaster) { - setTickLastCallCounterDictionnary[functionDeployment] = 0; + int counterLimit = functionReplicas == 0 ? 10 : 40; + long queueLength = await slimFaasQueue.CountElementAsync(functionDeployment); + if (setTickLastCallCounterDictionnary[functionDeployment] > counterLimit) + { + setTickLastCallCounterDictionnary[functionDeployment] = 0; - if (queueLength > 0 || numberProcessingTasks > 0) + if (queueLength > 0 || numberProcessingTasks > 0) + { + historyHttpService.SetTickLastCall(functionDeployment, DateTime.UtcNow.Ticks); + } + } + + if (queueLength == 0) { - historyHttpService.SetTickLastCall(functionDeployment, DateTime.UtcNow.Ticks); + return 0; } } - return queueLength; + return await slimFaasQueue.CountAvailableElementAsync(functionDeployment, numberLimitProcessingTasks); } - private static int? ComputeNumberLimitProcessingTasks(SlimFaasDeploymentInformation slimFaas, + private static int ComputeNumberLimitProcessingTasks(SlimFaasDeploymentInformation slimFaas, DeploymentInformation function) { - int? numberLimitProcessingTasks; + int numberLimitProcessingTasks; int numberReplicas = slimFaas.Replicas; if (function.NumberParallelRequest < numberReplicas || numberReplicas == 0) @@ -131,14 +159,17 @@ private async Task UpdateTickLastCallIfRequestStillInProgress(int? functio return numberLimitProcessingTasks; } - private int ManageProcessingTasks(Dictionary> processingTasks, + private async Task ManageProcessingTasksAsync(ISlimFaasQueue slimFaasQueue, + Dictionary> processingTasks, string functionDeployment) { if (processingTasks.ContainsKey(functionDeployment) == false) { processingTasks.Add(functionDeployment, new List()); } - + var listQueueItemStatus = new ListQueueItemStatus(); + var queueItemStatusList = new List(); + listQueueItemStatus.Items = queueItemStatusList; List httpResponseMessagesToDelete = new(); foreach (RequestToWait processing in processingTasks[functionDeployment]) { @@ -151,18 +182,20 @@ private int ManageProcessingTasks(Dictionary> proce } HttpResponseMessage httpResponseMessage = processing.Task.Result; - httpResponseMessage.Dispose(); + var statusCode = (int)httpResponseMessage.StatusCode; logger.LogDebug( - "{CustomRequestMethod}: /async-function/{CustomRequestPath}{CustomRequestQuery} {StatusCode}", + "{CustomRequestMethod}: /async-function{CustomRequestPath}{CustomRequestQuery} {StatusCode}", processing.CustomRequest.Method, processing.CustomRequest.Path, processing.CustomRequest.Query, httpResponseMessage.StatusCode); httpResponseMessagesToDelete.Add(processing); + queueItemStatusList.Add(new QueueItemStatus(processing.Id, statusCode)); + httpResponseMessage.Dispose(); } catch (Exception e) { + queueItemStatusList.Add(new QueueItemStatus(processing.Id, 500)); httpResponseMessagesToDelete.Add(processing); logger.LogWarning("Request Error: {Message} {StackTrace}", e.Message, e.StackTrace); - historyHttpService.SetTickLastCall(functionDeployment, DateTime.UtcNow.Ticks); } } @@ -171,6 +204,11 @@ private int ManageProcessingTasks(Dictionary> proce processingTasks[functionDeployment].Remove(httpResponseMessage); } + if (listQueueItemStatus.Items.Count > 0) + { + await slimFaasQueue.ListCallbackAsync(functionDeployment, listQueueItemStatus); + } + int numberProcessingTasks = processingTasks[functionDeployment].Count; return numberProcessingTasks; } diff --git a/src/SlimFaas/SlimfaasSerializer.cs b/src/SlimFaas/SlimfaasSerializer.cs deleted file mode 100644 index 4239abd1..00000000 --- a/src/SlimFaas/SlimfaasSerializer.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System.Text.Json; -using MemoryPack; - -namespace SlimFaas; - -public class SlimfaasSerializer -{ - public static byte[] GetBytes(string str) - { - byte[] bytes = new byte[str.Length * sizeof(char)]; - Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length); - return bytes; - } - - // Do NOT use on arbitrary bytes; only use on GetBytes's output on the SAME system - public static string GetString(byte[] bytes) - { - char[] chars = new char[bytes.Length / sizeof(char)]; - Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length); - return new string(chars); - } - - public static CustomRequest Deserialize(byte[] data) => - MemoryPackSerializer.Deserialize(data); - - public static byte[] SerializeBin(CustomRequest data) => - MemoryPackSerializer.Serialize(data); -} diff --git a/tests/SlimData.Tests/CommandsTests.cs b/tests/SlimData.Tests/CommandsTests.cs index 2ed82e7a..6cb754c3 100644 --- a/tests/SlimData.Tests/CommandsTests.cs +++ b/tests/SlimData.Tests/CommandsTests.cs @@ -17,11 +17,11 @@ public static async Task InterpreterWithPersistentState() { byte[] bytes = RandomBytes(1000); using var wal = new SlimPersistentState(Path.Combine(Path.GetTempPath(), Path.GetRandomFileName())); - var entry1 = wal.CreateLogEntry(new ListLeftPushCommand { Key = "youhou" , Value = bytes }); + var entry1 = wal.CreateLogEntry(new ListLeftPushCommand { Key = "youhou" , Value = bytes, Identifier = "1", RetryTimeout = 100, Retries = new List { 1, 2, 3 }, NowTicks = DateTime.UtcNow.Ticks, HttpStatusCodesWorthRetrying = new List()}); await wal.AppendAsync(entry1); - Assert.Empty(wal.SlimDataState.queues); + Assert.Empty(wal.SlimDataState.Queues); await wal.CommitAsync(CancellationToken.None); - Assert.Equal(bytes, wal.SlimDataState.queues["youhou"].First().ToArray()); + Assert.Equal(bytes, wal.SlimDataState.Queues["youhou"].First().Value.ToArray()); var bin = MemoryPackSerializer.Serialize(3); var final = MemoryPackSerializer.Deserialize(bin); diff --git a/tests/SlimData.Tests/QueueElementExtentionsTests.cs b/tests/SlimData.Tests/QueueElementExtentionsTests.cs new file mode 100644 index 00000000..ae4fbe38 --- /dev/null +++ b/tests/SlimData.Tests/QueueElementExtentionsTests.cs @@ -0,0 +1,103 @@ +namespace SlimData.Tests; + +public class QueueElementExtentionsTests +{ + [Fact] + public static void QueueElementExtensionsGetQueueRunningElement() + { + // I want a test which text my extention + var nowTicks = DateTime.UtcNow.Ticks; + List retries = [2, 6, 10]; + int retryTimeout = 30; + int[] httpStatusCodesWorthRetrying = + [ + // 408 , // HttpStatusCode.RequestTimeout, + 500, // HttpStatusCode.InternalServerError, + 502, // HttpStatusCode.BadGateway, + 503, // HttpStatusCode.ServiceUnavailable, + //504, // HttpStatusCode.GatewayTimeout + ]; + + var timeout = 30; + var timeoutSpanTicks = TimeSpan.FromSeconds(31).Ticks; + List queueElements = new(); + var httpRetriesCode = new List(httpStatusCodesWorthRetrying) ; + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "-1", 090902, timeout, retries, new List() + { + new(nowTicks -100, nowTicks, 500), + new(nowTicks -50, nowTicks, 500), + new(nowTicks -20, nowTicks, 500), + new(nowTicks -10, nowTicks, 500), + }, httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "0", 090902, timeout, retries, new List() + { + new(nowTicks - timeoutSpanTicks -100, nowTicks, 500), + new(nowTicks- timeoutSpanTicks -50, nowTicks, 500), + new(nowTicks- timeoutSpanTicks -30, nowTicks, 500), + new(nowTicks- timeoutSpanTicks -20, 0, 0), + }, httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "0-ok", 090902, timeout, retries, new List() + { + new(nowTicks -100, nowTicks, 200), + }, httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "1", 090902, timeout, retries, new List() + { + new(nowTicks - 1000, nowTicks, 500), + new(nowTicks- 500, nowTicks, 500), + new(nowTicks- 200, nowTicks, 500), + new(nowTicks- 100, 0, 0), + }, httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "1timeout", 090902, timeout, retries, new List() + { + new(nowTicks - 1000, nowTicks, 500), + new(nowTicks- 500, nowTicks, 500), + new(nowTicks- 400, nowTicks, 500), + new(nowTicks- timeoutSpanTicks, 0, 0), + }, httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "2", 090902, timeout, retries, new List(), httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "3", 090902, timeout, retries, new List(), httpRetriesCode)); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "4", 090902, timeout, retries, new List(), httpRetriesCode)); + + var availableElements = queueElements.GetQueueAvailableElement(nowTicks, 3); + + Assert.Equal(2, availableElements.Count); + Assert.Equal("2", availableElements[0].Id); + Assert.Equal("3", availableElements[1].Id); + + var runningElements = queueElements.GetQueueRunningElement(nowTicks); + Assert.Equal(1, runningElements.Count); + Assert.Equal("1", runningElements[0].Id); + + + var finishedElements = queueElements.GetQueueFinishedElement(nowTicks); + Assert.Equal(4, finishedElements.Count); + } + + /* [Fact] + public static void QueueElementExtensionsGetQueueRunningElement2() + { + // I want a test which text my extention + var nowTicks = DateTime.UtcNow.Ticks; + List queueElements = new(); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "1", nowTicks, new List())); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "2", nowTicks, new List())); + queueElements.Add(new QueueElement(new ReadOnlyMemory([1]), "3", nowTicks, new List())); + + var availableElements = queueElements.GetQueueAvailableElement(SlimDataInterpreter.TimeoutRetries, nowTicks, 1, 30); + + foreach (QueueElement queueElement in queueElements) + { + Assert.Equal(2, availableElements.Count); + Assert.Equal("2", availableElements[0].Id); + Assert.Equal("3", availableElements[1].Id); + } + + var runningElements = queueElements.GetQueueRunningElement(nowTicks); + Assert.Equal(1, runningElements.Count); + Assert.Equal("1", runningElements[0].Id); + + + var finishedElements = queueElements.GetQueueFinishedElement(nowTicks, SlimDataInterpreter.TimeoutRetries); + Assert.Equal(4, finishedElements.Count); + }*/ +} diff --git a/tests/SlimData.Tests/RaftClusterTests.cs b/tests/SlimData.Tests/RaftClusterTests.cs index 9f49e29a..e95c4fd7 100644 --- a/tests/SlimData.Tests/RaftClusterTests.cs +++ b/tests/SlimData.Tests/RaftClusterTests.cs @@ -249,13 +249,32 @@ await databaseServiceSlave.HashSetAsync("hashsetKey1", Assert.Equal("value1", hashGet["field1"]); Assert.Equal("value2", hashGet["field2"]); - await databaseServiceSlave.ListLeftPushAsync("listKey1", MemoryPackSerializer.Serialize("value1")); + await databaseServiceSlave.ListLeftPushAsync("listKey1", MemoryPackSerializer.Serialize("value1"), new RetryInformation([], 30, [])); await GetLocalClusterView(host1).ForceReplicationAsync(); - long listLength = await databaseServiceSlave.ListLengthAsync("listKey1"); + long listLength = await databaseServiceSlave.ListCountAvailableElementAsync("listKey1"); Assert.Equal(1, listLength); - IList listRightPop = await databaseServiceSlave.ListRightPopAsync("listKey1"); - Assert.Equal("value1", MemoryPackSerializer.Deserialize(listRightPop[0])); + IList? listRightPop = await databaseServiceSlave.ListRightPopAsync("listKey1"); + Assert.Equal("value1", MemoryPackSerializer.Deserialize(listRightPop.First().Data)); + + ListQueueItemStatus queueItemStatus = new() + { + Items = new List { }, + }; + foreach (QueueData queueData in listRightPop) + { + queueItemStatus.Items.Add(new QueueItemStatus + { + Id = queueData.Id, + HttpCode = 200, + }); + } + await databaseServiceSlave.ListCallbackAsync("listKey1", queueItemStatus); + + await GetLocalClusterView(host1).ForceReplicationAsync(); + var listLength2 = await databaseServiceSlave.ListCountAvailableElementAsync("listKey1"); + + Assert.Equal(0, listLength2); await host1.StopAsync(); await host2.StopAsync(); diff --git a/tests/SlimData.Tests/SlimData.Tests.csproj b/tests/SlimData.Tests/SlimData.Tests.csproj index 7a070244..89c99cb2 100644 --- a/tests/SlimData.Tests/SlimData.Tests.csproj +++ b/tests/SlimData.Tests/SlimData.Tests.csproj @@ -1,7 +1,7 @@ - net8.0 + net9.0 enable enable @@ -13,13 +13,13 @@ - - + + - - + + - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs b/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs index 04e7698f..d3c8db85 100644 --- a/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs +++ b/tests/SlimFaas.Tests/HistorySynchronizationWorkerShould.cs @@ -17,11 +17,11 @@ public async Task SyncLastTicksBetweenDatabaseAndMemory() DeploymentsInformations deploymentsInformations = new DeploymentsInformations( new List { - new("fibonacci1", "default", Replicas: 1, Pods: new List()), - new("fibonacci2", "default", Replicas: 0, Pods: new List()) + new("fibonacci1", "default", Replicas: 1, Pods: new List(), Configuration: new SlimFaasConfiguration()), + new("fibonacci2", "default", Replicas: 0, Pods: new List(), Configuration: new SlimFaasConfiguration()) }, new SlimFaasDeploymentInformation(1, new List()), new List()); - kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny())).ReturnsAsync(deploymentsInformations); + kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny(), It.IsAny())).ReturnsAsync(deploymentsInformations); HistoryHttpMemoryService historyHttpMemoryService = new HistoryHttpMemoryService(); Mock> loggerReplicasService = new Mock>(); diff --git a/tests/SlimFaas.Tests/ProgramShould.cs b/tests/SlimFaas.Tests/ProgramShould.cs index bea1efd4..8ca51276 100644 --- a/tests/SlimFaas.Tests/ProgramShould.cs +++ b/tests/SlimFaas.Tests/ProgramShould.cs @@ -11,9 +11,7 @@ public async Task TestRootEndpoint() Environment.SetEnvironmentVariable(EnvironmentVariables.SlimDataConfiguration, "{\"coldStart\":\"true\"}"); Environment.SetEnvironmentVariable(EnvironmentVariables.MockKubernetesFunctions, "{\"Functions\":[{\"Name\":\"fibonacci1\",\"NumberParallelRequest\":1},{\"Name\":\"fibonacci2\",\"NumberParallelRequest\":1}],\"Slimfaas\":[{\"Name\":\"slimfaas-1\"}]}"); -#pragma warning disable CA2252 await using WebApplicationFactory application = new(); -#pragma warning restore CA2252 using HttpClient client = application.CreateClient(); string response = await client.GetStringAsync("http://localhost:5000/health"); diff --git a/tests/SlimFaas.Tests/ReplicasScaleWorkerShould.cs b/tests/SlimFaas.Tests/ReplicasScaleWorkerShould.cs index 8ab15292..0500c12e 100644 --- a/tests/SlimFaas.Tests/ReplicasScaleWorkerShould.cs +++ b/tests/SlimFaas.Tests/ReplicasScaleWorkerShould.cs @@ -22,8 +22,8 @@ public IEnumerator GetEnumerator() new DeploymentsInformations( new List { - new("fibonacci1", "default", Replicas: 1, Pods: new List()), - new("fibonacci2", "default", Replicas: 0, Pods: new List()) + new("fibonacci1", "default", Replicas: 1, Pods: new List(), Configuration: new SlimFaasConfiguration()), + new("fibonacci2", "default", Replicas: 0, Pods: new List(), Configuration: new SlimFaasConfiguration()) }, new SlimFaasDeploymentInformation(1, new List()), new List() @@ -36,8 +36,8 @@ public IEnumerator GetEnumerator() new DeploymentsInformations( new List { - new("fibonacci1", "default", Replicas: 1, Pods: new List() { new PodInformation("fibonacci1", true, true, "localhost", "fibonacci1") }), - new("fibonacci2", "default", Replicas: 0, Pods: new List(), DependsOn: new List { "fibonacci1" }) + new("fibonacci1", "default", Replicas: 1, Pods: new List() { new PodInformation("fibonacci1", true, true, "localhost", "fibonacci1")}, Configuration: new SlimFaasConfiguration()), + new("fibonacci2", "default", Replicas: 0, Pods: new List(), DependsOn: new List { "fibonacci1" }, Configuration: new SlimFaasConfiguration()) }, new SlimFaasDeploymentInformation(1, new List()), new List() @@ -68,7 +68,7 @@ public async Task ScaleFunctionUpAndDown(DeploymentsInformations deploymentsInfo historyHttpService, loggerReplicasService.Object); masterService.Setup(ms => ms.IsMaster).Returns(true); - kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny())).ReturnsAsync(deploymentsInformations); + kubernetesService.Setup(k => k.ListFunctionsAsync(It.IsAny(), It.IsAny())).ReturnsAsync(deploymentsInformations); ReplicaRequest scaleRequestFibonacci1 = new("fibonacci1", "default", 0, PodType.Deployment); kubernetesService.Setup(k => k.ScaleAsync(scaleRequestFibonacci1)).ReturnsAsync(scaleRequestFibonacci1); @@ -119,6 +119,7 @@ public void GetTimeoutSecondBeforeSetReplicasMin() var deplymentInformation = new DeploymentInformation("fibonacci1", "default", Replicas: 1, + Configuration: new SlimFaasConfiguration(), Pods: new List() { new PodInformation("fibonacci1", true, true, "localhost", "fibonacci1") @@ -152,6 +153,7 @@ public void GetLastTicksFromSchedule() var deploymentInformation = new DeploymentInformation("fibonacci1", "default", Replicas: 1, + Configuration: new SlimFaasConfiguration(), Pods: new List() { new PodInformation("fibonacci1", true, true, "localhost", "fibonacci1") diff --git a/tests/SlimFaas.Tests/ReplicasSynchronizationWorkerShould.cs b/tests/SlimFaas.Tests/ReplicasSynchronizationWorkerShould.cs index 0d72e983..d9441e1b 100644 --- a/tests/SlimFaas.Tests/ReplicasSynchronizationWorkerShould.cs +++ b/tests/SlimFaas.Tests/ReplicasSynchronizationWorkerShould.cs @@ -20,8 +20,8 @@ public IEnumerator GetEnumerator() new DeploymentsInformations( new List { - new("fibonacci1", "default", Replicas: 1, Pods: new List()), - new("fibonacci2", "default", Replicas: 0, Pods: new List()) + new("fibonacci1", "default", Replicas: 1, Pods: new List(), Configuration: new SlimFaasConfiguration()), + new("fibonacci2", "default", Replicas: 0, Pods: new List(), Configuration: new SlimFaasConfiguration()) }, new SlimFaasDeploymentInformation(1, new List()), new List() diff --git a/tests/SlimFaas.Tests/SendClientShould.cs b/tests/SlimFaas.Tests/SendClientShould.cs index 175289c1..2380a177 100644 --- a/tests/SlimFaas.Tests/SendClientShould.cs +++ b/tests/SlimFaas.Tests/SendClientShould.cs @@ -2,6 +2,7 @@ using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Moq; +using SlimFaas.Kubernetes; namespace SlimFaas.Tests; @@ -36,7 +37,7 @@ public async Task CallFunctionAsync(string httpMethod) CustomRequest customRequest = new CustomRequest(new List { new() { Key = "key", Values = new[] { "value1" } } }, new byte[1], "fibonacci", "health", httpMethod, ""); - HttpResponseMessage response = await sendClient.SendHttpRequestAsync(customRequest); + HttpResponseMessage response = await sendClient.SendHttpRequestAsync(customRequest, new SlimFaasDefaultConfiguration()); Uri expectedUri = new Uri("http://fibonacci:8080/health"); Assert.Equal(HttpStatusCode.OK, response.StatusCode); @@ -83,7 +84,9 @@ public async Task CallFunctionSync(string httpMethod) httpContextRequest.ContentLength = 1; httpContextRequest.ContentType = "application/json"; - HttpResponseMessage response = await sendClient.SendHttpRequestSync(httpContext, "fibonacci", "health", ""); + SlimFaasDefaultConfiguration slimFaasDefaultConfiguration = new(); + + HttpResponseMessage response = await sendClient.SendHttpRequestSync(httpContext, "fibonacci", "health", "", slimFaasDefaultConfiguration); Uri expectedUri = new Uri("http://fibonacci:8080/health"); Assert.NotNull(sendedRequest); diff --git a/tests/SlimFaas.Tests/SlimFaas.Tests.csproj b/tests/SlimFaas.Tests/SlimFaas.Tests.csproj index 48b56da3..4eba54c5 100644 --- a/tests/SlimFaas.Tests/SlimFaas.Tests.csproj +++ b/tests/SlimFaas.Tests/SlimFaas.Tests.csproj @@ -1,7 +1,7 @@ - net8.0 + net9.0 enable enable @@ -14,13 +14,13 @@ - - - - + + + + - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs b/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs index 295f5cff..624f7df2 100644 --- a/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs +++ b/tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs @@ -9,6 +9,7 @@ using Moq; using SlimFaas.Kubernetes; using MemoryPack; +using SlimData; using SlimFaas.Database; namespace SlimFaas.Tests; @@ -20,7 +21,7 @@ internal class MemoryReplicasService : IReplicasService new List { new(Replicas: 0, Deployment: "fibonacci", Namespace: "default", - Pods: new List { new("", true, true, "", "") }) + Pods: new List { new("", true, true, "", "") }, Configuration: new SlimFaasConfiguration()) }, new SlimFaasDeploymentInformation(1, new List()), new List()); public Task SyncDeploymentsAsync(string kubeNamespace) => throw new NotImplementedException(); @@ -53,6 +54,7 @@ internal class MemoryReplicas2ReplicasService : IReplicasService "/noprefix", }, Namespace: "default", + Configuration: new SlimFaasConfiguration(), Pods: new List { new("fibonacci-1", true, true, "0", "fibonacci"), new("fibonacci-2", true, true, "0", "fibonacci"), @@ -72,11 +74,15 @@ public async Task SyncDeploymentsFromSlimData(DeploymentsInformations deployment internal class MemorySlimFaasQueue : ISlimFaasQueue { - public Task> DequeueAsync(string key, long count = 1) => throw new NotImplementedException(); + public Task?> DequeueAsync(string key, int count = 1) => throw new NotImplementedException(); - public Task CountAsync(string key) => throw new NotImplementedException(); + public Task CountAvailableElementAsync(string key, int maximum) => throw new NotImplementedException(); + public Task CountElementAsync(string key, int maximum) => throw new NotImplementedException(); + + public Task ListCallbackAsync(string key, ListQueueItemStatus queueItemStatus) => throw new NotImplementedException(); + + public async Task EnqueueAsync(string key, byte[] message, RetryInformation retryInformation) => await Task.Delay(100); - public async Task EnqueueAsync(string key, byte[] message) => await Task.Delay(100); } internal record SendData(string FunctionName, string Path, string BaseUrl); @@ -84,7 +90,7 @@ internal record SendData(string FunctionName, string Path, string BaseUrl); internal class SendClientMock : ISendClient { public IList SendDatas = new List(); - public Task SendHttpRequestAsync(CustomRequest customRequest, HttpContext? context = null, string? baseUrl = null) + public Task SendHttpRequestAsync(CustomRequest customRequest, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null, CancellationTokenSource? cancellationToken = null) { HttpResponseMessage responseMessage = new HttpResponseMessage(); responseMessage.StatusCode = HttpStatusCode.OK; @@ -94,7 +100,7 @@ public Task SendHttpRequestAsync(CustomRequest customReques } public Task SendHttpRequestSync(HttpContext httpContext, string functionName, - string functionPath, string functionQuery, string? baseUrl = null) + string functionPath, string functionQuery, SlimFaasDefaultConfiguration slimFaasDefaultConfiguration, string? baseUrl = null) { HttpResponseMessage responseMessage = new HttpResponseMessage(); responseMessage.StatusCode = HttpStatusCode.OK; @@ -169,7 +175,8 @@ public async Task CallFunctionInSyncModeAndReturnOk(string path, HttpStatusCode HttpResponseMessage responseMessage = new HttpResponseMessage(); responseMessage.StatusCode = HttpStatusCode.OK; Mock sendClientMock = new Mock(); - sendClientMock.Setup(s => s.SendHttpRequestAsync(It.IsAny(), It.IsAny(), It.IsAny())) + sendClientMock.Setup(s => s.SendHttpRequestAsync(It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(responseMessage); using IHost host = await new HostBuilder() diff --git a/tests/SlimFaas.Tests/SlimWorkerShould.cs b/tests/SlimFaas.Tests/SlimWorkerShould.cs index 02f4d311..8d1f6f19 100644 --- a/tests/SlimFaas.Tests/SlimWorkerShould.cs +++ b/tests/SlimFaas.Tests/SlimWorkerShould.cs @@ -6,6 +6,7 @@ using SlimFaas.Kubernetes; using MemoryPack; using SlimFaas.Database; +using SlimData; namespace SlimFaas.Tests; @@ -18,7 +19,7 @@ public async Task OnlyCallOneFunctionAsync() responseMessage.StatusCode = HttpStatusCode.OK; Mock sendClientMock = new Mock(); - sendClientMock.Setup(s => s.SendHttpRequestAsync(It.IsAny(), It.IsAny(), It.IsAny())) + sendClientMock.Setup(s => s.SendHttpRequestAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(responseMessage); Mock serviceProvider = new Mock(); @@ -41,6 +42,9 @@ public async Task OnlyCallOneFunctionAsync() Mock slimDataStatus = new Mock(); slimDataStatus.Setup(s => s.WaitForReadyAsync()).Returns(Task.CompletedTask); + Mock masterService = new Mock(); + masterService.Setup(s => s.IsMaster).Returns(true); + Mock replicasService = new Mock(); replicasService.Setup(rs => rs.Deployments).Returns(new DeploymentsInformations( SlimFaas: new SlimFaasDeploymentInformation(2, new List()), @@ -48,50 +52,53 @@ public async Task OnlyCallOneFunctionAsync() { new(Replicas: 1, Deployment: "fibonacci", Namespace: "default", NumberParallelRequest: 1, ReplicasMin: 0, ReplicasAtStart: 1, TimeoutSecondBeforeSetReplicasMin: 300, - ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, - Pods: new List { new("", true, true, "", "") }), + ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, Configuration: new SlimFaasConfiguration(), + Pods: new List { new("", true, true, "", "")}, EndpointReady: true), new(Replicas: 1, Deployment: "no-pod-started", Namespace: "default", NumberParallelRequest: 1, ReplicasMin: 0, ReplicasAtStart: 1, TimeoutSecondBeforeSetReplicasMin: 300, - ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, - Pods: new List { new("", false, false, "", "") }), + ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, Configuration: new SlimFaasConfiguration(), + Pods: new List { new("", false, false, "", "")}, EndpointReady: true), new(Replicas: 0, Deployment: "no-replicas", Namespace: "default", NumberParallelRequest: 1, - ReplicasMin: 0, ReplicasAtStart: 1, TimeoutSecondBeforeSetReplicasMin: 300, - ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, Pods: new List()) + ReplicasMin: 0, ReplicasAtStart: 1, TimeoutSecondBeforeSetReplicasMin: 300, Configuration: new SlimFaasConfiguration(), + ReplicasStartAsSoonAsOneFunctionRetrieveARequest: true, Pods: new List(), EndpointReady: false) }, Pods: new List())); HistoryHttpMemoryService historyHttpService = new HistoryHttpMemoryService(); Mock> logger = new Mock>(); - SlimFaasSlimFaasQueue redisQueue = new SlimFaasSlimFaasQueue(new DatabaseMockService()); + SlimFaasQueue slimFaasQueue = new SlimFaasQueue(new DatabaseMockService()); CustomRequest customRequest = new CustomRequest(new List { new() { Key = "key", Values = new[] { "value1" } } }, new byte[1], "fibonacci", "/download", "GET", ""); var jsonCustomRequest = MemoryPackSerializer.Serialize(customRequest); - await redisQueue.EnqueueAsync("fibonacci", jsonCustomRequest); + var retryInformation = new RetryInformation([], 30, []); + await slimFaasQueue.EnqueueAsync("fibonacci", jsonCustomRequest, retryInformation); CustomRequest customRequestNoPodStarted = new CustomRequest(new List { new() { Key = "key", Values = new[] { "value1" } } }, new byte[1], "no-pod-started", "/download", "GET", ""); var jsonCustomNoPodStarted = MemoryPackSerializer.Serialize(customRequestNoPodStarted); - await redisQueue.EnqueueAsync("no-pod-started", jsonCustomNoPodStarted); + await slimFaasQueue.EnqueueAsync("no-pod-started", jsonCustomNoPodStarted, retryInformation); CustomRequest customRequestReplicas = new CustomRequest(new List { new() { Key = "key", Values = new[] { "value1" } } }, new byte[1], "no-replicas", "/download", "GET", ""); var jsonCustomNoReplicas = MemoryPackSerializer.Serialize(customRequestReplicas); - await redisQueue.EnqueueAsync("no-replicas", jsonCustomNoReplicas); + await slimFaasQueue.EnqueueAsync("no-replicas", jsonCustomNoReplicas, retryInformation); - SlimWorker service = new SlimWorker(redisQueue, + SlimWorker service = new SlimWorker(slimFaasQueue, replicasService.Object, historyHttpService, logger.Object, - serviceProvider.Object, slimDataStatus.Object); + serviceProvider.Object, + slimDataStatus.Object, + masterService.Object); Task task = service.StartAsync(CancellationToken.None); await Task.Delay(3000); Assert.True(task.IsCompleted); - sendClientMock.Verify(v => v.SendHttpRequestAsync(It.IsAny(), It.IsAny(), It.IsAny()), + sendClientMock.Verify(v => v.SendHttpRequestAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once()); } @@ -103,16 +110,20 @@ public async Task LogErrorWhenExceptionIsThrown() replicasService.Setup(rs => rs.Deployments).Throws(new Exception()); HistoryHttpMemoryService historyHttpService = new HistoryHttpMemoryService(); Mock> logger = new Mock>(); - SlimFaasSlimFaasQueue redisQueue = new SlimFaasSlimFaasQueue(new DatabaseMockService()); + SlimFaasQueue redisQueue = new SlimFaasQueue(new DatabaseMockService()); Mock slimDataStatus = new Mock(); slimDataStatus.Setup(s => s.WaitForReadyAsync()).Returns(Task.CompletedTask); + Mock masterService = new Mock(); + masterService.Setup(s => s.IsMaster).Returns(true); + SlimWorker service = new SlimWorker(redisQueue, replicasService.Object, historyHttpService, logger.Object, serviceProvider.Object, - slimDataStatus.Object); + slimDataStatus.Object, + masterService.Object); Task task = service.StartAsync(CancellationToken.None);