Skip to content

Commit

Permalink
PartiQL (#256)
Browse files Browse the repository at this point in the history
Adds PartiQL support to low-level API.
  • Loading branch information
ChrixApp authored Oct 16, 2024
1 parent a871096 commit 82bb835
Show file tree
Hide file tree
Showing 18 changed files with 759 additions and 10 deletions.
17 changes: 10 additions & 7 deletions src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.DocumentModel;
using EfficientDynamoDb.Exceptions;
using EfficientDynamoDb.Internal;
Expand Down Expand Up @@ -31,6 +26,11 @@
using EfficientDynamoDb.Operations.TransactGetItems;
using EfficientDynamoDb.Operations.TransactWriteItems;
using EfficientDynamoDb.Operations.UpdateItem;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace EfficientDynamoDb
{
Expand All @@ -39,11 +39,14 @@ public class DynamoDbLowLevelContext : IDynamoDbLowLevelContext
internal DynamoDbContextConfig Config { get; }
internal HttpApi Api { get; }
private static readonly ConcurrentDictionary<string, Task<(string Pk, string? Sk)>> KeysCache = new ConcurrentDictionary<string, Task<(string Pk, string? Sk)>>();

private readonly DynamoDbLowLevelPartiQLContext _partiQLContext;
public IDynamoDbLowLevelPartiQLContext PartiQL => _partiQLContext;

internal DynamoDbLowLevelContext(DynamoDbContextConfig config, HttpApi api)
{
Api = api;
Config = config;
_partiQLContext = new DynamoDbLowLevelPartiQLContext(config, api);
}

public async Task<GetItemResponse> GetItemAsync(GetItemRequest request, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -145,7 +148,7 @@ public async Task<TransactWriteItemsResponse> TransactWriteItemsAsync(TransactWr

return TransactWriteItemsResponseParser.Parse(result);
}

public T ToObject<T>(Document document) where T : class => document.ToObject<T>(Config.Metadata);

public Document ToDocument<T>(T entity) where T : class => entity.ToDocument(Config.Metadata);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using EfficientDynamoDb.Internal;
using EfficientDynamoDb.Internal.Operations.BatchExecuteStatement;
using EfficientDynamoDb.Internal.Operations.ExecuteStatement;
using EfficientDynamoDb.Internal.Operations.ExecuteTransaction;
using EfficientDynamoDb.Internal.Operations.Query;
using EfficientDynamoDb.Internal.Operations.TransactGetItems;
using EfficientDynamoDb.Operations.BatchExecuteStatement;
using EfficientDynamoDb.Operations.ExecuteStatement;
using EfficientDynamoDb.Operations.ExecuteTransaction;
using System.Threading;
using System.Threading.Tasks;

namespace EfficientDynamoDb
{
internal class DynamoDbLowLevelPartiQLContext : IDynamoDbLowLevelPartiQLContext
{
internal DynamoDbContextConfig Config { get; }
internal HttpApi Api { get; }

internal DynamoDbLowLevelPartiQLContext(DynamoDbContextConfig config, HttpApi api)
{
Api = api;
Config = config;
}

public async Task<ExecuteStatementResponse> ExecuteStatementAsync(ExecuteStatementRequest request, CancellationToken cancellationToken = default)
{
var httpContent = new ExecuteStatementRequestHttpContent(request);
using var response = await Api.SendAsync(Config, httpContent, cancellationToken).ConfigureAwait(false);
var result = await DynamoDbLowLevelContext.ReadDocumentAsync(response, QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
return ExecuteStatementResponseParser.Parse(result!);
}

public async Task<BatchExecuteStatementResponse> BatchExecuteStatementAsync(BatchExecuteStatementRequest request, CancellationToken cancellationToken = default)
{
var httpContent = new BatchExecuteStatementRequestHttpContent(request);
using var response = await Api.SendAsync(Config, httpContent, cancellationToken).ConfigureAwait(false);
var result = await DynamoDbLowLevelContext.ReadDocumentAsync(response, TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
return BatchExecuteStatementResponseParser.Parse(result!);
}

public async Task<ExecuteTransactionResponse> ExecuteTransactionAsync(ExecuteTransactionRequest request, CancellationToken cancellationToken = default)
{
var httpContent = new ExecuteTransactionRequestHttpContent(request);
using var response = await Api.SendAsync(Config, httpContent, cancellationToken).ConfigureAwait(false);
var result = await DynamoDbLowLevelContext.ReadDocumentAsync(response, TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false);
return ExecuteTransactionResponseParser.Parse(result!);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System.Threading;
using System.Threading.Tasks;
using EfficientDynamoDb.DocumentModel;
using EfficientDynamoDb.Operations.BatchGetItem;
using EfficientDynamoDb.Operations.BatchWriteItem;
Expand All @@ -11,11 +9,15 @@
using EfficientDynamoDb.Operations.TransactGetItems;
using EfficientDynamoDb.Operations.TransactWriteItems;
using EfficientDynamoDb.Operations.UpdateItem;
using System.Threading;
using System.Threading.Tasks;

namespace EfficientDynamoDb
{
public interface IDynamoDbLowLevelContext
{
public IDynamoDbLowLevelPartiQLContext PartiQL { get; }

Task<GetItemResponse> GetItemAsync(GetItemRequest request, CancellationToken cancellationToken = default);

Task<BatchGetItemResponse> BatchGetItemAsync(BatchGetItemRequest request, CancellationToken cancellationToken = default);
Expand All @@ -35,7 +37,7 @@ public interface IDynamoDbLowLevelContext
Task<DeleteItemResponse> DeleteItemAsync(DeleteItemRequest request, CancellationToken cancellationToken = default);

Task<TransactWriteItemsResponse> TransactWriteItemsAsync(TransactWriteItemsRequest request, CancellationToken cancellationToken = default);

T ToObject<T>(Document document) where T : class;

Document ToDocument<T>(T entity) where T : class;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using EfficientDynamoDb.Operations.BatchExecuteStatement;
using EfficientDynamoDb.Operations.ExecuteStatement;
using EfficientDynamoDb.Operations.ExecuteTransaction;
using System.Threading;
using System.Threading.Tasks;

namespace EfficientDynamoDb
{
public interface IDynamoDbLowLevelPartiQLContext
{
Task<ExecuteStatementResponse> ExecuteStatementAsync(ExecuteStatementRequest request, CancellationToken cancellationToken = default);

Task<BatchExecuteStatementResponse> BatchExecuteStatementAsync(BatchExecuteStatementRequest request, CancellationToken cancellationToken = default);

Task<ExecuteTransactionResponse> ExecuteTransactionAsync(ExecuteTransactionRequest request, CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using EfficientDynamoDb.Converters;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Operations.Shared;
using EfficientDynamoDb.Operations.BatchExecuteStatement;
using EfficientDynamoDb.Operations.Shared;
using System.Threading.Tasks;

namespace EfficientDynamoDb.Internal.Operations.BatchExecuteStatement
{
internal class BatchExecuteStatementRequestHttpContent : DynamoDbHttpContent
{
private readonly BatchExecuteStatementRequest _request;

public BatchExecuteStatementRequestHttpContent(BatchExecuteStatementRequest request) : base("DynamoDB_20120810.BatchExecuteStatement")
{
_request = request;
}

protected override ValueTask WriteDataAsync(DdbWriter writer)
{
var json = writer.JsonWriter;
json.WriteStartObject();

json.WritePropertyName("Statements");
json.WriteStartArray();
foreach (var statementRequest in _request.Statements)
{
json.WriteStartObject();

json.WriteString("Statement", statementRequest.Statement);

if (statementRequest.Parameters.Count > 0)
{
json.WritePropertyName("Parameters");
json.WriteStartArray();
foreach (var parameter in statementRequest.Parameters)
{
parameter.Write(json);
}
json.WriteEndArray();
}

if (statementRequest.ConsistentRead)
json.WriteBoolean("ConsistentRead", true);

if (statementRequest.ReturnValuesOnConditionCheckFailure != ReturnValuesOnConditionCheckFailure.None)
json.WriteReturnValuesOnConditionCheckFailure(statementRequest.ReturnValuesOnConditionCheckFailure);

json.WriteEndObject();
}
json.WriteEndArray();

if (_request.ReturnConsumedCapacity != ReturnConsumedCapacity.None)
json.WriteReturnConsumedCapacity(_request.ReturnConsumedCapacity);

json.WriteEndObject();

return default;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using EfficientDynamoDb.DocumentModel;
using EfficientDynamoDb.Internal.Operations.Shared;
using EfficientDynamoDb.Operations.BatchExecuteStatement;
using System;
using System.Runtime.CompilerServices;

namespace EfficientDynamoDb.Internal.Operations.BatchExecuteStatement
{
internal static class BatchExecuteStatementResponseParser
{
public static BatchExecuteStatementResponse Parse(Document response)
=> new BatchExecuteStatementResponse { Responses = ParseResponses(response), ConsumedCapacity = CapacityParser.ParseFullConsumedCapacities(response) };

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static BatchStatementResponse[] ParseResponses(Document response)
{
if (!response.TryGetValue("Responses", out var responsesAttr))
return Array.Empty<BatchStatementResponse>();

var attributesList = responsesAttr.AsListAttribute().Items;
var responses = new BatchStatementResponse[attributesList.Count];
for (var i = 0; i < attributesList.Count; i++)
responses[i] = ParseBatchStatementResponse(attributesList[i].AsDocument());

return responses;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static BatchStatementResponse ParseBatchStatementResponse(Document document)
{
var response = new BatchStatementResponse();

if (document.TryGetValue("TableName", out var tableNameAttr))
{
response.TableName = tableNameAttr.AsString();
}

if (document.TryGetValue("Error", out var errorAttr))
{
response.Error = ParseBatchStatementError(errorAttr.AsDocument());
}

if (document.TryGetValue("Item", out var itemAttr))
{
response.Item = itemAttr.AsDocument();
}

return response;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static BatchStatementError? ParseBatchStatementError(Document document)
{
var error = new BatchStatementError();

if (document.TryGetValue("Code", out var codeAttr))
{
error.Code = codeAttr.AsString();
}

if (document.TryGetValue("Message", out var messageAttr))
{
error.Message = messageAttr.AsString();
}

if (document.TryGetValue("Item", out var itemAttr))
{
error.Item = itemAttr.AsDocument();
}

return error;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using EfficientDynamoDb.Converters;
using EfficientDynamoDb.Internal.Extensions;
using EfficientDynamoDb.Internal.Operations.Shared;
using EfficientDynamoDb.Operations.ExecuteStatement;
using EfficientDynamoDb.Operations.Shared;
using System.Threading.Tasks;

namespace EfficientDynamoDb.Internal.Operations.ExecuteStatement
{
internal class ExecuteStatementRequestHttpContent : DynamoDbHttpContent
{
private readonly ExecuteStatementRequest _request;

public ExecuteStatementRequestHttpContent(ExecuteStatementRequest executeStatementRequest) : base("DynamoDB_20120810.ExecuteStatement")
{
_request = executeStatementRequest;
}

protected override ValueTask WriteDataAsync(DdbWriter writer)
{
var json = writer.JsonWriter;
json.WriteStartObject();

json.WritePropertyName("Statement");
json.WriteStringValue(_request.Statement);

if (_request.Parameters.Count > 0)
{
json.WritePropertyName("Parameters");
json.WriteStartArray();
foreach (var parameter in _request.Parameters)
{
parameter.Write(json);
}
json.WriteEndArray();
}

if (_request.ConsistentRead)
json.WriteBoolean("ConsistentRead", true);

if (_request.Limit.HasValue)
json.WriteNumber("Limit", _request.Limit.Value);

if (_request.NextToken != null)
json.WriteString("NextToken", _request.NextToken);

if (_request.ReturnConsumedCapacity != ReturnConsumedCapacity.None)
json.WriteReturnConsumedCapacity(_request.ReturnConsumedCapacity);

if (_request.ReturnItemCollectionMetrics != ReturnItemCollectionMetrics.None)
json.WriteReturnItemCollectionMetrics(_request.ReturnItemCollectionMetrics);

if (_request.ReturnValuesOnConditionCheckFailure != ReturnValuesOnConditionCheckFailure.None)
json.WriteReturnValuesOnConditionCheckFailure(_request.ReturnValuesOnConditionCheckFailure);

json.WriteEndObject();
return default;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using EfficientDynamoDb.DocumentModel;
using EfficientDynamoDb.Internal.Operations.Shared;
using EfficientDynamoDb.Operations.ExecuteStatement;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

namespace EfficientDynamoDb.Internal.Operations.ExecuteStatement
{
internal static class ExecuteStatementResponseParser
{
public static ExecuteStatementResponse Parse(Document response) =>
new ExecuteStatementResponse
{
Items = response.TryGetValue("Items", out var items) ? items._documentListValue.Items : Array.Empty<Document>(),
LastEvaluatedKey = ParseLastEvaluatedKey(response),
NextToken = response.TryGetValue("NextToken", out var nextToken) ? nextToken.AsString() : string.Empty,
ConsumedCapacity = CapacityParser.ParseFullConsumedCapacity(response),
};

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static IReadOnlyDictionary<string, AttributeValue>? ParseLastEvaluatedKey(Document response)
{
if (!response.TryGetValue("LastEvaluatedKey", out var attribute))
return null;

var document = attribute.AsDocument();
return document.Count > 0 ? document : null;
}
}
}
Loading

0 comments on commit 82bb835

Please sign in to comment.