From db5c4311baf4e3a2ae3308c4d0d9975ee3692a18 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 16 Apr 2015 14:40:24 +0300 Subject: [PATCH v2 1/1] Make WAL archival behave more sensibly in standby mode. This adds two new archive_modes, 'shared' and 'always', to indicate whether the WAL archive is shared between the primary and standby, or not. In shared mode, the standby tracks which files have been archived by the primary. The standby refrains from recycling files that the primary has not yet archived, and at failover, the standby archives all those files too from the old timeline. In 'always' mode, the standby's WAL archive is taken to be separate from the primary's, and the standby independently archives all files it receives from the primary. Fujii Masao and me. --- doc/src/sgml/config.sgml | 12 +- doc/src/sgml/high-availability.sgml | 48 +++++++ doc/src/sgml/protocol.sgml | 31 +++++ src/backend/access/transam/xlog.c | 29 ++++- src/backend/postmaster/postmaster.c | 37 ++++-- src/backend/replication/walreceiver.c | 172 ++++++++++++++++++++------ src/backend/replication/walsender.c | 47 +++++++ src/backend/utils/misc/guc.c | 21 ++-- src/backend/utils/misc/postgresql.conf.sample | 2 +- src/include/access/xlog.h | 14 ++- 10 files changed, 351 insertions(+), 62 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index b30c68d..e352b8e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2521,7 +2521,7 @@ include_dir 'conf.d' - archive_mode (boolean) + archive_mode (enum) archive_mode configuration parameter @@ -2530,7 +2530,15 @@ include_dir 'conf.d' When archive_mode is enabled, completed WAL segments are sent to archive storage by setting - . + . In addition to off, + to disable, there are three modes: on, shared, + and always. 
During normal operation, there is no + difference between the three modes, but in archive recovery or + standby mode, it indicates whether the WAL archive is shared between + the primary and the standby server or not. See + for details. + + archive_mode and archive_command are separate variables so that archive_command can be changed without leaving archiving mode. diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index a17f555..62f7c75 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1220,6 +1220,54 @@ primary_slot_name = 'node_a_slot' + + + Continuous archiving in standby + + + continuous archiving + in standby + + + + When continuous WAL archiving is used in a standby, there are two + different scenarios: the WAL archive can be shared between the primary + and the standby, or the standby can have its own WAL archive. In the + shared archive scenario, archive_mode must be set to + shared, and in the separate archive scenario, to + always. Setting it to on in a + standby server, or when performing point-in-time recovery, is not + allowed and an error will be raised. When a server is not in recovery + mode, there is no difference between on, + shared, and always modes. + + + + In shared archive mode, the standby server tries to + ensure that the archive is complete, even if the primary crashes and + failover happens. The standby server will not archive any WAL segments + as long as it is in standby mode; it is the primary server's + responsibility to do so. It will, however, keep track of which files + have already been archived by the primary, and if failover happens, it + takes over and attempts to archive any files that the primary had not + yet archived. + + + + In always archive mode, the standby server will + archive all WAL it receives, whether it's through streaming replication + or by restoring from the primary's archive using + restore_command. 
+ + + In cascading replication, the first standby server and the cascaded + standby servers can use archive_mode settings. In + each standby, it should be set to shared or + always, depending on whether that standby shares the + archive with the primary or standby it is connected to. + + diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3a753a0..a42344e 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1646,6 +1646,37 @@ The commands accepted in walsender mode are: + + + WAL archival report message (B) + + + + + + + Byte1('a') + + + + Tells the receiver the last archived WAL segment. + + + + + + Byten + + + + Filename of the latest archived file. + + + + + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2580996..22b5dda 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -84,7 +84,7 @@ int min_wal_size = 5; /* 80 MB */ int wal_keep_segments = 0; int XLOGbuffers = -1; int XLogArchiveTimeout = 0; -bool XLogArchiveMode = false; +int XLogArchiveMode = ARCHIVE_MODE_OFF; char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; bool fullPageWrites = true; @@ -138,6 +138,25 @@ const struct config_enum_entry sync_method_options[] = { {NULL, 0, false} }; + +/* + * Although only "on", "off", "shared", and "always" are documented, + * we accept all the likely variants of "on" and "off". + */ +const struct config_enum_entry archive_mode_options[] = { + {"shared", ARCHIVE_MODE_SHARED, false}, + {"always", ARCHIVE_MODE_ALWAYS, false}, + {"on", ARCHIVE_MODE_ON, false}, + {"off", ARCHIVE_MODE_OFF, false}, + {"true", ARCHIVE_MODE_ON, true}, + {"false", ARCHIVE_MODE_OFF, true}, + {"yes", ARCHIVE_MODE_ON, true}, + {"no", ARCHIVE_MODE_OFF, true}, + {"1", ARCHIVE_MODE_ON, true}, + {"0", ARCHIVE_MODE_OFF, true}, + {NULL, 0, false} +}; + /* * Statistics for current checkpoint are collected in this global struct. 
* Because only the checkpointer or a stand-alone backend can perform @@ -756,7 +775,7 @@ static MemoryContext walDebugCxt = NULL; #endif static void readRecoveryCommandFile(void); -static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); +static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog); static bool recoveryStopsBefore(XLogReaderState *record); static bool recoveryStopsAfter(XLogReaderState *record); static void recoveryPausesHere(void); @@ -5949,6 +5968,12 @@ StartupXLOG(void) if (ArchiveRecoveryRequested) { + /* archive_mode=on is not allowed during archive recovery. */ + if (XLogArchiveMode == ARCHIVE_MODE_ON) + ereport(ERROR, + (errmsg("archive_mode='on' cannot be used in archive recovery"), + (errhint("Use 'shared' or 'always' mode instead.")))); + if (StandbyModeRequested) ereport(LOG, (errmsg("entering standby mode"))); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a9f20ac..72fe4fd 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -828,9 +828,9 @@ PostmasterMain(int argc, char *argv[]) write_stderr("%s: max_wal_senders must be less than max_connections\n", progname); ExitPostmaster(1); } - if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL) + if (XLogArchiveMode > ARCHIVE_MODE_OFF && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\", or \"logical\""))); + (errmsg("WAL archival (archive_mode=on/always/shared) requires wal_level \"archive\", \"hot_standby\", or \"logical\""))); if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\", or \"logical\""))); @@ -1645,13 +1645,21 @@ ServerLoop(void) start_autovac_launcher = false; /* signal processed */ } - /* If we have lost the archiver, try to start a new one */ - if 
(XLogArchivingActive() && PgArchPID == 0 && pmState == PM_RUN) - PgArchPID = pgarch_start(); - - /* If we have lost the stats collector, try to start a new one */ - if (PgStatPID == 0 && pmState == PM_RUN) - PgStatPID = pgstat_start(); + /* + * If we have lost the archiver, try to start a new one. + * + * If WAL archiving is enabled always, we try to start a new archiver + * even during recovery. + */ + if (PgArchPID == 0 && wal_level >= WAL_LEVEL_ARCHIVE) + { + if ((pmState == PM_RUN && XLogArchiveMode > ARCHIVE_MODE_OFF) || + ((pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY) && + XLogArchiveMode == ARCHIVE_MODE_ALWAYS)) + { + PgArchPID = pgarch_start(); + } + } /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) @@ -4807,6 +4815,17 @@ sigusr1_handler(SIGNAL_ARGS) Assert(BgWriterPID == 0); BgWriterPID = StartBackgroundWriter(); + /* + * Start the archiver if we're responsible for (re-)archiving received + * files. + */ + Assert(PgArchPID == 0); + if (wal_level >= WAL_LEVEL_ARCHIVE && + XLogArchiveMode == ARCHIVE_MODE_ALWAYS) + { + PgArchPID = pgarch_start(); + } + pmState = PM_RECOVERY; } if (CheckPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY) && diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 9c7710f..e53ffeb 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -52,8 +52,11 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/pgarch.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/procarray.h" @@ -107,6 +110,9 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* Filename most recently reported by the primary as archived */ +static char primary_last_archived[MAX_XFN_CHARS + 1]; + static StringInfoData reply_message; static StringInfoData 
incoming_message; @@ -141,6 +147,7 @@ static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessArchivalReport(void); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -526,21 +533,12 @@ WalReceiverMain(void) */ if (recvFile >= 0) { - char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false); if (close(recvFile) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", XLogFileNameP(recvFileTLI, recvSegNo)))); - - /* - * Create .done file forcibly to prevent the streamed segment from - * being archived later. - */ - XLogFileName(xlogfname, recvFileTLI, recvSegNo); - XLogArchiveForceDone(xlogfname); } recvFile = -1; @@ -846,6 +844,26 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogWalRcvSendReply(true, false); break; } + case 'a': /* Archival report */ + { + /* the content of the message is a filename */ + if (len >= sizeof(primary_last_archived)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid archival report message with length %d", + (int) len))); + memcpy(primary_last_archived, buf, len); + primary_last_archived[len] = '\0'; + if (strspn(buf, VALID_XFN_CHARS) != len) + { + primary_last_archived[0] = '\0'; + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("unexpected character in primary's last archived filename"))); + } + ProcessArchivalReport(); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -867,39 +885,18 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo)) + if (!XLByteInSeg(recptr, recvSegNo)) { bool use_existent; /* - * fsync() and close current file before we switch to next one. 
We - * would otherwise have to reopen this file to fsync it later + * We take care to always close the current file, after writing + * the last byte to it. So this shouldn't happen. */ if (recvFile >= 0) - { - char xlogfname[MAXFNAMELEN]; - - XLogWalRcvFlush(false); - - /* - * XLOG segment files will be re-read by recovery in startup - * process soon, so we don't advise the OS to release cache - * pages associated with the file like XLogFileClose() does. - */ - if (close(recvFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log segment %s: %m", - XLogFileNameP(recvFileTLI, recvSegNo)))); - - /* - * Create .done file forcibly to prevent the streamed segment - * from being archived later. - */ - XLogFileName(xlogfname, recvFileTLI, recvSegNo); - XLogArchiveForceDone(xlogfname); - } - recvFile = -1; + ereport(ERROR, + (errmsg("unexpected WAL receive location %s", + XLogFileNameP(recvFileTLI, recvSegNo)))); /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo); @@ -954,6 +951,51 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) buf += byteswritten; LogstreamResult.Write = recptr; + + /* + * If we just wrote the last byte to this segment, fsync() and close + * current file before we switch to next one. We would otherwise have + * to reopen this file to fsync it later. + */ + if (recvOff == XLOG_SEG_SIZE) + { + char xlogfname[MAXFNAMELEN]; + + XLogWalRcvFlush(false); + + /* + * XLOG segment files will be re-read by recovery in startup + * process soon, so we don't advise the OS to release cache + * pages associated with the file like XLogFileClose() does. + */ + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + XLogFileNameP(recvFileTLI, recvSegNo)))); + recvFile = -1; + + /* + * Now that this segment is complete, do we need to archive it? + * + * In 'always' mode, we clearly need to archive this. 
+ * + * In 'shared' mode, we might need to, if we get promoted before + * the master has archived this file, so create a .ready file. It + * will be replaced with .done later, if we get acknowledgement + * from the primary that this has already been archived. + * + * In 'on' mode, we're only responsible for WAL we've generated + * ourselves. + */ + if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS || + XLogArchiveMode == ARCHIVE_MODE_SHARED) + { + XLogFileName(xlogfname, recvFileTLI, recvSegNo); + + XLogArchiveCheckDone(xlogfname); + } + } } } @@ -1215,3 +1257,61 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) pfree(receipttime); } } + +/* + * Create .done and .ready files, based on the master's last archival report. + */ +static void +ProcessArchivalReport(void) +{ + DIR *xldir; + struct dirent *xlde; + + elog(DEBUG2, "received archival report from master: %s", + primary_last_archived); + + if (XLogArchiveMode != ARCHIVE_MODE_SHARED) + return; + + /* Check that the filename the primary reported looks valid */ + if (strlen(primary_last_archived) < 24 || + strspn(primary_last_archived, "0123456789ABCDEF") != 24) + return; + + xldir = AllocateDir(XLOGDIR); + if (xldir == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open transaction log directory \"%s\": %m", + XLOGDIR))); + + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + /* + * We ignore the timeline part of the XLOG segment identifiers in + * deciding whether a segment has already been archived by the primary. + * This means that segments of parent timelines are marked as done too. + * We could probably be more precise about segments of non-parent + * timelines, but that would be a whole lot more + * complicated. + * + * We use the alphanumeric sorting property of the filenames to decide + * which ones are not later than the primary's last archived segment. 
+ */ + if (strlen(xlde->d_name) == 24 && + strspn(xlde->d_name, "0123456789ABCDEF") == 24 && + strcmp(xlde->d_name + 8, primary_last_archived + 8) <= 0) + { + XLogArchiveForceDone(xlde->d_name); + } + } + + FreeDir(xldir); + + /* + * Remember this location in pgstat as well. This makes it visible in + * pg_stat_archiver, and allows the location to be relayed to cascaded + * standbys. + */ + pgstat_send_archiver(primary_last_archived, false); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4a20569..74bdeff 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -55,6 +55,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/replnodes.h" +#include "pgstat.h" #include "replication/basebackup.h" #include "replication/decode.h" #include "replication/logical.h" @@ -152,6 +153,7 @@ static StringInfoData tmpbuf; * wal_sender_timeout doesn't need to be active. */ static TimestampTz last_reply_timestamp = 0; +static char last_archival_report[MAX_XFN_CHARS + 1] = ""; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; @@ -209,6 +211,8 @@ static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); static void WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(TimestampTz now); +static void WalSndArchivalReport(void); +static void WalSndArchivalReportIfNecessary(void); static void WalSndCheckTimeOut(TimestampTz now); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); @@ -2879,6 +2883,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now) TimestampTz ping_time; /* + * Send an archival status message, if necessary. 
+ */ + WalSndArchivalReportIfNecessary(); + + /* * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts. */ @@ -2907,6 +2916,44 @@ WalSndKeepaliveIfNecessary(TimestampTz now) } /* + * This function is used to send archival report message to standby. + */ +static void +WalSndArchivalReport(void) +{ + elog(LOG, "sending archival report: %s", last_archival_report); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 'a'); + pq_sendbytes(&output_message, last_archival_report, strlen(last_archival_report)); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); +} + +static void +WalSndArchivalReportIfNecessary(void) +{ + PgStat_ArchiverStats *archiver_stats; + + archiver_stats = pgstat_fetch_stat_archiver(); + + if (strcmp(last_archival_report, archiver_stats->last_archived_wal) == 0) + { + pgstat_clear_snapshot(); + return; + } + + strlcpy(last_archival_report, archiver_stats->last_archived_wal, + sizeof(last_archival_report)); + + pgstat_clear_snapshot(); + + WalSndArchivalReport(); +} + +/* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the * future for synchronous replication. 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f43aff2..7115bcc 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -396,6 +396,7 @@ static const struct config_enum_entry row_security_options[] = { * Options for enum values stored in other modules */ extern const struct config_enum_entry wal_level_options[]; +extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; @@ -1530,16 +1531,6 @@ static struct config_bool ConfigureNamesBool[] = }, { - {"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING, - gettext_noop("Allows archiving of WAL files using archive_command."), - NULL - }, - &XLogArchiveMode, - false, - NULL, NULL, NULL - }, - - { {"hot_standby", PGC_POSTMASTER, REPLICATION_STANDBY, gettext_noop("Allows connections and queries during recovery."), NULL @@ -3552,6 +3543,16 @@ static struct config_enum ConfigureNamesEnum[] = }, { + {"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING, + gettext_noop("Allows archiving of WAL files using archive_command."), + NULL + }, + &XLogArchiveMode, + ARCHIVE_MODE_OFF, archive_mode_options, + NULL, NULL, NULL + }, + + { {"trace_recovery_messages", PGC_SIGHUP, DEVELOPER_OPTIONS, gettext_noop("Enables logging of recovery-related debugging information."), gettext_noop("Each level includes all the levels that follow it. 
The later" diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 110983f..90371d7 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -206,7 +206,7 @@ # - Archiving - -#archive_mode = off # allows archiving to be done +#archive_mode = off # allows archiving to be done; off, on, shared, or always # (change requires restart) #archive_command = '' # command to use to archive a logfile segment # placeholders: %p = path of file to archive diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 2b1f423..3a49702 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -95,7 +95,6 @@ extern int wal_keep_segments; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; -extern bool XLogArchiveMode; extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; @@ -105,6 +104,16 @@ extern bool log_checkpoints; extern int CheckPointSegments; +/* Archive modes */ +typedef enum ArchiveMode +{ + ARCHIVE_MODE_OFF = 0, /* disabled */ + ARCHIVE_MODE_ON, /* enabled while server is running normally */ + ARCHIVE_MODE_SHARED, /* archive is shared with master */ + ARCHIVE_MODE_ALWAYS /* enabled always (even during recovery) */ +} ArchiveMode; +extern int XLogArchiveMode; + /* WAL levels */ typedef enum WalLevel { @@ -115,7 +124,8 @@ typedef enum WalLevel } WalLevel; extern int wal_level; -#define XLogArchivingActive() (XLogArchiveMode && wal_level >= WAL_LEVEL_ARCHIVE) +#define XLogArchivingActive() \ + (XLogArchiveMode > ARCHIVE_MODE_OFF && wal_level >= WAL_LEVEL_ARCHIVE) #define XLogArchiveCommandSet() (XLogArchiveCommand[0] != '\0') /* -- 2.1.4