I'm trying to upload a CSV file to an Elasticsearch index. Let's say the file is something like this (no headers, just data):
bird,10,dog
cat,20,giraffe
This is the code I have:
from elasticsearch_dsl import DocType, Integer, Keyword
from elasticsearch_dsl.connections import connections
from elasticsearch.helpers import bulk
import csv

connections.create_connection(hosts=["localhost"])

class Mapping(DocType):
    animal1 = Keyword()
    number = Integer()
    animal2 = Keyword()

    class Meta:
        index = "index-name"
        doc_type = "doc-type"

Mapping.init()

with open("/path/to/file", "r", encoding="latin-1") as f:
    reader = csv.DictReader(f)
    bulk(
        connections.get_connection(),
        (Mapping(**row).to_dict(True) for row in reader)
    )
The problem is that Elasticsearch seems to be ignoring my mapping and using the first line of the file as headers (and creating a mapping based on that).
Edit: it actually uses both my mapping and the first line of the file. The mapping it ends up with is:
{
  "index-name": {
    "mappings": {
      "doc-type": {
        "properties": {
          "10": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "dog": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "animal1": {
            "type": "keyword"
          },
          "animal2": {
            "type": "keyword"
          },
          "bird": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "number": {
            "type": "integer"
          }
        }
      }
    }
  }
}
If I only create the index without uploading data, the mapping seems fine:
{
  "index-name": {
    "mappings": {
      "doc-type": {
        "properties": {
          "animal1": {
            "type": "keyword"
          },
          "animal2": {
            "type": "keyword"
          },
          "number": {
            "type": "integer"
          }
        }
      }
    }
  }
}
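(For reference, a mapping dump like the ones above can be fetched with the get-mapping API, e.g. from Python, reusing the connection created earlier:)

    from elasticsearch_dsl.connections import connections

    # Ask ES for the mapping it is actually using for the index
    print(connections.get_connection().indices.get_mapping(index="index-name"))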
How can I make ES use the given mapping and just that?
Elasticsearch creates new fields from the first row of the file because you never tell it which column is which: csv.DictReader with no fieldnames argument treats the first line of the file ("bird,10,dog") as the header row, so you haven't specified that the first element of each row is animal1, the second is number, and so on. Try this:
from elasticsearch import Elasticsearch
from elasticsearch import helpers

index_name = "your_index_name"
doc_type = "your_doc_type"

esConnector = Elasticsearch(["http://192.168.1.1:9200/"])  # change your ip here

paramL = []
contatore = 1  # running counter, used as the document _id

with open("/path/to/file", "r", encoding="latin-1") as f:
    for line in f:
        # Strip the trailing newline before splitting, otherwise the
        # last field would contain it
        line = line.rstrip("\n").split(",")
        tripla = {"animal1": line[0], "number": line[1], "animal2": line[2]}
        ogg = {
            "_op_type": "index",
            "_index": index_name,
            "_type": doc_type,
            "_id": contatore,
            "_source": tripla
        }
        contatore += 1
        paramL.append(ogg)

for success, info in helpers.parallel_bulk(client=esConnector, actions=paramL, thread_count=4):
    if not success:
        print("Doc failed", info)
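Alternatively, if you'd rather keep the elasticsearch_dsl code from the question, a minimal sketch of the one-line fix: pass fieldnames to csv.DictReader so the first line is read as data instead of being consumed as a header row (this assumes the Mapping class and connection from the question are already set up):

    import csv
    from elasticsearch.helpers import bulk
    from elasticsearch_dsl.connections import connections

    with open("/path/to/file", "r", encoding="latin-1") as f:
        # Name the columns explicitly; without fieldnames, DictReader
        # treats the first data row ("bird,10,dog") as the header
        reader = csv.DictReader(f, fieldnames=["animal1", "number", "animal2"])
        bulk(
            connections.get_connection(),
            (Mapping(**row).to_dict(True) for row in reader)
        )

With the field names supplied, every row maps onto the fields you declared, so dynamic mapping never kicks in and the index keeps only animal1, number and animal2. (Values read from the CSV are strings, but ES will coerce "10" into the integer field by default.)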