Skip to content

Commit

Permalink
Partition autosplit feature
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Jan 22, 2025
1 parent 228bb52 commit 09266c9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
24 changes: 24 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,14 @@ def from_proto(
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
auto_partitioning_support: bool = False

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
return res

@dataclass
Expand Down Expand Up @@ -696,6 +698,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon
partition_session_id=self.partition_session_id,
)

@dataclass
class EndPartitionSession(IFromProto):
partition_session_id: int
adjacent_partition_ids: List[int]
child_partition_ids: List[int]

@staticmethod
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession):
return StreamReadMessage.EndPartitionSession(
partition_session_id=msg.partition_session_id,
adjacent_partition_ids=list(msg.adjacent_partition_ids),
child_partition_ids=list(msg.child_partition_ids),
)

@dataclass
class FromClient(IToProto):
client_message: "ReaderMessagesFromClientToServer"
Expand Down Expand Up @@ -775,6 +791,13 @@ def from_proto(
msg.partition_session_status_response
),
)
elif mess_type == "end_partition_session":
return StreamReadMessage.FromServer(
server_status=server_status,
server_message=StreamReadMessage.EndPartitionSession.from_proto(
msg.end_partition_session,
),
)
else:
raise issues.UnexpectedGrpcMessage(
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
Expand All @@ -799,6 +822,7 @@ def from_proto(
UpdateTokenResponse,
StreamReadMessage.StartPartitionSessionRequest,
StreamReadMessage.StopPartitionSessionRequest,
StreamReadMessage.EndPartitionSession,
]


Expand Down
2 changes: 2 additions & 0 deletions ydb/_topic_reader/topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class PublicReaderSettings:
consumer: str
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
auto_partitioning_support: bool = False

decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
"""decoders: map[codec_code] func(encoded_bytes)->decoded_bytes"""
Expand Down Expand Up @@ -77,6 +78,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
return StreamReadMessage.InitRequest(
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
consumer=self.consumer,
auto_partitioning_support=self.auto_partitioning_support,
)

def _retry_settings(self) -> RetrySettings:
Expand Down
9 changes: 9 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ async def _read_messages_loop(self):
):
self._on_partition_session_stop(message.server_message)

elif isinstance(
message.server_message,
StreamReadMessage.EndPartitionSession,
):
self._on_end_partition_session(message.server_message)

elif isinstance(message.server_message, UpdateTokenResponse):
self._update_token_event.set()

Expand Down Expand Up @@ -575,6 +581,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
)
)

def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
logger.debug(f"End partition session with id: {message.partition_session_id}")

def _on_read_response(self, message: StreamReadMessage.ReadResponse):
self._buffer_consume_bytes(message.bytes_size)

Expand Down

0 comments on commit 09266c9

Please sign in to comment.