I am trying to use the Mongo CDC connector as the source for a DataStream in my Flink job. I use the same example code as [in the docs][1].

This is my code:

MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
              .hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
              .username("myUsername")
              .password("myPassword")
              .databaseList("exercises")
              .collectionList("exercises.movies")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
   .setParallelism(1)
   .print();

env.execute("mongo-cdc");

Since I am using a replica set, I listed all three nodes as host:port pairs separated by commas. I first tried the normal connection URI from Compass, but because it starts with mongodb+srv, it produced a different error, so I had to change the URI.

I have my local Flink cluster running, and I submit the job via the terminal:

./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar

It takes about 30 seconds before the job appears in the Flink UI on port 8081, and it constantly switches between RUNNING and RESTARTING.

The job fails with this error:

Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]

I am sure the problem isn't on the Mongo side: I can connect without a problem via Compass, and I also tried the other example for [Mongo sink/source without CDC][2], where everything works, even with the standard mongodb+srv URI.

I am using Flink 1.20.0 and these are my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>

What could I be missing?


  [1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
  [2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/

2 Answers

1. Correct the hosts parameter. The hosts parameter must list all replica set members with their IPs or hostnames and ports. Replace the placeholders with actual values. For example:

MongoDBSource<String> mongoSource =
    MongoDBSource.<String>builder()
          .hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
          .username("yourUsername")
          .password("yourPassword")
          .databaseList("exercises")
          .collectionList("exercises.movies")
          .deserializer(new JsonDebeziumDeserializationSchema())
          .build();
  • Important: hosts() expects plain host:port pairs, so do not pass a mongodb+srv connection string to it.
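A minimal sketch of assembling and sanity-checking the hosts string before it goes into `.hosts(...)`. `MongoHosts` and `join` are hypothetical helper names, not part of the connector; the only assumption is the one the examples above already make, that the builder expects comma-separated host:port pairs with no scheme prefix.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper: builds the comma-separated host:port list for MongoDBSource.builder().hosts(...). */
final class MongoHosts {
    private MongoHosts() {}

    static String join(List<String> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("at least one replica set member is required");
        }
        StringJoiner joiner = new StringJoiner(",");
        for (String member : members) {
            String m = member.trim();
            // Reject full connection strings such as mongodb+srv://...; only host:port is expected here.
            if (m.contains("://")) {
                throw new IllegalArgumentException("pass host:port, not a URI: " + m);
            }
            int colon = m.lastIndexOf(':');
            if (colon <= 0 || colon == m.length() - 1) {
                throw new IllegalArgumentException("missing host or port: " + m);
            }
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for "<port>".
            int port = Integer.parseInt(m.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + m);
            }
            joiner.add(m);
        }
        return joiner.toString();
    }
}
```

A leftover placeholder such as `<port>` or a pasted `mongodb+srv://` address fails fast here instead of surfacing later as a connection timeout.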

2. Ensure replica set configuration. Verify that your MongoDB cluster is running as a replica set:

  • Log in to the Mongo shell or Compass and run:

    rs.status()

  • The command should return the status of your replica set. If it doesn’t, configure your MongoDB server to enable replication.

3. Enable checkpointing. MongoDB CDC requires checkpointing. Add this to your code:

env.enableCheckpointing(5000); 

4. Increase timeout settings. Pass the standard MongoDB timeout options through connectionOptions to allow more time for establishing connections:

.connectionOptions("connectTimeoutMS=60000&serverSelectionTimeoutMS=60000") // timeouts in milliseconds
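The `30000 ms` in the question's error matches the MongoDB driver's default `serverSelectionTimeoutMS` of 30 seconds. Timeouts are standard MongoDB connection-string options, and the builder's `connectionOptions(...)` (used in the accepted answer below) takes such options as one ampersand-separated string. A minimal sketch that renders that string; `MongoOptions` is a hypothetical helper, and values are not URL-encoded.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: renders MongoDB URI options as "key=value&key=value" for connectionOptions(...). */
final class MongoOptions {
    private MongoOptions() {}

    static String render(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner("&");
        // Output order follows the map's iteration order; a LinkedHashMap keeps insertion order.
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    static String withTimeouts(long connectTimeoutMs, long serverSelectionTimeoutMs) {
        Map<String, String> options = new LinkedHashMap<>();
        // connectTimeoutMS: how long a single socket connection attempt may take.
        options.put("connectTimeoutMS", Long.toString(connectTimeoutMs));
        // serverSelectionTimeoutMS: how long the driver waits for a usable server (default 30000).
        options.put("serverSelectionTimeoutMS", Long.toString(serverSelectionTimeoutMs));
        return render(options);
    }
}
```

A longer timeout only helps when the servers are slow to answer; if they actively drop the connection, as in this thread, it just delays the same failure.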

5. Check MongoDB permissions. Ensure that the user has read access to the database and the oplog. In MongoDB Compass, assign the read or readWrite role to the user for the target database.

6. Dependencies. Use the correct dependencies for Flink 1.20:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>

7. Test network connectivity. Use telnet or ping to make sure your machine can reach the replica set nodes:

telnet cluster0-shard-00-00.mongodb.net 27017
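The same reachability check can be run from Java, for example on the machine that runs the Flink TaskManager, with a plain TCP connect. `TcpProbe` is a hypothetical helper. A successful connect only proves the port is open; it does not exercise TLS or authentication, so it can pass while the MongoDB driver still fails.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: Java equivalent of "telnet host port", i.e. can a TCP connection be opened? */
final class TcpProbe {
    private TcpProbe() {}

    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Throws UnknownHostException for unresolvable hosts, ConnectException if refused,
            // SocketTimeoutException on timeout; all are IOExceptions.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, `TcpProbe.canConnect("cluster0-shard-00-00.mongodb.net", 27017, 5000)` mirrors the telnet command above.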

8. Simplify and test. Run this minimal job:

MongoDBSource<String> mongoSource =
    MongoDBSource.<String>builder()
          .hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
          .username("yourUsername")
          .password("yourPassword")
          .databaseList("exercises")
          .collectionList("exercises.movies")
          .deserializer(new JsonDebeziumDeserializationSchema())
          .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
   .setParallelism(1)
   .print();

env.execute("mongo-cdc");

9. Check the Flink logs. After running the job, inspect the logs via Flink's web UI (http://localhost:8081) or the terminal for more specific details if issues persist.

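10. Retry with an SRV connection (if needed). If you must use a mongodb+srv address, resolve it to the individual replica set members and pass those to hosts() instead; also check the MongoDB CDC connector documentation for your version, which may offer a scheme option for mongodb+srv.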
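One way to keep using the mongodb+srv address from Atlas or Compass is to expand it the way a driver does: look up the DNS SRV record `_mongodb._tcp.<host>`, turn each `priority weight port target` entry into `target:port`, and add `tls=true`, because a mongodb+srv connection string enables TLS by default while a plain host list does not. The sketch below uses the JDK's JNDI DNS provider; `SrvExpander` and its methods are hypothetical names, and the TXT-record options (such as `authSource` and `replicaSet`) are passed in rather than looked up.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

/** Hypothetical helper: expands a mongodb+srv host name into values for hosts(...) and connectionOptions(...). */
final class SrvExpander {
    private SrvExpander() {}

    /** Looks up the SRV records for _mongodb._tcp.<srvHost> with the JDK's DNS provider (needs network access). */
    static List<String> lookupSrvRecords(String srvHost) throws NamingException {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.dns.DnsContextFactory");
        env.put(Context.PROVIDER_URL, "dns:");
        DirContext ctx = new InitialDirContext(env);
        try {
            Attribute srv = ctx.getAttributes("_mongodb._tcp." + srvHost, new String[] {"SRV"}).get("SRV");
            List<String> records = new ArrayList<>();
            if (srv != null) {
                NamingEnumeration<?> values = srv.getAll();
                while (values.hasMore()) {
                    records.add(values.next().toString());
                }
            }
            return records;
        } finally {
            ctx.close();
        }
    }

    /** Converts SRV data of the form "priority weight port target." into a comma-separated "target:port" list. */
    static String toHosts(List<String> srvRecords) {
        List<String> hosts = new ArrayList<>();
        for (String record : srvRecords) {
            String[] parts = record.trim().split("\\s+");
            if (parts.length != 4) {
                throw new IllegalArgumentException("unexpected SRV record: " + record);
            }
            // DNS returns fully qualified names with a trailing dot; drop it.
            String target = parts[3].endsWith(".") ? parts[3].substring(0, parts[3].length() - 1) : parts[3];
            hosts.add(target + ":" + Integer.parseInt(parts[2]));
        }
        return String.join(",", hosts);
    }

    /** mongodb+srv implies tls=true; prepend it to any options taken from the host's TXT record. */
    static String srvEquivalentOptions(String txtOptions) {
        if (txtOptions == null || txtOptions.isBlank()) {
            return "tls=true";
        }
        return "tls=true&" + txtOptions.trim();
    }
}
```

For example, `SrvExpander.toHosts(SrvExpander.lookupSrvRecords("cluster0.example.mongodb.net"))` would produce a value for `.hosts(...)` and `srvEquivalentOptions(...)` a value for `.connectionOptions(...)`; `cluster0.example.mongodb.net` is a placeholder.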

1 Comment

I have tried all of the points, but to no avail. The Mongo replica set is running perfectly as well. Regarding the first point, I intentionally shortened the host names in my post.

It turned out that I needed to enable TLS when connecting to the Atlas cluster:

MongoDBSource<Shipment> mongoSource =
        MongoDBSource.<Shipment>builder()
                .hosts("cluster0-shard-...:27017")
                .username("username")
                .password("password")
                .databaseList("db")
                .collectionList("db.col")
                .connectionOptions("tls=true") // <-- the fix
                .deserializer(new ShipmentDeserializationSchema())
                .build();

It connected instantly, without the 30 s timeout error. However, since I am using the most basic free tier in Atlas, I got this error:

Caused by: org.apache.flink.util.FlinkRuntimeException: Read split SnapshotSplit{tableId=db.col, splitId='db.col:0', splitKeyType=[`_id` INT], splitStart=[{"_id": 1}, {"_id": {"$minKey": 1}}], splitEnd=[{"_id": 1}, {"_id": {"$maxKey": 1}}], highWatermark=null} error due to Query failed with error code 8000 with name 'AtlasError' and error message 'noTimeout cursors are disallowed in this atlas tier' on server cluster0-shard-...:27017.

As far as I could find out, unless I upgrade to a paid cluster, I won't be able to use the streaming source with the Atlas free tier. Also, it wasn't necessary to list all three nodes as host:port pairs, though it may be better to do so. I then created my own local cluster with three nodes, connected to it without a problem, and everything worked. In that case I omitted the tls option, because the connection failed with it enabled:

MongoDBSource<Shipment> mongoSource =
             MongoDBSource.<Shipment>builder()
                      .hosts("localhost:27018,localhost:27019,localhost:27020")
                      .username("username")
                      .password("password")
                      .databaseList("db")
                      .collectionList("db.col")
                      .deserializer(new ShipmentDeserializationSchema())
                      .build();
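The TLS detail also explains the original symptom. A mongodb+srv URI turns TLS on by default, but once the address is rewritten as a plain list of host:port pairs, TLS stays off unless `tls=true` is passed explicitly. Atlas only accepts TLS connections, so plaintext attempts are dropped, which is consistent with the `Prematurely reached end of stream` messages in the question. The sketch below assembles the equivalent plain mongodb:// URI by hand to make the option visible; `MongoUri.build` is a hypothetical helper, and the connector's own URI construction may differ in detail.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: assembles a plain mongodb:// URI from the same pieces passed to MongoDBSource.builder(). */
final class MongoUri {
    private MongoUri() {}

    /** Percent-encodes a credential; URLEncoder emits '+' for spaces, so swap it for %20. */
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String build(String username, String password, String hosts, String options) {
        StringBuilder uri = new StringBuilder("mongodb://")
                .append(encode(username)).append(':').append(encode(password)).append('@')
                .append(hosts).append('/');
        // Without "tls=true" in the options, a mongodb:// URI requests an unencrypted connection.
        if (options != null && !options.isEmpty()) {
            uri.append('?').append(options);
        }
        return uri.toString();
    }
}
```

Comparing `build(..., "tls=true")` with `build(..., "")` for the same hosts shows exactly what the working Atlas configuration adds and what the local, non-TLS cluster leaves out.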
