Skip to content

Commit

Permalink
Fixed process crash on message handler (#214)
Browse files Browse the repository at this point in the history
Fixed multiple publications to soter
  • Loading branch information
fraliv13 authored Mar 16, 2022
1 parent 9528ffe commit d6c9c1d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
100 changes: 63 additions & 37 deletions src/Messaging/NBB.Messaging.Abstractions/DefaultDeadLetterQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace NBB.Messaging.Abstractions
public class DefaultDeadLetterQueue : IDeadLetterQueue
{
public const string ErrorTopicName = "_error";
private const string PushErrorMessage = "Error publishing to dead letter queue";

private readonly IMessageBusPublisher _messageBusPublisher;
private readonly IMessageSerDes _messageSerDes;
Expand All @@ -26,45 +27,54 @@ public DefaultDeadLetterQueue(IMessageBusPublisher messageBusPublisher, IMessage

public void Push(MessagingEnvelope messageEnvelope, string topicName, Exception ex)
{
var payload = new
try
{
ExceptionType = ex.GetType(),
ErrorMessage = ex.Message,
ex.StackTrace,
ex.Source,
Data = messageEnvelope,
OriginalTopic = topicName,
OriginalSystem = messageEnvelope.Headers.TryGetValue(MessagingHeaders.Source, out var source) ? source : string.Empty,
CorrelationId = messageEnvelope.GetCorrelationId(),
MessageType = messageEnvelope.GetMessageTypeId(),
PublishTime = messageEnvelope.Headers.TryGetValue(MessagingHeaders.PublishTime, out var value)
? DateTime.TryParse(value, out var publishTime)
? publishTime
: default
: default,

MessageId = messageEnvelope.Headers.TryGetValue(MessagingHeaders.MessageId, out var messageId) ? messageId : string.Empty
};

// Fire and forget
_ = _messageBusPublisher
.PublishAsync(payload, MessagingPublisherOptions.Default with { TopicName = ErrorTopicName }, default)
.ContinueWith(t => _logger.LogError(t.Exception, "Error publishing to dead letter queue"), TaskContinuationOptions.OnlyOnFaulted);
var payload = new
{
ExceptionType = ex.GetType(),
ErrorMessage = ex.Message,
ex.StackTrace,
ex.Source,
Data = messageEnvelope,
OriginalTopic = topicName,
OriginalSystem = messageEnvelope.Headers.TryGetValue(MessagingHeaders.Source, out var source) ? source : string.Empty,
CorrelationId = messageEnvelope.GetCorrelationId(),
MessageType = messageEnvelope.GetMessageTypeId(),
PublishTime = messageEnvelope.Headers.TryGetValue(MessagingHeaders.PublishTime, out var value) && DateTime.TryParse(value, out var publishTime)
? publishTime : default,
MessageId = messageEnvelope.Headers.TryGetValue(MessagingHeaders.MessageId, out var messageId) ? messageId : string.Empty
};

// Fire and forget
Task.Run(async () => await Publish(payload));
}
catch (Exception localException)
{
// Pushing to DLQ should not throw exceptions
_logger.LogError(localException, PushErrorMessage);
}
}

public void Push(TransportReceivedData messageData, string topicName, Exception ex)
{
switch (messageData)
try
{
case TransportReceivedData.EnvelopeBytes(var envelopeBytes):
Push(envelopeBytes, topicName, ex);
break;
case TransportReceivedData.PayloadBytesAndHeaders(var payloadBytes, var headers):
Push(payloadBytes, headers, topicName, ex);
break;
default:
throw new ArgumentException("Invalid transport received data", nameof(messageData));
switch (messageData)
{
case TransportReceivedData.EnvelopeBytes(var envelopeBytes):
Push(envelopeBytes, topicName, ex);
break;
case TransportReceivedData.PayloadBytesAndHeaders(var payloadBytes, var headers):
Push(payloadBytes, headers, topicName, ex);
break;
default:
throw new ArgumentException("Invalid transport received data", nameof(messageData));
}
}
catch (Exception localException)
{
// Pushing to DLQ should not throw exceptions
_logger.LogError(localException, PushErrorMessage);
}
}

Expand All @@ -77,7 +87,10 @@ private void Push(byte[] messageData, string topicName, Exception ex)
Push(partialEnvelope, topicName, ex);
return;
}
catch { } // ignore partial deserialization exception
catch
{
// ignore partial deserialization exception
}

var envelopeString = Encoding.UTF8.GetString(messageData);
var payload = new
Expand All @@ -94,9 +107,7 @@ private void Push(byte[] messageData, string topicName, Exception ex)
};

// Fire and forget
_ = _messageBusPublisher
.PublishAsync(payload, MessagingPublisherOptions.Default with { TopicName = ErrorTopicName }, default)
.ContinueWith(t => _logger.LogError(t.Exception, "Error publishing to dead letter queue"), TaskContinuationOptions.OnlyOnFaulted);
Task.Run(async () => await Publish(payload));
}

private void Push(byte[] payloadData, IDictionary<string, string> headers, string topicName, Exception ex)
Expand All @@ -108,11 +119,26 @@ private void Push(byte[] payloadData, IDictionary<string, string> headers, strin
Push(new MessagingEnvelope(headers, untypedPayload), topicName, ex);
return;
}
catch { } // ignore untyped deserialization exception
catch
{
// ignore untyped deserialization exception
}

var payloadString = Encoding.UTF8.GetString(payloadData);

Push(new MessagingEnvelope(headers, payloadString), topicName, ex);
}

private async Task Publish<T>(T payload)
{
try
{
await _messageBusPublisher.PublishAsync(payload, MessagingPublisherOptions.Default with { TopicName = ErrorTopicName }, default);
}
catch (Exception ex)
{
_logger.LogError(ex, PushErrorMessage);
}
}
}
}
9 changes: 3 additions & 6 deletions src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,15 @@ public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveConte
opts.AckWait = subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000;
opts.ManualAcks = true;

//CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

async void StanMsgHandler(object obj, StanMsgHandlerArgs args)
void StanMsgHandler(object obj, StanMsgHandlerArgs args)
{
if (cancellationToken.IsCancellationRequested)
return;

var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data));

await handler(receiveContext);

args.Message.Ack();
// Fire and forget
_ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken);
}

IDisposable subscription = null;
Expand Down

0 comments on commit d6c9c1d

Please sign in to comment.