I need to use EmbeddedKafkaBroker for integration testing in a Spring Boot app that does not use the Spring Kafka templates; it depends on the Apache kafka-clients jar directly. My configuration is:
@Service
public class MyPublishService {

    @Autowired
    MyEventsProducer eventProducer; // wrapper around KafkaProducer and AdminClient

    @PostConstruct
    void init() {
        eventProducer.createTopic("myTopic"); // calls AdminClient to create the topic
    }

    // sends the payload to Kafka
    public void publish(String topic, Object payload) {
        eventProducer.send(topic, payload);
    }
} // end of service
@Configuration
class MyKafkaConfig {

    String server; // which I expect to be localhost:9092 in integration tests

    @Bean
    public MyEventsProducer myEventsProducer() {
        if (localEnv()) { // tells whether the setup is a local Kafka
            server = "localhost:9092";
        }
        return new MyEventsProducer(server);
    }
}
My integration test setup is:
@ExtendWith(SpringExtension.class)
@SpringBootTest(
    classes = Application.class,
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@ActiveProfiles("test")
@AutoConfigureMockMvc
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class MyPublishServiceTest {

    @Autowired
    MyPublishService myPublishService;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void publishMessageTest() {
        System.out.println(embeddedKafkaBroker.getBrokersAsString());
        myPublishService.publishCollaboration(collaboration);
    }
}
I have two questions:
- Why do I always get a random port from embeddedKafkaBroker.getBrokersAsString()? I also tried setting spring.kafka.bootstrap-servers: localhost:9092 in application-test.yaml, but it did not help. What should I do to get a fixed host:port?
- I am creating the topic in the @PostConstruct method of MyPublishService. How do I make sure the embedded Kafka broker is up before the code reaches this point, so that it does not cause a TimeoutException because the broker is not yet running?
Note: Spring Boot version: 3.3.2, spring-kafka-test: 3.1.1, org.testcontainers:kafka: 1.19.3
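
One possible direction, sketched under assumptions rather than as a confirmed fix: instead of forcing the embedded broker onto localhost:9092, the producer configuration could read whatever address the broker actually chose. spring-kafka-test's EmbeddedKafkaBroker publishes its address list in the system property spring.embedded.kafka.brokers. The helper below is hypothetical (the class name BootstrapServers and the method resolve are not part of the original code); it returns that property when it is set and falls back to a supplied default otherwise.

```java
import java.util.Objects;

/**
 * Hypothetical helper, not part of the original code: chooses the Kafka
 * bootstrap address that would be passed to MyEventsProducer.
 */
final class BootstrapServers {

    // System property that spring-kafka-test's EmbeddedKafkaBroker sets to
    // its broker address list.
    static final String EMBEDDED_BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    private BootstrapServers() {
    }

    /**
     * Returns the embedded broker address when the system property is set and
     * non-empty; otherwise returns the supplied fallback address.
     */
    static String resolve(String fallback) {
        Objects.requireNonNull(fallback, "fallback");
        String embedded = System.getProperty(EMBEDDED_BROKERS_PROPERTY);
        return (embedded == null || embedded.trim().isEmpty()) ? fallback : embedded;
    }
}
```

In MyKafkaConfig this could be used as new MyEventsProducer(BootstrapServers.resolve("localhost:9092")), so the same bean works against a local broker and against the embedded one. As far as I understand, @EmbeddedKafka also has a ports attribute for fixed ports, but it appears to be ignored when the broker runs in KRaft mode, which may be why the brokerProperties above have no effect; that part is an assumption worth checking against the spring-kafka-test version in use.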