From 8a20341eaf165252c685d92c01718222923c4799 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Thu, 9 Mar 2017 10:53:55 +0800 Subject: [PATCH 3/3] Documentation and comments fixes relating to replication position tracking There is some confusing code in the walsender and in logical decoding that treats a "sentPtr" variable as "sent up to but not including", and treats the "confirmed_flush" value in a replication slot as "confirmed up to but not including". The latter has already resulted in an off-by-one error where confirmed_flush was assumed to mean "confirmed flushed up to and including" in some places. Additionally, some log message output for logical decoding stated that commits "after" the specified LSN would be replayed, when in fact it is commits beginning at or after the specified LSN that will be replayed. Improve comments, messages and documentation to emphasise that sentPtr and confirmed_flush are "up to and excluding", that the pg_stat_replication fields are the first un-sent/un-flushed/un-committed LSN, that replication functions start at the LSN specified inclusive, etc. Additionally, document the restart_lsn and confirmed_flush lsn in struct ReplicationSlotPersistentData. 
--- doc/src/sgml/func.sgml | 8 +++--- doc/src/sgml/logicaldecoding.sgml | 3 ++- doc/src/sgml/monitoring.sgml | 15 +++++++---- doc/src/sgml/protocol.sgml | 20 +++++++++----- src/backend/replication/logical/logical.c | 21 ++++++++++----- src/backend/replication/walsender.c | 42 +++++++++++++++++++++++------ src/include/replication/reorderbuffer.h | 4 +-- src/include/replication/slot.h | 25 +++++++++++++---- src/include/replication/walsender_private.h | 2 +- 9 files changed, 103 insertions(+), 37 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 583b3b2..f6f74cd 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -18815,9 +18815,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); (location pg_lsn, xid xid, data text) - Returns changes in the slot slot_name, starting - from the point at which since changes have been consumed last. If - upto_lsn and upto_nchanges are NULL, + Returns changes in the logical slot + slot_name, starting from the point from which + changes have been consumed last. If upto_lsn and + upto_nchanges are NULL, logical decoding will continue until end of WAL. If upto_lsn is non-NULL, decoding will include only those transactions which commit prior to the specified LSN. If diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 03c2c69..30874ce 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -11,7 +11,8 @@ - Changes are sent out in streams identified by logical replication slots. + Changes are sent out in streams identified by logical replication slots. 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4d03531..7f33f91 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1404,24 +1404,29 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i sent_location pg_lsn - Last transaction log position sent on this connection + + Last transaction log position + 1 sent on this connection, or, for + logical decoding sessions, the log position of the last record processed + and buffered for sending at commit-time. + write_location pg_lsn - Last transaction log position written to disk by this standby - server + + Last transaction log position + 1 written to disk by this standby + server, i.e. the start of the first record not written. flush_location pg_lsn - Last transaction log position flushed to disk by this standby + Last transaction log position + 1 flushed to disk by this standby server replay_location pg_lsn - Last transaction log position replayed into the database on this + Last transaction log position + 1 replayed into the database on this standby server diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3d6e8ee..f8233df 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1925,11 +1925,13 @@ The commands accepted in walsender mode are: START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [ ( option_name [ option_value ] [, ...] ) ] - Instructs server to start streaming WAL for logical replication, starting - at WAL position XXX/XXX. The server can - reply with an error, for example if the requested section of WAL has already - been recycled. On success, server responds with a CopyBothResponse - message, and then starts to stream WAL to the frontend. + Instructs server to start streaming WAL for logical decoding, starting at the first + commit with starting WAL position equal to or greater than XXX/XXX. 
The server can reply with an error, for + example if the requested section of WAL has already been recycled. On + success, server responds with a CopyBothResponse message, and + then starts to stream WAL to the frontend. @@ -1958,7 +1960,13 @@ The commands accepted in walsender mode are: XXX/XXX - The WAL position to begin streaming at. + The WAL position to begin streaming commits at. Inclusive; a commit record + beginning at exactly XXX/XXX will be streamed to the client. + + + If the requested WAL position is less than the confirmed_flush_lsn + for SLOT it will be ignored and the confirmed flush position + will be used as the start point. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7e03f33..68193de 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -304,10 +304,13 @@ CreateInitDecodingContext(char *plugin, * used already. * * start_lsn - * The LSN at which to start decoding. If InvalidXLogRecPtr, restart - * from the slot's confirmed_flush; otherwise, start from the specified - * location (but move it forwards to confirmed_flush if it's older than - * that, see below). + * The LSN at which to start sending commits to the output plugin. If + * InvalidXLogRecPtr, restart from the slot's confirmed_flush; otherwise, + * start from the specified location (but move it forwards to + * confirmed_flush if it's older than that, see below). + * + * start_lsn is inclusive, so a commit beginning exactly at start_lsn + * will be sent to the client. * * output_plugin_options * contains options passed to the output plugin. @@ -320,6 +323,10 @@ CreateInitDecodingContext(char *plugin, * as the decoding context because further memory contexts will be created * inside it. * + * WAL reading and logical decoding always starts at restart_lsn and is not + * controlled by start_lsn. That argument only controls which decoded commits + * are sent to the client. 
+ * * Returns an initialized decoding context after calling the output plugin's * startup function. */ @@ -389,7 +396,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ereport(LOG, (errmsg("starting logical decoding for slot \"%s\"", NameStr(slot->data.name)), - errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X", + errdetail("streaming transactions committing at or after %X/%X, reading WAL from %X/%X", (uint32) (slot->data.confirmed_flush >> 32), (uint32) slot->data.confirmed_flush, (uint32) (slot->data.restart_lsn >> 32), @@ -446,6 +453,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } + /* Start output to client for commits after end of last record */ ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; } @@ -885,7 +893,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart } /* - * Handle a consumer's confirmation having received all changes up to lsn. + * Handle a consumer's confirmation having received all changes up to (but not + * including) lsn. */ void LogicalConfirmReceivedLocation(XLogRecPtr lsn) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ec5d9db..35a0e7c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -142,8 +142,10 @@ static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; /* - * How far have we sent WAL already? This is also advertised in - * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) + * Position up to, but not including, which we have sent WAL already. + * The next request will start from this position. + * + * Also advertised in MyWalSnd->sentPtr. */ static XLogRecPtr sentPtr = 0; @@ -888,6 +890,8 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). 
+ * + * Handles START_REPLICATION ... LOGICAL ... */ static void StartLogicalReplication(StartReplicationCmd *cmd) @@ -927,20 +931,28 @@ StartLogicalReplication(StartReplicationCmd *cmd) sendTimeLine = ThisTimeLineID; /* - * Initialize position to the last ack'ed one, then the xlog records begin - * to be shipped from that position. + * Let logical decoding decide where to start reading WAL and where to + * start sending commits to the client, giving it the client-supplied start + * point so it can skip over any unwanted commits the client has already + * processed. */ logical_decoding_ctx = CreateDecodingContext( cmd->startpoint, cmd->options, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData); - /* Start reading WAL from the oldest required WAL. */ + /* + * Start reading WAL from the oldest required WAL. + * + * This is just a parameter to XLogSendLogical passed via a global. + */ logical_startptr = MyReplicationSlot->data.restart_lsn; /* - * Report the location after which we'll send out further commits as the - * current sentPtr. + * Report the location we start processing WAL from as the "sent" location, + * even though it will generally be well behind cmd->startpoint. + * + * See XLogSendLogical for rationale. */ sentPtr = MyReplicationSlot->data.restart_lsn; @@ -2388,7 +2400,7 @@ XLogSendLogical(void) WalSndCaughtUp = false; record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); - logical_startptr = InvalidXLogRecPtr; + logical_startptr = InvalidXLogRecPtr; /* no longer used */ /* xlog record was invalid */ if (errm != NULL) @@ -2398,6 +2410,20 @@ XLogSendLogical(void) { LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); + /* + * Report the "sent" pointer as the LSN after the end of the most + * recent xlog record we've processed in logical decoding. This is + * somewhat misleading since we have "sent" the record to logical + * decoding, but not yet to the output plugin or the client itself. 
+ * + * Due to reorder buffer processing we can't really report any other + * measure of progress in terms of an LSN, though. If we reported + * the LSN of the last row change during reorder buffer commit + * processing the LSNs would go backwards whenever we started + * processing the next commit (if they were running concurrently), + * and we'd have nothing to report when we weren't processing a + * commit since we're just buffering. + */ sentPtr = logical_decoding_ctx->reader->EndRecPtr; } else diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 17e47b3..4b2f73c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -160,8 +160,8 @@ typedef struct ReorderBufferTXN XLogRecPtr first_lsn; /* ---- - * LSN of the record that lead to this xact to be committed or - * aborted. This can be a + * LSN of the beginning of the record that led to this xact to be + * committed or aborted. This can be a * * plain commit record * * plain commit record, of a parent transaction * * prepared transaction commit diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 62cacdb..81b74e2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -64,14 +64,29 @@ typedef struct ReplicationSlotPersistentData */ TransactionId catalog_xmin; - /* oldest LSN that might be required by this replication slot */ + /* + * Oldest LSN that might be required by this replication slot. + * + * For logical decoding this points to the most recent xl_running_xacts + * record prior to the xid allocation (BEGIN) of the oldest xact the client + * has not yet confirmed replay of. WAL will be re-read from this LSN and + * needed changes and invalidations will be assembled into reorder buffers. + */ XLogRecPtr restart_lsn; /* - * Oldest LSN that the client has acked receipt for. 
This is used as the - * start_lsn point in case the client doesn't specify one, and also as a - * safety measure to jump forwards in case the client specifies a - * start_lsn that's further in the past than this value. + * The client has acked all records up to but not including confirmed_flush + * as safely flushed to client storage. + * + * This is used as the point at which logical decoding begins sending + * changes to the client if the client doesn't specify one. It also serves + * as a safety measure to (silently) jump forwards in case the client + * specifies a start_lsn that's further in the past than this value. + * + * Logical decoding may only invoke the output plugin for changes where the + * start of the commit record is equal to or greater than this LSN. + * catalog_xmin may have been advanced so that needed catalogs for any + * earlier commits have been vacuumed away. */ XLogRecPtr confirmed_flush; diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5e6ccfc..2db0b2a 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -34,7 +34,7 @@ typedef struct WalSnd { pid_t pid; /* this walsender's process id, or 0 */ WalSndState state; /* this walsender's state */ - XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + XLogRecPtr sentPtr; /* WAL has been sent up to (but not including) this point */ bool needreload; /* does currently-open file need to be * reloaded? */ -- 2.5.5