From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 1 Jun 2018 14:30:55 -0400 Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication slot data While debugging issues on HEAD for the new slot forwarding feature of Postgres 11, some monitoring of the code surrounding in-memory slot data has proved that the lock handling may cause inconsistent data to be read by read-only callers of slot functions, particularly pg_get_replication_slots() which fetches data for the system view pg_replication_slots. The code paths involved in those problems concern the WAL sender, logical decoding initialization and WAL reservation for slots. A set of comments documenting all the lock handlings, particularly the dependency with LW locks for slots and the in_use flag as well as the internal mutex lock is added, based on a suggestion by Simon Riggs. Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com --- src/backend/replication/logical/logical.c | 13 +++++++++---- src/backend/replication/slot.c | 4 ++++ src/backend/replication/walsender.c | 14 +++++++++++--- src/include/replication/slot.h | 10 ++++++++++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1393591538..61588d626f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin, xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; if (need_full_snapshot) slot->effective_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); @@ -445,13 +447,14 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { XLogRecPtr startptr; 
+ ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = ctx->slot->data.restart_lsn; + startptr = slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32) (ctx->slot->data.restart_lsn >> 32), - (uint32) ctx->slot->data.restart_lsn); + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } - ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockAcquire(&slot->mutex); + slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockRelease(&slot->mutex); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 056628fe8e..79d7a57d67 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void) XLogRecPtr flushptr; /* start at current insert position */ + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetXLogInsertRecPtr(); + SpinLockRelease(&slot->mutex); /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); @@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void) } else { + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetRedoRecPtr(); + SpinLockRelease(&slot->mutex); } /* prevent WAL removal as fast as possible */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e47ddca6bc..0b1f1ba3c1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) static void StartLogicalReplication(StartReplicationCmd *cmd) { - StringInfoData buf; + StringInfoData buf; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; /* make sure that our requirements are still fulfilled */ 
CheckLogicalDecodingRequirements(); @@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndWriteData, WalSndUpdateProgress); + /* Fetch all needed values from the slot */ + SpinLockAcquire(&MyReplicationSlot->mutex); + restart_lsn = MyReplicationSlot->data.restart_lsn; + confirmed_lsn = MyReplicationSlot->data.confirmed_flush; + SpinLockRelease(&MyReplicationSlot->mutex); + /* Start reading WAL from the oldest required WAL. */ - logical_startptr = MyReplicationSlot->data.restart_lsn; + logical_startptr = restart_lsn; /* * Report the location after which we'll send out further commits as the * current sentPtr. */ - sentPtr = MyReplicationSlot->data.confirmed_flush; + sentPtr = confirmed_lsn; /* Also update the sent position status in shared memory */ SpinLockAcquire(&MyWalSnd->mutex); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 76a88c6de7..6fa9df5553 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData /* * Shared memory state of a single replication slot. + * + * The data included in this structure, including the contents within + * ReplicationSlotPersistentData, are protected by mutex when read from + * other backends than the one registering the slot as in_use. If the + * slot is not marked as in_use, then no code paths refer or should refer + * to the in-memory data of a slot. + * + * Note that a slot is switched as in_use only with + * ReplicationSlotControlLock held in exclusive mode, protecting from any + * concurrent change, while readers have to hold this lock at least in shared mode. */ typedef struct ReplicationSlot { -- 2.17.0