
Reflector feature plan: Kafka transactions, CommandBus, typed success result, success/failure hooks, non-blocking retry #267

Merged
jwijgerd merged 8 commits into main from copilot/add-reactor-feature
Apr 3, 2026

Conversation

Contributor

Copilot AI commented Apr 2, 2026

Renames the feature from "Reactor" to "Reflector" and introduces a detailed design plan for the new Reflector component in query-support.

Changes Made

  • Renamed: All occurrences of Reactor/reactor to Reflector/reflector (153 occurrences); plan file renamed from reactor-feature.md to reflector-feature.md
  • Typed API result in onSuccess: Reflector is now generic — Reflector<R> where R is the type returned by the external API call. The @ReflectorEventHandler method returns R instead of void, and onSuccess now receives (DomainEvent event, R result, CommandBus commandBus), enabling the implementor to build precise Command payloads from the typed API response
  • ReflectorEventHandlerFunction<E, R>: Updated to be generic on both the event type and the result type; accept() returns R instead of void
  • ReflectorRuntime.handleSuccess(): Accepts the API result; KafkaReflectorRuntime stores it in a lastResult field after apply() returns SUCCESS
  • Success/failure hooks: Both onSuccess and onFailure receive a CommandBus backed by the same transactional Kafka producer used for offset commits, ensuring atomicity
  • Non-blocking retry: On failure the Kafka transaction is rolled back, retry state is stored in memory, and control returns immediately to the poll loop; partitions are paused/resumed for backoff without blocking max.poll.interval.ms
  • Custom CommandBus: Each ReflectorPartition owns a Kafka Consumer and a Transactional Producer; the CommandBus implementation uses that same producer for both offset commits and command sending
  • Producer sharing analysis: Documents why AkcesClient's producer cannot be shared with ReflectorPartition and recommends per-partition transactional producers with a shared ProducerFactory
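The typed-result API described above can be sketched as follows. This is an illustrative model, not the framework's actual source: `DomainEvent`, `Command`, and `CommandBus` are minimal stand-ins for the real Akces types, and the `UserRegistered`/`NotificationResult`/`NotificationSentCommand` names are hypothetical examples.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the generic Reflector<R> API described in the PR summary.
// All types here are minimal stand-ins for the framework's real interfaces.
public class ReflectorSketch {
    interface DomainEvent {}
    interface Command {}
    interface CommandBus { void send(Command command); }

    // Reflector<R>: R is the type returned by the external API call.
    interface Reflector<R> {
        void onSuccess(DomainEvent event, R result, CommandBus commandBus);
        void onFailure(DomainEvent event, CommandBus commandBus);
    }

    record UserRegistered(String userId) implements DomainEvent {}
    record NotificationResult(String messageId) {}
    record NotificationSentCommand(String userId, String messageId) implements Command {}

    // Example implementation: the typed API result lets the implementor
    // build a precise Command payload in the success hook.
    static class NotificationReflector implements Reflector<NotificationResult> {
        @Override
        public void onSuccess(DomainEvent event, NotificationResult result, CommandBus bus) {
            UserRegistered e = (UserRegistered) event;
            bus.send(new NotificationSentCommand(e.userId(), result.messageId()));
        }
        @Override
        public void onFailure(DomainEvent event, CommandBus bus) { /* compensate */ }
    }

    static List<Command> demo() {
        List<Command> sent = new ArrayList<>();
        new NotificationReflector().onSuccess(
                new UserRegistered("u-1"), new NotificationResult("m-42"), sent::add);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```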

@jwijgerd jwijgerd marked this pull request as ready for review April 2, 2026 12:05
Copilot AI review requested due to automatic review settings April 2, 2026 12:05
Contributor

Copilot AI left a comment


Pull request overview

Adds a detailed design plan for introducing a new Reactor component (in query-support) to process domain events with per-event retries/backoff, Kafka-transactional offset management, and a failure hook that can emit compensating commands via CommandBus.

Changes:

  • Introduces the full Reactor architecture proposal (API annotations/interfaces + query-support runtime/controller/partition + Spring wiring).
  • Specifies transactional offset commit design using sendOffsetsToTransaction() plus optional command emission on failure.
  • Defines retry/backoff behavior and outlines implementation/testing phases.

1. **External API invocation** triggered by domain events from Kafka
2. **Kafka offset management** — offsets are committed only after a successful API call
3. **Retry with incremental backoff** — maximum 5 retries with incremental (linear) backoff
4. **Failure hook** — when all retries are exhausted, the implementor can send a command back to an aggregate to signal the failure
Contributor


we should also have a success hook, which the implementor can use to send a success Command to the aggregate

Contributor Author


Added onSuccess(DomainEvent event, CommandBus commandBus) to the Reactor interface as a symmetric counterpart to onFailure. Both hooks run inside the Kafka transaction, so success commands are atomically committed with the offset. Updated flows, usage example, and component descriptions. See 33efcb5.

- Maximum **5 retries** (configurable via annotation, default = 5)
- **Linear incremental backoff**: retry delays of 1s, 2s, 3s, 4s, 5s (base interval configurable, default = 1 second)
- Formula: `delay = attempt * baseInterval` where attempt is 1-based
- Retries happen in-process (blocking the partition thread for that event) using `Thread.sleep()` to keep it simple and predictable
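The linear backoff formula quoted above can be sketched as a one-liner. The class and method names are illustrative, not from the plan.

```java
// Sketch of the linear backoff formula: delay = attempt * baseInterval,
// where attempt is 1-based and the default base interval is 1 second.
public class LinearBackoff {
    static long delayMillis(int attempt, long baseIntervalMillis) {
        return (long) attempt * baseIntervalMillis;
    }

    public static void main(String[] args) {
        // With the defaults (base = 1s, max 5 retries): 1s, 2s, 3s, 4s, 5s.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("retry " + attempt + ": " + delayMillis(attempt, 1000) + " ms");
        }
    }
}
```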
Contributor


we cannot pause for too long because we need to keep polling the Kafka topic at regular intervals; on failure, control should be returned to the polling thread. The current Kafka transaction should be rolled back and the retry attempted on the next poll (the retries will be kept in memory only; no need to store them between restarts)

Contributor Author


Redesigned the retry strategy to be non-blocking and poll-loop integrated (33efcb5):

  1. On failure: transaction is rolled back (offset not committed), retry count + next-attempt timestamp stored in-memory
  2. Control returns to the poll loop immediately
  3. Partition is paused via consumer.pause(); on next poll after backoff elapses, partition is resumed and same event is redelivered
  4. In-memory retry state only (Map<TopicPartition, RetryState>) — on restart, retry count resets to 0 (acceptable since external calls must be idempotent)
  5. Added RETRY_PENDING to ReactorResult enum

This avoids exceeding max.poll.interval.ms and prevents rebalances during backoff.
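The in-memory retry state described in steps 1–4 can be modeled as below. This is a toy sketch: partitions are keyed by `String` rather than Kafka's `TopicPartition`, the transaction rollback and `consumer.pause()`/`consumer.resume()` calls are not modeled, and all names except `RetryState` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the non-blocking retry state: on failure the retry count and
// next-attempt timestamp are recorded in memory and control returns to the
// poll loop; the partition is resumed once the linear backoff has elapsed.
public class RetryStateSketch {
    record RetryState(int retryCount, long nextAttemptAtMillis) {}

    private final Map<String, RetryState> retries = new HashMap<>();
    private final long baseIntervalMillis;
    private final int maxRetries;

    RetryStateSketch(long baseIntervalMillis, int maxRetries) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.maxRetries = maxRetries;
    }

    // Called on failure after the transaction rollback (not modeled here).
    // Returns false once retries are exhausted, i.e. the onFailure hook should fire.
    boolean recordFailure(String partition, long nowMillis) {
        int attempt = retries.getOrDefault(partition, new RetryState(0, 0)).retryCount() + 1;
        if (attempt > maxRetries) {
            retries.remove(partition);
            return false;
        }
        // Linear backoff: delay = attempt * baseInterval
        retries.put(partition, new RetryState(attempt, nowMillis + attempt * baseIntervalMillis));
        return true;
    }

    // Checked on each poll: resume the paused partition once backoff has elapsed,
    // so the same event is redelivered without blocking max.poll.interval.ms.
    boolean shouldResume(String partition, long nowMillis) {
        RetryState state = retries.get(partition);
        return state != null && nowMillis >= state.nextAttemptAtMillis();
    }

    public static void main(String[] args) {
        RetryStateSketch sketch = new RetryStateSketch(1000, 5);
        sketch.recordFailure("topic-0", 0);
        System.out.println(sketch.shouldResume("topic-0", 500));   // backoff not yet elapsed
        System.out.println(sketch.shouldResume("topic-0", 1000));  // 1 * 1s elapsed, resume
    }
}
```

Because the map lives in memory only, a restart discards it and the retry count starts again at 0, which matches the "external calls must be idempotent" assumption above.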

- The offset is committed regardless (to avoid infinite retry loops)
- Both the offset commit and any failure commands are committed atomically within a single Kafka transaction

The `CommandBus` is the existing framework interface (`org.elasticsoftware.akces.commands.CommandBus`), backed by the `ReactorPartition`. Commands sent via the `CommandBus` during `onFailure` are queued in the Kafka producer and committed as part of the same transaction as the offset commit.
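The atomicity guarantee described above can be illustrated with a toy in-memory transaction. This is not the real implementation: a production `ReflectorPartition` would call `KafkaProducer.beginTransaction()`, `sendOffsetsToTransaction()`, and `commitTransaction()`/`abortTransaction()`; here both commands and the offset are simply buffered and become visible together or not at all.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the guarantee that commands sent via the CommandBus are
// committed in the same transaction as the offset: an abort discards both,
// a commit publishes both.
public class TransactionalCommandBusSketch {
    final List<String> committedCommands = new ArrayList<>();
    long committedOffset = -1;

    private final List<String> pendingCommands = new ArrayList<>();
    private long pendingOffset = -1;
    private boolean inTransaction;

    void beginTransaction() { inTransaction = true; }

    // CommandBus.send: only legal inside the open transaction.
    void send(String command) {
        if (!inTransaction) throw new IllegalStateException("no open transaction");
        pendingCommands.add(command);
    }

    void sendOffsetsToTransaction(long offset) { pendingOffset = offset; }

    void commitTransaction() {
        committedCommands.addAll(pendingCommands);
        committedOffset = pendingOffset;
        reset();
    }

    void abortTransaction() { reset(); }  // commands and offset vanish together

    private void reset() {
        pendingCommands.clear();
        pendingOffset = -1;
        inTransaction = false;
    }

    public static void main(String[] args) {
        TransactionalCommandBusSketch bus = new TransactionalCommandBusSketch();
        bus.beginTransaction();
        bus.send("NotificationFailedCommand");
        bus.sendOffsetsToTransaction(42);
        bus.abortTransaction();  // rollback path: nothing becomes visible
        System.out.println(bus.committedCommands + " offset=" + bus.committedOffset);

        bus.beginTransaction();
        bus.send("NotificationSentCommand");
        bus.sendOffsetsToTransaction(42);
        bus.commitTransaction();  // success path: command and offset commit together
        System.out.println(bus.committedCommands + " offset=" + bus.committedOffset);
    }
}
```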
Contributor


indeed, the same Producer instance should be used by both the AkcesClient and the ReactorPartition. This may be an issue with the current AkcesClient implementation. Please explore

Contributor Author


Explored in depth — added a full "Producer Sharing Analysis" section to the plan (33efcb5).

Finding: Sharing the same Producer instance between AkcesClient and ReactorPartition is not possible because:

  • Kafka transactional producers are not thread-safe for transaction operations
  • ReactorPartition needs offset commit + commands in the same transaction; routing through AkcesClient's queue would put them in a different transaction on a different thread

Recommendation: Share the ProducerFactory (same Kafka config), but each component creates its own Producer instance with a unique transaction ID — same pattern as AggregatePartition. The command routing logic (resolveType, resolveTopic, resolvePartition) is shared via AkcesRegistry.

The AkcesClient concern: its command-sending is tightly coupled to its internal thread/producer. The ReactorPartition must implement CommandBus directly (like AggregatePartition does) rather than delegating to AkcesClient.
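The recommended pattern, one shared factory but a distinct producer per partition, can be sketched as below. This is a toy model: `Producer` stands in for `KafkaProducer`, and the transactional-id naming scheme is an illustrative assumption, not the framework's actual convention.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the recommendation above: the ProducerFactory (shared Kafka
// config) is shared, but each ReflectorPartition creates its own producer
// with a unique transactional id, the same pattern as AggregatePartition.
public class ProducerFactorySketch {
    record Producer(String transactionalId) {}

    static class SharedProducerFactory {
        private final String baseConfigId;
        SharedProducerFactory(String baseConfigId) { this.baseConfigId = baseConfigId; }

        // One producer per partition; never shared across partitions or
        // with AkcesClient, since transactional producers are not
        // thread-safe for transaction operations.
        Producer createFor(int partition) {
            return new Producer(baseConfigId + "-reflector-partition-" + partition);
        }
    }

    public static void main(String[] args) {
        SharedProducerFactory factory = new SharedProducerFactory("akces");
        Set<String> ids = new LinkedHashSet<>();
        for (int p = 0; p < 3; p++) ids.add(factory.createFor(p).transactionalId());
        System.out.println(ids);  // three distinct transactional ids
    }
}
```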

Contributor


agree, we should not reuse AkcesClient but implement our own sending logic as you suggest

Copilot AI changed the title from "Reactor feature plan: Kafka transactions, CommandBus, resolved design decisions" to "Reactor feature plan: Kafka transactions, CommandBus, success/failure hooks, non-blocking retry" on Apr 2, 2026
Copilot AI requested a review from jwijgerd April 2, 2026 13:31
Contributor

@jwijgerd jwijgerd left a comment


change the implementation of the CommandBus interface to a custom implementation using the same Kafka Producer that is used to create the transaction that commits the offset. Each ReactorPartition has a Consumer and a Transactional Producer

…l Producer, custom CommandBus uses same producer for offset commits

Agent-Logs-Url: https://github.com/elasticsoftwarefoundation/akces-framework/sessions/902ebe88-449e-44c5-8b99-ca88f911919a

Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>
Contributor Author

Copilot AI commented Apr 2, 2026

change the implementation of the CommandBus interface to a custom implementation using the same Kafka Producer that is used to create the transaction to ack commit the offset. Each ReactorPartition ha...

Updated the ReactorPartition section to make this explicit (c50264d):

  • Each ReactorPartition owns both a Kafka Consumer and a Transactional Producer
  • The CommandBus is a custom implementation (not delegating to AkcesClient) that uses the same transactional producer for both offset commits (sendOffsetsToTransaction) and command sending (CommandBus.send), ensuring they are always in the same transaction

```java
@FunctionalInterface
public interface ReflectorEventHandlerFunction<E extends DomainEvent> {
    void accept(@Nonnull E event) throws Exception;
}
```
Contributor


on a successful call we would also want to reflect the API result back into the success handler, to be able to construct the correct Command payload. Change the Reflector API to be able to return a typed success payload.

Contributor Author


Updated in c9f6344. Reflector is now generic: Reflector<R> where R is the type returned by the external API call. The @ReflectorEventHandler method returns R (instead of void), and onSuccess now receives (DomainEvent event, R result, CommandBus commandBus), so the typed API response is directly available when constructing success Command payloads. ReflectorEventHandlerFunction is also updated to <E extends DomainEvent, R> with R accept(E event) throws Exception. The usage example shows a NotificationResult record as the return type.
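The updated generic handler function can be sketched as follows. `DomainEvent` and the example record types are illustrative stand-ins; only the `ReflectorEventHandlerFunction<E, R>` shape with `R accept(E event) throws Exception` comes from the description above.

```java
// Sketch of the typed handler function: the event handler now returns a
// typed result R instead of void, so onSuccess can use it directly.
public class HandlerFunctionSketch {
    interface DomainEvent {}

    @FunctionalInterface
    interface ReflectorEventHandlerFunction<E extends DomainEvent, R> {
        R accept(E event) throws Exception;
    }

    record UserRegistered(String email) implements DomainEvent {}
    record NotificationResult(String messageId) {}

    static String demo() {
        // The external API call is simulated by a lambda returning a typed result.
        ReflectorEventHandlerFunction<UserRegistered, NotificationResult> handler =
                event -> new NotificationResult("msg-for-" + event.email());
        try {
            return handler.accept(new UserRegistered("a@b.c")).messageId();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```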

Copilot AI changed the title from "Reactor feature plan: Kafka transactions, CommandBus, success/failure hooks, non-blocking retry" to "Reflector feature plan: Kafka transactions, CommandBus, typed success result, success/failure hooks, non-blocking retry" on Apr 3, 2026
Copilot AI requested a review from jwijgerd April 3, 2026 06:14
@jwijgerd jwijgerd merged commit 7e4dd20 into main Apr 3, 2026
3 checks passed
@jwijgerd jwijgerd deleted the copilot/add-reactor-feature branch April 3, 2026 06:16