I am trying to use the MongoDB CDC connector as the source of a DataStream in my Flink job. I use the same example code as [in the docs][1].
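This is my code:

```java
package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoStream {
    public static void main(String[] args) throws Exception {
        MongoDBSource<String> mongoSource =
                MongoDBSource.<String>builder()
                        .hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
                        .username("myUsername")
                        .password("myPassword")
                        .databaseList("exercises")
                        .collectionList("exercises.movies")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);

        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .print();

        env.execute("mongo-cdc");
    }
}
```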
Since I am using a replica set, I listed all three nodes with their ports, separated by commas. I first tried the standard connection URI from Compass, but it produced a different error because of the `mongodb+srv` prefix, so I had to switch to the host list.
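The `hosts(...)` value is a plain comma-separated list of `host:port` pairs (for example `localhost:27017,localhost:27018`), not a URI. Below is a small sketch for sanity-checking such a string before it reaches the builder, for instance to catch an entry with a missing or non-numeric port. `HostsCheck` and `checkHosts` are hypothetical names, not part of Flink CDC, and the host names are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

public class HostsCheck {

    // Hypothetical helper: splits a "host1:port1,host2:port2,..." string
    // and rejects entries that lack a host or a numeric port.
    static List<String> checkHosts(String hosts) {
        List<String> members = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // colon <= 0: no colon at all, or an empty host before it
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Missing host or port: " + trimmed);
            }
            String port = trimmed.substring(colon + 1);
            if (!port.chars().allMatch(Character::isDigit)) {
                throw new IllegalArgumentException("Non-numeric port: " + trimmed);
            }
            members.add(trimmed);
        }
        return members;
    }

    public static void main(String[] args) {
        // Placeholder addresses for illustration only.
        System.out.println(checkHosts("node-a:27017,node-b:27017,node-c:27017"));
    }
}
```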
I have my local Flink cluster running and I submit via the terminal:
```
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
```
It takes about 30 seconds for the job to appear in the Flink web UI (port 8081), and after that its status keeps switching between RUNNING and RESTARTING.
The job fails with this error:

```
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
```
I am sure the problem isn't on the MongoDB side: I can connect without any issues from Compass, and the [MongoDB source/sink example without CDC][2] works fine, even with the standard `mongodb+srv` URI.
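One difference between the working and the failing setup that may be worth ruling out: under MongoDB's connection-string rules, a `mongodb+srv://` URI enables TLS by default, whereas a plain `mongodb://` seed list stays unencrypted unless `tls=true` (or `ssl=true`) is given. MongoDB Atlas only accepts TLS connections, and a server closing a plaintext handshake is a common cause of `Prematurely reached end of stream`. The sketch below encodes that rule in plain Java; the class `TlsCheck` is hypothetical and not part of either connector.

```java
import java.util.Locale;

public class TlsCheck {

    // Returns true if a MongoDB connection string would negotiate TLS:
    // mongodb+srv:// enables TLS by default; mongodb:// only with tls=true or ssl=true.
    // Simplified: it ignores an explicit tls=false on an SRV URI.
    static boolean usesTls(String uri) {
        String lower = uri.toLowerCase(Locale.ROOT);
        if (lower.startsWith("mongodb+srv://")) {
            return true;
        }
        int q = lower.indexOf('?');
        if (q < 0) {
            return false; // no options at all, so no TLS
        }
        for (String option : lower.substring(q + 1).split("&")) {
            if (option.equals("tls=true") || option.equals("ssl=true")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(usesTls("mongodb+srv://cluster0.example.net/"));    // true
        System.out.println(usesTls("mongodb://h1:27017,h2:27017/"));          // false
        System.out.println(usesTls("mongodb://h1:27017,h2:27017/?ssl=true")); // true
    }
}
```

If the CDC source falls on the non-TLS side of this rule, the builder's ampersand-separated `connectionOptions(...)` string looks like the natural place to add the missing option; treat that as an untested assumption rather than a confirmed fix.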
I am using Flink 1.20.0 and these are my dependencies:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/