2

I'm using docker desktop running Kubernetes.

I'm setting up my environment using the next configuration:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: kafka-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              network/kafka-network: "true"
  podSelector:
    matchLabels:
      network/kafka-network: "true"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    service: kafka-controller
  name: kafka-controller
spec:
  clusterIP: None
  selector:
    app: kafka-controller
    run: kafka-controller
  ports:
    - name: internal
      port: 9093
      targetPort: 9093
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-controller
spec:
  serviceName: kafka-controller
  replicas: 1
  selector:
    matchLabels:
      app: kafka-controller
      run: kafka-controller
  template:
    metadata:
      labels:
        network/kafka-network: "true"
        app: kafka-controller
        run: kafka-controller
    spec:
      hostname: kafka-controller
      securityContext:
        fsGroup: 1000
      enableServiceLinks: false
      containers:
        - name: kafka-controller
          image: apache/kafka:3.9.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9093
          env:
            - name: KAFKA_OPTS
              value: "-Djavax.net.debug=all"
            - name: KAFKA_LOG4J_ROOT_LOGLEVEL
              value: debug
            - name: KAFKA_NODE_ID
              value: "0"
            - name: KAFKA_PROCESS_ROLES
              value: "controller"
            - name: KAFKA_LISTENERS
              value: "CONTROLLER://:9093"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "[email protected]:9093"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    service: kafka-broker
  name: kafka-broker
spec:
  clusterIP: None
  selector:
    app: kafka-broker
    run: kafka-broker
  ports:
    - name: internal
      port: 29092
      targetPort: 29092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-broker
spec:
  serviceName: kafka-broker
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
      run: kafka-broker
  template:
    metadata:
      labels:
        network/kafka-network: "true"
        app: kafka-broker
        run: kafka-broker
    spec:
      securityContext:
        fsGroup: 1000
      enableServiceLinks: false
      hostname: kafka-broker
      containers:
        - name: kafka-broker
          image: apache/kafka:3.9.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 29092
          env:
            - name: KAFKA_OPTS
              value: "-Djavax.net.debug=all"
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: "broker"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka-broker-0.kafka-broker.default.svc.cluster.local:29092"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:29092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "[email protected]:9093"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"

After initialize the services with the command kubectl apply -f kafka.yml, I see no errors into the logs. If I get into one of the pods:

I can create topics using the command:

/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server=kafka-broker:29092 --topic event_topic --partitions 2

I can describe the created topic with:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server=kafka-broker:29092 --describe --topic event_topic

I can publish new messages into the topic using:

/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka-broker:29092 --topic event_topic

And validate the message is available into the kafka logs:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/event_topic-0/00000000000000000000.log

Nevertheless, I'm unable to consume messages from the topic:

/opt/kafka/bin/kafka-console-consumer.sh --topic event_topic --from-beginning --group=c7abf8f3-6ec1-4798-aff5-feeee0886838 --bootstrap-server kafka-broker:29092

Deploying the services using docker-compose appears to work just fine, so, probably... am I misconfiguring something?

I tried so far:

  • Remove the group for the consumer
  • Use "bootstrap-server" as "kafka-broker-0.kafka-broker.default.svc.cluster.local:29092"
  • Use "bootstrap-server" as "kafka-broker-0:29092"

None of them appear to produce an error, but no message is showing up into the consumer.

The only log I'm getting is a constant INFO message into the broker "INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)" as far as I can tell, this is just the consumer offset update.

No other log messages are present into the controller.

Thank you in advance for your help.

2
  • 5
    I had the same error using the official kafka image and overriding the environment variables, in my case it was due to KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR being set by default (not overriding it) to something different, and I set it to 1 (default) instead of 3 Commented May 8 at 10:11
  • @user5384518 it helped, thanks !!! Commented Oct 1 at 8:49

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.