Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,21 +24,19 @@

package com.bakdata.kafka.consumer;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.mixin.ConsumerOptions;
import com.bakdata.kafka.mixin.InputOptions;
import java.time.Duration;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Option;


/**
Expand Down Expand Up @@ -72,10 +70,6 @@ public abstract class KafkaConsumerApplication<T extends ConsumerApp> extends
@Mixin
@Delegate
private ConsumerOptions consumerOptions = new ConsumerOptions();
@Option(names = {"--poll-timeout"},
description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.",
defaultValue = "PT0.1S")
private Duration pollTimeout = Duration.ofMillis(100);

/**
* Reset the Kafka Consumer application. Additionally, delete the consumer group.
Expand All @@ -101,8 +95,9 @@ public void reset() {
@Override
public final Optional<ConsumerExecutionOptions> createExecutionOptions() {
final ConsumerExecutionOptions executionOptions = ConsumerExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.onStart(this::onConsumerStart)
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.pollTimeout(this.getPollTimeout())
.build();
return Optional.of(executionOptions);
Expand All @@ -128,13 +123,4 @@ public final ConfiguredConsumerApp<T> createConfiguredApp(final T app,
public ConsumerAppConfiguration createConfiguration(final ConsumerTopicConfig topics) {
return new ConsumerAppConfiguration(topics, this.getGroupId());
}

/**
* Called after starting Kafka Consumer
*
* @param runningConsumer running {@link ConsumerRunnable} instance along with its {@link ConsumerConfig}
*/
protected void onConsumerStart(final RunningConsumer runningConsumer) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerApplication} without any additional configuration options.
* {@link KafkaConsumerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,11 +24,14 @@

package com.bakdata.kafka.consumerproducer;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.consumer.ConsumerExecutionOptions;
import com.bakdata.kafka.mixin.ConsumerOptions;
import com.bakdata.kafka.mixin.ErrorOptions;
import com.bakdata.kafka.mixin.InputOptions;
import com.bakdata.kafka.mixin.OutputOptions;
import com.bakdata.kafka.producer.ProducerExecutionOptions;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -94,10 +97,9 @@ public void clean() {
}

/**
* Clear all state stores and consumer group offsets associated with the Kafka ConsumerProducer application.
* Reset consumer group offsets associated with the Kafka ConsumerProducer application.
*/
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka ConsumerProducer application.")
@Command(description = "Reset consumer group offsets associated with the Kafka ConsumerProducer application.")
public void reset() {
this.prepareClean();
try (final CleanableApp<ConsumerProducerCleanUpRunner> app = this.createCleanableApp()) {
Expand All @@ -108,7 +110,17 @@ public void reset() {

@Override
public final Optional<ConsumerProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
final ConsumerProducerExecutionOptions executionOptions = ConsumerProducerExecutionOptions.builder()
.consumerExecutionOptions(ConsumerExecutionOptions.builder()
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.pollTimeout(this.getPollTimeout())
.build())
.producerExecutionOptions(ProducerExecutionOptions.builder()
.build())
.build();
return Optional.of(executionOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerProducerApplication} without any additional configuration options.
* {@link KafkaConsumerProducerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerProducerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package com.bakdata.kafka.mixin;

import java.time.Duration;
import lombok.Data;
import picocli.CommandLine;
import picocli.CommandLine.Option;

/**
* Shared CLI options to configure Kafka Consumer applications.
Expand All @@ -38,4 +40,7 @@ public class ConsumerOptions {
@CommandLine.Option(names = "--group-id",
description = "Unique identifier for the Kafka Consumer applications, used as 'group.id'.")
private String groupId;
@Option(names = {"--poll-timeout"},
description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.")
private Duration pollTimeout = Duration.ofSeconds(10L);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -73,7 +73,9 @@ public void clean() {

@Override
public final Optional<ProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
final ProducerExecutionOptions executionOptions = ProducerExecutionOptions.builder()
.build();
return Optional.of(executionOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,7 @@

package com.bakdata.kafka.streams;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.mixin.ErrorOptions;
import com.bakdata.kafka.mixin.InputOptions;
Expand Down Expand Up @@ -116,7 +117,9 @@ public void reset() {
@Override
public final Optional<StreamsExecutionOptions> createExecutionOptions() {
final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.uncaughtExceptionHandler(this::createUncaughtExceptionHandler)
.stateListener(this::createStateListener)
.onStart(this::onStreamsStart)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -65,7 +65,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -97,7 +97,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -125,7 +125,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -161,7 +161,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -191,11 +191,11 @@ public ConsumerProducerApp createApp() {
return new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {};
return consumerConfig -> {};
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "my-id";
}

Expand Down Expand Up @@ -223,13 +223,13 @@ void shouldExitWithErrorInBuildRunnable() {
() -> new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {
return consumerConfig -> {
throw new RuntimeException("Error building runnable");
};
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "app";
}

Expand Down Expand Up @@ -264,8 +264,8 @@ void shouldExitWithSuccessCodeOnShutdown() {
() -> new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {
try (final Producer<String, String> producer = builder.producerBuilder()
return consumerConfig -> {
try (final Producer<String, String> producer = builder.getProducerBuilder()
.createProducer()) {
final ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(output, "foo", "bar");
Expand All @@ -275,7 +275,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "app";
}

Expand Down Expand Up @@ -321,7 +321,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -351,7 +351,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -60,19 +60,19 @@ public ConsumerProducerApp createApp() {

@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
final Producer<String, String> producer = builder.producerBuilder().createProducer();
final Consumer<String, String> consumer = builder.consumerBuilder().createConsumer();
builder.consumerBuilder().subscribeToAllTopics(consumer);
final Producer<String, String> producer = builder.getProducerBuilder().createProducer();
final Consumer<String, String> consumer = builder.getConsumerBuilder().createConsumer();
builder.getConsumerBuilder().subscribeToAllTopics(consumer);
final ConsumerRunnable consumerRunnable =
builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records ->
builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records ->
records.forEach(consumerRecord ->
producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(),
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(),
consumerRecord.key(), consumerRecord.value()))));
return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable);
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return CloseFlagApp.this.getClass().getSimpleName() + "-" + configuration.getTopics().getOutputTopic();
}

Expand Down
Loading
Loading