**Something goes wrong when I try to sink a DataStream to Elasticsearch 7.17.9.** Here are my Maven dependencies:
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.17.9</elasticsearch.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.16.2</flink.version>
<flink.client.version>1.14.6</flink.client.version>
</properties>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>uk.co.jemos.podam</groupId>
<artifactId>podam</artifactId>
<version>7.2.11.RELEASE</version>
</dependency>
<!-- sink function section: start -->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-java</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.33</version>
</dependency>
My key code is:
log.info("开始执行 flink sink to elasticsearch 程序!");
SourceFunction<DataChangeDTO> sqlServerSource =
SqlServerSource.<DataChangeDTO>builder()
.hostname(sourceConfiguration.getHostName())
.port(sourceConfiguration.getPort())
.database(sourceConfiguration.getDatabase())
.tableList(String.join(",", sourceConfiguration.getTableList()))
.username(sourceConfiguration.getUserName())
.password(sourceConfiguration.getPassword())
.deserializer(new CustomJsonDebeziumDeserializationSchema())
//.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//enable checkpoint
env.enableCheckpointing(3000);
// build a Configuration for the local REST port (note: it is not passed to the environment created above)
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8080);
HttpHost httpHost = new HttpHost(targetConfiguration.getHostName(), targetConfiguration.getPort(), "http");
//ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder = new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
//esSinkBuilder.setBulkFlushMaxActions(1);
env
.addSource(sqlServerSource)
.filter(new DataStreamFilter())
.sinkTo(
new Elasticsearch7SinkBuilder<DataChangeDTO>()
// The setting below makes the sink flush right after each element; otherwise elements are buffered
.setBulkFlushMaxActions(1)
.setHosts(httpHost)
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)))
.build())
//.print()
//.addSink(esSinkBuilder.build())
.setParallelism(1)
.name("flinkCDC to elasticsearch process");
//.setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute(" SqlServer to Elastic ");
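createIndexRequest is not shown above; it is a small helper that builds a plain IndexRequest. A minimal sketch, assuming an index named "data_change" and a single "payload" field serialized with fastjson2 (both are placeholders, not the real mapping):

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import com.alibaba.fastjson2.JSON;

// Sketch of the helper referenced by the emitter above; index name and field name are placeholders.
private static IndexRequest createIndexRequest(DataChangeDTO element) {
    Map<String, Object> json = new HashMap<>();
    json.put("payload", JSON.toJSONString(element)); // serialize the whole DTO into one field
    return Requests.indexRequest()
            .index("data_change")
            .source(json);
}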
And I got this error:
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/unit/TimeValue
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:109) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:69) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.createBulkProcessor(ElasticsearchWriter.java:198) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.<init>(ElasticsearchWriter.java:105) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:90) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-runtime-1.16.2.jar:1.16.2]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_292]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_292]
... 19 common frames omitted
I cannot figure it out, many thanks.
I also tried some other ways, like:
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-base</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>1.14.6</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- <version>7.17.9</version>-->
<!-- </dependency>-->
and used env.addSink instead of sinkTo(...), but the problem still happened.
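That addSink variant was essentially the commented-out ElasticsearchSink.Builder lines above wired back in; a rough sketch (same source, filter, and CustomElasticSinkFunction as before):

import cn.hutool.core.collection.ListUtil;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

// Legacy SinkFunction-based variant, kept here only to show what was tried.
ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder =
        new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
esSinkBuilder.setBulkFlushMaxActions(1); // flush after every element

env.addSource(sqlServerSource)
   .filter(new DataStreamFilter())
   .addSink(esSinkBuilder.build())
   .setParallelism(1)
   .name("flinkCDC to elasticsearch process");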
I also tried removing the CDC step and building a bounded stream that sinks to Elasticsearch, and got the same error (screenshot omitted; it showed the same NoClassDefFoundError).
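The bounded-stream test was essentially a minimal job of this shape (host and index name are placeholders):

import java.util.Collections;

import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

// Bounded stream (no CDC source) that still fails at the Elasticsearch sink.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "c")
   .sinkTo(
       new Elasticsearch7SinkBuilder<String>()
           .setBulkFlushMaxActions(1)
           .setHosts(new HttpHost("127.0.0.1", 9200, "http")) // placeholder host
           .setEmitter((element, context, indexer) ->
               indexer.add(Requests.indexRequest()
                   .index("test-index") // placeholder index
                   .source(Collections.singletonMap("data", element))))
           .build());
env.execute("bounded stream to elasticsearch test");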
The Elasticsearch version that you're trying to use is not compatible with the Flink Elasticsearch connector. The 1.14 version of that connector uses Elasticsearch 7.5.1, as can be seen in https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-elasticsearch7/pom.xml#L40
Flink can't upgrade to 7.17.9, because that newer version uses the SSPL license which is incompatible with the Apache 2.0 license.
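If you want to confirm which Elasticsearch client actually ends up on your classpath (and hence whether org.elasticsearch.common.unit.TimeValue exists in it), you can inspect the resolved dependency tree, for example:

mvn dependency:tree -Dincludes=org.elasticsearch*

If anything there resolves to a different version than the one the connector was built against, that mismatch is the usual cause of this kind of NoClassDefFoundError.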