Search code examples
apache-flink, flink-streaming

FlinkCDC sink to elasticsearch org.elasticsearch.common.unit.TimeValue


**Something goes wrong when I try to sink the datastream to Elasticsearch 7.17.9.** Here are my Maven dependencies:

    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>7.17.9</elasticsearch.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.16.2</flink.version>
        <flink.client.version>1.14.6</flink.client.version>
    </properties>

 <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>uk.co.jemos.podam</groupId>
            <artifactId>podam</artifactId>
            <version>7.2.11.RELEASE</version>
        </dependency>

        <!--sink function 方法开始         -->


<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-java</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-streaming-java_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-clients_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->






        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.5.0</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.9</version>
        </dependency>


        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.33</version>
        </dependency>

My key code is:

    log.info("开始执行 flink sink to elasticsearch 程序!");
        SourceFunction<DataChangeDTO> sqlServerSource =
                SqlServerSource.<DataChangeDTO>builder()
                        .hostname(sourceConfiguration.getHostName())
                        .port(sourceConfiguration.getPort())
                        .database(sourceConfiguration.getDatabase())
                        .tableList(String.join(",", sourceConfiguration.getTableList()))
                        .username(sourceConfiguration.getUserName())
                        .password(sourceConfiguration.getPassword())
                        .deserializer(new CustomJsonDebeziumDeserializationSchema())
                        //.startupOptions(StartupOptions.initial())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //enable checkpoint
        env.enableCheckpointing(3000);
        // set the source parallelism to 2
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8080);
        HttpHost httpHost = new HttpHost(targetConfiguration.getHostName(), targetConfiguration.getPort(), "http");
        //ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder = new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
        //esSinkBuilder.setBulkFlushMaxActions(1);

        env
                .addSource(sqlServerSource)
                .filter(new DataStreamFilter())
                .sinkTo(
                        new Elasticsearch7SinkBuilder<DataChangeDTO>()
                                // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
                                .setBulkFlushMaxActions(1)
                                .setHosts(httpHost)
                                .setEmitter(
                                        (element, context, indexer) ->
                                                indexer.add(createIndexRequest(element)))
                                .build())
                //.print()
                //.addSink(esSinkBuilder.build())
                .setParallelism(1)
                .name("flinkCDC to elasticsearch process");
                //.setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute(" SqlServer to Elastic ");

and I got error detail:

Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/unit/TimeValue
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:109) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:69) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
    at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.createBulkProcessor(ElasticsearchWriter.java:198) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
    at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.<init>(ElasticsearchWriter.java:105) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
    at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:90) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) ~[flink-streaming-java-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-runtime-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-runtime-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-runtime-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-runtime-1.16.2.jar:1.16.2]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_292]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_292]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[na:1.8.0_292]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_292]
    ... 19 common frames omitted

I cannot figure it out — many thanks for any help.

I have tried other dependency combinations as well, for example:

  <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-base</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-streaming-java_2.12</artifactId>-->
<!--            <version>1.14.6</version>-->
<!--            <scope>provided</scope>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->


<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <artifactId>slf4j-api</artifactId>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.elasticsearch</groupId>-->
<!--            <artifactId>elasticsearch</artifactId>-->
<!--            <version>7.17.9</version>-->
<!--        </dependency>-->

I also tried using `env.addSink(...)` instead of `sinkTo(...)`, but the problem still happens.

I also tried removing the CDC step and building a simple bounded stream that sinks to Elasticsearch, and I got the same error (see the attached screenshot).


Solution

  • The Elasticsearch version you are trying to use is not compatible with the Flink Elasticsearch connector. The 1.14 line of the connector is built against Elasticsearch client 7.5.1, as can be seen in https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-elasticsearch7/pom.xml#L40 — the `NoClassDefFoundError` for `org.elasticsearch.common.unit.TimeValue` occurs because that class was removed/relocated in newer Elasticsearch client versions.

    Flink can't upgrade to 7.17.9, because that newer version uses the SSPL license which is incompatible with the Apache 2.0 license.