scala elasticsearch apache-flink flink-streaming

Authenticate with ECE Elasticsearch Sink from Apache Flink (Scala code)


Compiler error when using the example provided in the Flink documentation. The Flink documentation provides sample Scala code for setting the REST client factory parameters when talking to Elasticsearch: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html. When I try this code, IntelliJ reports the compiler error "Cannot resolve symbol restClientBuilder".

I found the following Stack Overflow question, which describes EXACTLY my problem, except that it is in Java and I am doing this in Scala: Apache Flink (v1.6.0) authenticate Elasticsearch Sink (v6.4)

I tried copy-pasting the solution code from that question into IntelliJ, but the auto-converted code also has compiler errors.

      // provide a RestClientFactory for custom configuration on the internally created REST client
      // I only show setMaxRetryTimeoutMillis for illustration purposes; the actual code will use a custom HTTP callback
      esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
          restClientBuilder.setMaxRetryTimeoutMillis(10)
        }
      )
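The snippet above fails because `->` is Java lambda syntax; Scala writes function literals with `=>`, and from Scala 2.12 onward a function literal can be converted to any type with a single abstract method, which `RestClientFactory` is. The sketch below uses hypothetical stand-in types (`DemoRestClientBuilder`, `DemoRestClientFactory`) shaped like the connector's, so it compiles without Flink or Elasticsearch on the classpath. With the real classes, the lambda form would presumably be `esSinkBuilder.setRestClientFactory((b: RestClientBuilder) => { b.setMaxRetryTimeoutMillis(10); () })`, assuming Scala 2.12+.

```scala
// Hypothetical stand-ins shaped like the connector's types; NOT the real Flink/Elasticsearch API.
final class DemoRestClientBuilder {
  var maxRetryTimeoutMillis: Int = 30000
  def setMaxRetryTimeoutMillis(ms: Int): DemoRestClientBuilder = {
    maxRetryTimeoutMillis = ms
    this
  }
}

// Single abstract method, like RestClientFactory.configureRestClientBuilder
trait DemoRestClientFactory extends Serializable {
  def configureRestClientBuilder(builder: DemoRestClientBuilder): Unit
}

object LambdaSyntaxDemo {
  // Java:  builder -> { builder.setMaxRetryTimeoutMillis(10); }
  // Scala: `=>` instead of `->`; Scala 2.12+ converts the function literal
  // to the single-abstract-method trait (SAM conversion).
  val viaLambda: DemoRestClientFactory =
    (builder: DemoRestClientBuilder) => { builder.setMaxRetryTimeoutMillis(10); () }

  // Explicit anonymous class: the form that also works on Scala 2.11.
  val viaAnonymousClass: DemoRestClientFactory = new DemoRestClientFactory {
    override def configureRestClientBuilder(builder: DemoRestClientBuilder): Unit = {
      builder.setMaxRetryTimeoutMillis(10)
    }
  }
}
```

Either form creates a new factory object, which is what `setRestClientFactory` takes; the connector then calls `configureRestClientBuilder` on the REST client builder it creates internally. On Scala 2.11 the anonymous-class form is the safe choice.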

Then I tried the Scala code that IntelliJ auto-generated from the Java solution:

      import org.apache.http.auth.AuthScope
      import org.apache.http.auth.UsernamePasswordCredentials
      import org.apache.http.client.CredentialsProvider
      import org.apache.http.impl.client.BasicCredentialsProvider
      import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
      import org.elasticsearch.client.RestClientBuilder
      // provide a RestClientFactory for custom configuration on the internally created REST client

      esSinkBuilder.setRestClientFactory((restClientBuilder) => {
        def foo(restClientBuilder) = restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
          override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { // elasticsearch username and password
            val credentialsProvider = new BasicCredentialsProvider
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
          }
        })

        foo(restClientBuilder)
      })

The original code snippet produces the error "cannot resolve RestClientFactory", and the IntelliJ Java-to-Scala conversion shows several other errors.

So I need a Scala version of the solution described in Apache Flink (v1.6.0) authenticate Elasticsearch Sink (v6.4).


Update 1: I was able to make some progress with some help from IntelliJ. The following code compiles and runs, but there is another problem.

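      import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory // assumes the Elasticsearch 6 connector
      import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
      import org.apache.http.impl.client.BasicCredentialsProvider
      import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
      import org.elasticsearch.client.RestClientBuilder

      // es_user, es_password and trustfulSslContext are defined elsewhere in the job
      esSinkBuilder.setRestClientFactory(
        new RestClientFactory {
          override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
              override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
                // elasticsearch username and password
                val credentialsProvider = new BasicCredentialsProvider
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                // setSSLContext returns the builder, which becomes the method's result
                httpClientBuilder.setSSLContext(trustfulSslContext)
              }
            })
          }
        }
      )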
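The `BasicCredentialsProvider` above authenticates with HTTP Basic auth, which, once the server answers with a Basic challenge, typically ends up as an `Authorization: Basic ...` header on the request. The hypothetical helper below (`BasicAuth.headerValue` is not from the post) computes that value, which can be handy for reproducing the same request with another HTTP tool while debugging. Sending it preemptively via the low-level client's `RestClientBuilder.setDefaultHeaders` is another option some setups use; it is mentioned here as an alternative, not as what the post did.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuth {
  // Value of an HTTP Basic "Authorization" header (RFC 7617):
  // "Basic " followed by base64 of "user:password" encoded as UTF-8.
  def headerValue(user: String, password: String): String = {
    val token = s"$user:$password".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(token)
  }
}
```

For example, `BasicAuth.headerValue("Aladdin", "open sesame")` yields the RFC 7617 example value `Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==`.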

The problem is that I am not sure whether I should be instantiating a new RestClientFactory object. The application connects to the Elasticsearch cluster but then finds that the SSL certificate is not valid, so I added the trustfulSslContext (as described here: https://gist.github.com/iRevive/4a3c7cb96374da5da80d4538f3da17cb). This got me past the SSL issue, but now the Elasticsearch REST client performs a ping test, the ping fails, and the resulting exception shuts the application down. I suspect the ping fails because of the SSL error, perhaps because the client is not using the trustfulSslContext I set up in the new RestClientFactory. That makes me wonder whether I should not have created a new factory at all, and whether there is a simple way to update the existing RestClientFactory object instead. Basically, all of this comes from my lack of Scala knowledge.
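The Update 1 code refers to a `trustfulSslContext` that is not shown. For reference, a trust-all context of that general kind can be built with JDK classes only; the sketch below is an illustration, not the linked gist's code, and the `TrustAllSsl` name is hypothetical. It disables certificate validation entirely, so it is only suitable for local debugging.

```scala
import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, TrustManager, X509TrustManager}

object TrustAllSsl {
  // WARNING: accepts ANY certificate chain, i.e. no certificate validation at all.
  val trustAllManager: X509TrustManager = new X509TrustManager {
    override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
    override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
    override def getAcceptedIssuers(): Array[X509Certificate] = Array.empty[X509Certificate]
  }

  // TLS context using the trust-all manager above and no client key material.
  def trustfulSslContext(): SSLContext = {
    val ctx = SSLContext.getInstance("TLS")
    ctx.init(null, Array[TrustManager](trustAllManager), new SecureRandom())
    ctx
  }
}
```

In the Update 1 callback it would be passed as `httpClientBuilder.setSSLContext(TrustAllSsl.trustfulSslContext())`. Once the server presents a complete certificate chain, the default JVM trust settings may be enough and this workaround can be dropped.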


Solution

  • Happy to report that this is resolved. The code I posted in Update 1 is correct. The ping to ECE was failing for two reasons:

    1. The certificate needs to include the complete chain: the root CA, the intermediate CA, and the certificate for the ECE deployment. Fixing this made the whole trustfulSslContext workaround unnecessary.

    2. The ECE was sitting behind an HAProxy, and the proxy mapped the hostname in the HTTP request to the actual deployment cluster name in ECE. This mapping logic did not account for the fact that the Java High Level REST Client uses the org.apache.http.HttpHost class, which sends the hostname as hostname:port_number even when the port is 443. Because the :443 suffix prevented the mapping from being found, ECE returned a 404 error instead of 200 OK (the only way to find this was to look at the unencrypted packets at the HAProxy). Once the mapping logic in HAProxy was fixed, the mapping was found and the pings now succeed.
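The post does not show the actual HAProxy fix. As a hypothetical illustration of the kind of normalization involved, here is a Scala sketch of a host-based lookup that strips the scheme's default port (`:443` for https, `:80` otherwise) from the Host header before matching. `HostRouting`, the routing table and the hostnames are made up; table keys are assumed to be lowercase hostnames without a port.

```scala
import java.util.Locale

object HostRouting {
  // Strips the scheme's default port so that "es.example.com:443"
  // and "es.example.com" normalize to the same key.
  def normalizeHost(hostHeader: String, scheme: String): String = {
    val host = hostHeader.trim.toLowerCase(Locale.ROOT)
    val defaultPortSuffix = if (scheme.equalsIgnoreCase("https")) ":443" else ":80"
    if (host.endsWith(defaultPortSuffix)) host.dropRight(defaultPortSuffix.length) else host
  }

  // Looks up the backend (e.g. an ECE deployment) for a request's Host header.
  def route(table: Map[String, String], hostHeader: String, scheme: String): Option[String] =
    table.get(normalizeHost(hostHeader, scheme))
}
```

With `Map("es.example.com" -> "deployment-a")`, both `es.example.com:443` and `es.example.com` resolve to `deployment-a` over https, while a non-default port such as `:9243` is left intact and does not match.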