From 8a3978c1ad422cd7c4fe45be077cd7c99512e9d8 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 13 Jan 2023 10:58:36 -0800 Subject: [PATCH 1/4] Various improvements to Redis providers * Use consistent configuration pattern * Add consistent configuration validators * Use unique key prefixes for all providers * Add expiry to all keys for testing * Consistently throw serializable exceptions * Update doc comments * Mark classes which do not need to be public as internal * Other minor cleanup --- .../HostingExtensions.ICientBuilder.cs | 11 +- .../Hosting/HostingExtensions.ISiloBuilder.cs | 16 +- .../Orleans.Clustering.Redis.csproj | 1 + .../Providers/RedisClusteringOptions.cs | 44 ++++- .../Storage/RedisMembershipTable.cs | 7 +- .../Hosting/RedisGrainDirectoryExtensions.cs | 11 +- .../Options/RedisGrainDirectoryOptions.cs | 27 ++- .../Orleans.GrainDirectory.Redis.csproj | 1 + .../Properties/AssemblyInfo.cs | 3 + .../RedisGrainDirectory.cs | 29 ++-- ...GrainStorageServiceCollectionExtensions.cs | 8 +- .../Hosting/RedisSiloBuilderExtensions.cs | 8 +- .../Orleans.Persistence.Redis.csproj | 9 +- .../Providers/RedisStorageOptions.cs | 42 +++-- .../Providers/RedisStorageOptionsValidator.cs | 22 +-- .../Storage/RedisGrainStorage.cs | 90 ++++++---- .../Hosting/SiloBuilderReminderExtensions.cs | 5 +- .../Orleans.Reminders.Redis.csproj | 1 + .../Providers/RedisReminderTableOptions.cs | 46 +++-- .../Storage/RedisReminderTable.cs | 161 ++++++++++++------ .../Storage/RedisRemindersException.cs | 43 +++++ .../Clustering/RedisMembershipTableTests.cs | 7 +- .../RedisGrainDirectoryTests.cs | 7 +- .../RedisMultipleGrainDirectoriesTests.cs | 7 - .../Tester.Redis/Persistence/GrainState.cs | 1 - .../Persistence/RedisPersistenceGrainTests.cs | 28 ++- .../Persistence/RedisPersistenceSetupTests.cs | 15 +- .../Reminders/RedisReminderTableTests.cs | 15 +- .../Tester.Redis/Utility/TestExtensions.cs | 3 - .../PersistenceTestGrains.cs | 12 +- .../PersistentStateTestGrains.cs | 4 +- .../Tester/Directories/GrainDirectoryTests.cs | 8 +- .../MembershipTableTestsBase.cs | 16 +- 33 files changed, 464 insertions(+), 244 deletions(-) create mode 100644 src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs create mode 100644 src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs index d9f82f6c52..5f034a1bcf 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs @@ -1,9 +1,10 @@ -using System; +using System; using Orleans; using Microsoft.Extensions.DependencyInjection; using Orleans.Hosting; using Orleans.Messaging; using Orleans.Clustering.Redis; +using StackExchange.Redis; namespace Microsoft.Extensions.Hosting { @@ -25,7 +26,7 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, Act } services - .AddRedis() + .AddRedisClustering() .AddSingleton(); }); } @@ -38,10 +39,10 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, str return builder.ConfigureServices(services => services .Configure(opt => { - opt.ConnectionString = redisConnectionString; - opt.Database = db; + opt.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); + opt.ConfigurationOptions.DefaultDatabase = db; }) - .AddRedis() + .AddRedisClustering() .AddSingleton()); } diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs index 5ae281fed9..f50ab0d55d 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs @@ -1,8 +1,9 @@ -using System; +using System; using Orleans; using Microsoft.Extensions.DependencyInjection; using Orleans.Hosting; using Orleans.Clustering.Redis; +using StackExchange.Redis; namespace Microsoft.Extensions.Hosting { @@ -23,7 +24,7 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action< services.Configure(configuration); } - services.AddRedis(); + services.AddRedisClustering(); }); } @@ -33,13 +34,18 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action< public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString, int db = 0) { return builder.ConfigureServices(services => services - .Configure(options => { options.Database = db; options.ConnectionString = redisConnectionString; }) - .AddRedis()); + .Configure(options => + { + options.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); + options.ConfigurationOptions.DefaultDatabase = db; + }) + .AddRedisClustering()); } - internal static IServiceCollection AddRedis(this IServiceCollection services) + internal static IServiceCollection AddRedisClustering(this IServiceCollection services) { services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(sp => sp.GetRequiredService()); return services; } diff --git a/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj b/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj index 2bd5989d2e..74c237e7c4 100644 --- a/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj +++ b/src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Clustering $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs index 63ae47f835..d444983fee 100644 --- a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs +++ b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs @@ -1,3 +1,4 @@ +using Orleans.Runtime; using StackExchange.Redis; using System; using System.Threading.Tasks; @@ -10,26 +11,55 @@ namespace Orleans.Clustering.Redis public class RedisClusteringOptions { /// - /// Specifies the database identi + /// Gets or sets the Redis client configuration. /// - public int Database { get; set; } + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } /// - /// The connection string. + /// The delegate used to create a Redis connection multiplexer. /// - public string ConnectionString { get; set; } = "localhost:6379"; + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; /// - /// The delegate used to create a Redis connection multiplexer. + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause entries to be deleted after some period of time. /// - public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + public TimeSpan? EntryExpiry { get; set; } = null; /// /// The default multiplexer creation delegate. /// public static async Task DefaultCreateMultiplexer(RedisClusteringOptions options) { - return await ConnectionMultiplexer.ConnectAsync(options.ConnectionString); + return await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); + } + + /// + /// Configuration validator for . + /// + public class RedisClusteringOptionsValidator : IConfigurationValidator + { + private readonly RedisClusteringOptions _options; + + public RedisClusteringOptionsValidator(RedisClusteringOptions options) + { + _options = options; + } + + /// + public void ValidateConfiguration() + { + if (_options.ConfigurationOptions == null) + { + throw new OrleansConfigurationException($"Invalid {nameof(RedisClusteringOptions)} values for {nameof(RedisMembershipTable)}. {nameof(_options.ConfigurationOptions)} is required."); + } } } } \ No newline at end of file diff --git a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs index 2a9c9981c7..55219e75ec 100644 --- a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs +++ b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs @@ -6,8 +6,8 @@ using Newtonsoft.Json; using System.Linq; using Microsoft.Extensions.Options; -using System.Runtime.CompilerServices; using System.Globalization; +using System.Text; namespace Orleans.Clustering.Redis { @@ -26,7 +26,7 @@ public RedisMembershipTable(IOptions redisOptions, IOpti { _redisOptions = redisOptions.Value; _clusterOptions = clusterOptions.Value; - _clusterKey = $"{_clusterOptions.ServiceId}/{_clusterOptions.ClusterId}"; + _clusterKey = Encoding.UTF8.GetBytes($"{_clusterOptions.ServiceId}/members/{_clusterOptions.ClusterId}"); _jsonSerializerSettings = JsonSettings.JsonSerializerSettings; } @@ -40,11 +40,12 @@ public async Task DeleteMembershipTableEntries(string clusterId) public async Task InitializeMembershipTable(bool tryInitTableVersion) { _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); - _db = _muxer.GetDatabase(_redisOptions.Database); + _db = _muxer.GetDatabase(); if (tryInitTableVersion) { await _db.HashSetAsync(_clusterKey, TableVersionKey, SerializeVersion(DefaultTableVersion), When.NotExists); + await _db.KeyExpireAsync(_clusterKey, _redisOptions.EntryExpiry); } this.IsInitialized = true; diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs index 6566c03a17..9f01b1bf39 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs @@ -8,10 +8,13 @@ namespace Orleans.Hosting { + /// + /// Extensions for configuring Redis as a grain directory provider. + /// public static class RedisGrainDirectoryExtensions { /// - /// Use a Redis data-store as the default Grain Directory + /// Adds a default grain directory which persists entries in Redis. /// public static ISiloBuilder UseRedisGrainDirectoryAsDefault( this ISiloBuilder builder, @@ -21,7 +24,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault( } /// - /// Use a Redis data-store as the default Grain Directory + /// Adds a default grain directory which persists entries in Redis. /// public static ISiloBuilder UseRedisGrainDirectoryAsDefault( this ISiloBuilder builder, @@ -31,7 +34,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault( } /// - /// Add a Redis data-store as a named Grain Directory + /// Adds a named grain directory which persists entries in Redis. /// public static ISiloBuilder AddRedisGrainDirectory( this ISiloBuilder builder, @@ -42,7 +45,7 @@ public static ISiloBuilder AddRedisGrainDirectory( } /// - /// Add a Redis data-store as a named Grain Directory + /// Adds a named grain directory which persists entries in Redis. /// public static ISiloBuilder AddRedisGrainDirectory( this ISiloBuilder builder, diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs index 7220bb65f3..b81a8d031d 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Orleans.GrainDirectory.Redis; using Orleans.Runtime; using StackExchange.Redis; @@ -11,37 +12,51 @@ namespace Orleans.Configuration public class RedisGrainDirectoryOptions { /// - /// Configure the Redis client + /// Gets or sets the Redis client configuration. /// [RedactRedisConfigurationOptions] public ConfigurationOptions ConfigurationOptions { get; set; } + /// + /// The delegate used to create a Redis connection multiplexer. + /// + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + /// /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). /// Setting a value different from null will cause duplicate activations in the cluster. /// public TimeSpan? EntryExpiry { get; set; } = null; + + /// + /// The default multiplexer creation delegate. + /// + public static async Task DefaultCreateMultiplexer(RedisGrainDirectoryOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); } - public class RedactRedisConfigurationOptions : RedactAttribute + internal class RedactRedisConfigurationOptions : RedactAttribute { public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); } + /// + /// Configuration validator for . + /// public class RedisGrainDirectoryOptionsValidator : IConfigurationValidator { - private readonly RedisGrainDirectoryOptions options; + private readonly RedisGrainDirectoryOptions _options; public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options) { - this.options = options; + _options = options; } + /// public void ValidateConfiguration() { - if (this.options.ConfigurationOptions == null) + if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj b/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj index 5359bef72e..3ece13b004 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj +++ b/src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Grain Directory $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs b/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..0fc65c7673 --- /dev/null +++ b/src/Redis/Orleans.GrainDirectory.Redis/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Tester.Redis")] diff --git a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs index c6100da49a..d38cfcb6aa 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -16,8 +17,9 @@ public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant logger; + private readonly RedisKey _keyPrefix; - private ConnectionMultiplexer redis; + private IConnectionMultiplexer redis; private IDatabase database; private LuaScript deleteScript; @@ -29,6 +31,7 @@ public RedisGrainDirectory( this.directoryOptions = directoryOptions; this.logger = logger; this.clusterOptions = clusterOptions.Value; + _keyPrefix = Encoding.UTF8.GetBytes($"{this.clusterOptions.ClusterId}/directory/"); } public async Task Lookup(GrainId grainId) @@ -121,7 +124,7 @@ public void Participate(ISiloLifecycle lifecycle) public async Task Initialize(CancellationToken ct = default) { - this.redis = await ConnectionMultiplexer.ConnectAsync(this.directoryOptions.ConfigurationOptions); + this.redis = await directoryOptions.CreateMultiplexer(directoryOptions); // Configure logging this.redis.ConnectionRestored += this.LogConnectionRestored; @@ -132,16 +135,16 @@ public async Task Initialize(CancellationToken ct = default) this.database = this.redis.GetDatabase(); this.deleteScript = LuaScript.Prepare( - @" -local cur = redis.call('GET', @key) -if cur ~= false then - local typedCur = cjson.decode(cur) - if typedCur.ActivationId == @val then - return redis.call('DEL', @key) - end -end -return 0 - "); + """ + local cur = redis.call('GET', @key) + if cur ~= false then + local typedCur = cjson.decode(cur) + if typedCur.ActivationId == @val then + return redis.call('DEL', @key) + end + end + return 0 + """); } private async Task Uninitialize(CancellationToken arg) @@ -155,7 +158,7 @@ private async Task Uninitialize(CancellationToken arg) } } - private string GetKey(GrainId grainId) => $"{this.clusterOptions.ClusterId}-{grainId}"; + private string GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); #region Logging private void LogConnectionRestored(object sender, ConnectionFailedEventArgs e) diff --git a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs index 7c69e5fa9e..e79b50813c 100644 --- a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisGrainStorageServiceCollectionExtensions.cs @@ -17,7 +17,7 @@ namespace Orleans.Hosting public static class RedisGrainStorageServiceCollectionExtensions { /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceCollection services, Action configureOptions) { @@ -25,7 +25,7 @@ public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceColl } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static IServiceCollection AddRedisGrainStorage(this IServiceCollection services, string name, Action configureOptions) { @@ -33,7 +33,7 @@ public static IServiceCollection AddRedisGrainStorage(this IServiceCollection se } /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceCollection services, Action> configureOptions = null) { @@ -41,7 +41,7 @@ public static IServiceCollection AddRedisGrainStorageAsDefault(this IServiceColl } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static IServiceCollection AddRedisGrainStorage(this IServiceCollection services, string name, Action> configureOptions = null) diff --git a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs index 45d948d0c0..a5b3d0b586 100644 --- a/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Hosting/RedisSiloBuilderExtensions.cs @@ -11,7 +11,7 @@ namespace Orleans.Hosting public static class RedisSiloBuilderExtensions { /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder builder, Action configureOptions) { @@ -19,7 +19,7 @@ public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder build } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, string name, Action configureOptions) { @@ -27,7 +27,7 @@ public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, strin } /// - /// Configure silo to use Redis as the default grain storage. + /// Configures Redis as the default grain storage provider. /// public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder builder, Action> configureOptions = null) { @@ -35,7 +35,7 @@ public static ISiloBuilder AddRedisGrainStorageAsDefault(this ISiloBuilder build } /// - /// Configure silo to use Redis for grain storage. + /// Configures Redis as a grain storage provider. /// public static ISiloBuilder AddRedisGrainStorage(this ISiloBuilder builder, string name, Action> configureOptions = null) { diff --git a/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj b/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj index 7ad009fe77..cbdaf41d30 100644 --- a/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj +++ b/src/Redis/Orleans.Persistence.Redis/Orleans.Persistence.Redis.csproj @@ -1,12 +1,13 @@ - Microsoft.Orleans.Persistance.Redis - Microsoft Orleans Persistance Redis Provider - Microsoft Orleans Persistance implementation that uses Redis - $(PackageTags) Redis Persistance + Microsoft.Orleans.Persistence.Redis + Microsoft Orleans Persistence Redis Provider + Microsoft Orleans Persistence implementation that uses Redis + $(PackageTags) Redis Persistence $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs index 9fb324278a..a37f21de65 100644 --- a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs +++ b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptions.cs @@ -1,4 +1,7 @@ -using Orleans.Storage; +using System; +using System.Threading.Tasks; +using Orleans.Storage; +using StackExchange.Redis; namespace Orleans.Persistence { @@ -7,21 +10,11 @@ namespace Orleans.Persistence /// public class RedisStorageOptions : IStorageProviderSerializerOptions { - /// - /// The connection string. - /// - public string ConnectionString { get; set; } = "localhost:6379"; - /// /// Whether or not to delete state during a clear operation. /// public bool DeleteOnClear { get; set; } - /// - /// The database number. - /// - public int? DatabaseNumber { get; set; } - /// /// Stage of silo lifecycle where storage should be initialized. Storage must be initialized prior to use. /// @@ -29,5 +22,32 @@ public class RedisStorageOptions : IStorageProviderSerializerOptions /// public IGrainStorageSerializer GrainStorageSerializer { get; set; } + + /// + /// Gets or sets the Redis client configuration. + /// + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } + + /// + /// The delegate used to create a Redis connection multiplexer. + /// + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + + /// + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause duplicate activations in the cluster. + /// + public TimeSpan? EntryExpiry { get; set; } = null; + + /// + /// The default multiplexer creation delegate. + /// + public static async Task DefaultCreateMultiplexer(RedisStorageOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); } } \ No newline at end of file diff --git a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs index 0dfd8a1bfc..08e50ff8e1 100644 --- a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs +++ b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs @@ -1,4 +1,4 @@ -using Orleans.Runtime; +using Orleans.Runtime; namespace Orleans.Persistence { @@ -9,27 +9,15 @@ internal class RedisStorageOptionsValidator : IConfigurationValidator public RedisStorageOptionsValidator(RedisStorageOptions options, string name) { - this._options = options; - this._name = name; + _options = options; + _name = name; } public void ValidateConfiguration() { - var msg = $"Configuration for {nameof(RedisGrainStorage)} - {_name} is invalid"; - - if (_options == null) - { - throw new OrleansConfigurationException($"{msg} - {nameof(RedisStorageOptions)} is null"); - } - - if (string.IsNullOrWhiteSpace(_options.ConnectionString)) - { - throw new OrleansConfigurationException($"{msg} - {nameof(_options.ConnectionString)} is null or empty"); - } - - if (!_options.ConnectionString.Contains(':')) // host:port delimiter + if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"{msg} - {nameof(_options.ConnectionString)} invalid format: {_options.ConnectionString}, should contain host and port delimited by ':'"); + throw new OrleansConfigurationException($"Invalid {nameof(RedisStorageOptions)} values for {nameof(RedisGrainStorage)} with name {_name}. {nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs index 763c43c2b6..9b5147348f 100644 --- a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs +++ b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs @@ -1,6 +1,8 @@ -using System; +using System; using System.Diagnostics; +using System.Globalization; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -17,20 +19,33 @@ namespace Orleans.Persistence /// /// Redis-based grain storage provider /// - public class RedisGrainStorage : IGrainStorage, ILifecycleParticipant + internal class RedisGrainStorage : IGrainStorage, ILifecycleParticipant { - private const string WriteScript = "local etag = redis.call('HGET', KEYS[1], 'etag')\nif etag == false or etag == ARGV[1] then return redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) else return false end"; + private const string WriteScript = + """ + local etag = redis.call('HGET', KEYS[1], 'etag') + if etag == false or etag == ARGV[1] then + local result = redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) + if ARGV[4] ~= '-1' then + redis.call('EXPIRE', KEYS[1], ARGV[4]) + end + return result + else + return false + end + """; private const int ReloadWriteScriptMaxCount = 3; private readonly string _serviceId; + private readonly RedisValue _ttl; + private readonly RedisKey _keyPrefix; private readonly string _name; private readonly ILogger _logger; private readonly RedisStorageOptions _options; private readonly IGrainStorageSerializer _grainStorageSerializer; - private ConnectionMultiplexer _connection; + private IConnectionMultiplexer _connection; private IDatabase _db; - private ConfigurationOptions _redisOptions; private LuaScript _preparedWriteScript; private byte[] _preparedWriteScriptHash; @@ -49,6 +64,8 @@ public RedisGrainStorage( _options = options; _grainStorageSerializer = options.GrainStorageSerializer ?? grainStorageSerializer; _serviceId = clusterOptions.Value.ServiceId; + _ttl = options.EntryExpiry is { } ts ? ts.TotalSeconds.ToString(CultureInfo.InvariantCulture) : "-1"; + _keyPrefix = Encoding.UTF8.GetBytes($"{_serviceId}/state/"); } /// @@ -67,24 +84,14 @@ private async Task Init(CancellationToken cancellationToken) if (_logger.IsEnabled(LogLevel.Debug)) { _logger.LogDebug( - "RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DatabaseNumber={DatabaseNumber} DeleteOnClear={DeleteOnClear}", + "RedisGrainStorage {Name} is initializing: ServiceId={ServiceId} DeleteOnClear={DeleteOnClear}", _name, _serviceId, - _options.DatabaseNumber, _options.DeleteOnClear); } - _redisOptions = ConfigurationOptions.Parse(_options.ConnectionString); - _connection = await ConnectionMultiplexer.ConnectAsync(_redisOptions).ConfigureAwait(false); - - if (_options.DatabaseNumber.HasValue) - { - _db = _connection.GetDatabase(_options.DatabaseNumber.Value); - } - else - { - _db = _connection.GetDatabase(); - } + _connection = await _options.CreateMultiplexer(_options).ConfigureAwait(false); + _db = _connection.GetDatabase(); _preparedWriteScript = LuaScript.Prepare(WriteScript); _preparedWriteScriptHash = await LoadWriteScriptAsync().ConfigureAwait(false); @@ -108,7 +115,8 @@ private async Task Init(CancellationToken cancellationToken) _name, _serviceId, timer.Elapsed.TotalMilliseconds.ToString("0.00")); - throw; + + throw new RedisStorageException(Invariant($"{ex.GetType()}: {ex.Message}")); } } @@ -116,7 +124,6 @@ private async Task LoadWriteScriptAsync() { Debug.Assert(_connection is not null); Debug.Assert(_preparedWriteScript is not null); - Debug.Assert(_redisOptions.EndPoints.Count > 0); System.Net.EndPoint[] endPoints = _connection.GetEndPoints(); var loadTasks = new Task[endPoints.Length]; @@ -134,7 +141,7 @@ private async Task LoadWriteScriptAsync() /// public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainState grainState) { - var key = grainId.ToString(); + var key = GetKey(grainId); try { @@ -168,18 +175,18 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) { var etag = grainState.ETag ?? "null"; - var key = grainId.ToString(); - var newEtag = Guid.NewGuid().ToString(); + var key = GetKey(grainId); + var newEtag = Guid.NewGuid().ToString("N"); - RedisValue payload = default; - RedisResult writeWithScriptResponse = null; + RedisResult writeWithScriptResponse; try { - payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); - writeWithScriptResponse = await WriteToRedisUsingPreparedScriptAsync(payload, - etag: etag, - key: key, - newEtag: newEtag) + var payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); + writeWithScriptResponse = await WriteToRedisUsingPreparedScriptAsync( + payload, + etag: etag, + key: key, + newEtag: newEtag) .ConfigureAwait(false); } catch (Exception e) @@ -195,19 +202,18 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt if (writeWithScriptResponse is not null && writeWithScriptResponse.IsNull) { - throw new InconsistentStateException($"ETag mismatch - tried with ETag: {grainState.ETag}"); + throw new InconsistentStateException(Invariant($"ETag mismatch - tried with ETag: {grainState.ETag}")); } grainState.ETag = newEtag; } - private Task WriteToRedisUsingPreparedScriptAsync(RedisValue payload, string etag, string key, string newEtag) + private Task WriteToRedisUsingPreparedScriptAsync(RedisValue payload, string etag, RedisKey key, string newEtag) { var keys = new RedisKey[] { key }; - var args = new RedisValue[] { etag, newEtag, payload }; + var args = new RedisValue[] { etag, newEtag, payload, _ttl }; return WriteToRedisUsingPreparedScriptAsync(attemptNum: 0); - async Task WriteToRedisUsingPreparedScriptAsync(int attemptNum) { try @@ -228,14 +234,26 @@ async Task WriteToRedisUsingPreparedScriptAsync(int attemptNum) return await WriteToRedisUsingPreparedScriptAsync(attemptNum: attemptNum + 1) .ConfigureAwait(false); } + catch (Exception exception) + { + throw new RedisStorageException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } } + private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); + /// public async Task ClearStateAsync(string grainType, GrainId grainId, IGrainState grainState) { - var key = grainId.ToString(); - await _db.KeyDeleteAsync(key).ConfigureAwait(false); + try + { + await _db.KeyDeleteAsync(GetKey(grainId)).ConfigureAwait(false); + } + catch (Exception exception) + { + throw new RedisStorageException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } private async Task Close(CancellationToken cancellationToken) diff --git a/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs b/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs index 34a9a11fd2..4ea2dcc4f0 100644 --- a/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs +++ b/src/Redis/Orleans.Reminders.Redis/Hosting/SiloBuilderReminderExtensions.cs @@ -1,7 +1,7 @@ -using System; +using System; using Microsoft.Extensions.DependencyInjection; - +using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Hosting; using Orleans.Reminders.Redis; @@ -47,6 +47,7 @@ public static IServiceCollection UseRedisReminderService(this IServiceCollection { services.AddSingleton(); services.Configure(configure); + services.AddSingleton(); services.ConfigureFormatter(); return services; } diff --git a/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj b/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj index e817a7e072..55290b343c 100644 --- a/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj +++ b/src/Redis/Orleans.Reminders.Redis/Orleans.Reminders.Redis.csproj @@ -7,6 +7,7 @@ $(PackageTags) Redis Reminders $(DefaultTargetFrameworks) beta1 + true diff --git a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs index 0cf0d8412b..e955f83233 100644 --- a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs +++ b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs @@ -1,6 +1,7 @@ -using System; +using System; using System.Threading.Tasks; - +using Orleans.Reminders.Redis; +using Orleans.Runtime; using StackExchange.Redis; namespace Orleans.Configuration @@ -10,29 +11,52 @@ namespace Orleans.Configuration /// public class RedisReminderTableOptions { - /// - /// The connection string. + /// Gets or sets the Redis client options. /// - public string ConnectionString { get; set; } = "localhost:6379"; + [RedactRedisConfigurationOptions] + public ConfigurationOptions ConfigurationOptions { get; set; } /// - /// The database number. + /// The delegate used to create a Redis connection multiplexer. /// - public int? DatabaseNumber { get; set; } + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; /// - /// The delegate used to create a Redis connection multiplexer. + /// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests). + /// Setting a value different from null will cause reminder entries to be deleted after some period of time. /// - public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + public TimeSpan? EntryExpiry { get; set; } = null; /// /// The default multiplexer creation delegate. /// - public static async Task DefaultCreateMultiplexer(RedisReminderTableOptions options) + public static async Task DefaultCreateMultiplexer(RedisReminderTableOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions); + } + + internal class RedactRedisConfigurationOptions : RedactAttribute + { + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); + } + + /// + /// Configuration validator for . + /// + public class RedisReminderTableOptionsValidator : IConfigurationValidator + { + private readonly RedisReminderTableOptions _options; + + public RedisReminderTableOptionsValidator(RedisReminderTableOptions options) { - return await ConnectionMultiplexer.ConnectAsync(options.ConnectionString); + _options = options; } + public void ValidateConfiguration() + { + if (_options.ConfigurationOptions == null) + { + throw new OrleansConfigurationException($"Invalid {nameof(RedisReminderTableOptions)} values for {nameof(RedisReminderTable)}. {nameof(_options.ConfigurationOptions)} is required."); + } + } } } diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs index 4ca6e81b78..5e37e243f9 100644 --- a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs @@ -1,7 +1,8 @@ -using System; +using System; using System.Collections.Generic; using System.Globalization; using System.Linq; +using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -13,12 +14,13 @@ using Orleans.Runtime; using StackExchange.Redis; +using static System.FormattableString; namespace Orleans.Reminders.Redis { internal class RedisReminderTable : IReminderTable { - private readonly RedisKey RemindersRedisKey; + private readonly RedisKey _hashSetKey; private readonly RedisReminderTableOptions _redisOptions; private readonly ClusterOptions _clusterOptions; private readonly ILogger _logger; @@ -42,98 +44,147 @@ public RedisReminderTable( _clusterOptions = clusterOptions.Value; _logger = logger; - RemindersRedisKey = $"{_clusterOptions.ServiceId}_Reminders"; + _hashSetKey = Encoding.UTF8.GetBytes($"{_clusterOptions.ServiceId}/reminders"); } public async Task Init() { - _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); - _db = _redisOptions.DatabaseNumber.HasValue - ? _muxer.GetDatabase(_redisOptions.DatabaseNumber.Value) - : _muxer.GetDatabase(); + try + { + _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); + _db = _muxer.GetDatabase(); + + await _db.KeyExpireAsync(_hashSetKey, _redisOptions.EntryExpiry); + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task ReadRow(GrainId grainId, string reminderName) { - (string from, string to) = GetFilter(grainId, reminderName); - RedisValue[] values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); - if (values.Length == 0) + try { - return null; + (string from, string to) = GetFilter(grainId, reminderName); + RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + if (values.Length == 0) + { + return null; + } + else + { + return ConvertToEntry(values.SingleOrDefault()); + } } - else + catch (Exception exception) { - return ConvertToEntry(values.SingleOrDefault()); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } } public async Task ReadRows(GrainId grainId) { - (string from, string to) = GetFilter(grainId); - RedisValue[] values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); - IEnumerable records = values.Select(v => ConvertToEntry(v)); - return new ReminderTableData(records); + try + { + (string from, string to) = GetFilter(grainId); + RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + IEnumerable records = values.Select(v => ConvertToEntry(v)); + return new ReminderTableData(records); + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task ReadRows(uint begin, uint end) { - (string _, string from) = GetFilter(begin); - (string _, string to) = GetFilter(end); - IEnumerable values; - if (begin < end) + try { - // -----begin******end----- - values = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, to); + (string _, string from) = GetFilter(begin); + (string _, string to) = GetFilter(end); + IEnumerable values; + if (begin < end) + { + // -----begin******end----- + values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); + } + else + { + // *****end------begin***** + RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, "[\"FFFFFFFF\",#"); + RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(_hashSetKey, "[\"00000000\",\"", to); + values = values1.Concat(values2); + } + + IEnumerable records = values.Select(v => ConvertToEntry(v)); + return new ReminderTableData(records); } - else + catch (Exception exception) { - // *****end------begin***** - RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, from, "[\"FFFFFFFF\",#"); - RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(RemindersRedisKey, "[\"00000000\",\"", to); - values = values1.Concat(values2); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } - - IEnumerable records = values.Select(v => ConvertToEntry(v)); - return new ReminderTableData(records); } public async Task RemoveRow(GrainId grainId, string reminderName, string eTag) { - (RedisValue from, RedisValue to) = GetFilter(grainId, reminderName, eTag); - long removed = await _db.SortedSetRemoveRangeByValueAsync(RemindersRedisKey, from, to); - return removed > 0; + try + { + (RedisValue from, RedisValue to) = GetFilter(grainId, reminderName, eTag); + long removed = await _db.SortedSetRemoveRangeByValueAsync(_hashSetKey, from, to); + return removed > 0; + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task TestOnlyClearTable() { - await _db.ExecuteAsync("FLUSHDB"); + try + { + await _db.KeyDeleteAsync(_hashSetKey); + } + catch (Exception exception) + { + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); + } } public async Task UpsertRow(ReminderEntry entry) { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag); - } - - (string etag, string value) = ConvertFromEntry(entry); - (string from, string to) = GetFilter(entry.GrainId, entry.ReminderName); - - ITransaction tx = _db.CreateTransaction(); - _db.SortedSetRemoveRangeByValueAsync(RemindersRedisKey, from, to).Ignore(); - _db.SortedSetAddAsync(RemindersRedisKey, value, 0).Ignore(); - bool success = await tx.ExecuteAsync(); - if (success) + try { - return etag; + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag); + } + + (string etag, string value) = ConvertFromEntry(entry); + (string from, string to) = GetFilter(entry.GrainId, entry.ReminderName); + + ITransaction tx = _db.CreateTransaction(); + _db.SortedSetRemoveRangeByValueAsync(_hashSetKey, from, to).Ignore(); + _db.SortedSetAddAsync(_hashSetKey, value, 0).Ignore(); + bool success = await tx.ExecuteAsync(); + if (success) + { + return etag; + } + else + { + _logger.LogWarning( + (int)ErrorCode.ReminderServiceBase, + "Intermediate error updating entry {Entry} to Redis.", + entry); + throw new ReminderException("Failed to upsert reminder"); + } } - else + catch (Exception exception) when (exception is not ReminderException) { - _logger.LogWarning( - (int)ErrorCode.ReminderServiceBase, - "Intermediate error updating entry {Entry} to Redis.", - entry); - throw new ReminderException("Failed to upsert reminder"); + throw new RedisRemindersException(Invariant($"{exception.GetType()}: {exception.Message}")); } } diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs new file mode 100644 index 0000000000..c0eb26dcf9 --- /dev/null +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisRemindersException.cs @@ -0,0 +1,43 @@ +using System; +using System.Runtime.Serialization; + +namespace Orleans.Reminders.Redis +{ + /// + /// Exception thrown from . + /// + [GenerateSerializer] + public class RedisRemindersException : Exception + { + /// + /// Initializes a new instance of . + /// + public RedisRemindersException() + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + public RedisRemindersException(string message) : base(message) + { + } + + /// + /// Initializes a new instance of . + /// + /// The error message that explains the reason for the exception. + /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. + public RedisRemindersException(string message, Exception inner) : base(message, inner) + { + } + + /// + protected RedisRemindersException( + SerializationInfo info, + StreamingContext context) : base(info, context) + { + } + } +} \ No newline at end of file diff --git a/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs b/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs index ffa4d3bbfd..fdae4a4fc5 100644 --- a/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs +++ b/test/Extensions/Tester.Redis/Clustering/RedisMembershipTableTests.cs @@ -6,6 +6,7 @@ using UnitTests.MembershipTests; using TestExtensions; using UnitTests; +using StackExchange.Redis; namespace Tester.Redis.Clustering { @@ -32,7 +33,11 @@ protected override IMembershipTable CreateMembershipTable(ILogger logger) TestUtils.CheckForRedis(); membershipTable = new RedisMembershipTable( - Options.Create(new RedisClusteringOptions() { ConnectionString = GetConnectionString().Result }), + Options.Create(new RedisClusteringOptions() + { + ConfigurationOptions = ConfigurationOptions.Parse(GetConnectionString().Result), + EntryExpiry = TimeSpan.FromHours(1) + }), this.clusterOptions); return membershipTable; diff --git a/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs b/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs index 511ad42148..f1882b0c75 100644 --- a/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs +++ b/test/Extensions/Tester.Redis/GrainDirectory/RedisGrainDirectoryTests.cs @@ -1,16 +1,11 @@ -using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; +using Orleans.GrainDirectory; using Orleans.GrainDirectory.Redis; -using Orleans.Hosting; -using Orleans.TestingHost; using StackExchange.Redis; using Tester.Directories; -using Tester.Redis.Utility; using TestExtensions; -using UnitTests.Grains.Directories; -using Xunit; using Xunit.Abstractions; namespace Tester.Redis.GrainDirectory diff --git a/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs b/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs index e605eb5d85..9ab50c13ca 100644 --- a/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs +++ b/test/Extensions/Tester.Redis/GrainDirectory/RedisMultipleGrainDirectoriesTests.cs @@ -1,17 +1,10 @@ -using System; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Orleans.Configuration; using Orleans.GrainDirectory.Redis; -using Orleans.Hosting; using Orleans.TestingHost; using StackExchange.Redis; using Tester.Directories; -using Tester.Redis.Utility; using TestExtensions; using UnitTests.Grains.Directories; -using Xunit; -using Xunit.Abstractions; namespace Tester.Redis.GrainDirectory { diff --git a/test/Extensions/Tester.Redis/Persistence/GrainState.cs b/test/Extensions/Tester.Redis/Persistence/GrainState.cs index 9e7eee317c..85aac58bd7 100644 --- a/test/Extensions/Tester.Redis/Persistence/GrainState.cs +++ b/test/Extensions/Tester.Redis/Persistence/GrainState.cs @@ -1,4 +1,3 @@ -using System; using UnitTests.GrainInterfaces; namespace Tester.Redis.Persistence diff --git a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs index 3b6fc2640a..6ff524c1f1 100644 --- a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs +++ b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceGrainTests.cs @@ -1,12 +1,10 @@ -using System; -using System.Collections.Generic; +using System.Text.RegularExpressions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Orleans.Runtime; using Orleans.Storage; using Orleans.TestingHost; using StackExchange.Redis; -using Tester.Redis.Utility; using TestExtensions; using TestExtensions.Runners; using UnitTests.GrainInterfaces; @@ -18,8 +16,8 @@ namespace Tester.Redis.Persistence [TestCategory("Redis"), TestCategory("Persistence"), TestCategory("Functional")] public class RedisPersistenceGrainTests : GrainPersistenceTestsRunner, IClassFixture { - public static Guid ServiceId = Guid.NewGuid(); - public static string ConnectionStringKey = "ConnectionString"; + public static readonly string ServiceId = Guid.NewGuid().ToString("N"); + public const string ConnectionStringKey = "ConnectionString"; public class Fixture : BaseTestClusterFixture { protected override void ConfigureTestCluster(TestClusterBuilder builder) @@ -31,7 +29,7 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { {ConnectionStringKey, TestDefaultConfiguration.RedisConnectionString} })); - builder.Options.ServiceId = ServiceId.ToString(); + builder.Options.ServiceId = ServiceId; builder.AddSiloBuilderConfigurator(); builder.AddClientBuilderConfigurator(); } @@ -46,7 +44,8 @@ public void Configure(IHostBuilder hostBuilder) siloBuilder .AddRedisGrainStorage("GrainStorageForTest", options => { - options.ConnectionString = connectionString; + options.ConfigurationOptions = ConfigurationOptions.Parse(connectionString); + options.EntryExpiry = TimeSpan.FromHours(1); }) .AddMemoryGrainStorage("MemoryStore"); }); @@ -122,7 +121,18 @@ public async Task Redis_TestRedisScriptCacheClearBeforeGrainWriteState() { var grain = fixture.GrainFactory.GetGrain>(1111); - await database.ExecuteAsync("SCRIPT", "FLUSH", "SYNC"); + var info = (string)await database.ExecuteAsync("INFO"); + var versionString = Regex.Match(info, @"redis_version:[\s]*([^\s]+)").Groups[1].Value; + var version = Version.Parse(versionString); + if (version >= Version.Parse("6.2.0")) + { + await database.ExecuteAsync("SCRIPT", "FLUSH", "SYNC"); + } + else + { + await database.ExecuteAsync("SCRIPT", "FLUSH"); + } + await grain.DoWrite(state); var result = await grain.DoRead(); @@ -139,7 +149,7 @@ public async Task Redis_DoubleActivationETagConflictSimulation() var grain = fixture.GrainFactory.GetGrain>(54321); var data = await grain.DoRead(); - var key = grain.GetGrainId().ToString(); + var key = $"{ServiceId}/state/{grain.GetGrainId()}"; await database.HashSetAsync(key, new[] { new HashEntry("etag", "derp") }); await Assert.ThrowsAsync(() => grain.DoWrite(state)); diff --git a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs index 6fdce74698..8d41c5cc5b 100644 --- a/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs +++ b/test/Extensions/Tester.Redis/Persistence/RedisPersistenceSetupTests.cs @@ -3,6 +3,7 @@ using Xunit; using Orleans.Configuration; using Orleans.Runtime; +using StackExchange.Redis; namespace Tester.Redis.Persistence { @@ -11,9 +12,7 @@ public class RedisPersistenceSetupTests { [SkippableTheory] [InlineData(null)] - [InlineData("")] - [InlineData(" ")] - [InlineData("123")] + [InlineData("localhost:1234")] public void StorageOptionsValidator(string connectionString) { TestUtils.CheckForRedis(); @@ -29,11 +28,17 @@ public void StorageOptionsValidator(string connectionString) .ConfigureEndpoints(siloAddress, siloPort, gatewayPort) .AddRedisGrainStorage("Redis", optionsBuilder => optionsBuilder.Configure(options => { - options.ConnectionString = connectionString; + if (connectionString is not null) + { + options.ConfigurationOptions = ConfigurationOptions.Parse(connectionString); + } })); }).Build(); - Assert.Throws(() => host.Start()); + if (string.IsNullOrWhiteSpace(connectionString)) + { + Assert.Throws(() => host.Start()); + } } } } \ No newline at end of file diff --git a/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs b/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs index 89d5b91e09..3dd4b0d242 100644 --- a/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs +++ b/test/Extensions/Tester.Redis/Reminders/RedisReminderTableTests.cs @@ -1,16 +1,10 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Reminders.Redis; -using Orleans.Runtime; -using Orleans.TestingHost; -using Tester.Redis.Utility; +using StackExchange.Redis; using TestExtensions; using UnitTests; -using UnitTests.GrainInterfaces; using UnitTests.RemindersTest; using Xunit; @@ -37,8 +31,11 @@ protected override IReminderTable CreateRemindersTable() RedisReminderTable reminderTable = new( this.loggerFactory.CreateLogger(), this.clusterOptions, - Options.Create(new RedisReminderTableOptions() { ConnectionString = GetConnectionString().Result }) - ); + Options.Create(new RedisReminderTableOptions() + { + ConfigurationOptions = ConfigurationOptions.Parse(GetConnectionString().Result), + EntryExpiry = TimeSpan.FromHours(1) + })); if (reminderTable == null) { diff --git a/test/Extensions/Tester.Redis/Utility/TestExtensions.cs b/test/Extensions/Tester.Redis/Utility/TestExtensions.cs index d2671a3403..d2e1ce705a 100644 --- a/test/Extensions/Tester.Redis/Utility/TestExtensions.cs +++ b/test/Extensions/Tester.Redis/Utility/TestExtensions.cs @@ -1,8 +1,5 @@ using Orleans.Runtime; -using System; using System.Net; -using System.Threading; -using System.Threading.Tasks; namespace Tester.Redis.Utility { diff --git a/test/Grains/TestInternalGrains/PersistenceTestGrains.cs b/test/Grains/TestInternalGrains/PersistenceTestGrains.cs index 43ed3c58ac..9e0850cb91 100644 --- a/test/Grains/TestInternalGrains/PersistenceTestGrains.cs +++ b/test/Grains/TestInternalGrains/PersistenceTestGrains.cs @@ -356,7 +356,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -388,7 +388,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -427,7 +427,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -463,7 +463,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -495,7 +495,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } @@ -534,7 +534,7 @@ public async Task DoRead() public Task DoDelete() { - return ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return ClearStateAsync(); } } diff --git a/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs b/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs index bfcfa84235..64f7217ac1 100644 --- a/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs +++ b/test/Grains/TestInternalGrains/PersistentStateTestGrains.cs @@ -38,7 +38,7 @@ public async Task DoRead() public Task DoDelete() { - return this.persistentState.ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return this.persistentState.ClearStateAsync(); } } @@ -82,7 +82,7 @@ public async Task DoRead() public Task DoDelete() { - return this.persistentState.ClearStateAsync(); // Automatically marks this grain as DeactivateOnIdle + return this.persistentState.ClearStateAsync(); } } } \ No newline at end of file diff --git a/test/Tester/Directories/GrainDirectoryTests.cs b/test/Tester/Directories/GrainDirectoryTests.cs index ddd5ca2757..c354fe4cbb 100644 --- a/test/Tester/Directories/GrainDirectoryTests.cs +++ b/test/Tester/Directories/GrainDirectoryTests.cs @@ -30,7 +30,7 @@ public async Task RegisterLookupUnregisterLookup() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -50,7 +50,7 @@ public async Task DoNotOverrideEntry() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -84,7 +84,7 @@ public async Task DoNotDeleteDifferentActivationIdEntry() var expected = new GrainAddress { ActivationId = ActivationId.NewId(), - GrainId = GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")), + GrainId = GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")), SiloAddress = SiloAddress.FromParsableString("10.0.23.12:1000@5678"), MembershipVersion = new MembershipVersion(51) }; @@ -105,7 +105,7 @@ public async Task DoNotDeleteDifferentActivationIdEntry() [SkippableFact] public async Task LookupNotFound() { - Assert.Null(await this.grainDirectory.Lookup(GrainId.Parse("user/someraondomuser_" + Guid.NewGuid().ToString("N")))); + Assert.Null(await this.grainDirectory.Lookup(GrainId.Parse("user/somerandomuser_" + Guid.NewGuid().ToString("N")))); } } } diff --git a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs index ac744d4fb6..81bfa5b428 100644 --- a/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs +++ b/test/TesterInternal/MembershipTests/MembershipTableTestsBase.cs @@ -369,16 +369,24 @@ protected async Task MembershipTable_UpdateRowInParallel(bool extendedProtocol = await Task.WhenAll(Enumerable.Range(1, 19).Select(async i => { - bool done; + var done = false; do { var updatedTableData = await membershipTable.ReadAll(); var updatedRow = updatedTableData.TryGet(data.SiloAddress); - TableVersion tableVersion = updatedTableData.Version.Next(); - await Task.Delay(10); - try { done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); } catch { done = false; } + if (updatedRow is null) continue; + + TableVersion tableVersion = updatedTableData.Version.Next(); + try + { + done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); + } + catch + { + done = false; + } } while (!done); })).WithTimeout(TimeSpan.FromSeconds(30)); From 177962c30f5553e50fb9ef065468298f1242d5ea Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 16 Jan 2023 10:20:27 -0800 Subject: [PATCH 2/4] Review feedback --- .../HostingExtensions.ICientBuilder.cs | 3 +- .../Hosting/HostingExtensions.ISiloBuilder.cs | 3 +- .../Providers/RedisClusteringOptions.cs | 2 +- .../Storage/RedisMembershipTable.cs | 6 +- .../Hosting/RedisGrainDirectoryExtensions.cs | 2 +- .../Options/RedisGrainDirectoryOptions.cs | 6 +- .../RedisGrainDirectory.cs | 32 +++++---- .../Providers/RedisStorageOptionsValidator.cs | 2 +- .../Storage/RedisGrainStorage.cs | 72 ++----------------- .../Providers/RedisReminderTableOptions.cs | 2 +- .../Storage/RedisReminderTable.cs | 6 +- 11 files changed, 42 insertions(+), 94 deletions(-) diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs index 5f034a1bcf..ded2d9feba 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ICientBuilder.cs @@ -34,13 +34,12 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, Act /// /// Configures Redis as the clustering provider. /// - public static IClientBuilder UseRedisClustering(this IClientBuilder builder, string redisConnectionString, int db = 0) + public static IClientBuilder UseRedisClustering(this IClientBuilder builder, string redisConnectionString) { return builder.ConfigureServices(services => services .Configure(opt => { opt.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); - opt.ConfigurationOptions.DefaultDatabase = db; }) .AddRedisClustering() .AddSingleton()); diff --git a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs index f50ab0d55d..906bae176f 100644 --- a/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs +++ b/src/Redis/Orleans.Clustering.Redis/Hosting/HostingExtensions.ISiloBuilder.cs @@ -31,13 +31,12 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action< /// /// Configures Redis as the clustering provider. /// - public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString, int db = 0) + public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString) { return builder.ConfigureServices(services => services .Configure(options => { options.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString); - options.ConfigurationOptions.DefaultDatabase = db; }) .AddRedisClustering()); } diff --git a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs index d444983fee..86f1448dbb 100644 --- a/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs +++ b/src/Redis/Orleans.Clustering.Redis/Providers/RedisClusteringOptions.cs @@ -58,7 +58,7 @@ public void ValidateConfiguration() { if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisClusteringOptions)} values for {nameof(RedisMembershipTable)}. {nameof(_options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisMembershipTable)}. {nameof(RedisClusteringOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs index 55219e75ec..c7d92f3245 100644 --- a/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs +++ b/src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs @@ -45,7 +45,11 @@ public async Task InitializeMembershipTable(bool tryInitTableVersion) if (tryInitTableVersion) { await _db.HashSetAsync(_clusterKey, TableVersionKey, SerializeVersion(DefaultTableVersion), When.NotExists); - await _db.KeyExpireAsync(_clusterKey, _redisOptions.EntryExpiry); + + if (_redisOptions.EntryExpiry is { } expiry) + { + await _db.KeyExpireAsync(_clusterKey, expiry); + } } this.IsInitialized = true; diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs index 9f01b1bf39..92dc69f845 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Hosting/RedisGrainDirectoryExtensions.cs @@ -62,7 +62,7 @@ private static IServiceCollection AddRedisGrainDirectory( { configureOptions.Invoke(services.AddOptions(name)); services - .AddTransient(sp => new RedisGrainDirectoryOptionsValidator(sp.GetRequiredService>().Get(name))) + .AddTransient(sp => new RedisGrainDirectoryOptionsValidator(sp.GetRequiredService>().Get(name), name)) .ConfigureNamedOptionForLogging(name) .AddSingletonNamedService(name, (sp, name) => ActivatorUtilities.CreateInstance(sp, sp.GetOptionsByName(name))) .AddSingletonNamedService>(name, (s, n) => (ILifecycleParticipant)s.GetRequiredServiceByName(n)); diff --git a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs index b81a8d031d..1f1f84daba 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/Options/RedisGrainDirectoryOptions.cs @@ -45,10 +45,12 @@ internal class RedactRedisConfigurationOptions : RedactAttribute public class RedisGrainDirectoryOptionsValidator : IConfigurationValidator { private readonly RedisGrainDirectoryOptions _options; + private readonly string _name; - public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options) + public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options, string name) { _options = options; + _name = name; } /// @@ -56,7 +58,7 @@ public void ValidateConfiguration() { if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(_options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisGrainDirectory)} with name {_name}. {nameof(RedisGrainDirectoryOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs index d38cfcb6aa..e6ede0960f 100644 --- a/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs +++ b/src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs @@ -14,6 +14,18 @@ namespace Orleans.GrainDirectory.Redis { public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant { + private const string DeleteScript = + """ + local cur = redis.call('GET', KEYS[1]) + if cur ~= false then + local typedCur = cjson.decode(cur) + if typedCur.ActivationId == ARGV[1] then + return redis.call('DEL', KEYS[1]) + end + end + return 0 + """; + private readonly RedisGrainDirectoryOptions directoryOptions; private readonly ClusterOptions clusterOptions; private readonly ILogger logger; @@ -21,7 +33,6 @@ public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant _keyPrefix.Append(grainId.ToString()); + private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); #region Logging private void LogConnectionRestored(object sender, ConnectionFailedEventArgs e) diff --git a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs index 08e50ff8e1..32b55712d9 100644 --- a/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs +++ b/src/Redis/Orleans.Persistence.Redis/Providers/RedisStorageOptionsValidator.cs @@ -17,7 +17,7 @@ public void ValidateConfiguration() { if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisStorageOptions)} values for {nameof(RedisGrainStorage)} with name {_name}. {nameof(_options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisGrainStorage)} with name {_name}. {nameof(RedisStorageOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs index 9b5147348f..d08d3430b4 100644 --- a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs +++ b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs @@ -34,7 +34,6 @@ internal class RedisGrainStorage : IGrainStorage, ILifecycleParticipant /// Creates a new instance of the type. @@ -93,9 +90,6 @@ private async Task Init(CancellationToken cancellationToken) _connection = await _options.CreateMultiplexer(_options).ConfigureAwait(false); _db = _connection.GetDatabase(); - _preparedWriteScript = LuaScript.Prepare(WriteScript); - _preparedWriteScriptHash = await LoadWriteScriptAsync().ConfigureAwait(false); - if (_logger.IsEnabled(LogLevel.Debug)) { timer.Stop(); @@ -120,24 +114,6 @@ private async Task Init(CancellationToken cancellationToken) } } - private async Task LoadWriteScriptAsync() - { - Debug.Assert(_connection is not null); - Debug.Assert(_preparedWriteScript is not null); - - System.Net.EndPoint[] endPoints = _connection.GetEndPoints(); - var loadTasks = new Task[endPoints.Length]; - for (int i = 0; i < endPoints.Length; i++) - { - var endpoint = endPoints.ElementAt(i); - var server = _connection.GetServer(endpoint); - - loadTasks[i] = _preparedWriteScript.LoadAsync(server); - } - await Task.WhenAll(loadTasks).ConfigureAwait(false); - return loadTasks[0].Result.Hash; - } - /// public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainState grainState) { @@ -178,16 +154,13 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt var key = GetKey(grainId); var newEtag = Guid.NewGuid().ToString("N"); - RedisResult writeWithScriptResponse; + RedisResult response; try { var payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); - writeWithScriptResponse = await WriteToRedisUsingPreparedScriptAsync( - payload, - etag: etag, - key: key, - newEtag: newEtag) - .ConfigureAwait(false); + var keys = new RedisKey[] { key }; + var args = new RedisValue[] { etag, newEtag, payload, _ttl }; + response = await _db.ScriptEvaluateAsync(WriteScript, keys, args).ConfigureAwait(false); } catch (Exception e) { @@ -197,10 +170,10 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt grainId, key); throw new RedisStorageException( - Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}."), e); + Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}. {e.GetType()}: {e.Message}")); } - if (writeWithScriptResponse is not null && writeWithScriptResponse.IsNull) + if (response is not null && response.IsNull) { throw new InconsistentStateException(Invariant($"ETag mismatch - tried with ETag: {grainState.ETag}")); } @@ -208,39 +181,6 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt grainState.ETag = newEtag; } - private Task WriteToRedisUsingPreparedScriptAsync(RedisValue payload, string etag, RedisKey key, string newEtag) - { - var keys = new RedisKey[] { key }; - var args = new RedisValue[] { etag, newEtag, payload, _ttl }; - return WriteToRedisUsingPreparedScriptAsync(attemptNum: 0); - - async Task WriteToRedisUsingPreparedScriptAsync(int attemptNum) - { - try - { - return await _db.ScriptEvaluateAsync(_preparedWriteScriptHash, keys, args).ConfigureAwait(false); - } - catch (RedisServerException rse) when (rse.Message is not null && rse.Message.StartsWith("NOSCRIPT ", StringComparison.Ordinal)) - { - // EVALSHA returned error 'NOSCRIPT No matching script. Please use EVAL.'. - // This means that SHA1 cache of Lua scripts is cleared at server side, possibly because of Redis server rebooted after Init() method was called. Need to reload Lua script. - // Several attempts are made just in case (e.g. if Redis server is rebooted right after previous script reload). - if (attemptNum >= ReloadWriteScriptMaxCount) - { - throw; - } - - await LoadWriteScriptAsync().ConfigureAwait(false); - return await WriteToRedisUsingPreparedScriptAsync(attemptNum: attemptNum + 1) - .ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RedisStorageException(Invariant($"{exception.GetType()}: {exception.Message}")); - } - } - } - private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); /// diff --git a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs index e955f83233..576d9e65ac 100644 --- a/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs +++ b/src/Redis/Orleans.Reminders.Redis/Providers/RedisReminderTableOptions.cs @@ -55,7 +55,7 @@ public void ValidateConfiguration() { if (_options.ConfigurationOptions == null) { - throw new OrleansConfigurationException($"Invalid {nameof(RedisReminderTableOptions)} values for {nameof(RedisReminderTable)}. {nameof(_options.ConfigurationOptions)} is required."); + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisReminderTable)}. {nameof(RedisReminderTableOptions)}.{nameof(_options.ConfigurationOptions)} is required."); } } } diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs index 5e37e243f9..d43ffb5199 100644 --- a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs @@ -54,7 +54,10 @@ public async Task Init() _muxer = await _redisOptions.CreateMultiplexer(_redisOptions); _db = _muxer.GetDatabase(); - await _db.KeyExpireAsync(_hashSetKey, _redisOptions.EntryExpiry); + if (_redisOptions.EntryExpiry is { } expiry) + { + await _db.KeyExpireAsync(_hashSetKey, expiry); + } } catch (Exception exception) { @@ -231,7 +234,6 @@ private ReminderEntry ConvertToEntry(string reminderValue) return (from, to); } - private (string eTag, string value) ConvertFromEntry(ReminderEntry entry) { string grainHash = entry.GrainId.GetUniformHashCode().ToString("X8"); From 9677e4d6a3be3e16a5264673913572268d6c1c90 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 18 Jan 2023 15:43:31 -0800 Subject: [PATCH 3/4] Convert RedisReminderTable.UpsertRow implementation to Lua script instead of Redis transaction --- .../Storage/RedisReminderTable.cs | 80 +++++++++---------- .../RemindersTest/ReminderTableTestsBase.cs | 4 +- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs index d43ffb5199..90873c3e0f 100644 --- a/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs +++ b/src/Redis/Orleans.Reminders.Redis/Storage/RedisReminderTable.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -69,7 +70,7 @@ public async Task ReadRow(GrainId grainId, string reminderName) { try { - (string from, string to) = GetFilter(grainId, reminderName); + var (from, to) = GetFilter(grainId, reminderName); RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); if (values.Length == 0) { @@ -90,9 +91,9 @@ public async Task ReadRows(GrainId grainId) { try { - (string from, string to) = GetFilter(grainId); + var (from, to) = GetFilter(grainId); RedisValue[] values = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, to); - IEnumerable records = values.Select(v => ConvertToEntry(v)); + IEnumerable records = values.Select(static v => ConvertToEntry(v)); return new ReminderTableData(records); } catch (Exception exception) @@ -105,8 +106,8 @@ public async Task ReadRows(uint begin, uint end) { try { - (string _, string from) = GetFilter(begin); - (string _, string to) = GetFilter(end); + var (_, from) = GetFilter(begin); + var (_, to) = GetFilter(end); IEnumerable values; if (begin < end) { @@ -116,12 +117,12 @@ public async Task ReadRows(uint begin, uint end) else { // *****end------begin***** - RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, "[\"FFFFFFFF\",#"); - RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(_hashSetKey, "[\"00000000\",\"", to); + RedisValue[] values1 = await _db.SortedSetRangeByValueAsync(_hashSetKey, from, "\"FFFFFFFF\",#"); + RedisValue[] values2 = await _db.SortedSetRangeByValueAsync(_hashSetKey, "\"00000000\",\"", to); values = values1.Concat(values2); } - IEnumerable records = values.Select(v => ConvertToEntry(v)); + IEnumerable records = values.Select(static v => ConvertToEntry(v)); return new ReminderTableData(records); } catch (Exception exception) @@ -134,7 +135,7 @@ public async Task RemoveRow(GrainId grainId, string reminderName, string e { try { - (RedisValue from, RedisValue to) = GetFilter(grainId, reminderName, eTag); + var (from, to) = GetFilter(grainId, reminderName, eTag); long removed = await _db.SortedSetRemoveRangeByValueAsync(_hashSetKey, from, to); return removed > 0; } @@ -158,6 +159,21 @@ public async Task TestOnlyClearTable() public async Task UpsertRow(ReminderEntry entry) { + const string UpsertScript = + """ + local key = KEYS[1] + local from = '[' .. ARGV[1] -- start of the conditional (with etag) key range + local to = '[' .. ARGV[2] -- end of the conditional (with etag) key range + local value = ARGV[3] + + -- Remove all entries for this reminder + local remRes = redis.call('ZREMRANGEBYLEX', key, from, to); + + -- Add the new reminder entry + local addRes = redis.call('ZADD', key, 0, value); + return { key, from, to, value, remRes, addRes } + """; + try { if (_logger.IsEnabled(LogLevel.Debug)) @@ -165,25 +181,10 @@ public async Task UpsertRow(ReminderEntry entry) _logger.LogDebug("UpsertRow entry = {Entry}, ETag = {ETag}", entry.ToString(), entry.ETag); } - (string etag, string value) = ConvertFromEntry(entry); - (string from, string to) = GetFilter(entry.GrainId, entry.ReminderName); - - ITransaction tx = _db.CreateTransaction(); - _db.SortedSetRemoveRangeByValueAsync(_hashSetKey, from, to).Ignore(); - _db.SortedSetAddAsync(_hashSetKey, value, 0).Ignore(); - bool success = await tx.ExecuteAsync(); - if (success) - { - return etag; - } - else - { - _logger.LogWarning( - (int)ErrorCode.ReminderServiceBase, - "Intermediate error updating entry {Entry} to Redis.", - entry); - throw new ReminderException("Failed to upsert reminder"); - } + var (newETag, value) = ConvertFromEntry(entry); + var (from, to) = GetFilter(entry.GrainId, entry.ReminderName); + var res = await _db.ScriptEvaluateAsync(UpsertScript, keys: new[] { _hashSetKey }, values: new[] { from, to, value }); + return newETag; } catch (Exception exception) when (exception is not ReminderException) { @@ -191,9 +192,9 @@ public async Task UpsertRow(ReminderEntry entry) } } - private ReminderEntry ConvertToEntry(string reminderValue) + private static ReminderEntry ConvertToEntry(string reminderValue) { - string[] segments = JsonConvert.DeserializeObject(reminderValue); + string[] segments = JsonConvert.DeserializeObject($"[{reminderValue}]"); return new ReminderEntry { @@ -205,36 +206,33 @@ private ReminderEntry ConvertToEntry(string reminderValue) }; } - private (string from, string to) GetFilter(uint grainHash) + private (RedisValue from, RedisValue to) GetFilter(uint grainHash) { return GetFilter(grainHash.ToString("X8")); } - private (string from, string to) GetFilter(GrainId grainId) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString()); } - private (string from, string to) GetFilter(GrainId grainId, string reminderName) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId, string reminderName) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString(), reminderName); } - private (string from, string to) GetFilter(GrainId grainId, string reminderName, string eTag) + private (RedisValue from, RedisValue to) GetFilter(GrainId grainId, string reminderName, string eTag) { return GetFilter(grainId.GetUniformHashCode().ToString("X8"), grainId.ToString(), reminderName, eTag); } - private (string from, string to) GetFilter(params string[] segments) + private (RedisValue from, RedisValue to) GetFilter(params string[] segments) { string prefix = JsonConvert.SerializeObject(segments, _jsonSettings); - prefix = prefix.Remove(prefix.Length - 1); - string from = prefix + ",\""; - string to = prefix + ",#"; - return (from, to); + return ($"{prefix[1..^1]},\"", $"{prefix[1..^1]},#"); } - private (string eTag, string value) ConvertFromEntry(ReminderEntry entry) + private (RedisValue eTag, RedisValue value) ConvertFromEntry(ReminderEntry entry) { string grainHash = entry.GrainId.GetUniformHashCode().ToString("X8"); string eTag = Guid.NewGuid().ToString(); @@ -248,7 +246,7 @@ private ReminderEntry ConvertToEntry(string reminderValue) entry.Period.ToString() }; - return (eTag, JsonConvert.SerializeObject(segments, _jsonSettings)); + return (eTag, JsonConvert.SerializeObject(segments, _jsonSettings)[1..^1]); } } } diff --git a/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs b/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs index dc81e1f95b..d09fd082db 100644 --- a/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs +++ b/test/TesterInternal/RemindersTest/ReminderTableTestsBase.cs @@ -110,7 +110,7 @@ protected async Task ReminderSimple() Assert.False(removeRowRes, "should have failed. reminder shouldn't exist"); } - protected async Task RemindersRange(int iterations=1000) + protected async Task RemindersRange(int iterations = 1000) { await Task.WhenAll(Enumerable.Range(1, iterations).Select(async i => { @@ -119,7 +119,7 @@ await Task.WhenAll(Enumerable.Range(1, iterations).Select(async i => await RetryHelper.RetryOnExceptionAsync(10, RetryOperation.Sigmoid, async () => { await remindersTable.UpsertRow(CreateReminder(grainRef, i.ToString())); - return Task.CompletedTask; + return 0; }); })); From 115a0f8d4960f8d78720b997be0a0941edc8bb54 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 18 Jan 2023 16:45:01 -0800 Subject: [PATCH 4/4] Fixes for RedisGrainStorage --- .../Storage/RedisGrainStorage.cs | 81 ++++++++++++------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs index d08d3430b4..cb3ae7e1e1 100644 --- a/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs +++ b/src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs @@ -21,20 +21,6 @@ namespace Orleans.Persistence /// internal class RedisGrainStorage : IGrainStorage, ILifecycleParticipant { - private const string WriteScript = - """ - local etag = redis.call('HGET', KEYS[1], 'etag') - if etag == false or etag == ARGV[1] then - local result = redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) - if ARGV[4] ~= '-1' then - redis.call('EXPIRE', KEYS[1], ARGV[4]) - end - return result - else - return false - end - """; - private readonly string _serviceId; private readonly RedisValue _ttl; private readonly RedisKey _keyPrefix; @@ -130,10 +116,12 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta grainState.State = _grainStorageSerializer.Deserialize(valueEntry.Value); grainState.ETag = etagEntry.Value; + grainState.RecordExists = true; } else { - grainState.ETag = Guid.NewGuid().ToString(); + grainState.ETag = null; + grainState.RecordExists = false; } } catch (Exception e) @@ -150,19 +138,40 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta /// public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) { - var etag = grainState.ETag ?? "null"; + const string WriteScript = + """ + local etag = redis.call('HGET', KEYS[1], 'etag') + if (not etag and (ARGV[1] == nil or ARGV[1] == '')) or etag == ARGV[1] then + redis.call('HMSET', KEYS[1], 'etag', ARGV[2], 'data', ARGV[3]) + if ARGV[4] ~= '-1' then + redis.call('EXPIRE', KEYS[1], ARGV[4]) + end + return 0 + else + return -1 + end + """; + var key = GetKey(grainId); - var newEtag = Guid.NewGuid().ToString("N"); + RedisValue etag = grainState.ETag ?? ""; + RedisValue newEtag = Guid.NewGuid().ToString("N"); - RedisResult response; try { var payload = new RedisValue(_grainStorageSerializer.Serialize(grainState.State).ToString()); var keys = new RedisKey[] { key }; var args = new RedisValue[] { etag, newEtag, payload, _ttl }; - response = await _db.ScriptEvaluateAsync(WriteScript, keys, args).ConfigureAwait(false); + var response = await _db.ScriptEvaluateAsync(WriteScript, keys, args).ConfigureAwait(false); + + if (response is not null && (int)response == -1) + { + throw new InconsistentStateException($"Version conflict ({nameof(WriteStateAsync)}): ServiceId={_serviceId} ProviderName={_name} GrainType={grainType} GrainId={grainId} ETag={grainState.ETag}."); + } + + grainState.ETag = newEtag; + grainState.RecordExists = true; } - catch (Exception e) + catch (Exception exception) when (exception is not InconsistentStateException) { _logger.LogError( "Failed to write grain state for {GrainType} grain with ID: {GrainId} with redis key {Key}.", @@ -170,15 +179,8 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt grainId, key); throw new RedisStorageException( - Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}. {e.GetType()}: {e.Message}")); + Invariant($"Failed to write grain state for {grainType} grain with ID: {grainId} with redis key {key}. {exception.GetType()}: {exception.Message}")); } - - if (response is not null && response.IsNull) - { - throw new InconsistentStateException(Invariant($"ETag mismatch - tried with ETag: {grainState.ETag}")); - } - - grainState.ETag = newEtag; } private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString()); @@ -186,11 +188,30 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt /// public async Task ClearStateAsync(string grainType, GrainId grainId, IGrainState grainState) { + const string ClearScript = + """ + local etag = redis.call('HGET', KEYS[1], 'etag') + if (not etag and not ARGV[1]) or etag == ARGV[1] then + redis.call('DEL', KEYS[1]) + return 0 + else + return -1 + end + """; try { - await _db.KeyDeleteAsync(GetKey(grainId)).ConfigureAwait(false); + RedisValue etag = grainState.ETag ?? ""; + var response = await _db.ScriptEvaluateAsync(ClearScript, keys: new[] { GetKey(grainId) }, values: new[] { etag }).ConfigureAwait(false); + + if (response is not null && (int)response == -1) + { + throw new InconsistentStateException($"Version conflict ({nameof(ClearStateAsync)}): ServiceId={_serviceId} ProviderName={_name} GrainType={grainType} GrainId={grainId} ETag={grainState.ETag}."); + } + + grainState.ETag = null; + grainState.RecordExists = false; } - catch (Exception exception) + catch (Exception exception) when (exception is not InconsistentStateException) { throw new RedisStorageException(Invariant($"{exception.GetType()}: {exception.Message}")); }