
SQLAlchemy Core – Efficient UPSERT of a Python list of dictionaries with MySQL

Assume an existing MySQL table, “user”, with a single-column primary key, “id”.
Assume the data to be inserted is always given as a list of dictionaries, in the form:
[{'column_name1':'valueA', 'column_name2':'valueB'}, {'column_name1':'valueC', 'column_name2':'valueD'}].

If a row is inserted with the same primary key (aka id), I’d like to just update the values of all the other columns.

data1 = [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'}, {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'}, {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'}]
data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'}, {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table

engine = create_engine(connectionString)
metadata = MetaData(engine)
table = Table('user', metadata, autoload=True)
#assuming user table is empty
engine.execute(table.insert(), data1)
bulk_insert = prepare_bulk_upsert_statement(data2)
engine.execute(bulk_insert)

I understand SQLAlchemy has an on_duplicate_key_update method, available on the insert construct from sqlalchemy.dialects.mysql. But from the example in the documentation, I cannot figure out what my prepare_bulk_upsert_statement function would look like. Ultimately, the content of the user table should look like this:

query = table.select()  
print ([r._asdict() for r in engine.execute(query)])

>
[{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},  
{'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
{'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]
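
To pin down the semantics being asked for, here is a minimal in-memory sketch in plain Python. It does not touch the database; the helper name upsert_by_id is hypothetical and only models "insert new ids, overwrite the other columns of existing ids".

```python
data1 = [
    {'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
    {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
    {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'},
]
data2 = [
    {'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
    {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'},
]


def upsert_by_id(existing, incoming, key='id'):
    """Hypothetical helper: model upsert semantics on lists of dicts."""
    merged = {row[key]: dict(row) for row in existing}
    for row in incoming:
        # New key: the row is added. Existing key: its columns are overwritten.
        merged.setdefault(row[key], {}).update(row)
    return [merged[k] for k in sorted(merged)]


result = upsert_by_id(data1, data2)
print(result)
```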

Looking at the SQLAlchemy example:

from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
     id='some_existing_id',
     data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

It seems on_duplicate_key_update can only handle a single row (i.e. one dictionary). Is there an efficient way to upsert a whole list using this method? Or is there a better approach?


Answer

I went with this:

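from sqlalchemy.dialects.mysql import insert

# One multi-row INSERT built from the whole list of dictionaries
insert_stmt = insert(table).values(data2)
# Names of the primary key columns (there may be more than one)
primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]
# Map each non-key column to the value from the row that was being inserted
onDuplicate = {colName:getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
engine.execute(on_duplicate_key_stmt)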

Get the primary key columns (there can be multiple), exclude them from the table's column list, build the dict for on_duplicate_key_update from the remaining columns, and execute the resulting statement.
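
Wrapped up as the prepare_bulk_upsert_statement function from the question, the same idea might look like the sketch below. Two things here are assumptions and not part of the original post: the function takes the table as an extra argument, and the "user" table is declared by hand (with guessed column types) so the statement can be compiled and inspected without a database connection.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert


def prepare_bulk_upsert_statement(table, rows):
    """Build one multi-row INSERT ... ON DUPLICATE KEY UPDATE statement."""
    insert_stmt = insert(table).values(rows)
    pk_names = {col.name for col in table.primary_key.columns}
    # Every non-key column is updated from the row that was being inserted.
    update_map = {
        col.name: insert_stmt.inserted[col.name]
        for col in table.columns
        if col.name not in pk_names
    }
    return insert_stmt.on_duplicate_key_update(update_map)


# Assumed stand-in for the reflected "user" table; column types are guesses.
metadata = MetaData()
user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('role', String(50)),
    Column('number', Integer),
    Column('text', String(50)),
)

data2 = [
    {'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
    {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'},
]

stmt = prepare_bulk_upsert_statement(user, data2)
# Compile against the MySQL dialect only to look at the generated SQL.
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

Because all rows go into a single statement, the whole list costs one round trip to the server. For very large lists it may be worth splitting the data into chunks, since MySQL limits the size of a single packet.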

User contributions licensed under: CC BY-SA