Search code examples
pythonpython-3.xelasticsearchelasticsearch-pluginelasticsearch-dsl

How do you use the Elasticsearch Ingest Attachment Processor Plugin with the Python package elasticsearch-dsl


I'm having trouble trying to use the Ingest Attachment Processor Plugin with Elasticsearch (5.5 on AWS, 5.6 local). I'm developing in Python (3.6) and am using the elasticsearch-dsl library.

I'm using Persistence and have my class set-up like this:

import base64
from elasticsearch_dsl.field import Attachment, Text
from elasticsearch_dsl import DocType, analyzer

# Analyzer that keeps the whole field value as a single token and lowercases it.
lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"])

class ExampleIndex(DocType):
    """Persistence mapping for the 'example' index (doc type 'Example')."""

    class Meta:
        # Index name and document type this mapping is stored under.
        index = 'example'
        doc_type = 'Example'

    id = Text()
    name = Text(analyzer=lower_keyword)
    my_file = Attachment()  # base64 content; parsed by the ingest pipeline

I then have a function like this that I call to create the index and save the document.

def index_doc(a_file):
    """
    Index *a_file* into the 'example' index, routing it through the
    'attachment' ingest pipeline when file content is present.

    :param a_file: object exposing ``.name`` and ``.path_to_file`` attributes
    :return: the result of ``DocType.save()``
    """
    # Ensure that the Index is created before any documents are saved.
    try:
        i = Index('example')
        i.doc_type(ExampleIndex)
        i.create()

        # todo - Pipeline creation needs to go here - But how do you do it?

    except Exception:
        # Best effort: the index most likely exists already.
        # TODO: catch only the specific "resource_already_exists" error.
        pass

    # Re-use an existing document with the same name, if one is found.
    indices = ExampleIndex()
    try:
        s = indices.search()
        r = s.query('match', name=a_file.name).execute()
        if r.success():
            for h in r:
                indices = ExampleIndex.get(id=h.meta.id)
                break
    except NotFoundError:
        pass  # no matching document: a new one will be created
    except Exception:
        logger.exception("Something went wrong")
        raise

    # Populate the document.
    indices.name = a_file.name
    with open(a_file.path_to_file, 'rb') as f:
        contents = f.read()
    indices.my_file = base64.b64encode(contents).decode("ascii")

    # Use the ingest pipeline only when there is file content to extract.
    if indices.my_file:
        return indices.save(pipeline="attachment")
    return indices.save()

I have a text file with the contents This is a test document. When its contents are base64-encoded they become VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK

If I use CURL directly then it works:

Create the pipeline:

curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' -H 'Content-Type: application/json' -d' {   "description" : "Extract attachment information",   "processors" : [
    {
      "attachment" : {
        "field" : "my_file"
      }
    }   ] }'

Put the data

curl -XPUT 'localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pipeline=attachment&pretty'\
-H 'Content-Type: application/json' \
-d '{"my_file": "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK"}'

Fetch the data http://localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pretty

{
    "_index" : "example",
    "_type" : "Example",
    "_id" : "AV9nkyJMZAQ2lQ3CtsLb",
    "_version" : 4,
    "found" : true,
    "_source" : {
        "my_file" : "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK",
        "attachment" : {
            "content_type" : "text/plain; charset=ISO-8859-1",
            "language" : "en",
            "content" : "This is a test document",
            "content_length" : 25
        }
    }
}

The trouble is I can not see how to recreate this using the elasticsearch-dsl Python library

UPDATE: I can get everything working now other than the initial creation of the pipeline. If I create the pipeline using CURL then I can use it by simply changing the .save() method call to .save(pipeline="attachment"). I've updated my earlier function to show this as well as to have a comment on where the pipeline creation would need to go.

Here is an example of the CURL implementation of creating the pipeline

curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' \
     -H 'Content-Type: application/json' \
     -d '{"description": "Extract attachment information", "processors": [{"attachment": {"field": "my_field"}}]}'

Solution

  • The answer to the question is to use the IngestClient from the lower-level elasticsearch-py library to create the pipeline before you use it.

    from elasticsearch.client.ingest import IngestClient
    # Register the ingest pipeline with the low-level client before any
    # document is saved with save(pipeline='attachment').
    # es_connection is an existing Elasticsearch connection object.
    p = IngestClient(es_connection)
    # 'attachment' is the pipeline id later passed to save(); the processor
    # reads the base64-encoded 'cv' field and extracts its content.
    p.put_pipeline(id='attachment', body={
        'description': "Extract attachment information",
        'processors': [
            {"attachment": {"field": "cv"}}
        ]
    })
    

    A full working example of creating a pipeline, an index and documents within ElasticSearch using the elasticsearch-dsl Persistence flow (DocType) is:

    import base64
    from uuid import uuid4
    from elasticsearch.client.ingest import IngestClient
    from elasticsearch.exceptions import NotFoundError
    from elasticsearch_dsl import analyzer, DocType, Index
    from elasticsearch_dsl.connections import connections
    from elasticsearch_dsl.field import Attachment, Text
    
    
    # Establish the default connection that elasticsearch-dsl (and the
    # IngestClient used later) will reuse for every request.
    host = '127.0.0.1'
    port = 9200
    es = connections.create_connection(host=host, port=port)

    # Custom analyzers: html_strip removes markup before tokenizing;
    # lower_keyword treats the whole value as one lowercased token.
    html_strip = analyzer('html_strip', tokenizer="standard", filter=["standard", "lowercase", "stop", "snowball"],
                          char_filter=["html_strip"])
    lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"])
    
    
    class ExampleIndex(DocType):
        """Persistence mapping for the 'example' index (doc type 'Example')."""
        class Meta:
            # Index name and document type this mapping is stored under.
            index = 'example'
            doc_type = 'Example'

        id = Text()
        uuid = Text()  # client-generated identity; save_document() matches on it
        name = Text()
        town = Text(analyzer=lower_keyword)  # single lowercased token
        my_file = Attachment(analyzer=html_strip)  # base64 content; parsed by the ingest pipeline
    
    
    def save_document(doc):
        """
        Ensure the ingest pipeline and index exist, then create or update
        the document described by *doc*.

        :param obj doc: object with uuid, name, town and my_file attributes
        :return: the result of ``DocType.save()``
        """
        # Create the pipeline BEFORE the index, and OUTSIDE the try below:
        # put_pipeline is idempotent, and a real failure here should surface
        # instead of being silently swallowed together with the expected
        # "index already exists" error.
        p = IngestClient(es)
        p.put_pipeline(id='myattachment', body={
            'description': "Extract attachment information",
            'processors': [
                {
                    "attachment": {
                        "field": "my_file"
                    }
                }
            ]
        })

        try:
            # Create the index. An exception is raised if it already exists.
            i = Index('example')
            i.doc_type(ExampleIndex)
            i.create()
        except Exception:
            # Best effort: assume the index already exists.
            # TODO: narrow to the specific "resource_already_exists" error.
            pass

        # Re-use the existing document with this uuid, if one is indexed.
        indices = ExampleIndex()
        try:
            s = indices.search()
            r = s.query('match', uuid=doc.uuid).execute()
            if r.success():
                for h in r:
                    indices = ExampleIndex.get(id=h.meta.id)
                    break
        except NotFoundError:
            pass  # new record
        except Exception:
            print("Unexpected error")
            raise

        # Now set the doc properties.
        indices.uuid = doc.uuid
        indices.name = doc.name
        indices.town = doc.town
        if doc.my_file:
            with open(doc.my_file, 'rb') as f:
                contents = f.read()
            indices.my_file = base64.b64encode(contents).decode("ascii")

        # Run the attachment pipeline only when a file was attached.
        if indices.my_file:
            return indices.save(pipeline="myattachment")
        return indices.save()
    
    
    class MyObj(object):
        """Simple value object describing a document for save_document()."""

        def __init__(self, name, town, file):
            # Generate the uuid per instance. As a class attribute it would be
            # evaluated once at class-definition time and shared by every
            # MyObj, so save_document() would match and overwrite the same
            # Elasticsearch document for different objects.
            self.uuid = uuid4()
            self.name = name
            self.town = town
            self.my_file = file
    
    
    # Build a sample object and index it. Note this runs at import time;
    # wrap in `if __name__ == '__main__':` if this module is ever imported.
    me = MyObj("Steve", "London", '/home/steve/Documents/test.txt')

    res = save_document(me)