I am trying to use the MongoDB CDC connector as the source of a DataStream in my Flink job. I use the same example code as in [the docs][1].
This is my code:

```java
MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
                .hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
                .username("myUsername")
                .password("myPassword")
                .databaseList("exercises")
                .collectionList("exercises.movies")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
        .setParallelism(1)
        .print();
env.execute("mongo-cdc");
```
Since I am using a replica set, I listed all three nodes with their ports, separated by commas. I first tried the normal connection URI from Compass, but it produced a different error because of the `mongodb+srv` prefix, so I had to change the URI.
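A `mongodb+srv://` URI does two things implicitly: the driver resolves the actual host list from the DNS SRV record `_mongodb._tcp.<cluster host>`, and the `+srv` scheme turns TLS on by default. When the URI is rewritten as a plain host list, the host list has to be filled in by hand and TLS is no longer implied. The sketch below shows one way to obtain the host list from the SRV record using the JDK's JNDI DNS provider. The class and method names are hypothetical, and it assumes the DNS provider (`com.sun.jndi.dns.DnsContextFactory`) is available, as it is in standard OpenJDK builds.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

public class SrvHosts {

    // Converts SRV record values of the form "priority weight port target."
    // into the "host:port,host:port" string passed to MongoDBSource.hosts(...).
    static String toHostList(List<String> srvRecords) {
        List<String> hosts = new ArrayList<>();
        for (String record : srvRecords) {
            String[] parts = record.trim().split("\\s+");
            if (parts.length != 4) {
                throw new IllegalArgumentException("Unexpected SRV record: " + record);
            }
            String port = parts[2];
            String target = parts[3];
            // Names in SRV answers usually carry a trailing dot; drop it.
            if (target.endsWith(".")) {
                target = target.substring(0, target.length() - 1);
            }
            hosts.add(target + ":" + port);
        }
        return String.join(",", hosts);
    }

    // Looks up _mongodb._tcp.<clusterHost> through the JDK's JNDI DNS provider.
    static List<String> lookupSrv(String clusterHost) throws NamingException {
        Hashtable<String, String> env = new Hashtable<>();
        env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
        DirContext ctx = new InitialDirContext(env);
        try {
            Attribute srv = ctx.getAttributes("_mongodb._tcp." + clusterHost, new String[] {"SRV"}).get("SRV");
            List<String> records = new ArrayList<>();
            if (srv != null) {
                NamingEnumeration<?> values = srv.getAll();
                while (values.hasMore()) {
                    records.add(values.next().toString());
                }
            }
            return records;
        } finally {
            ctx.close();
        }
    }

    public static void main(String[] args) throws NamingException {
        if (args.length == 0) {
            // Pass the host part of the mongodb+srv URI, e.g. cluster0.<id>.mongodb.net
            System.out.println("usage: SrvHosts <cluster host from the mongodb+srv URI>");
            return;
        }
        System.out.println(toHostList(lookupSrv(args[0])));
    }
}
```

The resolved list can be passed to `.hosts(...)`, but because the `+srv` default no longer applies, TLS still has to be requested explicitly.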
I have a local Flink cluster running and submit the job from the terminal:

```shell
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
```
It takes about 30 seconds before the job appears in the Flink UI on port 8081, and its status then keeps switching between `RUNNING` and `RESTARTING`.
The job fails with this error:

```
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
```
I am sure the problem isn't on the MongoDB side: I can connect without a problem via Compass, and I also tried the other example for [MongoDB sink/source without CDC][2], where everything works, even with the standard `mongodb+srv` URI.
I am using Flink 1.20.0, and these are my dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/
It turned out that I needed to enable TLS when connecting to the Atlas cluster:
```java
MongoDBSource<Shipment> mongoSource =
        MongoDBSource.<Shipment>builder()
                .hosts("cluster0-shard-...:27017")
                .username("username")
                .password("password")
                .databaseList("db")
                .collectionList("db.col")
                .connectionOptions("tls=true") // <-- the missing option
                .deserializer(new ShipmentDeserializationSchema())
                .build();
```
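Why this option matters: with a plain `mongodb://` host list, TLS stays off unless it is requested, while Atlas only accepts TLS connections. That is consistent with the `Prematurely reached end of stream` errors in the question. The sketch below assembles the kind of plain URI that results from combining credentials, hosts and `connectionOptions`. It is a hypothetical helper for illustration, not the connector's own code.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class MongoConnectionString {

    // Percent-encodes a credential for use inside a URI. URLEncoder does form
    // encoding (space becomes '+'), so '+' is rewritten to %20; a literal '+'
    // in the input has already been encoded as %2B at this point.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Hypothetical helper: builds mongodb://user:pass@hosts/?options from the
    // same values that are passed to MongoDBSource.builder().
    static String build(String username, String password, String hosts, String connectionOptions) {
        StringBuilder uri = new StringBuilder("mongodb://");
        if (username != null && password != null) {
            uri.append(encode(username)).append(':').append(encode(password)).append('@');
        }
        uri.append(hosts).append('/');
        if (connectionOptions != null && !connectionOptions.isEmpty()) {
            uri.append('?').append(connectionOptions);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // prints mongodb://username:p%40ss@host-a:27017,host-b:27017/?tls=true
        System.out.println(build("username", "p@ss", "host-a:27017,host-b:27017", "tls=true"));
    }
}
```

Without the `?tls=true` suffix the client opens an unencrypted connection, which is exactly what the `mongodb+srv` scheme had been preventing implicitly.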
It connected instantly, without the 30-second timeout. However, since I am using the most basic free tier in Atlas, I got this error:

```
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split SnapshotSplit{tableId=db.col, splitId='db.col:0', splitKeyType=[`_id` INT], splitStart=[{"_id": 1}, {"_id": {"$minKey": 1}}], splitEnd=[{"_id": 1}, {"_id": {"$maxKey": 1}}], highWatermark=null} error due to Query failed with error code 8000 with name 'AtlasError' and error message 'noTimeout cursors are disallowed in this atlas tier' on server cluster0-shard-...:27017.
```
As far as I could find, the CDC source cannot be used with the Atlas free tier unless I upgrade to a paid cluster, because the free tier disallows the `noTimeout` cursors that the snapshot phase opens. Also, it wasn't necessary to list all three nodes: a single host was enough, although listing all of them may be more robust. I then created my own local replica set with 3 nodes, connected to it without a problem, and everything worked. In that case I omitted the `tls` option, since the connection did not work with it:
```java
MongoDBSource<Shipment> mongoSource =
        MongoDBSource.<Shipment>builder()
                .hosts("localhost:27018,localhost:27019,localhost:27020")
                .username("username")
                .password("password")
                .databaseList("db")
                .collectionList("db.col")
                .deserializer(new ShipmentDeserializationSchema())
                .build();
```