From 1e1f21ac0f86c1efc64acec36f7ee249e5c576af Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 23 Mar 2023 22:19:33 +0100 Subject: [PATCH 4/5] add interlock with ALTER SEQUENCE --- src/backend/commands/sequence.c | 19 +++++++++++ src/backend/replication/logical/tablesync.c | 36 +++++++++++++++++++++ src/include/catalog/pg_proc.dat | 4 +++ 3 files changed, 59 insertions(+) diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 3311ad034f7..3b36c346e03 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -2029,6 +2029,25 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } +Datum +pg_sequence_lock_for_sync(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation seqrel; + + seqrel = relation_open(relid, RowExclusiveLock); + + if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", + RelationGetRelationName(seqrel)))); + + /* close the relation, but keep the lock */ + relation_close(seqrel, NoLock); + + PG_RETURN_VOID(); +} void seq_redo(XLogReaderState *record) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 78a280dda01..df220ba6629 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1506,6 +1506,42 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); + /* + * If we're syncing a sequence, lock it on the source to prevent concurrent + * ALTER SEQUENCE changes that might be written to WAL before the slot gets + * created (so not replicated), but invisible to the copy. + * + * XXX Has to happen after creating the slot, because it also installs a + * snapshot and so there must not be any queries before it. + * + * XXX Does this need a version check? Probably not, because for older + * versions we don't replicate sequences. + */ + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + StringInfoData cmd; + Oid lockRow[] = {VOIDOID}; + + initStringInfo(&cmd); + + /* + * XXX maybe this should do fetch_remote_table_info and use the relation + * and namespace names from the result? + */ + appendStringInfo(&cmd, "SELECT pg_catalog.pg_sequence_lock_for_sync('%s')", + quote_qualified_identifier(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel))); + elog(LOG, "locking: %s", cmd.data); + res = walrcv_exec(LogRepWorkerWalRcvConn, + cmd.data, 1, lockRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequence copy failed to lock on publisher: %s", + res->err))); + walrcv_clear_result(res); + } + /* * Setup replication origin tracking. The purpose of doing this before the * copy is to avoid doing the copy again due to any error in setting up diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b3843467205..dbc9804040c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11982,4 +11982,8 @@ proname => 'any_value_transfn', prorettype => 'anyelement', proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' }, +{ oid => '8003', descr => 'lock sequence for logical replication sync', + proname => 'pg_sequence_lock_for_sync', prorettype => 'void', + proargtypes => 'regclass', prosrc => 'pg_sequence_lock_for_sync' }, + ] -- 2.39.2