From aa3004a70e1ab2ee304367b29dde1549326354f1 Mon Sep 17 00:00:00 2001 From: Amit Khandekar Date: Mon, 1 Jul 2019 10:49:50 +0530 Subject: [PATCH] Logical decoding on standby - v11 Author : Andres Freund. Besides the above main changes, patch includes following : 1. Handle slot conflict recovery by dropping the conflicting slots. -Amit Khandekar. 2. test/recovery/t/018_logical_decoding_on_replica.pl added. Original author : Craig Ringer. few changes/additions from Amit Khandekar. 3. Handle slot conflicts when master wal_level becomes less than logical. Changes in v6 patch : While creating the slot, lastReplayedEndRecPtr is used to set the restart_lsn, but its position is later adjusted in DecodingContextFindStartpoint() in case it does not point to a valid record location. This can happen because replay pointer points to 1 + end of last record replayed, which means it can coincide with first byte of a new WAL block, i.e. inside block header. Also, modified the test to handle the requirement that the logical slot creation on standby requires a checkpoint (or any other transaction commit) to be given from master. For that, in src/test/perl/PostgresNode.pm, added a new function create_logical_slot_on_standby() which does the required steps. Changes in v7 patch : Merge the two conflict messages for xmin and catalog_xmin into a single one. Changes in v8 : Fix incorrect flush ptr on standby (reported by Tushar Ahuja). In XLogSendLogical(), GetFlushRecPtr() was used to get the flushed point. On standby, GetFlushRecPtr() does not give a valid value, so it was wrongly determined that the sent record is beyond flush point, as a result of which, WalSndCaughtUp was set to true, causing WalSndLoop() to sleep for some duration after every record. This was reported by Tushar Ahuja, where pg_recvlogical seems like it is hanging when there are loads of inserts. 
Fix: Use GetStandbyFlushRecPtr() if am_cascading_walsender Changes in v9 : While dropping a conflicting logical slot, if a backend has acquired it, send it a conflict recovery signal. Check new function ReplicationSlotDropConflicting(). Also, miscellaneous review comments addressed, but not all of them yet. Changes in v10 : Adjust restart_lsn if it's a Replay Pointer. This was earlier done in DecodingContextFindStartpoint() but now it is done in ReplicationSlotReserveWal(), when restart_lsn is initialized. Changes in v11 : Added some test scenarios to test drop-slot conflicts. Organized the test file a bit. Also improved the conflict error message. --- src/backend/access/gist/gistxlog.c | 6 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/hash/hashinsert.c | 2 + src/backend/access/heap/heapam.c | 23 +- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/access/heap/visibilitymap.c | 2 +- src/backend/access/nbtree/nbtpage.c | 4 + src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgvacuum.c | 2 + src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 22 ++ src/backend/postmaster/pgstat.c | 4 + src/backend/replication/logical/decode.c | 14 +- src/backend/replication/logical/logical.c | 33 +- src/backend/replication/slot.c | 233 +++++++++++- src/backend/replication/walsender.c | 8 +- src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 7 +- src/backend/tcop/postgres.c | 23 +- src/backend/utils/adt/pgstatfuncs.c | 1 + src/backend/utils/cache/lsyscache.c | 16 + src/include/access/gistxlog.h | 3 +- src/include/access/hash_xlog.h | 1 + src/include/access/heapam_xlog.h | 8 +- src/include/access/nbtxlog.h | 2 + src/include/access/spgxlog.h | 1 + src/include/access/xlog.h | 1 + src/include/pgstat.h | 1 + src/include/replication/slot.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 +- src/include/utils/lsyscache.h 
| 1 + src/include/utils/rel.h | 1 + src/test/perl/PostgresNode.pm | 27 ++ .../recovery/t/018_logical_decoding_on_replica.pl | 420 +++++++++++++++++++++ 36 files changed, 830 insertions(+), 58 deletions(-) create mode 100644 src/test/recovery/t/018_logical_decoding_on_replica.pl diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 503db34..385ea1f 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -397,7 +398,7 @@ gistRedoPageReuse(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, - xlrec->node); + xlrec->onCatalogTable, xlrec->node); } } @@ -589,6 +590,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index d7b7098..00c3e0f 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/hash/hashinsert.c 
b/src/backend/access/hash/hashinsert.c index 5321762..e28465a 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -17,6 +17,7 @@ #include "access/hash.h" #include "access/hash_xlog.h" +#include "catalog/catalog.h" #include "miscadmin.h" #include "utils/rel.h" #include "storage/lwlock.h" @@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf) xl_hash_vacuum_one_page xlrec; XLogRecPtr recptr; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel); xlrec.latestRemovedXid = latestRemovedXid; xlrec.ntuples = ndeletable; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d768b9b..10b7857 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7149,12 +7149,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel, * see comments for vacuum_log_cleanup_info(). */ XLogRecPtr -log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid) +log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid) { xl_heap_cleanup_info xlrec; XLogRecPtr recptr; - xlrec.node = rnode; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); + xlrec.node = rel->rd_node; xlrec.latestRemovedXid = latestRemovedXid; XLogBeginInsert(); @@ -7190,6 +7191,7 @@ log_heap_clean(Relation reln, Buffer buffer, /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.latestRemovedXid = latestRemovedXid; xlrec.nredirected = nredirected; xlrec.ndead = ndead; @@ -7240,6 +7242,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, /* nor when there are no tuples to freeze */ Assert(ntuples > 0); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.cutoff_xid = cutoff_xid; xlrec.ntuples = ntuples; @@ -7270,7 +7273,7 @@ log_heap_freeze(Relation reln, Buffer buffer, 
TransactionId cutoff_xid, * heap_buffer, if necessary. */ XLogRecPtr -log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, +log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 vmflags) { xl_heap_visible xlrec; @@ -7280,6 +7283,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, Assert(BufferIsValid(heap_buffer)); Assert(BufferIsValid(vm_buffer)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec.cutoff_xid = cutoff_xid; xlrec.flags = vmflags; XLogBeginInsert(); @@ -7700,7 +7704,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -7736,7 +7741,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -7832,7 +7838,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. 
If the heap file has dropped or @@ -7969,7 +7977,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index a3c4a1d..bf34d3a 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -473,7 +473,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) * No need to write the record at all unless it contains a valid value */ if (TransactionIdIsValid(vacrelstats->latestRemovedXid)) - (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid); + (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid); } /* diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index 64dfe06..c5fdd64 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, if (XLogRecPtrIsInvalid(recptr)) { Assert(!InRecovery); - recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf, + recptr = log_heap_visible(rel, heapBuf, vmBuf, cutoff_xid, flags); /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 0357030..6b641c9 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -31,6 +31,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "utils/lsyscache.h" #include "utils/snapmgr.h" static void _bt_cachemetadata(Relation rel, BTMetaPageData *input); @@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId 
latestRemovedX */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; @@ -1140,6 +1142,8 @@ _bt_delitems_delete(Relation rel, Buffer buf, XLogRecPtr recptr; xl_btree_delete xlrec_delete; + xlrec_delete.onCatalogTable = + RelationIsAccessibleInLogicalDecoding(heapRel); xlrec_delete.latestRemovedXid = latestRemovedXid; xlrec_delete.nitems = nitems; diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 6532a25..b874bda 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 2b1662a..eaaf631 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -27,6 +27,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/snapmgr.h" +#include "utils/lsyscache.h" /* Entry in pending-list of TIDs we need to revisit */ @@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) OffsetNumber itemnos[MaxIndexTuplesPerPage]; spgxlogVacuumRedirect xlrec; + xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid); xlrec.nToPlaceholder = 0; xlrec.newestRedirectXid = InvalidTransactionId; diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index ebe6ae8..800609c 100644 
--- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e08320e..7417bcf 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4926,6 +4926,15 @@ LocalProcessControlFile(bool reset) } /* + * Get the wal_level from the control file. + */ +WalLevel +GetActiveWalLevel(void) +{ + return ControlFile->wal_level; +} + +/* * Initialization of shared memory for XLOG */ Size @@ -9843,6 +9852,19 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Drop logical slots if we are in hot standby and master does not have + * logical data. Don't bother to search for the slots if standby is + * running with wal_level lower than logical, because in that case, + * we would have either disallowed creation of logical slots or dropped + * existing ones. 
+ */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId, + gettext_noop("Logical decoding on standby requires wal_level >= logical on master.")); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index b4f2b28..797ea0c 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4728,6 +4728,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -6352,6 +6353,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 151c3ef..c1bd028 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -190,11 +190,23 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. 
*/ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* Cannot proceed if master itself does not have logical data */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index bbd38c0..4169828 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -94,23 +94,22 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. 
And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + } } /* @@ -241,6 +240,8 @@ CreateInitDecodingContext(char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 55c306e..47c7dd8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/lock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -101,6 +102,7 @@ int max_replication_slots = 0; /* the maximum number of replication static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static void ReplicationSlotDropConflicting(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -638,6 +640,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) } /* + * Permanently drop a conflicting replication slot. If it's already active by + * another backend, send it a recovery conflict signal, and then try again. 
+ */ +static void +ReplicationSlotDropConflicting(ReplicationSlot *slot) +{ + pid_t active_pid; + PGPROC *proc; + VirtualTransactionId vxid; + + ConditionVariablePrepareToSleep(&slot->active_cv); + while (1) + { + SpinLockAcquire(&slot->mutex); + active_pid = slot->active_pid; + if (active_pid == 0) + active_pid = slot->active_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + /* Drop the acquired slot, unless it is acquired by another backend */ + if (active_pid == MyProcPid) + { + elog(DEBUG1, "acquired conflicting slot, now dropping it"); + ReplicationSlotDropPtr(slot); + break; + } + + /* Send the other backend, a conflict recovery signal */ + + SetInvalidVirtualTransactionId(vxid); + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = BackendPidGetProcWithLock(active_pid); + if (proc) + GET_VXID_FROM_PGPROC(vxid, *proc); + LWLockRelease(ProcArrayLock); + + /* + * If coincidently that process finished, some other backend may + * acquire the slot again. So start over again. + * Note: Even if vxid.localTransactionId is invalid, we need to cancel + * that backend, because there is no other way to make it release the + * slot. So don't bother to validate vxid.localTransactionId. + */ + if (vxid.backendId == InvalidBackendId) + continue; + + elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot", + active_pid, vxid.backendId); + + CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + ConditionVariableSleep(&slot->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + } + + ConditionVariableCancelSleep(); +} + +/* * Serialize the currently acquired slot's state from memory to disk, thereby * guaranteeing the current state will survive a crash. */ @@ -1016,37 +1076,56 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. 
But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be built + * using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base backup + * has to start replay at. */ + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) + { + restart_lsn = GetXLogReplayRecPtr(NULL); + /* + * Replay pointer may point one past the end of the record. If that + * is a XLOG page boundary, it will not be a valid LSN for the + * start of a record, so bump it up past the page header. 
+ */ + if (!XRecOffIsValid(restart_lsn)) + { + if (restart_lsn % XLOG_BLCKSZ != 0) + elog(ERROR, "invalid replay pointer"); + /* For the first page of a segment file, it's a long header */ + if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0) + restart_lsn += SizeOfXLogLongPHD; + else + restart_lsn += SizeOfXLogShortPHD; + } + } + else + restart_lsn = GetXLogInsertRecPtr(); + + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + if (!RecoveryInProgress() && SlotIsLogical(slot)) { XLogRecPtr flushptr; - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogFlush(flushptr); } - else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); @@ -1065,6 +1144,122 @@ ReplicationSlotReserveWal(void) } /* + * Resolve recovery conflicts with logical slots. + * + * When xid is valid, it means that rows older than xid might have been + * removed. Therefore we need to drop slots that depend on seeing those rows. + * When xid is invalid, drop all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be dropped. Also, when xid is invalid, a common 'conflict_reason' is + * provided for the error detail; otherwise it is NULL, in which case it is + * constructed out of the xid value. 
+ */ +void +ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, + char *conflict_reason) +{ + int i; + bool found_conflict = false; + + if (max_replication_slots <= 0) + return; + +restart: + if (found_conflict) + { + CHECK_FOR_INTERRUPTS(); + found_conflict = false; + } + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* We are only dealing with *logical* slot conflicts. */ + if (!SlotIsLogical(s)) + continue; + + /* Invalid xid means caller is asking to drop all logical slots */ + if (!TransactionIdIsValid(xid)) + found_conflict = true; + else + { + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + StringInfoData conflict_str, conflict_xmins; + char *conflict_sentence = + gettext_noop("Slot conflicted with xid horizon which was being increased to"); + + /* not our database, skip */ + if (s->data.database != InvalidOid && s->data.database != dboid) + continue; + + SpinLockAcquire(&s->mutex); + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + SpinLockRelease(&s->mutex); + + /* + * Build the conflict_str which will look like : + * "Slot conflicted with xid horizon which was being increased + * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)." + */ + initStringInfo(&conflict_xmins); + if (TransactionIdIsValid(slot_xmin) && + TransactionIdPrecedesOrEquals(slot_xmin, xid)) + { + appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin); + } + if (TransactionIdIsValid(slot_catalog_xmin) && + TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d", + conflict_xmins.len > 0 ? 
", " : "", + slot_catalog_xmin); + + if (conflict_xmins.len > 0) + { + initStringInfo(&conflict_str); + appendStringInfo(&conflict_str, "%s %d (%s).", + conflict_sentence, xid, conflict_xmins.data); + found_conflict = true; + conflict_reason = conflict_str.data; + } + } + + if (found_conflict) + { + NameData slotname; + + SpinLockAcquire(&s->mutex); + slotname = s->data.name; + SpinLockRelease(&s->mutex); + + /* ReplicationSlotDropPtr() would acquire the lock below */ + LWLockRelease(ReplicationSlotControlLock); + + ReplicationSlotDropConflicting(s); + + ereport(LOG, + (errmsg("dropped conflicting slot %s", NameStr(slotname)), + errdetail("%s", conflict_reason))); + + /* We released the lock above; so re-scan the slots. */ + goto restart; + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + + +/* * Flush all replication slots to disk. * * This needn't actually be part of a checkpoint, but it's a convenient diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 92fa86f..4ce7096 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2814,6 +2814,7 @@ XLogSendLogical(void) { XLogRecord *record; char *errm; + XLogRecPtr flushPtr; /* * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to @@ -2830,10 +2831,11 @@ XLogSendLogical(void) if (errm != NULL) elog(ERROR, "%s", errm); + flushPtr = (am_cascading_walsender ? + GetStandbyFlushRecPtr() : GetFlushRecPtr()); + if (record != NULL) { - /* XXX: Note that logical decoding cannot be used while in recovery */ - XLogRecPtr flushPtr = GetFlushRecPtr(); /* * Note the lack of any call to LagTrackerWrite() which is handled by @@ -2857,7 +2859,7 @@ XLogSendLogical(void) * If the record we just wanted read is at or beyond the flushed * point, then we're caught up. 
*/ - if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) + if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) { WalSndCaughtUp = true; diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 18a0f62..ec696f4 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2669,6 +2669,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) GET_VXID_FROM_PGPROC(procvxid, *proc); + /* + * Note: vxid.localTransactionId can be invalid, which means the + * request is to signal the pid that is not running a transaction. + */ if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 7605b2c..645f320 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -286,6 +286,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 25b7e31..7cfb6d5 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -23,6 +23,7 @@ #include "access/xloginsert.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) 
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; @@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode ResolveRecoveryConflictWithVirtualXIDs(backends, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + + if (onCatalogTable) + ResolveRecoveryConflictWithLogicalSlots(node.dbNode, latestRemovedXid, NULL); } void diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 44a59e1..c23d361 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2393,6 +2393,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -2879,6 +2882,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be dropped, the + * requirement is for the signal receiver to release the slot, + * so that it could be dropped by the signal sender. So for + * normal backends, the transaction should be aborted, just + * like for other recovery conflicts. But if it's walsender on + * standby, then it has to be killed so as to release an + * acquired logical slot. 
+ */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. @@ -2920,7 +2942,6 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional fall through to session cancel */ /* FALLTHROUGH */ - case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 05240bf..7dfbef7 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1499,6 +1499,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) dbentry->n_conflict_tablespace + dbentry->n_conflict_lock + dbentry->n_conflict_snapshot + + dbentry->n_conflict_logicalslot + dbentry->n_conflict_bufferpin + dbentry->n_conflict_startup_deadlock); diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index c13c08a..bd35bc1 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -18,7 +18,9 @@ #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/table.h" #include "bootstrap/bootstrap.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_amop.h" @@ -1893,6 +1895,20 @@ get_rel_persistence(Oid relid) return result; } +bool +get_rel_logical_catalog(Oid relid) +{ + bool res; + Relation rel; + + /* assume previously locked */ + rel = heap_open(relid, NoLock); + res = RelationIsAccessibleInLogicalDecoding(rel); + heap_close(rel, NoLock); + + return res; +} + /* ---------- TRANSFORM CACHE ---------- */ diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h index 969a537..59246c3 100644 --- a/src/include/access/gistxlog.h 
+++ b/src/include/access/gistxlog.h @@ -48,9 +48,9 @@ typedef struct gistxlogPageUpdate */ typedef struct gistxlogDelete { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 ntodelete; /* number of deleted offsets */ - /* * In payload of blk 0 : todelete OffsetNumbers */ @@ -96,6 +96,7 @@ typedef struct gistxlogPageDelete */ typedef struct gistxlogPageReuse { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h index 53b682c..fd70b55 100644 --- a/src/include/access/hash_xlog.h +++ b/src/include/access/hash_xlog.h @@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page */ typedef struct xl_hash_vacuum_one_page { + bool onCatalogTable; TransactionId latestRemovedXid; int ntuples; diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index f6cdca8..a1d1f11 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -237,6 +237,7 @@ typedef struct xl_heap_update */ typedef struct xl_heap_clean { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 nredirected; uint16 ndead; @@ -252,6 +253,7 @@ typedef struct xl_heap_clean */ typedef struct xl_heap_cleanup_info { + bool onCatalogTable; RelFileNode node; TransactionId latestRemovedXid; } xl_heap_cleanup_info; @@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple */ typedef struct xl_heap_freeze_page { + bool onCatalogTable; TransactionId cutoff_xid; uint16 ntuples; } xl_heap_freeze_page; @@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page */ typedef struct xl_heap_visible { + bool onCatalogTable; TransactionId cutoff_xid; uint8 flags; } xl_heap_visible; @@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record); extern const char *heap2_identify(uint8 info); extern void heap_xlog_logical_rewrite(XLogReaderState *r); -extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode, +extern XLogRecPtr 
log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid); extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, @@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple, bool *totally_frozen); extern void heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *xlrec_tp); -extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer, +extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags); #endif /* HEAPAM_XLOG_H */ diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h index 9beccc8..f64a33c 100644 --- a/src/include/access/nbtxlog.h +++ b/src/include/access/nbtxlog.h @@ -126,6 +126,7 @@ typedef struct xl_btree_split */ typedef struct xl_btree_delete { + bool onCatalogTable; TransactionId latestRemovedXid; int nitems; @@ -139,6 +140,7 @@ typedef struct xl_btree_delete */ typedef struct xl_btree_reuse_page { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h index 073f740..d3dad69 100644 --- a/src/include/access/spgxlog.h +++ b/src/include/access/spgxlog.h @@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot typedef struct spgxlogVacuumRedirect { + bool onCatalogTable; uint16 nToPlaceholder; /* number of redirects to make placeholders */ OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */ TransactionId newestRedirectXid; /* newest XID of removed redirects */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 237f4e0..e7439c1 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -299,6 +299,7 @@ extern Size XLOGShmemSize(void); extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevel(void); extern void 
StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 0a3ad3a..4fe8684 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -604,6 +604,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter n_conflict_tablespace; PgStat_Counter n_conflict_lock; PgStat_Counter n_conflict_snapshot; + PgStat_Counter n_conflict_logicalslot; PgStat_Counter n_conflict_bufferpin; PgStat_Counter n_conflict_startup_deadlock; PgStat_Counter n_temp_files; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8fbddea..73b954e 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); + #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 05b186a..956d3c2 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -39,6 +39,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index a3f8f82..6dedebc 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git 
a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index c8df5bf..579d9ff 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); +extern bool get_rel_logical_catalog(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes); extern bool get_typisdefined(Oid typid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index d7f33ab..8c90fd7 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -16,6 +16,7 @@ #include "access/tupdesc.h" #include "access/xlog.h" +#include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 6019f37..719837d 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -2000,6 +2000,33 @@ sub pg_recvlogical_upto =pod +=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname) + +Create a logical replication slot named slot_name in database dbname on this standby, issuing a CHECKPOINT on master so that slot creation can finish. + +=cut + +sub create_logical_slot_on_standby +{ + my ($self, $master, $slot_name, $dbname) = @_; + my ($stdout, $stderr); + + my $handle; + + $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr); + sleep(1); + + # Slot creation on standby waits for an xl_running_xacts record. So arrange + # for it.
+ $master->safe_psql('postgres', 'CHECKPOINT'); + + $handle->finish(); + + return 0; +} + +=pod + =back =cut diff --git a/src/test/recovery/t/018_logical_decoding_on_replica.pl b/src/test/recovery/t/018_logical_decoding_on_replica.pl new file mode 100644 index 0000000..fd77e19 --- /dev/null +++ b/src/test/recovery/t/018_logical_decoding_on_replica.pl @@ -0,0 +1,420 @@ +# Demonstrate slot creation, xmin feedback and conflict handling for logical slots on a standby. +# +# Test logical decoding on a standby. +# +use strict; +use warnings; +use 5.8.0; + +use PostgresNode; +use TestLib; +use Test::More tests => 58; +use RecursiveCopy; +use File::Copy; +use Time::HiRes qw(usleep); + +my ($stdin, $stdout, $stderr, $ret, $handle, $return); +my $backup_name; + +my $node_master = get_new_node('master'); +my $node_replica = get_new_node('replica'); + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; + + my $slotinfo = $node->slot($slotname); + return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'}); +} + +sub print_phys_xmin +{ + my $slot = $node_master->slot('master_physical'); + return ($slot->{'xmin'}, $slot->{'catalog_xmin'}); +} + +sub print_logical_xmin +{ + my $slot = $node_replica->slot('standby_logical'); + return ($slot->{'xmin'}, $slot->{'catalog_xmin'}); +} + +sub create_logical_slots +{ + is($node_replica->create_logical_slot_on_standby($node_master, 'dropslot', 'testdb'), + 0, 'created dropslot on testdb') + or BAIL_OUT('cannot continue if slot creation fails, see logs'); + is($node_replica->slot('dropslot')->{'slot_type'}, 'logical', 'dropslot on standby created'); + is($node_replica->create_logical_slot_on_standby($node_master,
'activeslot', 'testdb'), + 0, 'created activeslot on testdb') + or BAIL_OUT('cannot continue if slot creation fails, see logs'); + is($node_replica->slot('activeslot')->{'slot_type'}, 'logical', 'activeslot on standby created'); + + return 0; +} + +sub make_slot_active +{ + # make sure activeslot is in use + print "starting pg_recvlogical"; + $handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); + + while (!$node_replica->slot('activeslot')->{'active_pid'}) + { + usleep(100_000); + print "waiting for slot to become active\n"; + } + return 0; +} + +sub check_slots_dropped +{ + is($node_replica->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped'); + is($node_replica->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped'); + + # our client should've terminated in response to the walsender error + eval { + $handle->finish; + }; + $return = $?; + cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero "); + if ($return) { + like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict'); + like($stderr, qr/must be dropped/, 'recvlogical error detail'); + } + + return 0; +} + +# Initialize master node +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +# very promptly terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->psql('postgres', q[CREATE DATABASE testdb]); + +$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('master_physical');]); +$backup_name = 'b1'; +my $backup_dir = $node_master->backup_dir . "/" . 
$backup_name; +TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=master_physical'); + +my ($xmin, $catalog_xmin) = print_phys_xmin(); +# After slot creation, xmins must be null +is($xmin, '', "xmin null"); +is($catalog_xmin, '', "catalog_xmin null"); + +# Initialize slave node +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_replica->append_conf('postgresql.conf', + q[primary_slot_name = 'master_physical']); + +$node_replica->start; +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +# with hot_standby_feedback off, xmin and catalog_xmin must still be null +($xmin, $catalog_xmin) = print_phys_xmin(); +is($xmin, '', "xmin null after replica join"); +is($catalog_xmin, '', "catalog_xmin null after replica join"); + +$node_replica->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +]); +$node_replica->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical', + "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +# Create new slots on the replica, ignoring the ones on the master completely. +# +# This must succeed since we know we have a catalog_xmin reservation. We +# might've already sent hot standby feedback to advance our physical slot's +# catalog_xmin but not received the corresponding xlog for the catalog xmin +# advance, in which case we'll create a slot that isn't usable. The calling +# application can prevent this by creating a temporary slot on the master to +# lock in its catalog_xmin. For a truly race-free solution we'd need +# master-to-standby hot_standby_feedback replies. 
+# +# In this case it won't race because there's no concurrent activity on the +# master. +# +is($node_replica->create_logical_slot_on_standby($node_master, 'standby_logical', 'testdb'), + 0, 'logical slot creation on standby succeeded') + or BAIL_OUT('cannot continue if slot creation fails, see logs'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +($xmin, $catalog_xmin) = print_phys_xmin(); +isnt($xmin, '', "physical xmin not null"); +isnt($catalog_xmin, '', "physical catalog_xmin not null"); + +($xmin, $catalog_xmin) = print_logical_xmin(); +is($xmin, '', "logical xmin null"); +isnt($catalog_xmin, '', "logical catalog_xmin not null"); + +$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)'); +$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]); +$node_master->safe_psql('testdb', 'DROP TABLE test_table'); +$node_master->safe_psql('testdb', 'VACUUM'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +($xmin, $catalog_xmin) = print_phys_xmin(); +isnt($xmin, '', "physical xmin not null"); +isnt($catalog_xmin, '', "physical catalog_xmin not null"); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +# Should show the inserts even when the table is dropped on master +($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($stderr, '', 'stderr is empty'); +is($ret, 0, 'replay from slot succeeded') + or BAIL_OUT('cannot continue if slot replay fails'); +is($stdout, q{BEGIN +table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks' +COMMIT}, 'replay results match'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +my ($physical_xmin, $physical_catalog_xmin) = 
print_phys_xmin(); +isnt($physical_xmin, '', "physical xmin not null"); +isnt($physical_catalog_xmin, '', "physical catalog_xmin not null"); + +my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin(); +is($logical_xmin, '', "logical xmin null"); +isnt($logical_catalog_xmin, '', "logical catalog_xmin not null"); + +# Ok, do a pile of tx's and make sure xmin advances. +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot, +# we hold down xmin. +$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]); +$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)'); +for my $i (0 .. 2000) +{ + $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]); +} +$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]); +$node_master->safe_psql('testdb', 'VACUUM'); + +my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin(); +cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, + "logical slot catalog_xmin hasn't advanced before get_changes"); + +($ret, $stdout, $stderr) = $node_replica->psql('testdb', + qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($ret, 0, 'replay of big series succeeded'); +isnt($stdout, '', 'replayed some rows'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +# logical slot catalog_xmin on slave should advance after +# pg_logical_slot_get_changes +($new_logical_xmin, $new_logical_catalog_xmin) = + wait_for_xmins($node_replica, 'standby_logical', + "catalog_xmin::varchar::int > ${logical_catalog_xmin}"); +is($new_logical_xmin, '', "logical xmin null"); + +# hot standby feedback should advance master's phys catalog_xmin now that the +# standby's slot doesn't hold it down as far. 
+my ($new_physical_xmin, $new_physical_catalog_xmin) = + wait_for_xmins($node_master, 'master_physical', + "catalog_xmin::varchar::int > ${physical_catalog_xmin}"); +isnt($new_physical_xmin, '', "physical xmin not null"); +cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, + 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on'); + +######################################################### +# Upstream oldestXid retention +######################################################### + +sub test_oldest_xid_retention() +{ + # First burn some xids on the master in another DB, so we push the master's + # nextXid ahead. + foreach my $i (1 .. 100) + { + $node_master->safe_psql('postgres', 'SELECT txid_current()'); + } + + # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance + # past our needed xmin. The only way we have visibility into that is to force + # a checkpoint. + $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'"); + foreach my $dbname ('template1', 'postgres', 'testdb', 'template0') + { + $node_master->safe_psql($dbname, 'VACUUM FREEZE'); + } + sleep(1); + $node_master->safe_psql('postgres', 'CHECKPOINT'); + IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout) + or die "pg_controldata failed with $?"; + my @checkpoint = split('\n', $stdout); + my ($oldestXid, $nextXid) = ('', '', ''); + foreach my $line (@checkpoint) + { + if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/) + { + $nextXid = $1; + } + if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/) + { + $oldestXid = $1; + } + } + die 'no oldestXID found in checkpoint' unless $oldestXid; + + my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin(); + my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin(); + + print "upstream oldestXid $oldestXid, nextXid $nextXid, phys slot catalog_xmin 
$new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin"; + + $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'"); + + return ($oldestXid); +} + +my ($oldestXid) = test_oldest_xid_retention(); + +cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, + 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on'); + +################################################## +# Drop slot +################################################## +# +is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on'); + +# Make sure slots on replicas are droppable, and properly clear the upstream's xmin +$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]); + +is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually'); + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. catalog_xmin should become NULL because we dropped +# the logical slot. +($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical', + "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 1 : hot_standby_feedback off +################################################## + +create_logical_slots(); + +# One way to reproduce recovery conflict is to run VACUUM FULL with +# hot_standby_feedback turned off on slave. +$node_replica->append_conf('postgresql.conf',q[ +hot_standby_feedback = off +]); +$node_replica->restart; +# ensure walreceiver feedback off by waiting for expected xmin and +# catalog_xmin on master. 
Both should be NULL since hs_feedback is off +($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical', + "xmin IS NULL AND catalog_xmin IS NULL"); + +make_slot_active(); + +# This should trigger the conflict +$node_master->safe_psql('testdb', 'VACUUM FULL'); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +check_slots_dropped(); + +# Turn hot_standby_feedback back on +$node_replica->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +]); +$node_replica->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical', + "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 2 : incorrect wal_level at master +################################################## + +create_logical_slots(); + +make_slot_active(); + +# Make master wal_level replica. This will trigger slot conflict. +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'replica' +]); +$node_master->restart; + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +check_slots_dropped(); + +# Restore master wal_level +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'logical' +]); +$node_master->restart; +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +################################################## +# Recovery: drop database drops slots, including active slots. +################################################## + +# Create a couple of slots on the DB to ensure they are dropped when we drop +# the DB. 
+create_logical_slots(); + +make_slot_active(); + +# Create a slot on a database that would not be dropped. This slot should not +# get dropped. +is($node_replica->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres'), + 0, 'created otherslot on postgres') + or BAIL_OUT('cannot continue if slot creation fails, see logs'); +is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby created'); + +# dropdb on the master to verify slots are dropped on standby +$node_master->safe_psql('postgres', q[DROP DATABASE testdb]); + +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush')); + +is($node_replica->safe_psql('postgres', + q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f', + 'database dropped on standby'); + +check_slots_dropped(); + +is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', + 'otherslot on standby not dropped'); + +# Cleanup : manually drop the slot that was not dropped. +$node_replica->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]); -- 2.1.4