1

I am writing integration tests and created a Kafka topic in a GitHub workflow using the Docker command `docker exec kafka-broker kafka-topics.sh --create --bootstrap-server localhost:9093 --partitions 1 --replication-factor 1 --topic test-topic`. The topic is created successfully, but when a test case tries to write data to it, it fails with org.apache.kafka.common.KafkaException: Failed to construct kafka producer

Full error message:

    WARN ClientUtils: Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
        at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)
        at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)
        at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)
        at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:49)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$2(KafkaWriter.scala:72)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1(KafkaWriter.scala:73)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1$adapted(KafkaWriter.scala:70)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:414)

My code is below:

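    df = ...  # DataFrame creation code (omitted)
    # Parentheses let the chained calls span several lines.
    (
        df.write.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("kafka.security.protocol", "PLAINTEXT")
        .option("topic", "test-topic")  # the topic name must be a string
        .save()
    )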

I want to run the test cases in CI/CD. Can someone please help me fix this?

2
  • Can you share the full error message please? Commented May 24, 2023 at 12:13
  • I have updated full error message now. Commented May 24, 2023 at 12:20

2 Answers

0

It looks like there is a port number mismatch. Try changing .option("kafka.bootstrap.servers", "kafka:9092") to .option("kafka.bootstrap.servers", "localhost:9093"). From the topic creation command, it looks like your bootstrap server is set up to run on port 9093, not 9092.


4 Comments

I tried matching the port, but it gives the error WARN NetworkClient: [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. followed by ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.kafka.common.errors.TimeoutException: Topic test-topic not present in metadata after 60000 ms.
Have you checked whether the broker process is running when you run the test? I see that you have a container, kafka-broker, where you created the topic. What is the IP address of this container? Your bootstrap server address may not be localhost if your tests are running in a different container than kafka-broker.
I am writing the test case and creating the test container manually. I have the commands below: `docker pull bitnami/kafka:latest`, `docker run -itd -p 9093:9093 --name kafka-broker --env-file $(ENV) -d $(IMAGE_NAME)`, `docker ps -a`, `sleep 15s` (wait until the service is up), `docker exec kafka-test-container kafka-topics.sh --create --bootstrap-server localhost:9093 --partitions 1 --replication-factor 1 --topic test-topic`, `poetry run pytest --cov=app tests/integration/test_kafka.py`
I can see in docker inspect kafka-broker that the IPAddress is 172.17.0.2. I have this in the .env file: "KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://0.0.0.0:9093,CONTROLLER://:9094", "KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093"
0

DNS resolution failed for kafka ... No resolvable bootstrap urls given

Outside of the Docker network, you cannot resolve container names.
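To see this failure before Spark tries to build a producer, the test setup could check whether each candidate bootstrap host resolves from wherever pytest is running. The following is only a minimal sketch using the standard library; the helper names `can_resolve` and `first_resolvable` are hypothetical and not part of Kafka or Spark.

```python
import socket
from typing import List, Optional


def can_resolve(host: str) -> bool:
    """Return True if the local resolver can turn `host` into an address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        # Same situation as "DNS resolution failed for kafka" in the log.
        return False


def first_resolvable(candidates: List[str]) -> Optional[str]:
    """Return the first "host:port" entry whose host resolves, else None."""
    for candidate in candidates:
        host, _, _port = candidate.rpartition(":")
        if can_resolve(host):
            return candidate
    return None


if __name__ == "__main__":
    # Assumed candidates: the internal and external listeners from the question.
    print(first_resolvable(["kafka:9092", "localhost:9093"]))
```

A resolvable name only rules out the DNS error; it does not prove that a broker is listening at that address.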

Option 1: Run the tests outside of a container and use localhost:9093, the external listener.

Option 2: Run the tests in a container on a shared Docker network (using the --network flag for docker run, or Docker Compose) and use kafka:9092, the internal listener, where the hostname can be resolved.
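As a rough illustration of how the two options map onto the KAFKA_CFG_ADVERTISED_LISTENERS value posted in the comments, the sketch below parses that string and picks the address a client would use. The functions `parse_listeners` and `bootstrap_for` are hypothetical helpers written for this example, and the assumption is that the listener names are PLAINTEXT (internal) and EXTERNAL, as in that .env file.

```python
from typing import Dict, Tuple

# Value taken from the .env file quoted in the comments.
ADVERTISED = "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093"


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Map each listener name to its (host, port) pair."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def bootstrap_for(value: str, inside_docker_network: bool) -> str:
    """Pick the internal listener inside the Docker network, else the external one."""
    name = "PLAINTEXT" if inside_docker_network else "EXTERNAL"
    host, port = parse_listeners(value)[name]
    return f"{host}:{port}"


if __name__ == "__main__":
    print(bootstrap_for(ADVERTISED, inside_docker_network=False))  # localhost:9093
    print(bootstrap_for(ADVERTISED, inside_docker_network=True))   # kafka:9092
```

The point of the sketch is that the client must be able to reach the advertised address, not just the address it first connects to, because the broker hands the advertised address back in its metadata response.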

in github workflow

Those always run as containers, so you're limited to option 2. The --network flag isn't available there, however.

Start here https://testcontainers-python.readthedocs.io/en/latest/kafka/README.html

And also see https://docs.github.com/en/actions/using-containerized-services/about-service-containers#communicating-with-service-containers

10 Comments

Thank you so much for helping me. I tried both ways, but each gives an error. Option 1: I used localhost:9093 as the external listener in code ("bootstrap_server": "localhost:9093"), but it does not connect. I created the topic successfully using the docker exec command, but it says it cannot find the topic. Option 2: it gives the error Failed to construct kafka producer. Unfortunately, I cannot use TestContainers.
As mentioned, in a GitHub workflow, localhost will not work (your Spark executors are not running Kafka brokers). For the DNS resolution errors, I don't have suggestions beyond asking GitHub Support about using networking features in a workflow. But you should not be using docker run and docker exec in a GitHub workflow anyway... And why can you not use TestContainers? This is what it was meant for.
I want to run the test cases in all workflows; the CI pipeline will run inside a container. I have used the --network flag like this: `.PHONY: integration-test-container`, `integration-test-container:`, `docker run -itd --network network-name -p 9093:9093 --name container-name --env-file env.env -d bitnami/kafka:latest`, `sleep 15s`, `docker exec container-name kafka-topics.sh --create --bootstrap-server localhost:9093 --partitions 1 --replication-factor 1 --topic test`, `poetry run pytest --cov=package tests/integration/test_kafka.py`
1) Poetry isn't running in a container, so the kafka DNS name isn't resolvable, as your error says. You don't need a network at all since there's only one container. 2) I suggest using AdminClient in Python to create topics, as it'll also test your broker connection. 3) Use kafkacat to debug, as described in this blog: confluent.io/blog/kafka-listeners-explained
Thank you so much for the link. I had already created an AdminClient but was not able to connect :( which is why I moved to docker exec. But I see the error coming from my tests, as the code cannot find the Kafka topic.
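One way to narrow down the "could not connect" symptom in this thread is to replace the fixed `sleep 15s` with a check that the published port actually accepts TCP connections from the machine running pytest. This is a minimal standard-library sketch; `wait_for_port` is a hypothetical helper, and the host, port, and timeout values are assumptions based on the commands quoted above.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Open and immediately close a connection to probe the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # nothing listening yet, retry shortly
    return False


if __name__ == "__main__":
    # Assumed address: the EXTERNAL listener published with -p 9093:9093.
    if not wait_for_port("localhost", 9093, timeout=30.0):
        raise SystemExit("nothing is listening on localhost:9093")
```

An open port only shows that something accepts connections there. The producer can still time out afterwards if the address the broker advertises for that listener is not reachable from the test process.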