Skip to content

Commit

Permalink
Multiple minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Feb 28, 2024
1 parent 4303da8 commit 8cc43f0
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 45 deletions.
14 changes: 8 additions & 6 deletions src/RabbitMQ.Next.Publisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ private async Task PublishAsyncImpl<TContent>(TContent content, MessageBuilder m
this.messagePropsPool.Return(message);
}
}



private async Task InternalPublishAsync<TContent>(IMessageBuilder message, TContent content)
{
var flags = ComposePublishFlags(message);
Expand All @@ -120,11 +119,14 @@ private async Task InternalPublishAsync<TContent>(IMessageBuilder message, TCont
var deliveryTag = await ch.PublishAsync(this.exchange, message.RoutingKey, content, message, flags)
.ConfigureAwait(false);

var confirmed = await this.confirms.WaitForConfirmAsync(deliveryTag, default).ConfigureAwait(false);
if (!confirmed)
if (this.confirms != null)
{
// todo: provide some useful info here
throw new DeliveryFailedException();
var confirmed = await this.confirms.WaitForConfirmAsync(deliveryTag, default).ConfigureAwait(false);
if (!confirmed)
{
// todo: provide some useful info here
throw new DeliveryFailedException();
}
}
}

Expand Down
53 changes: 32 additions & 21 deletions src/RabbitMQ.Next/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public async Task OpenConnectionAsync(CancellationToken cancellation)
this.socket = await EndpointResolver.OpenSocketAsync(this.connectionDetails.Settings.Endpoints, cancellation).ConfigureAwait(false);

this.socketIoCancellation = new CancellationTokenSource();
Task.Factory.StartNew(() => this.ReceiveLoop(this.socketIoCancellation.Token), TaskCreationOptions.LongRunning);
Task.Factory.StartNew(this.SendLoop, TaskCreationOptions.LongRunning);
StartThread(() => this.ReceiveLoop(this.socketIoCancellation.Token), "RabbitMQ.Next-Receive loop");
StartThread(this.SendLoop, "RabbitMQ.Next-Send loop");

this.connectionChannel = this.CreateChannel(ProtocolConstants.ConnectionChannel);
var connectionCloseWait = new WaitMethodMessageHandler<CloseMethod>(default);
Expand All @@ -110,38 +110,27 @@ public async Task OpenConnectionAsync(CancellationToken cancellation)
var amqpHeaderMemory = new MemoryAccessor(ProtocolConstants.AmqpHeader);
await this.socketSender.Writer.WriteAsync(amqpHeaderMemory, cancellation).ConfigureAwait(false);

this.connectionDetails.Negotiated = await negotiateTask.ConfigureAwait(false);

// start heartbeat
Task.Factory.StartNew(() => this.HeartbeatLoop(this.connectionDetails.Negotiated.HeartbeatInterval, this.socketIoCancellation.Token), TaskCreationOptions.LongRunning);

var negotiationResults = await negotiateTask.ConfigureAwait(false);
this.connectionDetails.PopulateWithNegotiationResults(negotiationResults);

this.State = ConnectionState.Configuring;
this.State = ConnectionState.Open;
}

private IChannelInternal CreateChannel(ushort channelNumber)
{
var maxFrameSize = this.connectionDetails?.Negotiated?.FrameMaxSize ?? ProtocolConstants.FrameMinSize;
var maxFrameSize = this.connectionDetails.FrameMaxSize ?? ProtocolConstants.FrameMinSize;
var policy = new MessageBuilderPoolPolicy(this.memoryPool, channelNumber, maxFrameSize);
var messageBuilderPool = new DefaultObjectPool<MessageBuilder>(policy);

return new Channel(this.socketSender.Writer, messageBuilderPool, this.connectionDetails.Settings.Serializer);
}

private async Task HeartbeatLoop(TimeSpan interval, CancellationToken cancellation)
{
var heartbeatMemory = new MemoryAccessor(ProtocolConstants.HeartbeatFrame);
while (!cancellation.IsCancellationRequested)
{
await Task.Delay(interval, cancellation).ConfigureAwait(false);
await this.socketSender.Writer.WriteAsync(heartbeatMemory, cancellation).ConfigureAwait(false);
}
}

private void SendLoop()
{
var heartbeatMemory = new MemoryAccessor(ProtocolConstants.HeartbeatFrame);
var socketChannel = this.socketSender.Reader;
while (socketChannel.WaitToReadAsync().Wait())
do
{
while (socketChannel.TryRead(out var memory))
{
Expand All @@ -154,7 +143,21 @@ private void SendLoop()
memory = next;
}
}
}

var waitResult = socketChannel.WaitToReadAsync().Wait(this.connectionDetails.HeartbeatInterval);
if (waitResult.IsCompleted)
{
if (waitResult.Result)
{
continue;
}

break;
}

// wait long enough and nothing was sent, need to send heartbeat frame
this.socket.Send(heartbeatMemory);
} while (true);
}

private void ReceiveLoop(CancellationToken cancellationToken)
Expand Down Expand Up @@ -298,6 +301,15 @@ private void ConnectionClose(Exception ex)
this.connectionChannel.TryComplete(ex);
}

private static void StartThread(Action threadStart, string threadName)
{
var thread = new Thread(new ThreadStart(threadStart))
{
Name = threadName,
};
thread.Start();
}

private static async Task<NegotiationResults> NegotiateConnectionAsync(IChannel channel, ConnectionSettings settings, CancellationToken cancellation)
{
// connection should be forcibly closed if negotiation phase take more then 10s.
Expand Down Expand Up @@ -334,7 +346,6 @@ private static async Task<NegotiationResults> NegotiateConnectionAsync(IChannel

var tuneMethod = await tuneMethodTask.ConfigureAwait(false);
var negotiationResult = new NegotiationResults(
settings.Auth.Type,
tuneMethod.ChannelMax,
Math.Min(settings.MaxFrameSize, (int)tuneMethod.MaxFrameSize),
TimeSpan.FromSeconds(tuneMethod.HeartbeatInterval));
Expand Down
25 changes: 16 additions & 9 deletions src/RabbitMQ.Next/ConnectionDetails.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;

namespace RabbitMQ.Next;

internal class ConnectionDetails
Expand All @@ -8,14 +10,19 @@ public ConnectionDetails(ConnectionSettings settings)
}

public ConnectionSettings Settings { get; }

public int? ChannelMax { get; private set; }

public NegotiationResults Negotiated { get; set; }

public string RemoteHost { get; set; }

public string RemotePort { get; set; }
public int? FrameMaxSize { get; private set; }

public bool IsSsl { get; set; }

public string VirtualHost { get; set; }
}
public TimeSpan? HeartbeatInterval { get; private set; }

public void PopulateWithNegotiationResults(NegotiationResults negotiationResults)
{
ArgumentNullException.ThrowIfNull(negotiationResults);

this.ChannelMax = negotiationResults.ChannelMax;
this.FrameMaxSize = negotiationResults.FrameMaxSize;
this.HeartbeatInterval = negotiationResults.HeartbeatInterval;
}
}
7 changes: 2 additions & 5 deletions src/RabbitMQ.Next/NegotiationResults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ namespace RabbitMQ.Next;

internal class NegotiationResults
{
public NegotiationResults(string authMechanism, int channelMax, int frameMaxSize, TimeSpan heartbeatInterval)
public NegotiationResults(int channelMax, int frameMaxSize, TimeSpan heartbeatInterval)
{
this.AuthMechanism = authMechanism;
this.ChannelMax = channelMax;
this.FrameMaxSize = frameMaxSize;
this.HeartbeatInterval = heartbeatInterval;
Expand All @@ -17,6 +16,4 @@ public NegotiationResults(string authMechanism, int channelMax, int frameMaxSize
public int FrameMaxSize { get; }

public TimeSpan HeartbeatInterval { get; }

public string AuthMechanism { get; }
}
}
20 changes: 16 additions & 4 deletions src/RabbitMQ.Next/Tasks/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -17,15 +18,26 @@ public static Task AsTask(this CancellationToken cancellation)
return tcs.Task;
}

public static T Wait<T>(this ValueTask<T> valueTask)
public static (bool IsCompleted, T Result) Wait<T>(this ValueTask<T> valueTask, TimeSpan? timeout)
{
if (valueTask.IsCompleted)
{
return valueTask.Result;
return (true, valueTask.Result);
}

var task = valueTask.AsTask();
return task.GetAwaiter().GetResult();
var timeoutMs = -1; // -1 means infinite
if (timeout.HasValue)
{
timeoutMs = (int)timeout.Value.TotalMilliseconds;
}

if (task.Wait(timeoutMs))
{
return (true, task.Result);
}

return (false, default);
}

public static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancellation)
Expand Down

0 comments on commit 8cc43f0

Please sign in to comment.