In my previous post, I asked how I could save items in bulk using Scrapy. The topic is here:
Buffered items and bulk insert to Mysql using scrapy
With the help of @Alexander, I can now keep 1000 items in a cache. However, the cached items are still being written to MySQL one by one when they are flushed, and my only issue is speed. I think the problem is caused by SQL code that I haven't been able to optimize.
The logic for saving to SQL is as follows:
Add every item to the products table; if its product_id doesn't already exist, also add it to the new_products table. (I'm running a script in the background that deletes rows from this table from oldest to newest, so there is no problem there; at most 50k rows are stored in total.)
MySQL is probably slowing down during the insert into the new_products table, because it has to check whether the product_id already exists among the current rows.
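To make the setup concrete, the two tables look roughly like the sketch below. Only the column names matter for the question; the types here are simplified, and I'm not sure an index on product_id exists:

    import mysql.connector

    # Simplified, assumed schema -- only the column names come from the
    # pipeline's insert queries; the types and the index are approximations.
    SCHEMA = [
        """CREATE TABLE IF NOT EXISTS products (
               rowid        BIGINT,
               date         DATETIME,
               listing_id   VARCHAR(64),
               product_id   VARCHAR(64),
               product_name TEXT,
               price        DECIMAL(10, 2),
               url          TEXT,
               KEY idx_product_id (product_id)  -- assumed index backing the existence check
           )""",
        """CREATE TABLE IF NOT EXISTS new_products (
               product_rowid BIGINT,
               date          DATETIME,
               listing_id    VARCHAR(64),
               product_id    VARCHAR(64),
               product_name  TEXT,
               price         DECIMAL(10, 2),
               url           TEXT
           )""",
    ]

    cnx = mysql.connector.connect(host='localhost', user='xxxxxx',
                                  password='xxxxxx', database='xxxxxxx')
    cursor = cnx.cursor()
    for statement in SCHEMA:
        cursor.execute(statement)
    cnx.commit()
    cursor.close()
    cnx.close()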
I would be very happy if you could suggest a method that lets me save 1000 items to the database at once.
The pipeline.py I am using:
from __future__ import print_function
import logging
from scrapy import signals
from itemadapter import ItemAdapter
from mysql.connector import errorcode
from amazon_scraper.items import AmazonMobileDetailsItem
import mysql.connector


class AmazonScraperPipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []            # store rows temporarily
        self._cached_rows = 0      # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache

    def cache_result(self, item):  # adds new row to cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)  # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()  # Saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [row[0] for row in cursor.fetchall()]
        create_query = ("INSERT INTO " + self.table +
                        "(rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # data_user = (rowid, date, listing_id, product_id, product_name, price, url)
        # Insert new row
        cursor.execute(create_query, row)
        # lastRecordId = cursor.lastrowid
        # Make sure data is committed to the database
        # self.cnx.commit()
        # cursor.close()
        print("Item saved")

        product_id = row['product_id']
        if not product_id in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
            # data_user = (rowid, date, listing_id, product_id, product_name, price, url)
            # new_cursor = self.cnx.cursor()
            cursor.execute(create_query, row)
            # lastRecordId = cursor.lastrowid
            # self.cnx.commit()
            # new_cursor.close()
            print("New Item saved")

        self.cnx.commit()
Answer
You can eliminate the first query of the save method by running it once during initialization, storing the result as an instance variable, and updating it with new entries as new products come in. Another performance boost should come from the executemany feature of the MySQL cursor: pass all of the cached rows to the save method at once instead of one at a time.
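As a minimal sketch of that idea (reusing the table and column names from your pipeline, with made-up example rows), executemany takes the same named-placeholder query plus a list of row dicts and sends the whole batch in one call:

    import mysql.connector

    cnx = mysql.connector.connect(host='localhost', user='xxxxxx',
                                  password='xxxxxx', database='xxxxxxx')
    cursor = cnx.cursor()

    rows = [  # e.g. the rows cached by the pipeline (values here are made up)
        {'rowid': 1, 'date': '2022-01-01', 'listing_id': 'L1', 'product_id': 'P1',
         'product_name': 'Phone A', 'price': 199.99, 'url': 'https://example.com/p1'},
        {'rowid': 2, 'date': '2022-01-01', 'listing_id': 'L2', 'product_id': 'P2',
         'product_name': 'Phone B', 'price': 149.99, 'url': 'https://example.com/p2'},
    ]

    query = ("INSERT INTO products "
             "(rowid, date, listing_id, product_id, product_name, price, url) "
             "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
             "%(product_name)s, %(price)s, %(url)s)")

    # One call for the whole batch instead of one INSERT per row; for INSERT
    # statements the connector can rewrite this into a single multi-row INSERT.
    cursor.executemany(query, rows)
    cnx.commit()
    cursor.close()
    cnx.close()

The full pipeline below applies the same idea to both tables.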
class Pipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []             # store rows temporarily
        self._unique_products = []  # unique product rows
        self._cached_rows = 0       # number of cached rows
        self._cache_limit = 1000    # limit before saving to database
        self.cnx = self.mysql_connect()
        self.existing_ids = self.get_product_ids()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            self.save(self._rows, self._unique_products)
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache
            self._unique_products = []

    def process_item(self, item, spider):
        row = dict(item)
        product_id = row['product_id']
        if product_id not in self.existing_ids:
            self._unique_products.append(row)
            self.existing_ids.add(product_id)
        self._rows.append(row)
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows
        return item

    def close_spider(self, spider):
        self.save_all()  # Saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def get_product_ids(self):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        return set([row[0] for row in cursor.fetchall()])

    def save(self, rows, products):
        cursor = self.cnx.cursor()
        create_query = ("INSERT INTO " + self.table +
                        "(rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # Insert new rows in one batch
        cursor.executemany(create_query, rows)
        # Make sure data is committed to the database
        self.cnx.commit()
        print("Item saved with ID: {}".format(cursor.lastrowid))
        cursor.close()

        create_query = ("INSERT INTO " + self.table2 +
                        "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.executemany(create_query, products)
        self.cnx.commit()
        print("New Item saved with ID: {}".format(new_cursor.lastrowid))
        new_cursor.close()
I am actually curious how much of a performance boost this will give, so please share the difference in time.
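If you want to measure it, a hypothetical helper like the one below (just a sketch, not part of the pipeline) can time each flush so you can compare the row-by-row version against the executemany version:

    import time

    def timed_save_all(pipeline):
        # Time a single flush of the cached rows; works with either pipeline
        # version, since both expose _rows and save_all().
        batch_size = len(pipeline._rows)
        start = time.perf_counter()
        pipeline.save_all()
        elapsed = time.perf_counter() - start
        print("Flushed {} rows in {:.3f} seconds".format(batch_size, elapsed))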