java elasticsearch titan amazon-elasticsearch elasticsearch-jest

Adding a new HTTP client to Elasticsearch so that client apps can run against AWS Elasticsearch?


I am trying to add Elasticsearch HTTP access to the Titan ES client using Jest. titan-es only supports ES's local and transport (TCP) modes, but I would like to support communication over ES's HTTP interface. That would allow client libraries like titan-es to use AWS Elasticsearch, which only provides an HTTP(S) interface, as an indexing backend. See this post for more information.

I am looking for some feedback on the approach I am considering so far:

  1. Create a new class ElasticsearchHttpClient that implements the org.elasticsearch.client.Client interface. The new class will use the JestClient as its internal client, so it will communicate with ES over HTTP. The new class will likely extend ES's AbstractClient, which reduces the methods that have to be implemented to admin(), settings(), execute(), threadPool(), and close().
  2. Add a new enum constant HTTP_CLIENT to ElasticSearchSetup.
  3. Ensure that the connect() method on HTTP_CLIENT returns an instance of Connection which contains proper values for node and client. The client member would be an instance of the new ElasticsearchHttpClient class.
  4. Ensure that ElasticSearchIndex.interfaceConfiguration() method retrieves the correct instance of Connection (containing the new ElasticsearchHttpClient) if the INTERFACE is configured as HTTP_CLIENT. From that point on the rest of the code should continue to work over the new protocol.
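
Steps 2 to 4 could look roughly like the sketch below. It only illustrates the wiring: `Client`, `Connection`, and `Setup` are simplified stand-ins made up for this example, not the real Titan or Elasticsearch types, and `describe()` exists only so the result can be printed.

```java
import java.util.Locale;

// Hypothetical, simplified stand-ins; NOT the real Titan/Elasticsearch classes.
final class HttpSetupSketch {

    /** Stand-in for org.elasticsearch.client.Client. */
    interface Client {
        String describe();
    }

    /** Stand-in for Connection: an optional embedded node plus the client. */
    static final class Connection {
        final Object node;   // stays null when no embedded node is started
        final Client client;

        Connection(Object node, Client client) {
            this.node = node;
            this.client = client;
        }
    }

    /** Stand-in for ElasticSearchSetup with the proposed HTTP_CLIENT constant. */
    enum Setup {
        TRANSPORT_CLIENT {
            @Override
            Connection connect(String host, int port) {
                return new Connection(null, () -> "transport://" + host + ":" + port);
            }
        },
        HTTP_CLIENT {
            @Override
            Connection connect(String host, int port) {
                // The real version would construct ElasticsearchHttpClient here.
                return new Connection(null, () -> "http://" + host + ":" + port);
            }
        };

        abstract Connection connect(String host, int port);
    }

    /** Mirrors step 4: pick the setup named by the configured INTERFACE value. */
    static Connection interfaceConfiguration(String configured, String host, int port) {
        return Setup.valueOf(configured.toUpperCase(Locale.ROOT)).connect(host, port);
    }

    public static void main(String[] args) {
        Connection c = interfaceConfiguration("http_client", "localhost", 9200);
        System.out.println(c.client.describe());
    }
}
```

The rest of the code would only ever see the `Connection`, which is why the protocol switch stays confined to the enum constant.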

Does that sound like it should work? The 1st step is my biggest concern - I am not confident that I can implement all Client methods using the JestClient.

package com.thinkaurelius.titan.diskstorage.es;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

public class ElasticsearchHttpClient extends AbstractClient {
    private final JestClient internalClient;
    private final ThreadPool pool;

    public ElasticsearchHttpClient(String hostname, int port) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(String.format("http://%s:%d", hostname, port))
                .multiThreaded(true)
                .build());
        JestClient client = factory.getObject();

        this.pool = new ThreadPool("jest");
        this.internalClient = client;
    }

    @Override
    public AdminClient admin() {
        return null;
    }

    @Override
    public Settings settings() {
        return null;
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        try {
            JestResult response = internalClient.execute(convertRequest(action, request));
            return convertResponse(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
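        // Run the request synchronously, then notify the listener of the result or the failure.
        // Until convertResponse() is implemented, this path ends in onFailure().
        try {
            listener.onResponse(execute(action, request).actionGet());
        } catch (Exception e) {
            listener.onFailure(e);
        }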
    }

    private <Response extends ActionResponse> ActionFuture<Response> convertResponse(JestResult result) {
        // TODO How to convert a JestResult to an Elasticsearch ActionResponse/ActionFuture?
        return null;
    }

    private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> io.searchbox.action.Action<JestResult> convertRequest(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        // TODO How to convert an Elasticsearch Action<..> and Request to a Jest Action<JestResult>?
        return null;
    }

    @Override
    public ThreadPool threadPool() {
        return pool;
    }

    @Override
    public void close() throws ElasticsearchException {
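        // Release the Jest client's HTTP resources as well as the thread pool.
        internalClient.shutdownClient();
        pool.shutdownNow();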
    }
}

[I also asked this on the Titan mailing list and Elasticsearch forum.]


Solution

  • I've posted an answer on the Titan mailing list.

    What you'd need to do from a Titan perspective is implement the IndexProvider interface. My guess is that it isn't feasible to make Jest look like a full Elasticsearch client.

    I think you would use JestHttpClient -- you don't need to implement the Jest interface. IndexProvider has methods to create/drop/mutate/query an index, which you should be able to do over HTTP. Check the Elasticsearch HTTP documentation to see whether every method that IndexProvider requires can be implemented with JestHttpClient.
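
As a rough illustration of that check, here is how the four kinds of operation might map onto Elasticsearch's REST endpoints. The `EsRestRoutes` class and its method names are invented for this sketch. The paths assume the typed `/{index}/{type}/{id}` form used by Elasticsearch releases of that era, and the exact requests Titan needs should be verified against the HTTP documentation. Each route could then be issued through JestHttpClient or any other HTTP client.

```java
// Hypothetical helper: maps index operations to Elasticsearch REST calls.
final class EsRestRoutes {

    /** HTTP method plus path for one REST call. */
    static final class Route {
        final String method;
        final String path;

        Route(String method, String path) {
            this.method = method;
            this.path = path;
        }

        @Override
        public String toString() {
            return method + " " + path;
        }
    }

    /** Create: PUT /{index}, optionally with settings/mappings in the body. */
    static Route createIndex(String index) {
        return new Route("PUT", "/" + index);
    }

    /** Drop: DELETE /{index}. */
    static Route dropIndex(String index) {
        return new Route("DELETE", "/" + index);
    }

    /** Mutate one document: PUT /{index}/{type}/{id} with the JSON source. */
    static Route putDocument(String index, String type, String id) {
        return new Route("PUT", "/" + index + "/" + type + "/" + id);
    }

    /** Mutate many documents at once: POST /_bulk with newline-delimited JSON. */
    static Route bulk() {
        return new Route("POST", "/_bulk");
    }

    /** Query: POST /{index}/{type}/_search with the query DSL as the body. */
    static Route search(String index, String type) {
        return new Route("POST", "/" + index + "/" + type + "/_search");
    }

    public static void main(String[] args) {
        System.out.println(createIndex("titan"));
        System.out.println(putDocument("titan", "vertex", "42"));
        System.out.println(search("titan", "vertex"));
        System.out.println(dropIndex("titan"));
    }
}
```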

    There's already an ElasticSearchIndex implementation of IndexProvider, which supports the NODE and TRANSPORT interfaces. You're trying to add an HTTP or JEST option. You might consider shoehorning your changes into ElasticSearchIndex, but I'm not sure how well that will work out, since the two existing implementations are both full Elasticsearch clients. Perhaps consider creating a separate ElasticSearchHttpIndex that implements IndexProvider if that turns out cleaner.
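
If you go the separate-class route, the skeleton might look something like this. `SimpleIndexProvider` is a cut-down, made-up stand-in for Titan's IndexProvider (the real interface has more methods and different signatures), and `HttpTransport` is a hypothetical seam that would be backed by JestHttpClient in practice. The point is only that the class depends on an HTTP transport and on nothing from the Elasticsearch client library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class HttpIndexSketch {

    /** Hypothetical seam; in practice backed by JestHttpClient or similar. */
    interface HttpTransport {
        String send(String method, String path, String body) throws IOException;
    }

    /** Made-up, cut-down stand-in for Titan's IndexProvider. */
    interface SimpleIndexProvider {
        void create(String index) throws IOException;
        void drop(String index) throws IOException;
        String query(String index, String queryJson) throws IOException;
    }

    /** HTTP-only implementation; no Elasticsearch Client or Node involved. */
    static final class ElasticSearchHttpIndex implements SimpleIndexProvider {
        private final HttpTransport transport;

        ElasticSearchHttpIndex(HttpTransport transport) {
            this.transport = transport;
        }

        @Override
        public void create(String index) throws IOException {
            transport.send("PUT", "/" + index, "");
        }

        @Override
        public void drop(String index) throws IOException {
            transport.send("DELETE", "/" + index, "");
        }

        @Override
        public String query(String index, String queryJson) throws IOException {
            return transport.send("POST", "/" + index + "/_search", queryJson);
        }
    }

    /** Runs the provider against a fake transport and returns the calls it made. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        // Fake transport: records each request instead of touching the network.
        HttpTransport fake = (method, path, body) -> {
            calls.add(method + " " + path);
            return "{}";
        };
        SimpleIndexProvider index = new ElasticSearchHttpIndex(fake);
        try {
            index.create("titan");
            index.query("titan", "{\"query\":{\"match_all\":{}}}");
            index.drop("titan");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the transport behind an interface also means the class can be exercised without a running cluster, as the fake transport in `demo()` does.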