456 questions
0
votes
2
answers
81
views
python kafka doesn't see headers/metadata while kafka ui and kafka go lang does
Kafka-python doesn't see kafka-headers, while kafka golang does.
It doesn't see any headers in all the messages.
Example of code:
for message in consumer:
# message value and key are raw bytes -- ...
1
vote
0
answers
24
views
Kafka Consumer Rebalancing Despite Different Group IDs
I'm working on a Kafka-based pipeline using Python (kafka-python) where I have two separate consumers:
consumer.py tracks user health factors from the topic aave-raw
→ uses group_id="risk-dash-...
0
votes
2
answers
167
views
Kafka consumer is missing messages during deployment
My consumers are inherited from the BasicKafkaConsumerV2. During deployments when the pods are rotating I am missing few messages which is visible from the offsets printed after the manual commit().
...
0
votes
0
answers
76
views
Kafka external client receives other name than in ADVERTISED_LISTENERS
On a local network, I have a broker at cgw.local and a client at rpi.local, connected to the same switch. No matter what I put into kafka.advertised.listeners: PLAINTEXT://cgw.local:9092 or PLAINTEXT:/...
0
votes
1
answer
45
views
Kakfa failed to produce topic and consume value on certain consumer group when one of the broker down
I have two kafka broker (S1/S2) with the config below, and by default all my topics produce in one partition only.
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
...
0
votes
1
answer
230
views
Not able to consume messages using Kafka-python consumer SSL and SASL
consumer_SASL = KafkaConsumer(topics,
bootstrap_servers=kafkaBrokers,
group_id=group,
security_protocol='...
0
votes
0
answers
979
views
Encountering "[Errno 16] Device or Resource Busy" Error with Kafka Consumer in Snowflake Stored Procedure
Goal
The goal is to create a Snowflake stored procedure that consumes messages from a Kafka topic once per day, processes these messages, and then loads the processed data into Snowflake tables for ...
0
votes
1
answer
136
views
Kafka consumer not receiveing messages after first consumption
I have a problem while trying to make a data flux between a class called AD_Drone and another called AD_Engine. Engine has to produce for every drone connected their final position, after that, Drone ...
0
votes
1
answer
550
views
ValidationError while validating data against schema FastAvro
After multiple attempt I am not able to decode the error thrown by fastavro library when validating data against the schema.Below is what I am getting
File "fastavro\\_validation.pyx", line ...
-1
votes
1
answer
125
views
How to run python script running when django server starts
How to run python script running when django server starts. For example, I have a python kafka script , this script should be running continously when django server starts. when i give two commands in ...
0
votes
1
answer
223
views
Strange difference between reading from kafka topic using subscribe() and assign()
My task is to count messages in Kafka topics (some with one partition, some with many partitions). I tried two techniques: one with subscribe() and other with assign().
Full code:
#!/usr/bin/env ...
0
votes
0
answers
218
views
Why Python KafkaConsumer do not read all messages from topic?
For test purposes I have to read all messages in some kafka topics. Before test I remove all messages using /kafka-delete-records.sh then I run tests that fill kafka topics. After test I want to ...
1
vote
1
answer
146
views
Need to balancing kafka consumer tasks
I need to have a kafka producer and 4 consumers in python that balancing queue.
My Topic bash code:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --...
0
votes
1
answer
459
views
Batch process Kafka messages
I want to write my Kafka messages to a jsonl file which should each contain a number of lines (let's say 2). My producer currently writes 3 messages at a time so I should get two jsonl files: one with ...
1
vote
1
answer
1k
views
ImportError: cannot import name 'IncompatibleBrokerVersion' from 'kafka.errors'
I am trying to import:
from kafka import KafkaConsumer
from kafka import KafkaProducer
but I am getting this error
ImportError: cannot import name 'IncompatibleBrokerVersion' from
'kafka.errors'.
I ...
0
votes
1
answer
427
views
Why do I get a ValueError in my Kafka Consumer if I seek to another position? [duplicate]
I'm using python 3.9.16 and kafka-python version 2.0.2. I'm running on my Macbook Pro IOS 11.6.5.
I'm still getting my feet wet with Kafka so it's entirely possible I'm doing things the wrong way.
...
0
votes
1
answer
150
views
How does Kafka store messages offsets on a local computer?
How does Kafka store messages on a local server or laptop?
I'm new to Kafka and just playing around with the tech for now but I'm curious to the answer because I started by looking at the Kafka ...
0
votes
1
answer
3k
views
ModuleNotFoundError: No module named 'kafka'
I have downloaded Kafka and installed kafka-python library using "pip install kafka-python" and "conda install -c conda-forge kafka-python".
I am able to run "from kafka ...
0
votes
0
answers
17
views
kafka docker python API does not recognizes the broker [duplicate]
I have a kafka consumer in VSCode IDE (MS Windows) that works properly. I take the same python script in docker container, but the kafka broker is not recognized. I put them inside the same network; ...
1
vote
1
answer
1k
views
How can I mock the instantiation of a Class
I can't seem to figure out how to mock the instantiation of a class, any pointers would be greatly appreciated. Here is what I am trying to do:
I would like to test the method ClassA.some_method() and ...
4
votes
1
answer
5k
views
Kafka + FastAPI + Docker template
Introduction
I am currently experimenting with Kafka and FastAPI and trying to build a template to enable me to quickly write programs in a microservice pattern.
Goal - Vision
Building a repository of ...
0
votes
1
answer
94
views
Python - Optimal way to check for a condition with a timeout?
Wrote two functions, 1. uploads a gzip file to Artifactory and 2. listens to the Kafka topic.
As further steps, wanted to validate whether the upload is successful within 5 minutes by listening to the ...
0
votes
0
answers
16
views
Cannot connect to Kafka broker, BrokerConnection error 111 [duplicate]
I have a container in the same docker-compose that is trying to connect to my broker. It suddenly stopped connecting and is resulting with error:
2023-02-22 14:24:00 ERROR:kafka.conn:Connect attempt ...
0
votes
1
answer
930
views
Kafka Consumer Committing With Auto-Commit Disabled
I'm missing events when reading from a Kafaka queue because the consumer is updating the offset without an explicit commit even when enable_auto_commit is disabled.
from kafka import KafkaClient, ...
0
votes
1
answer
428
views
kafka produce messages in consumer
I have an application with the need to pass messages into multiple layers of processing.
I need to do this because all the new messages should be put into the first generic topic so they can be ...