Skip to content

Commit

Permalink
Review serializers
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Nov 22, 2023
1 parent 6c08e34 commit 497acee
Show file tree
Hide file tree
Showing 71 changed files with 685 additions and 776 deletions.
7 changes: 0 additions & 7 deletions RabbitMQ.Next.sln
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Next.Serialization
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Next.Serialization.MessagePack", "src\RabbitMQ.Next.Serialization.MessagePack\RabbitMQ.Next.Serialization.MessagePack.csproj", "{973AEE92-6A7E-4C5A-8295-F89B3384F393}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Next.Serialization", "src\RabbitMQ.Next.Serialization\RabbitMQ.Next.Serialization.csproj", "{90BA399F-F446-4FA1-A0B3-4C5853EE61CC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Next.TopologyBuilder.Abstractions", "src\RabbitMQ.Next.TopologyBuilder.Abstractions\RabbitMQ.Next.TopologyBuilder.Abstractions.csproj", "{C34F7844-CF6A-4247-961A-415D17E127EC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Next.Serialization.NewtonsoftJson", "src\RabbitMQ.Next.Serialization.NewtonsoftJson\RabbitMQ.Next.Serialization.NewtonsoftJson.csproj", "{6D34D919-0630-40EB-9291-CB28DB2BBA25}"
Expand Down Expand Up @@ -122,10 +120,6 @@ Global
{973AEE92-6A7E-4C5A-8295-F89B3384F393}.Debug|Any CPU.Build.0 = Debug|Any CPU
{973AEE92-6A7E-4C5A-8295-F89B3384F393}.Release|Any CPU.ActiveCfg = Release|Any CPU
{973AEE92-6A7E-4C5A-8295-F89B3384F393}.Release|Any CPU.Build.0 = Release|Any CPU
{90BA399F-F446-4FA1-A0B3-4C5853EE61CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90BA399F-F446-4FA1-A0B3-4C5853EE61CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90BA399F-F446-4FA1-A0B3-4C5853EE61CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90BA399F-F446-4FA1-A0B3-4C5853EE61CC}.Release|Any CPU.Build.0 = Release|Any CPU
{C34F7844-CF6A-4247-961A-415D17E127EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C34F7844-CF6A-4247-961A-415D17E127EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C34F7844-CF6A-4247-961A-415D17E127EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -162,7 +156,6 @@ Global
{CA01E04C-3659-4426-A9F6-A4145C63B70A} = {750B1D71-E43B-4714-8AA3-AB9B0EC7E931}
{A44E120A-C992-41F0-8484-296AAA95668A} = {AD3AF1AA-71EE-4A2C-84DD-5C7DF141C187}
{973AEE92-6A7E-4C5A-8295-F89B3384F393} = {AD3AF1AA-71EE-4A2C-84DD-5C7DF141C187}
{90BA399F-F446-4FA1-A0B3-4C5853EE61CC} = {C895EE24-13DF-45C3-AF1D-B6B6EE4A10A9}
{C34F7844-CF6A-4247-961A-415D17E127EC} = {C895EE24-13DF-45C3-AF1D-B6B6EE4A10A9}
{6D34D919-0630-40EB-9291-CB28DB2BBA25} = {AD3AF1AA-71EE-4A2C-84DD-5C7DF141C187}
{95003EED-4B62-430E-8A4D-1D3B09CAA173} = {AD3AF1AA-71EE-4A2C-84DD-5C7DF141C187}
Expand Down
26 changes: 9 additions & 17 deletions docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ static async Task Main()
{
await using var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.UseDynamicSerializer(serializer => serializer
.When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer()
.When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer()
.When(_ => true).UseSystemJsonSerializer()
)
.ConnectAsync();

Console.WriteLine("Connection opened");
Expand All @@ -27,15 +32,7 @@ static async Task Main()

private static async Task PublishMessagesAsync(IConnection connection)
{
await using var publisher = connection.Publisher(
"amq.fanout",
builder => builder
.UseDynamicSerializer(serializer => serializer
.When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer()
.When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer()
.When(_ => true).UseSystemJsonSerializer()
)
);
await using var publisher = connection.Publisher("amq.fanout");

// The message will be formatted using MessagePackSerializer, because there is corresponding registration
await publisher.PublishAsync(new DummyDto { SomeProperty = "some message with msgpack content type"},
Expand All @@ -56,17 +53,12 @@ private static async Task ConsumeMessagesAsync(IConnection connection)
await using var consumer = connection.Consumer(
builder => builder
.BindToQueue("my-queue")
.PrefetchCount(10)
.UseDynamicSerializer(serializer => serializer
.When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer()
.When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer()
.When(_ => true).UseSystemJsonSerializer()
));
.PrefetchCount(10));

var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds
await consumer.ConsumeAsync(message =>
await consumer.ConsumeAsync((message, content) =>
{
Console.WriteLine($"Message content-type: {message.Properties.ContentType}, {message.Content<DummyDto>().SomeProperty}");
Console.WriteLine($"Message content-type: {message.ContentType}, {content.Get<DummyDto>().SomeProperty}");
} ,cancellation.Token);
}
}
12 changes: 6 additions & 6 deletions docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ static async Task Main()
Console.WriteLine("Hello World! This is consumer based on RabbitMQ.Next library.");

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.Endpoint("amqp://guest:guest@localhost:5672/")
.UsePlainTextSerializer()
.ConnectAsync()
.ConfigureAwait(false);

Expand All @@ -22,19 +23,18 @@ static async Task Main()
await using var consumer = connection.Consumer(
builder => builder
.BindToQueue("test-queue")
.PrefetchCount(10)
.UsePlainTextSerializer());
.PrefetchCount(10));

Console.WriteLine("Consumer created. Press Ctrl+C to exit.");

var cancellation = new CancellationTokenSource();

MonitorKeypressAsync(cancellation);

await consumer.ConsumeAsync(async message =>
await consumer.ConsumeAsync(async (message, content) =>
{
Console.WriteLine($"[{DateTimeOffset.Now.TimeOfDay}] Message received via '{message.Exchange}' exchange: {message.Content<string>()}");
} ,cancellation.Token).ConfigureAwait(false);
Console.WriteLine($"[{DateTimeOffset.Now.TimeOfDay}] Message received via '{message.Exchange}' exchange: {content.Get<string>()}");
} ,cancellation.Token);
}

private static Task MonitorKeypressAsync(CancellationTokenSource cancellation)
Expand Down
15 changes: 13 additions & 2 deletions docs/examples/RabbitMQ.Next.Examples.SimplePublisher/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;

using RabbitMQ.Next.Publisher;
Expand All @@ -13,13 +14,23 @@ static async Task Main()
Console.WriteLine("Hello World! This is publisher based on RabbitMQ.Next library.");

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.Endpoint("amqp://guest:guest@localhost:5672/")
.UsePlainTextSerializer()
.ConnectAsync()
.ConfigureAwait(false);

Console.WriteLine("Connection opened");

await using var publisher = connection.Publisher("amq.fanout", builder => builder.UsePlainTextSerializer());
await using var publisher = connection.Publisher("amq.fanout",
builder => builder.UsePublishMiddleware(async (message, content, next) =>
{
var ts = Stopwatch.GetTimestamp();
await next(message, content);
Console.WriteLine(Stopwatch.GetTimestamp() - ts);
}).UsePublishMiddleware((message, content) =>
{
message.SetType(message.ClrType.Name);
}));

Console.WriteLine("Publisher created. Type any text to send it to the 'amq.fanout' exchange. Enter empty string to exit");

Expand Down
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ IDisposable WithMessageHandler<TMethod>(IMessageHandler<TMethod> handler)

Task Completion { get; }

Task SendAsync<TRequest>(TRequest request, CancellationToken cancellation = default)
ValueTask SendAsync<TRequest>(TRequest request, CancellationToken cancellation = default)
where TRequest : struct, IOutgoingMethod;

Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellation = default)
where TRequest : struct, IOutgoingMethod
where TResponse : struct, IIncomingMethod;

Task<ulong> PublishAsync<TState>(TState contentBuilderState, string exchange, string routingKey, IMessageProperties properties, Action<TState, IBufferWriter<byte>> payload, PublishFlags flags = PublishFlags.None, CancellationToken cancellation = default);
ValueTask<ulong> PublishAsync<TContent>(string exchange, string routingKey, TContent content, IMessageProperties properties, PublishFlags flags = PublishFlags.None, CancellationToken cancellation = default);

Task<TMethod> WaitAsync<TMethod>(CancellationToken cancellation = default)
where TMethod : struct, IIncomingMethod;
Expand Down
1 change: 0 additions & 1 deletion src/RabbitMQ.Next.Abstractions/Channels/IMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using RabbitMQ.Next.Messaging;
using RabbitMQ.Next.Methods;

namespace RabbitMQ.Next.Channels;
Expand Down
9 changes: 9 additions & 0 deletions src/RabbitMQ.Next.Abstractions/Channels/IPayload.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;
using RabbitMQ.Next.Messaging;
using RabbitMQ.Next.Serialization;

namespace RabbitMQ.Next.Channels;

public interface IPayload: IMessageProperties, IContentAccessor, IDisposable
{
}
5 changes: 3 additions & 2 deletions src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Serialization;

namespace RabbitMQ.Next;

public interface IConnectionBuilder
public interface IConnectionBuilder: ISerializationBuilder<IConnectionBuilder>
{
IConnectionBuilder Auth(IAuthMechanism mechanism);

Expand All @@ -16,6 +17,6 @@ public interface IConnectionBuilder
IConnectionBuilder Locale(string locale);

IConnectionBuilder MaxFrameSize(int sizeBytes);

Task<IConnection> ConnectAsync(CancellationToken cancellation = default);
}
6 changes: 6 additions & 0 deletions src/RabbitMQ.Next.Abstractions/Messaging/IContentAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace RabbitMQ.Next.Messaging;

public interface IContentAccessor
{
public T Get<T>();
}
9 changes: 0 additions & 9 deletions src/RabbitMQ.Next.Abstractions/Messaging/IPayload.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Next.Messaging;

namespace RabbitMQ.Next.Consumer;

Expand Down Expand Up @@ -41,4 +43,26 @@ public static IConsumerBuilder BindToStream(this IConsumerBuilder consumer, stri

return consumer;
}

public static IConsumerBuilder UseConsumerMiddleware(this IConsumerBuilder builder, Func<IMessageProperties, IContentAccessor, Task> middleware)
{
builder.UseConsumerMiddleware(async (message, content, next) =>
{
await middleware.Invoke(message, content).ConfigureAwait(false);
await next.Invoke(message, content).ConfigureAwait(false);
});

return builder;
}

public static IConsumerBuilder UseConsumerMiddleware(this IConsumerBuilder builder, Action<IMessageProperties, IContentAccessor> middleware)
{
builder.UseConsumerMiddleware((message, content, next) =>
{
middleware.Invoke(message, content);
return next.Invoke(message, content);
});

return builder;
}
}
7 changes: 4 additions & 3 deletions src/RabbitMQ.Next.Consumer.Abstractions/ConsumerExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Messaging;

namespace RabbitMQ.Next.Consumer;

public static class ConsumerExtensions
{
public static Task ConsumeAsync(this IConsumer consumer, Action<IDeliveredMessage> handler, CancellationToken cancellation = default)
=> consumer.ConsumeAsync(m =>
public static Task ConsumeAsync(this IConsumer consumer, Action<IDeliveredMessage,IContentAccessor> handler, CancellationToken cancellation = default)
=> consumer.ConsumeAsync((m, c) =>
{
handler(m);
handler(m, c);
return default;
}, cancellation);
}
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.Consumer.Abstractions/IAcknowledgement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace RabbitMQ.Next.Consumer;

public interface IAcknowledgement : IAsyncDisposable
{
Task AckAsync(ulong deliveryTag);
ValueTask AckAsync(ulong deliveryTag);

Task NackAsync(ulong deliveryTag, bool requeue);
ValueTask NackAsync(ulong deliveryTag, bool requeue);
}
3 changes: 2 additions & 1 deletion src/RabbitMQ.Next.Consumer.Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Messaging;

namespace RabbitMQ.Next.Consumer;

public interface IConsumer : IAsyncDisposable
{
Task ConsumeAsync(Func<IDeliveredMessage, ValueTask> handler, CancellationToken cancellation = default);
Task ConsumeAsync(Func<IDeliveredMessage, IContentAccessor, Task> handler, CancellationToken cancellation = default);
}
8 changes: 6 additions & 2 deletions src/RabbitMQ.Next.Consumer.Abstractions/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Channels;
using RabbitMQ.Next.Serialization;
using RabbitMQ.Next.Messaging;

namespace RabbitMQ.Next.Consumer;

public interface IConsumerBuilder : ISerializationBuilder<IConsumerBuilder>
public interface IConsumerBuilder
{
IConsumerBuilder BindToQueue(string queue, Action<IQueueConsumerBuilder> builder = null);

Expand All @@ -17,4 +19,6 @@ public interface IConsumerBuilder : ISerializationBuilder<IConsumerBuilder>
IConsumerBuilder SetAcknowledgement(Func<IChannel, IAcknowledgement> acknowledgementFactory);

IConsumerBuilder OnPoisonMessage(PoisonMessageMode mode);

IConsumerBuilder UseConsumerMiddleware(Func<IDeliveredMessage,IContentAccessor,Func<IDeliveredMessage,IContentAccessor,Task>,Task> middleware);
}

This file was deleted.

6 changes: 1 addition & 5 deletions src/RabbitMQ.Next.Consumer.Abstractions/IDeliveredMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@

namespace RabbitMQ.Next.Consumer;

public interface IDeliveredMessage
public interface IDeliveredMessage : IMessageProperties
{
public string Exchange { get; }

public string RoutingKey { get; }

public bool Redelivered { get; }

IMessageProperties Properties { get; }

T Content<T>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

<ItemGroup>
<ProjectReference Include="..\RabbitMQ.Next.Abstractions\RabbitMQ.Next.Abstractions.csproj" />
<ProjectReference Include="..\RabbitMQ.Next.Serialization\RabbitMQ.Next.Serialization.csproj" />
</ItemGroup>

</Project>
9 changes: 2 additions & 7 deletions src/RabbitMQ.Next.Consumer/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@ public static IConsumer Consumer(this IConnection connection, Action<IConsumerBu
var consumerBuilder = new ConsumerBuilder();
builder?.Invoke(consumerBuilder);

if (consumerBuilder.Serializer == null)
{
throw new InvalidOperationException("Cannot create message consumer without configured serializer. Consider to call UseSerializer.");
}

if (consumerBuilder.Queues.Count == 0)
{
throw new InvalidOperationException("Cannot start consumer without binding to queue. Consider to call BindToQueue.");
}

var consumer = new Consumer(connection, consumerBuilder.AcknowledgementFactory, consumerBuilder.Serializer,
consumerBuilder.Queues, consumerBuilder.PrefetchSize, consumerBuilder.PrefetchCount,
var consumer = new Consumer(connection, consumerBuilder.AcknowledgementFactory,
consumerBuilder.Queues, consumerBuilder.Middlewares, consumerBuilder.PrefetchSize, consumerBuilder.PrefetchCount,
consumerBuilder.ConcurrencyLevel, consumerBuilder.OnPoisonMessage);

return consumer;
Expand Down
Loading

0 comments on commit 497acee

Please sign in to comment.