Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
Expand All @@ -35,40 +35,43 @@ public class AbstractKafkaConnectorIT {

private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaConnectorIT.class);

public static final String CONFLUENT_PLATFORM_VERSION = "7.5.0";
public static final String APACHE_KAFKA_VERSION = "4.1.1";
public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1";
private static final Network KAFKA_NETWORK = Network.newNetwork();
public static final String KAFKA_CONNECTOR_NAME = "test";

@Container
final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
.withNetwork(KAFKA_NETWORK)
.withNetworkAliases("broker")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withLogConsumer(new Slf4jLogConsumer(LOG));
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
.withNetworkAliases("kafka")
.withNetwork(KAFKA_NETWORK)
.withListener("kafka:19092")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withLogConsumer(new Slf4jLogConsumer(LOG));

@Container
final GenericContainer connect = new GenericContainer(
DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION))
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
.withEnv("CONNECT_REST_PORT", "8083")
.withEnv("CONNECT_GROUP_ID", "kafka-connect")
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs")
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets")
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status")
.withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
.withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect")
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors")
.withExposedPorts(8083)
.withNetwork(KAFKA_NETWORK)
.withNetworkAliases("kafka-connect")
.dependsOn(kafka)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.waitingFor(forHttp("/connector-plugins"));
DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION))
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:19092")
.withEnv("CONNECT_REST_PORT", "8083")
.withEnv("CONNECT_GROUP_ID", "kafka-connect")
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs")
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets")
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status")
.withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
.withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect")
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors")
.withExposedPorts(8083)
.withNetwork(KAFKA_NETWORK)
.withNetworkAliases("kafka-connect")
.dependsOn(kafka)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.waitingFor(forHttp("/connector-plugins"));

@NotNull
protected String getConnectUrl() {
Expand All @@ -77,7 +80,7 @@ protected String getConnectUrl() {


protected void deployFilestreamSinkConnectorAndWait() throws URISyntaxException, IOException, InterruptedException {
try(HttpClient httpClient = HttpClient.newHttpClient();) {
try (HttpClient httpClient = HttpClient.newHttpClient();) {
HttpRequest createOrUpdateConnectorConfig = HttpRequest.newBuilder()
.uri(new URI(getConnectUrl() + "/connectors/test/config"))
.header("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AbstractKafkaIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaIntegrationTest.class);
private static final Network KAFKA_NETWORK = Network.newNetwork();

public static final String APACHE_KAFKA_VERSION = "3.8.0";
public static final String APACHE_KAFKA_VERSION = "4.1.1";
public static final int DEFAULT_NUM_PARTITIONS = 1;
public static final short DEFAULT_REPLICATION_FACTOR = (short) 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
Expand All @@ -47,12 +47,15 @@ public class AbstractIntegrationTest {

private static final Network KAFKA_NETWORK = Network.newNetwork();

public static final String CONFLUENT_PLATFORM_VERSION = "8.0.3";
public static final String APACHE_KAFKA_VERSION = "4.1.1";
public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1";

@Container
final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
.withNetwork(KAFKA_NETWORK)
.withNetworkAliases("kafka")
.withListener("kafka:19092")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withLogConsumer(new Slf4jLogConsumer(LOG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package io.streamthoughts.jikkou.schema.registry.api;

import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.APACHE_KAFKA_VERSION;
import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.CONFLUENT_PLATFORM_VERSION;

import io.streamthoughts.jikkou.core.data.SchemaType;
Expand All @@ -29,11 +30,11 @@
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;

Expand All @@ -48,8 +49,10 @@ class AsyncSchemaRegistryApiTest {

@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
.withNetwork(KAFKA_NETWORK)
.withNetworkAliases("broker")
.withListener("broker:19092")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withLogConsumer(new Slf4jLogConsumer(LOG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package io.streamthoughts.jikkou.schema.registry.api;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.kafka.KafkaContainer;

public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

Expand All @@ -29,7 +29,7 @@ public SchemaRegistryContainer(String version) {
}

public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":19092");
}

public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
Expand Down
Loading