diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 63c6283..726c9c0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2029,8 +2029,122 @@ SET ENABLE_SEQSCAN TO OFF; This parameter can only be set in the postgresql.conf file or on the server command line. + + You should also consider setting hot_standby_feedback + as an alternative to using this parameter. + + + + + + + + Synchronous Replication + + + These settings control the behavior of the built-in + synchronous replication feature. + These parameters would be set on the primary server that is + to send replication data to one or more standby servers. + + + + + synchronous_replication (boolean) + + synchronous_replication configuration parameter + + + + Specifies whether transaction commit will wait for WAL records + to be replicated before the command returns a success + indication to the client. The default setting is off. + When on, there will be a delay while the client waits + for confirmation of successful replication. That delay will + increase depending upon the physical distance and network activity + between primary and standby. The commit wait will last until the + first reply from any standby. Multiple standby servers allow + increased availability and possibly increase performance as well. + + + The parameter must be set on both primary and standby. + + + On the primary, this parameter can be changed at any time; the + behavior for any one transaction is determined by the setting in + effect when it commits. It is therefore possible, and useful, to have + some transactions replicate synchronously and others asynchronously. + For example, to make a single multistatement transaction commit + asynchronously when the default is synchronous replication, issue + SET LOCAL synchronous_replication TO OFF within the + transaction. + + + On the standby, the parameter value is taken only at server start. 
+ + + + + allow_standalone_primary (boolean) + + allow_standalone_primary configuration parameter + + + + If allow_standalone_primary is set, then the server + can operate normally whether or not replication is active. If + a client requests synchronous_replication and it is + not available, they will use asynchronous replication instead. + + + If allow_standalone_primary is not set, then the server + will prevent normal client connections until a standby connects that + has synchronous_replication_feedback enabled. Once + clients connect, if they request synchronous_replication + and it is no longer available they will wait for + replication_timeout_client. + + + + replication_timeout_client (integer) + + replication_timeout_client configuration parameter + + + + If the client has synchronous_replication set, + and a synchronous standby is currently available + then the commit will wait for up to replication_timeout_client + seconds before it returns a success. The commit will wait + forever for a confirmation when replication_timeout_client + is set to -1. + + + If the client has synchronous_replication set, + and yet no synchronous standby is available when we commit, then the + setting of allow_standalone_primary determines whether + or not we wait. + + + + + + replication_timeout_server (integer) + + replication_timeout_server configuration parameter + + + + If the primary server does not receive a reply from a standby server + within replication_timeout_server seconds then the + primary will terminate the replication connection. + + + + @@ -2121,6 +2235,42 @@ SET ENABLE_SEQSCAN TO OFF; + + hot_standby_feedback (boolean) + + hot_standby_feedback configuration parameter + + + + Specifies whether or not a hot standby will send feedback to the primary + about queries currently executing on the standby. This parameter can + be used to eliminate query cancels caused by cleanup records, though + it can cause database bloat on the primary for some workloads.
+ The default value is off. + This parameter can only be set at server start. It only has effect + if hot_standby is enabled. + + + + + + synchronous_replication_feedback (boolean) + + synchronous_replication_feedback configuration parameter + + + + Specifies whether the standby will provide reply messages to + allow synchronous replication on the primary. + Reasons for doing this might be that the standby is physically + co-located with the primary and so would be a bad choice as a + future primary server, or the standby might be a test server. + The default value is on. + This parameter can only be set at server start. + + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index a892969..c006f35 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -738,13 +738,12 @@ archive_cleanup_command = 'pg_archivecleanup /path/to/archive %r' - Streaming replication is asynchronous, so there is still a small delay + There is a small replication delay between committing a transaction in the primary and for the changes to become visible in the standby. The delay is however much smaller than with file-based log shipping, typically under one second assuming the standby is powerful enough to keep up with the load. With streaming replication, - archive_timeout is not required to reduce the data loss - window. + archive_timeout is not required. @@ -879,6 +878,236 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' + + Synchronous Replication + + + Synchronous Replication + + + + PostgreSQL streaming replication is asynchronous by + default. If the primary server + crashes then some transactions that were committed may not have been + replicated to the standby server, causing data loss. The amount + of data loss is proportional to the replication delay at the time of + failover. 
That could be zero, or more, we do not know for certain + either way, when using asynchronous replication. + + + + Synchronous replication offers the ability to confirm that all changes + made by a transaction have been transferred to at least one remote + standby server. This extends the standard level of durability + offered by a transaction commit. This level of protection is referred + to as 2-safe replication in computer science theory. + + + + Synchronous replication works in the following way. When requested, + the commit of a write transaction will wait until confirmation is + received that the commit has been written to the transaction log on disk + of both the primary and standby server. The only possibility that data + can be lost is if both the primary and the standby suffer crashes at the + same time. This can provide a much higher level of durability if the + sysadmin is cautious about the placement and management of the two servers. + Waiting for confirmation increases the user's confidence that the changes + will not be lost in the event of server crashes but it also necessarily + increases the response time for the requesting transaction. The minimum + wait time is the roundtrip time between primary and standby. + + + + Read only transactions and transaction rollbacks need not wait for + replies from standby servers. Subtransaction commits do not wait for + responses from standby servers, only final top-level commits. Long + running actions such as data loading or index building do not wait + until the very final commit message. + + + + Basic Configuration + + + Synchronous replication will be active if appropriate options are + enabled on both the primary and at least one standby server. If + options are not correctly set on both servers, the primary will + use asynchronous replication by default.
+ + + + On the primary server we need to set + + +synchronous_replication = on + + + and on the standby server we need to set + + +synchronous_replication_feedback = on + + + On the primary, synchronous_replication can be set + for particular users or databases, or dynamically by applications + programs. On the standby, synchronous_replication_feedback + can only be set at server start. + + + + If more than one standby server + specifies synchronous_replication_feedback, then whichever + standby replies first will release waiting commits. + Turning this setting off for a standby allows the administrator to + exclude certain standby servers from releasing waiting transactions. + This is useful if not all standby servers are designated as potential + future primary servers, such as if a standby were co-located + with the primary, so that a disaster would cause both servers to be lost. + + + + + + Planning for Performance + + + Synchronous replication usually requires carefully planned and placed + standby servers to ensure applications perform acceptably. Waiting + doesn't utilise system resources, but transaction locks continue to be + held until the transfer is confirmed. As a result, incautious use of + synchronous replication will reduce performance for database + applications because of increased response times and higher contention. + + + + PostgreSQL allows the application developer + to specify the durability level required via replication. This can be + specified for the system overall, though it can also be specified for + specific users or connections, or even individual transactions. + + + + For example, an application workload might consist of: + 10% of changes are important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. 
+ + + + With synchronous replication options specified at the application level + (on the primary) we can offer sync rep for the most important changes, + without slowing down the bulk of the total workload. Application level + options are an important and practical tool for allowing the benefits of + synchronous replication for high performance applications. + + + + You should consider that the network bandwidth must be higher than + the rate of generation of WAL data. + 10% of changes are important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. + + + + + + Planning for High Availability + + + The easiest and safest method of gaining High Availability using + synchronous replication is to configure at least two standby servers. + To understand why, we need to examine what can happen when you lose all + standby servers. + + + + Commits made when synchronous_replication is set will wait until at + least one standby responds. The response may never occur if the last, + or only, standby should crash or the network drops. What should we do in + that situation? + + + + Sitting and waiting will typically cause operational problems + because it is an effective outage of the primary server should all + sessions end up waiting. In contrast, allowing the primary server to + continue processing write transactions in the absence of a standby + puts those latest data changes at risk. So in this situation there + is a direct choice between database availability and the potential + durability of the data it contains. How we handle this situation + is controlled by allow_standalone_primary. The default + setting is on, allowing processing to continue, though + there is no recommended setting. Choosing the best setting for + allow_standalone_primary is a difficult decision and best + left to those with combined business responsibility for both data and + applications. 
The difficulty of this choice is the reason why we + recommend that you reduce the possibility of this situation occurring + by using multiple standby servers. + + + + A user will stop waiting once the replication_timeout_client + has been reached for their specific session. Users are not waiting for + a specific standby to reply, they are waiting for a reply from any + standby, so the unavailability of any one standby is not significant + to a user. It is possible for user sessions to hit timeout even though + standbys are communicating normally. In that case, the setting of + replication_timeout_client is probably too low. + + + + The standby sends regular status messages to the primary. If no status + messages have been received for replication_timeout_server + the primary server will assume the connection is dead and terminate it. + + + + When the primary is started with allow_standalone_primary + disabled, the primary will not allow connections until a standby connects + that also has synchronous_replication_feedback enabled. This is a + convenience to ensure that we don't allow connections before write + transactions will return successfully. + + + + When a standby first attaches to the primary, it may not be properly + synchronized. The standby is only able to become a synchronous standby + once it has become synchronized, or "caught up" with the primary. + The catch-up duration may be long immediately after the standby has + been created. If the standby is shut down, then the catch-up period + will increase according to the length of time the standby has been + down. You are advised to make sure allow_standalone_primary + is not set during the initial catch-up period. + + + + If the primary crashes while commits are waiting for acknowledgement, those + transactions will be marked fully committed if the primary database + recovers, no matter how allow_standalone_primary is set.
+ There is no way to be certain that all standbys have received all + outstanding WAL data at the time of the crash of the primary. Some + transactions may not show as committed on the standby, even though + they show as committed on the primary. The guarantee we offer is that + the application will not receive explicit acknowledgement of the + successful commit of a transaction until the WAL data is known to be + safely received by the standby. Hence this mechanism is technically + "semi synchronous" rather than "fully synchronous" replication. Note + that replication may still not be fully synchronous even if we wait for + all standby servers, though this would reduce availability, as + described previously. + + + + If you need to re-create a standby server while transactions are + waiting, make sure that the commands to run pg_start_backup() and + pg_stop_backup() are run in a session with + synchronous_replication = off, otherwise those requests will wait + forever for the standby to appear. + + + + @@ -1393,11 +1622,18 @@ if (!triggered) These conflicts are hard conflicts in the sense that queries might need to be cancelled and, in some cases, sessions disconnected to resolve them. The user is provided with several ways to handle these - conflicts. Conflict cases include: + conflicts. Conflict cases in order of likely frequency are: + Application of a vacuum cleanup record from WAL conflicts with + standby transactions whose snapshots can still see any of + the rows to be removed. + + + + Access Exclusive locks taken on the primary server, including both explicit LOCK commands and various DDL actions, conflict with table accesses in standby queries. @@ -1417,14 +1653,8 @@ if (!triggered) - Application of a vacuum cleanup record from WAL conflicts with - standby transactions whose snapshots can still see any of - the rows to be removed.
- - - - - Application of a vacuum cleanup record from WAL conflicts with + Buffer pin deadlock caused by + application of a vacuum cleanup record from WAL conflicts with queries accessing the target page on the standby, whether or not the data to be removed is visible. @@ -1539,17 +1769,16 @@ if (!triggered) Remedial possibilities exist if the number of standby-query cancellations - is found to be unacceptable. The first option is to connect to the - primary server and keep a query active for as long as needed to - run queries on the standby. This prevents VACUUM from removing - recently-dead rows and so cleanup conflicts do not occur. - This could be done using and - pg_sleep(), or via other mechanisms. If you do this, you + is found to be unacceptable. Typically the best option is to enable + hot_standby_feedback. This prevents VACUUM from + removing recently-dead rows and so cleanup conflicts do not occur. + If you do this, you should note that this will delay cleanup of dead rows on the primary, which may result in undesirable table bloat. However, the cleanup situation will be no worse than if the standby queries were running - directly on the primary server, and you are still getting the benefit of - off-loading execution onto the standby. + directly on the primary server. You are still getting the benefit + of off-loading execution onto the standby and the query may complete + faster than it would have done on the primary server. max_standby_archive_delay must be kept large in this case, because delayed WAL files might already contain entries that conflict with the desired standby queries. @@ -1563,7 +1792,8 @@ if (!triggered) a high max_standby_streaming_delay. However it is difficult to guarantee any specific execution-time window with this approach, since vacuum_defer_cleanup_age is measured in - transactions executed on the primary server. + transactions executed on the primary server. 
As of version 9.1, this + second option is much less likely to be valuable. diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 287ad26..eb3cd6f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -56,6 +56,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/fd.h" #include "storage/predicate.h" #include "storage/procarray.h" @@ -2030,6 +2031,14 @@ RecordTransactionCommitPrepared(TransactionId xid, MyProc->inCommit = false; END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a0170b4..1da42c9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" @@ -54,6 +55,7 @@ #include "utils/snapmgr.h" #include "pg_trace.h" +extern void WalRcvWakeup(void); /* we are only caller, so include directly */ /* * User-tweakable parameters */ @@ -1055,7 +1057,7 @@ RecordTransactionCommit(void) * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ - if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0) + if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested()) { /* * Synchronous commit case: @@ -1125,6 +1127,14 @@ RecordTransactionCommit(void) /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for synchronous replication, if required.
+ * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; @@ -4533,6 +4543,14 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) */ if (XactCompletionForceSyncCommit(xlrec)) XLogFlush(lsn); + + /* + * If this standby is offering sync_rep_service then signal WALReceiver, + * in case it needs to send a reply just for this commit on an + * otherwise quiet server. + */ + if (sync_rep_service) + WalRcvWakeup(); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f5cb657..3fac09a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -41,6 +41,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -157,6 +158,11 @@ static XLogRecPtr LastRec; * known, need to check the shared state". */ static bool LocalRecoveryInProgress = true; +/* + * Local copy of SharedHotStandbyActive variable. False actually means "not + * known, need to check the shared state". + */ +static bool LocalHotStandbyActive = false; /* * Local state for XLogInsertAllowed(): @@ -405,6 +411,12 @@ typedef struct XLogCtlData bool SharedRecoveryInProgress; /* + * SharedHotStandbyActive indicates if we're still in crash or archive + * recovery. Protected by info_lck. + */ + bool SharedHotStandbyActive; + + /* * recoveryWakeupLatch is used to wake up the startup process to * continue WAL replay, if it is waiting for WAL to arrive or failover * trigger file to appear. 
@@ -4915,6 +4927,7 @@ XLOGShmemInit(void) */ XLogCtl->XLogCacheBlck = XLOGbuffers - 1; XLogCtl->SharedRecoveryInProgress = true; + XLogCtl->SharedHotStandbyActive = false; XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); SpinLockInit(&XLogCtl->info_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); @@ -5285,6 +5298,12 @@ readRecoveryCommandFile(void) (errmsg("recovery command file \"%s\" specified neither primary_conninfo nor restore_command", RECOVERY_COMMAND_FILE), errhint("The database server will regularly poll the pg_xlog subdirectory to check for files placed there."))); + + if (PrimaryConnInfo == NULL && sync_rep_service) + ereport(WARNING, + (errmsg("recovery command file \"%s\" specified synchronous_replication_service yet streaming was not requested", + RECOVERY_COMMAND_FILE), + errhint("Specify primary_conninfo to allow synchronous replication."))); } else { @@ -6159,6 +6178,13 @@ StartupXLOG(void) if (XLByteLT(ControlFile->minRecoveryPoint, checkPoint.redo)) ControlFile->minRecoveryPoint = checkPoint.redo; } + else + { + /* + * No need to calculate feedback if we're not in Hot Standby. + */ + hot_standby_feedback = false; + } /* * set backupStartupPoint if we're starting archive recovery from a @@ -6778,8 +6804,6 @@ StartupXLOG(void) static void CheckRecoveryConsistency(void) { - static bool backendsAllowed = false; - /* * Have we passed our safe starting point? */ @@ -6799,11 +6823,19 @@ CheckRecoveryConsistency(void) * enabling connections. 
*/ if (standbyState == STANDBY_SNAPSHOT_READY && - !backendsAllowed && + !LocalHotStandbyActive && reachedMinRecoveryPoint && IsUnderPostmaster) { - backendsAllowed = true; + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->SharedHotStandbyActive = true; + SpinLockRelease(&xlogctl->info_lck); + + LocalHotStandbyActive = true; + SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY); } } @@ -6851,6 +6883,38 @@ RecoveryInProgress(void) } /* + * Is HotStandby active yet? This is only important in special backends + * since normal backends won't ever be able to connect until this returns + * true. + * + * Unlike testing standbyState, this works in any process that's connected to + * shared memory. + */ +bool +HotStandbyActive(void) +{ + /* + * We check shared state each time only until Hot Standby is active. We + * can't de-activate Hot Standby, so there's no need to keep checking after + * the shared variable has once been seen true. + */ + if (LocalHotStandbyActive) + return true; + else + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + /* spinlock is essential on machines with weak memory ordering! */ + SpinLockAcquire(&xlogctl->info_lck); + LocalHotStandbyActive = xlogctl->SharedHotStandbyActive; + SpinLockRelease(&xlogctl->info_lck); + + return LocalHotStandbyActive; + } +} + +/* * Is this process allowed to insert new WAL records? * * Ordinarily this is essentially equivalent to !RecoveryInProgress(). 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 408e174..5ce1888 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -509,6 +509,7 @@ CREATE VIEW pg_stat_replication AS S.client_port, S.backend_start, W.state, + W.sync, W.sent_location, W.write_location, W.flush_location, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 8f77d1b..1577875 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -275,6 +275,7 @@ typedef enum PM_STARTUP, /* waiting for startup subprocess */ PM_RECOVERY, /* in archive recovery mode */ PM_HOT_STANDBY, /* in hot standby mode */ + PM_WAIT_FOR_REPLICATION, /* waiting for sync replication to become active */ PM_RUN, /* normal "database is alive" state */ PM_WAIT_BACKUP, /* waiting for online backup mode to end */ PM_WAIT_READONLY, /* waiting for read only backends to exit */ @@ -735,6 +736,9 @@ PostmasterMain(int argc, char *argv[]) if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); + if (!allow_standalone_primary && max_wal_senders == 0) + ereport(ERROR, + (errmsg("WAL streaming (max_wal_senders > 0) is required if allow_standalone_primary = off"))); /* * Other one-time internal sanity checks can go here, if they are fast. 
@@ -1845,6 +1849,12 @@ retry1: (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is in recovery mode"))); break; + case CAC_REPLICATION_ONLY: + if (!am_walsender) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("the database system is waiting for replication to start"))); + break; case CAC_TOOMANY: ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), @@ -1942,7 +1952,9 @@ canAcceptConnections(void) */ if (pmState != PM_RUN) { - if (pmState == PM_WAIT_BACKUP) + if (pmState == PM_WAIT_FOR_REPLICATION) + result = CAC_REPLICATION_ONLY; /* allow replication only */ + else if (pmState == PM_WAIT_BACKUP) result = CAC_WAITBACKUP; /* allow superusers only */ else if (Shutdown > NoShutdown) return CAC_SHUTDOWN; /* shutdown is pending */ @@ -2396,8 +2408,13 @@ reaper(SIGNAL_ARGS) * Startup succeeded, commence normal operations */ FatalError = false; - ReachedNormalRunning = true; - pmState = PM_RUN; + if (allow_standalone_primary) + { + ReachedNormalRunning = true; + pmState = PM_RUN; + } + else + pmState = PM_WAIT_FOR_REPLICATION; /* * Crank up the background writer, if we didn't do that already @@ -3233,8 +3250,8 @@ BackendStartup(Port *port) /* Pass down canAcceptConnections state */ port->canAcceptConnections = canAcceptConnections(); bn->dead_end = (port->canAcceptConnections != CAC_OK && - port->canAcceptConnections != CAC_WAITBACKUP); - + port->canAcceptConnections != CAC_WAITBACKUP && + port->canAcceptConnections != CAC_REPLICATION_ONLY); /* * Unless it's a dead_end child, assign it a child slot number */ @@ -4284,6 +4301,16 @@ sigusr1_handler(SIGNAL_ARGS) WalReceiverPID = StartWalReceiver(); } + if (CheckPostmasterSignal(PMSIGNAL_SYNC_REPLICATION_ACTIVE) && + pmState == PM_WAIT_FOR_REPLICATION) + { + /* Allow connections now that a synchronous replication standby + * has successfully connected and is active. 
+ */ + ReachedNormalRunning = true; + pmState = PM_RUN; + } + PG_SETMASK(&UnBlockSig); errno = save_errno; @@ -4534,6 +4561,7 @@ static void StartAutovacuumWorker(void) { Backend *bn; + CAC_state cac = CAC_OK; /* * If not in condition to run a process, don't try, but handle it like a @@ -4542,7 +4570,8 @@ StartAutovacuumWorker(void) * we have to check to avoid race-condition problems during DB state * changes. */ - if (canAcceptConnections() == CAC_OK) + cac = canAcceptConnections(); + if (cac == CAC_OK || cac == CAC_REPLICATION_ONLY) { bn = (Backend *) malloc(sizeof(Backend)); if (bn) diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 42c6eaf..3fe490e 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o + repl_gram.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README index 9c2e0d8..7387224 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -1,5 +1,27 @@ src/backend/replication/README +Overview +-------- + +The WALSender sends WAL data and receives replies. The WALReceiver +receives WAL data and sends replies. + +If there is no more WAL data to send then WALSender goes quiet, +apart from checking for replies. If there is no more WAL data +to receive then WALReceiver keeps sending replies until all the data +received has been applied, then it too goes quiet. When all is quiet +WALReceiver sends regular replies so that WALSender knows the link +is still working - we don't want to wait until a transaction +arrives before we try to determine the health of the connection. + +WALReceiver sends one reply per message received. 
If nothing is +received it sends one reply every time apply pointer advances, +with a minimum of one reply each cycletime. + +For synchronous replication, all decisions about whether to wait +and how long to wait are taken on the primary. The standby has no +state information about what is happening on the primary. + Walreceiver - libpqwalreceiver API ---------------------------------- diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c new file mode 100644 index 0000000..12a3825 --- /dev/null +++ b/src/backend/replication/syncrep.c @@ -0,0 +1,641 @@ +/*------------------------------------------------------------------------- + * + * syncrep.c + * + * Synchronous replication is new as of PostgreSQL 9.1. + * + * If requested, transaction commits wait until their commit LSN is + * acknowledged by the standby, or the wait hits timeout. + * + * This module contains the code for waiting and release of backends. + * All code in this module executes on the primary. The core streaming + * replication transport remains within WALreceiver/WALsender modules. + * + * The essence of this design is that it isolates all logic about + * waiting/releasing onto the primary. The primary is aware of which + * standby servers offer a synchronisation service. The standby is + * completely unaware of the durability requirements of transactions + * on the primary, reducing the complexity of the code and streamlining + * both standby operations and network bandwidth because there is no + * requirement to ship per-transaction state information. + * + * The bookeeping approach we take is that a commit is either synchronous + * or not synchronous (async). If it is async, we just fastpath out of + * here. If it is sync, then it follows exactly one rigid definition of + * synchronous replication as laid out by the various parameters. 
If we + * change the definition of replication, we'll need to scan through all + * waiting backends to see if we should now release them. + * + * The best performing way to manage the waiting backends is to have a + * single ordered queue of waiting backends, so that we can avoid + * searching the through all waiters each time we receive a reply. + * + * Starting sync replication is a two stage process. First, the standby + * must have caught up with the primary; that may take some time. Next, + * we must receive a reply from the standby before we change state so + * that sync rep is fully active and commits can wait on us. + * + * XXX Changing state to a sync rep service while we are running allows + * us to enable sync replication via SIGHUP on the standby at a later + * time, without restart, if we need to do that. Though you can't turn + * it off without disconnecting. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "access/xact.h" +#include "access/xlog_internal.h" +#include "miscadmin.h" +#include "postmaster/autovacuum.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" +#include "storage/latch.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "utils/guc.h" +#include "utils/guc_tables.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" + + +/* User-settable parameters for sync rep */ +bool sync_rep_mode = false; /* Only set in user backends */ +int sync_rep_timeout_client = 120; /* Only set in user backends */ +int sync_rep_timeout_server = 30; /* Only set in user backends */ +bool sync_rep_service = false; /* Never set in user backends */ +bool hot_standby_feedback = true; + +/* + * Queuing code is written to allow later extension to multiple + * queues. 
Currently, we use just one queue (==FSYNC). + * + * XXX We later expect to have RECV, FSYNC and APPLY modes. + */ +#define SYNC_REP_NOT_ON_QUEUE -1 +#define SYNC_REP_FSYNC 0 +#define IsOnSyncRepQueue() (current_queue > SYNC_REP_NOT_ON_QUEUE) +/* + * Queue identifier of the queue on which user backend currently waits. + */ +static int current_queue = SYNC_REP_NOT_ON_QUEUE; + +static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN, int qid); +static void SyncRepRemoveFromQueue(void); +static void SyncRepAddToQueue(int qid); +static bool SyncRepServiceAvailable(void); +static long SyncRepGetWaitTimeout(void); + +static void SyncRepWakeFromQueue(int wait_queue, XLogRecPtr lsn); + + +/* + * =========================================================== + * Synchronous Replication functions for normal user backends + * =========================================================== + */ + +/* + * Wait for synchronous replication, if requested by user. + */ +extern void +SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +{ + /* + * Fast exit if user has requested async replication, or + * streaming replication is inactive in this server. + */ + if (max_wal_senders == 0 || !sync_rep_mode) + return; + + Assert(sync_rep_mode); + + if (allow_standalone_primary) + { + bool avail_sync_mode; + + /* + * Check that the service level we want is available. + * If not, downgrade the service level to async. + */ + avail_sync_mode = SyncRepServiceAvailable(); + + /* + * Perform the wait here, then drop through and exit. + */ + if (avail_sync_mode) + SyncRepWaitOnQueue(XactCommitLSN, 0); + } + else + { + /* + * Wait only on the service level requested, + * whether or not it is currently available. + * Sounds weird, but this mode exists to protect + * against changes that will only occur on primary. + */ + SyncRepWaitOnQueue(XactCommitLSN, 0); + } +} + +/* + * Wait for specified LSN to be confirmed at the requested level + * of durability. 
Each proc has its own wait latch, so we perform + * a normal latch check/wait loop here. + */ +static void +SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN, int qid) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[qid]); + TimestampTz now = GetCurrentTransactionStopTimestamp(); + long timeout = SyncRepGetWaitTimeout(); /* microseconds */ + char *new_status = NULL; + const char *old_status; + int len; + + /* + * No need to wait for autovacuums. If the standby does go away and + * we wait for it to return we may as well do some useful work locally. + * This is critical since we may need to perform emergency vacuuming + * and cannot wait for standby to return. + */ + if (IsAutoVacuumWorkerProcess()) + return; + + ereport(DEBUG2, + (errmsg("synchronous replication waiting for %X/%X starting at %s", + XactCommitLSN.xlogid, + XactCommitLSN.xrecoff, + timestamptz_to_str(GetCurrentTransactionStopTimestamp())))); + + for (;;) + { + ResetLatch(&MyProc->waitLatch); + + /* + * First time through, add ourselves to the appropriate queue. + */ + if (!IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + if (XLByteLE(XactCommitLSN, queue->lsn)) + { + /* No need to wait */ + SpinLockRelease(&queue->qlock); + return; + } + + /* + * Set our waitLSN so WALSender will know when to wake us. + * We set this before we add ourselves to queue, so that + * any proc on the queue can be examined freely without + * taking a lock on each process in the queue. + */ + MyProc->waitLSN = XactCommitLSN; + SyncRepAddToQueue(qid); + SpinLockRelease(&queue->qlock); + current_queue = qid; /* Remember which queue we're on */ + + /* + * Alter ps display to show waiting for sync rep. 
+ */ + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 21 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting for sync rep"); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting" */ + } + else + { + bool release = false; + bool timed_out = false; /* true when wait exceeded client timeout */ + + SpinLockAcquire(&queue->qlock); + + /* + * Check the LSN on our queue and if its moved far enough then + * remove us from the queue. First time through this is + * unlikely to be far enough, yet is possible. Next time we are + * woken we should be more lucky. + */ + if (XLByteLE(XactCommitLSN, queue->lsn)) + release = true; + else if (timeout > 0 && + TimestampDifferenceExceeds(GetCurrentTransactionStopTimestamp(), + now, + timeout / 1000)) + { + release = true; + timed_out = true; + } + + if (release) + { + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + + if (new_status) + { + /* Reset ps display */ + set_ps_display(new_status, false); + pfree(new_status); + } + + /* + * Our response to the timeout is to simply post a NOTICE and + * then return to the user. The commit has happened, we just + * haven't been able to verify it has been replicated to the + * level requested. + * + * XXX We could check here to see if our LSN has been sent to + * another standby that offers a lower level of service. That + * could be true if we had, for example, requested 'apply' + * with two standbys, one at 'apply' and one at 'recv' and the + * apply standby has just gone down. Something for the weekend. + */ + if (timed_out) + ereport(NOTICE, + (errmsg("synchronous replication timeout at %s", + timestamptz_to_str(now)))); + else + ereport(DEBUG2, + (errmsg("synchronous replication wait complete at %s", + timestamptz_to_str(now)))); + + /* XXX Do we need to unset the latch? 
*/ + return; + } + + SpinLockRelease(&queue->qlock); + } + + WaitLatch(&MyProc->waitLatch, timeout); + now = GetCurrentTimestamp(); + } +} + +/* + * Remove myself from sync rep wait queue. + * + * Assume on queue at start; will not be on queue at end. + * Queue is already locked at start and remains locked on exit. + * + * XXX Implements design pattern "Reinvent Wheel", think about changing + */ +void +SyncRepRemoveFromQueue(void) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[current_queue]); + PGPROC *proc = queue->head; + int numprocs = 0; + + Assert(IsOnSyncRepQueue()); + +#ifdef SYNCREP_DEBUG + elog(DEBUG3, "removing myself from queue %d", current_queue); +#endif + + for (; proc != NULL; proc = proc->lwWaitLink) + { + if (proc == MyProc) + { + elog(LOG, "proc %d lsn %X/%X is MyProc", + numprocs, + proc->waitLSN.xlogid, + proc->waitLSN.xrecoff); + } + else + { + elog(LOG, "proc %d lsn %X/%X", + numprocs, + proc->waitLSN.xlogid, + proc->waitLSN.xrecoff); + } + numprocs++; + } + + proc = queue->head; + + if (proc == MyProc) + { + if (MyProc->lwWaitLink == NULL) + { + /* + * We were the only waiter on the queue. Reset head and tail. + */ + Assert(queue->tail == MyProc); + queue->head = NULL; + queue->tail = NULL; + } + else + /* + * Move head to next proc on the queue. + */ + queue->head = MyProc->lwWaitLink; + } + else + { + while (proc->lwWaitLink != NULL) + { + /* Are we the next proc in our traversal of the queue? */ + if (proc->lwWaitLink == MyProc) + { + /* + * Remove ourselves from middle of queue. + * No need to touch head or tail. 
+ */ + proc->lwWaitLink = MyProc->lwWaitLink; + } + + if (proc->lwWaitLink == NULL) + elog(WARNING, "could not locate ourselves on wait queue"); + proc = proc->lwWaitLink; + } + + if (proc->lwWaitLink == NULL) /* At tail */ + { + Assert(proc == MyProc); + /* Remove ourselves from tail of queue */ + Assert(queue->tail == MyProc); + queue->tail = proc; + proc->lwWaitLink = NULL; + } + } + MyProc->lwWaitLink = NULL; + current_queue = SYNC_REP_NOT_ON_QUEUE; +} + +/* + * Add myself to sync rep wait queue. + * + * Assume not on queue at start; will be on queue at end. + * Queue is already locked at start and remains locked on exit. + */ +static void +SyncRepAddToQueue(int qid) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[qid]); + PGPROC *tail = queue->tail; + +#ifdef SYNCREP_DEBUG + elog(DEBUG3, "adding myself to queue %d", qid); +#endif + + /* + * Add myself to tail of wait queue. + */ + if (tail == NULL) + { + queue->head = MyProc; + queue->tail = MyProc; + } + else + { + /* + * XXX extra code needed here to maintain sorted invariant. + * Our approach should be same as racing car - slow in, fast out. + */ + Assert(tail->lwWaitLink == NULL); + tail->lwWaitLink = MyProc; + } + queue->tail = MyProc; + + /* + * This used to be an Assert, but it keeps failing... why? + */ + MyProc->lwWaitLink = NULL; /* to be sure */ +} + +/* + * Dynamically decide the sync rep wait mode. It may seem a trifle + * wasteful to do this for every transaction but we need to do this + * so we can cope sensibly with standby disconnections. It's OK to + * spend a few cycles here anyway, since while we're doing this the + * WALSender will be sending the data we want to wait for, so this + * is dead time and the user has requested to wait anyway. 
+ */ +static bool +SyncRepServiceAvailable(void) +{ + bool result = false; + + SpinLockAcquire(&WalSndCtl->ctlmutex); + result = WalSndCtl->sync_rep_service_available; + SpinLockRelease(&WalSndCtl->ctlmutex); + + return result; +} + +/* + * Allows more complex decision making about what the wait time should be. + */ +static long +SyncRepGetWaitTimeout(void) +{ + if (sync_rep_timeout_client <= 0) + return -1L; + + return 1000000L * sync_rep_timeout_client; +} + +void +SyncRepCleanupAtProcExit(int code, Datum arg) +{ +/* + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[qid]); + + if (IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + } +*/ + + if (MyProc != NULL && MyProc->ownLatch) + { + DisownLatch(&MyProc->waitLatch); + MyProc->ownLatch = false; + } +} + +/* + * =========================================================== + * Synchronous Replication functions for wal sender processes + * =========================================================== + */ + +/* + * Update the LSNs on each queue based upon our latest state. This + * implements a simple policy of first-valid-standby-releases-waiter. + * + * Other policies are possible, which would change what we do here and what + * perhaps also which information we store as well. + */ +void +SyncRepReleaseWaiters(bool timeout) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + int mode; + + /* + * If we are now streaming, and haven't yet enabled the sync rep service + * do so now. We don't enable sync rep service during a base backup since + * during that action we aren't sending WAL at all, so there cannot be + * any meaningful replies. We don't enable sync rep service while we + * are still in catchup mode either, since clients might experience an + * extended wait (perhaps hours) if they waited at that point. + * + * Note that we do release waiters, even if they aren't enabled yet. 
+ * That sounds strange, but we may have dropped the connection and + * reconnected, so there may still be clients waiting for a response + * from when we were connected previously. + * + * If we already have a sync rep server connected, don't enable + * this server as well. + * + * XXX expect to be able to support multiple sync standbys in future. + */ + if (!MyWalSnd->sync_rep_service && + MyWalSnd->state == WALSNDSTATE_STREAMING && + !SyncRepServiceAvailable()) + { + ereport(LOG, + (errmsg("enabling synchronous replication service for standby"))); + + /* + * Update state for this WAL sender. + */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sync_rep_service = true; + SpinLockRelease(&walsnd->mutex); + } + + /* + * We have at least one standby, so we're open for business. + */ + { + SpinLockAcquire(&WalSndCtl->ctlmutex); + WalSndCtl->sync_rep_service_available = true; + SpinLockRelease(&WalSndCtl->ctlmutex); + } + + /* + * Let postmaster know we can allow connections, if the user + * requested waiting until sync rep was active before starting. + * We send this unconditionally to avoid more complexity in + * postmaster code. + */ + if (IsUnderPostmaster) + SendPostmasterSignal(PMSIGNAL_SYNC_REPLICATION_ACTIVE); + } + + /* + * No point trying to release waiters while doing a base backup + */ + if (MyWalSnd->state == WALSNDSTATE_BACKUP) + return; + +#ifdef SYNCREP_DEBUG + elog(LOG, "releasing waiters up to flush = %X/%X", + MyWalSnd->flush.xlogid, MyWalSnd->flush.xrecoff); +#endif + + + /* + * Only maintain LSNs of queues for which we advertise a service. + * This is important to ensure that we only wakeup users when a + * preferred standby has reached the required LSN. + * + * Since synchronous_replication is currently a boolean, we either + * offer all modes, or none. 
+ */ + for (mode = 0; mode < NUM_SYNC_REP_WAIT_MODES; mode++) + { + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[mode]); + + /* + * Lock the queue. Not really necessary with just one sync standby + * but it makes clear what needs to happen. + */ + SpinLockAcquire(&queue->qlock); + if (XLByteLT(queue->lsn, MyWalSnd->flush)) + { + /* + * Set the lsn first so that when we wake backends they will + * release up to this location. + */ + queue->lsn = MyWalSnd->flush; + SyncRepWakeFromQueue(mode, MyWalSnd->flush); + } + SpinLockRelease(&queue->qlock); + +#ifdef SYNCREP_DEBUG + elog(DEBUG2, "q%d queue = %X/%X flush = %X/%X", mode, + queue->lsn.xlogid, queue->lsn.xrecoff, + MyWalSnd->flush.xlogid, MyWalSnd->flush.xrecoff); +#endif + } +} + +/* + * Walk queue from head setting the latches of any procs that need + * to be woken. We don't modify the queue, we leave that for individual + * procs to release themselves. + * + * Must hold spinlock on queue. + */ +static void +SyncRepWakeFromQueue(int wait_queue, XLogRecPtr lsn) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue[wait_queue]); + PGPROC *proc = queue->head; + int numprocs = 0; + int totalprocs = 0; + + if (proc == NULL) + return; + + for (; proc != NULL; proc = proc->lwWaitLink) + { + elog(LOG, "proc %d lsn %X/%X", + numprocs, + proc->waitLSN.xlogid, + proc->waitLSN.xrecoff); + + if (XLByteLE(proc->waitLSN, lsn)) + { + numprocs++; + SetLatch(&proc->waitLatch); + } + totalprocs++; + } + elog(DEBUG2, "released %d procs out of %d waiting procs", numprocs, totalprocs); +#ifdef SYNCREP_DEBUG + elog(DEBUG2, "released %d procs up to %X/%X", numprocs, lsn.xlogid, lsn.xrecoff); +#endif +} + +void +SyncRepTimeoutExceeded(void) +{ + SyncRepReleaseWaiters(true); +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 30e35db..f35ab4a 100644 --- a/src/backend/replication/walreceiver.c +++ 
b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include #include +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -45,6 +46,7 @@ #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -87,9 +89,9 @@ static volatile sig_atomic_t got_SIGTERM = false; */ static struct { - XLogRecPtr Write; /* last byte + 1 written out in the standby */ - XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ -} LogstreamResult; + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; static StandbyReplyMessage reply_message; @@ -210,6 +212,8 @@ WalReceiverMain(void) /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; walrcv->walRcvState = WALRCV_RUNNING; + elog(DEBUG2, "WALreceiver starting"); + OwnLatch(&WalRcv->latch); /* Run before signals enabled, since they can wakeup latch */ /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); @@ -277,6 +281,7 @@ WalReceiverMain(void) unsigned char type; char *buf; int len; + bool received_all = false; /* * Emergency bailout if postmaster has died. 
This is to avoid the @@ -302,24 +307,44 @@ WalReceiverMain(void) ProcessConfigFile(PGC_SIGHUP); } - /* Wait a while for data to arrive */ - if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) + ResetLatch(&WalRcv->latch); + + if (walrcv_receive(0, &type, &buf, &len)) { - /* Accept the received data, and process it */ + received_all = false; XLogWalRcvProcessMsg(type, buf, len); + } + else + received_all = true; - /* Receive any more data we can without sleeping */ - while (walrcv_receive(0, &type, &buf, &len)) - XLogWalRcvProcessMsg(type, buf, len); + XLogWalRcvSendReply(); - /* Let the master know that we received some data. */ + if (received_all && !got_SIGHUP && !got_SIGTERM) + { + /* + * Flush, then reply. + * + * XXX We really need the WALWriter active as well + */ + XLogWalRcvFlush(); XLogWalRcvSendReply(); /* - * If we've written some records, flush them to disk and let the - * startup process know about them. + * Sleep for up to 500 ms, the fixed keepalive delay. + * + * We will be woken if new data is received from primary + * or if a commit is applied. This is sub-optimal in the + * case where a group of commits arrive, then it all goes + * quiet, but its not worth the extra code to handle both + * that and the simple case of a single commit. + * + * Note that we do not need to wake up when the Startup + * process has applied the last outstanding record. That + * is interesting iff that is a commit record. */ - XLogWalRcvFlush(); + pg_usleep(1000000L); /* slow down loop for debugging */ +// WaitLatchOrSocket(&WalRcv->latch, MyProcPort->sock, +// 500000L); } else { @@ -351,6 +376,8 @@ WalRcvDie(int code, Datum arg) walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); + DisownLatch(&WalRcv->latch); + /* Terminate the connection gracefully. 
*/ if (walrcv_disconnect != NULL) walrcv_disconnect(); @@ -361,6 +388,7 @@ static void WalRcvSigHupHandler(SIGNAL_ARGS) { got_SIGHUP = true; + WalRcvWakeup(); } /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ @@ -368,6 +396,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS) { got_SIGTERM = true; + WalRcvWakeup(); /* Don't joggle the elbow of proc_exit */ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) @@ -609,14 +638,28 @@ XLogWalRcvSendReply(void) reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; + if (hot_standby_feedback && HotStandbyActive()) + reply_message.xmin = GetOldestXmin(true, false); + else + reply_message.xmin = InvalidTransactionId; - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %d", reply_message.write.xlogid, reply_message.write.xrecoff, reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff); + reply_message.apply.xlogid, reply_message.apply.xrecoff, + reply_message.xmin); /* Prepend with the message type and send it. 
*/ buf[0] = 'r'; memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); } + +/* Wake up the WalRcv + * Prototype goes in xact.c since that is only external caller + */ +void +WalRcvWakeup(void) +{ + SetLatch(&WalRcv->latch); +}; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 04c9004..da97528 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -64,6 +64,7 @@ WalRcvShmemInit(void) MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); + InitSharedLatch(&WalRcv->latch); } } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ad95b4..2ace040 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -65,7 +65,7 @@ WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ -static WalSnd *MyWalSnd = NULL; +WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ @@ -73,6 +73,7 @@ bool am_walsender = false; /* Am I a walsender process ? 
*/ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ +bool allow_standalone_primary = true; /* action if no sync standby active */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, @@ -89,6 +90,8 @@ static uint32 sendOff = 0; */ static XLogRecPtr sentPtr = {0, 0}; +static TimestampTz last_reply_timestamp; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t walsender_shutdown_requested = false; @@ -113,7 +116,6 @@ static void StartReplication(StartReplicationCmd * cmd); static void ProcessStandbyReplyMessage(void); static void ProcessRepliesIfAny(void); - /* Main entry point for walsender process */ int WalSenderMain(void) @@ -150,6 +152,8 @@ WalSenderMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + elog(DEBUG2, "WALsender starting"); + /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); @@ -166,6 +170,8 @@ WalSenderMain(void) SpinLockRelease(&walsnd->mutex); } + elog(DEBUG2, "WALsender handshake complete"); + /* Main loop of walsender */ return WalSndLoop(); } @@ -250,6 +256,11 @@ WalSndHandshake(void) errmsg("invalid standby handshake message type %d", firstchar))); } } + + /* + * Initialize our timeout checking mechanism. 
+ */ + last_reply_timestamp = GetCurrentTimestamp(); } /* @@ -417,9 +428,11 @@ HandleReplicationCommand(const char *cmd_string) /* break out of the loop */ replication_started = true; + WalSndSetState(WALSNDSTATE_CATCHUP); break; case T_BaseBackupCmd: + WalSndSetState(WALSNDSTATE_BACKUP); SendBaseBackup((BaseBackupCmd *) cmd_node); /* Send CommandComplete and ReadyForQuery messages */ @@ -524,10 +537,11 @@ ProcessStandbyReplyMessage(void) pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage)); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ", + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %d", reply.write.xlogid, reply.write.xrecoff, reply.flush.xlogid, reply.flush.xrecoff, - reply.apply.xlogid, reply.apply.xrecoff); + reply.apply.xlogid, reply.apply.xrecoff, + reply.xmin); /* * Update shared state for this WalSender process @@ -541,8 +555,16 @@ ProcessStandbyReplyMessage(void) walsnd->write = reply.write; walsnd->flush = reply.flush; walsnd->apply = reply.apply; + if (TransactionIdIsValid(reply.xmin) && + TransactionIdPrecedes(MyProc->xmin, reply.xmin)) + MyProc->xmin = reply.xmin; SpinLockRelease(&walsnd->mutex); } + + /* + * Release any backends waiting to commit. + */ + SyncRepReleaseWaiters(false); } /* Main loop of walsender process */ @@ -592,7 +614,11 @@ WalSndLoop(void) /* Normal exit from the walsender is here */ if (walsender_shutdown_requested) { - /* Inform the standby that XLOG streaming was done */ + ProcessRepliesIfAny(); + + /* Inform the standby that XLOG streaming was done + * by sending CommandComplete message. + */ pq_puttextmessage('C', "COPY 0"); pq_flush(); @@ -600,12 +626,31 @@ WalSndLoop(void) } /* - * If we had sent all accumulated WAL in last round, nap for the - * configured time before retrying. + * If we had sent all accumulated WAL in last round, then we don't + * have much to do. We still expect a steady stream of replies from + * standby. 
It is important to note that we don't keep track of + * whether or not there are backends waiting here, since that + * is potentially very complex state information. + * + * Also note that there is no delay between sending data and + * checking for the replies. We expect replies to take some time + * and we are more concerned with overall throughput than absolute + * response time to any single request. */ if (caughtup) { /* + * If we were still catching up, change state to streaming. + * While in the initial catchup phase, clients waiting for + * a response from the standby would wait for a very long + * time, so we need to have a one-way state transition to avoid + * problems. No need to grab a lock for the check; we are the + * only one to ever change the state. + */ + if (MyWalSnd->state < WALSNDSTATE_STREAMING) + WalSndSetState(WALSNDSTATE_STREAMING); + + /* * Even if we wrote all the WAL that was available when we started * sending, more might have arrived while we were sending this * batch. We had the latch set while sending, so we have not @@ -618,6 +663,13 @@ WalSndLoop(void) break; if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested) { + long timeout; + + if (sync_rep_timeout_server == -1) + timeout = -1L; + else + timeout = 1000000L * sync_rep_timeout_server; + /* * XXX: We don't really need the periodic wakeups anymore, * WaitLatchOrSocket should reliably wake up as soon as @@ -625,8 +677,14 @@ WalSndLoop(void) */ /* Sleep */ - WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, - WalSndDelay * 1000L); + if (WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, + timeout) == 0) + { + ereport(LOG, + (errmsg("streaming replication timeout after %d s", + sync_rep_timeout_server))); + break; + } } } else @@ -642,7 +700,7 @@ WalSndLoop(void) } /* - * Get here on send failure. Clean up and exit. + * Get here on send failure or timeout. Clean up and exit. 
* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. @@ -873,9 +931,9 @@ XLogSend(char *msgbuf, bool *caughtup) * Attempt to send all data that's already been written out and fsync'd to * disk. We cannot go further than what's been written out given the * current implementation of XLogRead(). And in any case it's unsafe to - * send WAL that is not securely down to disk on the master: if the master + * send WAL that is not securely down to disk on the primary: if the primary * subsequently crashes and restarts, slaves must not have applied any WAL - * that gets lost on the master. + * that gets lost on the primary. */ SendRqstPtr = GetFlushRecPtr(); @@ -953,6 +1011,9 @@ XLogSend(char *msgbuf, bool *caughtup) msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); + elog(DEBUG2, "sent = %X/%X ", + startptr.xlogid, startptr.xrecoff); + memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); @@ -1110,6 +1171,16 @@ WalSndShmemInit(void) SpinLockInit(&walsnd->mutex); InitSharedLatch(&walsnd->latch); } + + /* + * Initialise the spinlocks on each sync rep queue + */ + for (i = 0; i < NUM_SYNC_REP_WAIT_MODES; i++) + { + SyncRepQueue *queue = &WalSndCtl->sync_rep_queue[i]; + + SpinLockInit(&queue->qlock); + } } } @@ -1169,7 +1240,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 6 +#define PG_STAT_GET_WAL_SENDERS_COLS 7 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -1212,6 +1283,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr flush; XLogRecPtr apply; WalSndState state; + bool sync_rep_service; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -1224,6 +1296,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = 
walsnd->flush; apply = walsnd->apply; + sync_rep_service = walsnd->sync_rep_service; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -1240,32 +1313,34 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[3] = true; nulls[4] = true; nulls[5] = true; + nulls[6] = true; } else { values[1] = CStringGetTextDatum(WalSndGetStateString(state)); + values[2] = BoolGetDatum(sync_rep_service); snprintf(location, sizeof(location), "%X/%X", sentPtr.xlogid, sentPtr.xrecoff); - values[2] = CStringGetTextDatum(location); + values[3] = CStringGetTextDatum(location); if (write.xlogid == 0 && write.xrecoff == 0) nulls[3] = true; snprintf(location, sizeof(location), "%X/%X", write.xlogid, write.xrecoff); - values[3] = CStringGetTextDatum(location); + values[4] = CStringGetTextDatum(location); if (flush.xlogid == 0 && flush.xrecoff == 0) nulls[4] = true; snprintf(location, sizeof(location), "%X/%X", flush.xlogid, flush.xrecoff); - values[4] = CStringGetTextDatum(location); + values[5] = CStringGetTextDatum(location); if (apply.xlogid == 0 && apply.xrecoff == 0) nulls[5] = true; snprintf(location, sizeof(location), "%X/%X", apply.xlogid, apply.xrecoff); - values[5] = CStringGetTextDatum(location); + values[6] = CStringGetTextDatum(location); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index be577bc..7aa7671 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -39,6 +39,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -196,6 +197,7 @@ InitProcGlobal(void) PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -214,6 +216,7 @@ InitProcGlobal(void) 
PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -224,6 +227,7 @@ InitProcGlobal(void) { AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */ PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); + InitSharedLatch(&procs[i].waitLatch); } /* Create ProcStructLock spinlock, too */ @@ -326,6 +330,13 @@ InitProcess(void) SHMQueueInit(&(MyProc->myProcLocks[i])); MyProc->recoveryConflictPending = false; + /* Initialise the waitLSN for sync rep */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + OwnLatch((Latch *) &MyProc->waitLatch); + MyProc->ownLatch = true; + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly @@ -365,6 +376,7 @@ InitProcessPhase2(void) /* * Arrange to clean that up at backend exit. */ + on_shmem_exit(SyncRepCleanupAtProcExit, 0); on_shmem_exit(RemoveProcFromArray, 0); } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 470183d..8c8e381 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -56,6 +56,7 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/walreceiver.h" +#include "replication/syncrep.h" #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/standby.h" @@ -620,6 +621,15 @@ const char *const config_type_names[] = static struct config_bool ConfigureNamesBool[] = { { + {"allow_standalone_primary", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Refuse connections on startup and force users to wait forever if synchronous replication has failed."), + NULL + }, + &allow_standalone_primary, + true, NULL, NULL + }, + + { {"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of sequential-scan plans."), NULL @@ -1279,6 +1289,33 @@ static struct 
config_bool ConfigureNamesBool[] = }, { + {"synchronous_replication", PGC_USERSET, WAL_SETTINGS, + gettext_noop("Requests synchronous replication."), + NULL + }, + &sync_rep_mode, + false, NULL, NULL + }, + + { + {"synchronous_replication_feedback", PGC_POSTMASTER, WAL_STANDBY_SERVERS, + gettext_noop("Allows feedback from a standby to primary for synchronous replication."), + NULL + }, + &sync_rep_service, + true, NULL, NULL + }, + + { + {"hot_standby_feedback", PGC_POSTMASTER, WAL_STANDBY_SERVERS, + gettext_noop("Allows feedback from a hot standby to primary to avoid query conflicts."), + NULL + }, + &hot_standby_feedback, + false, NULL, NULL + }, + + { {"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS, gettext_noop("Allows modifications of the structure of system tables."), NULL, @@ -1484,6 +1521,26 @@ static struct config_int ConfigureNamesInt[] = }, { + {"replication_timeout_client", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Clients waiting for confirmation will timeout after this duration."), + NULL, + GUC_UNIT_S + }, + &sync_rep_timeout_client, + 120, -1, INT_MAX, NULL, NULL + }, + + { + {"replication_timeout_server", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Replication connection will timeout after this duration."), + NULL, + GUC_UNIT_S + }, + &sync_rep_timeout_server, + 30, -1, INT_MAX, NULL, NULL + }, + + { {"temp_buffers", PGC_USERSET, RESOURCES_MEM, gettext_noop("Sets the maximum number of temporary buffers used by each session."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5d31365..56c544d 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -184,7 +184,15 @@ #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables -# - Streaming Replication - +# - Replication - User Settings + +#synchronous_replication = off # commit waits for reply from standby 
+#replication_timeout_client = 120 # -1 means wait forever + +# - Streaming Replication - Server Settings + +#allow_standalone_primary = on # sync rep parameter +#replication_timeout_server = 30 # -1 disables the timeout #max_wal_senders = 0 # max number of walsender processes # (change requires restart) @@ -196,6 +204,8 @@ #hot_standby = off # "on" allows queries during recovery # (change requires restart) +#hot_standby_feedback = off # info from standby to prevent query conflicts +#synchronous_replication_feedback = off # allows sync replication #max_standby_archive_delay = 30s # max delay before canceling queries # when reading WAL from archive; # -1 allows indefinite delay diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 1803d5a..0fcbfe8 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -289,6 +289,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec); extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg); extern bool RecoveryInProgress(void); +extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(void); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index cb275b8..30fb3bf 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); -DATA(insert OID = 3099 (
pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,apply_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,16,25,25,25,25}" "{o,o,o,o,o,o,o}" "{procpid,state,sync,sent_location,write_location,flush_location,apply_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 4cdb15f..9a00b2c 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -73,7 +73,7 @@ typedef struct typedef enum CAC_state { CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY, - CAC_WAITBACKUP + CAC_WAITBACKUP, CAC_REPLICATION_ONLY } CAC_state; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h new file mode 100644 index 0000000..a071b9a --- /dev/null +++ b/src/include/replication/syncrep.h @@ -0,0 +1,69 @@ +/*------------------------------------------------------------------------- + * + * syncrep.h + * Exports from replication/syncrep.c. 
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#ifndef _SYNCREP_H +#define _SYNCREP_H + +#include "access/xlog.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +#define SyncRepRequested() (sync_rep_mode) +#define StandbyOffersSyncRepService() (sync_rep_service) + +/* + * There is no reply from standby to primary for async mode, so the reply + * message needs one less slot than the maximum number of modes + */ +#define NUM_SYNC_REP_WAIT_MODES 1 + +extern XLogRecPtr ReplyLSN[NUM_SYNC_REP_WAIT_MODES]; + +/* + * Each synchronous rep wait mode has one SyncRepWaitQueue in shared memory. + * These queues live in the WAL sender shmem area. + */ +typedef struct SyncRepQueue +{ + /* + * Current location of the head of the queue. Nobody should be waiting + * on the queue for an lsn equal to or earlier than this value. Procs + * on the queue will always be later than this value, though we don't + * record those values here. 
+ */ + XLogRecPtr lsn; + + PGPROC *head; + PGPROC *tail; + + slock_t qlock; /* locks shared variables shown above */ +} SyncRepQueue; + +/* user-settable parameters for synchronous replication */ +extern bool sync_rep_mode; +extern int sync_rep_timeout_client; +extern int sync_rep_timeout_server; +extern bool sync_rep_service; + +extern bool hot_standby_feedback; + +/* called by user backend */ +extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); + +/* called by wal sender */ +extern void SyncRepReleaseWaiters(bool timeout); +extern void SyncRepTimeoutExceeded(void); + +/* callback at exit */ +extern void SyncRepCleanupAtProcExit(int code, Datum arg); + +#endif /* _SYNCREP_H */ diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index 32c4962..8e4e7d0 100644 --- a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -56,6 +56,13 @@ typedef struct XLogRecPtr flush; XLogRecPtr apply; + /* + * The current xmin from the standby, for Hot Standby feedback. + * This may be invalid if the standby-side does not support feedback, + * or Hot Standby is not yet available. + */ + TransactionId xmin; + /* Sender's system clock at the time of transmission */ TimestampTz sendTime; } StandbyReplyMessage; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index aa5bfb7..f57df6a 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -13,6 +13,8 @@ #define _WALRECEIVER_H #include "access/xlogdefs.h" +#include "replication/syncrep.h" +#include "storage/latch.h" #include "storage/spin.h" #include "pgtime.h" @@ -72,6 +74,11 @@ typedef struct */ char conninfo[MAXCONNINFO]; + /* + * Latch used by aux procs to wake up walreceiver when it has work to do. 
+ */ + Latch latch; + slock_t mutex; /* locks shared variables shown above */ } WalRcvData; @@ -93,6 +100,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; /* prototypes for functions in walreceiver.c */ extern void WalReceiverMain(void); +extern void WalRcvWakeup(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 5843307..b44bdde 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "nodes/nodes.h" #include "storage/latch.h" +#include "replication/syncrep.h" #include "storage/spin.h" @@ -44,6 +45,17 @@ typedef struct WalSnd XLogRecPtr flush; XLogRecPtr apply; + /* + * The current xmin from the standby, for Hot Standby feedback. + * This may be invalid if the standby-side has not offered a value yet. + */ + TransactionId xmin; + + /* + * Highest level of sync rep available from this standby. + */ + bool sync_rep_service; + /* Protects shared variables shown above. */ slock_t mutex; @@ -54,9 +66,24 @@ typedef struct WalSnd Latch latch; } WalSnd; +extern WalSnd *MyWalSnd; + /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + /* + * Sync rep wait queues with one queue per request type. + * We use one queue per request type so that we can maintain the + * invariant that the individual queues are sorted on LSN. + * This may also help performance when multiple wal senders + * offer different sync rep service levels. 
+ */ + SyncRepQueue sync_rep_queue[NUM_SYNC_REP_WAIT_MODES]; + + bool sync_rep_service_available; + + slock_t ctlmutex; /* locks shared variables shown above */ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; @@ -70,6 +97,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop; /* user-settable parameters */ extern int WalSndDelay; extern int max_wal_senders; +extern bool allow_standalone_primary; extern int WalSenderMain(void); extern void WalSndSignals(void); diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index 97bdc7b..0d2a78e 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -29,6 +29,7 @@ typedef enum PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ + PMSIGNAL_SYNC_REPLICATION_ACTIVE, /* walsender has completed handshake */ NUM_PMSIGNALS /* Must be last value of enum! */ } PMSignalReason; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 78dbade..27b57c8 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -14,6 +14,8 @@ #ifndef _PROC_H_ #define _PROC_H_ +#include "access/xlog.h" +#include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "utils/timestamp.h" @@ -115,6 +117,11 @@ struct PGPROC LOCKMASK heldLocks; /* bitmask for lock types already held on this * lock object by this backend */ + /* Info to allow us to wait for synchronous replication, if needed. */ + Latch waitLatch; + XLogRecPtr waitLSN; /* waiting for this LSN or higher */ + bool ownLatch; /* do we own the above latch? 
*/ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c0142c2..12ca1ee 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1297,7 +1297,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset; pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d; pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, 
pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; - pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.apply_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, apply_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); + pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sync, w.sent_location, w.write_location, w.flush_location, w.apply_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sync, sent_location, write_location, flush_location, apply_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text)); pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, 
pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text)); pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));