
First time working with Kafka and I've run into a problem.

I have the following implementation of my consumer:


import json

from kafka import KafkaConsumer
import config


class KafkaMessageConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVER,
            security_protocol=config.KAFKA_SECURITY_PROTOCOL,
            sasl_mechanism=config.KAFKA_SASL_MECHANISM,
            sasl_plain_username=config.KAFKA_USERNAME,
            sasl_plain_password=config.KAFKA_PASSWORD,
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
        )

    def receive_messages(self, topic):
        self.consumer.subscribe(topics=[topic])
        print(f"Subscribed to topics: {self.consumer.subscription()}")
        for msg in self.consumer:
            yield msg.value


if __name__ == "__main__":
    consumer = KafkaMessageConsumer()
    for message in consumer.receive_messages(config.KAFKA_TOPIC):
        print("Received message:", message)

The credentials should be configured correctly. The "Subscribed to topics" message is printed without any error, but no messages are yielded, even though I know for sure that there are messages to be consumed on the topic. Am I missing some necessary config here?

1 Answer

I'm no expert in Python, but it looks like you haven't consumed any messages. You have subscribed to the topic, but you would need to call poll() to fetch messages: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll

Also, where have you set the topic name? [topic]
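
For illustration, here is a minimal sketch of what an explicit poll() loop might look like. It assumes the consumer's poll(timeout_ms=...) returns a dict mapping each topic partition to a list of records, as described in the kafka-python docs linked above. The helper name poll_messages and the max_polls parameter are made up for this example and are not part of the library.

```python
def poll_messages(consumer, max_polls=5, timeout_ms=1000):
    """Yield message values by calling consumer.poll() explicitly.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
    poll(timeout_ms=...) returns {TopicPartition: [records]}.
    `max_polls` is a hypothetical limit so the loop ends on its own.
    """
    for _ in range(max_polls):
        # An empty dict means nothing arrived within timeout_ms.
        batches = consumer.poll(timeout_ms=timeout_ms)
        for records in batches.values():
            for record in records:
                # record.value is whatever value_deserializer produced.
                yield record.value
```

With the class from the question, this could be called after subscribe(), for example as `for value in poll_messages(self.consumer): print(value)`. If every poll comes back empty, that at least shows the consumer is not being handed any records, rather than the loop never running.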

3 Comments

I am not sure what you mean by "where have you set the topic name". I am subscribing to the topic that is pulled from config.py, which I know for sure is correct.
Anyhow, adding a poll() call did not help.
Ah, if the topic is provided in a configuration file that's fine. When you say poll() didn't help, do you mean it didn't consume any messages? Can you update your post with the new code? Also, were any errors returned?
