Assuming an existing MySQL table, “user”.
Assuming a single column primary key “id”.
Assuming the data to be inserted is always given as a list of dictionaries, in the form:
[{'column_name1':'valueA', 'column_name2':'valueB'}, {'column_name1':'valueC', 'column_name2':'valueD'}].
If a row is inserted with the same primary key (aka id), I’d like to just update the values of all the other columns.
data1 = [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
         {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
         {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'}]
data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
         {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table

engine = create_engine(connectionString)
metadata = MetaData(engine)
table = Table('user', metadata, autoload=True)

# assuming user table is empty
engine.execute(table.insert(), data1)

bulk_insert = prepare_bulk_upsert_statement(data2)
engine.execute(bulk_insert)
I understand SQLAlchemy has an on_duplicate_key_update method, available on the insert construct from sqlalchemy.dialects.mysql. But from the example, I cannot figure out what my prepare_bulk_upsert_statement function should look like. Ultimately, the content of the user table should look like this:
query = table.select()
print([r._asdict() for r in engine.execute(query)])
> [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
>  {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
>  {'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
>  {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]
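To make the intended semantics concrete, here is a minimal plain-Python sketch of the upsert, with no database involved. The helper name upsert_rows is hypothetical and exists only for illustration; it assumes every incoming row carries all columns, as in the data above.

```python
def upsert_rows(existing, incoming, key="id"):
    """Return the rows after applying upsert semantics, ordered by key.

    Hypothetical helper for illustration only: it mimics what the table
    should contain, it does not talk to MySQL.
    """
    merged = {row[key]: dict(row) for row in existing}
    for row in incoming:
        # Known key: overwrite the other columns. New key: add the row.
        merged.setdefault(row[key], {}).update(row)
    return [merged[k] for k in sorted(merged)]


data1 = [
    {'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
    {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
    {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'},
]
data2 = [
    {'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
    {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'},
]

result = upsert_rows(data1, data2)
for row in result:
    print(row)
```

Row 3 is replaced by the values from data2 and row 4 is appended, which is the end state the MySQL upsert is expected to produce.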
Looking at the SQLAlchemy example:
from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)
It seems on_duplicate_key_update can only handle a row (aka one dictionary). Is there an efficient way to upsert using this method? Or is there a better approach?
Answer
I went with this:
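from sqlalchemy.dialects.mysql import insert

# values() accepts the whole list of dictionaries, so one statement covers all rows.
insert_stmt = insert(table).values(data2)

# Primary key columns (there can be more than one) are left out of the update.
primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]

# Map each remaining column to the value from the row that would have been inserted.
onDuplicate = {colName: getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
engine.execute(on_duplicate_key_stmt)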
Get the primary key columns (there can be multiple), remove them from the list of columns, use the remaining names to build the dict for on_duplicate_key_update, and pass the resulting statement to execute.
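For reference, the same steps can be wrapped into the prepare_bulk_upsert_statement function the question asks for. This is only a sketch under a few assumptions: the function takes the table as an extra argument (the question passes only the data), and the Table definition below is a hypothetical stand-in for the reflected "user" table, with guessed column types, so the statement can be compiled and inspected without a database connection.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert


def prepare_bulk_upsert_statement(table, rows):
    """Build one multi-row INSERT ... ON DUPLICATE KEY UPDATE statement."""
    stmt = insert(table).values(rows)
    # Update every non-primary-key column with the value that was to be inserted.
    update_cols = {
        col.name: stmt.inserted[col.name]
        for col in table.columns
        if not col.primary_key
    }
    return stmt.on_duplicate_key_update(update_cols)


# Hypothetical stand-in for the reflected "user" table (column types are assumed).
metadata = MetaData()
user = Table(
    "user",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("role", String(50)),
    Column("number", Integer),
    Column("text", String(255)),
)

data2 = [
    {'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
    {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'},
]

stmt = prepare_bulk_upsert_statement(user, data2)
# Compile against the MySQL dialect to inspect the generated SQL; nothing is executed.
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

With a real engine, the returned statement would be executed the same way as on_duplicate_key_stmt above.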