From d3153f6469800f0e1eaa115e674eff1efeba3fc0 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Mon, 25 Jan 2021 13:07:40 +1100 Subject: [PATCH v19] Tablesync Solution1. ==== Features: * The tablesync worker is now allowing multiple tx instead of single tx. * A new state (SUBREL_STATE_FINISHEDCOPY) is persisted after a successful copy_table in tablesync's LogicalRepSyncTableStart. * If a re-launched tablesync finds state SUBREL_STATE_FINISHEDCOPY then it will bypass the initial copy_table phase. * Now tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar as done for the apply worker). The origin is advanced when first created. * The tablesync replication origin tracking record is cleaned up by: - process_syncing_tables_for_apply - DropSubscription - AlterSubscription_refresh * Updates to PG docs. * New TAP test case. Known Issues: * None. --- doc/src/sgml/catalogs.sgml | 1 + src/backend/commands/subscriptioncmds.c | 33 +++++++ src/backend/replication/logical/tablesync.c | 139 ++++++++++++++++++++++++++-- src/backend/replication/logical/worker.c | 18 +--- src/include/catalog/pg_subscription_rel.h | 2 + src/test/subscription/t/004_sync.pl | 96 ++++++++++++++++++- 6 files changed, 265 insertions(+), 24 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 43d7a1a..82e74e1 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7662,6 +7662,7 @@ SCRAM-SHA-256$<iteration count>:&l State code: i = initialize, d = data is being copied, + f = finished table copy, s = synchronized, r = ready (normal replication) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 082f785..af13448 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -649,10 +649,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) if (!bsearch(&relid, pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), 
oid_cmp)) { + char originname[NAMEDATALEN]; + RepOriginId originid; + RemoveSubscriptionRel(sub->oid, relid); logicalrep_worker_stop_at_commit(sub->oid, relid); + /* Remove the tablesync's origin tracking if exists. */ + snprintf(originname, sizeof(originname), "pg_%u_%u", sub->oid, relid); + originid = replorigin_by_name(originname, true); + if (originid != InvalidRepOriginId) + replorigin_drop(originid, false /* nowait */ ); + ereport(DEBUG1, (errmsg("table \"%s.%s\" removed from subscription \"%s\"", get_namespace_name(get_rel_namespace(relid)), @@ -930,6 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) WalReceiverConn *wrconn = NULL; StringInfoData cmd; Form_pg_subscription form; + List *rstates; /* * Lock pg_subscription with AccessExclusiveLock to ensure that the @@ -1042,6 +1052,29 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + /* + * Tablesync resource cleanup (origins). + * + * Any READY-state relations have already done this. + */ + rstates = GetSubscriptionNotReadyRelations(subid); + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup the tablesync worker resources */ + if (!OidIsValid(relid)) + continue; + + /* Remove the tablesync's origin tracking if exists. */ + snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid); + originid = replorigin_by_name(originname, true); + if (originid != InvalidRepOriginId) + replorigin_drop(originid, false /* nowait */ ); + } + list_free(rstates); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 863d196..ae446f5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -31,8 +31,10 @@ * table state to INIT. 
* - Tablesync worker starts; changes table state from INIT to DATASYNC while * copying. - * - Tablesync worker finishes the copy and sets table state to SYNCWAIT; - * waits for state change. + * - Tablesync worker does initial table copy; there is a FINISHEDCOPY state to + * indicate when the copy phase has completed, so if the worker crashes + * with this (non-memory) state then the copy will not be re-attempted. + * - Tablesync worker then sets table state to SYNCWAIT; waits for state change. * - Apply worker periodically checks for tables in SYNCWAIT state. When * any appear, it sets the table state to CATCHUP and starts loop-waiting * until either the table state is set to SYNCDONE or the sync worker @@ -48,8 +50,8 @@ * point it sets state to READY and stops tracking. Again, there might * be zero changes in between. * - * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> - * CATCHUP -> SYNCDONE -> READY. + * So the state progression is always: INIT -> DATASYNC -> + * (sync worker FINISHEDCOPY) -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about * subscribed tables and their state. 
Some transient state during data @@ -59,6 +61,7 @@ * Example flows look like this: * - Apply is in front: * sync:8 + * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:10 * -> set in memory CATCHUP @@ -74,6 +77,7 @@ * * - Sync is in front: * sync:10 + * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:8 * -> set in memory CATCHUP @@ -102,7 +106,10 @@ #include "replication/logicalrelation.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "replication/slot.h" +#include "replication/origin.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -270,8 +277,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - Assert(IsTransactionState()); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && @@ -279,11 +284,23 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; + /* + * Change state to SYNCDONE. + */ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * UpdateSubscriptionRelState must be called within a transaction. + * That transaction will be ended within the finish_sync_worker(). 
+ */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + } + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, @@ -404,6 +421,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { + char originname[NAMEDATALEN]; + RepOriginId originid; + rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -412,6 +432,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } + /* + * Remove the tablesync origin tracking if exists. + * + * The normal case origin drop must be done here, not in the + * process_syncing_tables_for_sync function, because if the + * tablesync worker process attempted to drop its own origin + * then it would fail (origin is "busy"). + */ + snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid); + originid = replorigin_by_name(originname, true); + if (OidIsValid(originid)) + { + elog(DEBUG1, + "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", + originname); + replorigin_drop(originid, false /* nowait */ ); + } + + /* + * Update the state to READY only after the origin cleanup. + */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); @@ -824,6 +865,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) XLogRecPtr relstate_lsn; Relation rel; WalRcvExecResult *res; + char originname[NAMEDATALEN]; + RepOriginId originid; /* Check the state of the table synchronization. 
*/ StartTransactionCommand(); @@ -874,7 +917,38 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) (errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || - MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC); + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || + MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); + + /* Assign the origin tracking record name. */ + snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) + { + /* + * The COPY phase was previously done, but tablesync then crashed + * before it was able to finish normally. + */ + StartTransactionCommand(); + + /* + * Slot creation passes NULL lsn because the origin startpos is got + * from origin tracking this time, not from the slot. + */ + walrcv_create_slot(wrconn, slotname, true /* temporary */ , + CRS_NOEXPORT_SNAPSHOT, NULL /* lsn */ ); + + /* + * The origin tracking name must already exist. It was created first + * time this tablesync was launched. + */ + originid = replorigin_by_name(originname, false /* missing_ok */ ); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + + goto copy_table_done; + } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -890,9 +964,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); pgstat_report_stat(false); - /* - * We want to do the table data sync in a single transaction. - */ StartTransactionCommand(); /* @@ -942,6 +1013,54 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Make the copy visible. */ CommandCounterIncrement(); + /* Setup replication origin tracking. 
*/ + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + { + /* + * Origin tracking does not exist, so create it now. + * + * Then advance to the LSN got from walrcv_create_slot. This is WAL + * logged for the purpose of recovery. Locks are to prevent the + * replication origin from vanishing while advancing. + */ + originid = replorigin_create(originname); + + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + replorigin_session_setup(originid); + replorigin_session_origin = originid; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin \"%s\" already exists", + originname))); + } + + /* + * Update the persisted state to indicate the COPY phase is done; make it + * visible to others. + */ + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relstate_lsn); + +copy_table_done: + + elog(DEBUG1, + "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", + originname, + (uint32) (*origin_startpos >> 32), + (uint32) *origin_startpos); + + CommitTransactionCommand(); + /* * We are done with the initial data synchronization, update the state. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eb7db89..cfc924c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* The synchronization worker runs in single transaction. 
*/ - if (!am_tablesync_worker()) - { - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - } + /* Commit the per-stream transaction */ + CommitTransactionCommand(); in_streamed_transaction = false; @@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s) /* Cleanup the subxact info */ cleanup_subxact_info(); - /* The synchronization worker runs in single transaction */ - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); return; } @@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); } } @@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) { - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 06663b9..9027c42 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn * NULL) */ +#define SUBREL_STATE_FINISHEDCOPY 'f' /* tablesync copy phase is completed + * (sublsn NULL) */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of * apply (sublsn set) */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index e111ab9..52915d9 100644 --- a/src/test/subscription/t/004_sync.pl +++ 
b/src/test/subscription/t/004_sync.pl @@ -3,7 +3,9 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 10; +use Time::HiRes qw(usleep); +use Scalar::Util qw(looks_like_number); # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -149,7 +151,99 @@ $result = $node_subscriber->safe_psql('postgres', is($result, qq(20), 'changes for table added after subscription initialized replicated'); +## +## slot integrity +## +## Manually create a slot with the same name that tablesync will want. +## Expect tablesync ERROR when clash is detected. +## Then remove the slot so tablesync can proceed. +## Expect tablesync can now finish normally. +## + +# drop the subscription +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +# empty the table tab_rep +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); + +# empty the table tab_rep_next +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep_next;"); + +# recreate the subscription again, but leave it disabled so that we can get the OID +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub + with (enabled = false)" +); + +# need to create the name of the tablesync slot, for this we need the subscription OID +# and the table OID. +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'tap_sub';"); +is(looks_like_number($subid), qq(1), 'get the subscription OID'); + +my $relid = $node_subscriber->safe_psql('postgres', + "SELECT 'tab_rep_next'::regclass::oid"); +is(looks_like_number($relid), qq(1), 'get the table OID'); + +# name of the tablesync slot is 'slotname'_'suboid'_sync_'tableoid'. +my $slotname = 'tap_sub_' . $subid . '_' . 'sync_' . $relid; + +# temporarily, create a slot having the same name of the tablesync slot. 
+$node_publisher->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('$slotname', 'pgoutput', false);");
+
+# enable the subscription
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub ENABLE"
+);
+
+# check for occurrence of the expected error
+poll_output_until("replication slot \"$slotname\" already exists")
+  or die "no error stop for the pre-existing slot";
+
+# now drop the offending slot, the tablesync should recover.
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('$slotname');");
+
+# wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_rep_next");
+is($result, qq(20),
+	'data for table added after subscription initialized are now synced');
+
+# Cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
+
+# Poll the subscriber log until it contains the literal text $expected.
+# Returns 1 as soon as the text is seen, or 0 after about 10 seconds.
+sub poll_output_until
+{
+	my ($expected) = @_;
+
+	$expected = 'xxxxxx' unless defined($expected);    # default junk value
+
+	# 100 polls spaced 0.1 second apart, i.e. about 10 seconds overall.
+	my $max_attempts = 10 * 10;
+	my $attempts = 0;
+	my $output_file = '';
+	while ($attempts < $max_attempts)
+	{
+		$output_file = slurp_file($node_subscriber->logfile());
+
+		# Substring search, so $expected is never interpreted as a regex.
+		return 1 if index($output_file, $expected) >= 0;
+
+		# Wait 0.1 second before retrying.
+		usleep(100_000);
+		$attempts++;
+	}
+
+	# The expected text did not appear within about 10 seconds. Give up.
+	return 0;
+}
-- 
1.8.3.1