Flink to NiFi: "the Magic Header was not present"


I am trying to use this example to connect NiFi to Flink:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8090/nifi")
            .portName("Data for Flink")
            .requestBatchCount(5)
            .buildConfig();

    SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
    DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);

    DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
        @Override
        public String map(NiFiDataPacket value) throws Exception {
            return new String(value.getContent(), Charset.defaultCharset());
        }
    });

    dataStream.print();
    env.execute();

I am running NiFi as a standalone server with the default properties, except for these:

nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true

The call fails every time, with the following log in NiFi:

[Site-to-Site Worker Thread-24] o.a.nifi.remote.SocketRemoteSiteListener 
Unable to communicate with remote instance null due to
org.apache.nifi.remote.exception.HandshakeException: Handshake 
with nifi://localhost:61680 failed because the Magic Header 
was not present; closing connection

NiFi version: 1.7.1, Flink version: 1.7.1


Solution

  • After using the nifi-toolkit, I removed the custom value of nifi.remote.input.socket.port, added transportProtocol(SiteToSiteTransportProtocol.HTTP) to my SiteToSiteClientConfig, and set the URL to http://localhost:8080/nifi.

    I changed the port in the first place because, unless HTTP is specified explicitly, the client uses the RAW protocol by default. With RAW on the Flink side, the client cannot create a Transaction and prints the following warning:

    Unable to refresh Remote Group's peers due to Remote instance of NiFi 
    is not configured to allow RAW Socket site-to-site communications
    

    That's why I thought it was a port issue.

    So now, with NiFi's default configuration, this works as expected:

    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("portNameAsInNifi")
            // Select HTTP explicitly; the client defaults to RAW otherwise.
            .transportProtocol(SiteToSiteTransportProtocol.HTTP)
            .requestBatchCount(1)
            .buildConfig();