
KCL's support of at-least-once semantics #1403

Open
matheisco opened this issue Nov 15, 2024 · 1 comment

Comments

@matheisco

matheisco commented Nov 15, 2024

Hi!

In my application I need to make sure that no record is lost, and I'm not sure this holds in certain error cases.

Consider the following case:

  1. KCL calls processRecords() with a batch of records.
  2. The business logic can't handle one of those records (e.g. some other required system is currently down), and therefore tries to write the record to an S3 bucket so that it is not lost.
  3. The write to the S3 bucket fails for some reason.
  4. No explicit checkpointing happens in processRecords().

In this case, I want the record to be delivered again in the next processRecords() call (e.g. by replaying the whole previous batch). Is this guaranteed if I don't call checkpoint()?
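A minimal sketch of the scenario in steps 1-4, to make the control flow concrete. `SimpleRecord`, `Checkpointer`, `BusinessLogic` and `FallbackStore` are hypothetical stand-ins, not KCL or AWS SDK types, and the sketch assumes that a failed fallback write should suppress checkpointing for the batch.

```java
import java.util.List;

// Hypothetical stand-ins only: none of these types come from KCL or the AWS SDK.
class ScenarioProcessor {
    static final class SimpleRecord {
        final String sequenceNumber;
        final String data;
        SimpleRecord(String sequenceNumber, String data) {
            this.sequenceNumber = sequenceNumber;
            this.data = data;
        }
    }

    interface Checkpointer { void checkpoint(); }
    interface BusinessLogic { void handle(SimpleRecord r) throws Exception; }
    interface FallbackStore { void save(SimpleRecord r) throws Exception; } // e.g. writes to S3

    private final BusinessLogic logic;
    private final FallbackStore fallback;

    ScenarioProcessor(BusinessLogic logic, FallbackStore fallback) {
        this.logic = logic;
        this.fallback = fallback;
    }

    /** Returns true if every record was either handled or parked in the fallback store. */
    boolean processRecords(List<SimpleRecord> batch, Checkpointer checkpointer) {
        boolean allSafe = true;
        for (SimpleRecord r : batch) {
            try {
                logic.handle(r);              // step 2: fails if a required system is down
            } catch (Exception handleError) {
                try {
                    fallback.save(r);         // step 2: park the record so it is not lost
                } catch (Exception saveError) {
                    allSafe = false;          // step 3: the fallback write fails as well
                }
            }
        }
        if (allSafe) {
            checkpointer.checkpoint();
        }
        // step 4: if any record was neither handled nor parked, no checkpoint is taken
        return allSafe;
    }
}
```

Skipping the checkpoint here only withholds progress from the checkpoint store; whether KCL redelivers the record is exactly the open question.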

Some parts of the documentation make me believe that processRecords() will skip records from a previous call, even if there was no checkpoint in that call. For example, the periodic checkpointing in this example only makes sense if that is the case.
This seems problematic to me, since a subsequent successful processRecords() call would call checkpoint() and thereby also checkpoint the earlier record that was not processed successfully.
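To make the concern concrete: within a shard, sequence numbers increase, so a checkpoint at sequence number C in effect marks every record with a sequence number up to C as done. The sketch below is plain Java, not KCL code, and the short sequence numbers are made up for illustration (real Kinesis sequence numbers are much longer decimal strings, hence `BigInteger`).

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models which records a single checkpoint position covers.
class CheckpointCoverage {
    /** Sequence numbers are decimal strings; compare them numerically, not lexically. */
    static boolean isCovered(String recordSeq, String checkpointSeq) {
        return new BigInteger(recordSeq).compareTo(new BigInteger(checkpointSeq)) <= 0;
    }

    /** Returns the delivered records that a checkpoint at checkpointSeq would cover. */
    static List<String> coveredRecords(List<String> deliveredSeqs, String checkpointSeq) {
        List<String> covered = new ArrayList<>();
        for (String seq : deliveredSeqs) {
            if (isCovered(seq, checkpointSeq)) {
                covered.add(seq);
            }
        }
        return covered;
    }
}
```

With batch 1 = `100`, `200` (where `200` failed and was not checkpointed) and batch 2 = `300`, `400` (checkpointed at `400`), `coveredRecords` includes `200`, which is the situation described above.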

@akidambisrinivasan
Contributor

akidambisrinivasan commented Nov 18, 2024

> In this case I want that the record is present again in the next processRecords() call (i.e. the whole previous batch is replayed for instance). Is this guaranteed when I don't call checkpoint()?

Checkpointing or not does not affect the delivery of records: KCL will always provide the next batch of records as received from Kinesis, so not calling checkpoint() does not cause the previous batch to be replayed.
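A toy model of this behaviour for one shard (plain Java, not KCL internals): the read position advances on every delivery whether or not a checkpoint is taken, and the checkpoint only determines where processing resumes after a restart. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one shard's delivery; not how KCL is implemented internally.
class DeliveryModel {
    private final List<String> shard;
    private int readPosition = 0;   // index of the next record to deliver
    private int checkpointed = 0;   // index of the first record after the last checkpoint

    DeliveryModel(List<String> shard) { this.shard = shard; }

    List<String> nextBatch(int size) {
        int end = Math.min(readPosition + size, shard.size());
        List<String> batch = new ArrayList<>(shard.subList(readPosition, end));
        readPosition = end;         // advances regardless of checkpointing
        return batch;
    }

    /** Like checkpoint() with no argument: covers everything delivered so far. */
    void checkpoint() { checkpointed = readPosition; }

    /** Only a restart goes back to the last checkpoint. */
    void restart() { readPosition = checkpointed; }
}
```

Delivering `a, b` without a checkpoint and then asking for the next batch yields `c, d`, not a replay; only `restart()` brings `a, b` back.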

KCL currently does not provide a way for a single record processor to signal an unrecoverable processing error and have KCL shut down that single record processor and restart processing from the last persisted checkpoint sequence number. Currently, the only supported way to achieve that is to call System.exit (which brings down the whole worker) if you encounter an unrecoverable error in any record processor. We are aware of this feature request from another GitHub issue.
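A minimal sketch of the System.exit workaround, assuming the application can tell unrecoverable errors apart from recoverable ones. `UnrecoverableException`, `Handler` and the `Runnable` checkpoint callback are hypothetical stand-ins for application code and KCL's checkpointer; the fatal hook defaults to `System.exit(1)` but is injectable so the logic can be exercised without killing the JVM.

```java
import java.util.List;

// Hypothetical processor shape; not the KCL ShardRecordProcessor interface.
class ExitOnFatalProcessor {
    /** Hypothetical marker for errors the application cannot recover from. */
    static class UnrecoverableException extends Exception {
        UnrecoverableException(String message) { super(message); }
    }

    interface Handler { void handle(String record) throws UnrecoverableException; }

    private final Handler handler;
    private final Runnable onFatal;

    ExitOnFatalProcessor(Handler handler) {
        this(handler, () -> System.exit(1)); // brings down the whole worker, all shards
    }

    ExitOnFatalProcessor(Handler handler, Runnable onFatal) {
        this.handler = handler;
        this.onFatal = onFatal;
    }

    void processRecords(List<String> batch, Runnable checkpoint) {
        for (String record : batch) {
            try {
                handler.handle(record);
            } catch (UnrecoverableException e) {
                // Do NOT checkpoint: after the worker restarts, processing resumes from
                // the last persisted checkpoint, so this batch is delivered again.
                onFatal.run();
                return;
            }
        }
        checkpoint.run();
    }
}
```

The key property is that the failing batch is never checkpointed before the process exits.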

For recoverable errors, you need to keep the delivered records in memory and reprocess them on subsequent deliveries, making sure they have been processed successfully before invoking any checkpoint.
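One way to apply this advice, sketched under assumptions: only records that fail are kept, they are retried first on each later delivery, and checkpoint() is skipped while anything is still pending. `Handler` and the `Runnable` checkpoint callback are hypothetical stand-ins for the application logic and KCL's checkpointer. If the worker dies, the in-memory buffer is lost, but because no checkpoint was taken past those records, processing resumes from the last persisted checkpoint.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Sketch only: an in-memory retry buffer that gates checkpointing.
class RetryingProcessor {
    /** Returns false on a recoverable failure (e.g. a dependency is temporarily down). */
    interface Handler { boolean handle(String record); }

    private final Handler handler;
    private final Deque<String> pending = new ArrayDeque<>(); // failed records, oldest first

    RetryingProcessor(Handler handler) { this.handler = handler; }

    void processRecords(List<String> batch, Runnable checkpoint) {
        // Retry earlier failures before the new batch.
        int retries = pending.size();
        for (int i = 0; i < retries; i++) {
            String record = pending.pollFirst();
            if (!handler.handle(record)) {
                pending.addLast(record);
            }
        }
        for (String record : batch) {
            if (!handler.handle(record)) {
                pending.addLast(record);
            }
        }
        // A checkpoint now would also cover records still pending, so skip it.
        if (pending.isEmpty()) {
            checkpoint.run();
        }
    }

    int pendingCount() { return pending.size(); }
}
```

Keeping only failed records means later records can be processed before earlier ones; if ordering matters, an alternative is to buffer every record delivered since the last checkpoint and stop at the first failure.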
