Skip to content

Commit

Permalink
feat: init reader ydb topics (#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov authored Jan 21, 2025
1 parent 99c3c52 commit 5b0a5c5
Show file tree
Hide file tree
Showing 9 changed files with 988 additions and 82 deletions.
15 changes: 14 additions & 1 deletion src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@ public WriterException(string message, Exception inner) : base(message, inner)

public class ReaderException : Exception
{
protected ReaderException(string message) : base(message)
public ReaderException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}

public ReaderException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public ReaderException(string message, Driver.TransportException e) : base(message, e)
{
Status = e.Status;
}

public Status Status { get; }
}
6 changes: 3 additions & 3 deletions src/Ydb.Sdk/src/Services/Topic/IReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Ydb.Sdk.Services.Topic;

public interface IReader<TValue>
public interface IReader<TValue> : IDisposable
{
public Task<TValue> ReadAsync();
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);

public Task<Message<TValue>> ReadMessageAsync();
public ValueTask<BatchMessage<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
}
91 changes: 91 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System.Collections.Immutable;
using Google.Protobuf;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic.Reader;

internal class InternalMessage
{
public InternalMessage(
ByteString data,
string topic,
long partitionId,
string producerId,
OffsetsRange offsetsRange,
Timestamp createdAt,
RepeatedField<MetadataItem> metadataItems,
long approximatelyBytesSize)
{
Data = data;
Topic = topic;
PartitionId = partitionId;
ProducerId = producerId;
OffsetsRange = offsetsRange;
CreatedAt = createdAt;
MetadataItems = metadataItems;
ApproximatelyBytesSize = approximatelyBytesSize;
}

private ByteString Data { get; }

private string Topic { get; }

private long PartitionId { get; }

private string ProducerId { get; }

private OffsetsRange OffsetsRange { get; }

private Timestamp CreatedAt { get; }

private RepeatedField<MetadataItem> MetadataItems { get; }

private long ApproximatelyBytesSize { get; }

internal Message<TValue> ToPublicMessage<TValue>(IDeserializer<TValue> deserializer, ReaderSession readerSession)
{
return new Message<TValue>(
data: deserializer.Deserialize(Data.ToByteArray()),
topic: Topic,
partitionId: PartitionId,
producerId: ProducerId,
createdAt: CreatedAt.ToDateTime(),
metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(),
offsetsRange: OffsetsRange,
readerSession: readerSession,
approximatelyBytesSize: ApproximatelyBytesSize
);
}
}

internal class InternalBatchMessage
{
public InternalBatchMessage(
OffsetsRange batchOffsetsRange,
Queue<InternalMessage> internalMessages,
ReaderSession readerSession,
long approximatelyBatchSize)
{
BatchOffsetsRange = batchOffsetsRange;
InternalMessages = internalMessages;
ReaderSession = readerSession;
ApproximatelyBatchSize = approximatelyBatchSize;
}

internal OffsetsRange BatchOffsetsRange { get; }

internal Queue<InternalMessage> InternalMessages { get; }

internal ReaderSession ReaderSession { get; }

internal long ApproximatelyBatchSize { get; }
}

internal record CommitSending(
OffsetsRange OffsetsRange,
long PartitionSessionId,
TaskCompletionSource TcsCommit,
long ApproximatelyBytesSize
);
95 changes: 95 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,100 @@
using System.Collections.Immutable;
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic.Reader;

public class Message<TValue>
{
private readonly OffsetsRange _offsetsRange;
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBytesSize;

internal Message(
TValue data,
string topic,
long partitionId,
string producerId,
DateTime createdAt,
ImmutableArray<Metadata> metadata,
OffsetsRange offsetsRange,
ReaderSession readerSession,
long approximatelyBytesSize)
{
Data = data;
Topic = topic;
PartitionId = partitionId;
ProducerId = producerId;
CreatedAt = createdAt;
Metadata = metadata;

_offsetsRange = offsetsRange;
_readerSession = readerSession;
_approximatelyBytesSize = approximatelyBytesSize;
}

public TValue Data { get; }

/// <summary>
/// The topic associated with the message.
/// </summary>
public string Topic { get; }

public long PartitionId { get; }

public string ProducerId { get; }

public DateTime CreatedAt { get; }

public ImmutableArray<Metadata> Metadata { get; }

internal long Start => _offsetsRange.Start;
internal long End => _offsetsRange.End;

public Task CommitAsync()
{
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize);
}
}

public class BatchMessage<TValue>
{
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBatchSize;

public ImmutableArray<Message<TValue>> Batch { get; }

internal BatchMessage(
ImmutableArray<Message<TValue>> batch,
ReaderSession readerSession,
long approximatelyBatchSize)
{
Batch = batch;
_readerSession = readerSession;
_approximatelyBatchSize = approximatelyBatchSize;
}

public Task CommitBatchAsync()
{
if (Batch.Length == 0)
{
return Task.CompletedTask;
}

var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End };

return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
}
}

public class TopicPartitionOffset
{
public TopicPartitionOffset(long offset, long partitionId)
{
Offset = offset;
PartitionId = partitionId;
}

public long Offset { get; }

public long PartitionId { get; }
}
Loading

0 comments on commit 5b0a5c5

Please sign in to comment.