I have 74 relatively large Pandas DataFrames (about 34,600 rows and 8 columns each) that I am trying to insert into a SQL Server database as quickly as possible. After doing some research, I learned that the good ole pandas.to_sql function is not good for such large inserts into a SQL Server database. That was the initial approach I took, and it was very slow: the application took almost an hour to complete, versus about 4 minutes when using a MySQL database.
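For context, the initial approach was just a plain to_sql call along these lines (the table name and engine URL here are illustrative, not my real ones):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('mssql+pyodbc://user:pass@my_dsn')
df = pd.DataFrame({'result_id': [1, 2], 'data_value': [0.5, 0.7]})

# Issues row-by-row INSERTs under the hood, which is what made this so slow.
df.to_sql('time_series_result_values', engine, if_exists='append', index=False)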
This article, and many other StackOverflow posts, have been helpful in pointing me in the right direction; however, I've hit a roadblock:
I am trying to use SQLAlchemy's Core rather than the ORM, for reasons explained in the link above. So, I am converting the DataFrame to a dictionary using pandas.to_dict and then executing an insert():
self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(),
    data)  # 'data' is a list of dictionaries
The problem is that the insert is not getting any values; they show up as a bunch of empty parentheses, and I get this error:
(pyodbc.IntegrityError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot insert the value NULL into the column...
There are values in the list of dictionaries that I passed in, so I can’t figure out why the values aren’t showing up.
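For reference, here is a minimal, self-contained sketch of the pattern I am attempting (the engine URL, table, and column names are made up for illustration; my real table is TimeSeriesResultValues):

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, Float

engine = create_engine('mssql+pyodbc://user:pass@my_dsn')
metadata = MetaData()

# Stand-in for the real TimeSeriesResultValues table.
values_table = Table('time_series_result_values', metadata,
                     Column('result_id', Integer),
                     Column('data_value', Float))

df = pd.DataFrame({'result_id': [1, 2], 'data_value': [0.5, 0.7]})

# orient='records' produces one dict per row, the shape that
# SQLAlchemy Core expects for an executemany-style insert.
data = df.to_dict(orient='records')
engine.execute(values_table.insert(), data)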
EDIT:
Here’s the example I’m going off of:
def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
          " records " + str(time.time() - t0) + " secs")
Answer
I've got some sad news for you: SQLAlchemy doesn't actually implement bulk inserts for SQL Server; it just issues the same slow, individual INSERT statements that to_sql does. I would say your best bet is to script something up using the bcp command-line tool. Here is a script that I've used in the past, but no guarantees:
from subprocess import call
import pandas as pd
import numpy as np

pad = 0.1
tablename = 'sandbox.max.pybcp_test'
overwrite = True
raise_exception = True
server = 'P01'
trusted_connection = True
username = None
password = None
delimiter = '|'

df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)


def get_column_def_sql(col):
    if col.dtype == object:
        width = col.str.len().max() * (1 + pad)
        return '[{}] varchar({})'.format(col.name, int(width))
    elif np.issubdtype(col.dtype, np.floating):
        return '[{}] float'.format(col.name)
    elif np.issubdtype(col.dtype, np.integer):
        return '[{}] int'.format(col.name)
    else:
        if raise_exception:
            raise NotImplementedError('data type {} not implemented'.format(col.dtype))
        else:
            print('Warning: cast column {} as varchar; data type {} not implemented'.format(col.name, col.dtype))
            width = col.str.len().max() * (1 + pad)
            return '[{}] varchar({})'.format(col.name, int(width))


def create_table(df, tablename, server, trusted_connection, username, password, pad):
    if trusted_connection:
        login_string = '-E'
    else:
        login_string = '-U {} -P {}'.format(username, password)

    col_defs = [get_column_def_sql(df[col]) for col in df]
    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))
    if overwrite:
        query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string

    query_file = 'c:\\pybcp_tempqueryfile.sql'
    with open(query_file, 'w') as f:
        f.write(query_string)

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
    if o != 0:
        raise BaseException("Failed to create table")
    # o = call('del {}'.format(query_file), shell=True)


def call_bcp(df, tablename):
    if trusted_connection:
        login_string = '-T'
    else:
        login_string = '-U {} -P {}'.format(username, password)
    temp_file = 'c:\\pybcp_tempqueryfile.csv'

    # Strip the delimiter out of the data and re-encode as latin so
    # SQL Server can read the file.
    df.loc[:, df.dtypes == object] = df.loc[:, df.dtypes == object].apply(
        lambda col: col.str.replace(delimiter, '', regex=False).str.encode('latin'))
    df.to_csv(temp_file, index=False, sep=delimiter, errors='ignore')

    # -t^| sets the field terminator to | (the ^ escapes the pipe for cmd.exe);
    # -r\n sets the row terminator.
    o = call('bcp {} in {} -S {} {} -t^| -r\\n -c'.format(
        tablename, temp_file, server, login_string), shell=True)
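For what it's worth, I'd drive it roughly like this (the server, table name, and CSV path are placeholders you'd swap for your own):

# Hypothetical invocation of the helpers above; adjust the globals
# (server, tablename, trusted_connection, etc.) for your environment.
df = pd.read_csv('D:/inputdata.csv', encoding='latin')
create_table(df, tablename, server, trusted_connection, username, password, pad)
call_bcp(df, tablename)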