diff --git a/NBB.sln b/NBB.sln index ab050e36..49db1ed2 100644 --- a/NBB.sln +++ b/NBB.sln @@ -405,7 +405,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Core.Configuration", "s EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Core.Configuration.Tests", "test\UnitTests\Core\NBB.Core.Configuration.Tests\NBB.Core.Configuration.Tests.csproj", "{2A91E54D-F63C-4E92-9A6D-F15C7C55B0D7}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NBB.Messaging.Rusi.IntegrationTests", "test\Integration\NBB.Messaging.Rusi.IntegrationTests\NBB.Messaging.Rusi.IntegrationTests.csproj", "{2D002428-8D55-46BD-A588-84946E33B4EF}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Messaging.Rusi.IntegrationTests", "test\Integration\NBB.Messaging.Rusi.IntegrationTests\NBB.Messaging.Rusi.IntegrationTests.csproj", "{2D002428-8D55-46BD-A588-84946E33B4EF}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Messaging.JetStream", "src\Messaging\NBB.Messaging.JetStream\NBB.Messaging.JetStream.csproj", "{C609385D-A1BD-4411-A675-488CA66AEA8F}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -913,6 +915,10 @@ Global {2D002428-8D55-46BD-A588-84946E33B4EF}.Debug|Any CPU.Build.0 = Debug|Any CPU {2D002428-8D55-46BD-A588-84946E33B4EF}.Release|Any CPU.ActiveCfg = Release|Any CPU {2D002428-8D55-46BD-A588-84946E33B4EF}.Release|Any CPU.Build.0 = Release|Any CPU + {C609385D-A1BD-4411-A675-488CA66AEA8F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C609385D-A1BD-4411-A675-488CA66AEA8F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C609385D-A1BD-4411-A675-488CA66AEA8F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C609385D-A1BD-4411-A675-488CA66AEA8F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1081,6 +1087,7 @@ Global {2C7CA0BD-98AD-4CC1-B496-94ED49636466} = {14726095-DA28-43A6-A9A9-F16C605932E1} {2A91E54D-F63C-4E92-9A6D-F15C7C55B0D7} = {29B7593C-60F4-41DC-A883-4976FF467927} {2D002428-8D55-46BD-A588-84946E33B4EF} = {2D9089E7-54EA-4526-B8FE-12729BF14F16} + {C609385D-A1BD-4411-A675-488CA66AEA8F} = {584C62C0-2AE6-4DD6-9BCF-8FF28B7122CE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {23A42379-616A-43EF-99BC-803DF151F54E} diff --git a/src/Messaging/NBB.Messaging.JetStream/DependencyInjectionExtensions.cs b/src/Messaging/NBB.Messaging.JetStream/DependencyInjectionExtensions.cs new file mode 100644 index 00000000..b5c4eeb9 --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/DependencyInjectionExtensions.cs @@ -0,0 +1,24 @@ +// Copyright (c) TotalSoft. +// This source code is licensed under the MIT license. + +using Microsoft.Extensions.Configuration; +using NBB.Messaging.Abstractions; +using NBB.Messaging.JetStream; +using NBB.Messaging.JetStream.Internal; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class DependencyInjectionExtensions + { + public static IServiceCollection AddJetStreamTransport(this IServiceCollection services, IConfiguration configuration) + { + services.Configure(configuration.GetSection("Messaging").GetSection("JetStream")); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + } +} diff --git a/src/Messaging/NBB.Messaging.JetStream/Internal/JetStreamConnectionProvider.cs b/src/Messaging/NBB.Messaging.JetStream/Internal/JetStreamConnectionProvider.cs new file mode 100644 index 00000000..551f5b57 --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/Internal/JetStreamConnectionProvider.cs @@ -0,0 +1,104 @@ +// Copyright (c) TotalSoft. +// This source code is licensed under the MIT license. + +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NATS.Client; +using NBB.Messaging.Abstractions; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.Messaging.JetStream.Internal +{ + public class JetStreamConnectionProvider : IDisposable, ITransportMonitor + { + private readonly IOptions _natsOptions; + private readonly ILogger _logger; + private IConnection _connection; + private static readonly object InstanceLoker = new(); + private Exception _unrecoverableException; + + public event TransportErrorHandler OnError; + + public JetStreamConnectionProvider(IOptions natsOptions, ILogger logger) + { + _natsOptions = natsOptions; + _logger = logger; + } + + public async Task ExecuteAsync(Func action) + { + var connection = GetAndCheckConnection(); + + await action(connection); + } + + public void Execute(Action action) + { + var connection = GetAndCheckConnection(); + + action(connection); + } + + private IConnection GetAndCheckConnection() + { + if (_connection == null) + lock (InstanceLoker) + { + if (_connection == null) + _connection = CreateConnection(); + } + return _connection; + } + + private IConnection CreateConnection() + { + var options = ConnectionFactory.GetDefaultOptions(); + options.Url = _natsOptions.Value.NatsUrl; + + //https://github.com/nats-io/nats.net/issues/804 + options.AllowReconnect = false; + + options.ClosedEventHandler += (_, args) => + { + SetConnectionLostState(args.Error ?? new Exception("NATS Jetstream connection was lost")); + }; + + _connection = new ConnectionFactory().CreateConnection(options); + _logger.LogInformation($"NATS Jetstream connection to {_natsOptions.Value.NatsUrl} was established"); + + return _connection; + } + + private void SetConnectionLostState(Exception exception) + { + _connection = null; + + // Set the field to the current exception if not already set + var existingException = Interlocked.CompareExchange(ref _unrecoverableException, exception, null); + + // Send the application stop signal only once + if (existingException != null) + return; + + _logger.LogError(exception, "NATS Jetstream connection unrecoverable"); + + OnError?.Invoke(exception); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _connection?.Dispose(); + } + } + } +} diff --git a/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs new file mode 100644 index 00000000..2c32a87b --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs @@ -0,0 +1,97 @@ +// Copyright (c) TotalSoft. +// This source code is licensed under the MIT license. + +using Microsoft.Extensions.Options; +using NATS.Client; +using NATS.Client.JetStream; +using NBB.Messaging.Abstractions; +using NBB.Messaging.JetStream.Internal; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.Messaging.JetStream +{ + public class JetStreamMessagingTransport : IMessagingTransport + { + private readonly IOptions _natsOptions; + private readonly JetStreamConnectionProvider _natsConnectionManager; + + public JetStreamMessagingTransport(IOptions natsOptions, JetStreamConnectionProvider natsConnectionManager) + { + _natsOptions = natsOptions; + _natsConnectionManager = natsConnectionManager; + } + + public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default) + { + var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke(); + + return _natsConnectionManager.ExecuteAsync(con => + { + IJetStream js = con.CreateJetStreamContext(); + return js.PublishAsync(topic, envelopeData); + }); + } + + public Task SubscribeAsync(string topic, + Func handler, + SubscriptionTransportOptions options = null, + CancellationToken cancellationToken = default) + { + + IDisposable consumer = null; + + _natsConnectionManager.Execute(con => + { + IJetStream js = con.CreateJetStreamContext(); + + // set's up the stream + var isCommand = topic.ToLower().Contains("commands."); + + var stream = isCommand ? _natsOptions.Value.CommandsStream : _natsOptions.Value.EventsStream; + var jsm = con.CreateJetStreamManagementContext(); + jsm.GetStreamInfo(stream); + + // get stream context, create consumer and get the consumer context + var streamContext = con.GetStreamContext(stream); + + var subscriberOptions = options ?? SubscriptionTransportOptions.Default; + var ccb = ConsumerConfiguration.Builder(); + + if (subscriberOptions.IsDurable) + { + var clientId = (_natsOptions.Value.ClientId + topic).Replace(".", "_"); + ccb.WithDurable(clientId); + } + + if (subscriberOptions.DeliverNewMessagesOnly) + ccb.WithDeliverPolicy(DeliverPolicy.New); + else + ccb.WithDeliverPolicy(DeliverPolicy.All); + + ccb.WithAckWait(subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000); + + //https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending + ccb.WithMaxAckPending(subscriberOptions.MaxConcurrentMessages); + ccb.WithFilterSubject(topic); + + var consumerContext = streamContext.CreateOrUpdateConsumer(ccb.Build()); + + void NatsMsgHandler(object obj, MsgHandlerEventArgs args) + { + if (cancellationToken.IsCancellationRequested) + return; + + var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data)); + + // Fire and forget + _ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken); + } + consumer = consumerContext.Consume(NatsMsgHandler); + + }); + return Task.FromResult(consumer); + } + } +} diff --git a/src/Messaging/NBB.Messaging.JetStream/JetStreamOptions.cs b/src/Messaging/NBB.Messaging.JetStream/JetStreamOptions.cs new file mode 100644 index 00000000..c6e6b4e5 --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/JetStreamOptions.cs @@ -0,0 +1,26 @@ +// Copyright (c) TotalSoft. +// This source code is licensed under the MIT license. + +namespace NBB.Messaging.JetStream +{ + public class JetStreamOptions + { + /// + /// URL of the Streaming NATS cluster + /// + public string NatsUrl { get; set; } + + /// + /// Identifier of the Streaming NATS client + /// + public string ClientId { get; set; } + + /// + /// The time the server awaits for acknowledgement from the client before redelivering the message (in milliseconds) + /// + public int? AckWait { get; set; } + public string CommandsStream { get; set; } + public string EventsStream { get; set; } + + } +} diff --git a/src/Messaging/NBB.Messaging.JetStream/JsUtils.cs b/src/Messaging/NBB.Messaging.JetStream/JsUtils.cs new file mode 100644 index 00000000..5b89945c --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/JsUtils.cs @@ -0,0 +1,337 @@ +// Copyright 2021-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using NATS.Client; +using NATS.Client.JetStream; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace NBB.Messaging.JetStream +{ + internal static class JsUtils + { + // ---------------------------------------------------------------------------------------------------- + // STREAM INFO / CREATE / UPDATE + // ---------------------------------------------------------------------------------------------------- + public static StreamInfo GetStreamInfoOrNullWhenNotExist(IJetStreamManagement jsm, string streamName) + { + try + { + return jsm.GetStreamInfo(streamName); + } + catch (NATSJetStreamException e) + { + if (e.ErrorCode == 404) + { + return null; + } + + throw; + } + } + + public static bool StreamExists(IConnection c, string streamName) + { + return GetStreamInfoOrNullWhenNotExist(c.CreateJetStreamManagementContext(), streamName) != null; + } + + public static bool StreamExists(IJetStreamManagement jsm, string streamName) + { + return GetStreamInfoOrNullWhenNotExist(jsm, streamName) != null; + } + + public static void ExitIfStreamExists(IJetStreamManagement jsm, string streamName) + { + if (StreamExists(jsm, streamName)) + { + Console.WriteLine($"\nThe example cannot run since the stream '{streamName}' already exists.\n" + + "It depends on the stream being in a new state. You can either:\n" + + " 1) Change the stream name in the example.\n 2) Delete the stream.\n 3) Restart the server if the stream is a memory stream."); + Environment.Exit(-1); + } + } + + public static void ExitIfStreamNotExists(IConnection c, string streamName) + { + if (!StreamExists(c, streamName)) + { + Console.WriteLine("\nThe example cannot run since the stream '" + streamName + "' does not exist.\n" + + "It depends on the stream existing and having data."); + Environment.Exit(-1); + } + } + + public static StreamInfo CreateStream(IJetStreamManagement jsm, string streamName, StorageType storageType, + params string[] subjects) + { + // Create a stream, here will use a file storage type, and one subject, + // the passed subject. + StreamConfiguration sc = StreamConfiguration.Builder() + .WithName(streamName) + .WithStorageType(storageType) + .WithSubjects(subjects) + .Build(); + + // Add or use an existing stream. + StreamInfo si = jsm.AddStream(sc); + Console.WriteLine("Created stream '{0}' with subject(s) [{1}]\n", streamName, + string.Join(",", si.Config.Subjects)); + return si; + } + + public static StreamInfo CreateStream(IJetStreamManagement jsm, string stream, params string[] subjects) + { + return CreateStream(jsm, stream, StorageType.Memory, subjects); + } + + public static StreamInfo CreateStream(IConnection c, string stream, params string[] subjects) + { + return CreateStream(c.CreateJetStreamManagementContext(), stream, StorageType.Memory, subjects); + } + + public static StreamInfo CreateStreamExitWhenExists(IConnection c, string streamName, params string[] subjects) + { + return CreateStreamExitWhenExists(c.CreateJetStreamManagementContext(), streamName, subjects); + } + + public static StreamInfo CreateStreamExitWhenExists(IJetStreamManagement jsm, string streamName, + params string[] subjects) + { + ExitIfStreamExists(jsm, streamName); + return CreateStream(jsm, streamName, StorageType.Memory, subjects); + } + + public static void CreateStreamWhenDoesNotExist(IJetStreamManagement jsm, string stream, + params string[] subjects) + { + try + { + jsm.GetStreamInfo(stream); // this throws if the stream does not exist + return; + } + catch (NATSJetStreamException) + { + /* stream does not exist */ + } + + StreamConfiguration sc = StreamConfiguration.Builder() + .WithName(stream) + .WithStorageType(StorageType.Memory) + .WithSubjects(subjects) + .Build(); + jsm.AddStream(sc); + } + + public static void CreateStreamWhenDoesNotExist(IConnection c, string stream, params string[] subjects) + { + CreateStreamWhenDoesNotExist(c.CreateJetStreamManagementContext(), stream, subjects); + } + + public static StreamInfo CreateStreamOrUpdateSubjects(IJetStreamManagement jsm, string streamName, + StorageType storageType, params string[] subjects) + { + + StreamInfo si = GetStreamInfoOrNullWhenNotExist(jsm, streamName); + if (si == null) + { + return CreateStream(jsm, streamName, storageType, subjects); + } + + // check to see if the configuration has all the subject we want + StreamConfiguration sc = si.Config; + bool needToUpdate = false; + foreach (string sub in subjects) + { + if (!sc.Subjects.Contains(sub)) + { + needToUpdate = true; + sc.Subjects.Add(sub); + } + } + + if (needToUpdate) + { + si = jsm.UpdateStream(sc); + Console.WriteLine("Existing stream '{0}' was updated, has subject(s) [{1}]\n", + streamName, string.Join(",", si.Config.Subjects)); + // Existing stream 'scratch' [sub1, sub2] + } + else + { + Console.WriteLine("Existing stream '{0}' already contained subject(s) [{1}]\n", + streamName, string.Join(",", si.Config.Subjects)); + } + + return si; + } + + public static StreamInfo CreateStreamOrUpdateSubjects(IJetStreamManagement jsm, string streamName, + params string[] subjects) + { + return CreateStreamOrUpdateSubjects(jsm, streamName, StorageType.Memory, subjects); + } + + public static StreamInfo CreateStreamOrUpdateSubjects(IConnection c, string stream, params string[] subjects) + { + return CreateStreamOrUpdateSubjects(c.CreateJetStreamManagementContext(), stream, StorageType.Memory, + subjects); + } + + public static void CreateOrReplaceStream(IJetStreamManagement jsm, string stream, string subject) + { + // in case the stream was here before, we want a completely new one + try + { + jsm.DeleteStream(stream); + } + catch (Exception) + { + } + + jsm.AddStream(StreamConfiguration.Builder() + .WithName(stream) + .WithStorageType(StorageType.File) + .WithSubjects(subject) + .Build()); + } + + // ---------------------------------------------------------------------------------------------------- + // PUBLISH + // ---------------------------------------------------------------------------------------------------- + public static void Publish(IConnection c, string subject, int count) + { + Publish(c.CreateJetStreamContext(), subject, "data", count, false); + } + + public static void Publish(IJetStream js, string subject, int count) + { + Publish(js, subject, "data", count, false); + } + + public static void Publish(IJetStream js, string subject, string prefix, int count, bool verbose = true) + { + if (verbose) + { + Console.Write("Publish ->"); + } + + for (int x = 1; x <= count; x++) + { + string data = prefix + x; + if (verbose) + { + Console.Write(" " + data); + } + + js.Publish(subject, Encoding.UTF8.GetBytes(data)); + } + + if (verbose) + { + Console.WriteLine(" <-"); + } + } + + public static void PublishInBackground(IJetStream js, string subject, string prefix, int count) + { + new Thread(() => + { + try + { + for (int x = 1; x <= count; x++) + { + js.Publish(subject, Encoding.ASCII.GetBytes(prefix + "-" + x)); + } + } + catch (Exception e) + { + Console.WriteLine(e); + Environment.Exit(-1); + } + }).Start(); + Thread.Sleep(100); // give the publish thread a little time to get going + } + + // ---------------------------------------------------------------------------------------------------- + // READ MESSAGES + // ---------------------------------------------------------------------------------------------------- + public static IList ReadMessagesAck(ISyncSubscription sub, bool verbose = true, int timeout = 1000) + { + if (verbose) + { + Console.Write("Read/Ack ->"); + } + + IList messages = new List(); + bool keepGoing = true; + while (keepGoing) + { + try + { + Msg msg = sub.NextMessage(timeout); + messages.Add(msg); + msg.Ack(); + if (verbose) + { + Console.Write(" " + msg.Subject + " / " + Encoding.UTF8.GetString(msg.Data)); + } + } + catch (NATSTimeoutException) // timeout means there are no messages available + { + keepGoing = false; + } + } + + if (verbose) + { + Console.Write(messages.Count == 0 ? " No messages available <-\n" : " <-\n"); + } + + return messages; + } + + // ---------------------------------------------------------------------------------------------------- + // REPORT + // ---------------------------------------------------------------------------------------------------- + public static void Report(IList list) + { + Console.Write("Fetch ->"); + foreach (Msg m in list) + { + Console.Write(" " + Encoding.UTF8.GetString(m.Data)); + } + + Console.Write(" <- \n"); + } + + private static readonly Random Random = new Random(); + private const string RandomChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"; + private const string RandomIdChars = "0123456789abcdef"; + + public static string RandomText() + { + return new string(Enumerable.Repeat(RandomChars, 20) + .Select(s => s[Random.Next(s.Length)]).ToArray()); + } + + public static string RandomId() + { + return new string(Enumerable.Repeat(RandomIdChars, 6) + .Select(s => s[Random.Next(s.Length)]).ToArray()); + } + } + +} diff --git a/src/Messaging/NBB.Messaging.JetStream/NBB.Messaging.JetStream.csproj b/src/Messaging/NBB.Messaging.JetStream/NBB.Messaging.JetStream.csproj new file mode 100644 index 00000000..c2d1fb63 --- /dev/null +++ b/src/Messaging/NBB.Messaging.JetStream/NBB.Messaging.JetStream.csproj @@ -0,0 +1,30 @@ + + + + SAK + SAK + SAK + SAK + + + + net7.0 + Nats JetStream messaging + + + + + + + + + + + + + + + + + + diff --git a/src/Messaging/README.md b/src/Messaging/README.md index d66efcd3..6c0deb14 100644 --- a/src/Messaging/README.md +++ b/src/Messaging/README.md @@ -36,6 +36,7 @@ Messaging transports ----------------- The message bus uses an abstraction over the messaging transport. The following implementations are currently supported: * **NATS Streaming** (*NBB.Messaging.Nats* package) - https://nats.io +* **JetStream NATS** (*NBB.Messaging.JetStream* package) - https://nats.io * **In-process** (*NBB.Messaging.InProcessMessaging*) - can be used as *test doubles* in integration tests * **Noop** (*NBB.Messaging.Noop*) - NoOp implementation of messaging transport * **Rusi** (*NBB.Messaging.Rusi*) - it's a transport implementation for https://github.com/osstotalsoft/rusi, which will handle some messaging concerns externally