Kinesis multilang daemon stopped consuming records. #1430

Open
hsingh124 opened this issue Jan 8, 2025 · 2 comments

@hsingh124

Hi,

I came across an issue where the KCL multilang daemon stopped consuming records even though new records were still being added to the stream. There were no errors or warnings in the logs. Here are some logs from the daemon:

2025-01-07 15:44:42,522 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 
2025-01-07 15:44:57,524 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:45:27,561 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:45:33,364 [pool-15-thread-1] INFO  s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0 
2025-01-07 15:45:43,564 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000 
2025-01-07 15:45:43,564 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 
2025-01-07 15:45:57,566 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:46:27,574 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:46:33,364 [pool-15-thread-1] INFO  s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0 
2025-01-07 15:46:39,557 [pool-14-thread-1] INFO  s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 5c9661b1-535b-412d-b40b-abe3afe63209 is leader, running the periodic shard sync task 
2025-01-07 15:46:39,563 [pool-14-thread-1] INFO  s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for consumer-prod due to the reason - Hash Ranges are complete for consumer-prod 
2025-01-07 15:46:44,577 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000 
2025-01-07 15:46:44,577 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 
2025-01-07 15:46:57,579 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:47:27,583 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:47:33,364 [pool-15-thread-1] INFO  s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0 
2025-01-07 15:47:45,586 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000 
2025-01-07 15:47:45,586 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 
2025-01-07 15:47:57,588 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:48:25,188 [pool-13-thread-1] INFO  s.a.k.c.DeterministicShuffleShardSyncLeaderDecider [NONE] - Elected leaders: 5c9661b1-535b-412d-b40b-abe3afe63209 
2025-01-07 15:48:27,593 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 
2025-01-07 15:48:33,364 [pool-15-thread-1] INFO  s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0 
2025-01-07 15:48:39,563 [pool-14-thread-1] INFO  s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 5c9661b1-535b-412d-b40b-abe3afe63209 is leader, running the periodic shard sync task 
2025-01-07 15:48:39,570 [pool-14-thread-1] INFO  s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for consumer-prod due to the reason - Hash Ranges are complete for consumer-prod 
2025-01-07 15:48:46,596 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000 
2025-01-07 15:48:46,596 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 
2025-01-07 15:48:57,598 [multi-lang-daemon-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647) 

This log pattern kept repeating. Once I restarted the consumer, it started working fine again and resumed consuming records from where it had stopped.

I was also seeing the following warning at times, but I don't think it's related to this issue:

2025-01-02 19:47:49,316 [aws-java-sdk-NettyEventLoop-1-0] WARN  i.n.channel.DefaultChannelPipeline [NONE] - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. 
java.io.IOException: An error occurred on the connection: java.nio.channels.ClosedChannelException, [channel: 642ffff9]. All streams will be closed
	at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
	at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
	at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
	at software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
	at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
	at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(Http2MultiplexedChannelPool.java:509)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(Http2MultiplexedChannelPool.java:486)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:77)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1174)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.channels.ClosedChannelException: null
	... 41 common frames omitted

Does anyone have any ideas about what the issue could be here? I haven't been able to reproduce it since. I'll keep trying to investigate and reproduce it on my side, but any information would be helpful.
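
In the meantime, to confirm from outside the daemon whether the consumer was actually falling behind during the stall, I'm planning to check the stream's GetRecords.MillisBehindLatest metric in CloudWatch. A minimal sketch of what I have in mind, using the AWS SDK for Java v2 (the stream name below is a placeholder):

import java.time.Duration;
import java.time.Instant;

import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;

public class ConsumerLagCheck {
    public static void main(String[] args) {
        String streamName = "my-stream"; // placeholder -- substitute the real stream name
        Instant now = Instant.now();

        try (CloudWatchClient cloudWatch = CloudWatchClient.create()) {
            // GetRecords.MillisBehindLatest (AWS/Kinesis namespace) reports how far the
            // consumer's GetRecords calls are behind the tip of the stream.
            GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder()
                    .namespace("AWS/Kinesis")
                    .metricName("GetRecords.MillisBehindLatest")
                    .dimensions(Dimension.builder().name("StreamName").value(streamName).build())
                    .startTime(now.minus(Duration.ofHours(1)))
                    .endTime(now)
                    .period(300) // 5-minute buckets
                    .statistics(Statistic.MAXIMUM)
                    .build();

            GetMetricStatisticsResponse response = cloudWatch.getMetricStatistics(request);
            response.datapoints().forEach(dp ->
                    System.out.printf("%s  max MillisBehindLatest = %.0f ms%n",
                            dp.timestamp(), dp.maximum()));
        }
    }
}

As far as I understand, if the daemon stopped issuing GetRecords calls entirely during the stall, this metric would simply have no datapoints for that window, which would also be useful to know.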

Thanks!

@minuhong-aws
Contributor

@hsingh124 1) Can you share the KCL version you're using? 2) Did this issue happen after upgrading the version, did it happen suddenly after some changes to your environment, or did it happen on your first use of KCL?

@hsingh124
Author

@minuhong-aws The KCL version I'm using is 2.5.8. This is my first time using KCL, and there weren't any upgrades or changes to the environment.

I am using KCL to build a consumer that runs in a Kubernetes pod. Could this have been caused by insufficient resources?
