I've been trying to load 1000 lines of a CSV file into Elasticsearch as 1000 separate documents. The CSV has 8 headers: release year, title, origin/ethnicity, director, cast, genre, wiki page, plot. My current code loads the dataset using the bulk command from helpers:
Python
x
15
15
1
import csv
from elasticsearch import helpers, Elasticsearch

# Client for the Elasticsearch node at localhost:9200.
es = Elasticsearch("http://localhost:9200")

# Drop and re-create the target index so every run starts from scratch.
# NOTE(review): ignore=[400, 404] presumably keeps the delete from raising
# when the index does not exist yet -- confirm against the client docs.
es.indices.delete(index='movie-plots', ignore=[400, 404])
# NOTE(review): `body` is not defined anywhere in this snippet; it has to be
# defined elsewhere before this line runs.
es.indices.create(index='movie-plots', body=body)

filename = 'wiki_movie_plots_deduped.csv'


def csv_reader(file_name):
    """Bulk-index the rows of the CSV file *file_name* into 'movie-plots'.

    Each row is read as a dict keyed by the CSV header row and handed to
    helpers.bulk() as one action. Nothing in this snippet calls the function.
    """
    with open(file_name, 'r') as csv_file:
        # DictReader is lazy, so the bulk call has to happen while the file
        # is still open.
        rows = csv.DictReader(csv_file)
        helpers.bulk(es, rows, index="movie-plots", doc_type="_doc")
14
15
I think this loads all 1000 lines into a single document.
Advertisement
Answer
You are on the right path. The code below splits the CSV into 1000 different items by separating out the headers and turning each line into a map/dictionary item keyed by the appropriate headers. These items are collected into a list, so you upload a list of dictionary items.
Python
1
55
55
1
import csv
import sys

from elasticsearch import helpers, Elasticsearch, RequestsHttpConnection

# Client for the Elasticsearch node at localhost:9200, with SSL disabled.
es = Elasticsearch(
    hosts=[{
        'host': 'localhost',
        'port': '9200'}],
    use_ssl=False,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Build one dict per CSV data row; each dict is uploaded as its own document.
with open('my_folder/my_csv_file.csv', newline='') as csvfile:
    csv_data = csv.reader(csvfile)

    # The first row holds the column headers. An empty file yields no
    # headers and therefore no documents, instead of raising IndexError.
    headers = next(csv_data, [])

    # Pair every header with the value in the same column of the row.
    # Blank lines (which csv.reader returns as empty lists) are skipped;
    # zip() stops at the shorter sequence, so a row with missing trailing
    # columns simply omits those fields and extra columns are ignored.
    upload_list = [
        dict(zip(headers, row))
        for row in csv_data
        if row
    ]

# Use the helper library's Bulk API to index the list of Elasticsearch docs.
try:
    resp = helpers.bulk(
        es,
        upload_list,
        index="my-index-name"
    )
    # print the response returned by Elasticsearch
    print(f"helpers.bulk() RESPONSE: {resp}")
except Exception as err:
    # Top-level boundary of the script: report the failure and exit non-zero.
    print(f"Elasticsearch helpers.bulk() ERROR: {err}")
    sys.exit(1)
55