ElasticSearch 7.10.2
Python 3.8.5
elasticsearch-py 7.12.1
I’m trying to do a bulk insert of 100,000 records into Elasticsearch using the elasticsearch-py bulk helper.
Here is the Python code:
```python
import sys
import datetime
import json
import os
import logging

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# ES Configuration start
es_hosts = [
    "http://localhost:9200",
]
es_api_user = 'user'
es_api_password = 'pw'
index_name = 'index1'
chunk_size = 10000
errors_before_interrupt = 5
refresh_index_after_insert = False
max_insert_retries = 3
yield_ok = False  # if set to False will skip successful documents in the output
# ES Configuration end
# =======================

filename = 'file.json'

logging.info('Importing data from {}'.format(filename))

es = Elasticsearch(
    es_hosts,
    #http_auth=(es_api_user, es_api_password),
    sniff_on_start=True,            # sniff before doing anything
    sniff_on_connection_fail=True,  # refresh nodes after a node fails to respond
    sniffer_timeout=60,             # and also every 60 seconds
    retry_on_timeout=True,          # should timeout trigger a retry on different node?
)

def data_generator():
    f = open(filename)
    for line in f:
        yield {**json.loads(line), **{
            "_index": index_name,
        }}

errors_count = 0

for ok, result in streaming_bulk(
        es,
        data_generator(),
        chunk_size=chunk_size,
        refresh=refresh_index_after_insert,
        max_retries=max_insert_retries,
        yield_ok=yield_ok):
    if ok is not True:
        logging.error('Failed to import data')
        logging.error(str(result))
        errors_count += 1
        if errors_count == errors_before_interrupt:
            logging.fatal('Too many import errors, exiting with error code')
            sys.exit(1)

print("Documents loaded to Elasticsearch")
```
When the JSON file contains a small number of documents (~100), this code runs without issue. But I just tested it with a file of 100k documents, and I got this error:
```
WARNING:elasticsearch:POST http://127.0.0.1:9200/_bulk?refresh=false [status:N/A request:10.010s]
Traceback (most recent call last):
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 426, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 421, in _make_request
    httplib_response = conn.getresponse()
  File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 1347, in getresponse
    response.begin()
  File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 307, in begin
    version, status, reason = self._read_status()
  File "/Users/me/opt/anaconda3/lib/python3.8/http/client.py", line 268, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/Users/me/opt/anaconda3/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 251, in perform_request
    response = self.pool.urlopen(
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 726, in urlopen
    retries = retries.increment(
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/util/retry.py", line 386, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/packages/six.py", line 735, in reraise
    raise value
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
    httplib_response = self._make_request(
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 428, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/Users/me/opt/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 335, in _raise_timeout
    raise ReadTimeoutError(
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='127.0.0.1', port=9200): Read timed out. (read timeout=10)
```
I have to admit this one is a bit over my head. I don’t typically like to paste large error messages here, but I’m not sure what about this message is relevant.
I can’t help but think that I may need to adjust some of the params in the `es` object, or the configuration variables? I don’t know enough about the params to make an educated decision on my own.
And last but certainly not least: it looks like some documents were loaded into the ES index nonetheless. But even stranger, the count shows 110k when the JSON file only has 100k.
Answer
TL;DR:

Reduce the `chunk_size` from 10000 to the default of 500 and I’d expect it to work. You probably want to disable the automatic retries if they can give you duplicates.
What happened?
In your `streaming_bulk` call, you specified `chunk_size=10000`. This means that `streaming_bulk` will try to insert chunks of 10000 elements. The connection to Elasticsearch has a configurable timeout, which by default is 10 seconds. So, if your Elasticsearch server takes more than 10 seconds to process the 10000 elements you want to insert, a timeout happens and is handled as an error.
When creating your `Elasticsearch` object, you specified `retry_on_timeout` as True, and in the `streaming_bulk` call you set `max_retries=max_insert_retries`, which is 3.

This means that when such a timeout happens, the library will try reconnecting 3 times; however, when the insert still times out after that, it gives you the error you noticed. (Documentation)
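To make the two retry layers concrete, here is a small annotated sketch of those settings, reusing `es_hosts`, `chunk_size` and `data_generator` from the script above (the values are the ones the script already uses):

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# Client-level: when a request times out, send it again (to another node if one
# is available) instead of failing immediately.
es = Elasticsearch(
    es_hosts,
    retry_on_timeout=True,   # set to False to fail fast on a timeout
)

# Helper-level: how many times streaming_bulk may re-send documents from a chunk
# that the server could not accept, before reporting them as failed.
for ok, result in streaming_bulk(
        es,
        data_generator(),
        chunk_size=chunk_size,
        max_retries=3,       # 0 disables these retries entirely
        yield_ok=False):
    pass
```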
Also, when the timeout happens, the library cannot know whether the documents were inserted successfully, so it has to assume that they were not. Thus, it will try to insert the same documents again. I don’t know what your input lines look like, but if they do not contain an `_id` field, this would create duplicates in your index. You probably want to prevent this: either by adding some kind of `_id`, or by disabling the automatic retry and handling it manually.
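One way to make retries harmless is to give every document a deterministic `_id`, so that a re-sent document overwrites itself instead of being indexed a second time. A minimal sketch of the generator, assuming each input line contains some field that uniquely identifies the record (`my_key` is a hypothetical name here, replace it with a real field from your data):

```python
import json

def data_generator():
    with open(filename) as f:
        for line in f:
            doc = json.loads(line)
            yield {
                **doc,
                "_index": index_name,
                # Hypothetical unique field: with a fixed _id, a retried insert
                # updates the existing document instead of creating a duplicate.
                "_id": doc["my_key"],
            }
```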
What to do?
There are two ways you can go about this:

- Increase the timeout
- Reduce the `chunk_size`
`streaming_bulk` by default has `chunk_size` set to 500. Your 10000 is much higher. I wouldn’t expect a big performance gain from increasing this beyond 500, so I’d advise you to just use the default of 500 here. If 500 still fails with a timeout, you may even want to reduce it further. This could happen if the documents you want to index are very complex.
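In the script from the question that just means setting the `chunk_size` variable back to the helper’s default; a sketch reusing the names from that script:

```python
chunk_size = 500  # streaming_bulk's default; reduce it further if timeouts persist

for ok, result in streaming_bulk(
        es,
        data_generator(),
        chunk_size=chunk_size,
        refresh=refresh_index_after_insert,
        max_retries=max_insert_retries,
        yield_ok=yield_ok):
    pass  # handle results as before
```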
You could also increase the timeout for the `streaming_bulk` call, or, alternatively, for your `es` object. To change it only for the `streaming_bulk` call, you can provide the `request_timeout` keyword argument:
```python
for ok, result in streaming_bulk(
        es,
        data_generator(),
        chunk_size=chunk_size,
        refresh=refresh_index_after_insert,
        request_timeout=60*3,  # 3 minutes
        yield_ok=yield_ok):
    # handle like you did
    pass
```
However, this also means that an Elasticsearch node failure will only be detected after this higher timeout. See the documentation for more details.
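If you would rather raise the timeout for every request made through the client, you can set it on the `es` object instead. A sketch reusing the connection settings from the question; the higher value then applies to all API calls made with this client, not just `_bulk`:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(
    es_hosts,
    timeout=60 * 3,                 # default per-request timeout for all calls
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    retry_on_timeout=True,
)
```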