apache-flink flink-streaming

How to look up DB data from a Flink job


I am new to Apache Flink.

I am building a Flink operator that needs to fetch data from a relational store while processing the stream. The lookup is small and quick, and I am using a spring-jdbc client for it.

public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {

    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public FilterCriteriaEvaluator(DataSource dataSource) {
        namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }
    ...

However, when I run this job on a cluster I get the following error:

Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate

The Spring DB client is not serializable. I then considered using javax.sql.DataSource directly, but it is not serializable either.

Marking the DB client transient doesn't help either: it is then not serialized as part of the operator object, so the field is null on the cluster and I get an NPE when the job executes.

What am I missing here? How can I do DB lookups from a Flink job?


Solution

  • As a workaround, you can mark the JdbcTemplate as transient and initialize it lazily, something like:

    
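    // transient: not shipped with the operator, so it is null on each task until first use
    private transient JdbcTemplate instance = null;

    // Not thread-safe
    public JdbcTemplate getInstance() {
        if (instance == null) {
            // init: build the template here from serializable settings
        }

        return instance;
    }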
    

    and access it via the getInstance() method. However, this way you will have a separate instance for each parallel subtask of the operator (roughly one per task slot).
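
    A minimal sketch of why the lazy getter works, using only the JDK: Java serialization skips transient fields, so the deserialized copy starts with null and rebuilds the client on first use. LookupClient, Evaluator and the JDBC URL are hypothetical stand-ins (for JdbcTemplate, your operator and its settings), not Flink or Spring API.

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    final class LazyTransientDemo {

        // Hypothetical stand-in for a non-serializable client such as JdbcTemplate.
        static final class LookupClient {
            final String url;

            LookupClient(String url) {
                this.url = url;
            }
        }

        // Hypothetical stand-in for the operator: only the serializable URL is shipped.
        static final class Evaluator implements Serializable {
            private static final long serialVersionUID = 1L;

            private final String jdbcUrl;
            private transient LookupClient client; // skipped by Java serialization

            Evaluator(String jdbcUrl) {
                this.jdbcUrl = jdbcUrl;
            }

            // Not thread-safe; assumes a single thread uses this instance.
            LookupClient getClient() {
                if (client == null) {
                    client = new LookupClient(jdbcUrl);
                }
                return client;
            }

            boolean hasClient() {
                return client != null;
            }
        }

        // Serializes and deserializes the evaluator, roughly what happens when a job is shipped.
        static Evaluator roundTrip(Evaluator original) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(original);
                }
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                    return (Evaluator) in.readObject();
                }
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("serialization round trip failed", e);
            }
        }

        public static void main(String[] args) {
            Evaluator submitted = new Evaluator("jdbc:postgresql://db-host/devices");
            submitted.getClient(); // the client exists on the submitting side

            Evaluator onWorker = roundTrip(submitted);
            System.out.println(onWorker.hasClient());     // the transient field was not shipped
            System.out.println(onWorker.getClient().url); // rebuilt lazily from the URL
        }
    }
    ```

    The point of the design is that the constructor takes only serializable settings (here a URL string) rather than a DataSource, so there is something to rebuild the client from on the worker.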

    To have a single instance per Task Manager instead, you can make it a static variable. In that case you need to take care of thread safety and make the initializer thread-safe.
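
    One way to make the static initializer thread-safe is double-checked locking on a volatile field. The sketch below assumes the client itself is safe to share between threads; SharedClientHolder, LookupClient and the URL are hypothetical names for illustration only.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    final class SharedClientHolder {

        // Hypothetical stand-in for a client that is safe to share between threads.
        static final class LookupClient {
            final String url;

            LookupClient(String url) {
                this.url = url;
            }
        }

        // Counts constructions so the single-instance behaviour can be checked.
        private static final AtomicInteger CREATED = new AtomicInteger();

        // volatile is required for double-checked locking to publish the instance safely.
        private static volatile LookupClient instance;

        private SharedClientHolder() {
        }

        // Returns the shared client, creating it on the first call only.
        static LookupClient getInstance(String jdbcUrl) {
            LookupClient local = instance;
            if (local == null) {
                synchronized (SharedClientHolder.class) {
                    local = instance;
                    if (local == null) {
                        local = new LookupClient(jdbcUrl);
                        CREATED.incrementAndGet();
                        instance = local;
                    }
                }
            }
            return local;
        }

        static int createdCount() {
            return CREATED.get();
        }

        public static void main(String[] args) throws InterruptedException {
            Thread[] threads = new Thread[8];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(() -> getInstance("jdbc:postgresql://db-host/devices"));
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            System.out.println(createdCount()); // one construction despite eight callers
        }
    }
    ```

    Note that a static field is shared per JVM and per class loader, so "one per Task Manager" holds only as long as the job's classes are loaded once in that process.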

    Alternatively, keep the variable transient, but instead of adding a getInstance() method, extend a RichFunction (or the rich variant of whatever operation you have) and initialize the template in the open(Configuration parameters) method, which is called once when the function is initialized and is suited to one-time setup work. KeyedProcessFunction already extends AbstractRichFunction, so in your case you only need to override open().
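
The open() variant can be sketched without a Flink dependency as follows. RichLike is a hypothetical stand-in for the open()/close() hooks that Flink's rich functions provide, and LookupClient fakes the database with an in-memory map; in a real job the same overrides would go directly into FilterCriteriaEvaluator, using the open(Configuration parameters) signature mentioned above.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class OpenLifecycleDemo {

    // Hypothetical stand-in for the lifecycle hooks of a Flink rich function.
    abstract static class RichLike implements Serializable {
        void open() {
        }

        void close() {
        }
    }

    // Hypothetical stand-in for a DB client; a real job would wrap a DataSource here.
    static final class LookupClient {
        private final Map<Long, String> rows = new HashMap<>();
        private boolean closed;

        LookupClient(String url) {
            rows.put(42L, "threshold>10"); // fake row instead of a real query
        }

        String findCriteria(long deviceId) {
            if (closed) {
                throw new IllegalStateException("client is closed");
            }
            return rows.getOrDefault(deviceId, "none");
        }

        void close() {
            closed = true;
        }
    }

    static final class Evaluator extends RichLike {
        private static final long serialVersionUID = 1L;

        private final String jdbcUrl;          // serializable setting, set when the job is built
        private transient LookupClient client; // created on the worker in open()

        Evaluator(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }

        @Override
        void open() {
            // One-time setup, run after the function has been deserialized on the worker.
            client = new LookupClient(jdbcUrl);
        }

        // Stand-in for processElement: enriches the key with the looked-up value.
        String process(long deviceId) {
            return deviceId + ":" + client.findCriteria(deviceId);
        }

        @Override
        void close() {
            if (client != null) {
                client.close();
                client = null;
            }
        }

        boolean isOpen() {
            return client != null;
        }
    }

    public static void main(String[] args) {
        Evaluator evaluator = new Evaluator("jdbc:postgresql://db-host/devices");
        evaluator.open();
        System.out.println(evaluator.process(42L));
        evaluator.close();
    }
}
```

Compared with the lazy getter, this keeps the null check out of the per-record path and gives you a matching close() hook in which to release connections.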