From 686f10d930bcbc06e0c8297068e455bd678b5234 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 26 Jul 2023 14:58:19 +0200 Subject: [PATCH 5/8] use page LSN for sequences --- src/backend/commands/sequence.c | 92 ++++++++++++++++++--- src/backend/replication/logical/tablesync.c | 78 +++++++---------- src/include/catalog/pg_proc.dat | 8 ++ 3 files changed, 118 insertions(+), 60 deletions(-) diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index cd04c0ad05..f5b7ef7171 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -44,6 +44,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" #include "utils/resowner.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -101,7 +102,8 @@ static Relation lock_and_open_sequence(SeqTable seq); static void create_seq_hashtable(void); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); static Form_pg_sequence_data read_seq_tuple(Relation rel, - Buffer *buf, HeapTuple seqdatatuple); + Buffer *buf, HeapTuple seqdatatuple, + XLogRecPtr *lsn); static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, @@ -293,7 +295,7 @@ ResetSequence(Oid seq_relid) * indeed a sequence. */ init_sequence(seq_relid, &elm, &seq_rel); - (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple); + (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple, NULL); pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seq_relid)); if (!HeapTupleIsValid(pgstuple)) @@ -366,7 +368,7 @@ SetSequence_non_transactional(Oid seqrelid, int64 value) init_sequence(seqrelid, &elm, &seqrel); /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) @@ -439,7 +441,7 @@ SetSequence_transactional(Oid seq_relid, int64 value) init_sequence(seq_relid, &elm, &seqrel); /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); /* Copy the existing sequence tuple. */ tuple = heap_copytuple(&seqdatatuple); @@ -670,7 +672,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) seqform = (Form_pg_sequence) GETSTRUCT(seqtuple); /* lock page's buffer and read tuple into new sequence structure */ - (void) read_seq_tuple(seqrel, &buf, &datatuple); + (void) read_seq_tuple(seqrel, &buf, &datatuple, NULL); /* copy the existing sequence data tuple, so it can be modified locally */ newdatatuple = heap_copytuple(&datatuple); @@ -755,7 +757,7 @@ SequenceChangePersistence(Oid relid, char newrelpersistence) GetCurrentTransactionId(); } - (void) read_seq_tuple(seqrel, &buf, &seqdatatuple); + (void) read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); RelationSetNewRelfilenumber(seqrel, newrelpersistence); fill_seq_with_data(seqrel, &seqdatatuple); UnlockReleaseBuffer(buf); @@ -884,7 +886,7 @@ nextval_internal(Oid relid, bool check_permissions) ReleaseSysCache(pgstuple); /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); page = BufferGetPage(buf); elm->increment = incby; @@ -1207,7 +1209,7 @@ do_setval(Oid relid, int64 next, bool iscalled) PreventCommandIfParallelMode("setval()"); /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL); if ((next < minv) || (next > maxv)) ereport(ERROR, @@ -1427,11 +1429,13 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel) * *buf receives the reference to the pinned-and-ex-locked buffer * *seqdatatuple receives the reference to the sequence tuple proper * (this arg should point to a local variable of type HeapTupleData) + * *lsn receives LSN of the last sequence change (page LSN), optional * * Function's return value points to the data payload of the tuple */ static Form_pg_sequence_data -read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) +read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple, + XLogRecPtr *lsn) { Page page; ItemId lp; @@ -1448,6 +1452,13 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) elog(ERROR, "bad magic number in sequence \"%s\": %08X", RelationGetRelationName(rel), sm->magic); + /* + * If the caller requested it, set the page LSN. This allows deciding which + * sequence changes are before/after the returned sequence state. + */ + if (lsn) + *lsn = PageGetLSN(page); + lp = PageGetItemId(page, FirstOffsetNumber); Assert(ItemIdIsNormal(lp)); @@ -2043,7 +2054,7 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) errmsg("permission denied for sequence %s", RelationGetRelationName(seqrel)))); - seq = read_seq_tuple(seqrel, &buf, &seqtuple); + seq = read_seq_tuple(seqrel, &buf, &seqtuple, NULL); is_called = seq->is_called; result = seq->last_value; @@ -2057,6 +2068,67 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } +/* + * Return the current on-disk state of the sequence. + * + * Note: This is roughly equivalent to selecting the data from the sequence, + * except that it also returns the page LSN. + */ +Datum +pg_sequence_state(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqtuple; + Form_pg_sequence_data seq; + Datum result; + + int64 last_value; + int64 log_cnt; + bool is_called; + XLogRecPtr lsn; + + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[4]; + bool nulls[4]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* open and lock sequence */ + init_sequence(relid, &elm, &seqrel); + + if (pg_class_aclcheck(elm->relid, GetUserId(), + ACL_SELECT | ACL_USAGE) != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for sequence %s", + RelationGetRelationName(seqrel)))); + + seq = read_seq_tuple(seqrel, &buf, &seqtuple, &lsn); + + is_called = seq->is_called; + last_value = seq->last_value; + log_cnt = seq->log_cnt; + + UnlockReleaseBuffer(buf); + relation_close(seqrel, NoLock); + + values[0] = LSNGetDatum(lsn); + values[1] = Int64GetDatum(last_value); + values[2] = Int64GetDatum(log_cnt); + values[3] = BoolGetDatum(is_called); + + memset(nulls, 0, sizeof(nulls)); + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} void seq_redo(XLogReaderState *record) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 694ce70b7d..44e64ed1de 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1211,22 +1211,23 @@ copy_table(Relation rel) } /* - * Fetch sequence data (current state) from the remote node. + * Fetch sequence data (current state) from the remote node, including the + * page LSN. */ static int64 -fetch_sequence_data(char *nspname, char *relname) +fetch_sequence_data(Oid remoteid, XLogRecPtr *lsn) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[1] = {INT8OID}; + Oid tableRow[2] = {INT8OID, LSNOID}; int64 value = (Datum) 0; initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT (last_value + log_cnt)\n" - " FROM %s", quote_qualified_identifier(nspname, relname)); + appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn " + "FROM pg_sequence_state(%d)", remoteid); - res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 2, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1242,45 +1243,8 @@ fetch_sequence_data(char *nspname, char *relname) value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - } - - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); - - return value; -} - -/* - * Fetch remote insert LSN from the remote node. - */ -static XLogRecPtr -fetch_remote_lsn(void) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[1] = {LSNOID}; - XLogRecPtr value = InvalidXLogRecPtr; - - initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()"); - - res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow); - pfree(cmd.data); - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive current LSN from the publisher: %s", - res->err))); - - /* Process the sequence. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - bool isnull; - - value = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + *lsn = DatumGetInt64(slot_getattr(slot, 2, &isnull)); Assert(!isnull); } @@ -1304,6 +1268,7 @@ copy_sequence(Relation rel) List *qual = NIL; StringInfoData cmd; int64 sequence_value; + XLogRecPtr lsn = InvalidXLogRecPtr; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), @@ -1334,15 +1299,15 @@ copy_sequence(Relation rel) * Otherwise we might get duplicate values (on subscriber) if we failed * over right after the sync. */ - sequence_value = fetch_sequence_data(lrel.nspname, lrel.relname); + sequence_value = fetch_sequence_data(lrel.remoteid, &lsn); /* tablesync sets the sequences in non-transactional way */ SetSequence(RelationGetRelid(rel), false, sequence_value); logicalrep_rel_close(relmapentry, NoLock); - /* also fetch current remote LSN (after the data was selected) */ - return fetch_remote_lsn(); + /* return the LSN when the sequence state was set */ + return lsn; } /* @@ -1393,7 +1358,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UserContext ucxt; bool must_use_password; bool run_as_owner; - XLogRecPtr remote_lsn = InvalidXLogRecPtr; + XLogRecPtr sequence_lsn = InvalidXLogRecPtr; /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -1612,8 +1577,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { /* Now do the initial sequence copy */ PushActiveSnapshot(GetTransactionSnapshot()); - remote_lsn = copy_sequence(rel); + sequence_lsn = copy_sequence(rel); PopActiveSnapshot(); + + /* + * Sequences are not consistent (in the MVCC sense) with respect to the + * replication slot, so the copy might have read a more recent state + * than origin_startpos. The sequence_lsn comes from page LSN (which is + * LSN of the last sequence change), so that's the right position where + * to start with the catchup apply. + * + * It might be before the slot, though (if the sequence was not used + * since between the slot creation and copy), so make sure the position + * does not move backwards. + */ + *origin_startpos = Max(*origin_startpos, sequence_lsn); } else { @@ -1661,7 +1639,7 @@ copy_table_done: */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; - MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn); + MyLogicalRepWorker->relstate_lsn = *origin_startpos; SpinLockRelease(&MyLogicalRepWorker->relmutex); /* diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 00124946c0..dc150dfd1c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3326,6 +3326,14 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, +{ oid => '4549', + descr => 'current on-disk sequence state', + proname => 'pg_sequence_state', provolatile => 'v', + prorettype => 'record', proargtypes => 'regclass', + proallargtypes => '{oid,pg_lsn,int8,int8,bool}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{seq_oid,page_lsn,last_value,log_cnt,is_called}', + prosrc => 'pg_sequence_state' }, { oid => '275', descr => 'return the next oid for a system table', proname => 'pg_nextoid', provolatile => 'v', proparallel => 'u', -- 2.41.0