Skip to content
Advertisement

how to load 1000 lines of a csv into elasticsearch as 1000 different documents using elasticsearch API

so ive been trying to load 1000 lines of a csv into elasticsearhc as 1000 different documents, the csv has 8 headers: telease year, title, origin/ethnicity, director, cast, wiki page, plot. my current code for loading the dataset loads it using the bulk command from helpers

import csv
from elasticsearch import helpers, Elasticsearch
es = Elasticsearch("http://localhost:9200")

es.indices.delete(index='movie-plots', ignore=[400, 404])
es.indices.create(index='movie-plots', body=body) 

filename = 'wiki_movie_plots_deduped.csv'

def csv_reader(file_name):
    with open(file_name, 'r') as outfile:
        reader = csv.DictReader(outfile)
        helpers.bulk(es, reader, index="movie-plots", doc_type="_doc")

this i think loads 1000 lines into one document.

Advertisement

Answer

You are on the right path, the code below will split the csv into 1000 different items but splitting out the headers and turning each line item into a map/dictionary item with appropriate headers. This is then appended to a list so you upload a list of dictionary items.

import csv, sys
from elasticsearch import helpers, Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    hosts=[{
        'host': 'localhost',
        'port': '9200'}],
    use_ssl=False,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

upload_list = [] # list of items for upload

# Load all csv data
with open('my_folder/my_csv_file.csv', newline='') as csvfile:
    
    data_list = []

    csv_data = csv.reader(csvfile)
    for row in csv_data:
        data_list.append(row)

    # separate out the headers from the main data 
    headers = data_list[0]
    # drop headers from data_list
    data_list.pop(0)

    for item in data_list: # iterate over each row/item in the csv

        item_dict = {}

        # match a column header to the row data for an item
        i = 0
        for header in headers:
            item_dict[header] = item[i]
            i = i+1

        # add the transformed item/row to a list of dicts
        upload_list += [item_dict]

# using helper library's Bulk API to index list of Elasticsearch docs
try:
    resp = helpers.bulk(
        es,
        upload_list,
        index="my-index-name"
    )
    msg = "helpers.bulk() RESPONSE: " + str(resp)
    print(msg) # print the response returned by Elasticsearch
except Exception as err:
    msg = "Elasticsearch helpers.bulk() ERROR: " + str(err)
    print(msg)
    sys.exit(1)
User contributions licensed under: CC BY-SA
1 People found this is helpful
Advertisement