
Unable to connect NiFi to Flink using the NiFi connector


I am trying to use NiFi as a source connector in Flink and am getting the following error. My NiFi service is running, and the template is in the running state.

Flink Error:

INFO: Source: Custom Source (2/2) (e9ceb92d895d6cd6524ecd0615b219df) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
    at org.apache.http.conn.ssl.DefaultHostnameVerifier.(DefaultHostnameVerifier.java:82)
    at org.apache.http.impl.client.HttpClientBuilder.build(HttpClientBuilder.java:966)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.setupClient(SiteToSiteRestApiClient.java:283)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getHttpClient(SiteToSiteRestApiClient.java:219)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1189)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1237)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.fetchController(SiteToSiteRestApiClient.java:419)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:394)
    at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:361)
    at org.apache.nifi.remote.client.SiteInfoProvider.refreshRemoteInfo(SiteInfoProvider.java:69)
    at org.apache.nifi.remote.client.SiteInfoProvider.getPortIdentifier(SiteInfoProvider.java:220)
    at org.apache.nifi.remote.client.SiteInfoProvider.getOutputPortIdentifier(SiteInfoProvider.java:204)
    at org.apache.nifi.remote.client.socket.SocketClient.getPortIdentifier(SocketClient.java:79)
    at org.apache.nifi.remote.client.socket.SocketClient.createTransaction(SocketClient.java:121)
    at org.apache.flink.streaming.connectors.nifi.NiFiSource.run(NiFiSource.java:89)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

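import java.nio.charset.Charset;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

public class NiFiSourceTopologyExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Site-to-site client configuration for the local NiFi instance and its output port
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("CasandraOut")
                .requestBatchCount(5)
                .buildConfig();

        try {
            SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
            DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);

            // Convert the content of each NiFi data packet to a String
            DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
                @Override
                public String map(NiFiDataPacket value) throws Exception {
                    return new String(value.getContent(), Charset.defaultCharset());
                }
            });

            dataStream.print();
            env.execute();
        } catch (Exception e) {
            System.out.println("Error->" + e.getMessage());
        }
    }
}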

Solution

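  • The error java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory means that code you are using, directly or indirectly, depends on the Apache Commons Logging library and that the library could not be loaded at runtime. In the stack trace above, the class is requested by Apache HttpClient (org.apache.http), which the NiFi site-to-site client calls.

    A NoClassDefFoundError can have other classpath-related causes, but most often it means a JAR is missing from the runtime classpath.

    Try adding commons-logging-x.jar to your project dependencies and run the job again.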
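
    To confirm whether the JAR is actually visible before and after adding it, a small check along these lines may help. This is only a diagnostic sketch: ClasspathCheck and isClassPresent are hypothetical names introduced here, not part of Flink or NiFi, and the result is only meaningful if the check runs with the same classpath as the failing job.

```java
// Hypothetical diagnostic helper (not part of Flink or NiFi).
final class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    static boolean isClassPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class, or something it links against, is not available
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the NoClassDefFoundError from the question
        String name = "org.apache.commons.logging.LogFactory";
        System.out.println(name + " on classpath: " + isClassPresent(name));
    }
}
```

    If the check reports false in the environment where the job runs, the Commons Logging JAR is still not reaching that classpath, even if it is present in the IDE or build configuration.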