diff --git a/server/PathServices/RetryPolicy.cs b/server/PathServices/RetryPolicy.cs new file mode 100644 index 0000000..9812e59 --- /dev/null +++ b/server/PathServices/RetryPolicy.cs @@ -0,0 +1,26 @@ +using Microsoft.AspNetCore.SignalR.Client; +using Serilog; +using System; +using PathApi.Server.PathServices.Models; +using PathApi.V1; + +namespace PathApi.Server.PathServices +{ + internal sealed class RetryPolicy : IRetryPolicy + { + private readonly Station station; + private readonly RouteDirection direction; + + public RetryPolicy(Station station, RouteDirection direction) + { + this.station = station; + this.direction = direction; + } + + public TimeSpan? NextRetryDelay(RetryContext retryContext) + { + Log.Logger.Here().Warning("SignalR connection for S:{station} D:{direction} retrying because of {retryReason}, total retry count {previousRetryCount}", this.station, this.direction, retryContext.RetryReason, retryContext.PreviousRetryCount); + return TimeSpan.FromSeconds(new Random().Next(1, 4) * Math.Min(retryContext.PreviousRetryCount + 1, 5)); + } + } +} diff --git a/server/PathServices/SignalRRealtimeDataRepository.cs b/server/PathServices/SignalRRealtimeDataRepository.cs index 5fb4123..c3527e1 100644 --- a/server/PathServices/SignalRRealtimeDataRepository.cs +++ b/server/PathServices/SignalRRealtimeDataRepository.cs @@ -1,4 +1,4 @@ -namespace PathApi.Server.PathServices +namespace PathApi.Server.PathServices { using Microsoft.AspNetCore.SignalR.Client; using Newtonsoft.Json; @@ -8,6 +8,8 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Collections.Immutable; + using System.Diagnostics; using System.Linq; using System.Threading.Tasks; @@ -16,10 +18,11 @@ /// internal sealed class SignalRRealtimeDataRepository : IRealtimeDataRepository, IDisposable { + private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromSeconds(5); private readonly IPathDataRepository pathDataRepository; private readonly IPathApiClient pathApiClient; private readonly ConcurrentDictionary<(Station, RouteDirection), HubConnection> hubConnections; - private readonly ConcurrentDictionary<(Station, RouteDirection), List> realtimeData; + private readonly ConcurrentDictionary<(Station, RouteDirection), ImmutableList> realtimeData; /// /// Constructs a new instance of the . @@ -29,7 +32,7 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa this.pathDataRepository = pathDataRepository; this.pathApiClient = pathApiClient; this.hubConnections = new ConcurrentDictionary<(Station, RouteDirection), HubConnection>(); - this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), List>(); + this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), ImmutableList>(); this.pathDataRepository.OnDataUpdate += this.PathSqlDbUpdated; } @@ -41,12 +44,37 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa /// A collection of arriving trains. public Task> GetRealtimeData(Station station) { - return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow)); + var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)); + var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow); + if (allData.Count() != freshData.Count()) + { + var staledData = allData.Except(freshData); + foreach (var staledDataPoint in staledData) + Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated); + + Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection..."); + Task.Run(this.CreateHubConnections).Wait(); + } + return Task.FromResult(freshData); } private IEnumerable GetRealtimeData(Station station, RouteDirection direction) { - return this.realtimeData.GetValueOrDefault((station, direction), new List()); + Log.Logger.Here().Debug("Getting realtime data for {station}-{direction}...", station, direction); + Stopwatch stopWatch = new Stopwatch(); + stopWatch.Start(); + var emptyRealtimeData = ImmutableList.Create(); + var realtimeDataResult = this.realtimeData.GetValueOrDefault((station, direction), emptyRealtimeData); + stopWatch.Stop(); + if (realtimeDataResult.Count() != 0) + { + Log.Logger.Here().Debug("Got {count} realtime dataPoint(s) for {station}-{direction}", realtimeDataResult.Count(), station, direction); + } else + { + Log.Logger.Here().Information("Got no realtime dataPoint for {station}-{direction}, this might indicate a problem either on the server or the client side", station, direction); + } + Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, stopWatch.Elapsed); + return realtimeDataResult; } private void PathSqlDbUpdated(object sender, EventArgs args) @@ -66,7 +94,7 @@ await Task.WhenAll(StationMappings.StationToSignalRTokenName.SelectMany(station RouteDirectionMappings.RouteDirectionToDirectionKey.Select(direction => this.CreateHubConnection(tokenBrokerUrl, tokenValue, station.Key, direction.Key)))); } - private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction, int sequentialFailures = 0) + private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction) { SignalRToken token; @@ -77,22 +105,18 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, var connection = new HubConnectionBuilder() .WithUrl(token.Url, c => c.AccessTokenProvider = () => Task.FromResult(token.AccessToken)) + .WithAutomaticReconnect(new RetryPolicy(station, direction)) .Build(); + connection.KeepAliveInterval = this.KEEP_ALIVE_INTERVAL; + connection.On("SendMessage", (_, json) => this.ProcessNewMessage(station, direction, json) .ConfigureAwait(false) .GetAwaiter() .GetResult()); - async Task RetryConnection() - { - await Task.Delay(new Random().Next(1, 7) * (1000 * Math.Min(sequentialFailures + 1, 5))); - await this.CreateHubConnection(tokenBrokerUrl, tokenValue, station, direction, sequentialFailures + 1); - }; - - connection.Closed += async (e) => - { + connection.Closed += async (e) => { if (!this.hubConnections.ContainsKey((station, direction))) { return; @@ -103,8 +127,8 @@ async Task RetryConnection() Log.Logger.Here().Warning(e, "SignalR connection was closed as a result of an exception"); } - Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction); - await RetryConnection(); + // Disable warning: This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread. + await Task.CompletedTask; }; try @@ -114,10 +138,12 @@ async Task RetryConnection() catch (Exception ex) { Log.Logger.Here().Warning(ex, "SignalR connection failed to start for {station}-{direction}...", station, direction); - await RetryConnection(); } - this.hubConnections.AddOrUpdate((station, direction), connection, (_, __) => connection); + this.hubConnections.AddOrUpdate((station, direction), connection, (key, existingConnection) => + { + return connection; + }); } catch (Exception ex) { @@ -137,8 +163,9 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, } catch (Exception) { /* Ignore. */ } + Log.Logger.Here().Debug("SignalR Hub ProcessNewMessage for {station}-{direction}...", station, direction); - List newData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => + var newImmtubaleData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage => { var realtimeData = new RealtimeData() { @@ -161,8 +188,39 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction, } realtimeData.Route = route; return realtimeData; - }))).ToList(); - this.realtimeData.AddOrUpdate((station, direction), newData, (ignored, oldData) => newData[0].LastUpdated > oldData[0].LastUpdated ? newData : oldData); + }))).ToImmutableList(); + + this.realtimeData.AddOrUpdate((station, direction), newImmtubaleData, (key, oldImmutableData) => { + var latestNewDataPointLastUpdated = DateTimeOffset.FromUnixTimeSeconds(0).DateTime; // 1970 epoch + foreach (var newDataPoint in newImmtubaleData) { + if (newDataPoint.LastUpdated > latestNewDataPointLastUpdated) + { + latestNewDataPointLastUpdated = newDataPoint.LastUpdated; + } + if (newDataPoint.DataExpiration <= DateTime.UtcNow) + { + Log.Logger.Here().Warning("Staled dataPoint received for S:{station} D:{direction} with timestamp {lastUpdated} expires at {expiration}", station, direction, newDataPoint.LastUpdated, newDataPoint.DataExpiration); + } + } + + var updatedImmutableData = newImmtubaleData; + var oldDataNewerThanNewDataLastUpdatedCount = oldImmutableData.Where(oldDataPoint => oldDataPoint.LastUpdated > latestNewDataPointLastUpdated).Count(); + if (oldDataNewerThanNewDataLastUpdatedCount > 0) + { + Log.Logger.Here().Warning("{count} dataPoint(s) in oldData are newer than newData for S:{station} D:{direction}, keeping oldData instead", oldDataNewerThanNewDataLastUpdatedCount, station, direction); + updatedImmutableData = oldImmutableData; + } + var filteredUpdatedImmutableData = updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToImmutableList(); + if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count()) + { + Log.Logger.Here().Warning("{removedCount} dataPoint(s) out of {totalCount} in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), updatedImmutableData.Count(), station, direction); + } else + { + // return existing data will improve performance + filteredUpdatedImmutableData = updatedImmutableData; + } + return filteredUpdatedImmutableData; + }); } catch (Exception ex) { @@ -233,4 +291,4 @@ public sealed class RealtimeMessage public DateTime DepartureTime { get; set; } } } -} \ No newline at end of file +} diff --git a/server/Program.cs b/server/Program.cs index e3e698f..1df8ee9 100644 --- a/server/Program.cs +++ b/server/Program.cs @@ -22,7 +22,7 @@ static void Main(string[] args) { // Setup Logging Log.Logger = new LoggerConfiguration() - .MinimumLevel.Information() + .MinimumLevel.Debug() .Enrich.FromLogContext() .WriteTo.Console(outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] ({FilePath}.{MemberName}:{LineNumber}) {Message}{NewLine}{Exception}") .CreateLogger();