Search code examples
java, elasticsearch, apache-flink, flink-streaming, elasticsearch-7

How to use setApiCompatibilityMode in Elasticsearch7 sink for Flink Streaming Job


I have a Flink streaming job that saves processed data to an Elasticsearch version 8 cluster. Unfortunately, I am using Flink version 1.13.6, which still uses the Elasticsearch version 7 sinks, so I get a parsing error when trying to save data.

I have two options, either delete my current cluster and set up an elasticsearch 7 cluster or enable compatibility mode for the sink.

I can't delete my cluster because I have a large amount of important data that would take forever to recover, and since snapshots don't work from higher to lower versions that leaves me with the second option.

I searched online and found this: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html.

The problem is I don't know where to enable that. Here is my code:

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
 * Configures a Flink Elasticsearch 7 sink builder that writes member-summary tuples to the
 * {@code prod-members} index.
 *
 * <p>Each incoming {@code Tuple4} is mapped to a document with the fields {@code serverId}
 * ({@code f0}), {@code numLeft} ({@code f1}), {@code numJoined} ({@code f2}),
 * {@code memberCount} ({@code f3}) and {@code date} (the current epoch milliseconds, stored as a
 * string).
 *
 * <p>NOTE(review): the anonymous sink function and failure handler are created inside the
 * constructor and therefore keep a reference to this instance; presumably that is why this class
 * is {@link Serializable} and the builder field is {@code transient} - confirm before changing
 * either.
 */
public class ElasticMemberSink implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Fully configured builder; {@code transient}, so it is not part of the serialized form. */
    private transient ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> memberEsSinkBuilder;

    /**
     * Creates and configures the sink builder.
     *
     * @param httpHosts       Elasticsearch hosts the sink connects to
     * @param elasticPassword password for the {@code elastic} user; when {@code null} no
     *                        credentials are configured on the REST client
     */
    public ElasticMemberSink(List<HttpHost> httpHosts, String elasticPassword) {
        // create member sink
        memberEsSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<Tuple4<String, Integer, Integer, Integer>>() {

                    /** Maps one member-summary tuple to an index request for {@code prod-members}. */
                    private IndexRequest createIndexRequest(
                            Tuple4<String, Integer, Integer, Integer> memberSummaryTuple) {
                        Map<String, Object> json = new HashMap<>();

                        json.put("serverId", memberSummaryTuple.f0);
                        // timestamp of processing, kept as a string of epoch milliseconds
                        json.put("date", String.valueOf(System.currentTimeMillis()));
                        json.put("numLeft", memberSummaryTuple.f1);
                        json.put("numJoined", memberSummaryTuple.f2);
                        json.put("memberCount", memberSummaryTuple.f3);

                        // No explicit mapping type: types are deprecated in Elasticsearch 7 and
                        // removed in 8; "_doc" is the implicit default.
                        return Requests.indexRequest().index("prod-members").source(json);
                    }

                    @Override
                    public void process(Tuple4<String, Integer, Integer, Integer> memberSummaryTuple, RuntimeContext ctx,
                            RequestIndexer indexer) {
                        // Building the request cannot fail with a checked exception, so there is
                        // nothing to catch (and nothing to silently swallow) here.
                        indexer.add(createIndexRequest(memberSummaryTuple));
                    }
                });

        // set RestClientFactory to provide authentication
        if (elasticPassword != null) {
            memberEsSinkBuilder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    // elasticsearch username and password
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                            new UsernamePasswordCredentials("elastic", elasticPassword));

                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }));
        }

        // set number of events to be seen before writing to Elasticsearch
        memberEsSinkBuilder.setBulkFlushMaxActions(1);

        // handle failing elasticsearch requests
        memberEsSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                    throws Throwable {

                if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                    // full queue; re-add document for indexing
                    indexer.add(action);
                } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                    // malformed document; simply drop request without failing sink
                } else {
                    // for all other failures, fail the sink
                    // here the failure is simply rethrown, but users can also choose to throw
                    // custom exceptions
                    throw failure;
                }
            }
        });
    }

    /**
     * Returns the configured sink builder.
     *
     * @return the builder created in the constructor
     */
    public ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> getSinkBuilder() {
        return memberEsSinkBuilder;
    }
}




Solution

  • There's no immediate solution. Flink's Elasticsearch connector uses the RestHighLevelClient. The compatibility mode was only added to that client in version 7.17, but Elastic has relicensed the client under the SSPL, which is incompatible with Flink's license. That prevents the current Flink implementation from upgrading. You can track https://issues.apache.org/jira/browse/FLINK-26088 for adding proper support for Elasticsearch 8, though no volunteer has picked it up yet.