First time working with Kafka and I've run into a problem.
I have a following implementation of my consumer:
from kafka import KafkaConsumer
import config
class KafkaMessageConsumer:
def __init__(self):
self.consumer = KafkaConsumer(
bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVER,
security_protocol=config.KAFKA_SECURITY_PROTOCOL,
sasl_mechanism=config.KAFKA_SASL_MECHANISM,
sasl_plain_username=config.KAFKA_USERNAME,
sasl_plain_password=config.KAFKA_PASSWORD,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
def receive_messages(self, topic):
self.consumer.subscribe(topics=[topic])
print(f"Subscribed to topics: {self.consumer.subscription()}")
for msg in self.consumer:
yield msg.value
if __name__ == "__main__":
consumer = KafkaMessageConsumer()
for message in consumer.receive_messages(config.KAFKA_TOPIC):
print("Received message:", message)
Where the credentials should be implemented correctly. I get the message of subscribing to the topic without error, but there are no yielded messages eventhough I know for sure that there are messages to be consumed on the topic. Am I missing some neccessery config here?