diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs index 703a4e7d20..c95073aa69 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs @@ -4,28 +4,24 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System; using System.Collections.Generic; using System.Data; - using System.IO; using System.Linq; using System.Net; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; - using global::Azure.Core.Serialization; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Diagnostics; using Microsoft.Azure.Cosmos.FaultInjection; using Microsoft.VisualStudio.TestTools.UnitTesting; + using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.MultiRegionSetupHelpers; + using CosmosSystemTextJsonSerializer = MultiRegionSetupHelpers.CosmosSystemTextJsonSerializer; using Database = Database; using PartitionKey = PartitionKey; [TestClass] public class CosmosAvailabilityStrategyTests { - private const string dbName = "availabilityStrategyTestDb"; - private const string containerName = "availabilityStrategyTestContainer"; - private const string changeFeedContainerName = "availabilityStrategyTestChangeFeedContainer"; - private CosmosClient client; private Database database; private Container container; @@ -69,7 +65,7 @@ public async Task TestInitAsync() { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }; - this.cosmosSystemTextJsonSerializer = new CosmosSystemTextJsonSerializer(jsonSerializerOptions); + this.cosmosSystemTextJsonSerializer = new MultiRegionSetupHelpers.CosmosSystemTextJsonSerializer(jsonSerializerOptions); if (string.IsNullOrEmpty(this.connectionString)) { @@ -99,7 +95,7 @@ public void TestCleanup() { try { - this.container.DeleteItemAsync("deleteMe", new PartitionKey("MMWrite")); + this.container.DeleteItemAsync("deleteMe", new PartitionKey("MMWrite")); } catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { @@ -256,11 +252,11 @@ public async Task AvailabilityStrategyNoTriggerTest(bool isPreferredLocationsEmp connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); responseDelay.Enable(); - ItemResponse ir = await container.ReadItemAsync("testId", new PartitionKey("pk")); + ItemResponse ir = await container.ReadItemAsync("testId", new PartitionKey("pk")); CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); @@ -326,8 +322,8 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); responseDelay.Enable(); @@ -337,7 +333,7 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred threshold: TimeSpan.FromMilliseconds(100), thresholdStep: TimeSpan.FromMilliseconds(50)) }; - ItemResponse ir = await container.ReadItemAsync( + ItemResponse ir = await container.ReadItemAsync( "testId", new PartitionKey("pk"), requestOptions); @@ -390,8 +386,8 @@ public async Task AvailabilityStrategyDisableOverideTest(bool isPreferredLocatio connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); responseDelay.Enable(); ItemRequestOptions requestOptions = new ItemRequestOptions @@ -399,7 +395,7 @@ public async Task AvailabilityStrategyDisableOverideTest(bool isPreferredLocatio AvailabilityStrategy = new DisabledAvailabilityStrategy() }; - ItemResponse ir = await container.ReadItemAsync( + ItemResponse ir = await container.ReadItemAsync( "testId", new PartitionKey("pk"), requestOptions); @@ -534,8 +530,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); CosmosTraceDiagnostics traceDiagnostic; object hedgeContext; @@ -552,7 +548,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co itemRequestOptions.ExcludeRegions = new List() { "East US" }; } - ItemResponse ir = await container.ReadItemAsync( + ItemResponse ir = await container.ReadItemAsync( "testId", new PartitionKey("pk"), itemRequestOptions); @@ -579,7 +575,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co requestOptions.ExcludeRegions = new List() { "East US" }; } - FeedIterator queryIterator = container.GetItemQueryIterator( + FeedIterator queryIterator = container.GetItemQueryIterator( new QueryDefinition(queryString), requestOptions: requestOptions); @@ -587,7 +583,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co while (queryIterator.HasMoreResults) { - FeedResponse feedResponse = await queryIterator.ReadNextAsync(); + FeedResponse feedResponse = await queryIterator.ReadNextAsync(); Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; @@ -609,7 +605,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co queryRequestOptions.ExcludeRegions = new List() { "East US" }; } - FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( + FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( new QueryDefinition(crossPartitionQueryString), null, queryRequestOptions); @@ -618,7 +614,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co while (crossPartitionQueryIterator.HasMoreResults) { - FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); + FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; @@ -640,7 +636,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co readManyRequestOptions.ExcludeRegions = new List() { "East US" }; } - FeedResponse readManyResponse = await container.ReadManyItemsAsync( + FeedResponse readManyResponse = await container.ReadManyItemsAsync( new List<(string, PartitionKey)>() { ("testId", new PartitionKey("pk")), @@ -660,8 +656,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co break; case "ChangeFeed": - Container leaseContainer = database.GetContainer(CosmosAvailabilityStrategyTests.changeFeedContainerName); - ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( + Container leaseContainer = database.GetContainer(MultiRegionSetupHelpers.changeFeedContainerName); + ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( processorName: "AvialabilityStrategyTest", onChangesDelegate: HandleChangesAsync) .WithInstanceName("test") @@ -670,13 +666,13 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co await changeFeedProcessor.StartAsync(); await Task.Delay(1000); - AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject testObject = new CosmosIntegrationTestObject { Id = "item4", Pk = "pk4", Other = Guid.NewGuid().ToString() }; - await container.UpsertItemAsync(testObject); + await container.UpsertItemAsync(testObject); rule.Enable(); @@ -751,8 +747,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); CosmosTraceDiagnostics traceDiagnostic; object hedgeContext; @@ -763,7 +759,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito rule1.Enable(); rule2.Enable(); - ItemResponse ir = await container.ReadItemAsync( + ItemResponse ir = await container.ReadItemAsync( "testId", new PartitionKey("pk")); @@ -783,7 +779,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito PartitionKey = new PartitionKey("pk"), }; - FeedIterator queryIterator = container.GetItemQueryIterator( + FeedIterator queryIterator = container.GetItemQueryIterator( new QueryDefinition(queryString), requestOptions: requestOptions); @@ -792,7 +788,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito while (queryIterator.HasMoreResults) { - FeedResponse feedResponse = await queryIterator.ReadNextAsync(); + FeedResponse feedResponse = await queryIterator.ReadNextAsync(); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); @@ -805,7 +801,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito case "CrossPartitionQuery": string crossPartitionQueryString = "SELECT * FROM c"; - FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( + FeedIterator crossPartitionQueryIterator = container.GetItemQueryIterator( new QueryDefinition(crossPartitionQueryString)); rule1.Enable(); @@ -813,7 +809,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito while (crossPartitionQueryIterator.HasMoreResults) { - FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); + FeedResponse feedResponse = await crossPartitionQueryIterator.ReadNextAsync(); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); @@ -828,7 +824,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito rule1.Enable(); rule2.Enable(); - FeedResponse readManyResponse = await container.ReadManyItemsAsync( + FeedResponse readManyResponse = await container.ReadManyItemsAsync( new List<(string, PartitionKey)>() { ("testId", new PartitionKey("pk")), @@ -846,8 +842,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito break; case "ChangeFeed": - Container leaseContainer = database.GetContainer(CosmosAvailabilityStrategyTests.changeFeedContainerName); - ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( + Container leaseContainer = database.GetContainer(MultiRegionSetupHelpers.changeFeedContainerName); + ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder( processorName: "AvialabilityStrategyTest", onChangesDelegate: HandleChangesStepAsync) .WithInstanceName("test") @@ -856,13 +852,13 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito await changeFeedProcessor.StartAsync(); await Task.Delay(1000); - AvailabilityStrategyTestObject testObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject testObject = new CosmosIntegrationTestObject { Id = "item4", Pk = "pk4", Other = Guid.NewGuid().ToString() }; - await container.UpsertItemAsync(testObject); + await container.UpsertItemAsync(testObject); rule1.Enable(); rule2.Enable(); @@ -920,8 +916,8 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeTest() connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); sendDelay.Enable(); @@ -933,15 +929,15 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeTest() enableMultiWriteRegionHedge: true) }; - AvailabilityStrategyTestObject availabilityStrategyTestObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject { Id = "deleteMe", Pk = "MMWrite", Other = "test" }; - ItemResponse ir = await container.CreateItemAsync( - availabilityStrategyTestObject, + ItemResponse ir = await container.CreateItemAsync( + CosmosIntegrationTestObject, requestOptions: requestOptions); sendDelay.Disable(); @@ -988,8 +984,8 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest() connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); responseDelay.Enable(); @@ -1001,7 +997,7 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest() enableMultiWriteRegionHedge: true) }; - AvailabilityStrategyTestObject availabilityStrategyTestObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject { Id = "deleteMe", Pk = "MMWrite", @@ -1010,8 +1006,8 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest() try { - ItemResponse ir = await container.CreateItemAsync( - availabilityStrategyTestObject, + ItemResponse ir = await container.CreateItemAsync( + CosmosIntegrationTestObject, requestOptions: requestOptions); } catch (CosmosException ex) @@ -1080,8 +1076,8 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeStepTest() connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); @@ -1093,7 +1089,7 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeStepTest() enableMultiWriteRegionHedge: true) }; - AvailabilityStrategyTestObject availabilityStrategyTestObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject { Id = "deleteMe", Pk = "MMWrite", @@ -1102,9 +1098,9 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeStepTest() try { - await this.container.DeleteItemAsync( - availabilityStrategyTestObject.Id, - new PartitionKey(availabilityStrategyTestObject.Pk)); + await this.container.DeleteItemAsync( + CosmosIntegrationTestObject.Id, + new PartitionKey(CosmosIntegrationTestObject.Pk)); } catch (Exception) { @@ -1114,8 +1110,8 @@ await this.container.DeleteItemAsync( sendDelay.Enable(); sendDelay2.Enable(); - ItemResponse ir = await container.CreateItemAsync( - availabilityStrategyTestObject, + ItemResponse ir = await container.CreateItemAsync( + CosmosIntegrationTestObject, requestOptions: requestOptions); sendDelay.Disable(); @@ -1178,8 +1174,8 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterStepTest() connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); ItemRequestOptions requestOptions = new ItemRequestOptions { @@ -1189,7 +1185,7 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterStepTest() enableMultiWriteRegionHedge: true) }; - AvailabilityStrategyTestObject availabilityStrategyTestObject = new AvailabilityStrategyTestObject + CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject { Id = "deleteMe", Pk = "MMWrite", @@ -1198,9 +1194,9 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterStepTest() try { - await this.container.DeleteItemAsync( - availabilityStrategyTestObject.Id, - new PartitionKey(availabilityStrategyTestObject.Pk)); + await this.container.DeleteItemAsync( + CosmosIntegrationTestObject.Id, + new PartitionKey(CosmosIntegrationTestObject.Pk)); } catch (Exception) { @@ -1212,8 +1208,8 @@ await this.container.DeleteItemAsync( try { - ItemResponse ir = await container.CreateItemAsync( - availabilityStrategyTestObject, + ItemResponse ir = await container.CreateItemAsync( + CosmosIntegrationTestObject, requestOptions: requestOptions); } catch (CosmosException ex) @@ -1275,11 +1271,11 @@ public async Task AvailabilityStrategyWithCancellationTokenThrowsExceptionTest() CancellationTokenSource cts = new CancellationTokenSource(); cts.Cancel(); - Database database = faultInjectionClient.GetDatabase(CosmosAvailabilityStrategyTests.dbName); - Container container = database.GetContainer(CosmosAvailabilityStrategyTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); CosmosOperationCanceledException cancelledException = await Assert.ThrowsExceptionAsync(() => - container.ReadItemAsync( + container.ReadItemAsync( "testId", new PartitionKey("pk"), cancellationToken: cts.Token )); @@ -1290,7 +1286,7 @@ public async Task AvailabilityStrategyWithCancellationTokenThrowsExceptionTest() private static async Task HandleChangesAsync( ChangeFeedProcessorContext context, - IReadOnlyCollection changes, + IReadOnlyCollection changes, CancellationToken cancellationToken) { if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1)) @@ -1308,7 +1304,7 @@ private static async Task HandleChangesAsync( private static async Task HandleChangesStepAsync( ChangeFeedProcessorContext context, - IReadOnlyCollection changes, + IReadOnlyCollection changes, CancellationToken cancellationToken) { if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1)) @@ -1324,55 +1320,5 @@ private static async Task HandleChangesStepAsync( Assert.AreNotEqual(region2, (string)hedgeContext); await Task.Delay(1); } - - internal class AvailabilityStrategyTestObject - { - - [JsonPropertyName("id")] - public string Id { get; set; } - - [JsonPropertyName("pk")] - public string Pk { get; set; } - - [JsonPropertyName("other")] - public string Other { get; set; } - } - - private class CosmosSystemTextJsonSerializer : CosmosSerializer - { - private readonly JsonObjectSerializer systemTextJsonSerializer; - - public CosmosSystemTextJsonSerializer(JsonSerializerOptions jsonSerializerOptions) - { - this.systemTextJsonSerializer = new JsonObjectSerializer(jsonSerializerOptions); - } - - public override T FromStream(Stream stream) - { - using (stream) - { - if (stream.CanSeek - && stream.Length == 0) - { - return default; - } - - if (typeof(Stream).IsAssignableFrom(typeof(T))) - { - return (T)(object)stream; - } - - return (T)this.systemTextJsonSerializer.Deserialize(stream, typeof(T), default); - } - } - - public override Stream ToStream(T input) - { - MemoryStream streamPayload = new MemoryStream(); - this.systemTextJsonSerializer.Serialize(streamPayload, input, input.GetType(), default); - streamPayload.Position = 0; - return streamPayload; - } - } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs new file mode 100644 index 0000000000..e51b6a3fff --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemIntegrationTests.cs @@ -0,0 +1,149 @@ +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Net; + using System.Text.Json; + using System.Text.Json.Serialization; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.FaultInjection; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.MultiRegionSetupHelpers; + + [TestClass] + public class CosmosItemIntegrationTests + { + private string connectionString; + private CosmosClient client; + private Database database; + private Container container; + private Container changeFeedContainer; + + private static string region1; + private static string region2; + private static string region3; + private CosmosSystemTextJsonSerializer cosmosSystemTextJsonSerializer; + + [TestInitialize] + public async Task TestInitAsync() + { + this.connectionString = ConfigurationManager.GetEnvironmentVariable("COSMOSDB_MULTI_REGION", null); + + JsonSerializerOptions jsonSerializerOptions = new JsonSerializerOptions() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + this.cosmosSystemTextJsonSerializer = new CosmosSystemTextJsonSerializer(jsonSerializerOptions); + + if (string.IsNullOrEmpty(this.connectionString)) + { + Assert.Fail("Set environment variable COSMOSDB_MULTI_REGION to run the tests"); + } + this.client = new CosmosClient( + this.connectionString, + new CosmosClientOptions() + { + Serializer = this.cosmosSystemTextJsonSerializer, + }); + + (this.database, this.container, this.changeFeedContainer) = await MultiRegionSetupHelpers.GetOrCreateMultiRegionDatabaseAndContainers(this.client); + + IDictionary readRegions = this.client.DocumentClient.GlobalEndpointManager.GetAvailableReadEndpointsByLocation(); + Assert.IsTrue(readRegions.Count() >= 3); + + region1 = readRegions.Keys.ElementAt(0); + region2 = readRegions.Keys.ElementAt(1); + region3 = readRegions.Keys.ElementAt(2); + } + + [TestCleanup] + public void TestCleanup() + { + try + { + this.container.DeleteItemAsync("deleteMe", new PartitionKey("MMWrite")); + } + catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound) + { + // Ignore + } + finally + { + //Do not delete the resources (except MM Write test object), georeplication is slow and we want to reuse the resources + this.client?.Dispose(); + } + } + + [TestMethod] + [TestCategory("MultiRegion")] + [Timeout(70000)] + public async Task ReadMany2UnreachablePartitionsTest() + { + List feedRanges = (List)await this.container.GetFeedRangesAsync(); + Assert.IsTrue(feedRanges.Count > 0); + + FaultInjectionCondition condition = new FaultInjectionConditionBuilder() + .WithConnectionType(FaultInjectionConnectionType.Direct) + .WithOperationType(FaultInjectionOperationType.QueryItem) + .WithEndpoint(new FaultInjectionEndpointBuilder( + MultiRegionSetupHelpers.dbName, + MultiRegionSetupHelpers.containerName, + feedRanges[0]) + .WithReplicaCount(2) + .WithIncludePrimary(false) + .Build()) + .Build(); + + FaultInjectionServerErrorResult result = new FaultInjectionServerErrorResultBuilder(FaultInjectionServerErrorType.Gone) + .WithTimes(int.MaxValue - 1) + .Build(); + + FaultInjectionRule rule = new FaultInjectionRuleBuilder("connectionDelay", condition, result) + .WithDuration(TimeSpan.FromDays(1)) + .Build(); + + FaultInjector injector = new FaultInjector(new List { rule }); + + rule.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ConsistencyLevel = ConsistencyLevel.Strong, + //Serializer = this.cosmosSystemTextJsonSerializer, + FaultInjector = injector, + }; + + CosmosClient fiClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: clientOptions); + + Database fidb = fiClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container fic = fidb.GetContainer(MultiRegionSetupHelpers.containerName); + + IReadOnlyList<(string, PartitionKey)> items = new List<(string, PartitionKey)>() + { + ("testId", new PartitionKey("pk")), + ("testId2", new PartitionKey("pk2")), + ("testId3", new PartitionKey("pk3")), + ("testId4", new PartitionKey("pk4")), + }; + + try + { + rule.Enable(); + FeedResponse feedResponse = await fic.ReadManyItemsAsync(items); + } + catch (Exception ex) + { + Assert.Fail(ex.ToString()); + } + finally + { + rule.Disable(); + fiClient.Dispose(); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosMultiRegionDiagnosticsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosMultiRegionDiagnosticsTests.cs index 848af2adee..8f65c3321c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosMultiRegionDiagnosticsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosMultiRegionDiagnosticsTests.cs @@ -2,21 +2,17 @@ { using System; using System.Collections.Generic; - using System.Net; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Diagnostics; using Microsoft.Azure.Cosmos.FaultInjection; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.VisualStudio.TestTools.UnitTesting; - using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.CosmosAvailabilityStrategyTests; + using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.MultiRegionSetupHelpers; [TestClass] public class CosmosMultiRegionDiagnosticsTests { - private const string dbName = "availabilityStrategyTestDb"; - private const string containerName = "availabilityStrategyTestContainer"; - CosmosClient client; Database database; Container container; @@ -30,7 +26,7 @@ public async Task TestInitialize() this.client = new CosmosClient(this.connectionString); DatabaseResponse db = await this.client.CreateDatabaseIfNotExistsAsync( - id: CosmosMultiRegionDiagnosticsTests.dbName, + id: MultiRegionSetupHelpers.dbName, throughput: 400); this.database = db.Database; @@ -49,8 +45,8 @@ public void TestCleanup() [TestCategory("MultiRegion")] public async Task ExlcudeRegionDiagnosticsTest() { - this.container = this.database.GetContainer(CosmosMultiRegionDiagnosticsTests.containerName); - ItemResponse itemResponse = await this.container.ReadItemAsync( + this.container = this.database.GetContainer(MultiRegionSetupHelpers.containerName); + ItemResponse itemResponse = await this.container.ReadItemAsync( "testId", new Cosmos.PartitionKey("pk"), new ItemRequestOptions() { @@ -69,9 +65,9 @@ public async Task ExlcudeRegionDiagnosticsTest() [TestCategory("MultiRegion")] public async Task ExcludeRegionWithReadManyDiagnosticsTest() { - this.container = this.database.GetContainer(CosmosMultiRegionDiagnosticsTests.containerName); + this.container = this.database.GetContainer(MultiRegionSetupHelpers.containerName); - FeedResponse feedResonse = await this.container.ReadManyItemsAsync( + FeedResponse feedResonse = await this.container.ReadManyItemsAsync( new List<(string, PartitionKey)>() { ("testId", new PartitionKey("pk")), @@ -125,8 +121,8 @@ public async Task HedgeNestingDiagnosticsTest() connectionString: this.connectionString, clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) { - Database database = faultInjectionClient.GetDatabase(CosmosMultiRegionDiagnosticsTests.dbName); - Container container = database.GetContainer(CosmosMultiRegionDiagnosticsTests.containerName); + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); responseDelay.Enable(); @@ -138,7 +134,7 @@ public async Task HedgeNestingDiagnosticsTest() }; //Request should be hedged to North Central US - ItemResponse itemResponse = await container.ReadItemAsync( + ItemResponse itemResponse = await container.ReadItemAsync( "testId", new PartitionKey("pk"), requestOptions); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/MultiRegionSetupHelpers.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/MultiRegionSetupHelpers.cs index 971611f5ce..ad8cf2863e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/MultiRegionSetupHelpers.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/MultiRegionSetupHelpers.cs @@ -5,15 +5,18 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests { using System.Collections.Generic; + using System.IO; using System.Net; + using System.Text.Json; + using System.Text.Json.Serialization; using System.Threading.Tasks; - using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.CosmosAvailabilityStrategyTests; + using global::Azure.Core.Serialization; public class MultiRegionSetupHelpers { - private const string dbName = "availabilityStrategyTestDb"; - private const string containerName = "availabilityStrategyTestContainer"; - private const string changeFeedContainerName = "availabilityStrategyTestChangeFeedContainer"; + public const string dbName = "integrationTestDb"; + public const string containerName = "integrationTestContainer"; + public const string changeFeedContainerName = "integrationTestChangeFeedContainer"; public static async Task<(Database, Container, Container)> GetOrCreateMultiRegionDatabaseAndContainers(CosmosClient client) { @@ -39,14 +42,14 @@ public class MultiRegionSetupHelpers List tasks = new List() { - container.CreateItemAsync( - new AvailabilityStrategyTestObject { Id = "testId", Pk = "pk" }), - container.CreateItemAsync( - new AvailabilityStrategyTestObject { Id = "testId2", Pk = "pk2" }), - container.CreateItemAsync( - new AvailabilityStrategyTestObject { Id = "testId3", Pk = "pk3" }), - container.CreateItemAsync( - new AvailabilityStrategyTestObject { Id = "testId4", Pk = "pk4" }) + container.CreateItemAsync( + new CosmosIntegrationTestObject { Id = "testId", Pk = "pk" }), + container.CreateItemAsync( + new CosmosIntegrationTestObject { Id = "testId2", Pk = "pk2" }), + container.CreateItemAsync( + new CosmosIntegrationTestObject { Id = "testId3", Pk = "pk3" }), + container.CreateItemAsync( + new CosmosIntegrationTestObject { Id = "testId4", Pk = "pk4" }) }; await Task.WhenAll(tasks); @@ -62,5 +65,55 @@ public class MultiRegionSetupHelpers return (database, container, changeFeedContainer); } + + internal class CosmosIntegrationTestObject + { + + [JsonPropertyName("id")] + public string Id { get; set; } + + [JsonPropertyName("pk")] + public string Pk { get; set; } + + [JsonPropertyName("other")] + public string Other { get; set; } + } + + internal class CosmosSystemTextJsonSerializer : CosmosSerializer + { + private readonly JsonObjectSerializer systemTextJsonSerializer; + + public CosmosSystemTextJsonSerializer(JsonSerializerOptions jsonSerializerOptions) + { + this.systemTextJsonSerializer = new JsonObjectSerializer(jsonSerializerOptions); + } + + public override T FromStream(Stream stream) + { + using (stream) + { + if (stream.CanSeek + && stream.Length == 0) + { + return default; + } + + if (typeof(Stream).IsAssignableFrom(typeof(T))) + { + return (T)(object)stream; + } + + return (T)this.systemTextJsonSerializer.Deserialize(stream, typeof(T), default); + } + } + + public override Stream ToStream(T input) + { + MemoryStream streamPayload = new MemoryStream(); + this.systemTextJsonSerializer.Serialize(streamPayload, input, input.GetType(), default); + streamPayload.Position = 0; + return streamPayload; + } + } } }