I am working on a Proof of Concept (PoC) where I am streaming data from a source database into a Kafka topic using Change Data Capture (CDC). The data is successfully being captured in the Kafka topic.

I am using the Debezium JDBC sink connector to write this data from Kafka to a target SQL Server database. However, writing to the target database fails, and the sink task stops with the error shown after the config. Here is my JDBC sink connector config:

{
    "name": "jdbc-sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:sqlserver://192.168.0.102:1433;databaseName=targetdatabase",
        "connection.username": "test_user",
        "connection.password": "******",
        "hibernate.dialect": "org.hibernate.dialect.SQLServerDialect",
        "dialect.name": "SqlServerDatabaseDialect",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "primary.key.mode": "record_key",
        "primary.key.fields": "id",
        "schema.evolution": "basic",
        "topics": "sqlserver-cdc.testdatabase.dbo.employees",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,source.ts_ms",
        "decimal.handling.mode": "double"
    }
}

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    ... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:93)
    ... 12 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_key' cannot have null schema
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyRecordKeyAsPrimaryKey(SinkRecordDescriptor.java:315)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordKeyData(SinkRecordDescriptor.java:288)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:261)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:66)
    ... 13 more
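
The innermost cause in the trace refers to the schema of the record key rather than to the JDBC connection itself, so one thing that may be worth checking is whether the keys in the topic actually carry a schema. The following is only a minimal sketch of such a check, under the assumption (not stated above) that the keys are serialized as JSON by Kafka Connect's JsonConverter, which wraps data in a "schema"/"payload" envelope when schemas are enabled. The function name key_has_schema and the two sample keys are hypothetical and are not taken from the actual topic.

```python
import json
from typing import Optional


def key_has_schema(raw_key: Optional[bytes]) -> bool:
    """Return True if raw_key looks like a JsonConverter envelope
    ({"schema": ..., "payload": ...}) with a non-null schema."""
    if raw_key is None:
        return False
    try:
        decoded = json.loads(raw_key.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not JSON at all, e.g. a plain string key.
        return False
    if not isinstance(decoded, dict):
        return False
    return decoded.get("schema") is not None and "payload" in decoded


# Hypothetical sample keys, made up for illustration only.
with_schema = (
    b'{"schema": {"type": "struct", "fields": '
    b'[{"field": "id", "type": "int32", "optional": false}], '
    b'"optional": false}, "payload": {"id": 1}}'
)
without_schema = b'{"id": 1}'

print(key_has_schema(with_schema))
print(key_has_schema(without_schema))
```

If a key copied from the topic looks like the second sample, the key.converter and key.converter.schemas.enable settings on the source and sink side would be a reasonable place to look next. This is a guess based on the error message, not a confirmed diagnosis.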
