Skip to content

Commit

Permalink
define buffer api (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
eventhorizon-cli authored Nov 24, 2023
1 parent a17af3f commit 5d25923
Show file tree
Hide file tree
Showing 33 changed files with 1,984 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: .NET format
name: .NET Build

on:
push:
Expand All @@ -7,7 +7,7 @@ on:
branches: [ "main" ]

jobs:
dotnet-format:
dotnet-build:

runs-on: ubuntu-latest

Expand All @@ -27,3 +27,9 @@ jobs:
run: dotnet build
- name: Format
run: dotnet format --verify-no-changes --verbosity diagnostic
- name: Test
run: dotnet test -c Release --collect:"XPlat Code Coverage"
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
8 changes: 8 additions & 0 deletions Mocha.sln
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.gitmodules = .gitmodules
LICENSE = LICENSE
README.md = README.md
.github\workflows\dotnet-build.yml = .github\workflows\dotnet-build.yml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mocha.Storage", "src\Mocha.Storage\Mocha.Storage.csproj", "{8EEB6697-B975-430D-9CC3-3048E76C5ECA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mocha.Core.Benchmarks", "tests\Mocha.Core.Benchmarks\Mocha.Core.Benchmarks.csproj", "{2107E75D-9717-4CCD-BE85-713BEF75366A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -37,6 +40,7 @@ Global
{2C317C3A-E138-419A-9570-975C62E6B4F4} = {6983D239-07DA-4DFA-9AAA-F6876029FF8D}
{F53E1E73-CA7D-4921-8CEC-5629379AEDB7} = {24F9E34A-D92A-4C0A-851F-1E864181BF97}
{8EEB6697-B975-430D-9CC3-3048E76C5ECA} = {6983D239-07DA-4DFA-9AAA-F6876029FF8D}
{2107E75D-9717-4CCD-BE85-713BEF75366A} = {24F9E34A-D92A-4C0A-851F-1E864181BF97}
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{DCA600F0-4D6C-44DA-A493-F63097CCE74E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand All @@ -63,5 +67,9 @@ Global
{8EEB6697-B975-430D-9CC3-3048E76C5ECA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8EEB6697-B975-430D-9CC3-3048E76C5ECA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8EEB6697-B975-430D-9CC3-3048E76C5ECA}.Release|Any CPU.Build.0 = Release|Any CPU
{2107E75D-9717-4CCD-BE85-713BEF75366A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2107E75D-9717-4CCD-BE85-713BEF75366A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2107E75D-9717-4CCD-BE85-713BEF75366A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2107E75D-9717-4CCD-BE85-713BEF75366A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
13 changes: 13 additions & 0 deletions src/Mocha.Core/Buffer/BufferConsumerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer;

public class BufferConsumerOptions
{
public string TopicName { get; init; } = default!;

public string GroupName { get; init; } = default!;

public bool AutoCommit { get; init; }
}
16 changes: 16 additions & 0 deletions src/Mocha.Core/Buffer/BufferOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using Microsoft.Extensions.DependencyInjection;

namespace Mocha.Core.Buffer;

public class BufferOptionsBuilder
{
public BufferOptionsBuilder(IServiceCollection services)
{
Services = services;
}

public IServiceCollection Services { get; }
}
37 changes: 37 additions & 0 deletions src/Mocha.Core/Buffer/BufferQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using Microsoft.Extensions.DependencyInjection;

namespace Mocha.Core.Buffer;

internal class BufferQueue : IBufferQueue
{
private readonly IServiceProvider _serviceProvider;

public BufferQueue(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public IBufferProducer<T> CreateProducer<T>(string topicName)
{
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));
var queue = _serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(topicName);
return queue.CreateProducer();
}

public IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options)
{
ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName));
var queue = _serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(options.TopicName);
return queue.CreateConsumer(options);
}

public IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber)
{
ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName));
var queue = _serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(options.TopicName);
return queue.CreateConsumers(options, consumerNumber);
}
}
18 changes: 18 additions & 0 deletions src/Mocha.Core/Buffer/BufferServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using Mocha.Core.Buffer;

namespace Microsoft.Extensions.DependencyInjection;

public static class BufferServiceCollectionExtensions
{
public static IServiceCollection AddBuffer(
this IServiceCollection services,
Action<BufferOptionsBuilder> configure)
{
services.AddSingleton<IBufferQueue, BufferQueue>();
configure(new BufferOptionsBuilder(services));
return services;
}
}
15 changes: 15 additions & 0 deletions src/Mocha.Core/Buffer/IBufferConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer;

public interface IBufferConsumer<out T>
{
string TopicName { get; }

string GroupName { get; }

IAsyncEnumerable<T> ConsumeAsync(CancellationToken cancellationToken = default);

ValueTask CommitAsync();
}
11 changes: 11 additions & 0 deletions src/Mocha.Core/Buffer/IBufferProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer;

public interface IBufferProducer<in T>
{
string TopicName { get; }

ValueTask ProduceAsync(T item);
}
13 changes: 13 additions & 0 deletions src/Mocha.Core/Buffer/IBufferQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer;

public interface IBufferQueue
{
IBufferProducer<T> CreateProducer<T>(string topicName);

IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options);

IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber);
}
15 changes: 15 additions & 0 deletions src/Mocha.Core/Buffer/IBufferQueueT.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer;

internal interface IBufferQueue<T>
{
string TopicName { get; }

IBufferProducer<T> CreateProducer();

IBufferConsumer<T> CreateConsumer(BufferConsumerOptions options);

IEnumerable<IBufferConsumer<T>> CreateConsumers(BufferConsumerOptions options, int consumerNumber);
}
19 changes: 19 additions & 0 deletions src/Mocha.Core/Buffer/Memory/BufferOptionsBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using Mocha.Core.Buffer.Memory;

namespace Mocha.Core.Buffer;

public static class BufferOptionsBuilderExtensions
{
public static BufferOptionsBuilder UseMemory(
this BufferOptionsBuilder builder,
Action<MemoryBufferOptions> configure)
{
var options = new MemoryBufferOptions(builder.Services);
configure(options);

return builder;
}
}
105 changes: 105 additions & 0 deletions src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

namespace Mocha.Core.Buffer.Memory;

internal sealed class MemoryBufferQueue<T> : IBufferQueue<T>
{
private readonly MemoryBufferPartition<T>[] _partitions;
private readonly int _partitionNumber;

private readonly IBufferProducer<T> _producer;

// Consider that the frequency of creating consumers will not be very high,
// so the lock is relatively coarse-grained.
private readonly object _consumersLock;
private readonly Dictionary<string /* GroupName */, List<MemoryBufferConsumer<T>>> _consumers;

public MemoryBufferQueue(string topicName, int partitionNumber)
{
TopicName = topicName;
_partitionNumber = partitionNumber;
_partitions = new MemoryBufferPartition<T>[partitionNumber];
for (var i = 0; i < partitionNumber; i++)
{
_partitions[i] = new MemoryBufferPartition<T>();
}

_producer = new MemoryBufferProducer<T>(topicName, _partitions);

_consumers = new Dictionary<string, List<MemoryBufferConsumer<T>>>();
_consumersLock = new object();
}

public string TopicName { get; }

public IBufferProducer<T> CreateProducer() => _producer;

public IBufferConsumer<T> CreateConsumer(BufferConsumerOptions options)
{
var consumers = CreateConsumers(options, 1);
return consumers.Single();
}

public IEnumerable<IBufferConsumer<T>> CreateConsumers(BufferConsumerOptions options, int consumerNumber)
{
if (consumerNumber < 1)
{
throw new ArgumentOutOfRangeException(nameof(consumerNumber),
"The number of consumers must be greater than 0.");
}

if (consumerNumber > _partitionNumber)
{
throw new ArgumentOutOfRangeException(nameof(consumerNumber),
"The number of consumers cannot be greater than the number of partitions.");
}

var groupName = options.GroupName;
ArgumentException.ThrowIfNullOrEmpty(groupName, nameof(options.GroupName));

lock (_consumersLock)
{
if (_consumers.ContainsKey(groupName))
{
throw new InvalidOperationException($"The consumer group '{groupName}' already exists.");
}

var consumers = new List<MemoryBufferConsumer<T>>();
for (var i = 0; i < consumerNumber; i++)
{
var consumer = new MemoryBufferConsumer<T>(options);
consumers.Add(consumer);
}

AssignPartitions(consumers);

_consumers.Add(groupName, consumers);
return consumers;
}
}

private void AssignPartitions(List<MemoryBufferConsumer<T>> consumers)
{
var consumerNumber = consumers.Count;
var partitionsPerConsumer = _partitionNumber / consumerNumber;
var partitionsRemainder = _partitionNumber % consumerNumber;
var partitionStartIndex = 0;
foreach (var consumer in consumers)
{
var extraPartitions = partitionsRemainder > 0 ? 1 : 0;
var partitionEndIndex = partitionStartIndex
+ partitionsPerConsumer
+ extraPartitions;
var partitions = _partitions[partitionStartIndex..partitionEndIndex];
consumer.AssignPartitions(partitions);

partitionStartIndex = partitionEndIndex;

if (partitionsRemainder > 0)
{
partitionsRemainder--;
}
}
}
}
Loading

0 comments on commit 5d25923

Please sign in to comment.