From 9adb04bdb827f44a91e45d53b1fad5a02213777c Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 15 Dec 2020 20:46:27 +1100 Subject: [PATCH v3] 2PC-Solution1-WIP-20201215. This patch applies onto the v30 patch set [1] from other 2PC thread: [1] https://www.postgresql.org/message-id/CAFPTHDYA8yE6tEmQ2USYS68kNt%2BkM%3DSwKgj%3Djy4AvFD5e9-UTQ%40mail.gmail.com ==== Coded / WIP: * tablesync slot is now permanent instead of temporary * the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions * tablesync worked now allowing multiple tx instead of single tx * a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart. * if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase. TODO / Known Issues: * The tablesync replication origin/lsn logic all needs to be updated so that tablesync knows where to restart based on information held by the now permanent slot. * the current implementation of tablesync drop slot (e.g. from DROP SUBSCRIPTION) or finish_sync_worker regenerates the tablesync slot name so it knows what slot to drop. The current code may be ok for normal use cases, but if there is a ALTER SUBSCRIPTION ... SET (slot_name = newname) it would fail to be able to find the tablesync slot. * help / comments / cleanup * There is temporary "!!>>" excessive logging of mine scattered around which I added to help my testing during development --- src/backend/commands/subscriptioncmds.c | 108 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 163 ++++++++++++++++++++++------ src/backend/replication/logical/worker.c | 21 +--- src/include/catalog/pg_subscription_rel.h | 1 + src/include/commands/subscriptioncmds.h | 1 + src/include/replication/slot.h | 1 + 6 files changed, 249 insertions(+), 46 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b0745d5..e2b9618 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -37,6 +37,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/worker_internal.h" +#include "replication/slot.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -1070,6 +1071,41 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + /* Is this a tablesync worker? If yes, drop the tablesync's slot. */ + if (OidIsValid(w->relid)) + { + /* FIXME 1 - This slotname check below is workaround needed because the tablesync slot name + * is derived from the subscription slot name, so if that was set slot_name = NONE then we cannot do + * that calculation anymore to get the tablesyn slot name. + * + * FIXME 2 - If subscription slot name changes from 'aaa' to 'bbb' then it will be not be possible + * to get back to those tablesyn slots. Some resigned needed (eg store the tablesync slotname somewhere) + * to avoid this trouble... + */ + if (slotname) + { + extern void ReplicationSlotDropAtPubNode( + WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname); + + /* Calculate the name of the tablesync slot */ + char *syncslotname = ReplicationSlotNameForTablesync(slotname, w->subid, w->relid); + + elog(LOG, "!!>> DROP SUBSCRIPTION - now dropping the tablesync slot \"%s\".", syncslotname); + ReplicationSlotDropAtPubNode( + NULL, + conninfo, /* use conninfo to make a new connection. */ + subname, + syncslotname); + + pfree(syncslotname); + } + else + { + elog(LOG, "!!>> DROP SUBSCRIPTION - no slotname for relid %u.", w->relid); + } + } + + /* Stop the worker. */ logicalrep_worker_stop(w->subid, w->relid); } list_free(subworkers); @@ -1144,6 +1180,78 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) table_close(rel, NoLock); } + +/* + * Drop the replication slot at the publisher node + * using the replication connection. + * + * If the connection is passed then just use that, + * otherwise connect/disconnect within this function. + */ +void +ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname) +{ + StringInfoData cmd; + + load_file("libpqwalreceiver", false); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); + + /* + * If the connection was passed then use it. + * If the connection was not passed then make a new connection using the passed conninfo. + */ + if (wrconn_given != NULL) + { + Assert (conninfo == NULL); + wrconn = wrconn_given; + } + else + { + char *err = NULL; + + Assert(conninfo != NULL); + wrconn = walrcv_connect(conninfo, true, subname, &err); + + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to publisher when attempting to " + "drop the replication slot \"%s\"", slotname), + errdetail("The error was: %s", err))); + } + + PG_TRY(); + { + WalRcvExecResult *res; + + res = walrcv_exec(wrconn, cmd.data, 0, NULL); + + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("could not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", res->err))); + else + ereport(LOG, + (errmsg("dropped replication slot \"%s\" on publisher", + slotname))); + + walrcv_clear_result(res); + } + PG_CATCH(); + { + /* NOP. Just gobble any ERROR. */ + } + PG_END_TRY(); + + /* Disconnect the connection (unless using one passed) */ + if (wrconn_given == NULL) + walrcv_disconnect(wrconn); + + pfree(cmd.data); +} + /* * Internal workhorse for changing a subscription owner */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1904f34..7378cb6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -102,6 +102,7 @@ #include "replication/logicalrelation.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "replication/slot.h" #include "storage/ipc.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -139,6 +140,28 @@ finish_sync_worker(void) get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); + /* + * Cleanup the tablesync slot. + */ + { + extern void ReplicationSlotDropAtPubNode( + WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname); + + /* Calculate the name of the tablesync slot */ + char *syncslotname = ReplicationSlotNameForTablesync( + MySubscription->slotname, + MySubscription->oid, + MyLogicalRepWorker->relid); + + elog(LOG, "!!>> Dropping the tablesync slot \"%s\".", syncslotname); + ReplicationSlotDropAtPubNode( + wrconn, + NULL, /* use the current connection. */ + MySubscription->name, syncslotname); + + pfree(syncslotname); + } + /* Find the main apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); @@ -270,8 +293,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - Assert(IsTransactionState()); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && @@ -284,6 +305,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * UpdateSubscriptionRelState must be called within a transaction. + * That transaction will be ended within the finish_sync_worker(). + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + } + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, @@ -808,6 +838,35 @@ copy_table(Relation rel) logicalrep_rel_close(relmapentry, NoLock); } + +/* + * Determine the tablesync slot name. + * + * The returned slot name is palloc'ed in current memory context. + */ +char * +ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid) +{ + char *syncslotname; + + /* + * To build a slot name for the sync work, we are limited to NAMEDATALEN - + * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars + * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the + * NAMEDATALEN on the remote that matters, but this scheme will also work + * reasonably if that is different.) + */ + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + + syncslotname = psprintf("%.*s_%u_sync_%u", + NAMEDATALEN - 28, + subslotname, + suboid, + relid); + + return syncslotname; +} + /* * Start syncing the table in the sync worker. * @@ -825,6 +884,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) XLogRecPtr relstate_lsn; Relation rel; WalRcvExecResult *res; + bool copied_ok; /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -850,16 +910,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) finish_sync_worker(); /* doesn't return */ } - /* - * To build a slot name for the sync work, we are limited to NAMEDATALEN - - * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars - * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the - * NAMEDATALEN on the remote that matters, but this scheme will also work - * reasonably if that is different.) - */ - StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ - slotname = psprintf("%.*s_%u_sync_%u", - NAMEDATALEN - 28, + /* Calculate the name of the tablesync slot */ + slotname = ReplicationSlotNameForTablesync( MySubscription->slotname, MySubscription->oid, MyLogicalRepWorker->relid); @@ -875,7 +927,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) (errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || - MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC); + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || + MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE); + + /* + * The COPY phase was previously done, but tablesync then crashed/etc + * before it was able to finish normally. + */ + if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE) + { + elog(LOG, "!!>> tablesync relstate was SUBREL_STATE_COPYDONE."); + goto copy_table_done; + } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -891,9 +954,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); pgstat_report_stat(false); - /* - * We want to do the table data sync in a single transaction. - */ StartTransactionCommand(); /* @@ -919,29 +979,70 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_clear_result(res); /* - * Create a new temporary logical decoding slot. This slot will be used + * Create a new permanent logical decoding slot. This slot will be used * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ - walrcv_create_slot(wrconn, slotname, true, + elog(LOG, "!!>> LogicalRepSyncTableStart calls walrcv_create_slot for \"%s\".", slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_USE_SNAPSHOT, origin_startpos); - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* + * Be sure to remove the newly created tablesync slot if the COPY fails. + */ + copied_ok = false; + PG_TRY(); + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, - (errmsg("table copy could not finish transaction on publisher"), - errdetail("The error was: %s", res->err))); - walrcv_clear_result(res); + res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not finish transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); + + table_close(rel, NoLock); + + /* Make the copy visible. */ + CommandCounterIncrement(); + + copied_ok = true; + } + PG_FINALLY(); + { + /* If something failed during copy table then cleanup the created slot. */ + if (!copied_ok) + { + extern void ReplicationSlotDropAtPubNode( + WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname); + + elog(LOG, "!!>> The tablesync copy failed. Drop the tablesync slot \"%s\".", slotname); + ReplicationSlotDropAtPubNode( + wrconn, + NULL, /* use the current connection. */ + MySubscription->name, + slotname); + + pfree(slotname); + } + } + PG_END_TRY(); - table_close(rel, NoLock); + CommitTransactionCommand(); + + /* Update the persisted state to flag COPY phase is done; make it visible to others. */ + StartTransactionCommand(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_COPYDONE, + MyLogicalRepWorker->relstate_lsn); + CommitTransactionCommand(); - /* Make the copy visible. */ - CommandCounterIncrement(); +copy_table_done: /* * We are done with the initial data synchronization, update the state. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9271f87..a60e9fd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -771,8 +771,7 @@ apply_handle_prepare_txn(LogicalRepPrepareData *prepare_data) Assert(prepare_data->prepare_lsn == remote_final_lsn); - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * BeginTransactionBlock is necessary to balance the @@ -1079,12 +1078,8 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* The synchronization worker runs in single transaction. */ - if (!am_tablesync_worker()) - { - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - } + /* Commit the per-stream transaction */ + CommitTransactionCommand(); in_streamed_transaction = false; @@ -1161,9 +1156,7 @@ apply_handle_stream_abort(StringInfo s) /* Cleanup the subxact info */ cleanup_subxact_info(); - /* The synchronization worker runs in single transaction */ - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); return; } @@ -1190,8 +1183,7 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); } } @@ -1350,8 +1342,7 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data) { - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index acc2926..e9f2b3f 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn * NULL) */ +#define SUBREL_STATE_COPYDONE 'C' /* tablesync copy phase is completed */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of * apply (sublsn set) */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 804e47b..82c09d1 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -27,3 +27,4 @@ extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); #endif /* SUBSCRIPTIONCMDS_H */ + diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 63bab69..366a737 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -211,6 +211,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern char *ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); -- 1.8.3.1