Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various improvements to Redis providers #8261

Merged
merged 4 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System;
using Orleans;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Hosting;
using Orleans.Messaging;
using Orleans.Clustering.Redis;
using StackExchange.Redis;

namespace Microsoft.Extensions.Hosting
{
Expand All @@ -25,7 +26,7 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, Act
}

services
.AddRedis()
.AddRedisClustering()
.AddSingleton<IGatewayListProvider, RedisGatewayListProvider>();
});
}
Expand All @@ -38,10 +39,10 @@ public static IClientBuilder UseRedisClustering(this IClientBuilder builder, str
return builder.ConfigureServices(services => services
.Configure<RedisClusteringOptions>(opt =>
{
opt.ConnectionString = redisConnectionString;
opt.Database = db;
opt.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString);
ReubenBond marked this conversation as resolved.
Show resolved Hide resolved
opt.ConfigurationOptions.DefaultDatabase = db;
})
.AddRedis()
.AddRedisClustering()
.AddSingleton<IGatewayListProvider, RedisGatewayListProvider>());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System;
using System;
using Orleans;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Hosting;
using Orleans.Clustering.Redis;
using StackExchange.Redis;

namespace Microsoft.Extensions.Hosting
{
Expand All @@ -23,7 +24,7 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action<
services.Configure(configuration);
}

services.AddRedis();
services.AddRedisClustering();
});
}

Expand All @@ -33,13 +34,18 @@ public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, Action<
public static ISiloBuilder UseRedisClustering(this ISiloBuilder builder, string redisConnectionString, int db = 0)
{
return builder.ConfigureServices(services => services
.Configure<RedisClusteringOptions>(options => { options.Database = db; options.ConnectionString = redisConnectionString; })
.AddRedis());
.Configure<RedisClusteringOptions>(options =>
{
options.ConfigurationOptions = ConfigurationOptions.Parse(redisConnectionString);
options.ConfigurationOptions.DefaultDatabase = db;
})
.AddRedisClustering());
}

internal static IServiceCollection AddRedis(this IServiceCollection services)
internal static IServiceCollection AddRedisClustering(this IServiceCollection services)
{
services.AddSingleton<RedisMembershipTable>();
services.AddSingleton<IConfigurationValidator, RedisClusteringOptionsValidator>();
services.AddSingleton<IMembershipTable>(sp => sp.GetRequiredService<RedisMembershipTable>());
return services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageTags>$(PackageTags) Redis Clustering</PackageTags>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<VersionSuffix Condition="$(VersionSuffix) == ''">beta1</VersionSuffix>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Orleans.Runtime;
using StackExchange.Redis;
using System;
using System.Threading.Tasks;
Expand All @@ -10,26 +11,55 @@ namespace Orleans.Clustering.Redis
public class RedisClusteringOptions
{
/// <summary>
/// Specifies the database identi
/// Gets or sets the Redis client configuration.
/// </summary>
public int Database { get; set; }
[RedactRedisConfigurationOptions]
public ConfigurationOptions ConfigurationOptions { get; set; }
ReubenBond marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// The connection string.
/// The delegate used to create a Redis connection multiplexer.
/// </summary>
public string ConnectionString { get; set; } = "localhost:6379";
public Func<RedisClusteringOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests).
/// Setting a value different from null will cause entries to be deleted after some period of time.
/// </summary>
public Func<RedisClusteringOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
public TimeSpan? EntryExpiry { get; set; } = null;

/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisClusteringOptions options)
{
return await ConnectionMultiplexer.ConnectAsync(options.ConnectionString);
return await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions);
}
}

internal class RedactRedisConfigurationOptions : RedactAttribute
{
public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value);
}

/// <summary>
/// Configuration validator for <see cref="RedisClusteringOptions"/>.
/// </summary>
public class RedisClusteringOptionsValidator : IConfigurationValidator
{
private readonly RedisClusteringOptions _options;

public RedisClusteringOptionsValidator(RedisClusteringOptions options)
{
_options = options;
}

/// <inheritdoc/>
public void ValidateConfiguration()
{
if (_options.ConfigurationOptions == null)
{
throw new OrleansConfigurationException($"Invalid {nameof(RedisClusteringOptions)} values for {nameof(RedisMembershipTable)}. {nameof(_options.ConfigurationOptions)} is required.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
using Newtonsoft.Json;
using System.Linq;
using Microsoft.Extensions.Options;
using System.Runtime.CompilerServices;
using System.Globalization;
using System.Text;

namespace Orleans.Clustering.Redis
{
Expand All @@ -26,7 +26,7 @@ public RedisMembershipTable(IOptions<RedisClusteringOptions> redisOptions, IOpti
{
_redisOptions = redisOptions.Value;
_clusterOptions = clusterOptions.Value;
_clusterKey = $"{_clusterOptions.ServiceId}/{_clusterOptions.ClusterId}";
_clusterKey = Encoding.UTF8.GetBytes($"{_clusterOptions.ServiceId}/members/{_clusterOptions.ClusterId}");
_jsonSerializerSettings = JsonSettings.JsonSerializerSettings;
}

Expand All @@ -40,11 +40,12 @@ public async Task DeleteMembershipTableEntries(string clusterId)
public async Task InitializeMembershipTable(bool tryInitTableVersion)
{
_muxer = await _redisOptions.CreateMultiplexer(_redisOptions);
_db = _muxer.GetDatabase(_redisOptions.Database);
_db = _muxer.GetDatabase();

if (tryInitTableVersion)
{
await _db.HashSetAsync(_clusterKey, TableVersionKey, SerializeVersion(DefaultTableVersion), When.NotExists);
await _db.KeyExpireAsync(_clusterKey, _redisOptions.EntryExpiry);
ReubenBond marked this conversation as resolved.
Show resolved Hide resolved
}

this.IsInitialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

namespace Orleans.Hosting
{
/// <summary>
/// Extensions for configuring Redis as a grain directory provider.
/// </summary>
public static class RedisGrainDirectoryExtensions
{
/// <summary>
/// Use a Redis data-store as the default Grain Directory
/// Adds a default grain directory which persists entries in Redis.
/// </summary>
public static ISiloBuilder UseRedisGrainDirectoryAsDefault(
this ISiloBuilder builder,
Expand All @@ -21,7 +24,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault(
}

/// <summary>
/// Use a Redis data-store as the default Grain Directory
/// Adds a default grain directory which persists entries in Redis.
/// </summary>
public static ISiloBuilder UseRedisGrainDirectoryAsDefault(
this ISiloBuilder builder,
Expand All @@ -31,7 +34,7 @@ public static ISiloBuilder UseRedisGrainDirectoryAsDefault(
}

/// <summary>
/// Add a Redis data-store as a named Grain Directory
/// Adds a named grain directory which persists entries in Redis.
/// </summary>
public static ISiloBuilder AddRedisGrainDirectory(
this ISiloBuilder builder,
Expand All @@ -42,7 +45,7 @@ public static ISiloBuilder AddRedisGrainDirectory(
}

/// <summary>
/// Add a Redis data-store as a named Grain Directory
/// Adds a named grain directory which persists entries in Redis.
/// </summary>
public static ISiloBuilder AddRedisGrainDirectory(
this ISiloBuilder builder,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using Orleans.GrainDirectory.Redis;
using Orleans.Runtime;
using StackExchange.Redis;
Expand All @@ -11,37 +12,51 @@ namespace Orleans.Configuration
public class RedisGrainDirectoryOptions
{
/// <summary>
/// Configure the Redis client
/// Gets or sets the Redis client configuration.
/// </summary>
[RedactRedisConfigurationOptions]
public ConfigurationOptions ConfigurationOptions { get; set; }

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// </summary>
public Func<RedisGrainDirectoryOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests).
/// Setting a value different from null will cause duplicate activations in the cluster.
/// </summary>
public TimeSpan? EntryExpiry { get; set; } = null;

/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisGrainDirectoryOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions);
ReubenBond marked this conversation as resolved.
Show resolved Hide resolved
}

public class RedactRedisConfigurationOptions : RedactAttribute
internal class RedactRedisConfigurationOptions : RedactAttribute
{
public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value);
}

/// <summary>
/// Configuration validator for <see cref="RedisGrainDirectoryOptions"/>.
/// </summary>
public class RedisGrainDirectoryOptionsValidator : IConfigurationValidator
{
private readonly RedisGrainDirectoryOptions options;
private readonly RedisGrainDirectoryOptions _options;

public RedisGrainDirectoryOptionsValidator(RedisGrainDirectoryOptions options)
{
this.options = options;
_options = options;
}

/// <inheritdoc/>
public void ValidateConfiguration()
{
if (this.options.ConfigurationOptions == null)
if (_options.ConfigurationOptions == null)
{
throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(options.ConfigurationOptions)} is required.");
throw new OrleansConfigurationException($"Invalid {nameof(RedisGrainDirectoryOptions)} values for {nameof(RedisGrainDirectory)}. {nameof(_options.ConfigurationOptions)} is required.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageTags>$(PackageTags) Redis Grain Directory</PackageTags>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<VersionSuffix Condition="$(VersionSuffix) == ''">beta1</VersionSuffix>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Tester.Redis")]
29 changes: 16 additions & 13 deletions src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -16,8 +17,9 @@ public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloL
private readonly RedisGrainDirectoryOptions directoryOptions;
private readonly ClusterOptions clusterOptions;
private readonly ILogger<RedisGrainDirectory> logger;
private readonly RedisKey _keyPrefix;

private ConnectionMultiplexer redis;
private IConnectionMultiplexer redis;
private IDatabase database;
private LuaScript deleteScript;

Expand All @@ -29,6 +31,7 @@ public RedisGrainDirectory(
this.directoryOptions = directoryOptions;
this.logger = logger;
this.clusterOptions = clusterOptions.Value;
_keyPrefix = Encoding.UTF8.GetBytes($"{this.clusterOptions.ClusterId}/directory/");
}

public async Task<GrainAddress> Lookup(GrainId grainId)
Expand Down Expand Up @@ -121,7 +124,7 @@ public void Participate(ISiloLifecycle lifecycle)

public async Task Initialize(CancellationToken ct = default)
{
this.redis = await ConnectionMultiplexer.ConnectAsync(this.directoryOptions.ConfigurationOptions);
this.redis = await directoryOptions.CreateMultiplexer(directoryOptions);

// Configure logging
this.redis.ConnectionRestored += this.LogConnectionRestored;
Expand All @@ -132,16 +135,16 @@ public async Task Initialize(CancellationToken ct = default)
this.database = this.redis.GetDatabase();

this.deleteScript = LuaScript.Prepare(
ReubenBond marked this conversation as resolved.
Show resolved Hide resolved
@"
local cur = redis.call('GET', @key)
if cur ~= false then
local typedCur = cjson.decode(cur)
if typedCur.ActivationId == @val then
return redis.call('DEL', @key)
end
end
return 0
");
"""
local cur = redis.call('GET', @key)
if cur ~= false then
local typedCur = cjson.decode(cur)
if typedCur.ActivationId == @val then
return redis.call('DEL', @key)
end
end
return 0
""");
}

private async Task Uninitialize(CancellationToken arg)
Expand All @@ -155,7 +158,7 @@ private async Task Uninitialize(CancellationToken arg)
}
}

private string GetKey(GrainId grainId) => $"{this.clusterOptions.ClusterId}-{grainId}";
private string GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString());

#region Logging
private void LogConnectionRestored(object sender, ConnectionFailedEventArgs e)
Expand Down
Loading