
Apache Flink job trying to read from Mongo via cdc source connector results in MongoTimeoutException


I am trying to use the MongoDB CDC connector as the source for a DataStream in my Flink job, using the same example code as in [the docs][1].

This is my code:

```java
MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
                .hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
                .username("myUsername")
                .password("myPassword")
                .databaseList("exercises")
                .collectionList("exercises.movies")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
   .setParallelism(1)
   .print();

env.execute("mongo-cdc");
```

Since I am using a replica set, I put the connection URIs for all three nodes, with their ports, separated by commas. I first tried the normal connection URI from Compass, but that produced a different error because of the mongodb+srv scheme at the beginning, so I had to change the URI.
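Since the source's `.hosts(...)` setter takes a plain comma-separated seed list rather than a `mongodb+srv://` URI, a tiny helper like the following can assemble that string from the individual node addresses (a hypothetical sketch; the host names below are placeholders, not my real cluster):

```java
import java.util.List;

public class SeedList {
    // Build the "host:port,host:port" seed-list string expected by
    // MongoDBSource.hosts(...) from individual replica-set member names.
    static String hostList(List<String> hosts, int port) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < hosts.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append(hosts.get(i)).append(':').append(port);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Placeholder host names for illustration only.
        System.out.println(hostList(
                List.of("cluster0-shard-00-00.example.mongodb.net",
                        "cluster0-shard-00-01.example.mongodb.net",
                        "cluster0-shard-00-02.example.mongodb.net"),
                27017));
    }
}
```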

I have my local Flink cluster running and I submit the job via the terminal:

```
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
```

It takes about 30 seconds before the job appears in the Flink UI on port 8081, and then it constantly switches between RUNNING and RESTARTING.

The error that the job produces is this:

```
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
```

I am sure the problem isn't in Mongo, because I connect without a problem via Compass, and I also tried the other example for [Mongo sink/source without CDC][2] and everything works, even when using the standard mongodb+srv URI.

I am using Flink 1.20.0 and these are my dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>
```

What could I be missing?


  [1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
  [2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/

Solution

  • It turned out that the problem was that I needed to include the tls option when connecting to the Atlas cluster:

```java
MongoDBSource<Shipment> mongoSource =
        MongoDBSource.<Shipment>builder()
                .hosts("cluster0-shard-...:27017")
                .username("username")
                .password("password")
                .databaseList("db")
                .collectionList("db.col")
                .connectionOptions("tls=true") // <--
                .deserializer(new ShipmentDeserializationSchema())
                .build();
```

It connected instantly, without the 30-second timeout error. However, since I am using the most basic, free tier in Atlas, I got this error:

```
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split SnapshotSplit{tableId=db.col, splitId='db.col:0', splitKeyType=[`_id` INT], splitStart=[{"_id": 1}, {"_id": {"$minKey": 1}}], splitEnd=[{"_id": 1}, {"_id": {"$maxKey": 1}}], highWatermark=null} error due to Query failed with error code 8000 with name 'AtlasError' and error message 'noTimeout cursors are disallowed in this atlas tier' on server cluster0-shard-...:27017.
```

As far as I researched, unless I upgrade to a paid cluster, I won't be able to use the stream option with the Atlas free tier. Also, it wasn't strictly necessary to provide all three nodes' URIs, though it may be safer. I then created my own local replica set with three nodes and connected to it without a problem; everything worked. In that case I omitted the tls option, since the connection failed with it:

```java
MongoDBSource<Shipment> mongoSource =
        MongoDBSource.<Shipment>builder()
                .hosts("localhost:27018,localhost:27019,localhost:27020")
                .username("username")
                .password("password")
                .databaseList("db")
                .collectionList("db.col")
                .deserializer(new ShipmentDeserializationSchema())
                .build();
```
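One side note on `connectionOptions`: judging by the `tls=true` value that worked above, it appears to take ampersand-joined `key=value` pairs, the same shape as the query part of a MongoDB connection string. A small sketch of assembling such a string (the `authSource` option here is purely illustrative, not something from my setup):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ConnOptions {
    // Join key=value pairs with '&', matching the query-string shape of a
    // MongoDB connection URI (e.g. "tls=true&authSource=admin").
    static String join(Map<String, String> opts) {
        return opts.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>(); // keeps insertion order
        opts.put("tls", "true");
        opts.put("authSource", "admin"); // illustrative option only
        System.out.println(join(opts)); // prints tls=true&authSource=admin
    }
}
```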