Skip to content

Commit

Permalink
respect to max delivery count (#14)
Browse files Browse the repository at this point in the history
* respect to max delivery count

Issue: #13

* remove previous MaxDeliveryCount check logic
improve logs
  • Loading branch information
PejmanNik authored Dec 26, 2024
1 parent 826bdda commit 3c0096f
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 77 deletions.
2 changes: 2 additions & 0 deletions src/Lazvard.Message.Amqp.Server/BrokerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public TopicSubscriptionConfig(string name)
public int MaxDeliveryCount { get; set; } = 50;
public Duration TimeToLive { get; set; } = Duration.FromDays(14);

public string FullName => $"{TopicName}/{Name}";

internal TopicSubscriptionConfig SetTopicName(string name)
{
TopicName = name;
Expand Down
4 changes: 2 additions & 2 deletions src/Lazvard.Message.Amqp.Server/BrokerMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public record class BrokerMessage
public Guid LockToken { get; init; }
public string LockHolderLink { get; init; }

public long SequenceNumber => Message.GetSequenceNumber();
public string TraceId => Message.GetTraceId();

public BrokerMessage(AmqpMessage message)
{
Expand Down Expand Up @@ -48,7 +48,7 @@ public void AssertStatus(Func<BrokerMessage, bool> assert)
{
if (!assert(this))
{
throw new InvalidOperationException($"The message {SequenceNumber} status is invalid.");
throw new InvalidOperationException($"The message {TraceId} status is invalid.");
}
}
}
3 changes: 2 additions & 1 deletion src/Lazvard.Message.Amqp.Server/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ private bool TryDisposeMessage(Guid lockToken, DeliveryState state)
{
if (modified.UndeliverableHere == true)
{
// Defer
return messageQueue.TryDefer(lockToken, link.Name);
}
else
{
// Abandon
return messageQueue.TryRelease(lockToken, link.Name);
}
}
Expand Down Expand Up @@ -144,7 +146,6 @@ public bool TryToDeliver(AmqpMessage message)
try
{
link.SendMessageNoWait(clonedMessage.Value, clonedMessage.Value.DeliveryTag, new ArraySegment<byte>());
message.Header.DeliveryCount += 1;

logger.LogTrace("delivered message {MessageSeqNo} to {Link} with DeliveryTag {DeliveryTag}, DeliveryCount {DeliveryCount}",
clonedMessage.Value.GetSequenceNumber(), LinkName, clonedMessage.Value.DeliveryTag, clonedMessage.Value.Header.DeliveryCount ?? 0);
Expand Down
12 changes: 9 additions & 3 deletions src/Lazvard.Message.Amqp.Server/Helpers/AmqpMessageExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ public static string GetTraceId(this AmqpMessage message)
.GetProperties(BindingFlags.Instance | BindingFlags.Public)
.ToDictionary(x => x.Name));

public static AmqpMessage IncreaseDeliveryCount(this AmqpMessage message)
{
message.Header.DeliveryCount = (message.Header.DeliveryCount ?? 0) + 1;
return message;
}

public static AmqpMessage Clone(this AmqpMessage message, bool deepClone = false)
{
// Microsoft.Azure.Amqp v2 don't have a suitable clone method
var cloned = message.Clone();
AmqpMessage cloned = message.Clone();
GetPayloadInitializedField(cloned).Value.SetValue(cloned, false);
var properties = GetProperties(cloned).Value;
Dictionary<string, PropertyInfo> properties = GetProperties(cloned).Value;

if (!deepClone)
{
Expand Down Expand Up @@ -128,7 +134,7 @@ public static AmqpMessage Clone(this AmqpMessage message, bool deepClone = false

public static ArraySegment<byte> GetMergedPayload(this AmqpMessage message)
{
var data = message.GetPayload().SelectMany(x => x).ToArray();
byte[] data = message.GetPayload().SelectMany(x => x).ToArray();
return new ArraySegment<byte>(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<Authors>Pejman Nikram</Authors>
<PackageReadmeFile>README.md</PackageReadmeFile>
<RepositoryType>git</RepositoryType>
<Version>1.3.0</Version>
<Version>2.0.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions src/Lazvard.Message.Amqp.Server/ManagementSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ private void DeliverMessage(AmqpMessage message)
var delivered = consumer.Value.TryToDeliver(message);

logger.LogTrace("delivering message {MessageSeqNo} in subscription {Subscription} to consumer {Link} was {Status}",
message.GetSequenceNumber(), fullName, consumer.Value.LinkName, delivered ? "Successful" : "Failed");
message.GetSequenceNumber(), config.FullName, consumer.Value.LinkName, delivered ? "Successful" : "Failed");

if (delivered) return;
}

logger.LogError("delivering message {MessageSeqNo} in subscription {Subscription} Failed",
message.GetSequenceNumber(), fullName);
message.GetSequenceNumber(), config.FullName);
}

private Result<Consumer> GetTargetConsumer(AmqpMessage message)
Expand Down
81 changes: 58 additions & 23 deletions src/Lazvard.Message.Amqp.Server/MessageQueue.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Iso8601DurationHelper;
using Lazvard.Message.Amqp.Server.Constants;
using Lazvard.Message.Amqp.Server.Constants;
using Lazvard.Message.Amqp.Server.Helpers;
using Microsoft.Azure.Amqp;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -34,8 +33,7 @@ public sealed class MessageQueue : IMessageQueue, IDisposable
private readonly SemaphoreSlim semaphore;
private readonly IExpirationList expirationList;
private readonly ILogger<MessageQueue> logger;
private readonly Duration lockDuration;
private readonly Duration timeToLive;
private readonly TopicSubscriptionConfig config;
private readonly IMessageQueue? deadletterQueue;
private long sequenceNo = 0;

Expand All @@ -44,26 +42,32 @@ public MessageQueue(TopicSubscriptionConfig topicSubscriptionConfig, Cancellatio
items = new();
sendQueue = new();
semaphore = new(1);
expirationList = new ExpirationList(lockDuration.ToTimeSpan() / 2, OnLockExpiration, stopToken);
config = topicSubscriptionConfig;
expirationList = new ExpirationList(config.LockDuration.ToTimeSpan() / 2, OnLockExpiration, stopToken);
logger = loggerFactory.CreateLogger<MessageQueue>();
lockDuration = topicSubscriptionConfig.LockDuration;
timeToLive = topicSubscriptionConfig.TimeToLive;
this.deadletterQueue = deadletterQueue;
}

private void OnLockExpiration(BrokerMessage message)
{
logger.LogTrace("message {MessageSeqNo} with lock {LockId} expired and requeued",
message.SequenceNumber, message.LockToken);
logger.LogTrace("message {MessageSeqNo} in subscription {Subscription} with lock {LockId} expired and requeued",
message.TraceId, config.FullName, message.LockToken);

Replace(message.Unlock());

if (!message.IsDeferred)
if (message.IsDeferred)
{
TryReEnqueue(message.Message);
return;
}

// when message is expired, we should increase the delivery count
if (MovedToDeadletterAfterIncreaseDeliveryCount(message))
{
return;
}
}

TryReEnqueue(message.Message);
}
public long Enqueue(AmqpMessage message)
{
var clonedMessage = message.Clone(true);
Expand All @@ -77,9 +81,9 @@ public long Enqueue(AmqpMessage message)

clonedMessage.Header.DeliveryCount ??= 0;
clonedMessage.Properties.MessageId ??= Guid.NewGuid();
clonedMessage.Properties.AbsoluteExpiryTime = DateTime.UtcNow + timeToLive;
clonedMessage.Properties.AbsoluteExpiryTime = DateTime.UtcNow + config.TimeToLive;
clonedMessage.Properties.CreationTime = DateTime.UtcNow;
clonedMessage.Header.Ttl = Convert.ToUInt32(timeToLive.ToTimeSpan().TotalSeconds);
clonedMessage.Header.Ttl = Convert.ToUInt32(config.TimeToLive.ToTimeSpan().TotalSeconds);

var brokerMessage = new BrokerMessage(clonedMessage);
items.TryAdd(messageSeqNo, brokerMessage);
Expand Down Expand Up @@ -115,7 +119,7 @@ public Result<AmqpMessage> TryLock(AmqpMessage message, string linkName)

public Result<AmqpMessage> TryLock(BrokerMessage message, string linkName)
{
var lockedUntil = DateTime.UtcNow + lockDuration;
var lockedUntil = DateTime.UtcNow + config.LockDuration;
var lockToken = Guid.NewGuid();
var lockedMessage = message.Lock(lockToken, lockedUntil, linkName);

Expand All @@ -129,8 +133,8 @@ public Result<AmqpMessage> TryLock(BrokerMessage message, string linkName)
clonedMessage.MessageAnnotations.Map[AmqpMessageConstants.LockedUntil] = lockedUntil;
clonedMessage.DeliveryAnnotations.Map[AmqpMessageConstants.LockToken] = lockToken;

logger.LogTrace("add lock {LockId} until {LockedUntil} for message {MessageSeqNo}",
lockToken, lockedUntil.Ticks, message.SequenceNumber);
logger.LogTrace("add lock {LockId} until {LockedUntil} for message {MessageSeqNo} in subscription {Subscription}",
lockToken, lockedUntil.Ticks, message.TraceId, config.FullName);

return clonedMessage;
}
Expand All @@ -140,7 +144,7 @@ public Result<DateTime> TryRenewLock(Guid lockToken, string linkName)
var message = expirationList.TryRemove(lockToken, linkName);
if (!message.IsSuccess) return Result.Fail();

var lockedUntil = DateTime.UtcNow + lockDuration;
var lockedUntil = DateTime.UtcNow + config.LockDuration;
var lockedMessage = message.Value.RenewLock(lockedUntil);

if (!expirationList.TryAdd(lockedMessage))
Expand Down Expand Up @@ -198,13 +202,19 @@ public bool TryRelease(Guid lockToken, string linkName)
var releasedMessage = message.Value.Unlock();
Replace(releasedMessage);

if (!releasedMessage.IsDeferred)
if (releasedMessage.IsDeferred)
{
// put it in the send queue again so it will send to the consumers again
sendQueue.Enqueue(message.Value);
semaphore.Release();
return true;
}

// put it in the send queue again so it will send to the consumers again
if (MovedToDeadletterAfterIncreaseDeliveryCount(message.Value))
{
return true;
}

sendQueue.Enqueue(message.Value);
semaphore.Release();
return true;
}

Expand Down Expand Up @@ -282,12 +292,37 @@ public IEnumerable<AmqpMessage> Peek(int maxMessages, long? fromSequenceNumber)

private void Replace(BrokerMessage message)
{
items[message.SequenceNumber] = message;
var sequenceNumber = message.Message.GetSequenceNumber();
items[sequenceNumber] = message;
}

public void Dispose()
{
semaphore.Dispose();
}

private bool MovedToDeadletterAfterIncreaseDeliveryCount(BrokerMessage message)
{
message.Message.IncreaseDeliveryCount();
logger.LogTrace("increase delivery count to {DeliveryCount} for message {MessageSeqNo} in subscription {Subscription}",
message.Message.Header.DeliveryCount, message.TraceId, config.FullName);

if (message.Message.Header.DeliveryCount >= config.MaxDeliveryCount)
{
logger.LogError("message {MessageSeqNo} in subscription {Subscription} has reached the maximum delivery count {MaxDeliveryCount} and will be dead-lettered",
message.TraceId, config.FullName, config.MaxDeliveryCount);

if (!TryDeadletter(message.Message))
{
logger.LogError("can not move message {MessageSeqNo} in subscription {Subscription} to dead-lettered",
message.TraceId, config.FullName);

return false;
}

return true;
}

return false;
}
}
18 changes: 3 additions & 15 deletions src/Lazvard.Message.Amqp.Server/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private IEnumerable<Consumer> GetActiveConsumers()
protected override void ProcessIncomingMessage(AmqpMessage message, CancellationToken stopToken)
{
logger.LogTrace("process message {MessageSeqNo} in subscription {Subscription}",
message.GetTraceId(), fullName);
message.GetTraceId(), config.FullName);

var delivered = false;

Expand All @@ -37,31 +37,19 @@ protected override void ProcessIncomingMessage(AmqpMessage message, Cancellation
delivered = consumer.TryToDeliver(message);

logger.LogTrace("delivering message {MessageSeqNo} in subscription {Subscription} to consumer {Link} was {Status}",
message.GetTraceId(), fullName, "", delivered ? "Successful" : "Failed");
message.GetTraceId(), config.FullName, "", delivered ? "Successful" : "Failed");

if (delivered)
break;
}

if (!delivered)
{
if (message.Header.DeliveryCount >= config.MaxDeliveryCount)
{
logger.LogError("the message {MessageSeqNo} in subscription {Subscription} has reached the maximum delivery count {MaxDeliveryCount} and will be dead-lettered",
message.GetTraceId(), fullName, config.MaxDeliveryCount);

if (!messageQueue.TryDeadletter(message))
{
logger.LogError("can not move message {MessageSeqNo} in subscription {Subscription} to dead-lettered",
message.GetTraceId(), fullName);
}
}

// try again to send the message
if (!messageQueue.TryReEnqueue(message))
{
logger.LogError("can not re-enqueue message {MessageSeqNo} in subscription {Subscription}",
message.GetTraceId(), fullName);
message.GetTraceId(), config.FullName);
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/Lazvard.Message.Amqp.Server/SubscriptionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public abstract class SubscriptionBase : ISubscription
protected readonly IMessageQueue messageQueue;
protected readonly TopicSubscriptionConfig config;
protected readonly ILogger logger;
protected readonly string fullName;

public string Name => config.Name;

Expand All @@ -40,7 +39,6 @@ public SubscriptionBase(
this.stopToken = stopToken;
this.messageQueue = messageQueue;
this.consumerFactory = consumerFactory;
fullName = string.Join('/', config.TopicName, config.Name);

logger = loggerFactory.CreateLogger<Subscription>();
consumers = new(2, 5);
Expand All @@ -57,7 +55,8 @@ public bool HasAddress(Address address)
public void Write(AmqpMessage message)
{
var sequenceNumber = messageQueue.Enqueue(message);
logger.LogTrace("write message {MessageSeqNo} to subscription {Subscription} channel", sequenceNumber, fullName);
logger.LogTrace("write message {MessageSeqNo} to subscription {Subscription} channel",
sequenceNumber, config.FullName);
}

public void OnAttachSendingLink(SendingAmqpLink link)
Expand Down Expand Up @@ -92,7 +91,7 @@ private async Task ProcessIncomingMessages()
try
{
logger.LogTrace("waiting to receive a message in subscription {Subscription}",
fullName);
config.FullName);

// wait for receiving a message
var message = await messageQueue.DequeueAsync(stopToken);
Expand All @@ -106,7 +105,7 @@ private async Task ProcessIncomingMessages()
if (!activeConsumers.Any())
{
logger.LogTrace("no consumers to deliver the message {MessageSeqNo} in subscription {Subscription}, waiting...",
message.Value.GetTraceId(), fullName);
message.Value.GetTraceId(), config.FullName);

// wait for a active consumer
await emptyConsumerEvent.WaitAsync(stopToken);
Expand Down
4 changes: 1 addition & 3 deletions src/Lazvard.Message.Cli/Lazvard.Message.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
<Authors>Pejman Nikram</Authors>
<Description>lightweight AMQP server - Azure Service Bus simulator</Description>
<PackageTags>amqp;service-bus;azure-service-bus;amqp-server</PackageTags>
<Version>0.5.0</Version>
<AssemblyVersion>0.5.0</AssemblyVersion>
<FileVersion>0.5.0</FileVersion>
<Version>0.6.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 3c0096f

Please sign in to comment.