diff --git a/providers/jikkou-provider-kafka-connect/src/integration-test/java/io/streamthoughts/jikkou/kafka/connect/AbstractKafkaConnectorIT.java b/providers/jikkou-provider-kafka-connect/src/integration-test/java/io/streamthoughts/jikkou/kafka/connect/AbstractKafkaConnectorIT.java index 51eda54a3..ff7d23931 100644 --- a/providers/jikkou-provider-kafka-connect/src/integration-test/java/io/streamthoughts/jikkou/kafka/connect/AbstractKafkaConnectorIT.java +++ b/providers/jikkou-provider-kafka-connect/src/integration-test/java/io/streamthoughts/jikkou/kafka/connect/AbstractKafkaConnectorIT.java @@ -21,11 +21,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; @Testcontainers @@ -35,40 +35,43 @@ public class AbstractKafkaConnectorIT { private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaConnectorIT.class); - public static final String CONFLUENT_PLATFORM_VERSION = "7.5.0"; + public static final String APACHE_KAFKA_VERSION = "4.1.1"; + public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1"; private static final Network KAFKA_NETWORK = Network.newNetwork(); public static final String KAFKA_CONNECTOR_NAME = "test"; + @Container final KafkaContainer kafka = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft() - .withNetwork(KAFKA_NETWORK) - .withNetworkAliases("broker") - .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") - .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION)) + .withNetworkAliases("kafka") + .withNetwork(KAFKA_NETWORK) + .withListener("kafka:19092") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") + .withLogConsumer(new Slf4jLogConsumer(LOG)); @Container final GenericContainer connect = new GenericContainer( - DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION)) - .withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092") - .withEnv("CONNECT_REST_PORT", "8083") - .withEnv("CONNECT_GROUP_ID", "kafka-connect") - .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs") - .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets") - .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status") - .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") - .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") - .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect") - .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") - .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") - .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") - .withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors") - .withExposedPorts(8083) - .withNetwork(KAFKA_NETWORK) - .withNetworkAliases("kafka-connect") - .dependsOn(kafka) - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .waitingFor(forHttp("/connector-plugins")); + DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION)) + .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:19092") + .withEnv("CONNECT_REST_PORT", "8083") + .withEnv("CONNECT_GROUP_ID", "kafka-connect") + .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs") + .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets") + .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status") + .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") + .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") + .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect") + .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors") + .withExposedPorts(8083) + .withNetwork(KAFKA_NETWORK) + .withNetworkAliases("kafka-connect") + .dependsOn(kafka) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor(forHttp("/connector-plugins")); @NotNull protected String getConnectUrl() { @@ -77,7 +80,7 @@ protected String getConnectUrl() { protected void deployFilestreamSinkConnectorAndWait() throws URISyntaxException, IOException, InterruptedException { - try(HttpClient httpClient = HttpClient.newHttpClient();) { + try (HttpClient httpClient = HttpClient.newHttpClient();) { HttpRequest createOrUpdateConnectorConfig = HttpRequest.newBuilder() .uri(new URI(getConnectUrl() + "/connectors/test/config")) .header("Content-Type", "application/json") diff --git a/providers/jikkou-provider-kafka/src/integration-test/java/io/streamthoughts/jikkou/kafka/AbstractKafkaIntegrationTest.java b/providers/jikkou-provider-kafka/src/integration-test/java/io/streamthoughts/jikkou/kafka/AbstractKafkaIntegrationTest.java index d7094171d..1dd951469 100644 --- a/providers/jikkou-provider-kafka/src/integration-test/java/io/streamthoughts/jikkou/kafka/AbstractKafkaIntegrationTest.java +++ b/providers/jikkou-provider-kafka/src/integration-test/java/io/streamthoughts/jikkou/kafka/AbstractKafkaIntegrationTest.java @@ -35,7 +35,7 @@ public class AbstractKafkaIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaIntegrationTest.class); private static final Network KAFKA_NETWORK = Network.newNetwork(); - public static final String APACHE_KAFKA_VERSION = "3.8.0"; + public static final String APACHE_KAFKA_VERSION = "4.1.1"; public static final int DEFAULT_NUM_PARTITIONS = 1; public static final short DEFAULT_REPLICATION_FACTOR = (short) 1; diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/AbstractIntegrationTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/AbstractIntegrationTest.java index 42d87a751..cabeeab66 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/AbstractIntegrationTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/AbstractIntegrationTest.java @@ -16,11 +16,11 @@ import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; @Testcontainers @@ -47,12 +47,15 @@ public class AbstractIntegrationTest { private static final Network KAFKA_NETWORK = Network.newNetwork(); - public static final String CONFLUENT_PLATFORM_VERSION = "8.0.3"; + public static final String APACHE_KAFKA_VERSION = "4.1.1"; + public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1"; @Container final KafkaContainer kafkaContainer = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft() + DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION)) .withNetwork(KAFKA_NETWORK) + .withNetworkAliases("kafka") + .withListener("kafka:19092") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") .withLogConsumer(new Slf4jLogConsumer(LOG)); diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java index cfd5b6d9a..a7a827dce 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java @@ -6,6 +6,7 @@ */ package io.streamthoughts.jikkou.schema.registry.api; +import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.APACHE_KAFKA_VERSION; import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.CONFLUENT_PLATFORM_VERSION; import io.streamthoughts.jikkou.core.data.SchemaType; @@ -29,11 +30,11 @@ import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.KafkaContainer; import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Mono; @@ -48,8 +49,10 @@ class AsyncSchemaRegistryApiTest { @Container private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft() + DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION)) .withNetwork(KAFKA_NETWORK) + .withNetworkAliases("broker") + .withListener("broker:19092") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") .withLogConsumer(new Slf4jLogConsumer(LOG)); diff --git a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryContainer.java b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryContainer.java index e11a1709c..bbd6963e3 100644 --- a/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryContainer.java +++ b/providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryContainer.java @@ -7,9 +7,9 @@ package io.streamthoughts.jikkou.schema.registry.api; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.KafkaContainer; public class SchemaRegistryContainer extends GenericContainer { @@ -29,7 +29,7 @@ public SchemaRegistryContainer(String version) { } public SchemaRegistryContainer withKafka(KafkaContainer kafka) { - return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092"); + return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":19092"); } public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {