Conversation
…in polling mechanism
src/ketu/async/source.clj
Outdated
| ([^Consumer consumer source-name records-to-skip] | ||
| (try | ||
| (let [assigned-partitions (consumer/assignment consumer) | ||
| skip-amount (or records-to-skip 1)] |
There was a problem hiding this comment.
should be part of the default-opts function for better visibility
| (log/error logger "[source={}] Failed to get assigned partitions for offset increment" | ||
| source-name e))))) | ||
|
|
||
| (defn- default-poll-error-handler [consumer opts] |
There was a problem hiding this comment.
I think this as a default is very opinionated. IMO we should provide the functionality but allow producers to opt-in instead of opt-out
There was a problem hiding this comment.
OR, an additional option is to set the default value to 0
There was a problem hiding this comment.
While I agree that the library shouldn’t enforce a specific opinion, the current implementation causes the service to break when an error occurs and defaulting the value to 0 could result in an infinite loop if the error is unrecoverable.
If skipping the problematic message feels too opinionated, I still think defaulting to 0 is preferable to making this entirely opt-in, since the current behavior can lead to undefined outcomes.
README.md
Outdated
| | :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts (consumer options) | | ||
| | :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, default value is 1. | |
There was a problem hiding this comment.
assigning :ketu.source/poll-error-handler overrides the definition of :ketu.source/error-skip-offset-amount. If the first is set, the latter is ignored. This is confusing for a user of the library.
I think we should support one or the either, or document that they are mutually exclusive
There was a problem hiding this comment.
Do you mean that the provided error handler may choose not to use the error-skip-offset-amount opt?
src/ketu/async/source.clj
Outdated
| (fn [consumer] | ||
| (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) | ||
|
|
||
| (defn- increment-offsets-for-assigned-partitions! |
There was a problem hiding this comment.
We should consider simply adding this as a utility function and letting consumers choose whento use it
There was a problem hiding this comment.
Made it public so it can be referred in user provided error handlers. Or did you mean to move it to a dedicated utils namespace?
Summary
Add configurable, recoverable error handling for Kafka
pollin asyncsource, with docs and tests.Changes
poll-fnnow::ketu.source/custom-catch-fn(if given), or:ketu.source/error-skip-offset-amount.README.mdConsumer-source options now document::ketu.source/custom-catch-fn:ketu.source/error-skip-offset-amount