Skip to content
Advertisement

SQL optimization to increase batch insert using Scrapy

In my previous post, I asked how I can record items in bulk using scrapy. The topic is here:

Buffered items and bulk insert to Mysql using scrapy

With the help of @Alexander, I can now keep 1000 items in a cache. However, my problem is that the cached items are still written one by one when they are transferred to MySQL. My only issue here is speed. I think this problem is caused by SQL code that I haven’t been able to optimize enough.

The logic to save in SQL is as follows;

Add the items to the products table; if the product_id doesn’t already exist, also add the item to the new_products table. (I’m running a script in the background that deletes these rows, oldest first, so I have no problem here. In other words, a maximum of 50k rows are stored in total.)

MySQL is probably slowing down during insertion into the new_products table, because it checks whether the product_id exists in the existing rows.

I would be very happy if you could suggest a method where I can save 1000 items in a database at once.

The pipeline.py I am using:

from __future__ import print_function
import logging
from scrapy import signals
from itemadapter import ItemAdapter
from mysql.connector import errorcode
from amazon_scraper.items import AmazonMobileDetailsItem
import mysql.connector


class AmazonScraperPipeline:
    """Scrapy item pipeline that buffers scraped items and writes them to MySQL.

    Items are collected in memory until ``_cache_limit`` of them are buffered
    (and once more when the spider closes). The buffer is then written out by
    calling :meth:`save` once for every buffered row.
    """

    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        """Create an empty buffer and open the MySQL connection."""
        self._rows = []            # rows waiting to be written
        self._cached_rows = 0      # how many rows are currently buffered
        self._cache_limit = 1000   # buffer size that triggers a write
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        """Scrapy hook invoked when the spider starts."""
        print("spider open")

    def save_all(self):
        """Write every buffered row via :meth:`save`, then empty the buffer."""
        if not self._rows:
            return
        write_row = self.save
        for buffered in self._rows:
            write_row(buffered)
        self._cached_rows = 0
        self._rows = []

    def cache_result(self, item):
        """Buffer ``item`` as a dict and flush once the limit is reached."""
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows < self._cache_limit:
            return
        self.save_all()

    def process_item(self, item, spider):
        """Scrapy hook: buffer the item and hand it on unchanged."""
        print("Saving item into db ...")
        self.cache_result(item)
        return item

    def close_spider(self, spider):
        """Scrapy hook: write whatever is still buffered and disconnect."""
        self.save_all()
        self.cnx.close()

    def mysql_connect(self):
        """Return a new connection built from ``conf``.

        On a connector error a message is printed and ``None`` is returned
        implicitly.
        """
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            known_failures = {
                errorcode.ER_ACCESS_DENIED_ERROR:
                    "Something is wrong with your user name or password",
                errorcode.ER_BAD_DB_ERROR: "Database does not exist",
            }
            print(known_failures.get(err.errno, err))

    def save(self, row):
        """Insert one row into ``table`` and, if its product is new, ``table2``.

        Each call first reads every distinct ``product_id`` from ``products``
        to decide whether the row's product is new, and ends with its own
        commit.
        """
        db_cursor = self.cnx.cursor()
        db_cursor.execute("SELECT DISTINCT product_id FROM products;")
        known_ids = [record[0] for record in db_cursor.fetchall()]

        # Both INSERT statements share everything after their first column.
        statement_tail = (
            "date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
            "%(product_name)s, %(price)s, %(url)s)"
        )

        db_cursor.execute(
            "INSERT INTO " + self.table + "(rowid, " + statement_tail, row)
        print("Item saved")

        if row['product_id'] not in known_ids:
            db_cursor.execute(
                "INSERT INTO " + self.table2 + "(product_rowid, " + statement_tail,
                row,
            )
            print("New Item saved")
        self.cnx.commit()

Advertisement

Answer

You can eliminate the first query of the save method by executing it once upon initialization, storing the result as an instance variable, and then updating that variable with new entries as items arrive. Another performance boost would probably come from using the executemany method of the MySQL cursor, passing all the rows to the save method at once instead of one at a time.

class Pipeline:
    """Scrapy item pipeline that buffers items and bulk-inserts them into MySQL.

    The set of product ids already present in ``table`` is loaded once at
    start-up and kept current in memory, so no per-item lookup query is
    needed. Items are buffered until ``_cache_limit`` of them have been
    collected (and once more when the spider closes). Each flush writes the
    whole batch with ``executemany`` inside a single transaction.
    """

    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        """Open the connection and preload the known product ids."""
        self._rows = []              # every buffered row, destined for `table`
        self._unique_products = []   # buffered rows whose product_id is new
        self._cached_rows = 0        # number of buffered rows
        self._cache_limit = 1000     # buffer size that triggers a flush
        self.cnx = self.mysql_connect()
        self.existing_ids = self.get_product_ids()

    def open_spider(self, spider):
        """Scrapy hook invoked when the spider starts."""
        print("spider open")

    def save_all(self):
        """Flush the buffered rows to the database and empty the buffers.

        The buffers are reset only after ``save`` returns, so a failed flush
        leaves them intact for a later retry.
        """
        if not self._rows:
            return
        self.save(self._rows, self._unique_products)
        self._cached_rows = 0
        self._rows = []
        self._unique_products = []

    def process_item(self, item, spider):
        """Scrapy hook: buffer ``item`` and flush when the limit is reached.

        Returns:
            The item, unchanged, for any later pipeline stage.
        """
        row = dict(item)
        product_id = row['product_id']
        if product_id not in self.existing_ids:
            self._unique_products.append(row)
            # Remember the id immediately so a second item for the same
            # product in this batch is not treated as new again.
            self.existing_ids.add(product_id)
        self._rows.append(row)
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:
            self.save_all()
        return item

    def close_spider(self, spider):
        """Scrapy hook: flush the remaining rows and always disconnect."""
        try:
            self.save_all()
        finally:
            # Release the connection even when the final flush fails.
            self.cnx.close()

    def mysql_connect(self):
        """Return a new connection built from ``conf``.

        Raises:
            mysql.connector.Error: re-raised after a hint is printed, so the
                caller fails on the real connection error instead of on a
                later ``None`` dereference.
        """
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)
            raise

    def get_product_ids(self):
        """Return the set of distinct ``product_id`` values stored in ``table``."""
        cursor = self.cnx.cursor()
        try:
            cursor.execute("SELECT DISTINCT product_id FROM " + self.table)
            return {record[0] for record in cursor.fetchall()}
        finally:
            cursor.close()

    def save(self, rows, products):
        """Bulk-insert ``rows`` into ``table`` and ``products`` into ``table2``.

        Both inserts run in one transaction: they are committed together, and
        rolled back together if either fails.

        Args:
            rows: list of row dicts to insert into ``table``.
            products: subset of ``rows`` whose product_id was not seen before;
                inserted into ``table2``. May be empty.

        Raises:
            Exception: whatever the driver raised, after the rollback.
        """
        if not rows and not products:
            return

        columns = "date, listing_id, product_id, product_name, price, url) "
        values = ("VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                  "%(product_name)s, %(price)s, %(url)s)")
        insert_row = "INSERT INTO " + self.table + "(rowid, " + columns + values
        insert_product = ("INSERT INTO " + self.table2 + "(product_rowid, "
                          + columns + values)

        row_id = product_id = None
        cursor = self.cnx.cursor()
        try:
            if rows:
                cursor.executemany(insert_row, rows)
                # Read lastrowid while the cursor is still open.
                row_id = cursor.lastrowid
            if products:
                cursor.executemany(insert_product, products)
                product_id = cursor.lastrowid
            self.cnx.commit()
        except Exception:
            # Keep both tables consistent: undo the partial batch.
            self.cnx.rollback()
            raise
        finally:
            cursor.close()

        if rows:
            print("Item saved with ID: {}".format(row_id))
        if products:
            print("New Item saved with ID: {}".format(product_id))

I am actually curious how much of a performance boost this will give, so please share the difference in time.

User contributions licensed under: CC BY-SA
7 people found this helpful
Advertisement