I have a Flink streaming job that saves processed data to an Elasticsearch 8 cluster. Unfortunately, I am on Flink 1.13.6, which still ships the Elasticsearch 7 sinks, so I get a parsing error when the sink tries to write data.
I have two options: either delete my current cluster and set up an Elasticsearch 7 cluster, or enable compatibility mode for the sink.
I can't delete my cluster because it holds a large amount of important data that would take a long time to recover, and since snapshots can't be restored from a higher to a lower version, that leaves me with the second option.
I searched online and found this: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html.
The problem is that I don't know where to enable it. Here is my code:
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
public class ElasticMemberSink implements Serializable {
private static final long serialVersionUID = 1L;
private transient ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> memberEsSinkBuilder;
public ElasticMemberSink(List<HttpHost> httpHosts, String elasticPassword) {
// create member sink
memberEsSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
new ElasticsearchSinkFunction<Tuple4<String, Integer, Integer, Integer>>() {
public IndexRequest createIndexRequest(Tuple4<String, Integer, Integer, Integer> memberSummaryTuple)
throws JsonProcessingException {
Date date = new Date();
Map<String, Object> json = new HashMap<>();
json.put("serverId", memberSummaryTuple.f0);
json.put("date", String.valueOf(date.getTime()));
json.put("numLeft", memberSummaryTuple.f1);
json.put("numJoined", memberSummaryTuple.f2);
json.put("memberCount", memberSummaryTuple.f3);
return Requests.indexRequest().index("prod-members").type("_doc").source(json);
}
@Override
public void process(Tuple4<String, Integer, Integer, Integer> memberSummaryTuple, RuntimeContext ctx,
RequestIndexer indexer) {
try {
indexer.add(createIndexRequest(memberSummaryTuple));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
});
// set RestClientFactory to provide authentication
if(elasticPassword != null) {
memberEsSinkBuilder.setRestClientFactory(restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
// elasticsearch username and password
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", elasticPassword));
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
});
}
// flush to Elasticsearch after every single event (no batching)
memberEsSinkBuilder.setBulkFlushMaxActions(1);
// handle failing elasticsearch requests
memberEsSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
throws Throwable {
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
// here the failure is simply rethrown, but users can also choose to throw
// custom exceptions
throw failure;
}
}
});
}
public ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> getSinkBuilder() {
return memberEsSinkBuilder;
}
}
There's no immediate solution. Flink's Elasticsearch connector uses the RestHighLevelClient, and compatibility mode was only added in client version 7.17. However, Elastic relicensed that client under the SSPL, which is incompatible with the Apache license, and that prevents the current Flink implementation from upgrading to it. You can track https://issues.apache.org/jira/browse/FLINK-26088, which covers adding proper Elasticsearch 8 support, though no one has volunteered to pick it up yet.
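To make the gap concrete: per the page you linked, compatibility mode is enabled on the RestHighLevelClientBuilder from the 7.16+ client, not on the low-level RestClientBuilder that the sink's RestClientFactory hands you, so there is no place in your code to switch it on. A minimal sketch of what it would look like if that client were available (the CompatClientSketch class and buildCompatClient method are just illustrative names of mine):
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
public class CompatClientSketch {
    // Sketch only: RestHighLevelClientBuilder and setApiCompatibilityMode exist
    // only in the 7.16+ high-level client, which Flink 1.13's connector does
    // not ship.
    public static RestHighLevelClient buildCompatClient(HttpHost host) {
        return new RestHighLevelClientBuilder(RestClient.builder(host).build())
                // makes the client send "compat-with=7" content-type/accept
                // headers so an Elasticsearch 8 server answers in the 7 format
                .setApiCompatibilityMode(true)
                .build();
    }
}
Even if you put a newer client on the classpath, the Flink sink constructs its RestHighLevelClient internally, so there is no hook to hand it an instance built this way.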