Write Large Pandas DataFrames to SQL Server database


I have 74 relatively large Pandas DataFrames (about 34,600 rows and 8 columns each) that I am trying to insert into a SQL Server database as quickly as possible. After doing some research, I learned that the good ole pandas.to_sql function is not good for such large inserts into a SQL Server database, which was the initial approach I took (very slow: almost an hour for the application to complete, versus about 4 minutes when using a MySQL database).

This article, and many other StackOverflow posts, have been helpful in pointing me in the right direction; however, I’ve hit a roadblock:

I am trying to use SQLAlchemy’s Core rather than the ORM, for reasons explained in the link above. So I am converting the DataFrame to a list of dictionaries with pandas.to_dict and then calling execute() and insert():

# 'data' is a list of dictionaries.

The problem is that the insert is not getting any values: they appear as a bunch of empty parentheses, and I get this error:

(pyodbc.IntegrityError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...

There are values in the list of dictionaries that I passed in, so I can’t figure out why the values aren’t showing up.
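One thing worth double-checking in a situation like this: pandas.to_dict’s default orient is 'dict', which is keyed by column rather than by row, so executemany sees no per-row parameters at all; orient='records' is what produces the list-of-dicts shape that insert() expects. A quick comparison:

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})

# Default orient is column-keyed: not usable as executemany parameters
print(df.to_dict())
# → {'id': {0: 1, 1: 2}, 'name': {0: 'a', 1: 'b'}}

# orient='records' yields one dict per row, the shape insert() expects
print(df.to_dict(orient='records'))
# → [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```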


Here’s the example I’m going off of:

def test_sqlalchemy_core(n=100000):
    t0 = time.time()
    # engine and Customer come from the linked article's setup code
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
          " records " + str(time.time() - t0) + " secs")


I’ve got some sad news for you: SQLAlchemy doesn’t actually implement bulk imports for SQL Server; it’s just going to issue the same slow individual INSERT statements that to_sql is doing. Your best bet is to script something up using the bcp command-line tool. Here is a script I’ve used in the past, but no guarantees:

from subprocess import call

import pandas as pd
import numpy as np

pad = 0.1
delimiter = '|'        # field separator for the bcp data file
overwrite = False      # set True to drop and recreate an existing table
tablename = 'sandbox.max.pybcp_test'
raise_exception = True
server = 'P01'
trusted_connection = True
username = ''          # only used when trusted_connection is False
password = ''
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)

def get_column_def_sql(col):
    if col.dtype == object:
        width = col.str.len().max() * (1 + pad)
        return '[{}] varchar({})'.format(col.name, int(width))
    elif np.issubdtype(col.dtype, np.floating):
        return '[{}] float'.format(col.name)
    elif np.issubdtype(col.dtype, np.integer):
        return '[{}] int'.format(col.name)
    else:
        if raise_exception:
            raise NotImplementedError('data type {} not implemented'.format(col.dtype))
        print('Warning: cast column {} as varchar; data type {} not implemented'.format(col.name, col.dtype))
        width = col.astype(str).str.len().max() * (1 + pad)
        return '[{}] varchar({})'.format(col.name, int(width))

def create_table(df, tablename, server, trusted_connection, username, password, pad):
    if trusted_connection:
        login_string = '-E'
    else:
        login_string = '-U {} -P {}'.format(username, password)

    col_defs = [get_column_def_sql(df[col]) for col in df]

    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))
    if overwrite:
        query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};\n".format(tablename, tablename) + query_string

    query_file = r'c:\pybcp_tempqueryfile.sql'
    with open(query_file, 'w') as f:
        f.write(query_string)

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
    if o != 0:
        raise BaseException("Failed to create table")
    # o = call('del {}'.format(query_file), shell=True)

def call_bcp(df, tablename):
    if trusted_connection:
        login_string = '-T'
    else:
        login_string = '-U {} -P {}'.format(username, password)
    temp_file = r'c:\pybcp_tempqueryfile.csv'

    # Strip the delimiter from string columns and re-encode them as latin
    # so SQL Server can read the file
    df.loc[:, df.dtypes == object] = df.loc[:, df.dtypes == object].apply(
        lambda col: col.str.replace(delimiter, '').str.encode('latin'))
    df.to_csv(temp_file, index=False, sep=delimiter)
    # -t^| sets the field terminator (the caret escapes '|' for cmd.exe),
    # -r\n sets the row terminator, -c uses character mode
    o = call('bcp {} in {} -S "localhost" {} -t^{} -r\\n -c'.format(
        tablename, temp_file, login_string, delimiter), shell=True)
    if o != 0:
        raise BaseException("bcp load failed")
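As a sanity check, the bcp command this script issues for its example table can be assembled and inspected before shelling out; the table name and temp-file path here are the script’s own, and the caret in -t^| escapes the pipe for cmd.exe:

```python
temp_file = r'c:\pybcp_tempqueryfile.csv'
tablename = 'sandbox.max.pybcp_test2'
# -t^| : field terminator '|' (caret-escaped for cmd.exe)
# -r\n : row terminator; -c : character mode; -T : trusted connection
cmd = 'bcp {} in {} -S "localhost" -T -t^| -r\\n -c'.format(tablename, temp_file)
print(cmd)
# → bcp sandbox.max.pybcp_test2 in c:\pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r\n -c
```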

Source: Stack Overflow