Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/3e106451-ba31-4116-b4ae-a05183b55168 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
…ed design decisions Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/3b1fffcf-985a-4222-b083-a02ab4ff9c7e Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Pull request overview
Adds a detailed design plan for introducing a new Reactor component (in query-support) to process domain events with per-event retries/backoff, Kafka-transactional offset management, and a failure hook that can emit compensating commands via CommandBus.
Changes:
- Introduces the full Reactor architecture proposal (API annotations/interfaces + query-support runtime/controller/partition + Spring wiring).
- Specifies transactional offset commit design using `sendOffsetsToTransaction()` plus optional command emission on failure.
- Defines retry/backoff behavior and outlines implementation/testing phases.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
plans/reactor-feature.md
Outdated
1. **External API invocation** triggered by domain events from Kafka
2. **Kafka offset management**: offsets are committed only after a successful API call
3. **Retry with incremental backoff**: maximum 5 retries with incremental (linear) backoff
4. **Failure hook**: when all retries are exhausted, the implementor can send a command back to an aggregate to signal the failure
We should also have a success hook, which the implementor can use to send a success Command to the aggregate.
Added onSuccess(DomainEvent event, CommandBus commandBus) to the Reactor interface as a symmetric counterpart to onFailure. Both hooks run inside the Kafka transaction, so success commands are atomically committed with the offset. Updated flows, usage example, and component descriptions. See 33efcb5.
plans/reactor-feature.md
Outdated
- Maximum **5 retries** (configurable via annotation, default = 5)
- **Linear incremental backoff**: retry delays of 1s, 2s, 3s, 4s, 5s (base interval configurable, default = 1 second)
- Formula: `delay = attempt * baseInterval` where attempt is 1-based
- Retries happen in-process (blocking the partition thread for that event) using `Thread.sleep()` to keep it simple and predictable
We cannot pause too long because we need to keep polling the Kafka topic at regular intervals. In case of a failure, control should be returned to the polling thread: the current Kafka transaction should be rolled back and the retry should be attempted in the next poll. The retry state will be kept in memory only; there is no need to persist it across restarts.
Redesigned the retry strategy to be non-blocking and poll-loop integrated (33efcb5):
- On failure: transaction is rolled back (offset not committed), retry count + next-attempt timestamp stored in-memory
- Control returns to the poll loop immediately
- Partition is paused via `consumer.pause()`; on the next poll after the backoff elapses, the partition is resumed and the same event is redelivered
- In-memory retry state only (`Map<TopicPartition, RetryState>`); on restart, the retry count resets to 0 (acceptable since external calls must be idempotent)
- Added `RETRY_PENDING` to the `ReactorResult` enum
This avoids exceeding max.poll.interval.ms and prevents rebalances during backoff.
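A minimal sketch of the in-memory retry bookkeeping described in this reply, assuming the linear formula `delay = attempt * baseInterval` from the plan. The `RetryState` shape, its method names, and the use of `Instant` are assumptions made for illustration and are not taken from the plan file.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical per-partition retry state; kept in memory only, so it resets on restart. */
final class RetryState {
    private final int maxRetries;        // e.g. 5, per the plan's default
    private final Duration baseInterval; // e.g. 1 second, per the plan's default
    private int attempts;                // number of failed attempts recorded so far
    private Instant nextAttemptAt = Instant.MIN; // earliest time the event may be retried

    RetryState(int maxRetries, Duration baseInterval) {
        this.maxRetries = maxRetries;
        this.baseInterval = baseInterval;
    }

    /** Records a failure. Returns true if a retry is scheduled, false if retries are exhausted. */
    boolean recordFailure(Instant now) {
        attempts++;
        if (attempts > maxRetries) {
            return false; // caller would invoke the failure hook and commit the offset
        }
        // Linear backoff: 1 * base, 2 * base, 3 * base, ...
        nextAttemptAt = now.plus(baseInterval.multipliedBy(attempts));
        return true;
    }

    /** True once the backoff has elapsed; the poll loop checks this without sleeping. */
    boolean isDue(Instant now) {
        return !now.isBefore(nextAttemptAt);
    }

    int attempts() {
        return attempts;
    }
}
```

In a real poll loop this check would presumably be paired with `consumer.pause()` while `isDue` is false and `consumer.resume()` once it is true. Seeking back to the uncommitted offset so that the event is actually re-read is a further assumption, not something the reply above specifies.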
plans/reactor-feature.md
Outdated
- The offset is committed regardless (to avoid infinite retry loops)
- Both the offset commit and any failure commands are committed atomically within a single Kafka transaction

The `CommandBus` is the existing framework interface (`org.elasticsoftware.akces.commands.CommandBus`), backed by the `ReactorPartition`. Commands sent via the `CommandBus` during `onFailure` are queued in the Kafka producer and committed as part of the same transaction as the offset commit.
indeed, the same Producer instance should be used by both the AkcesClient and the ReactorPartition. This may be an issue with the current AkcesClient implementation. Please explore
Explored in depth — added a full "Producer Sharing Analysis" section to the plan (33efcb5).
Finding: Sharing the same Producer instance between AkcesClient and ReactorPartition is not possible because:
- Kafka transactional producers are not thread-safe for transaction operations
- `ReactorPartition` needs offset commit + commands in the same transaction; routing through AkcesClient's queue would put them in a different transaction on a different thread
Recommendation: Share the ProducerFactory (same Kafka config), but each component creates its own Producer instance with a unique transaction ID — same pattern as AggregatePartition. The command routing logic (resolveType, resolveTopic, resolvePartition) is shared via AkcesRegistry.
The AkcesClient concern: its command-sending is tightly coupled to its internal thread/producer. The ReactorPartition must implement CommandBus directly (like AggregatePartition does) rather than delegating to AkcesClient.
agree, we should not reuse AkcesClient but implement our own sending logic as you suggest
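The agreed direction (each partition owns its producer and implements `CommandBus` itself) can be sketched as follows. To stay self-contained, the sketch uses a hypothetical `TxProducer` interface as a simplified stand-in for the transactional methods of a Kafka producer, and a hypothetical `SimpleCommandBus` in place of the framework's `CommandBus`; the real signatures differ (for example, the real `sendOffsetsToTransaction()` takes an offset map and consumer group metadata). Error handling is deliberately simplified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the transactional subset of a Kafka producer. */
interface TxProducer {
    void beginTransaction();
    void send(String topic, String payload);
    void sendOffsetsToTransaction(String topicPartition, long nextOffset);
    void commitTransaction();
    void abortTransaction();
}

/** Hypothetical, simplified stand-in for the framework's CommandBus. */
interface SimpleCommandBus {
    void send(String command);
}

/** Sketch: the partition is its own CommandBus, so commands go through its own producer. */
final class PartitionSketch implements SimpleCommandBus {
    private final TxProducer producer;
    private final String commandTopic;

    PartitionSketch(TxProducer producer, String commandTopic) {
        this.producer = producer;
        this.commandTopic = commandTopic;
    }

    @Override
    public void send(String command) {
        // Same producer as the offset commit, hence the same transaction.
        producer.send(commandTopic, command);
    }

    /** Runs a hook, then commits its commands and the offset atomically; aborts both on error. */
    boolean commitWithHook(String topicPartition, long offset, Consumer<SimpleCommandBus> hook) {
        producer.beginTransaction();
        try {
            hook.accept(this);
            // Kafka commits the offset of the next record to read, hence offset + 1.
            producer.sendOffsetsToTransaction(topicPartition, offset + 1);
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction();
            return false;
        }
    }
}

/** In-memory producer that records calls, for illustration only. */
final class RecordingProducer implements TxProducer {
    final List<String> log = new ArrayList<>();
    @Override public void beginTransaction() { log.add("begin"); }
    @Override public void send(String topic, String payload) { log.add("send:" + topic + ":" + payload); }
    @Override public void sendOffsetsToTransaction(String tp, long next) { log.add("offsets:" + tp + ":" + next); }
    @Override public void commitTransaction() { log.add("commit"); }
    @Override public void abortTransaction() { log.add("abort"); }
}
```

Because the hook only ever sees the partition's own bus, a command can never end up in a transaction owned by another thread, which is the problem identified with routing through `AkcesClient`.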
…haring analysis Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/7f80b740-377a-4f02-9714-f3021fda48db Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/7f80b740-377a-4f02-9714-f3021fda48db Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
jwijgerd left a comment
Change the implementation of the CommandBus interface to a custom implementation that uses the same Kafka Producer that creates the transaction used to commit the offset. Each ReactorPartition has a Consumer and a transactional Producer.
…l Producer, custom CommandBus uses same producer for offset commits Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/902ebe88-449e-44c5-8b99-ca88f911919a Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Updated the
|
Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/2d075cc2-4ba2-4c6c-a65f-7792c80bb235 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
plans/reflector-feature.md
Outdated
```java
@FunctionalInterface
public interface ReflectorEventHandlerFunction<E extends DomainEvent> {
    void accept(@Nonnull E event) throws Exception;
}
```
On a successful call we would also want to reflect the API result back into the success handler, to be able to construct the correct Command payload. Change the Reflector API to be able to return a (typed) success payload.
Updated in c9f6344. Reflector is now generic: Reflector<R> where R is the type returned by the external API call. The @ReflectorEventHandler method returns R (instead of void), and onSuccess now receives (DomainEvent event, R result, CommandBus commandBus), so the typed API response is directly available when constructing success Command payloads. ReflectorEventHandlerFunction is also updated to <E extends DomainEvent, R> with R accept(E event) throws Exception. The usage example shows a NotificationResult record as the return type.
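A rough sketch of how the typed result could flow from the handler into `onSuccess`. `DomainEvent` and `CommandBus` here are hypothetical, simplified stand-ins for the framework types, and the `UserRegistered` event, the `MarkNotificationSent` command, and the field of `NotificationResult` are all invented for illustration; only the `Reflector<R>` shape and the `R accept(E event) throws Exception` signature come from the reply above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for the framework types. */
interface DomainEvent {}
interface CommandBus { void send(Object command); }

/** Handler function as described above: generic on event type and result type. */
@FunctionalInterface
interface ReflectorEventHandlerFunction<E extends DomainEvent, R> {
    R accept(E event) throws Exception;
}

/** Reduced to the success hook; R is the type returned by the external API call. */
interface Reflector<R> {
    void onSuccess(DomainEvent event, R result, CommandBus commandBus);
}

// Illustrative event, result, and command types (all assumptions).
record UserRegistered(String userId) implements DomainEvent {}
record NotificationResult(String messageId) {}
record MarkNotificationSent(String userId, String messageId) {}

final class NotificationReflector implements Reflector<NotificationResult> {
    /** Stands in for the external API call; a real handler would call a remote service. */
    NotificationResult handle(UserRegistered event) {
        return new NotificationResult("msg-" + event.userId());
    }

    @Override
    public void onSuccess(DomainEvent event, NotificationResult result, CommandBus commandBus) {
        if (event instanceof UserRegistered registered) {
            // The typed result supplies the data needed for the success command payload.
            commandBus.send(new MarkNotificationSent(registered.userId(), result.messageId()));
        }
    }
}
```

The point of the generic parameter is that `onSuccess` receives a `NotificationResult` directly, with no cast and no side channel between the handler and the hook.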
… hook Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/d694a20d-2c94-48e5-af3e-0af5c4d5be66 Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Renames the feature from "Reactor" to "Reflector" and introduces a detailed design plan for the new `Reflector` component in `query-support`.

Changes Made
- Renamed `Reactor`/`reactor` to `Reflector`/`reflector` (153 occurrences); plan file renamed from `reactor-feature.md` to `reflector-feature.md`
- `onSuccess`: `Reflector` is now generic, `Reflector<R>`, where `R` is the type returned by the external API call. The `@ReflectorEventHandler` method returns `R` instead of `void`, and `onSuccess` now receives `(DomainEvent event, R result, CommandBus commandBus)`, enabling the implementor to build precise Command payloads from the typed API response
- `ReflectorEventHandlerFunction<E, R>`: Updated to be generic on both the event type and the result type; `accept()` returns `R` instead of `void`
- `ReflectorRuntime.handleSuccess()`: Accepts the API result; `KafkaReflectorRuntime` stores it in a `lastResult` field after `apply()` returns `SUCCESS`
- `onSuccess` and `onFailure` receive a `CommandBus` backed by the same transactional Kafka producer used for offset commits, ensuring atomicity
- `max.poll.interval.ms`
- `CommandBus`: Each `ReflectorPartition` owns a Kafka `Consumer` and a transactional `Producer`; the `CommandBus` implementation uses that same producer for both offset commits and command sending
- Producer sharing analysis: finds that `AkcesClient`'s producer cannot be shared with `ReflectorPartition` and recommends per-partition transactional producers with a shared `ProducerFactory`