My collection looks like this: [image: Collection]

And the source configuration looks like this:

curl -X POST "http://localhost:8083/connectors" \
     -H "Content-Type: application/json" \
     -d '{
           "name": "mongodb-source-connector-",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
             "tasks.max": "1",
             "connection.uri": "mongodb://mongo:27017/?replicaSet=rs0",
             "copy.existing": "true",
             "database": "projectsDB",
             "collection": "projects",
             "topic.namespace.map": "{\"projectsDB.projects\": \"mongo.projects\"}",
             "publish.full.document.only": "true"
           }
         }'

The resulting Kafka message key has the format { "_id": "3" }, but I need the key to be just the string value of the _id field (here, 3).

I have already tried adding a transformation like this:

"transforms": "id",
"transforms.id.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.id.field": "documentKey._id" 

but it did not help, and the connector task failed with this error:

      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction], found: java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull(Requirements.java:61)\n\tat org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 11 more\n"
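
One possible reading of the trace above, offered as a sketch rather than a confirmed fix: the DataException says ExtractField received a java.lang.String, which is consistent with the connector emitting the key as a JSON string instead of a Struct. Assuming a MongoDB Kafka connector version that supports the output.format.key and output.schema.key properties (1.3 or later, as far as I know), the key could be emitted with a schema so that ExtractField$Key can read the top-level _id field. The transform alias extractId is an arbitrary name chosen for illustration, and using StringConverter for the key is an assumption about how the key should be serialized.

```json
"output.format.key": "schema",
"output.schema.key": "{\"type\": \"record\", \"name\": \"keySchema\", \"fields\": [{\"name\": \"_id\", \"type\": \"string\"}]}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "extractId",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "_id"
```

These lines would go inside the "config" object of the request shown earlier; because the curl body is single-quoted, $Key is passed through literally. The key schema declares _id as a string, matching the sample key { "_id": "3" }, so it would need adjusting if _id has another type.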
