Skip to content

Commit

Permalink
Add FdbConnectionOptions and merge OpenAsync with OpenNamedPartitionA…
Browse files Browse the repository at this point in the history
…sync

- Add FdbConnectionOptions class that contains all main options and knobs for the client (clusterfile, timeouts, etc...)
- Fdb.OpenAsync(FdbConnectionOptions) can open any database, including named partitions
- Fdb.Directory.OpenNamedPartitionAsync are deprecated since OpenAsync can do the job
- Allows settings default timeout and retry limits before accessing the Directory Layer (cf Issue #61)
  • Loading branch information
KrzysFR committed May 11, 2018
1 parent 192ef09 commit b3a18d3
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 68 deletions.
29 changes: 18 additions & 11 deletions FdbShell/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,13 @@ private static async Task MainAsync(string[] args, CancellationToken cancel)
Db = null;
try
{
Db = await ChangeDatabase(clusterFile, dbName, partition, cancel);
var cnxOptions = new FdbConnectionOptions
{
ClusterFile = clusterFile,
DbName = dbName,
PartitionPath = partition
};
Db = await ChangeDatabase(cnxOptions, cancel);
Db.DefaultTimeout = Math.Max(0, timeout) * 1000;
Db.DefaultRetryLimit = Math.Max(0, maxRetries);

Expand Down Expand Up @@ -542,7 +548,13 @@ private static async Task MainAsync(string[] args, CancellationToken cancel)
IFdbDatabase newDb = null;
try
{
newDb = await ChangeDatabase(clusterFile, dbName, newPartition, cancel);
var options = new FdbConnectionOptions
{
ClusterFile = clusterFile,
DbName = dbName,
PartitionPath = newPartition
};
newDb = await ChangeDatabase(options, cancel);
}
catch (Exception)
{
Expand Down Expand Up @@ -720,16 +732,11 @@ private static async Task CoordinatorsCommand(IFdbDatabase db, TextWriter log, C
}
}

private static Task<IFdbDatabase> ChangeDatabase(string clusterFile, string dbName, string[] partition, CancellationToken ct)
private static Task<IFdbDatabase> ChangeDatabase(FdbConnectionOptions options, CancellationToken ct)
{
if (partition == null || partition.Length == 0)
{
return Fdb.OpenAsync(clusterFile, dbName, ct);
}
else
{
return Fdb.Directory.OpenNamedPartitionAsync(clusterFile, dbName, partition, false, ct);
}
options.DefaultTimeout = TimeSpan.FromSeconds(30);
options.DefaultRetryLimit = 50;
return Fdb.OpenAsync(options, ct);
}

}
Expand Down
99 changes: 89 additions & 10 deletions FoundationDB.Client/Fdb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ namespace FoundationDB.Client
{
using System;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Doxense.Diagnostics.Contracts;
using SystemIO = System.IO;
using FoundationDB.Client.Native;
using JetBrains.Annotations;
Expand Down Expand Up @@ -532,7 +534,7 @@ private static async Task<FdbCluster> CreateClusterInternalAsync([CanBeNull] str
[ItemNotNull]
public static Task<IFdbDatabase> OpenAsync(CancellationToken ct = default)
{
return OpenAsync(clusterFile: null, dbName: null, globalSpace: KeySubspace.Empty, ct: ct);
return OpenInternalAsync(new FdbConnectionOptions(), ct);
}

/// <summary>Create a new connection with the "DB" database on the cluster specified by the default cluster file, and with the specified global subspace</summary>
Expand All @@ -544,7 +546,11 @@ public static Task<IFdbDatabase> OpenAsync(CancellationToken ct = default)
[ItemNotNull]
public static Task<IFdbDatabase> OpenAsync([CanBeNull] IKeySubspace globalSpace, CancellationToken ct = default)
{
return OpenAsync(clusterFile: null, dbName: null, globalSpace: globalSpace, ct: ct);
var options = new FdbConnectionOptions
{
GlobalSpace = globalSpace,
};
return OpenInternalAsync(options, ct);
}

/// <summary>Create a new connection with a database on the specified cluster</summary>
Expand All @@ -559,7 +565,12 @@ public static Task<IFdbDatabase> OpenAsync([CanBeNull] IKeySubspace globalSpace,
[ItemNotNull]
public static Task<IFdbDatabase> OpenAsync([CanBeNull] string clusterFile, [CanBeNull] string dbName, CancellationToken ct = default)
{
return OpenAsync(clusterFile, dbName, KeySubspace.Empty, readOnly: false, ct: ct);
var options = new FdbConnectionOptions
{
ClusterFile = clusterFile,
DbName = dbName,
};
return OpenInternalAsync(options, ct);
}

/// <summary>Create a new connection with a database on the specified cluster</summary>
Expand All @@ -569,26 +580,50 @@ public static Task<IFdbDatabase> OpenAsync([CanBeNull] string clusterFile, [CanB
/// <param name="readOnly">If true, the database instance will only allow read operations</param>
/// <param name="ct">Token used to abort the operation</param>
/// <returns>Task that will return an FdbDatabase, or an exception</returns>
/// <remarks>As of 1.0, the only supported database name is 'DB'</remarks>
/// <exception cref="InvalidOperationException">If <paramref name="dbName"/> is anything other than 'DB'</exception>
/// <exception cref="OperationCanceledException">If the token <paramref name="ct"/> is cancelled</exception>
/// <remarks>Since connections are not pooled, so this method can be costly and should NOT be called every time you need to read or write from the database. Instead, you should open a database instance at the start of your process, and use it a singleton.</remarks>
[ItemNotNull]
public static Task<IFdbDatabase> OpenAsync([CanBeNull] string clusterFile, [CanBeNull] string dbName, [CanBeNull] IKeySubspace globalSpace, bool readOnly = false, CancellationToken ct = default)
{
return OpenInternalAsync(clusterFile, dbName, globalSpace, readOnly, ct);
var options = new FdbConnectionOptions
{
ClusterFile = clusterFile,
DbName = dbName,
GlobalSpace = globalSpace,
ReadOnly = readOnly
};
return OpenInternalAsync(options, ct);
}

/// <summary>Create a new connection with a database using the specified options</summary>
/// <param name="options">Connection options used to specify the cluster file, partition path, default timeouts, etc...</param>
/// <param name="ct">Token used to abort the operation</param>
/// <returns>Task that will return an FdbDatabase, or an exception</returns>
/// <exception cref="InvalidOperationException">If <see name="FdbConnectionOptions.DbName"/> is anything other than 'DB'</exception>
/// <exception cref="OperationCanceledException">If the token <paramref name="ct"/> is cancelled</exception>
[ItemNotNull]
public static Task<IFdbDatabase> OpenAsync([NotNull] FdbConnectionOptions options, CancellationToken ct)
{
Contract.NotNull(options, nameof(options));
return OpenInternalAsync(options, ct);
}

/// <summary>Create a new database handler instance using the specificied cluster file, database name, global subspace and read only settings</summary>
[ItemNotNull]
internal static async Task<IFdbDatabase> OpenInternalAsync([CanBeNull] string clusterFile, [CanBeNull] string dbName, [CanBeNull] IKeySubspace globalSpace, bool readOnly, CancellationToken ct)
internal static async Task<IFdbDatabase> OpenInternalAsync(FdbConnectionOptions options, CancellationToken ct)
{
Contract.Requires(options != null);
ct.ThrowIfCancellationRequested();

dbName = dbName ?? "DB";
globalSpace = globalSpace ?? KeySubspace.Empty;
string clusterFile = options.ClusterFile;
string dbName = options.DbName ?? FdbConnectionOptions.DefaultDbName; // new FdbConnectionOptions { GlobalSpace =
bool readOnly = options.ReadOnly;
IKeySubspace globalSpace = options.GlobalSpace ?? KeySubspace.Empty;
string[] partitionPath = options.PartitionPath?.ToArray();
bool hasPartition = partitionPath != null && partitionPath.Length > 0;

if (Logging.On) Logging.Info(typeof(Fdb), "OpenAsync", $"Connecting to database '{dbName}' using cluster file '{clusterFile}' and subspace '{globalSpace}' ...");
if (Logging.On) Logging.Info(typeof(Fdb), nameof(OpenInternalAsync), $"Connecting to database '{dbName}' using cluster file '{clusterFile}' and subspace '{globalSpace}' ...");

FdbCluster cluster = null;
FdbDatabase db = null;
Expand All @@ -597,7 +632,18 @@ internal static async Task<IFdbDatabase> OpenInternalAsync([CanBeNull] string cl
{
cluster = await CreateClusterInternalAsync(clusterFile, ct).ConfigureAwait(false);
//note: since the cluster is not provided by the caller, link it with the database's Dispose()
db = await cluster.OpenDatabaseInternalAsync(dbName, globalSpace, readOnly: readOnly, ownsCluster: true, ct: ct).ConfigureAwait(false);
db = await cluster.OpenDatabaseInternalAsync(dbName, globalSpace, readOnly: !hasPartition && readOnly, ownsCluster: true, ct: ct).ConfigureAwait(false);

// set the default options
if (options.DefaultTimeout != TimeSpan.Zero) db.DefaultTimeout = checked((int) Math.Ceiling(options.DefaultTimeout.TotalMilliseconds));
if (options.DefaultRetryLimit != 0) db.DefaultRetryLimit = options.DefaultRetryLimit;
if (options.DefaultMaxRetryDelay != 0) db.DefaultMaxRetryDelay = options.DefaultMaxRetryDelay;

if (hasPartition)
{ // open the partition, and switch the root of the db
await Fdb.Directory.SwitchToNamedPartitionAsync(db, partitionPath, readOnly, ct);
}

success = true;
return db;
}
Expand Down Expand Up @@ -791,4 +837,37 @@ public static void Stop()

}

public sealed class FdbConnectionOptions
{

public const string DefaultDbName = "DB";

/// <summary>Full path to a specific 'fdb.cluster' file</summary>
public string ClusterFile { get; set; }

/// <summary>Default database name</summary>
/// <remarks>Only "DB" is supported for now</remarks>
public string DbName { get; set; } = DefaultDbName;

/// <summary>If true, opens a read-only view of the database</summary>
/// <remarks>If set to true, only read-only transactions will be allowed on the database instance</remarks>
public bool ReadOnly { get; set; }

/// <summary>Default timeout for all transactions, in milliseconds precision (or infinite if 0)</summary>
public TimeSpan DefaultTimeout { get; set; } // sec

/// <summary>Default maximum number of retries for all transactions (or infinite if 0)</summary>
public int DefaultRetryLimit { get; set; }

public int DefaultMaxRetryDelay { get; set; }

/// <summary>Global subspace in use by the database (empty prefix by default)</summary>
/// <remarks>If <see cref="PartitionPath"/> is also set, this subspace will be used to locate the top-level Directory Layer, and the actual GlobalSpace of the database will be the partition</remarks>
public IKeySubspace GlobalSpace { get; set; }

/// <summary>If specified, open the named partition at the specified path</summary>
/// <remarks>If <see cref="GlobalSpace"/> is also set, it will be used to locate the top-level Directory Layer.</remarks>
public string[] PartitionPath { get; set; }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace FoundationDB.Client
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Doxense.Diagnostics.Contracts;
using Doxense.Linq;
using FoundationDB.Layers.Directories;
using JetBrains.Annotations;
Expand All @@ -48,6 +49,7 @@ public static class Directory
/// <param name="path">Path of the named partition to open</param>
/// <param name="ct">Token used to cancel this operation</param>
/// <returns>Returns a new database instance that will only be able to read and write inside the specified partition. If the partition does not exist, it will be automatically created</returns>
[Obsolete("Use " + nameof(Fdb.OpenAsync) + "(" + nameof(FdbConnectionOptions) + ", ...) instead")]
[ItemNotNull]
public static Task<IFdbDatabase> OpenNamedPartitionAsync([NotNull] IEnumerable<string> path, CancellationToken ct)
{
Expand All @@ -61,43 +63,44 @@ public static Task<IFdbDatabase> OpenNamedPartitionAsync([NotNull] IEnumerable<s
/// <param name="readOnly">If true, the database instance will only allow read operations</param>
/// <param name="ct">Token used to cancel this operation</param>
/// <returns>Returns a new database instance that will only be able to read and write inside the specified partition. If the partition does not exist, it will be automatically created</returns>
[Obsolete("Use " + nameof(Fdb.OpenAsync) + "(" + nameof(FdbConnectionOptions) + ", ...) instead")]
[ItemNotNull]
public static async Task<IFdbDatabase> OpenNamedPartitionAsync(string clusterFile, string dbName, [NotNull] IEnumerable<string> path, bool readOnly, CancellationToken ct)
{
if (path == null) throw new ArgumentNullException(nameof(path));
var partitionPath = path.ToList();
if (partitionPath.Count == 0) throw new ArgumentException("The path to the named partition cannot be empty", nameof(path));
Contract.NotNull(path, nameof(path));
var partitionPath = (path as string[]) ?? path.ToArray();
if (partitionPath.Length == 0) throw new ArgumentException("The path to the named partition cannot be empty", nameof(path));

// looks at the global partition table for the specified named partition

// By convention, all named databases will be under the "/Databases" folder
FdbDatabase db = null;
var rootSpace = KeySubspace.Empty;
try
var options = new FdbConnectionOptions
{
db = (FdbDatabase) await OpenInternalAsync(clusterFile, dbName, rootSpace, readOnly: false, ct: ct).ConfigureAwait(false);
var rootLayer = FdbDirectoryLayer.Create(rootSpace);
if (Logging.On) Logging.Verbose(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Opened root layer of database {db.Name} using cluster file '{db.Cluster.Path}'");
ClusterFile = clusterFile,
DbName = dbName,
PartitionPath = partitionPath,
};
var db = await Fdb.OpenInternalAsync(options, ct).ConfigureAwait(false);
return db;
}

/// <summary>Opens a named partition, and change the root subspace of the database to the corresponding prefix</summary>
internal static async Task SwitchToNamedPartitionAsync([NotNull] FdbDatabase db, [NotNull, ItemNotNull] string[] path, bool readOnly, CancellationToken ct)
{
Contract.Requires(db != null && path != null);
ct.ThrowIfCancellationRequested();

// look up in the root layer for the named partition
var descriptor = await rootLayer.CreateOrOpenAsync(db, partitionPath, layer: FdbDirectoryPartition.LayerId, ct: ct).ConfigureAwait(false);
if (Logging.On) Logging.Verbose(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Found named partition '{descriptor.FullName}' at prefix {descriptor}");
if (path.Length == 0) throw new ArgumentException("The path to the named partition cannot be empty", nameof(path));

// we have to chroot the database to the new prefix, and create a new DirectoryLayer with a new '/'
rootSpace = descriptor.Copy(); //note: create a copy of the key
//TODO: find a nicer way to do that!
db.ChangeRoot(rootSpace, FdbDirectoryLayer.Create(rootSpace, partitionPath), readOnly);
if (Logging.On) Logging.Verbose(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Opened root layer of database {db.Name} using cluster file '{db.Cluster.Path}'");

if (Logging.On) Logging.Info(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Opened partition {descriptor.FullName} at {db.GlobalSpace}, using directory layer at {db.Directory.DirectoryLayer.NodeSubspace}");
// look up in the root layer for the named partition
var descriptor = await db.Directory.CreateOrOpenAsync(path, layer: FdbDirectoryPartition.LayerId, ct: ct).ConfigureAwait(false);
if (Logging.On) Logging.Verbose(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Found named partition '{descriptor.FullName}' at prefix {descriptor}");

return db;
}
catch(Exception e)
{
db?.Dispose();
if (Logging.On) Logging.Exception(typeof(Fdb.Directory), "OpenNamedPartitionAsync", e);
throw;
}
// we have to chroot the database to the new prefix, and create a new DirectoryLayer with a new '/'
var rootSpace = descriptor.Copy(); //note: create a copy of the key
//TODO: find a nicer way to do that!
db.ChangeRoot(rootSpace, FdbDirectoryLayer.Create(rootSpace, path), readOnly);

if (Logging.On) Logging.Info(typeof(Fdb.Directory), "OpenNamedPartitionAsync", $"Opened partition {descriptor.FullName} at {db.GlobalSpace}, using directory layer at {db.Directory.DirectoryLayer.NodeSubspace}");
}

/// <summary>List and open the sub-directories of the given directory</summary>
Expand All @@ -120,7 +123,7 @@ public static async Task<Dictionary<string, FdbDirectorySubspace>> BrowseAsync([
var folders = await names
.ToAsyncEnumerable()
.SelectAsync((name, _) => parent.OpenAsync(tr, name))
.ToListAsync();
.ToListAsync(ct);

// map the result
return folders.ToDictionary(ds => ds.Name);
Expand Down
14 changes: 7 additions & 7 deletions FoundationDB.Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ static void Main(string[] args)
Fdb.Start(Fdb.GetDefaultApiVersion());
try
{
if (partition == null || partition.Length == 0)
var options = new FdbConnectionOptions
{
Db = Fdb.OpenAsync(clusterFile, dbName).GetAwaiter().GetResult();
}
else
{
Db = Fdb.Directory.OpenNamedPartitionAsync(clusterFile, dbName, partition, false, go.Token).GetAwaiter().GetResult();
}
ClusterFile = clusterFile,
DbName = dbName,
PartitionPath = partition,
};
Db = Fdb.OpenAsync(options, go.Token).GetAwaiter().GetResult();

using (Db)
{
Db.DefaultTimeout = 30 * 1000;
Expand Down
11 changes: 5 additions & 6 deletions FoundationDB.Tests/DatabaseFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,16 @@ public async Task Test_Open_Database_With_Invalid_Name_Should_Fail()
{
// As of 1.0, the only accepted database name is "DB".
// Any other name should fail with "InvalidDatabaseName"

// note: Don't forget to update this test if in the future if the API allows for other names !

// manually
using (var cluster = await Fdb.CreateClusterAsync(this.Cancellation))
{
await TestHelpers.AssertThrowsFdbErrorAsync(() => cluster.OpenDatabaseAsync("SomeOtherName", KeySubspace.Empty, false, this.Cancellation), FdbError.InvalidDatabaseName, "Passing anything other then 'DB' should fail");
}

await TestHelpers.AssertThrowsFdbErrorAsync(() => Fdb.OpenAsync(null, "SomeOtherName"), FdbError.InvalidDatabaseName, "Passing anything other then 'DB' should fail");

await TestHelpers.AssertThrowsFdbErrorAsync(() => Fdb.OpenAsync(null, "SomeOtherName", KeySubspace.Empty), FdbError.InvalidDatabaseName, "Passing anything other then 'DB' should fail");
// using Fdb.OpenAsync
await TestHelpers.AssertThrowsFdbErrorAsync(() => Fdb.OpenAsync(new FdbConnectionOptions { DbName = "SomeOtherName" }, this.Cancellation), FdbError.InvalidDatabaseName, "Passing anything other then 'DB' should fail");
}

[Test]
Expand Down Expand Up @@ -282,7 +281,7 @@ public async Task Test_Can_Get_System_Status()
public async Task Test_Can_Open_Database_With_Non_Empty_GlobalSpace()
{
// using a tuple prefix
using (var db = await Fdb.OpenAsync(null, "DB", KeySubspace.FromKey(TuPack.EncodeKey("test")), false, this.Cancellation))
using (var db = await Fdb.OpenAsync(new FdbConnectionOptions { GlobalSpace = KeySubspace.FromKey(TuPack.EncodeKey("test")) }, this.Cancellation))
{
Assert.That(db, Is.Not.Null);
Assert.That(db.GlobalSpace, Is.Not.Null);
Expand All @@ -299,7 +298,7 @@ public async Task Test_Can_Open_Database_With_Non_Empty_GlobalSpace()
}

// using a random binary prefix
using (var db = await Fdb.OpenAsync(null, "DB", new KeySubspace(new byte[] { 42, 255, 0, 90 }.AsSlice()), false, this.Cancellation))
using (var db = await Fdb.OpenAsync(new FdbConnectionOptions { GlobalSpace = KeySubspace.FromKey(new byte[] { 42, 255, 0, 90 }.AsSlice()) }, this.Cancellation))
{
Assert.That(db, Is.Not.Null);
Assert.That(db.GlobalSpace, Is.Not.Null);
Expand Down
Loading

0 comments on commit b3a18d3

Please sign in to comment.