How to Manage Data Source Connection Pool in Flink


I am trying to run a Flink client application in which I read data from a file. Each record in the file should be verified against the corresponding record in the database using a ProcessFunction operator. I have configured a DataSource using the singleton design pattern. I have the following questions:

  1. Will this data source be shared across all the TaskManagers and task slots?
  2. Is it a good idea to create the data source connection in the operator's open() method, with a maximum connection pool size of only one?
    @Override
    public void open(Configuration parameters) throws Exception {
        // Option 1: open a single database Connection here?
        // Option 2: create a DataSource (connection pool) here with a maximum pool size of 1?
        super.open(parameters);
    }

Solution

  • If I've understood your use case correctly, this would be more easily implemented as a Lookup Join with the table in the external database.
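
A lookup join checks each incoming row against the current contents of an external table, matched by key; in Flink SQL it is written as a join against a connector-backed table (for example JDBC) using `FOR SYSTEM_TIME AS OF` a processing-time attribute. The plain-Java sketch below models only those semantics, not Flink's API: a `Map` stands in for the database table, and the `FileRecord` class and its fields are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class LookupJoinModel {

    // Hypothetical input record: a key plus the value that must match the database.
    public static final class FileRecord {
        final String id;
        final String value;

        public FileRecord(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Models lookup-join semantics: for every incoming record, fetch the row with the
    // same key from the external table and compare. Records without a matching row
    // are reported as false here (an inner lookup join would drop them instead).
    public static Map<String, Boolean> verify(List<FileRecord> input,
                                              Function<String, Optional<String>> lookup) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (FileRecord r : input) {
            boolean ok = lookup.apply(r.id).map(r.value::equals).orElse(false);
            result.put(r.id, ok);
        }
        return result;
    }

    public static void main(String[] args) {
        // A Map stands in for the external database table (hypothetical data).
        Map<String, String> table = Map.of("a", "1", "b", "2");
        List<FileRecord> input = List.of(
                new FileRecord("a", "1"), new FileRecord("b", "3"), new FileRecord("c", "9"));
        // prints {a=true, b=false, c=false}
        System.out.println(verify(input, key -> Optional.ofNullable(table.get(key))));
    }
}
```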

    If you must implement this yourself, it will work better if you use Flink's async I/O operator rather than making synchronous requests to the database from a ProcessFunction. (Doing blocking I/O in Flink's user functions tends to cause problems and should be avoided.)
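
To show why async I/O helps, here is a JDK-only model (no Flink classes) of what the async operator does: it starts a lookup for each record without waiting for earlier lookups to finish, while capping the number of in-flight requests, much like the capacity argument of Flink's `AsyncDataStream.orderedWait`/`unorderedWait`. The `lookup` function is a hypothetical stand-in for a non-blocking database client; in a real job this logic would live in a `RichAsyncFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class AsyncLookupModel {

    // Starts one lookup per key without waiting for earlier ones to finish, but keeps at
    // most `capacity` requests in flight. Results are returned in input order.
    public static List<String> lookupAll(List<String> keys, int capacity,
                                         Function<String, CompletableFuture<String>> lookup) {
        Semaphore inFlight = new Semaphore(capacity);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : keys) {
            inFlight.acquireUninterruptibly(); // blocks only when `capacity` requests are pending
            futures.add(lookup.apply(key).whenComplete((value, error) -> inFlight.release()));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        // Hypothetical database: every lookup takes about 50 ms on a background thread.
        Function<String, CompletableFuture<String>> slowDb = key -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return key.toUpperCase();
        }, io);
        long start = System.nanoTime();
        List<String> out = lookupAll(List.of("a", "b", "c", "d"), 4, slowDb);
        long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // With 4 concurrent requests this should take roughly one 50 ms round trip, not four.
        System.out.println(out + " in " + ms + " ms");
        io.shutdown();
    }
}
```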

    But to help answer your original questions:

    All of the task slots within a given task manager are in the same JVM, but each task manager is in a separate JVM. Each task slot has its own instance of the process function, and each instance runs in a different thread.
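
A small JDK-only model of the layout just described, which also bears on the second question: if each parallel instance opens its own connection in `open()`, and all N slots of a task manager run an instance of this operator, that task manager ends up with N connections, one per instance and thread. `FunctionInstance` and `FakeConnection` are hypothetical names, and plain threads stand in for task slots.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SlotModel {

    // Hypothetical stand-in for a database connection.
    public static final class FakeConnection {}

    // Hypothetical stand-in for one parallel instance of a ProcessFunction.
    public static final class FunctionInstance {
        public FakeConnection connection;
        public String openedOn;

        // Like open(): called once per instance, on the thread that runs it.
        public void open() {
            connection = new FakeConnection();
            openedOn = Thread.currentThread().getName();
        }
    }

    // Simulates one task manager (this JVM) with `slots` task slots: each slot gets
    // its own function instance, opened on its own thread.
    public static List<FunctionInstance> runSlots(int slots) {
        List<FunctionInstance> instances = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            Thread t = new Thread(() -> {
                FunctionInstance f = new FunctionInstance();
                f.open();
                instances.add(f);
            }, "slot-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(instances);
    }

    public static void main(String[] args) {
        for (FunctionInstance f : runSlots(3)) {
            System.out.println(f.openedOn + " owns connection " + System.identityHashCode(f.connection));
        }
    }
}
```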

    It is not possible to have a global connection pool that shares connections between task managers. It is possible to use a static class to establish a connection pool shared across the slots of a multi-slot task manager, but using static classes this way is considered an anti-pattern in Flink. It can cause deadlocks, and it also requires care with classloading: a static field means one instance per classloader, so you have to ensure that the class is loaded by the parent classloader, either by placing the class in /lib or by configuring classloader.parent-first-patterns.additional (see the Flink configuration docs) to pick up this particular class.
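
For illustration only, this is the static-holder pattern the answer warns against, modeled in plain Java: every instance that calls `getPool()` within one JVM and one classloader gets the same object. The sketch does not address the deadlock, shutdown, or classloading problems described above (a copy of the class loaded by a different classloader would have its own `pool` field), and `Pool` is a hypothetical stand-in for a real `DataSource`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedPoolModel {

    // Hypothetical stand-in for a real pool such as a JDBC DataSource.
    public static final class Pool {
        public static final AtomicInteger CREATED = new AtomicInteger();

        Pool() {
            CREATED.incrementAndGet();
        }
    }

    // One field per loaded copy of this class, i.e. per classloader, not per JVM.
    private static Pool pool;

    // Lazily creates the shared pool; synchronized so concurrent slots cannot create two.
    public static synchronized Pool getPool() {
        if (pool == null) {
            pool = new Pool();
        }
        return pool;
    }

    // Simulates `slots` slots in one task manager, each asking for the pool from its own thread.
    public static List<Pool> fromSlots(int slots) {
        List<Pool> seen = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            Thread t = new Thread(() -> seen.add(getPool()), "slot-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<Pool> seen = fromSlots(4);
        // prints "distinct pools: 1, pools created: 1"
        System.out.println("distinct pools: " + seen.stream().distinct().count()
                + ", pools created: " + Pool.CREATED.get());
    }
}
```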

    For more on why you shouldn't do this, watch https://youtu.be/F7HQd3KX2TQ?t=1407.

    Where connection pooling can be a good idea is in combination with Flink's async I/O operator. There, each operator instance manages a number of concurrent requests to an external database or service, and pooling connections can improve performance. However, many async client libraries already do this internally, in which case there's no need to do it yourself.
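
A JDK-only sketch of pooling combined with asynchronous requests: each call returns a `CompletableFuture` immediately, runs on an executor, borrows one of a fixed set of connections, and always hands it back afterwards, so no more than `poolSize` requests use the database at once. `Conn` and its `query` method are hypothetical; in Flink such a pool would typically be created in the async function's `open()` and released in `close()`, and if your async client already pools connections, this layer is unnecessary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledAsyncModel {

    // Hypothetical connection; a real one would wrap a database client.
    public static final class Conn {
        private final int id;

        Conn(int id) {
            this.id = id;
        }

        String query(String key) {
            sleepQuietly(10); // simulated network round trip
            return key + "@conn" + id;
        }
    }

    private final BlockingQueue<Conn> pool;
    private final ExecutorService executor;
    private final AtomicInteger inUse = new AtomicInteger();
    private final AtomicInteger maxInUse = new AtomicInteger();

    public PooledAsyncModel(int poolSize, ExecutorService executor) {
        this.pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new Conn(i));
        }
        this.executor = executor;
    }

    // Returns immediately; the query runs on `executor` using a borrowed connection,
    // which is handed back to the pool even if the query throws.
    public CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> {
            Conn conn = borrow();
            maxInUse.accumulateAndGet(inUse.incrementAndGet(), Math::max);
            try {
                return conn.query(key);
            } finally {
                inUse.decrementAndGet();
                pool.add(conn);
            }
        }, executor);
    }

    public int maxInUse() {
        return maxInUse.get();
    }

    private Conn borrow() {
        try {
            return pool.take(); // waits on the executor thread while all connections are busy
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        PooledAsyncModel model = new PooledAsyncModel(2, executor);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            futures.add(model.lookup("key" + i));
        }
        futures.forEach(f -> System.out.println(f.join()));
        System.out.println("max connections in use: " + model.maxInUse()); // never above 2
        executor.shutdown();
    }
}
```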