I’m using `sqlalchemy` for a project I’m working on. I will eventually have thousands of records that need to be saved every hour, and each record may be either inserted or updated. I’ve been using `bulk_save_objects` for this and it has worked great. However, I now have to keep a history of the records being saved, which means I need the IDs returned so I can link each record to an entry in a history table. I know about `return_defaults`, and that works. The problem is that it makes `bulk_save_objects` insert and update one entry at a time instead of in bulk, which defeats the purpose. Is there another option where I can bulk insert and update at the same time but still get the IDs back?
Answer
The desired result can be achieved with a technique similar to the one described in a related answer: upload the rows to a temporary table, then perform an UPDATE followed by an INSERT that returns the inserted ID values. For SQL Server, that means an OUTPUT clause on the INSERT statement:
```python
from pprint import pprint

import pandas as pd
import sqlalchemy as sa

# `engine` is assumed to be an existing SQLAlchemy Engine connected to
# SQL Server, created with sa.create_engine() and your own connection URL.

main_table = "team"

# <set up test environment>
with engine.begin() as conn:
    conn.execute(sa.text(f"DROP TABLE IF EXISTS [{main_table}]"))
    conn.execute(
        sa.text(
            f"""
            CREATE TABLE [dbo].[{main_table}](
                [id] [int] IDENTITY(1,1) NOT NULL,
                [prov] [varchar](2) NOT NULL,
                [city] [varchar](50) NOT NULL,
                [name] [varchar](50) NOT NULL,
                [comments] [varchar](max) NULL,
                CONSTRAINT [PK_team] PRIMARY KEY CLUSTERED
                (
                    [id] ASC
                )
            )
            """
        )
    )
    # (prov, city) is the natural key used to match rows during the upsert
    conn.execute(
        sa.text(
            f"""
            CREATE UNIQUE NONCLUSTERED INDEX [UX_team_prov_city]
            ON [dbo].[{main_table}]
            (
                [prov] ASC,
                [city] ASC
            )
            """
        )
    )
    conn.execute(
        sa.text(
            f"""
            INSERT INTO [{main_table}] ([prov], [city], [name])
            VALUES ('AB', 'Calgary', 'Flames')
            """
        )
    )

# <data for upsert>
df = pd.DataFrame(
    [
        ("AB", "Calgary", "Flames", "hard-working, handsome lads"),
        ("AB", "Edmonton", "Oilers", "ruffians and scalawags"),
    ],
    columns=["prov", "city", "name", "comments"],
)

# <perform upsert, returning IDs>
temp_table = "#so65525098"
with engine.begin() as conn:
    # upload all rows to a temporary table in one bulk operation
    df.to_sql(temp_table, conn, index=False, if_exists="replace")
    # update the rows that already exist in the main table
    conn.execute(
        sa.text(
            f"""
            UPDATE main SET main.name = temp.name,
                main.comments = temp.comments
            FROM [{main_table}] main INNER JOIN [{temp_table}] temp
                ON main.prov = temp.prov AND main.city = temp.city
            """
        )
    )
    # insert the rows that are missing, returning their new IDs
    inserted = conn.execute(
        sa.text(
            f"""
            INSERT INTO [{main_table}] (prov, city, name, comments)
            OUTPUT INSERTED.prov, INSERTED.city, INSERTED.id
            SELECT prov, city, name, comments FROM [{temp_table}] temp
            WHERE NOT EXISTS (
                SELECT * FROM [{main_table}] main
                WHERE main.prov = temp.prov AND main.city = temp.city
            )
            """
        )
    ).fetchall()
    print(inserted)
    # console output:
    # [('AB', 'Edmonton', 2)]

# <check results>
with engine.begin() as conn:
    pprint(conn.execute(sa.text(f"SELECT * FROM {main_table}")).fetchall())
    # console output:
    # [(1, 'AB', 'Calgary', 'Flames', 'hard-working, handsome lads'),
    #  (2, 'AB', 'Edmonton', 'Oilers', 'ruffians and scalawags')]
```
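Note that the code above returns IDs only for the newly inserted rows. If the history table also needs the IDs of the rows that were updated, one option is to join the temporary table back to the main table on the unique key after the upsert. Below is a minimal sketch of that idea. It is an illustration under assumptions, not the code from this answer: it uses Python's built-in `sqlite3` with an in-memory database instead of SQL Server so it can run anywhere, and the `staging` table and the `upsert_with_ids` helper are hypothetical names. On SQL Server, the UPDATE statement also accepts an OUTPUT clause, which would be another way to collect those IDs.

```python
import sqlite3


def upsert_with_ids(conn, rows):
    """Upsert (prov, city, name, comments) rows into `team`.

    Returns {(prov, city): id} for every row in `rows`, whether it was
    updated or inserted. `staging` is a hypothetical temp table name.
    """
    cur = conn.cursor()

    # 1. Stage the incoming rows in a temporary table (one bulk load).
    cur.execute("DROP TABLE IF EXISTS temp.staging")
    cur.execute(
        "CREATE TEMP TABLE staging (prov TEXT, city TEXT, name TEXT, comments TEXT)"
    )
    cur.executemany("INSERT INTO staging VALUES (?, ?, ?, ?)", rows)

    # 2. Update the rows that already exist, matched on (prov, city).
    cur.execute(
        """
        UPDATE team
        SET name = (SELECT s.name FROM staging s
                    WHERE s.prov = team.prov AND s.city = team.city),
            comments = (SELECT s.comments FROM staging s
                        WHERE s.prov = team.prov AND s.city = team.city)
        WHERE EXISTS (SELECT 1 FROM staging s
                      WHERE s.prov = team.prov AND s.city = team.city)
        """
    )

    # 3. Insert the rows that are not in the main table yet.
    cur.execute(
        """
        INSERT INTO team (prov, city, name, comments)
        SELECT s.prov, s.city, s.name, s.comments FROM staging s
        WHERE NOT EXISTS (SELECT 1 FROM team t
                          WHERE t.prov = s.prov AND t.city = s.city)
        """
    )

    # 4. Recover the IDs of all affected rows with a single join.
    cur.execute(
        """
        SELECT t.prov, t.city, t.id
        FROM team t
        JOIN staging s ON s.prov = t.prov AND s.city = t.city
        """
    )
    ids = {(prov, city): row_id for prov, city, row_id in cur.fetchall()}
    conn.commit()
    return ids


# Same test data as above, recreated in an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE team (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        prov TEXT NOT NULL,
        city TEXT NOT NULL,
        name TEXT NOT NULL,
        comments TEXT,
        UNIQUE (prov, city)
    )
    """
)
conn.execute("INSERT INTO team (prov, city, name) VALUES ('AB', 'Calgary', 'Flames')")

ids = upsert_with_ids(
    conn,
    [
        ("AB", "Calgary", "Flames", "hard-working, handsome lads"),
        ("AB", "Edmonton", "Oilers", "ruffians and scalawags"),
    ],
)
print(ids)
```

The returned mapping covers both the updated Calgary row and the inserted Edmonton row, so each one can be linked to a history entry with a further set-based INSERT rather than one statement per record.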