I'm running Kubernetes on Docker Desktop.
I'm setting up my environment with the following configuration:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: kafka-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              network/kafka-network: "true"
  podSelector:
    matchLabels:
      network/kafka-network: "true"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    service: kafka-controller
  name: kafka-controller
spec:
  clusterIP: None
  selector:
    app: kafka-controller
    run: kafka-controller
  ports:
    - name: internal
      port: 9093
      targetPort: 9093
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-controller
spec:
  serviceName: kafka-controller
  replicas: 1
  selector:
    matchLabels:
      app: kafka-controller
      run: kafka-controller
  template:
    metadata:
      labels:
        network/kafka-network: "true"
        app: kafka-controller
        run: kafka-controller
    spec:
      hostname: kafka-controller
      securityContext:
        fsGroup: 1000
      enableServiceLinks: false
      containers:
        - name: kafka-controller
          image: apache/kafka:3.9.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9093
          env:
            - name: KAFKA_OPTS
              value: "-Djavax.net.debug=all"
            - name: KAFKA_LOG4J_ROOT_LOGLEVEL
              value: debug
            - name: KAFKA_NODE_ID
              value: "0"
            - name: KAFKA_PROCESS_ROLES
              value: "controller"
            - name: KAFKA_LISTENERS
              value: "CONTROLLER://:9093"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "[email protected]:9093"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    service: kafka-broker
  name: kafka-broker
spec:
  clusterIP: None
  selector:
    app: kafka-broker
    run: kafka-broker
  ports:
    - name: internal
      port: 29092
      targetPort: 29092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-broker
spec:
  serviceName: kafka-broker
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
      run: kafka-broker
  template:
    metadata:
      labels:
        network/kafka-network: "true"
        app: kafka-broker
        run: kafka-broker
    spec:
      securityContext:
        fsGroup: 1000
      enableServiceLinks: false
      hostname: kafka-broker
      containers:
        - name: kafka-broker
          image: apache/kafka:3.9.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 29092
          env:
            - name: KAFKA_OPTS
              value: "-Djavax.net.debug=all"
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: "broker"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka-broker-0.kafka-broker.default.svc.cluster.local:29092"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:29092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "[email protected]:9093"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
After creating the resources with the command kubectl apply -f kafka.yml, I see no errors in the logs. From inside one of the pods:
I can create topics using the command:
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server=kafka-broker:29092 --topic event_topic --partitions 2
I can describe the created topic with:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server=kafka-broker:29092 --describe --topic event_topic
I can publish new messages to the topic using:
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka-broker:29092 --topic event_topic
And I can verify that the message is present in the Kafka log segments:
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/event_topic-0/00000000000000000000.log
However, I'm unable to consume messages from the topic:
/opt/kafka/bin/kafka-console-consumer.sh --topic event_topic --from-beginning --group=c7abf8f3-6ec1-4798-aff5-feeee0886838 --bootstrap-server kafka-broker:29092
Deploying the same services with docker-compose appears to work just fine, so am I misconfiguring something?
So far I have tried:
- Removing the group from the consumer
- Using "kafka-broker-0.kafka-broker.default.svc.cluster.local:29092" as the "bootstrap-server"
- Using "kafka-broker-0:29092" as the "bootstrap-server"
None of these appears to produce an error, but no messages show up in the consumer.
The only log output I get is an INFO message repeated constantly in the broker: "INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)". As far as I can tell, this is just the consumer offset update.
No other log messages appear in the controller.
Thank you in advance for your help.
Update: the cause was KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR. I was not overriding it, so it was left at a default other than 1. Setting it explicitly to 1 instead of 3 solved the problem.
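
For anyone hitting the same thing, here is a minimal sketch of that override as it might look in the kafka-broker StatefulSet above. It assumes the variable belongs in the broker container's env list alongside the existing entries; only the variable name and the value 1 come from the fix itself. Kafka's built-in default for offsets.topic.replication.factor is 3, which a single broker cannot satisfy, and that would be consistent with the repeated __consumer_offsets auto-creation request in the broker log.

```yaml
# Fragment of the kafka-broker StatefulSet (spec.template.spec.containers[0].env).
# Placement is an assumption; add this entry next to the existing env variables.
env:
  # With a single broker, the internal __consumer_offsets topic
  # can have at most one replica.
  - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
    value: "1"
```

After re-applying the manifest and letting the broker pod restart, the consumer command from the question should be able to join its group once __consumer_offsets has been created.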