Skip to content

Commit

Permalink
Fix multi-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Meisel committed Jan 23, 2025
1 parent 252aa16 commit 5e6695b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ object LocalstackKCLConsumerFS2 {
_.configureLeaseManagementConfig(
LocalstackKCLConsumer.configureTopLevelLeaseManagementConfig
)
.configureLeaseManagementConfig(
LocalstackKCLConsumer.configureLeaseManagementFactory
.configureLeaseManagementConfig(x =>
LocalstackKCLConsumer.configureLeaseManagementFactory(
x,
streamTracker.isMultiStream()
)
)
.configureCoordinatorConfig(_.parentShardPollIntervalMillis(1000L))
.configureRetrievalConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ object LocalstackKCLConsumer {
.failoverTimeMillis(1000L)

private[kcl] def configureLeaseManagementFactory(
defaultLeaseManagement: LeaseManagementConfig
defaultLeaseManagement: LeaseManagementConfig,
isMultiStream: Boolean
): LeaseManagementConfig =
defaultLeaseManagement.leaseManagementFactory(
new DynamoDBLeaseManagementFactory(
Expand Down Expand Up @@ -79,7 +80,7 @@ object LocalstackKCLConsumer {
defaultLeaseManagement.tags(),
new DynamoDBLeaseSerializer(),
defaultLeaseManagement.customShardDetectorProvider(),
false,
isMultiStream,
LeaseCleanupConfig
.builder()
.completedLeaseCleanupIntervalMillis(500L)
Expand Down Expand Up @@ -162,7 +163,9 @@ object LocalstackKCLConsumer {
.configureLeaseManagementConfig(
configureTopLevelLeaseManagementConfig
)
.configureLeaseManagementConfig(configureLeaseManagementFactory)
.configureLeaseManagementConfig(x =>
configureLeaseManagementFactory(x, streamTracker.isMultiStream())
)
.configureCoordinatorConfig(_.parentShardPollIntervalMillis(1000L))
.configureRetrievalConfig(
_.retrievalSpecificConfig(retrievalConfig)
Expand Down

0 comments on commit 5e6695b

Please sign in to comment.