Skip to content

Commit

Permalink
Add ITransportMonitor implementation for InProcess messaging (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraliv13 authored Jan 12, 2022
1 parent 433a51b commit 0f909cc
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ public static class DependencyInjectionExtensions
{
public static void AddInProcessTransport(this IServiceCollection services)
{
services.AddSingleton<IStorage, Storage>();
services.AddSingleton<IMessagingTransport, InProcessMessagingTransport>();
services.AddSingleton<IStorage, Storage>();

services.AddSingleton<InProcessMessagingTransport>();
services.AddSingleton<ITransportMonitor>(sp => sp.GetRequiredService<InProcessMessagingTransport>());
services.AddSingleton<IMessagingTransport>(sp => sp.GetRequiredService<InProcessMessagingTransport>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace NBB.Messaging.InProcessMessaging.Internal
{
public class InProcessMessagingTransport : IMessagingTransport
public class InProcessMessagingTransport : IMessagingTransport, ITransportMonitor
{
private readonly IStorage _storage;
private readonly ILogger<InProcessMessagingTransport> _logger;
Expand All @@ -21,6 +21,8 @@ public InProcessMessagingTransport(IStorage storage, ILogger<InProcessMessagingT
_logger = logger;
}

public event TransportErrorHandler OnError;

public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
Expand All @@ -38,6 +40,9 @@ await _storage.AddSubscription(topic, async msg =>
_logger.LogError(
"InProcessMessagingTopicSubscriber encountered an error when handling a message from topic {TopicName}.\n {Error}",
topic, ex);

OnError?.Invoke(ex);

//TODO: push to DLQ
}
}, cancellationToken);
Expand All @@ -61,10 +66,11 @@ public Task PublishAsync(string topic, TransportSendContext sendContext, Cancell
return Task.CompletedTask;
}

private class Subscription : IDisposable
private sealed class Subscription : IDisposable
{
public void Dispose()
{
// Nothing to dispose
}
}
}
Expand Down

0 comments on commit 0f909cc

Please sign in to comment.