239 questions
1
vote
0
answers
67
views
Assertion failed: (self->u.Consumer.rkqu) for confluent python kafka
Try the confluent kafka base example
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': None,
'auto.offset.reset': 'earliest'
})
c....
4
votes
1
answer
916
views
How can I prevent the "Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to" log in a confluent-kafka-python consumer?
When the consumer (which is a very simple confluent-kafka-python consumer) starts, we see this log message after the assignment
%6|1739802885.947|GETSUBSCRIPTIONS|<consumer id>#consumer-1| [...
0
votes
1
answer
26
views
Search for a message using key or value
I would like to find a published message using the message KEY or VALUE. How can I search for the message and print it?
I have tried the code below, but nothing is working:
def filter_messages(self, topic, ...
0
votes
0
answers
750
views
Issues with connecting to Kafka topic as a Consumer
I'm new to Kafka and am trying to get a simple consumer working for my application. I have tried getting python-kafka and confluent-kafka to work.
My consumer config properties are the following:
conf ...
0
votes
1
answer
45
views
Kafka partitioning key difference between Python and NestJS
In my infrastructure I have multiple microservices in multiple languages communicating over Kafka. My Kafka topics have multiple partitions, and to keep consistency I use a key when I push messages so ...
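For context on why the same key can map to different partitions across clients: librdkafka (which confluent-kafka-python wraps) documents its default partitioner as `consistent_random`, a CRC32 hash of the key, while the Java producer hashes keys with murmur2. The sketch below is a standalone illustration of the two schemes, assuming the NestJS side uses a Java-compatible murmur2 partitioner and that librdkafka's CRC32 matches `zlib.crc32`. The function names are hypothetical and none of this calls a Kafka client.

```python
import zlib


def murmur2(data: bytes) -> int:
    """Kafka-style 32-bit murmur2 hash, returned as an unsigned int."""
    seed = 0x9747B28C
    m = 0x5BD1E995
    length = len(data)
    h = (seed ^ length) & 0xFFFFFFFF

    # Mix the input four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & 0xFFFFFFFF
        k ^= k >> 24
        k = (k * m) & 0xFFFFFFFF
        h = (h * m) & 0xFFFFFFFF
        h ^= k

    # Fold in the remaining 1 to 3 bytes, if any.
    tail = data[length - length % 4:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & 0xFFFFFFFF

    h ^= h >> 13
    h = (h * m) & 0xFFFFFFFF
    h ^= h >> 15
    return h


def murmur2_partition(key: bytes, num_partitions: int) -> int:
    """Java-producer-style choice: positive murmur2 hash modulo partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def crc32_partition(key: bytes, num_partitions: int) -> int:
    """CRC32-based choice (assumed equivalent to librdkafka's `consistent` hashing)."""
    return zlib.crc32(key) % num_partitions


if __name__ == "__main__":
    # Hypothetical keys and partition count, for illustration only.
    for key in (b"order-1", b"order-2", b"order-3"):
        print(key, murmur2_partition(key, 6), crc32_partition(key, 6))
```

If the two functions disagree for your keys, one option is to set `'partitioner': 'murmur2_random'` in the Python producer config, which librdkafka describes as functionally equivalent to the Java producer's default partitioner. Whether that applies here depends on details the truncated question does not show.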
0
votes
1
answer
1k
views
Confluent-Kafka: no broker available for coordinator query: intervaled in state query-coord
I am using python-confluent-kafka to create a producer.
The Kafka cluster is on MSK 3.7.x KRaft with IAM enabled and TLS enabled, both within the cluster and between clients and brokers.
Any ideas on ...
0
votes
0
answers
35
views
Container script does not automatically run at the container start-up
I just started learning about Docker and I am trying to create a project where I have producer, consumer and server scripts in separate containers. Consumers update the table in sqlite3 database file ...
0
votes
2
answers
2k
views
Kafka Consumer first poll(0) returns no data
I'm using confluent-kafka-client. I have one producer producing into a topic with one partition and one consumer within one group ID. First, I create a producer (with default configs) for the topic (...
1
vote
1
answer
656
views
Cannot connect locally to containerized Kafka, Error: Missing close-
I believe I have a configuration problem that I don't seem to be able to get around. Currently I have a containerized Kafka broker and I want to connect to it locally. However, when I try to connect ...
0
votes
0
answers
829
views
No module named 'confluent_kafka.cimpl' error while updating aws lambda runtime from python 3.9 to 3.11
I want to update my Python runtime for AWS Lambda from 3.9 to 3.11.
Until now, I have been using the python-3.9 runtime and have not specified any requirements or additional libraries for my Lambda function, ...
0
votes
0
answers
90
views
OverflowError: Python int too large to convert to C int when using confluent_kafka Avro deserializer
I'm encountering an OverflowError when attempting to deserialize messages using the confluent_kafka Avro deserializer in Python. Here's a simplified version of my code:
class ConsumerKafka:
...
1
vote
1
answer
694
views
Kafka consumer status says "no active members" although I see my consumers ingesting logs
I am using Kafka static partitioning to assign a consumer to a group. Example:
I have a test-topic with 3 partitions and I have manually made consumer1 connect to partition 0, consumer2 connect to ...
2
votes
1
answer
501
views
How does one alter a topic configuration using incremental_alter_configs?
I am trying to create a Python script to automate topic creation and configuration.
I have managed to make the topic creation part work. I get a runtime error when trying to alter the configuration.
...
0
votes
0
answers
484
views
Confluent Kafka connection from Databricks
I am trying to connect to confluent-kafka from Databricks and produce/consume messages. Right now I am just experimenting with the python client and not the spark connectors.
Now, if I try to list out ...
2
votes
2
answers
230
views
Deserialization/Parsing error KafkaProtobuf Python
Serialization code (Go lang)
1. Producer
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
producerConfig := getKafkaProducerConfig(config.EnvConfig)
producer, err := ...
0
votes
1
answer
96
views
Kafka Error vs Error Field within each TopicPartition in the confluent kafka python on_commit callback()
I'm writing a Kafka consumer using confluent-kafka in Python. I'm using the async commit API. The documentation for the on_commit callback says the following. I don't understand the difference between ...
0
votes
0
answers
2k
views
Error "KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}" coming in confluent-kafka
I am unable to connect to the Kafka broker with confluent-kafka using OAuth. I also see an SSL handshake error in the log, but the error I get in the callback is "KafkaError{code=_MSG_TIMED_OUT,val=-192,...
0
votes
2
answers
782
views
ConfluentKafka in Python: Using consumer to consume a set of records
I have started to learn Confluent Kafka (Python). There is 1 producer, 1 topic, 1 partition and 1 consumer (a simple setup). I want to fetch records collectively, as a set. I read that using ...
2
votes
2
answers
3k
views
Confluent Kafka python producer poll() method
I am new to Kafka and, while reading the docs, stumbled upon the flush() and poll() methods. As I understand it, flush() flushes the producer buffer to send all events to the broker.
But what ...
1
vote
1
answer
482
views
Not receiving messages at the AWS MSK consumer end
I am able to send data using the code below in Python, but I am not receiving the messages at the consumer end. I have enabled all the authentication options in the AWS MSK cluster. I am able to send and receive with ...
0
votes
0
answers
260
views
Facing connection issues within confluent-kafka in docker compose environment
I’m relatively new to Kafka and currently trying to set up a Kafka consumer within a Docker Compose environment. Unfortunately, I’ve hit a roadblock with connection issues and I’d greatly appreciate ...
1
vote
1
answer
198
views
I'm not receiving the keys in the Kafka content - Python/confluent
I'm trying to receive the message from the topic using a Python script.
from confluent_kafka import Consumer, KafkaError
import uuid
# Kafka broker details
broker = "something"
topic = "...
0
votes
1
answer
484
views
Data stream with confluent-kafka python giving error - TypeError: You must pass either str or Schema
I am trying to build stream-table join operations in Kafka with Python, and below is the code that performs the join operation on data sent from the producer
stream_table_join.py
from confluent_kafka import ...
2
votes
0
answers
162
views
Reset accumulated lag on all kafka partitions
Good afternoon. Given: a Kafka topic with more than 20 partitions, heavy message traffic, and one consumer group.
Each time we connect to the topic, we need to reset all the lag available ...
0
votes
1
answer
630
views
Cannot get offset by timestamp in redpanda cluster
I deployed a Redpanda cluster and would like to query offsets by timestamp.
I first tried the confluent-kafka Python library:
import confluent_kafka as ck
import uuid
c = ck.Consumer({
'bootstrap....