0

I need to use EmbeddedKafkaBroker for Integration Testing where I am not using Spring-Kafka templates in my spring-boot app. I am using Apache kafka-clients jar as a dependency. My configuration is,

@Service
public class MyPublishService {

@Autowired
MyEventsProducer eventProducer; //which is wrapper on KakfaProducer and AdminClient

@PostConstruct
void  init(){ 
eventProducer.createTopic("myTopic");//calls AdminClient to create topic
}

public void publish(String topic, Object payload) {//which call sends payload to kafka
 eventProducer.send(topic, payload);
}

}//end of service
@Configuration
class MyKafkaConfig {
  String server; //which I expect to be localhost:9092 in integration tests
  @Bean
  public MyEventsProducer myEventsProducer(){
   if(localEnv()) { //tells if setup is local kafka
    server = "localhost:9092";
   }
  return new MyEventsProducer(server);
 }
}

Now my IntegrationTest set up is,

@ExtendWith(SpringExtension.class)
@SpringBootTest(
    classes = Application.class,
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@ActiveProfiles("test")
@AutoConfigureMockMvc
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class MyPublishServiceTest{

    @Autowired
    MyPublishService myPublishService;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void publishMessageTest() {
        System.out.println(embeddedKafkaBroker.getBrokersAsString());       
        myPublishService.publishCollaboration(collaboration);
    }
}

I have 2 questions,

  1. Why do I always get random port for embeddedKafkaBroker.getBrokersAsString() I also tried to set spring.kafka.bootstrap-servers: localhost:9092 in application-test.yaml but it did not help. What should I do to get a fixed host:port?
  2. I am creating topic in @PostConstruct of class MyPublishService. How do I make sure Embedded kafka will be up before code reaches this point and do not cause TimeOutException since Embedded kafka may not be up.

Note: Spring-boot-version: 3.3.2 Spring-kafka-test:3.1.1 org.testcontainers:kafka:1.19.3

1 Answer 1

1

See @EmbeddedKafka JavaDocs:

/**
 * Set explicit ports on which the kafka brokers will listen. Useful when running an
 * embedded broker that you want to access from other processes.
 * A port must be provided for each instance, which means the number of ports must match the value of the count attribute.
 * This property is not valid when using KRaft mode.
 * @return ports for brokers.
 * @since 2.2.4
 */
int[] ports() default { 0 };

That brokerProperties.port i not involved in the Broker instance creation.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.