So I've been trying to load 1000 lines of a CSV into Elasticsearch as 1000 different documents. The CSV has 8 headers: Release Year, Title, Origin/Ethnicity, Director, Cast, Genre, Wiki Page, Plot. My current code loads the dataset using the `bulk` command from `helpers`:
import csv
import itertools

from elasticsearch import helpers, Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Index settings/mappings for "movie-plots". The original snippet passed an
# undefined name `body` (NameError); an empty body lets Elasticsearch infer
# the mapping from the documents. Replace it with your own mapping if needed.
body = {}

es.indices.delete(index='movie-plots', ignore=[400, 404])
es.indices.create(index='movie-plots', body=body)

filename = 'wiki_movie_plots_deduped.csv'


def csv_reader(file_name, max_rows=None):
    """Bulk-index the rows of *file_name* into "movie-plots", one document per row.

    csv.DictReader yields one dict per CSV row, keyed by the header line, and
    helpers.bulk indexes every dict it receives as a separate document, so no
    manual splitting is needed.

    Args:
        file_name: Path to the CSV file; its first line holds the headers.
        max_rows: Index at most this many data rows; None indexes all of them.

    Returns:
        The (success_count, errors) tuple returned by helpers.bulk.
    """
    # newline='' is required by the csv module so quoted fields containing
    # line breaks (e.g. multi-paragraph plots) are read as a single field.
    with open(file_name, 'r', newline='', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)
        rows = itertools.islice(reader, max_rows)
        # doc_type is omitted: it is deprecated in ES 7 (where "_doc" is the
        # default anyway) and rejected by elasticsearch-py 8.
        return helpers.bulk(es, rows, index="movie-plots")


print(csv_reader(filename, max_rows=1000))
I think this loads all 1000 lines into one document.
Advertisement
Answer
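You are on the right path. The code below splits the CSV into one item per row by separating out the headers and turning each row into a dictionary keyed by those headers. Each dictionary is appended to a list, so you upload a list of dictionaries, and each dictionary becomes its own document. (`csv.DictReader` produces the same one-dict-per-row output, so passing it straight to `helpers.bulk` also creates one document per row; the explicit version below just makes the mapping visible.)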
import csv
import itertools
import sys

from elasticsearch import helpers, Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    hosts=[{'host': 'localhost', 'port': 9200}],
    use_ssl=False,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

CSV_PATH = 'my_folder/my_csv_file.csv'
INDEX_NAME = "my-index-name"
# Maximum number of data rows to index (e.g. 1000); None indexes every row.
MAX_ROWS = None


def _load_rows(path, max_rows=None):
    """Return the CSV rows at *path* as a list of {header: value} dicts.

    The first line supplies the headers. Values are paired with headers by
    position: extra values in a row are dropped, and a row with fewer values
    than headers simply omits the missing keys instead of raising IndexError.
    Blank lines are skipped, and an empty file yields an empty list.

    Args:
        path: Path to the CSV file.
        max_rows: Keep at most this many data rows; None keeps all of them.
    """
    # newline='' is required by the csv module for quoted fields that
    # contain line breaks to be parsed correctly.
    with open(path, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        headers = next(reader, None)
        if headers is None:  # empty file: no header line, nothing to upload
            return []
        items = (dict(zip(headers, row)) for row in reader if row)
        return list(itertools.islice(items, max_rows))


upload_list = _load_rows(CSV_PATH, MAX_ROWS)  # list of items for upload

# using helper library's Bulk API to index list of Elasticsearch docs;
# each dict in upload_list becomes one document
try:
    resp = helpers.bulk(es, upload_list, index=INDEX_NAME)
except Exception as err:
    msg = "Elasticsearch helpers.bulk() ERROR: " + str(err)
    print(msg)
    sys.exit(1)
else:
    msg = "helpers.bulk() RESPONSE: " + str(resp)
    print(msg)  # print the response returned by Elasticsearch