From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 1 Jun 2018 14:30:55 -0400 Subject: [PATCH] Fix and document lock handling for in-memory replication slot data While debugging issues on HEAD for the new slot forwarding feature of Postgres 11, some monitoring of the code surrounding in-memory slot data has proved that the lock handling may cause inconsistent data to be read by read-only callers of slot functions, particularly pg_get_replication_slots() which fetches data for the system view pg_replication_slots. The code paths involved in those problems concern logical decoding initialization (down to 9.4) and WAL reservation for slots (new as of 10). A set of comments documenting all the lock handling, particularly the dependency with LW locks for slots and the in_use flag as well as the internal mutex lock is added, based on a suggestion by Simon Riggs. Backpatch down to 9.4, where WAL decoding has been introduced. Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com --- src/backend/replication/logical/logical.c | 13 +++++++++---- src/backend/replication/slot.c | 4 ++++ src/include/replication/slot.h | 13 +++++++++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1393591538..61588d626f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin, xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; if (need_full_snapshot) slot->effective_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); @@ -445,13 +447,14 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { 
XLogRecPtr startptr; + ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = ctx->slot->data.restart_lsn; + startptr = slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32) (ctx->slot->data.restart_lsn >> 32), - (uint32) ctx->slot->data.restart_lsn); + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } - ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockAcquire(&slot->mutex); + slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockRelease(&slot->mutex); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 056628fe8e..79d7a57d67 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void) XLogRecPtr flushptr; /* start at current insert position */ + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetXLogInsertRecPtr(); + SpinLockRelease(&slot->mutex); /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); @@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void) } else { + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetRedoRecPtr(); + SpinLockRelease(&slot->mutex); } /* prevent WAL removal as fast as possible */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 76a88c6de7..2af2a14994 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData /* * Shared memory state of a single replication slot. 
+ * + * The in-memory data of replication slots follows a locking model based + * on two linked concepts: + * - A replication slot's in_use is switched when added or discarded using + * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive + * mode when updating the flag by the backend owning the slot and doing the + * operation, while readers (concurrent backends not owning the slot) need + * to hold it in shared mode when looking at replication slot data. + * - Individual fields are protected by mutex where only the backend owning + * the slot is authorized to update the fields from its own slot. The + * backend owning the slot does not need to take this lock when reading its + * own fields, while concurrent backends not owning this slot should take the + * lock when reading this slot's data. */ typedef struct ReplicationSlot { -- 2.17.0