Can psycopg2 copy_expert read from an io.StringIO() buffer? - Mailing list psycopg
From | Jeff Ross |
---|---|
Subject | Can psycopg2 copy_expert read from an io.StringIO() buffer? |
Date | |
Msg-id | 6dfed0bf-a52a-66c0-43a9-6258d7b9843b@openvistas.net |
Responses | Re: Can psycopg2 copy_expert read from an io.StringIO() buffer? |
List | psycopg |
Hi all,

Bit of explanation first...

I'm working on a script to "heal" a logically replicated database after a replication outage. We get these outages periodically, especially on the several databases we have hosted in RDS. Replication either stops outright or slows so much it might as well be stopped. When that happens, the only fix I've found is to drop the subscription and then start it back up with (copy_data = False). Once restarted, replication proceeds from that point on at full speed. However, we now have a nice-sized hole in the data that needs to be backfilled.

I've been doing the backfill by joining the publisher and subscriber with postgres_fdw and comparing tables. Left joins between publisher and subscriber show me the rows on the publisher that are not on the subscriber; flipping the tables finds the rows on the subscriber that are not on the publisher and need to be deleted. Rows that need to be updated are found with select * from publisher.table except select * from subscriber.table.

I had been doing this in a plpy function, both reading and writing to the fdw tables. That works but is excruciatingly slow--one table we have has 170,000,000 rows and takes about 3 hours to sync. A 325G database takes about 6.5 hours total.

Enter psycopg2. My plan now is to query the fdw tables only to identify the rows, then use psycopg2 connections to both databases to directly insert/delete/update (by deleting from and then inserting to) the subscriber table.

Working on inserts now, with this code (queries freshly ported over from the plpy function):

    subscriber_connection.set_session(autocommit=True)

    csv_buf = io.StringIO()
    size = 3**20  # 3GB

    copy_query = """
        copy (
            select a.*
            from %s.%s a
            join %s.pkeys b on a.%s = b.%s
            order by a.%s
        ) to stdout with csv
    """ % (schema, table_name, schema, pkey, pkey_column, pkey_column)

    insert_query = """
        copy %s.%s from stdin with csv;
    """ % (schema, table_name)

    try:
        publisher_copy_cursor.copy_expert(copy_query, csv_buf, size)
        subscriber_copy_cursor.copy_expert(insert_query, csv_buf)
        subscriber_copy_cursor.close()
    except Exception as e:
        debug_output("insert error! %s" % (e))

So, most of that works pretty well. csv_buf is filled with the rows for the pkeys missing from the subscriber. I've verified that works from the python shell. On the subscriber I see the copy command hit the logs:

    2022-11-03 14:27:15.357 EDT,"postgres","dr_metroarchive",43810,"172.26.27.10:39614",636407fc.ab22,30,"COPY",2022-11-03 14:27:08 EDT,4/0,0,LOG,00000,"duration: 53.782 ms  statement: copy metro.client_profile from stdin with csv;",,,,,,,,,""

But no rows ever actually get inserted. The only thing I can think of so far is that copy_expert isn't reading csv_buf. And now that I write that, I wonder if it's because csv_buf is in memory on the publisher and not on the subscriber. If that's the case, maybe this all boils down to "how do I pipe stdout from the publisher to the subscriber across the subscriber cursor?"

That's a lot of email for one short question--apologies and thanks in advance for any clue by fours!

Jeff Ross
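For reference, a minimal sketch of the comparison queries described above as they might look over postgres_fdw. The schema names publisher and subscriber, the table name widgets, and the single primary key column id are placeholders, not the actual objects in this thread:

    # rows on the publisher that are missing from the subscriber (to insert)
    missing = """
        select a.id
        from publisher.widgets a
        left join subscriber.widgets b on a.id = b.id
        where b.id is null
    """

    # rows on the subscriber that are gone from the publisher (to delete)
    extra = """
        select a.id
        from subscriber.widgets a
        left join publisher.widgets b on a.id = b.id
        where b.id is null
    """

    # rows that differ in any column (to delete and re-insert)
    changed = """
        select * from publisher.widgets
        except
        select * from subscriber.widgets
    """

Because except compares whole rows, the third query also returns the rows the first left join finds; together the three result sets drive the delete-then-insert pass on the subscriber.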
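On the copy_expert question itself, here is a minimal self-contained sketch of the StringIO round trip, written on the assumption that the buffer only needs to be rewound with seek(0) after the publisher-side copy fills it (writing into a StringIO leaves its position at the end, so a following COPY ... FROM STDIN would read nothing). The DSNs and the table name some_table are placeholders:

    import io
    import psycopg2

    pub_conn = psycopg2.connect("dbname=publisher_db")   # placeholder DSN
    sub_conn = psycopg2.connect("dbname=subscriber_db")  # placeholder DSN
    sub_conn.set_session(autocommit=True)

    csv_buf = io.StringIO()

    with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
        # COPY ... TO STDOUT writes CSV into any object with a write() method.
        pub_cur.copy_expert("copy some_table to stdout with csv", csv_buf)
        # Rewind before reading the buffer back; otherwise it is at EOF.
        csv_buf.seek(0)
        # COPY ... FROM STDIN reads from any object with a read() method.
        sub_cur.copy_expert("copy some_table from stdin with csv", csv_buf)

Since copy_expert only cares about read()/write() on the file argument, the same pattern could hand it a temporary file instead of an in-memory buffer to avoid holding a whole table in RAM, but that is beyond this sketch.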