Skip to content

Commit

Permalink
Return PartitionSession in StartPartitionSessionEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
pnv1 committed Mar 26, 2024
1 parent 1d640da commit 4687dee
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package tech.ydb.topic.read.events;

import javax.annotation.Nullable;
import tech.ydb.topic.read.PartitionSession;

/**
* @author Nikolay Perfilov
*/
public interface StopPartitionSessionEvent {
PartitionSession getPartitionSession();
long getPartitionSessionId();

@Nullable
Long getPartitionId();

long getCommittedOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,11 +116,10 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta

@Override
protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
@Nullable Long partitionId, Runnable confirmCallback) {
final long partitionSessionId = request.getPartitionSessionId();
PartitionSession partitionSession, Runnable confirmCallback) {
final long committedOffset = request.getCommittedOffset();
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSessionId, partitionId,
committedOffset, confirmCallback);
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSession, committedOffset,
confirmCallback);
handlerExecutor.execute(() -> {
eventHandler.onStopPartitionSession(event);
});
Expand Down
17 changes: 10 additions & 7 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +112,8 @@ protected abstract void handleStartPartitionSessionRequest(
PartitionSession partitionSession,
Consumer<StartPartitionSessionSettings> confirmCallback);
protected abstract void handleStopPartitionSession(
YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, @Nullable Long partitionId,
YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
PartitionSession partitionSession,
Runnable confirmCallback);
protected abstract void handleClosePartitionSession(PartitionSession partitionSession);

Expand Down Expand Up @@ -423,20 +422,24 @@ protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPart
if (partitionSession != null) {
logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} " +
"(partition {})", fullId, partitionSession.getId(), partitionSession.getPartitionId());
handleStopPartitionSession(request, partitionSession.getSessionInfo(),
() -> sendStopPartitionSessionResponse(request.getPartitionSessionId()));
} else {
logger.warn("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " +
logger.error("[{}] Received graceful StopPartitionSessionRequest for partition session {}, " +
"but have no such partition session active", fullId, request.getPartitionSessionId());
closeDueToError(null,
new RuntimeException("Restarting read session due to receiving " +
"StopPartitionSessionRequest with PartitionSessionId " +
request.getPartitionSessionId() + " that SDK knows nothing about"));
}
handleStopPartitionSession(request, partitionSession == null ? null : partitionSession.getPartitionId(),
() -> sendStopPartitionSessionResponse(request.getPartitionSessionId()));
} else {
PartitionSessionImpl partitionSession = partitionSessions.remove(request.getPartitionSessionId());
if (partitionSession != null) {
logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition " +
"{})", fullId, partitionSession.getId(), partitionSession.getPartitionId());
closePartitionSession(partitionSession);
} else {
logger.warn("[{}] Received force StopPartitionSessionRequest for partition session {}, " +
logger.info("[{}] Received force StopPartitionSessionRequest for partition session {}, " +
"but have no such partition session running", fullId, request.getPartitionSessionId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta

@Override
protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
@Nullable Long partitionId, Runnable confirmCallback) {
PartitionSession partitionSession, Runnable confirmCallback) {
confirmCallback.run();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
package tech.ydb.topic.read.impl.events;

import javax.annotation.Nullable;

import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.StopPartitionSessionEvent;

/**
* @author Nikolay Perfilov
*/
public class StopPartitionSessionEventImpl implements StopPartitionSessionEvent {
private final long partitionSessionId;
@Nullable
private final Long partitionId;
private final PartitionSession partitionSession;
private final long committedOffset;
private final Runnable confirmCallback;

public StopPartitionSessionEventImpl(long partitionSessionId, @Nullable Long partitionId, long committedOffset,
public StopPartitionSessionEventImpl(PartitionSession partitionSession, long committedOffset,
Runnable confirmCallback) {
this.partitionSessionId = partitionSessionId;
this.partitionId = partitionId;
this.partitionSession = partitionSession;
this.committedOffset = committedOffset;
this.confirmCallback = confirmCallback;
}

@Override
public PartitionSession getPartitionSession() {
return partitionSession;
}

@Override
public long getPartitionSessionId() {
return partitionSessionId;
return partitionSession.getId();
}

@Override
Expand All @@ -33,9 +34,8 @@ public long getCommittedOffset() {
}

@Override
@Nullable
public Long getPartitionId() {
return partitionId;
return partitionSession.getPartitionId();
}

@Override
Expand Down

0 comments on commit 4687dee

Please sign in to comment.