Skip to content

Poll exception catching mechanism#26

Open
yblanke wants to merge 12 commits intomasterfrom
poll-exception-catching-mechanism
Open

Poll exception catching mechanism#26
yblanke wants to merge 12 commits intomasterfrom
poll-exception-catching-mechanism

Conversation

@yblanke
Copy link

@yblanke yblanke commented Dec 22, 2025

Summary

Add configurable, recoverable error handling for Kafka poll in async source, with docs and tests.

Changes

  • Core behavior: poll-fn now:
    • Still lets wakeups behave as before for normal shutdown.
    • On other exceptions, either:
      • Calls a user-provided :ketu.source/custom-catch-fn (if given), or
      • Uses a built-in handler that logs the problem, skips over the problematic messages, and keeps consuming new ones instead of treating it as a fatal error.
  • Offset skipping:
    • Since there is no way to know the offset of the "problematic" message that caused the exception inside the batch, users can configure how many messages they want to skip when an exception occurs.
    • The number of messages to skip can be configured via :ketu.source/error-skip-offset-amount.
    • If configuration not provided, it is set to a safe default of 1 to ensure no valid messages are skipped.
  • Docs: README.md Consumer-source options now document:
    • :ketu.source/custom-catch-fn
    • :ketu.source/error-skip-offset-amount

@github-actions
Copy link

github-actions bot commented Dec 24, 2025

Unit Test Results

0 tests   0 ✔️  0s ⏱️
0 suites  0 💤
0 files    0

Results for commit e2faa6f.

♻️ This comment has been updated with latest results.

([^Consumer consumer source-name records-to-skip]
(try
(let [assigned-partitions (consumer/assignment consumer)
skip-amount (or records-to-skip 1)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be part of the default-opts function for better visibility

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, added to defaults.

(log/error logger "[source={}] Failed to get assigned partitions for offset increment"
source-name e)))))

(defn- default-poll-error-handler [consumer opts]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this as a default is very opinionated. IMO we should provide the functionality but allow producers to opt-in instead of opt-out

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OR, an additional option is to set the default value to 0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree that the library shouldn’t enforce a specific opinion, the current implementation causes the service to break when an error occurs and defaulting the value to 0 could result in an infinite loop if the error is unrecoverable.

If skipping the problematic message feels too opinionated, I still think defaulting to 0 is preferable to making this entirely opt-in, since the current behavior can lead to undefined outcomes.

README.md Outdated
Comment on lines +87 to +88
| :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts (consumer options) |
| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, default value is 1. |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assigning :ketu.source/poll-error-handler overrides the definition of :ketu.source/error-skip-offset-amount. If the first is set, the latter is ignored. This is confusing for a user of the library.

I think we should support one or the either, or document that they are mutually exclusive

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that the provided error handler may choose not to use the error-skip-offset-amount opt?

(fn [consumer]
(consumer/assign! consumer (consumer/topic-partitions topic partitions))))))

(defn- increment-offsets-for-assigned-partitions!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider simply adding this as a utility function and letting consumers choose whento use it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it public so it can be referred in user provided error handlers. Or did you mean to move it to a dedicated utils namespace?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants