
My current testing configuration looks like so:

version: '3.7'
services:
  postgres:
    image: debezium/postgres
    restart: always
    ports:
      - "5432:5432"
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    restart: always
    ports:
      - "9092:9092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
  connect:
    image: debezium/connect
    restart: always
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - postgres
      - kafka
    depends_on:
      - zookeeper
      - postgres
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

I run it with docker-compose like so:

$ docker-compose up

I see no error messages, and everything seems to be running fine: docker ps shows all services as up.
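docker ps only shows that the containers are up. A quick way to confirm that Kafka Connect itself is answering, and that the Debezium Postgres plugin is installed, is to query the Connect REST API's /connector-plugins endpoint. This is a minimal standard-library sketch, assuming the REST port 8083 published by the compose file; get_json and has_plugin are hypothetical helper names:

```python
import json
from urllib.request import urlopen

# Assumption: Kafka Connect's REST API is published on localhost:8083, as in the compose file.
CONNECT_URL = "http://localhost:8083"


def get_json(url):
    """GET a URL and decode its JSON body."""
    with urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def has_plugin(plugins, class_name):
    """True if a /connector-plugins listing contains the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


# Usage (requires the stack to be running):
# plugins = get_json(CONNECT_URL + "/connector-plugins")
# print(has_plugin(plugins, "io.debezium.connector.postgresql.PostgresConnector"))
```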

To check that Kafka works, I wrote a Kafka producer and a Kafka consumer in Python:

# producer. I run it in one console window
from kafka import KafkaProducer
from json import dumps
from time import sleep

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number' : e}
    producer.send('numtest', value=data)
    sleep(5)

# consumer. I run it in another console window

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    print(message)

This works fine: I can see the producer publishing messages and the consumer receiving them in its window.

Now I want to get CDC working. First, inside the Postgres container, I set the password of the postgres role to postgres:

$ su postgres
$ psql
psql> \password postgres
Enter new password: postgres

I then created a new database test:

psql> CREATE DATABASE test;

I created a table:

psql> \c test;
test=# create table mytable (id serial, name varchar(128), primary key(id));

Finally, I created a connector for my Debezium CDC stack:

$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "test-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test",
    "database.server.name": "postgres",
    "database.whitelist": "public.mytable",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "public.some_topic"
    }
}'

{"name":"test-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"test","database.server.name":"postgres","database.whitelist":"public.mytable","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"public.some_topic","name":"test-connector"},"tasks":[],"type":"source"}
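Note that the response lists "tasks": [], so a successful POST only means the configuration was accepted, not that a task started. Below is a minimal sketch of registering the connector from Python with only the standard library, assuming Debezium 1.0-era property names: the Postgres connector's table filter is table.whitelist (renamed table.include.list in later releases), while database.whitelist and the database.history.* settings belong, as far as I know, to the MySQL connector and are not used by the Postgres one. The helper names postgres_connector_config and register are hypothetical.

```python
import json
from urllib.request import Request, urlopen


def postgres_connector_config(name, dbname, server_name, tables, plugin="pgoutput"):
    """Build a registration payload for the Debezium Postgres connector.

    Assumption: Debezium 1.0 property names ("table.whitelist").
    """
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "plugin.name": plugin,                # needs Postgres 10+ for pgoutput
            "database.hostname": "postgres",      # compose service name
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": dbname,
            "database.server.name": server_name,  # becomes the topic prefix
            "table.whitelist": ",".join(tables),  # e.g. "public.mytable"
        },
    }


def register(payload, connect_url="http://localhost:8083"):
    """POST the payload to the Kafka Connect REST API and return the JSON reply."""
    req = Request(
        connect_url + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Usage (requires the stack to be running):
# register(postgres_connector_config("test-connector", "test", "postgres", ["public.mytable"]))
```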

As you can see, the connector was created without any errors. Now I expect Debezium to publish all changes to the Kafka topic public.some_topic. To check this, I create a new Kafka consumer:

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'public.some_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    print(message)

The only difference from the first example is that this consumer subscribes to public.some_topic. I then go to the database console and insert a row:

test=# insert into mytable (name) values ('Tom Cat');    
INSERT 0 1
test=#

A new row is inserted, but nothing shows up in the consumer window. In other words, Debezium does not publish events to the Kafka topic public.some_topic. What is wrong, and how can I fix it?

  • 1. If you query the state of the connector, is it still running? 2. Is there anything in the Kafka Connect worker log to show that the connector has failed? 3. I would use kafkacat for inspecting topics and producing/consuming data :) Commented Jan 15, 2020 at 15:51
  • @Robin Moffatt. If I run docker ps, I see that my connect service is running. Commented Jan 15, 2020 at 15:52
  • @Robin Moffatt. I've just checked connector logs and see one line repeating: INFO || WorkerSourceTask{id=test-connector2-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] Commented Jan 15, 2020 at 15:56
  • Did you solve this? I tried to run your docker-compose and saw this error: connect_1 | 2020-04-16 06:06:36,922 ERROR || WorkerSourceTask{id=test-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask] connect_1 | io.debezium.jdbc.JdbcConnectionException: ERROR: syntax error connect_1 | at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:145) Commented Apr 16, 2020 at 6:08

1 Answer


Using your Docker Compose I see this error in the Kafka Connect worker log when the connector is created:

Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "pgoutput": No such file or directory
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
        at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)
        at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)
        ... 9 more

This is also mirrored in the status of the task if you use the Kafka Connect REST API to query it:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
{
  "name": "test-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.16.5:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.16.5:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:129)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)\n\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)\n\tat org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)\n\tat org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)\n\t... 9 more\n"
    }
  ],
  "type": "source"
}

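The same check can be scripted. The point it illustrates is that the connector can report RUNNING while its task is FAILED, so both levels need checking. This is a minimal sketch using only the Python standard library; connector_status and failed_tasks are hypothetical helpers, and the URL assumes the REST port 8083 from the compose file:

```python
import json
from urllib.request import urlopen


def connector_status(name, connect_url="http://localhost:8083"):
    """Fetch GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{connect_url}/connectors/{name}/status", timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def failed_tasks(status):
    """(task id, first line of the trace) for every task whose state is FAILED."""
    return [
        (task["id"], (task.get("trace") or "").split("\n", 1)[0])
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Usage (requires the stack to be running):
# for task_id, reason in failed_tasks(connector_status("test-connector")):
#     print(task_id, reason)
```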
The version of Postgres that you're running is:

postgres=# SHOW server_version;
 server_version
----------------
 9.6.16

The pgoutput logical decoding plugin is only available in Postgres 10 and later.
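Because pgoutput only exists from Postgres 10 onward, a setup script can check the server version before choosing plugin.name. This is a minimal sketch; the function names are hypothetical, and it assumes the input is the string returned by SHOW server_version, which on some builds carries a distribution suffix such as "(Debian ...)":

```python
import re


def server_major_version(server_version):
    """Leading integer of SHOW server_version, e.g. '9.6.16' -> 9, '10.11' -> 10.

    For pre-10 releases the real major version is '9.6', but the leading
    integer is enough for the >= 10 comparison below.
    """
    match = re.match(r"\s*(\d+)", server_version)
    if match is None:
        raise ValueError(f"unrecognised server_version: {server_version!r}")
    return int(match.group(1))


def supports_pgoutput(server_version):
    """pgoutput ships with Postgres 10 and later."""
    return server_major_version(server_version) >= 10
```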

I changed your Docker Compose to use version 10:

image: debezium/postgres:10

After bouncing the stack for a clean start and following your instructions, I get a connector that's running:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
source  |  test-connector  |  RUNNING  |  RUNNING  |  io.debezium.connector.postgresql.PostgresConnector
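If jq is not available, the same one-line-per-connector summary can be produced in Python from the JSON returned by the ?expand=info&expand=status call. This is a sketch; summarize is a hypothetical helper, and the column order simply mirrors the jq pipeline above (type, name, connector state, task states, connector class):

```python
def summarize(expanded):
    """One line per connector from GET /connectors?expand=info&expand=status."""
    rows = []
    for name, entry in expanded.items():
        info, status = entry["info"], entry["status"]
        # A connector can have several tasks; join their states, or "-" if none.
        task_states = ",".join(t["state"] for t in status.get("tasks", [])) or "-"
        rows.append(" | ".join([
            info["type"],
            name,
            status["connector"]["state"],
            task_states,
            info["config"]["connector.class"],
        ]))
    return sorted(rows)
```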

and data in the Kafka topic:

$ docker exec kafkacat kafkacat -b kafka:9092 -t postgres.public.mytable -C
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.mytable.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Tom Cat"},"source":{"version":"1.0.0.Final","connector":"postgresql","name":"postgres","ts_ms":1579172192292,"snapshot":"false","db":"test","schema":"public","table":"mytable","txId":561,"lsn":24485520,"xmin":null},"op":"c","ts_ms":1579172192347}}% Reached end of topic postgres.public.mytable [0] at offset 1
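Each message value is a Debezium change-event envelope. With the JSON converter's schemas enabled, as here, the useful part is under payload: op is c, u, d or r (create, update, delete, snapshot read), and before/after hold the row images. This is a minimal sketch of decoding it, assuming kafka-python for the consumer as in the question; parse_change_event and consume are hypothetical helpers, and the kafka import is kept inside consume so the parser can be used on its own:

```python
import json

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def parse_change_event(raw):
    """Decode one Debezium change event (JSON converter, schemas enabled)."""
    if raw is None:  # by default Debezium follows a delete with a tombstone (null value)
        return None
    event = json.loads(raw.decode("utf-8") if isinstance(raw, bytes) else raw)
    payload = event.get("payload", event)  # also accepts events without the schema wrapper
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": payload["source"]["table"],
        "before": payload["before"],
        "after": payload["after"],
    }


def consume(topic="postgres.public.mytable", servers=("localhost:9092",)):
    """Print decoded change events; needs kafka-python and the running stack."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic, bootstrap_servers=list(servers), auto_offset_reset="earliest")
    for message in consumer:
        print(parse_change_event(message.value))
```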

I added kafkacat into your Docker Compose with:

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        while [ 1 -eq 1 ];do sleep 60;done

Edit: retaining previous answer as it's still useful & relevant:

Debezium writes messages to a topic named after the table, prefixed with the database.server.name and the schema. In your example this would be postgres.public.mytable.

This is why kafkacat is useful, because you can run

kafkacat -b kafka:9092 -L

to see a list of all your topics and partitions. Once you've found the topic, run

kafkacat -b kafka:9092 -t postgres.public.mytable -C

to read from it.
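For the Postgres connector the topic name is built as <database.server.name>.<schema>.<table>, so with database.server.name=postgres and the table public.mytable it is postgres.public.mytable, matching the kafkacat output above. Here is a small sketch for deriving that name and picking change topics out of a topic listing, for example the set returned by kafka-python's KafkaConsumer(...).topics(); both helper names are hypothetical:

```python
import re


def debezium_topic(server_name, schema, table):
    """Debezium Postgres topic name: <database.server.name>.<schema>.<table>."""
    return f"{server_name}.{schema}.{table}"


def change_topics(topics, server_name):
    """Keep only topics of the form <server_name>.<schema>.<table>."""
    pattern = re.compile(re.escape(server_name) + r"\.[^.]+\.[^.]+$")
    return sorted(t for t in topics if pattern.match(t))
```

With the topic list from the comments below, change_topics finds nothing for server postgres, which is consistent with the connector task never having started.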

Check out details on kafkacat including how to run it with Docker

There's also a demo of it all in action with Docker Compose here


5 Comments

kafkacat is a good tool, and I checked it. It lists many topics in Kafka, but there is no postgres.test.mytable topic among them. I also tried to read the topic postgres.test.mytable. But the whole issue is with Debezium CDC, as I said: it does not publish messages to Kafka, and I do not know why.
What topics does kafkacat list as being present? Debezium may be writing to a topic with a different name permutation. Also try recreating your connector with a new name in case the offsets have got munged up.
kafkacat prints this list of topics: "test", "my_connect_configs", "some_topic", "public.some_topic", "my_connect_offsets" ,"__consumer_offsets", "numtest", "my_source_connect_statuses". They have nothing to do with mytable
I have tried to recreate the connector many times, with no effect. Probably I am missing something in the configuration file I attached to my question. It's really hard to debug the connector.
Great answer! If upgrading to PG 10+ is no option for whatever reason, you'd have to use the "decoderbufs" or "wal2json" logical decoding plug-ins instead of "pgoutput".
