Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RetryPolicy to keep SignalR connection alive and replace List<RealtimeData> with ImmutableList<RealtimeData> #24

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions server/PathServices/RetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Microsoft.AspNetCore.SignalR.Client;
using Serilog;
using System;
using PathApi.Server.PathServices.Models;
using PathApi.V1;

namespace PathApi.Server.PathServices
{
/// <summary>
/// Reconnect policy for the SignalR hub connection of a single station/direction pair.
/// Each retry is logged and delayed by a random 1-3 second jitter multiplied by a
/// linear back-off factor that is capped at <see cref="MaxBackoffMultiplier"/>.
/// </summary>
internal sealed class RetryPolicy : IRetryPolicy
{
    // Upper bound for the back-off multiplier; the longest possible delay is 3 * 5 = 15 seconds.
    private const int MaxBackoffMultiplier = 5;

    // One generator shared by every policy instance instead of allocating a new Random on
    // each retry. Random instances are not thread-safe and several hub connections may ask
    // for a delay concurrently, so every access goes through JitterLock.
    private static readonly Random Jitter = new Random();
    private static readonly object JitterLock = new object();

    private readonly Station station;
    private readonly RouteDirection direction;

    /// <summary>
    /// Constructs a new instance of the <see cref="RetryPolicy"/>.
    /// </summary>
    /// <param name="station">The station whose connection this policy guards (used for logging only).</param>
    /// <param name="direction">The route direction of that connection (used for logging only).</param>
    public RetryPolicy(Station station, RouteDirection direction)
    {
        this.station = station;
        this.direction = direction;
    }

    /// <summary>
    /// Logs the retry and computes how long to wait before the next reconnect attempt.
    /// </summary>
    /// <param name="retryContext">Supplies the retry reason and the number of previous retries.</param>
    /// <returns>
    /// A delay of (random 1-3 seconds) * min(previous retry count + 1, <see cref="MaxBackoffMultiplier"/>).
    /// This method always returns a value and never returns null.
    /// </returns>
    public TimeSpan? NextRetryDelay(RetryContext retryContext)
    {
        Log.Logger.Here().Warning("SignalR connection for S:{station} D:{direction} retrying because of {retryReason}, total retry count {previousRetryCount}", this.station, this.direction, retryContext.RetryReason, retryContext.PreviousRetryCount);

        int jitterSeconds;
        lock (JitterLock)
        {
            // The upper bound of Next is exclusive, so this yields 1, 2 or 3.
            jitterSeconds = Jitter.Next(1, 4);
        }

        var backoffMultiplier = Math.Min(retryContext.PreviousRetryCount + 1, MaxBackoffMultiplier);
        return TimeSpan.FromSeconds(jitterSeconds * backoffMultiplier);
    }
}
}
102 changes: 80 additions & 22 deletions server/PathServices/SignalRRealtimeDataRepository.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace PathApi.Server.PathServices
namespace PathApi.Server.PathServices
{
using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
Expand All @@ -8,6 +8,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -16,10 +18,11 @@
/// </summary>
internal sealed class SignalRRealtimeDataRepository : IRealtimeDataRepository, IDisposable
{
private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromSeconds(5);
private readonly IPathDataRepository pathDataRepository;
private readonly IPathApiClient pathApiClient;
private readonly ConcurrentDictionary<(Station, RouteDirection), HubConnection> hubConnections;
private readonly ConcurrentDictionary<(Station, RouteDirection), List<RealtimeData>> realtimeData;
private readonly ConcurrentDictionary<(Station, RouteDirection), ImmutableList<RealtimeData>> realtimeData;

/// <summary>
/// Constructs a new instance of the <see cref="SignalRRealtimeDataRepository"/>.
Expand All @@ -29,7 +32,7 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa
this.pathDataRepository = pathDataRepository;
this.pathApiClient = pathApiClient;
this.hubConnections = new ConcurrentDictionary<(Station, RouteDirection), HubConnection>();
this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), List<RealtimeData>>();
this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), ImmutableList<RealtimeData>>();

this.pathDataRepository.OnDataUpdate += this.PathSqlDbUpdated;
}
Expand All @@ -41,12 +44,37 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa
/// <returns>A collection of arriving trains.</returns>
public Task<IEnumerable<RealtimeData>> GetRealtimeData(Station station)
{
return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow));
var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ));
var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Worried that this condition will always be met when the headways are really large... it may be fine. Need to double check.

if (allData.Count() != freshData.Count())
{
var staledData = allData.Except(freshData);
foreach (var staledDataPoint in staledData)
Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated);

Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection...");
Task.Run(this.CreateHubConnections).Wait();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make this method async you could await CreateHubConnections and just return freshData (rather than needing to wrap it in a task).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once I deal with the concurrency issue of CreateHubConnections I'll avoid calling .Wait() so this function can return immediately.
Currently there are 2 potential issues:

                    if (!this.hubConnections.ContainsKey((station, direction)))

This check is not reliable: without a lock, another thread can replace the connection with a new one, and the condition will still return true.

        private async Task CloseExistingHubConnections()
        {
            // Materialize the connections so we can clear the dictionary before disconnecting.
            // Otherwise, we will reconnect before reinitializing the connection (potentially
            // causing a loop if the token changes).
            var connections = this.hubConnections.Values.ToArray();
            this.hubConnections.Clear();

            await Task.WhenAll(connections.Select(async (client) => await client.DisposeAsync()));
        }

Another thread might be able to run between .ToArray() and .Clear() such that it will try to dispose the connections again.

The goal is to make sure that once a HubConnection instance is retrieved from the dictionary, it remains exclusive to that thread until it is put back (or not); maybe TryRemove will do.

Once the refactor is done, we should be able to reconnect a single HubConnection for more fine-grained control if stale data is detected for the (Station, RouteDirection) pair.

}
return Task.FromResult(freshData);
}

/// <summary>
/// Reads the cached realtime data for a single station/direction pair, logging the
/// number of data points found and how long the lookup took.
/// </summary>
/// <param name="station">The station to look up.</param>
/// <param name="direction">The route direction to look up.</param>
/// <returns>The cached data points, or an empty list when nothing is cached for the pair.</returns>
private IEnumerable<RealtimeData> GetRealtimeData(Station station, RouteDirection direction)
{
    // The previous leading "return ... new List<RealtimeData>()" statement was removed: it no
    // longer matched the dictionary's ImmutableList value type and made everything below unreachable.
    Log.Logger.Here().Debug("Getting realtime data for {station}-{direction}...", station, direction);

    var stopWatch = new Stopwatch();
    stopWatch.Start();
    // ImmutableList<T>.Empty is a shared instance, so the fallback costs no allocation.
    var realtimeDataResult = this.realtimeData.GetValueOrDefault((station, direction), ImmutableList<RealtimeData>.Empty);
    stopWatch.Stop();

    // Use the Count property rather than the LINQ Count() extension.
    if (realtimeDataResult.Count != 0)
    {
        Log.Logger.Here().Debug("Got {count} realtime dataPoint(s) for {station}-{direction}", realtimeDataResult.Count, station, direction);
    }
    else
    {
        Log.Logger.Here().Information("Got no realtime dataPoint for {station}-{direction}, this might indicate a problem either on the server or the client side", station, direction);
    }

    Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, stopWatch.Elapsed);
    return realtimeDataResult;
}

private void PathSqlDbUpdated(object sender, EventArgs args)
Expand All @@ -66,7 +94,7 @@ await Task.WhenAll(StationMappings.StationToSignalRTokenName.SelectMany(station
RouteDirectionMappings.RouteDirectionToDirectionKey.Select(direction => this.CreateHubConnection(tokenBrokerUrl, tokenValue, station.Key, direction.Key))));
}

private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction, int sequentialFailures = 0)
private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction)
{
SignalRToken token;

Expand All @@ -77,22 +105,18 @@ private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue,

var connection = new HubConnectionBuilder()
.WithUrl(token.Url, c => c.AccessTokenProvider = () => Task.FromResult(token.AccessToken))
.WithAutomaticReconnect(new RetryPolicy(station, direction))
.Build();

connection.KeepAliveInterval = this.KEEP_ALIVE_INTERVAL;

connection.On<string, string>("SendMessage", (_, json) =>
this.ProcessNewMessage(station, direction, json)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult());

async Task RetryConnection()
{
await Task.Delay(new Random().Next(1, 7) * (1000 * Math.Min(sequentialFailures + 1, 5)));
await this.CreateHubConnection(tokenBrokerUrl, tokenValue, station, direction, sequentialFailures + 1);
};

connection.Closed += async (e) =>
{
connection.Closed += async (e) => {
if (!this.hubConnections.ContainsKey((station, direction)))
{
return;
Expand All @@ -103,8 +127,8 @@ async Task RetryConnection()
Log.Logger.Here().Warning(e, "SignalR connection was closed as a result of an exception");
}

Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction);
await RetryConnection();
// Disable warning: This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
await Task.CompletedTask;
};

try
Expand All @@ -114,10 +138,12 @@ async Task RetryConnection()
catch (Exception ex)
{
Log.Logger.Here().Warning(ex, "SignalR connection failed to start for {station}-{direction}...", station, direction);
await RetryConnection();
}

this.hubConnections.AddOrUpdate((station, direction), connection, (_, __) => connection);
this.hubConnections.AddOrUpdate((station, direction), connection, (key, existingConnection) =>
{
return connection;
});
}
catch (Exception ex)
{
Expand All @@ -137,8 +163,9 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction,
}
catch (Exception) { /* Ignore. */ }

Log.Logger.Here().Debug("SignalR Hub ProcessNewMessage for {station}-{direction}...", station, direction);

List<RealtimeData> newData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage =>
var newImmtubaleData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage =>
{
var realtimeData = new RealtimeData()
{
Expand All @@ -161,8 +188,39 @@ private async Task ProcessNewMessage(Station station, RouteDirection direction,
}
realtimeData.Route = route;
return realtimeData;
}))).ToList();
this.realtimeData.AddOrUpdate((station, direction), newData, (ignored, oldData) => newData[0].LastUpdated > oldData[0].LastUpdated ? newData : oldData);
}))).ToImmutableList();

this.realtimeData.AddOrUpdate((station, direction), newImmtubaleData, (key, oldImmutableData) => {
var latestNewDataPointLastUpdated = DateTimeOffset.FromUnixTimeSeconds(0).DateTime; // 1970 epoch
foreach (var newDataPoint in newImmtubaleData) {
if (newDataPoint.LastUpdated > latestNewDataPointLastUpdated)
{
latestNewDataPointLastUpdated = newDataPoint.LastUpdated;
}
if (newDataPoint.DataExpiration <= DateTime.UtcNow)
{
Log.Logger.Here().Warning("Staled dataPoint received for S:{station} D:{direction} with timestamp {lastUpdated} expires at {expiration}", station, direction, newDataPoint.LastUpdated, newDataPoint.DataExpiration);
}
}

var updatedImmutableData = newImmtubaleData;
var oldDataNewerThanNewDataLastUpdatedCount = oldImmutableData.Where(oldDataPoint => oldDataPoint.LastUpdated > latestNewDataPointLastUpdated).Count();
if (oldDataNewerThanNewDataLastUpdatedCount > 0)
{
Log.Logger.Here().Warning("{count} dataPoint(s) in oldData are newer than newData for S:{station} D:{direction}, keeping oldData instead", oldDataNewerThanNewDataLastUpdatedCount, station, direction);
updatedImmutableData = oldImmutableData;
}
var filteredUpdatedImmutableData = updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToImmutableList();
if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count())
{
Log.Logger.Here().Warning("{removedCount} dataPoint(s) out of {totalCount} in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), updatedImmutableData.Count(), station, direction);
} else
{
// return existing data will improve performance
filteredUpdatedImmutableData = updatedImmutableData;
}
return filteredUpdatedImmutableData;
});
}
catch (Exception ex)
{
Expand Down Expand Up @@ -233,4 +291,4 @@ public sealed class RealtimeMessage
public DateTime DepartureTime { get; set; }
}
}
}
}
2 changes: 1 addition & 1 deletion server/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void Main(string[] args)
{
// Setup Logging
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.MinimumLevel.Debug()
.Enrich.FromLogContext()
.WriteTo.Console(outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] ({FilePath}.{MemberName}:{LineNumber}) {Message}{NewLine}{Exception}")
.CreateLogger();
Expand Down