I am trying to stream data from MongoDB to Elasticsearch using pymongo and the official Python client elasticsearch.
I have set a mapping; here is the snippet for the field of interest:
"updated_at": { "type": "date", "format": "dateOptionalTime" }
My script grabs each document from MongoDB using pymongo and tries to index it into Elasticsearch:
import json

from bson import json_util
from elasticsearch import Elasticsearch
from pymongo import MongoClient

mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])

db = mongo_client['my_db']
collection = db['my_collection']

for doc in collection.find():
    es_client.index(
        index='index_name',
        doc_type='my_type',
        id=str(doc['_id']),
        body=json.dumps(doc, default=json_util.default)
    )
When I run it, I get the following error:
elasticsearch.exceptions.RequestError: TransportError(400, u'MapperParsingException[failed to parse [updated_at]]; nested: ElasticsearchIllegalArgumentException[unknown property [$date]]; ')
I believe the source of the problem is that pymongo returns the updated_at field as a datetime.datetime object, as I can see by printing the doc inside the loop:
u'updated_at': datetime.datetime(2014, 8, 31, 17, 18, 13, 17000)
json_util.default then serializes that datetime as a {"$date": ...} object, which conflicts with Elasticsearch expecting a plain date value as specified in the mapping.
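For reference, this is roughly what json_util.default produces for such a field (the exact output depends on the bson version, but it is always an object with a $date key rather than a plain date value):

import json
from datetime import datetime

from bson import json_util

# json_util.default turns the datetime into MongoDB extended JSON, so the
# indexed body contains an object with a "$date" key, something like
# {"updated_at": {"$date": 1409505493017}}, instead of a date value.
print(json.dumps({"updated_at": datetime(2014, 8, 31, 17, 18, 13, 17000)},
                 default=json_util.default))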
Any ideas how to solve this?
I guess your problem is that you're using
body=json.dumps(doc, default=json_util.default)
but you should be using
body=doc
Doing that works for me, since the elasticsearch client takes care of serializing the dictionary into a JSON document (assuming, of course, that doc is a dictionary, which I guess it is).
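Applied to the loop from the question, that would look roughly like this (just a sketch: as far as I know the client's default serializer does not handle a raw ObjectId, so I drop _id from the body and only use it as the document id):

for doc in collection.find():
    # Let the client serialize the dict itself; datetime values are handled,
    # but the raw ObjectId in _id is not, so it is popped out of the body.
    doc_id = str(doc.pop('_id'))
    es_client.index(
        index='index_name',
        doc_type='my_type',
        id=doc_id,
        body=doc
    )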
At least in the version of elasticsearch I'm using (2.x), datetime.datetime is serialized correctly, with no need for an explicit mapping. For example, this works for me:
doc = {"updated_on": datetime.now(timezone.utc)}
res = es.index(index=es_index, doc_type='my_type',
id=1, body=doc)
And it is recognized by Kibana as a date.