diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java index 57fbe220b..9ddcc7b9a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -114,9 +114,19 @@ private AdminClientX createAdminClient() { @RequiredArgsConstructor private class Task { - private final @NonNull AdminClientX adminClient; + private static T runAdminFuture(final KafkaFuture action, final String errorMessage) { + try { + return action.get(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CleanUpException(errorMessage, e); + } catch (final ExecutionException e) { + throw new CleanUpException(errorMessage, e); + } + } + private void reset() { final ConsumerGroupClient groupClient = this.adminClient.consumerGroups() .group(ConsumerCleanUpRunner.this.groupId); @@ -146,17 +156,6 @@ private void reset() { ConsumerCleanUpRunner.this.cleanHooks.runResetHooks(); } - private static T runAdminFuture(final KafkaFuture action, final String errorMessage) { - try { - return action.get(); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw new CleanUpException(errorMessage, e); - } catch (final ExecutionException e) { - throw new CleanUpException(errorMessage, e); - } - } - private void clean() { this.deleteConsumerGroup(); ConsumerCleanUpRunner.this.cleanHooks.runCleanHooks(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java index b331f42ea..5812d6d62 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java @@ -26,13 +26,11 @@ import com.bakdata.kafka.App; import com.bakdata.kafka.AppConfiguration; -import com.bakdata.kafka.streams.StreamsCleanUpConfiguration; -import com.bakdata.kafka.streams.StreamsCleanUpRunner; /** * Application that defines how to produce or consume messages to and from Kafka and necessary configurations */ -public interface ConsumerProducerApp extends App { +public interface ConsumerProducerApp extends App { /** * Create a runnable that consumes and produces Kafka messages @@ -46,8 +44,8 @@ public interface ConsumerProducerApp extends App - * User may provide a unique group identifier via {@link ConsumerProducerAppConfiguration#getUniqueAppId()}. - * If that is the case, the returned group ID should match the provided one. + * User may provide a unique group identifier via {@link ConsumerProducerAppConfiguration#getUniqueAppId()}. If that + * is the case, the returned group ID should match the provided one. 
* * @param configuration provides runtime configuration * @return unique group identifier @@ -58,13 +56,13 @@ default String getUniqueAppId(final ConsumerProducerAppConfiguration configurati } /** - * @return {@code StreamsCleanUpConfiguration} - * @see StreamsCleanUpRunner + * @return {@code ConsumerProducerCleanUpConfiguration} + * @see ConsumerProducerCleanUpRunner */ @Override - default StreamsCleanUpConfiguration setupCleanUp( + default ConsumerProducerCleanUpConfiguration setupCleanUp( final AppConfiguration configuration) { - return new StreamsCleanUpConfiguration(); + return new ConsumerProducerCleanUpConfiguration(); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpConfiguration.java new file mode 100644 index 000000000..107f9571b --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpConfiguration.java @@ -0,0 +1,87 @@ +/* + * MIT License + * + * Copyright (c) 2026 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.consumerproducer; + +import com.bakdata.kafka.HasCleanHook; +import com.bakdata.kafka.HasTopicHooks; +import java.util.ArrayList; +import java.util.Collection; +import lombok.NonNull; + +/** + * Provides configuration options for {@link ConsumerProducerCleanUpRunner} + */ +public class ConsumerProducerCleanUpConfiguration implements HasTopicHooks, + HasCleanHook, AutoCloseable { + private final @NonNull Collection topicHooks = new ArrayList<>(); + private final @NonNull Collection cleanHooks = new ArrayList<>(); + private final @NonNull Collection resetHooks = new ArrayList<>(); + + /** + * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. + */ + @Override + public ConsumerProducerCleanUpConfiguration registerTopicHook(final TopicHook hook) { + this.topicHooks.add(hook); + return this; + } + + /** + * Register a hook that is executed after {@link ConsumerProducerCleanUpRunner#clean()} has finished + */ + @Override + public ConsumerProducerCleanUpConfiguration registerCleanHook(final Runnable hook) { + this.cleanHooks.add(hook); + return this; + } + + /** + * Register a hook that is executed after {@link ConsumerProducerCleanUpRunner#reset()} has finished + * + * @param hook factory to create hook from + * @return self for chaining + */ + public ConsumerProducerCleanUpConfiguration registerResetHook(final Runnable hook) { + this.resetHooks.add(hook); + return this; + } + + @Override + public void close() { + this.topicHooks.forEach(TopicHook::close); + } + + void runTopicDeletionHooks(final String topic) { + this.topicHooks.forEach(hook -> hook.deleted(topic)); + } + + void runCleanHooks() { + 
this.cleanHooks.forEach(Runnable::run); + } + + void runResetHooks() { + this.resetHooks.forEach(Runnable::run); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java index d8a813c84..5b95436e0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,41 +24,54 @@ package com.bakdata.kafka.consumerproducer; +import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.CleanUpRunner; -import com.bakdata.kafka.consumer.ConsumerCleanUpConfiguration; -import com.bakdata.kafka.consumer.ConsumerCleanUpRunner; +import com.bakdata.kafka.SchemaRegistryAppUtils; +import com.bakdata.kafka.admin.AdminClientX; +import com.bakdata.kafka.admin.ConsumerGroupsClient.ConsumerGroupClient; import com.bakdata.kafka.consumer.ConsumerTopicConfig; -import com.bakdata.kafka.producer.ProducerCleanUpConfiguration; -import com.bakdata.kafka.producer.ProducerCleanUpRunner; -import com.bakdata.kafka.producer.ProducerTopicConfig; -import com.bakdata.kafka.streams.StreamsCleanUpConfiguration; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import 
org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.jooq.lambda.Seq; /** + * TODO: verify whether input topics from the {@link ConsumerTopicConfig} should + * also be cleaned up; currently only the consumer group and output topics are handled. * Delete all output topics specified by a {@link ConsumerProducerTopicConfig} */ @Slf4j @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class ConsumerProducerCleanUpRunner implements CleanUpRunner { - private final @NonNull ConsumerCleanUpRunner consumerCleanUpRunner; - private final @NonNull ProducerCleanUpRunner producerCleanUpRunner; + private final @NonNull ConsumerProducerTopicConfig topics; + private final @NonNull Map kafkaProperties; + private final @NonNull String groupId; + private final @NonNull ConsumerProducerCleanUpConfiguration cleanHooks; /** - * Create a new {@code ConsumerProducerCleanUpRunner} with default {@link ConsumerCleanUpConfiguration} + * Create a new {@code ConsumerProducerCleanUpRunner} with default {@link ConsumerProducerCleanUpConfiguration} * * @param topics topic configuration * @param kafkaProperties configuration to connect to Kafka admin tools + * @param groupId consumer group id to clean up * @return {@code ConsumerProducerCleanUpRunner} */ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProducerTopicConfig topics, @NonNull final Map kafkaProperties, @NonNull final String groupId) { - return create(topics, kafkaProperties, groupId, new StreamsCleanUpConfiguration()); + return create(topics, kafkaProperties, groupId, new ConsumerProducerCleanUpConfiguration()); } /** @@ -66,42 +79,117 @@ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProduc * * @param topics topic configuration * @param kafkaProperties configuration to connect to Kafka admin tools - * @param groupId 
consumer group id to clean up * @param configuration configuration for hooks that are called when running {@link #clean()} - * @return {@code ConsumerCleanUpRunner} + * @return {@code ConsumerProducerCleanUpRunner} */ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProducerTopicConfig topics, @NonNull final Map kafkaProperties, @NonNull final String groupId, - @NonNull final StreamsCleanUpConfiguration configuration) { - final ConsumerTopicConfig consumerTopicConfig = topics.toConsumerTopicConfig(); - final ProducerTopicConfig producerTopicConfig = topics.toProducerTopicConfig(); - final ConsumerCleanUpConfiguration consumerConfig = configuration.toConsumerCleanUpConfiguration(); - final ProducerCleanUpConfiguration producerConfig = configuration.toProducerCleanUpConfiguration(); - final ConsumerCleanUpRunner consumerCleanUpRunner = - ConsumerCleanUpRunner.create(consumerTopicConfig, kafkaProperties, groupId, consumerConfig); - final ProducerCleanUpRunner producerCleanUpRunner = - ProducerCleanUpRunner.create(producerTopicConfig, kafkaProperties, producerConfig); - return new ConsumerProducerCleanUpRunner(consumerCleanUpRunner, producerCleanUpRunner); + @NonNull final ConsumerProducerCleanUpConfiguration configuration) { + SchemaRegistryAppUtils.createTopicHook(kafkaProperties) + .ifPresent(configuration::registerTopicHook); + return new ConsumerProducerCleanUpRunner(topics, kafkaProperties, groupId, configuration); } @Override public void close() { - this.consumerCleanUpRunner.close(); - this.producerCleanUpRunner.close(); + this.cleanHooks.close(); } @Override public void clean() { - this.consumerCleanUpRunner.clean(); - this.producerCleanUpRunner.clean(); + try (final AdminClientX adminClient = this.createAdminClient()) { + final Task task = new Task(adminClient); + task.clean(); + } } /** * Reset your ConsumerProducer app by resetting consumer group offsets */ public void reset() { - this.consumerCleanUpRunner.reset(); + try (final 
AdminClientX adminClient = this.createAdminClient()) { + final Task task = new Task(adminClient); + task.reset(); + } + } + + private AdminClientX createAdminClient() { + return AdminClientX.create(this.kafkaProperties); } + @RequiredArgsConstructor + private class Task { + private final @NonNull AdminClientX adminClient; + + private static T runAdminFuture(final KafkaFuture action, final String errorMessage) { + try { + return action.get(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CleanUpException(errorMessage, e); + } catch (final ExecutionException e) { + throw new CleanUpException(errorMessage, e); + } + } + + private void clean() { + this.deleteConsumerGroup(); + this.deleteTopics(); + ConsumerProducerCleanUpRunner.this.cleanHooks.runCleanHooks(); + } + + private void reset() { + final ConsumerGroupClient groupClient = this.adminClient.consumerGroups() + .group(ConsumerProducerCleanUpRunner.this.groupId); + final Optional groupDescription = groupClient.describe(); + if (groupDescription.isEmpty()) { + return; + } + if (groupDescription.get().groupState() != GroupState.EMPTY) { + throw new CleanUpException("Error resetting application, consumer group is not empty"); + } + + final Map groupOffsets = groupClient.listOffsets(); + + final Map request = groupOffsets.keySet().stream() + .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest())); + final Map earliestOffsets = + runAdminFuture(this.adminClient.admin().listOffsets(request).all(), + "Error resetting application, beginning consumer group offset could not be found"); + + final Map resetOffsets = earliestOffsets.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> new OffsetAndMetadata(e.getValue().offset()))); + runAdminFuture( + this.adminClient.admin() + .alterConsumerGroupOffsets(ConsumerProducerCleanUpRunner.this.groupId, resetOffsets) + .all(), "Error resetting application, could not alter consumer group offsets"); + + 
ConsumerProducerCleanUpRunner.this.cleanHooks.runResetHooks(); + } + + private void deleteConsumerGroup() { + this.adminClient.consumerGroups().group(ConsumerProducerCleanUpRunner.this.groupId).deleteIfExists(); + } + + private void deleteTopics() { + final Iterable outputTopics = this.getAllOutputTopics(); + outputTopics.forEach(this::deleteTopic); + } + + private void deleteTopic(final String topic) { + this.adminClient.topics() + .topic(topic).deleteIfExists(); + ConsumerProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); + } + + private Iterable getAllOutputTopics() { + final String errorTopic = ConsumerProducerCleanUpRunner.this.topics.getErrorTopic(); + return Seq.of(ConsumerProducerCleanUpRunner.this.topics.getOutputTopic()) + .concat(ConsumerProducerCleanUpRunner.this.topics.getLabeledOutputTopics().values()) + .append(Optional.ofNullable(errorTopic)); + } + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java index af3f40d1a..ca849b8b1 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -44,7 +44,6 @@ @Value @EqualsAndHashCode public class ConsumerProducerTopicConfig { - @Builder.Default @NonNull List inputTopics = emptyList(); @@ -54,6 +53,7 @@ public class ConsumerProducerTopicConfig { @Builder.Default @NonNull Map> labeledInputTopics = emptyMap(); + Pattern inputPattern; /** * Input patterns that are identified by a label @@ -61,6 
+61,7 @@ public class ConsumerProducerTopicConfig { @Builder.Default @NonNull Map labeledInputPatterns = emptyMap(); + String outputTopic; /** * Output topics that are identified by a label @@ -68,6 +69,7 @@ public class ConsumerProducerTopicConfig { @Builder.Default @NonNull Map labeledOutputTopics = emptyMap(); + String errorTopic; public ConsumerTopicConfig toConsumerTopicConfig() { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java index 8c4c04a4a..0ee319a1a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java @@ -27,10 +27,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.consumer.ConsumerBuilder; -import com.bakdata.kafka.consumer.ConsumerTopicConfig; import com.bakdata.kafka.producer.ProducerBuilder; -import com.bakdata.kafka.producer.ProducerTopicConfig; -import com.bakdata.kafka.streams.StreamsCleanUpConfiguration; import java.util.Map; import lombok.AccessLevel; import lombok.Builder; @@ -63,9 +60,9 @@ public class ExecutableConsumerProducerApp @Override public ConsumerProducerCleanUpRunner createCleanUpRunner() { final AppConfiguration configuration = this.createConfiguration(); - final StreamsCleanUpConfiguration streamsCleanUpConfiguration = this.app.setupCleanUp(configuration); + final ConsumerProducerCleanUpConfiguration cleanUpConfiguration = this.app.setupCleanUp(configuration); return ConsumerProducerCleanUpRunner.create(this.topics, this.consumerProperties, this.groupId, - streamsCleanUpConfiguration); + cleanUpConfiguration); } /** diff --git 
a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpConfiguration.java index 90b511f8e..1bba068d3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpConfiguration.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpConfiguration.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,10 +24,9 @@ package com.bakdata.kafka.streams; + import com.bakdata.kafka.HasCleanHook; import com.bakdata.kafka.HasTopicHooks; -import com.bakdata.kafka.consumer.ConsumerCleanUpConfiguration; -import com.bakdata.kafka.producer.ProducerCleanUpConfiguration; import java.util.ArrayList; import java.util.Collection; import lombok.NonNull; @@ -62,6 +61,7 @@ public StreamsCleanUpConfiguration registerCleanHook(final Runnable hook) { /** * Register a hook that is executed after {@link StreamsCleanUpRunner#reset()} has finished + * * @param hook factory to create hook from * @return self for chaining */ @@ -86,30 +86,4 @@ void runResetHooks() { void runTopicDeletionHooks(final String topic) { this.topicHooks.forEach(hook -> hook.deleted(topic)); } - - /** - * Converts this configuration to a {@link ConsumerCleanUpConfiguration}, transferring - * registered clean and reset hooks. 
- * - * @return {@link ConsumerCleanUpConfiguration} instance - */ - public ConsumerCleanUpConfiguration toConsumerCleanUpConfiguration() { - final ConsumerCleanUpConfiguration configuration = new ConsumerCleanUpConfiguration(); - this.cleanHooks.forEach(configuration::registerCleanHook); - this.resetHooks.forEach(configuration::registerResetHook); - return configuration; - } - - /** - * Converts this configuration to a {@link ProducerCleanUpConfiguration}, transferring - * registered clean and topic hooks. - * - * @return {@link ProducerCleanUpConfiguration} instance - */ - public ProducerCleanUpConfiguration toProducerCleanUpConfiguration() { - final ProducerCleanUpConfiguration configuration = new ProducerCleanUpConfiguration(); - this.cleanHooks.forEach(configuration::registerCleanHook); - this.topicHooks.forEach(configuration::registerTopicHook); - return configuration; - } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java index abd99a731..29f080459 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -49,7 +49,6 @@ import com.bakdata.kafka.consumerproducer.apps.MirrorValueWithAvroConsumerProducer; import com.bakdata.kafka.consumerproducer.apps.StringConsumerProducer; import com.bakdata.kafka.consumerproducer.apps.StringPatternConsumerProducer; -import com.bakdata.kafka.streams.StreamsCleanUpConfiguration; import 
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -75,15 +74,15 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class ConsumerProducerCleanUpRunnerTest extends KafkaTest { - @InjectSoftAssertions - private SoftAssertions softly; - @Mock - private TopicHook topicHook; - private static final ConsumerProducerTopicConfig TOPIC_CONFIG = ConsumerProducerTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") + .errorTopic("error") .build(); + @InjectSoftAssertions + private SoftAssertions softly; + @Mock + private TopicHook topicHook; private static void reset(final ExecutableApp app) { try (final ConsumerProducerCleanUpRunner cleanUpRunner = app.createCleanUpRunner()) { @@ -121,10 +120,18 @@ static ConfiguredConsumerProducerApp createStringPatternCon new ConsumerProducerAppConfiguration(topics)); } + static ExecutableConsumerProducerApp createExecutableApp( + final ConfiguredConsumerProducerApp app, + final RuntimeConfiguration runtimeConfiguration) { + return app.withRuntimeConfiguration(runtimeConfiguration); + } + + // TODO: More tests with cleanHook, resetHook + // Verify called / not called on clean and reset private ConfiguredConsumerProducerApp createCleanUpHookApplication() { return new ConfiguredConsumerProducerApp<>(new StringConsumerProducer() { @Override - public StreamsCleanUpConfiguration setupCleanUp( + public ConsumerProducerCleanUpConfiguration setupCleanUp( final AppConfiguration configuration) { return super.setupCleanUp(configuration) .registerTopicHook(ConsumerProducerCleanUpRunnerTest.this.topicHook); @@ -132,12 +139,6 @@ public StreamsCleanUpConfiguration setupCleanUp( }, new ConsumerProducerAppConfiguration(TOPIC_CONFIG)); } - static ExecutableConsumerProducerApp createExecutableApp( - final 
ConfiguredConsumerProducerApp app, - final RuntimeConfiguration runtimeConfiguration) { - return app.withRuntimeConfiguration(runtimeConfiguration); - } - @Test void shouldDeleteTopic() { try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); @@ -177,6 +178,41 @@ void shouldDeleteTopic() { } } + @Test + void shouldDeleteErrorTopic() { + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + this.createConfig())) { + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getInputTopics().get(0)); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.createTopic(app.getTopics().getErrorTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(app.getTopics().getInputTopics().get(0), List.of( + new SimpleProducerRecord<>("blub", "blub"), + new SimpleProducerRecord<>("bla", "bla"), + new SimpleProducerRecord<>("blub", "blub") + )); + + run(executableApp); + this.assertContent(app.getTopics().getErrorTopic(), List.of(), + "Error topic exists and is empty"); + + awaitClosed(executableApp); + clean(executableApp); + + try (final AdminClientX admin = testClient.admin()) { + final TopicsClient topicClient = admin.topics(); + this.softly.assertThat(topicClient.topic(app.getTopics().getErrorTopic()).exists()) + .as("Error topic is deleted") + .isFalse(); + } + } + } + + @Test void shouldDeleteConsumerGroup() { try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); @@ -374,6 +410,7 @@ void shouldCallCleanUpHookForAllTopics() { this.createConfig())) { clean(executableApp); verify(this.topicHook).deleted(app.getTopics().getOutputTopic()); + verify(this.topicHook).deleted(app.getTopics().getErrorTopic()); verify(this.topicHook).close(); 
verifyNoMoreInteractions(this.topicHook); }