Assuming an existing MySQL table, “user”.
Assuming a single column primary key “id”.
Assuming the data to be inserted is always given as a list of dictionaries, in the form:
[{'column_name1':'valueA', 'column_name2':'valueB'}, {'column_name1':'valueC', 'column_name2':'valueD'}].
If a row is inserted with the same primary key (aka id), I’d like to just update the values of all the other columns.
data1 = [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'}, {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'}, {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'}]
data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'}, {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]
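from sqlalchemy import create_engine, MetaData, Table
# connectionString is my MySQL URL, e.g. "mysql+pymysql://user:password@host/dbname"
engine = create_engine(connectionString)
metadata = MetaData()
# reflect the existing "user" table from the database
table = Table('user', metadata, autoload_with=engine)
with engine.begin() as conn:
    # assuming the user table is empty
    conn.execute(table.insert(), data1)
    # prepare_bulk_upsert_statement is the function I am asking about
    bulk_insert = prepare_bulk_upsert_statement(data2)
    conn.execute(bulk_insert)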
I understand SQLAlchemy has an on_duplicate_key_update method in sqlalchemy.dialects.mysql.insert that I could use, but from the documentation example I cannot figure out what my prepare_bulk_upsert_statement function should look like. Ultimately, the content of the user table should look like this:
query = table.select()
with engine.connect() as conn:
    print([r._asdict() for r in conn.execute(query)])
>
[{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
{'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
{'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]
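To pin down the expected upsert semantics without a database, here is a minimal pure-Python model (the helper name upsert_rows is hypothetical): rows are keyed by primary key, a new key is inserted, and an existing key has its other columns overwritten. It only simulates the result shown above; it is not how MySQL or SQLAlchemy actually execute the statement.

```python
def upsert_rows(existing, incoming, key="id"):
    """Simulate INSERT ... ON DUPLICATE KEY UPDATE on lists of dicts (hypothetical helper)."""
    # Index the current table contents by primary key.
    by_key = {row[key]: dict(row) for row in existing}
    for row in incoming:
        # New key: start from an empty row. Existing key: overwrite its columns.
        by_key.setdefault(row[key], {}).update(row)
    # Return rows ordered by primary key, like the SELECT output above.
    return sorted(by_key.values(), key=lambda r: r[key])


data1 = [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},
         {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
         {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'}]
data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
         {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

result = upsert_rows(data1, data2)
print(result)
```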
Here is the example from the SQLAlchemy documentation:
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
id='some_existing_id',
data='inserted value')
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
It seems on_duplicate_key_update can only handle a single row (i.e. one dictionary). Is there an efficient way to upsert many rows using this method? Or is there a better approach?
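One alternative, sketched here as an assumption rather than taken from the thread, is to build the upsert once without .values() and pass the list of dictionaries at execution time, so SQLAlchemy runs it as an executemany with one parameter set per dictionary. The user table below is a stand-in with guessed column types, and the statement is only compiled against the MySQL dialect, so the sketch runs without a server.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, Text
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
# Assumed stand-in for the reflected "user" table (column types are guesses).
user = Table(
    "user",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("role", String(20)),
    Column("number", Integer),
    Column("text", Text),
)

# No .values(): the row data is supplied later, at execute time.
stmt = insert(user)
pk_names = {col.name for col in user.primary_key.columns}
# On a duplicate key, set every non-key column to the incoming value.
upsert = stmt.on_duplicate_key_update(
    {col.name: stmt.inserted[col.name] for col in user.columns if col.name not in pk_names}
)

# Compile only. With a real engine this would be executed as:
#     with engine.begin() as conn:
#         conn.execute(upsert, data2)  # executemany over the list of dicts
sql = str(upsert.compile(dialect=mysql.dialect()))
print(sql)
```

Whether the driver sends the executemany as one round trip or several depends on the DBAPI driver, so this is not automatically faster than a single multi-row INSERT.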
Answer
I went with this:
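from sqlalchemy.dialects.mysql import insert
# one multi-row INSERT built from the whole list of dictionaries
insert_stmt = insert(table).values(data2)
# primary key column names (there can be more than one)
primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]
# on a duplicate key, set each non-key column to the value this row tried to insert
onDuplicate = {colName: getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
with engine.begin() as conn:
    conn.execute(on_duplicate_key_stmt)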
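Get the primary key column names (there can be multiple), exclude them from the table's columns, use the remaining names to build the dict for on_duplicate_key_update, and execute the resulting statement. Because values() receives the whole list of dictionaries, this is a single multi-row INSERT ... ON DUPLICATE KEY UPDATE statement rather than one statement per row.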
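The steps of the answer can be wrapped into the prepare_bulk_upsert_statement helper the question asks for. This is a sketch with an assumed two-argument signature (table, rows) instead of the question's one-argument version, and it uses a stand-in user table whose column types are guesses. It compiles the statement against the MySQL dialect so the SQL can be inspected without a server; with a real engine you would pass the returned statement to conn.execute() inside engine.begin().

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, Text
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
# Assumed stand-in for the reflected "user" table (column types are guesses).
user = Table(
    "user",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("role", String(20)),
    Column("number", Integer),
    Column("text", Text),
)


def prepare_bulk_upsert_statement(table, rows):
    """Hypothetical helper: one multi-row INSERT ... ON DUPLICATE KEY UPDATE for `rows`."""
    stmt = insert(table).values(rows)
    pk_names = {col.name for col in table.primary_key.columns}
    # Update every non-key column with the value the conflicting row tried to insert.
    update_cols = {
        col.name: stmt.inserted[col.name]
        for col in table.columns
        if col.name not in pk_names
    }
    return stmt.on_duplicate_key_update(update_cols)


data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
         {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

stmt = prepare_bulk_upsert_statement(user, data2)
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

Indexing inserted by name (stmt.inserted[col.name]) instead of getattr also avoids surprises if a column name ever matches a method of the column collection.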