From 4bc969bbd13c29e68af040ad98fe6173751dfcd1 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Tue, 20 Feb 2024 20:17:12 -0500 Subject: [PATCH 1/8] Create RetryPolicy.cs --- server/PathServices/RetryPolicy.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 server/PathServices/RetryPolicy.cs diff --git a/server/PathServices/RetryPolicy.cs b/server/PathServices/RetryPolicy.cs new file mode 100644 index 0000000..d7a1f8c --- /dev/null +++ b/server/PathServices/RetryPolicy.cs @@ -0,0 +1,15 @@ +using Microsoft.AspNetCore.SignalR.Client; +using Serilog; +using System; + +namespace PathApi.Server.PathServices +{ + internal sealed class RetryPolicy : IRetryPolicy + { + public TimeSpan? NextRetryDelay(RetryContext retryContext) + { + Log.Logger.Here().Warning("SignalR connection retrying because of {retryReason}, total retry count {previousRetryCount}", retryContext.RetryReason, retryContext.PreviousRetryCount); + return TimeSpan.FromTicks(new Random().Next(1, 4) * Math.Min(retryContext.PreviousRetryCount + 1, 5) * (long)10e6); + } + } +} From b7dd4937a214ba0ad30e68bb9169c178d7f0e4a2 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Tue, 20 Feb 2024 20:18:01 -0500 Subject: [PATCH 2/8] Update SignalRRealtimeDataRepository.cs --- .../SignalRRealtimeDataRepository.cs | 97 +++++++++++++++---- 1 file changed, 77 insertions(+), 20 deletions(-) diff --git a/server/PathServices/SignalRRealtimeDataRepository.cs b/server/PathServices/SignalRRealtimeDataRepository.cs index 5fb4123..91fde43 100644 --- a/server/PathServices/SignalRRealtimeDataRepository.cs +++ b/server/PathServices/SignalRRealtimeDataRepository.cs @@ -1,4 +1,4 @@ -namespace PathApi.Server.PathServices +namespace PathApi.Server.PathServices { using Microsoft.AspNetCore.SignalR.Client; using Newtonsoft.Json; @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; @@ -16,10 +17,11 @@ /// internal sealed class SignalRRealtimeDataRepository : IRealtimeDataRepository, IDisposable { + private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromTicks(5 * (long)10e6); // 5s private readonly IPathDataRepository pathDataRepository; private readonly IPathApiClient pathApiClient; private readonly ConcurrentDictionary<(Station, RouteDirection), HubConnection> hubConnections; - private readonly ConcurrentDictionary<(Station, RouteDirection), List> realtimeData; + private readonly ConcurrentDictionary<(Station, RouteDirection), ImmutableList> realtimeData; /// /// Constructs a new instance of the . @@ -29,7 +31,7 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa this.pathDataRepository = pathDataRepository; this.pathApiClient = pathApiClient; this.hubConnections = new ConcurrentDictionary<(Station, RouteDirection), HubConnection>(); - this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), List>(); + this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), ImmutableList>(); this.pathDataRepository.OnDataUpdate += this.PathSqlDbUpdated; } @@ -41,12 +43,36 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa /// A collection of arriving trains. public Task> GetRealtimeData(Station station) { - return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow)); + var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)); + var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow); + if (allData.Count() != freshData.Count()) + { + var staledData = allData.Except(freshData); + foreach (var staledDataPoint in staledData) + Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated); + + Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection..."); + Task.Run(this.CreateHubConnections).Wait(); + } + return Task.FromResult(freshData); } private IEnumerable GetRealtimeData(Station station, RouteDirection direction) { - return this.realtimeData.GetValueOrDefault((station, direction), new List()); + Log.Logger.Here().Debug("Getting realtime data for {station}-{direction}...", station, direction); + var startTimestamp = DateTime.UtcNow; + var emptyRealtimeData = ImmutableList.Create(); + var realtimeDataResult = this.realtimeData.GetValueOrDefault((station, direction), emptyRealtimeData); + var endTimestamp = DateTime.UtcNow; + if (realtimeDataResult.Count() != 0) + { + Log.Logger.Here().Debug("Got {count} realtime dataPoint(s) for {station}-{direction}", realtimeDataResult.Count(), station, direction); + } else + { + Log.Logger.Here().Information("Got no realtime dataPoint for {station}-{direction}, this might indicate a problem either on the server or the client side", station, direction); + } + Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, endTimestamp - startTimestamp); + return realtimeDataResult; } private void PathSqlDbUpdated(object sender, EventArgs args) @@ -66,7 +92,7 @@ await Task.WhenAll(StationMappings.StationToSignalRTokenName.SelectMany(station RouteDirectionMappings.RouteDirectionToDirectionKey.Select(direction => this.CreateHubConnection(tokenBrokerUrl, tokenValue, station.Key, direction.Key)))); } - private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction, int sequentialFailures = 0) + private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction) { SignalRToken token; @@ -77,20 +103,17 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, var connection = new HubConnectionBuilder() .WithUrl(token.Url, c => c.AccessTokenProvider = () => Task.FromResult(token.AccessToken)) + .WithAutomaticReconnect(new RetryPolicy()) .Build(); + connection.KeepAliveInterval = this.KEEP_ALIVE_INTERVAL; + connection.On("SendMessage", (_, json) => this.ProcessNewMessage(station, direction, json) .ConfigureAwait(false) .GetAwaiter() .GetResult()); - async Task RetryConnection() - { - await Task.Delay(new Random().Next(1, 7) * (1000 * Math.Min(sequentialFailures + 1, 5))); - await this.CreateHubConnection(tokenBrokerUrl, tokenValue, station, direction, sequentialFailures + 1); - }; - connection.Closed += async (e) => { if (!this.hubConnections.ContainsKey((station, direction))) @@ -103,8 +126,7 @@ async Task RetryConnection() Log.Logger.Here().Warning(e, "SignalR connection was closed as a result of an exception"); } - Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction); - await RetryConnection(); + // Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction); }; try @@ -114,10 +136,13 @@ async Task RetryConnection() catch (Exception ex) { Log.Logger.Here().Warning(ex, "SignalR connection failed to start for {station}-{direction}...", station, direction); - await RetryConnection(); } - this.hubConnections.AddOrUpdate((station, direction), connection, (_, __) => connection); + this.hubConnections.AddOrUpdate((station, direction), connection, (key, existingConnection) => + { + + return connection; + }); } catch (Exception ex) { @@ -137,8 +162,9 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, } catch (Exception) { /* Ignore. */ } + Log.Logger.Here().Debug("SignalR Hub ProcessNewMessage for {station}-{direction}...", station, direction); - List newData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => + var newImmtubaleData = ImmutableList.Create((await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => { var realtimeData = new RealtimeData() { @@ -161,8 +187,39 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, } realtimeData.Route = route; return realtimeData; - }))).ToList(); - this.realtimeData.AddOrUpdate((station, direction), newData, (ignored, oldData) => newData[0].LastUpdated > oldData[0].LastUpdated ? newData : oldData); + }))).ToArray()); + + this.realtimeData.AddOrUpdate((station, direction), newImmtubaleData, (key, oldImmutableData) => { + var latestNewDataPointLastUpdated = DateTimeOffset.FromUnixTimeSeconds(0).DateTime; // 1970 epoch + foreach (var newDataPoint in newImmtubaleData) { + if (newDataPoint.LastUpdated > latestNewDataPointLastUpdated) + { + latestNewDataPointLastUpdated = newDataPoint.LastUpdated; + } + if (newDataPoint.DataExpiration <= DateTime.UtcNow) + { + Log.Logger.Here().Warning("Staled dataPoint received for S:{station} D:{direction} with timestamp {lastUpdated} expires at {expiration}", station, direction, newDataPoint.LastUpdated, newDataPoint.DataExpiration); + } + } + + var updatedImmutableData = newImmtubaleData; + var oldDataNewerThanNewDataLastUpdatedCount = oldImmutableData.Where(oldDataPoint => oldDataPoint.LastUpdated > latestNewDataPointLastUpdated).Count(); + if (oldDataNewerThanNewDataLastUpdatedCount > 0) + { + Log.Logger.Here().Warning("{count} dataPoint(s) in oldData are newer than newData for S:{station} D:{direction}, keeping oldData instead", oldDataNewerThanNewDataLastUpdatedCount, station, direction); + updatedImmutableData = oldImmutableData; + } + var filteredUpdatedImmutableData = ImmutableList.Create(updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToArray()); + if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count()) + { + Log.Logger.Here().Warning("{count} dataPoint(s) in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), station, direction); + } else + { + // return existing data will improve performance + filteredUpdatedImmutableData = updatedImmutableData; + } + return filteredUpdatedImmutableData; + }); } catch (Exception ex) { @@ -233,4 +290,4 @@ public sealed class RealtimeMessage public DateTime DepartureTime { get; set; } } } -} \ No newline at end of file +} From 16ad0af8f162493ae76ceea97acc3ac104fad5a4 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Tue, 20 Feb 2024 20:18:27 -0500 Subject: [PATCH 3/8] Update Program.cs --- server/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/Program.cs b/server/Program.cs index e3e698f..1df8ee9 100644 --- a/server/Program.cs +++ b/server/Program.cs @@ -22,7 +22,7 @@ static void Main(string[] args) { // Setup Logging Log.Logger = new LoggerConfiguration() - .MinimumLevel.Information() + .MinimumLevel.Debug() .Enrich.FromLogContext() .WriteTo.Console(outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] ({FilePath}.{MemberName}:{LineNumber}) {Message}{NewLine}{Exception}") .CreateLogger(); From 62b82dd4c137f432701edaefd24dd20dc9e2790d Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Tue, 20 Feb 2024 23:20:14 -0500 Subject: [PATCH 4/8] Address comments --- server/PathServices/RetryPolicy.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/PathServices/RetryPolicy.cs b/server/PathServices/RetryPolicy.cs index d7a1f8c..62adce1 100644 --- a/server/PathServices/RetryPolicy.cs +++ b/server/PathServices/RetryPolicy.cs @@ -9,7 +9,7 @@ internal sealed class RetryPolicy : IRetryPolicy public TimeSpan? NextRetryDelay(RetryContext retryContext) { Log.Logger.Here().Warning("SignalR connection retrying because of {retryReason}, total retry count {previousRetryCount}", retryContext.RetryReason, retryContext.PreviousRetryCount); - return TimeSpan.FromTicks(new Random().Next(1, 4) * Math.Min(retryContext.PreviousRetryCount + 1, 5) * (long)10e6); + return TimeSpan.FromSeconds(new Random().Next(1, 4) * Math.Min(retryContext.PreviousRetryCount + 1, 5)); } } } From 64ef78d5ccbae5f00f4e5cb50658578c73478499 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Tue, 20 Feb 2024 23:20:37 -0500 Subject: [PATCH 5/8] Address comments --- .../SignalRRealtimeDataRepository.cs | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/PathServices/SignalRRealtimeDataRepository.cs b/server/PathServices/SignalRRealtimeDataRepository.cs index 91fde43..bdde8be 100644 --- a/server/PathServices/SignalRRealtimeDataRepository.cs +++ b/server/PathServices/SignalRRealtimeDataRepository.cs @@ -9,7 +9,9 @@ namespace PathApi.Server.PathServices using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; + using System.Diagnostics; using System.Linq; + using System.Runtime.CompilerServices; using System.Threading.Tasks; /// @@ -17,7 +19,7 @@ namespace PathApi.Server.PathServices /// internal sealed class SignalRRealtimeDataRepository : IRealtimeDataRepository, IDisposable { - private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromTicks(5 * (long)10e6); // 5s + private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromSeconds(5); private readonly IPathDataRepository pathDataRepository; private readonly IPathApiClient pathApiClient; private readonly ConcurrentDictionary<(Station, RouteDirection), HubConnection> hubConnections; @@ -60,10 +62,11 @@ public Task> GetRealtimeData(Station station) private IEnumerable GetRealtimeData(Station station, RouteDirection direction) { Log.Logger.Here().Debug("Getting realtime data for {station}-{direction}...", station, direction); - var startTimestamp = DateTime.UtcNow; + Stopwatch stopWatch = new Stopwatch(); + stopWatch.Start(); var emptyRealtimeData = ImmutableList.Create(); var realtimeDataResult = this.realtimeData.GetValueOrDefault((station, direction), emptyRealtimeData); - var endTimestamp = DateTime.UtcNow; + stopWatch.Stop(); if (realtimeDataResult.Count() != 0) { Log.Logger.Here().Debug("Got {count} realtime dataPoint(s) for {station}-{direction}", realtimeDataResult.Count(), station, direction); @@ -71,7 +74,7 @@ private IEnumerable GetRealtimeData(Station station, RouteDirectio { Log.Logger.Here().Information("Got no realtime dataPoint for {station}-{direction}, this might indicate a problem either on the server or the client side", station, direction); } - Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, endTimestamp - startTimestamp); + Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, stopWatch.Elapsed); return realtimeDataResult; } @@ -114,8 +117,7 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, .GetAwaiter() .GetResult()); - connection.Closed += async (e) => - { + connection.Closed += async (e) => { if (!this.hubConnections.ContainsKey((station, direction))) { return; @@ -126,7 +128,8 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Log.Logger.Here().Warning(e, "SignalR connection was closed as a result of an exception"); } - // Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction); + // Disable warning: This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread. + await Task.CompletedTask; }; try @@ -140,7 +143,6 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, this.hubConnections.AddOrUpdate((station, direction), connection, (key, existingConnection) => { - return connection; }); } @@ -212,7 +214,7 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, var filteredUpdatedImmutableData = ImmutableList.Create(updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToArray()); if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count()) { - Log.Logger.Here().Warning("{count} dataPoint(s) in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), station, direction); + Log.Logger.Here().Warning("{removedCount} dataPoint(s) out of {totalCount} in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), updatedImmutableData.Count(), station, direction); } else { // return existing data will improve performance From f4f67c858e66e075324f746d41f889fc570d4370 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Wed, 21 Feb 2024 08:47:19 -0500 Subject: [PATCH 6/8] Update SignalRRealtimeDataRepository.cs --- server/PathServices/SignalRRealtimeDataRepository.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/PathServices/SignalRRealtimeDataRepository.cs b/server/PathServices/SignalRRealtimeDataRepository.cs index bdde8be..e1a8a80 100644 --- a/server/PathServices/SignalRRealtimeDataRepository.cs +++ b/server/PathServices/SignalRRealtimeDataRepository.cs @@ -11,7 +11,6 @@ namespace PathApi.Server.PathServices using System.Collections.Immutable; using System.Diagnostics; using System.Linq; - using System.Runtime.CompilerServices; using System.Threading.Tasks; /// @@ -166,7 +165,7 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, Log.Logger.Here().Debug("SignalR Hub ProcessNewMessage for {station}-{direction}...", station, direction); - var newImmtubaleData = ImmutableList.Create((await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => + var newImmtubaleData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => { var realtimeData = new RealtimeData() { @@ -189,7 +188,7 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, } realtimeData.Route = route; return realtimeData; - }))).ToArray()); + }))).ToImmutableList(); this.realtimeData.AddOrUpdate((station, direction), newImmtubaleData, (key, oldImmutableData) => { var latestNewDataPointLastUpdated = DateTimeOffset.FromUnixTimeSeconds(0).DateTime; // 1970 epoch @@ -211,7 +210,7 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, Log.Logger.Here().Warning("{count} dataPoint(s) in oldData are newer than newData for S:{station} D:{direction}, keeping oldData instead", oldDataNewerThanNewDataLastUpdatedCount, station, direction); updatedImmutableData = oldImmutableData; } - var filteredUpdatedImmutableData = ImmutableList.Create(updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToArray()); + var filteredUpdatedImmutableData = updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToImmutableList(); if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count()) { Log.Logger.Here().Warning("{removedCount} dataPoint(s) out of {totalCount} in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), updatedImmutableData.Count(), station, direction); From e206721360caa88e1eaf78d619be8cff8b34334a Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Wed, 21 Feb 2024 18:08:31 -0500 Subject: [PATCH 7/8] Update RetryPolicy.cs --- server/PathServices/RetryPolicy.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/PathServices/RetryPolicy.cs b/server/PathServices/RetryPolicy.cs index 62adce1..9812e59 100644 --- a/server/PathServices/RetryPolicy.cs +++ b/server/PathServices/RetryPolicy.cs @@ -1,14 +1,25 @@ using Microsoft.AspNetCore.SignalR.Client; using Serilog; using System; +using PathApi.Server.PathServices.Models; +using PathApi.V1; namespace PathApi.Server.PathServices { internal sealed class RetryPolicy : IRetryPolicy { + private readonly Station station; + private readonly RouteDirection direction; + + public RetryPolicy(Station station, RouteDirection direction) + { + this.station = station; + this.direction = direction; + } + public TimeSpan? NextRetryDelay(RetryContext retryContext) { - Log.Logger.Here().Warning("SignalR connection retrying because of {retryReason}, total retry count {previousRetryCount}", retryContext.RetryReason, retryContext.PreviousRetryCount); + Log.Logger.Here().Warning("SignalR connection for S:{station} D:{direction} retrying because of {retryReason}, total retry count {previousRetryCount}", this.station, this.direction, retryContext.RetryReason, retryContext.PreviousRetryCount); return TimeSpan.FromSeconds(new Random().Next(1, 4) * Math.Min(retryContext.PreviousRetryCount + 1, 5)); } } From dcec5d12d341af41b944fe3f11ee89c7ab705d85 Mon Sep 17 00:00:00 2001 From: persuasive0pest <160201971+persuasive0pest@users.noreply.github.com> Date: Wed, 21 Feb 2024 18:08:48 -0500 Subject: [PATCH 8/8] Update SignalRRealtimeDataRepository.cs --- server/PathServices/SignalRRealtimeDataRepository.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/PathServices/SignalRRealtimeDataRepository.cs b/server/PathServices/SignalRRealtimeDataRepository.cs index e1a8a80..c3527e1 100644 --- a/server/PathServices/SignalRRealtimeDataRepository.cs +++ b/server/PathServices/SignalRRealtimeDataRepository.cs @@ -105,7 +105,7 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, var connection = new HubConnectionBuilder() .WithUrl(token.Url, c => c.AccessTokenProvider = () => Task.FromResult(token.AccessToken)) - .WithAutomaticReconnect(new RetryPolicy()) + .WithAutomaticReconnect(new RetryPolicy(station, direction)) .Build(); connection.KeepAliveInterval = this.KEEP_ALIVE_INTERVAL;