0

I'm new to testcontainers and I want to configure confluent kafka with schema registry for testcontainers. And here's my full code of configuration.

@TestConfiguration(proxyBeanMethods = false)
class KafkaContainerConfig {
    companion object {
        val network = Network.newNetwork()
    }

    @Bean
    fun kafkaContainer(): ConfluentKafkaContainer {
        val kafkaContainer = ConfluentKafkaContainer("confluentinc/cp-kafka:7.5.0")
            .apply {
                withNetwork(network)
                withNetworkAliases("kafka")
                withEnv("KAFKA_CFG_NODE_ID", "1")
                withEnv("KAFKA_CFG_PROCESS_ROLES", "controller,broker")
                withEnv("KAFKA_CFG_LISTENERS", "INTERNAL://:9091,CONTROLLER://:9093,EXTERNAL://:9092")
                withEnv("KAFKA_CFG_ADVERTISED_LISTENERS", "INTERNAL://kafka:9091,EXTERNAL://kafka:9092")
                withEnv("KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT")
                withEnv("KAFKA_CFG_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
                withEnv("KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                withEnv("KAFKA_CFG_TRANSACTION_STATE_LOG_ISR", "1")
                withEnv("KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                withEnv("KAFKA_CFG_INTER_BROKER_LISTENER_NAME", "CONTROLLER")
                withEnv("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS", "1@kafka:9093")
                withExposedPorts(9092, 9093, 9091)
            }

        return kafkaContainer
    }

    @Bean
    @DependsOn("kafkaContainer")
    fun schemaRegistryContainer(kafkaContainer: ConfluentKafkaContainer): GenericContainer<Nothing> {
        val schemaRegistryContainer = GenericContainer<Nothing>("confluentinc/cp-schema-registry:7.5.0")
            .apply {
                withExposedPorts(8085)
                withNetwork(network)
                withNetworkAliases("schema-registry")
                withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                withEnv("SCHEMA_REGISTRY_CUB_KAFKA_MIN_BROKERS", "1")
                withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092")
                withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
            }

        return schemaRegistryContainer
    }

    @Bean
    fun setKafkaProperties(kafkaContainer: ConfluentKafkaContainer, schemaRegistryContainer: GenericContainer<Nothing>): DynamicPropertyRegistrar {
        return DynamicPropertyRegistrar { registry ->
            registry.add("spring.kafka.properties.bootstrap-servers") { kafkaContainer.bootstrapServers }
            registry.add("kafka.schema-registry-url") { "localhost:${schemaRegistryContainer.exposedPorts[0]}" }
            registry.add("spring.kafka.properties.schema.registry.url") { "localhost:${schemaRegistryContainer.exposedPorts[0]}" }
        }
    }
}

and when I run test code. I got this error message from docker container schema registry.

2025-06-04 17:33:03 [2025-06-04 08:33:03,440] WARN Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka (org.apache.kafka.clients.ClientUtils)
2025-06-04 17:33:03 [2025-06-04 08:33:03,441] ERROR Error while running kafka-ready. (io.confluent.admin.utils.cli.KafkaReadyCommand)
2025-06-04 17:33:03 org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
2025-06-04 17:33:03     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
2025-06-04 17:33:03     at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
2025-06-04 17:33:03     at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
2025-06-04 17:33:03     at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:136)
2025-06-04 17:33:03     at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2025-06-04 17:33:03 Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
2025-06-04 17:33:03     at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
2025-06-04 17:33:03     at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
2025-06-04 17:33:03     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:508)
2025-06-04 17:33:03     ... 4 more
2025-06-04 17:33:03 Using log4j config /etc/schema-registry/log4j.properties

I think schema registry cannot resolve DNS of kafka:9092 which is network aliases of kafka container. then, what is exact configuration of kafka and schema registry for testcontainers?

1 Answer 1

0

I resolved this problem myself!

I added Listener with name kafka:19092, and set SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS env of schema registry kafka:19092 and it worked!

I also set listeners and advertised.listeners previous code but it did not work. I don't know why...

@TestConfiguration(proxyBeanMethods = false)
class KafkaContainerConfig {
    companion object {
        val kafkaNetwork = Network.newNetwork()
    }

    @Bean
    @ServiceConnection
    fun kafkaContainer(): ConfluentKafkaContainer {
        val kafkaContainer = ConfluentKafkaContainer("confluentinc/cp-kafka:7.5.0")
            .apply {
                withNetwork(kafkaNetwork)
                withListener("kafka:19092")
            }

        return kafkaContainer
    }

    @Bean
    @DependsOn("kafkaContainer")
    fun schemaRegistryContainer(kafkaContainer: ConfluentKafkaContainer): GenericContainer<Nothing> {
        val schemaRegistryContainer = GenericContainer<Nothing>("confluentinc/cp-schema-registry:7.5.0")
            .apply {
                withExposedPorts(8085)
                withNetwork(kafkaNetwork)
                withNetworkAliases("schema-registry")
                withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                withEnv("SCHEMA_REGISTRY_CUB_KAFKA_MIN_BROKERS", "1")
                withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:19092")
                withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
                waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
            }

        return schemaRegistryContainer
    }

    @Bean
    fun setKafkaProperties(kafkaContainer: ConfluentKafkaContainer, schemaRegistryContainer: GenericContainer<Nothing>): DynamicPropertyRegistrar {
        return DynamicPropertyRegistrar { registry ->
            registry.add("spring.kafka.bootstrap-servers") { kafkaContainer.bootstrapServers }
            registry.add("kafka.schema-registry-url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
            registry.add("schema.registry.url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
        }
    }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.