I'm playing a bit with the latest versions of Logstash and Kafka, but I can't get the Kafka input to work.
Here is a brief summary of my setup:
- I'm using Docker Compose with apache/kafka:3.9.0 and logstash:8.16.1 Docker images.
- The Kafka broker is reachable by Logstash; indeed, the Kafka output works as expected with the generator and http inputs (a quick reachability probe is shown below).
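As a sanity check, plain TCP reachability from the Logstash container can be probed like this (assuming curl is present in the logstash image, which it is in the official 8.x images as far as I can tell):

docker exec -it logstash curl -v telnet://kafka:19092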
This is my logstash.conf:
input {
  generator {
    count => 10
    message => "Hello, World!"
  }
  http {
    host => "0.0.0.0"
    port => 8080
    codec => json {
      target => "[data]"
    }
  }
  kafka {
    bootstrap_servers => "kafka:19092"
    codec => "json"
    topics => ["logstash-input"]
    group_id => "logstash_group"
    client_id => "logstash_consumer"
    auto_offset_reset => "earliest"
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
  kafka {
    bootstrap_servers => "kafka:19092"
    codec => "json"
    topic_id => "logstash-output"
  }
}
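The file itself parses cleanly; Logstash's built-in config test can confirm that (using a throwaway --path.data so it doesn't clash with the running instance):

docker exec -it logstash bin/logstash --config.test_and_exit --path.data /tmp/logstash-test -f /usr/share/logstash/pipeline/logstash.conf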
And this is the docker-compose.yaml:
services:
  kafka:
    image: "apache/kafka:3.9.0"
    hostname: "kafka"
    container_name: "kafka"
    restart: "always"
    labels:
      docker-hub: "https://hub.docker.com/r/apache/kafka"
      github: "https://github.com/apache/kafka"
      readme: "https://github.com/apache/kafka/blob/trunk/docker/examples/README.md#using-environment-variables"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:-EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT}"
      KAFKA_ADVERTISED_LISTENERS: "${KAFKA_ADVERTISED_LISTENERS:-EXTERNAL://localhost:9092,INTERNAL://kafka:19092}"
      KAFKA_LISTENERS: "${KAFKA_LISTENERS:-EXTERNAL://0.0.0.0:9092,INTERNAL://kafka:19092,CONTROLLER://kafka:29092}"
      KAFKA_INTER_BROKER_LISTENER_NAME: "${KAFKA_INTER_BROKER_LISTENER_NAME:-INTERNAL}"
      KAFKA_CONTROLLER_LISTENER_NAMES: "${KAFKA_CONTROLLER_LISTENER_NAMES:-CONTROLLER}"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "${KAFKA_CONTROLLER_QUORUM_VOTERS:-1@kafka:29092}"
    networks:
      - logstash-network
    ports:
      - "9092:9092"
    volumes:
      - kafka-secrets:/etc/kafka/secrets
      - kafka-data:/var/lib/kafka/data
      - kafka-config:/mnt/shared/config
  logstash:
    image: "logstash:8.16.1"
    hostname: "logstash"
    container_name: "logstash"
    restart: "always"
    labels:
      docker-hub: "https://hub.docker.com/_/logstash"
      github: "https://github.com/elastic/logstash"
      readme: "https://www.elastic.co/guide/en/logstash/current/docker-config.html"
    environment:
      LOGSTASH_LOG_LEVEL: "debug"
      MONITORING_ENABLED: "${MONITORING_ENABLED:-false}"
      XPACK_MONITORING_ENABLED: "${XPACK_MONITORING_ENABLED:-false}"
      PIPELINE_ECS__COMPATIBILITY: "${PIPELINE_ECS__COMPATIBILITY:-disabled}"
    depends_on:
      - kafka
    networks:
      - logstash-network
    ports:
      - "9600:9600"
      - "8080:8080"
    volumes:
      - ./volumes/usr/share/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
volumes:
  kafka-secrets:
    name: "kafka-secrets"
  kafka-data:
    name: "kafka-data"
  kafka-config:
    name: "kafka-config"
networks:
  logstash-network:
    name: "logstash-network"
The Logstash console displays the following lines when filtering by the consumer keyword:
docker compose logs -f logstash | grep consumer
logstash | [2024-12-09T17:11:20,682][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] ConsumerConfig values:
logstash | client.id = logstash_consumer-0
logstash | partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
logstash | [2024-12-09T17:11:20,839][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Subscribed to topic(s): logstash-input
logstash | [2024-12-09T17:11:21,047][WARN ][org.apache.kafka.clients.NetworkClient][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Error while fetching metadata with correlation id 2 : {logstash-input=UNKNOWN_TOPIC_OR_PARTITION}
logstash | [2024-12-09T17:11:21,049][INFO ][org.apache.kafka.clients.Metadata][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Cluster ID: 5L6g3nShT-eMCtK--X86sw
logstash | [2024-12-09T17:11:21,169][INFO ][org.apache.kafka.clients.Metadata][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Resetting the last seen epoch of partition logstash-input-0 to 0 since the associated topicId changed from null to Ij5j2AoLRNSgijZQRGzMlw
logstash | [2024-12-09T17:11:21,255][INFO ][org.apache.kafka.clients.Metadata][main] [Producer clientId=logstash] Resetting the last seen epoch of partition logstash-output-0 to 0 since the associated topicId changed from null to N5hIo5WrTZ2E6Dc0XCVm5Q
logstash | [2024-12-09T17:20:21,107][INFO ][org.apache.kafka.clients.NetworkClient][main][1cc47ee71c50b28c8d2ef3535bea0308e2047cb3ebc72456a3d39f1ebf27b3e3] [Consumer clientId=logstash_consumer-0, groupId=logstash_group] Node -1 disconnected.
logstash | [2024-12-09T17:20:55,313][INFO ][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=logstash] Node -1 disconnected.
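Whether the consumer ever joins the group and gets a partition assigned can also be inspected from the broker side:

docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group logstash_group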
Similarly, from the Kafka container I get the following lines:
docker compose logs -f kafka | grep consumer
kafka | group.consumer.assignors = [org.apache.kafka.coordinator.group.assignor.UniformAssignor, org.apache.kafka.coordinator.group.assignor.RangeAssignor]
kafka | group.consumer.heartbeat.interval.ms = 5000
kafka | group.consumer.max.heartbeat.interval.ms = 15000
kafka | group.consumer.max.session.timeout.ms = 60000
kafka | group.consumer.max.size = 2147483647
kafka | group.consumer.migration.policy = disabled
kafka | group.consumer.min.heartbeat.interval.ms = 5000
kafka | group.consumer.min.session.timeout.ms = 45000
kafka | group.consumer.session.timeout.ms = 45000
kafka | [2024-12-09 17:11:21,020] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka | [2024-12-09 17:11:21,167] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka | [2024-12-09 17:11:21,273] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka | [2024-12-09 17:11:21,282] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka | [2024-12-09 17:11:21,365] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
kafka | [2024-12-09 17:11:21,383] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
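The UNKNOWN_TOPIC_OR_PARTITION warning suggests the topic did not exist when the consumer first subscribed; whether auto-creation eventually produced it can be verified with:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic logstash-input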
It is possible to test the working http input --> kafka output path with:
curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello, Logstash!"}' http://localhost:8080
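and then confirming that the event lands on the logstash-output topic with the console consumer:

docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logstash-output --from-beginning --max-messages 1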
Similarly, the failing kafka input --> kafka output path can be exercised with:
jq --compact-output -n --arg id "$(uuidgen)" --arg message "Hello" --arg timestamp "$(date)" '$ARGS.named' | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh --topic logstash-input --bootstrap-server localhost:9092
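Reading the message back with the console consumer is a quick way to confirm it actually reaches the topic:

docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logstash-input --from-beginning --max-messages 1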
Update
Ensuring the logstash-input topic exists on Kafka before starting Logstash doesn't fix the issue:
kafka:
  healthcheck:
    test: "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic logstash-input --if-not-exists"
    interval: 30s
    timeout: 10s
    retries: 3
logstash:
  depends_on:
    kafka:
      condition: service_healthy
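With the healthcheck in place the topic does exist before Logstash starts, which can be confirmed with:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list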
Logstash still logs Subscribed to topic(s): logstash-input, but it is then followed by the same Error while fetching metadata with correlation id 2 as before.