1

I'm playing a bit with the latest versions of Logstash and Kafka but I can't get the Kafka input to work.

Here a brief summary of my setup:

  1. I'm using Docker Compose with apache/kafka:3.9.0 and logstash:8.16.1 Docker images.
  2. The Kafka broker is reachable by Logstash, indeed, the Kafka output works as expected with the generator and http inputs.

This is my logstash.conf:

# Logstash pipeline: three inputs (generator, http, kafka) fanned out to
# stdout and a Kafka topic.
input {
  # Emits 10 test events at startup, each with the message "Hello, World!".
  generator {
    count => 10
    message => "Hello, World!"
  }
  # Accepts HTTP POSTs on all interfaces, port 8080; JSON bodies are
  # decoded under the [data] field.
  http {
    host => "0.0.0.0"
    port => 8080
    codec => json {
        target => "[data]"
    }
  }
  # Consumes JSON messages from the logstash-input topic on the broker's
  # INTERNAL listener (kafka:19092), starting from the earliest offset.
  # NOTE(review): the console log below shows UNKNOWN_TOPIC_OR_PARTITION
  # for logstash-input — presumably the topic does not exist when this
  # consumer first fetches metadata; verify topic creation timing.
  kafka {
    bootstrap_servers => "kafka:19092"
    codec => "json"
    topics => ["logstash-input"]
    group_id => "logstash_group"
    client_id => "logstash_consumer"
    auto_offset_reset => "earliest"
  }
}

output {
  # Print every event to the container log for debugging.
  stdout {
    codec => "rubydebug"
  }
  # Publish every event as JSON to the logstash-output topic.
  kafka {
    bootstrap_servers => "kafka:19092"
    codec => "json"
    topic_id => "logstash-output"
  }
}

and this is the docker-compose.yaml:

services:
  # Single-node KRaft Kafka broker (acts as both broker and controller).
  kafka:
    image: "apache/kafka:3.9.0"
    hostname: "kafka"
    container_name: "kafka"
    restart: "always"
    labels:
      docker-hub: "https://hub.docker.com/r/apache/kafka"
      github: "https://github.com/apache/kafka"
      readme: "https://github.com/apache/kafka/blob/trunk/docker/examples/README.md#using-environment-variables"
    environment:
      # Quoted: Compose environment values are strings; a bare 1 is parsed
      # by YAML as an integer before Compose ever sees it.
      KAFKA_NODE_ID: "1"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:-EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT}"
      # EXTERNAL is advertised as localhost:9092 for host clients;
      # INTERNAL (kafka:19092) is what Logstash connects to on the
      # compose network.
      KAFKA_ADVERTISED_LISTENERS: "${KAFKA_ADVERTISED_LISTENERS:-EXTERNAL://localhost:9092,INTERNAL://kafka:19092}"
      # NOTE(review): INTERNAL and CONTROLLER bind to the "kafka" hostname
      # rather than 0.0.0.0 — confirm this is intended.
      KAFKA_LISTENERS: "${KAFKA_LISTENERS:-EXTERNAL://0.0.0.0:9092,INTERNAL://kafka:19092,CONTROLLER://kafka:29092}"
      KAFKA_INTER_BROKER_LISTENER_NAME: "${KAFKA_INTER_BROKER_LISTENER_NAME:-INTERNAL}"
      KAFKA_CONTROLLER_LISTENER_NAMES: "${KAFKA_CONTROLLER_LISTENER_NAMES:-CONTROLLER}"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "${KAFKA_CONTROLLER_QUORUM_VOTERS:-1@kafka:29092}"
    networks:
      - logstash-network
    ports:
      # Only the EXTERNAL listener is published to the host.
      - "9092:9092"
    volumes:
      - kafka-secrets:/etc/kafka/secrets
      - kafka-data:/var/lib/kafka/data
      - kafka-config:/mnt/shared/config

  # Logstash with the pipeline above bind-mounted read-only.
  logstash:
    image: "logstash:8.16.1"
    hostname: "logstash"
    container_name: "logstash"
    restart: "always"
    labels:
      docker-hub: "https://hub.docker.com/_/logstash"
      github: "https://github.com/elastic/logstash"
      readme: "https://www.elastic.co/guide/en/logstash/current/docker-config.html"
    environment:
      LOGSTASH_LOG_LEVEL: "debug"
      MONITORING_ENABLED: "${MONITORING_ENABLED:-false}"
      XPACK_MONITORING_ENABLED: "${XPACK_MONITORING_ENABLED:-false}"
      PIPELINE_ECS__COMPATIBILITY: "${PIPELINE_ECS__COMPATIBILITY:-disabled}"
    depends_on:
      - kafka
    networks:
      - logstash-network
    ports:
      - "9600:9600"
      - "8080:8080"
    volumes:
      - ./volumes/usr/share/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro

volumes:
  kafka-secrets:
    name: "kafka-secrets"
  kafka-data:
    name: "kafka-data"
  kafka-config:
    name: "kafka-config"

networks:
  logstash-network:
    name: "logstash-network"

The Logstash console displays the following lines when filtering by consumer keyword:

docker compose logs -f logstash | grep consumer
logstash  | [2024-12-09T17:11:20,682][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] ConsumerConfig values: 
logstash  |     client.id = logstash_consumer-0
logstash  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
logstash  | [2024-12-09T17:11:20,839][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Subscribed to topic(s): logstash-input
logstash  | [2024-12-09T17:11:21,047][WARN ][org.apache.kafka.clients.NetworkClient][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Error while fetching metadata with correlation id 2 : {logstash-input=UNKNOWN_TOPIC_OR_PARTITION}
logstash  | [2024-12-09T17:11:21,049][INFO ][org.apache.kafka.clients.Metadata][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Cluster ID: 5L6g3nShT-eMCtK--X86sw
logstash  | [2024-12-09T17:11:21,169][INFO ][org.apache.kafka.clients.Metadata][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Resetting the last seen epoch of partition logstash-input-0 to 0 since the associated topicId changed from null to Ij5j2AoLRNSgijZQRGzMlw
logstash  | [2024-12-09T17:11:21,255][INFO ][org.apache.kafka.clients.Metadata][main] [Producer clientId=logstash] Resetting the last seen epoch of partition logstash-output-0 to 0 since the associated topicId changed from null to N5hIo5WrTZ2E6Dc0XCVm5Q
logstash  | [2024-12-09T17:20:21,107][INFO ][org.apache.kafka.clients.NetworkClient][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Node -1 disconnected.
logstash  | [2024-12-09T17:20:55,313][INFO ][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=logstash] Node -1 disconnected.

Similarly, from the Kafka container I get the following lines:

docker compose logs -f kafka | grep consumer
kafka  |        group.consumer.assignors = [org.apache.kafka.coordinator.group.assignor.UniformAssignor, org.apache.kafka.coordinator.group.assignor.RangeAssignor]
kafka  |        group.consumer.heartbeat.interval.ms = 5000
kafka  |        group.consumer.max.heartbeat.interval.ms = 15000
kafka  |        group.consumer.max.session.timeout.ms = 60000
kafka  |        group.consumer.max.size = 2147483647
kafka  |        group.consumer.migration.policy = disabled
kafka  |        group.consumer.min.heartbeat.interval.ms = 5000
kafka  |        group.consumer.min.session.timeout.ms = 45000
kafka  |        group.consumer.session.timeout.ms = 45000
kafka  | [2024-12-09 17:11:21,020] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka  | [2024-12-09 17:11:21,167] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka  | [2024-12-09 17:11:21,273] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka  | [2024-12-09 17:11:21,282] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka  | [2024-12-09 17:11:21,365] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka  | [2024-12-09 17:11:21,383] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)

It is possible to test the working http input --> kafka output path with:

curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello, Logstash!"}' http://localhost:8080

Similarly, it is possible to check why the path kafka input --> kafka output is not working with:

jq --compact-output -n --arg id "$(uuidgen)" --arg message "Hello" --arg timestamp "$(date)" '$ARGS.named' | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh --topic logstash-input --bootstrap-server localhost:9092

Update

Ensuring the logstash-input topic exists on Kafka before starting Logstash doesn't fix the issue.

  kafka:
    # Indentation normalized to consistent 2-space steps (healthcheck was
    # indented 4 extra spaces relative to its parent service key).
    healthcheck:
      # Workaround: (ab)uses the healthcheck command to create the
      # logstash-input topic, so Logstash only starts once it exists.
      test: "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic logstash-input --if-not-exists"
      interval: 30s
      timeout: 10s
      retries: 3

  logstash:
    depends_on:
      kafka:
        # Block Logstash startup until the healthcheck above passes.
        condition: service_healthy
4
  • Kafka is telling you the logstash-input topic does not exist. Why do you think it should? What is creating it? Commented Dec 10, 2024 at 4:39
  • I'm creating it via kafka-console-producer.sh Commented Dec 10, 2024 at 8:50
  • Is there a way to configure consumer reconnection if the topic doesn't exist yet when Logstash tries to connect? Commented Dec 10, 2024 at 8:51
  • 1
    moreover in the logs you can see Subscribed to topic(s): logstash-input but then there is Error while fetching metadata with correlation id 2 Commented Dec 10, 2024 at 11:38

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.