Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return PartitionSession in StartPartitionSessionEvent #249

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package tech.ydb.topic.read.events;

import javax.annotation.Nullable;
import tech.ydb.topic.read.PartitionSession;

/**
* @author Nikolay Perfilov
*/
public interface StopPartitionSessionEvent {
PartitionSession getPartitionSession();
long getPartitionSessionId();

@Nullable
Long getPartitionId();

long getCommittedOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,11 +116,10 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta

@Override
protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
@Nullable Long partitionId, Runnable confirmCallback) {
final long partitionSessionId = request.getPartitionSessionId();
PartitionSession partitionSession, Runnable confirmCallback) {
final long committedOffset = request.getCommittedOffset();
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSessionId, partitionId,
committedOffset, confirmCallback);
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSession, committedOffset,
confirmCallback);
handlerExecutor.execute(() -> {
eventHandler.onStopPartitionSession(event);
});
Expand Down
17 changes: 10 additions & 7 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +112,8 @@ protected abstract void handleStartPartitionSessionRequest(
PartitionSession partitionSession,
Consumer<StartPartitionSessionSettings> confirmCallback);
protected abstract void handleStopPartitionSession(
YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, @Nullable Long partitionId,
YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
PartitionSession partitionSession,
Runnable confirmCallback);
protected abstract void handleClosePartitionSession(PartitionSession partitionSession);

Expand Down Expand Up @@ -423,20 +422,24 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart
if (partitionSession != null) {
logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} " +
"(partition {})", fullId, partitionSession.getId(), partitionSession.getPartitionId());
handleStopPartitionSession(request, partitionSession.getSessionInfo(),
() -> sendStopPartitionSessionResponse(request.getPartitionSessionId()));
} else {
logger.warn("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " +
logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " +
"but have no such partition session active", fullId, request.getPartitionSessionId());
closeDueToError(null,
new RuntimeException("Restarting read session due to receiving " +
"StopPartitionSessionRequest with PartitionSessionId " +
request.getPartitionSessionId() + " that SDK knows nothing about"));
}
handleStopPartitionSession(request, partitionSession == null ? null : partitionSession.getPartitionId(),
() -> sendStopPartitionSessionResponse(request.getPartitionSessionId()));
} else {
PartitionSessionImpl partitionSession = partitionSessions.remove(request.getPartitionSessionId());
if (partitionSession != null) {
logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition " +
"{})", fullId, partitionSession.getId(), partitionSession.getPartitionId());
closePartitionSession(partitionSession);
} else {
logger.warn("[{}] Received force StopPartitionSessionRequest for partition session {}, " +
logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, " +
"but have no such partition session running", fullId, request.getPartitionSessionId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta

@Override
protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
@Nullable Long partitionId, Runnable confirmCallback) {
PartitionSession partitionSession, Runnable confirmCallback) {
confirmCallback.run();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
package tech.ydb.topic.read.impl.events;

import javax.annotation.Nullable;

import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.StopPartitionSessionEvent;

/**
* @author Nikolay Perfilov
*/
public class StopPartitionSessionEventImpl implements StopPartitionSessionEvent {
private final long partitionSessionId;
@Nullable
private final Long partitionId;
private final PartitionSession partitionSession;
private final long committedOffset;
private final Runnable confirmCallback;

public StopPartitionSessionEventImpl(long partitionSessionId, @Nullable Long partitionId, long committedOffset,
public StopPartitionSessionEventImpl(PartitionSession partitionSession, long committedOffset,
Runnable confirmCallback) {
this.partitionSessionId = partitionSessionId;
this.partitionId = partitionId;
this.partitionSession = partitionSession;
this.committedOffset = committedOffset;
this.confirmCallback = confirmCallback;
}

@Override
public PartitionSession getPartitionSession() {
return partitionSession;
}

@Override
public long getPartitionSessionId() {
return partitionSessionId;
return partitionSession.getId();
}

@Override
Expand All @@ -33,9 +34,8 @@ public long getCommittedOffset() {
}

@Override
@Nullable
public Long getPartitionId() {
return partitionId;
return partitionSession.getPartitionId();
}

@Override
Expand Down
Loading