python, mongodb, elasticsearch, pymongo

Elasticsearch fails in parsing datetime field coming from pymongo as object


I am trying to stream data from MongoDB to Elasticsearch using pymongo and the Python elasticsearch client.

I have set up a mapping; here is the part for the field of interest:

"updated_at": { "type": "date", "format": "dateOptionalTime" }

My script grabs each document from MongoDB using pymongo and tries to index it into Elasticsearch as follows:

import json

from bson import json_util
from elasticsearch import Elasticsearch
from pymongo import MongoClient

mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
db = mongo_client['my_db']
collection = db['my_collection']

# Index every MongoDB document, reusing its _id as the Elasticsearch id
for doc in collection.find():
    es_client.index(
        index='index_name',
        doc_type='my_type',
        id=str(doc['_id']),
        body=json.dumps(doc, default=json_util.default)
    )

Running it fails with:

elasticsearch.exceptions.RequestError: TransportError(400, u'MapperParsingException[failed to parse [updated_at]]; nested: ElasticsearchIllegalArgumentException[unknown property [$date]]; ')

I believe the problem is that pymongo returns the updated_at field as a datetime.datetime object, as I can see when I print doc inside the for loop:

u'updated_at': datetime.datetime(2014, 8, 31, 17, 18, 13, 17000)

This conflicts with Elasticsearch, which expects a value of type date as specified in the mapping.

Any ideas how to solve this?


Solution

  • I guess your problem is that you're using

    body=json.dumps(doc, default=json_util.default)
    

    but you should be using

    body=doc
    

    Doing that works for me, since the elasticsearch client seems to take care of serializing the dictionary into a JSON document (assuming doc is a dictionary, which I guess it is). json_util.default, on the other hand, encodes a datetime as a {"$date": ...} object, which is the [$date] property the error message complains about.

    At least with the version of elasticsearch I'm using (2.x), datetime.datetime is serialized correctly, with no need for a mapping. For example, this works for me:

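    from datetime import datetime, timezone

    # es is an Elasticsearch client, es_index the name of the target index
    doc = {"updated_on": datetime.now(timezone.utc)}
    res = es.index(index=es_index, doc_type='my_type',
                   id=1, body=doc)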
    

    Kibana then recognizes the field as a date.
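
If you would rather keep building the JSON string yourself, a minimal sketch along these lines might help. It uses only the standard library. es_default and to_es_body are hypothetical helper names (not part of pymongo or elasticsearch), the _id value is a stand-in string rather than a real ObjectId, and it assumes that ISO 8601 strings are acceptable for the date format in your mapping.

```python
import json
from datetime import date, datetime


def es_default(value):
    """Fallback for json.dumps: encode values JSON cannot handle as strings."""
    if isinstance(value, (datetime, date)):
        # ISO 8601 string instead of the {"$date": ...} object
        return value.isoformat()
    # Assumption: anything else (for example a bson ObjectId) is fine as str()
    return str(value)


def to_es_body(doc):
    """Return (id, JSON body) for a MongoDB-style dict, keeping _id out of the body."""
    source = dict(doc)  # shallow copy so the caller's dict is left untouched
    doc_id = str(source.pop("_id"))
    return doc_id, json.dumps(source, default=es_default)


# Stand-in for a document returned by collection.find()
doc = {"_id": "53f3a1", "updated_at": datetime(2014, 8, 31, 17, 18, 13, 17000)}
doc_id, body = to_es_body(doc)
print(doc_id, body)
```

The resulting pair could then be passed to the index call as id=doc_id, body=body. Removing _id from the body is a precaution on my side, since it is already sent as the document id.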