1 vote
0 answers
67 views

Try the confluent-kafka base example: from confluent_kafka import Consumer c = Consumer({ 'bootstrap.servers': 'localhost:9092', 'group.id': None, 'auto.offset.reset': 'earliest' }) c....
Irina (1,417)
4 votes
1 answer
916 views

When the consumer (which is a very simple confluent-kafka-python consumer) starts, we see this log message after the assignment %6|1739802885.947|GETSUBSCRIPTIONS|<consumer id>#consumer-1| [...
Sergej Herbert
0 votes
1 answer
26 views

I would like to find a published message by its KEY or VALUE. How can I search for the message and print it? I have tried the following but nothing works: def filter_messages(self, topic, ...
Ram Thoutam
0 votes
0 answers
750 views

I'm new to Kafka and am trying to get a simple consumer working for my application. I have tried getting python-kafka and confluent-kafka to work. My consumer config properties are the following: conf ...
Kevin (59)
0 votes
1 answer
45 views

In my infrastructure I have multiple microservices in multiple languages communicating over Kafka. My Kafka cluster has multiple partitions, and to keep consistency I use a key when I push messages so ...
Antoine Grenard
0 votes
1 answer
1k views

I am using python-confluent-kafka to create a producer. The Kafka cluster is on MSK 3.7.x KRaft with IAM enabled and TLS enabled, both within the cluster and between clients and brokers. Any ideas on ...
Fergus Johnson
0 votes
0 answers
35 views

I just started learning about Docker and I am trying to create a project where I have producer, consumer and server scripts in separate containers. Consumers update the table in an sqlite3 database file ...
barkin sagan
0 votes
2 answers
2k views

I'm using confluent-kafka-client. I have one producer producing into a topic with one partition and one consumer within one group ID. First, I create a producer (with default configs) for the topic (...
MatinMoezi
1 vote
1 answer
656 views

I believe I have a configuration problem that I don't seem to be able to get around. Currently I have a containerized Kafka broker and I want to connect to it locally. However, when I try to connect ...
Kevin (11)
0 votes
0 answers
829 views

I want to update my Python runtime for AWS Lambda from 3.9 to 3.11. Until now I have been using the python3.9 runtime and have not specified any requirements or additional libraries for my Lambda function, ...
Yogesh Rajgure
0 votes
0 answers
90 views

I'm encountering an OverflowError when attempting to deserialize messages using the confluent_kafka Avro deserializer in Python. Here's a simplified version of my code: class ConsumerKafka: ...
Andrea Bencini
1 vote
1 answer
694 views

I am using Kafka static partition assignment to pin consumers in a group. For example: I have a test-topic with 3 partitions and I have manually made consumer1 connect to partition 0, consumer2 connect to ...
Shivangi Singh
2 votes
1 answer
501 views

I am trying to create a Python script to automate topic creation and configuration. I have managed to make the topic creation part work. I get a runtime error when trying to alter the configuration. ...
user2138149 (18.6k)
0 votes
0 answers
484 views

I am trying to connect to confluent-kafka from Databricks and produce/consume messages. Right now I am just experimenting with the python client and not the spark connectors. Now, if I try to list out ...
Tarique (711)
2 votes
2 answers
230 views

Serialization code (Go lang) 1. Producer func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) { producerConfig := getKafkaProducerConfig(config.EnvConfig) producer, err := ...
user20969617
0 votes
1 answer
96 views

I'm writing a Kafka consumer using confluent-kafka in Python and using the async commit API. The documentation for the on_commit callback says the following. I don't understand the difference between ...
Prithivi Maruthachalam
0 votes
0 answers
2k views

Unable to connect to the Kafka broker using confluent-kafka with OAuth. I am also getting an SSL handshake error in the log, but the callback error I get is "KafkaError{code=_MSG_TIMED_OUT,val=-192,...
senthil rajendran
0 votes
2 answers
782 views

I have started to learn confluent-kafka (Python). There is 1 producer, 1 topic, 1 partition and 1 consumer (a simple setup). My requirement is that I wish to fetch data collectively, in batches. I read that using ...
the_he_man
2 votes
2 answers
3k views

I am a newbie to Kafka and while reading the docs I stumbled upon the flush() and poll() methods. As I understand it, flush() flushes the producer buffer to send all events to the broker. But what ...
Nishant Neupane
1 vote
1 answer
482 views

I am able to send data using the code below in Python, but I am not receiving the message at the consumer end. I have enabled all the authentication options in the AWS MSK cluster. I am able to send and receive with ...
Kotesh Nataru
0 votes
0 answers
260 views

I’m relatively new to Kafka and currently trying to set up a Kafka consumer within a Docker Compose environment. Unfortunately, I’ve hit a roadblock with connection issues and I’d greatly appreciate ...
cole (155)
1 vote
1 answer
198 views

I'm trying to receive the message from the topic using Python script. from confluent_kafka import Consumer, KafkaError import uuid # Kafka broker details broker = "something" topic = "...
Aleksandar
0 votes
1 answer
484 views

I am trying to build a stream-table join operation in Kafka with Python; below is the code that performs the join on data sent from the producer. stream_table_join.py from confluent_kafka import ...
Polymath (328)
2 votes
0 answers
162 views

Good afternoon. Given: a Kafka topic with more than 20 partitions, huge message traffic, and one consumer group. Each time a consumer connects to the topic, it needs to discard all the accumulated lag ...
Valtesar
0 votes
1 answer
630 views

I deployed a Redpanda cluster and would like to query an offset by timestamp. I first tried the confluent-kafka Python library: import confluent_kafka as ck import uuid c = ck.Consumer({ 'bootstrap....
Xiang Zhang (2,983)
