*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 1932,1937 **** SET ENABLE_SEQSCAN TO OFF; --- 1932,1962 ---- + + quorum (integer) + + quorum configuration parameter + + + + Specifies how many standby servers transaction commit will wait + for WAL records to be replicated to, before the command returns + a success indication to the client. The default value + is zero, which means that transaction commit never waits for + replication, regardless of the standby's replication_mode setting. + Also, transaction commit never waits for replication to an + asynchronous standby (i.e., one whose replication_mode is + set to async), regardless of this parameter. + + + This parameter can be changed at any time; the behavior for any + one transaction is determined by the setting in effect when it + commits. It is therefore possible, and useful, to have some + transactions replicate synchronously and others asynchronously. + + + + vacuum_defer_cleanup_age (integer) *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 148,161 **** protocol to make nodes agree on a serializable transactional order. stream of write-ahead log (WAL) records. If the main server fails, the standby contains almost all of the data of the main server, and can be quickly ! made the new master database server. This is asynchronous and ! can only be done for the entire database server. A PITR standby server can be implemented using file-based log shipping () or streaming replication (see ! ), or a combination of both. For ! information on hot standby, see . --- 148,163 ---- stream of write-ahead log (WAL) records. If the main server fails, the standby contains almost all of the data of the main server, and can be quickly ! made the new master database server. This can only be done for ! the entire database server. A PITR standby server can be implemented using file-based log shipping () or streaming replication (see ! ), or a combination of both. ! 
While file-based log shipping is asynchronous, synchronization mode can ! be chosen in streaming replication (see ). ! For information on hot standby, see . *************** *** 348,354 **** protocol to make nodes agree on a serializable transactional order. No master server overhead ! --- 350,356 ---- No master server overhead ! Optional *************** *** 359,365 **** protocol to make nodes agree on a serializable transactional order. No waiting for multiple servers ! --- 361,367 ---- No waiting for multiple servers ! Optional *************** *** 370,376 **** protocol to make nodes agree on a serializable transactional order. Master failure will never lose data ! --- 372,378 ---- Master failure will never lose data ! Optional *** a/doc/src/sgml/protocol.sgml --- b/doc/src/sgml/protocol.sgml *************** *** 1337,1347 **** The commands accepted in walsender mode are: ! START_REPLICATION XXX/XXX Instructs server to start streaming WAL, starting at ! WAL position XXX/XXX. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a CopyOutResponse message, and then starts to stream WAL to the frontend. --- 1337,1348 ---- ! START_REPLICATION XXX/XXX N Instructs server to start streaming WAL, starting at ! WAL position XXX/XXX ! with the replication mode N. The server can reply with an error, e.g. if the requested section of WAL has already been recycled. On success, server responds with a CopyOutResponse message, and then starts to stream WAL to the frontend. *************** *** 1360,1365 **** The commands accepted in walsender mode are: --- 1361,1401 ---- + XLogRecPtr (F) + + + + + + + Byte1('l') + + + + Identifies the message as an acknowledgment of replication. + + + + + + Byte8 + + + + The end of the WAL data replicated to the standby, given in + XLogRecPtr format. 
+ + + + + + + + + + + + XLogData (B) *** a/doc/src/sgml/recovery-config.sgml --- b/doc/src/sgml/recovery-config.sgml *************** *** 280,285 **** restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows --- 280,343 ---- + + replication_mode (string) + + replication_mode recovery parameter + + + + Specifies the replication mode, which controls how long transaction + commit on the master server waits for replication before the command + returns a success indication to the client. Valid modes are: + + + + async (doesn't make transaction commit wait for replication, + i.e., asynchronous replication) + + + + + recv (makes transaction commit wait until the standby has + received WAL records) + + + + + fsync (makes transaction commit wait until the standby has + received and flushed WAL records to disk) + + + + + replay (makes transaction commit wait until the standby has + replayed WAL records after receiving and flushing them to disk) + + + + + + In asynchronous replication, there can be a delay between when a + success is reported to the client and when the transaction + is really guaranteed to be safe against a failover. Setting this + parameter to async does not create any risk of database + inconsistency: a crash at the master server might result in some recent + allegedly-committed transactions being lost at the standby server, + but the database state of the standby will be just the same as if + those transactions had been aborted cleanly. So, setting + replication_mode to async can be a useful + alternative when performance is more important than exact certainty + about the durability of a transaction. + The default setting is async. + + + If quorum is set to zero in the master server, + transaction commit never waits for replication, regardless of + this parameter.
+ + + trigger_file (string) *** a/src/backend/access/transam/recovery.conf.sample --- b/src/backend/access/transam/recovery.conf.sample *************** *** 100,105 **** --- 100,110 ---- #primary_conninfo = '' # e.g. 'host=localhost port=5432' # # + # Specifies the synchronization mode of replication. + # + #replication_mode = 'async' # 'async', 'recv', 'fsync' or 'replay' + # + # # By default, a standby server keeps streaming XLOG records from the # primary indefinitely. If you want to stop streaming and finish recovery, # opening up the system in read/write mode, specify path to a trigger file. *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 36,41 **** --- 36,42 ---- #include "libpq/be-fsstubs.h" #include "miscadmin.h" #include "pgstat.h" + #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" *************** *** 1088,1093 **** RecordTransactionCommit(void) --- 1089,1106 ---- /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for WAL to be replicated up to the COMMIT record if replication + * is enabled and quorum > 0. This operation has to be performed after + * the COMMIT record is generated and before other transactions know that + * this one has been committed. + * + * XXX: Since the caller prevents cancel/die interrupt, we cannot + * process that while waiting. Should we remove this restriction? 
+ */ + if (max_wal_senders > 0 && quorum > 0) + WaitXLogSend(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 189,194 **** static TimestampTz recoveryTargetTime; --- 189,195 ---- static bool StandbyMode = false; static char *PrimaryConnInfo = NULL; static char *TriggerFile = NULL; + int rplMode = REPLICATION_MODE_ASYNC; /* if recoveryStopsHere returns true, it saves actual stop xid/time here */ static TransactionId recoveryStopXid; *************** *** 5258,5263 **** readRecoveryCommandFile(void) --- 5259,5282 ---- (errmsg("trigger_file = '%s'", TriggerFile))); } + else if (strcmp(tok1, "replication_mode") == 0) + { + if (strcmp(tok2, "async") == 0) + rplMode = REPLICATION_MODE_ASYNC; + else if (strcmp(tok2, "recv") == 0) + rplMode = REPLICATION_MODE_RECV; + else if (strcmp(tok2, "fsync") == 0) + rplMode = REPLICATION_MODE_FSYNC; + else if (strcmp(tok2, "replay") == 0) + rplMode = REPLICATION_MODE_REPLAY; + else + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"replication_mode\": \"%s\"", + tok2))); + ereport(DEBUG2, + (errmsg("replication_mode = '%s'", tok2))); + } else ereport(FATAL, (errmsg("unrecognized recovery parameter \"%s\"", *************** *** 6867,6872 **** GetFlushRecPtr(void) --- 6886,6908 ---- } /* + * GetReplayRecPtr -- Returns the last replay position. 
+ */ + XLogRecPtr + GetReplayRecPtr(void) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr recptr; + + SpinLockAcquire(&xlogctl->info_lck); + recptr = xlogctl->recoveryLastRecPtr; + SpinLockRelease(&xlogctl->info_lck); + + return recptr; + } + + /* * Get the time of the last xlog segment switch */ pg_time_t *************** *** 8824,8837 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS) Datum pg_last_xlog_replay_location(PG_FUNCTION_ARGS) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; char location[MAXFNAMELEN]; ! SpinLockAcquire(&xlogctl->info_lck); ! recptr = xlogctl->recoveryLastRecPtr; ! SpinLockRelease(&xlogctl->info_lck); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); --- 8860,8869 ---- Datum pg_last_xlog_replay_location(PG_FUNCTION_ARGS) { XLogRecPtr recptr; char location[MAXFNAMELEN]; ! recptr = GetReplayRecPtr(); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); *************** *** 9463,9469 **** retry: { RequestXLogStreaming( fetching_ckpt ? RedoStartLSN : *RecPtr, ! PrimaryConnInfo); continue; } } --- 9495,9501 ---- { RequestXLogStreaming( fetching_ckpt ? RedoStartLSN : *RecPtr, ! PrimaryConnInfo, rplMode); continue; } } *** a/src/backend/libpq/be-secure.c --- b/src/backend/libpq/be-secure.c *************** *** 71,76 **** --- 71,86 ---- #endif #endif /* USE_SSL */ + #ifdef HAVE_POLL_H + #include + #endif + #ifdef HAVE_SYS_POLL_H + #include + #endif + #ifdef HAVE_SYS_SELECT_H + #include + #endif + #include "libpq/libpq.h" #include "tcop/tcopprot.h" *************** *** 397,402 **** wloop: --- 407,472 ---- return n; } + /* + * Checks a socket, using poll or select, for data to be read. + * Returns >0 if there is data to read, 0 if it timed out, -1 + * if an error occurred (including the interrupt). + * + * Timeout is specified in millisec. 
Timeout is infinite if + * timeout_ms is negative. Timeout is immediate (no blocking) + * if timeout_ms is 0. + * + * If SSL is in use, the SSL buffer is checked prior to + * checking the socket for read data directly. + * + * This function is based on pqSocketCheck and pqSocketPoll. + */ + int + secure_poll(Port *port, int timeout_ms) + { + #ifdef USE_SSL + /* Check for SSL library buffering read bytes */ + if (port->ssl && SSL_pending(port->ssl) > 0) + { + /* short-circuit the select */ + return 1; + } + #endif + + { + /* We use poll(2) if available, otherwise select(2) */ + #ifdef HAVE_POLL + struct pollfd input_fd; + + input_fd.fd = port->sock; + input_fd.events = POLLIN | POLLERR; + input_fd.revents = 0; + + return poll(&input_fd, 1, timeout_ms); + #else /* !HAVE_POLL */ + + fd_set input_mask; + struct timeval timeout; + struct timeval *ptr_timeout; + + FD_ZERO(&input_mask); + FD_SET(port->sock, &input_mask); + + if (timeout_ms < 0) + ptr_timeout = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; + } + + return select(port->sock + 1, &input_mask, + NULL, NULL, ptr_timeout); + #endif /* HAVE_POLL */ + } + } + /* ------------------------------------------------------------ */ /* SSL specific code */ /* ------------------------------------------------------------ */ *** a/src/backend/libpq/pqcomm.c --- b/src/backend/libpq/pqcomm.c *************** *** 56,61 **** --- 56,62 ---- * pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_flush - flush pending output * pq_getbyte_if_available - get a byte if available without blocking + * pq_wait - wait until we can read connection * * message-level I/O (and old-style-COPY-OUT cruft): * pq_putmessage - send a normal message (suppressed in COPY OUT mode) *************** *** 911,916 **** pq_getbyte_if_available(unsigned char *c) --- 912,959 ---- } /* -------------------------------- + * pq_wait - wait until we can read the 
connection socket. + * + * returns >0 if there is data to read, 0 if it timed out or + * interrupted, -1 if an error occurred. + * + * this function is based on pqSocketCheck. + * -------------------------------- + */ + int + pq_wait(int timeout_ms) + { + int result; + + if (!MyProcPort) + return -1; + if (MyProcPort->sock < 0) + { + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("socket not open"))); + return -1; + } + + result = secure_poll(MyProcPort, timeout_ms); + if (result < 0) + { + if (errno == EINTR) + return 0; /* interrupted */ + + /* + * XXX: Should we suppress duplicate log messages also here, + * like internal_flush? + */ + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + } + + return result; + } + + /* -------------------------------- * pq_getbytes - get a known number of bytes from connection * * returns 0 if OK, EOF if trouble *** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c --- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c *************** *** 47,55 **** static bool justconnected = false; static char *recvBuf = NULL; /* Prototypes for interface functions */ ! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ --- 47,57 ---- static char *recvBuf = NULL; /* Prototypes for interface functions */ ! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, ! int mode); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); + static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ *************** *** 64,73 **** _PG_init(void) { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || ! 
walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; walrcv_disconnect = libpqrcv_disconnect; } --- 66,76 ---- { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || ! walrcv_send != NULL || walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; + walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; } *************** *** 75,81 **** _PG_init(void) * Establish the connection to the primary server for XLOG streaming */ static bool ! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) { char conninfo_repl[MAXCONNINFO + 37]; char *primary_sysid; --- 78,84 ---- * Establish the connection to the primary server for XLOG streaming */ static bool ! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, int mode) { char conninfo_repl[MAXCONNINFO + 37]; char *primary_sysid; *************** *** 154,161 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) ThisTimeLineID = primary_tli; /* Start streaming from the point requested by startup process */ ! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", ! startpoint.xlogid, startpoint.xrecoff); res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) { --- 157,164 ---- ThisTimeLineID = primary_tli; /* Start streaming from the point requested by startup process */ ! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X %d", ! startpoint.xlogid, startpoint.xrecoff, mode); res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) { *************** *** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) --- 401,418 ---- return true; } + + /* + * Send a message to XLOG stream. + * + * ereports on error. 
+ */ + static void + libpqrcv_send(const char *buffer, int nbytes) + { + if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || + PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send data to WAL stream: %s", + PQerrorMessage(streamConn)))); + } *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 57,62 **** bool am_walreceiver; --- 57,63 ---- /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; + walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ *************** *** 113,118 **** static void WalRcvDie(int code, Datum arg); --- 114,120 ---- static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); + static void XLogWalRcvSendRecPtr(XLogRecPtr recptr); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); *************** *** 159,164 **** WalReceiverMain(void) --- 161,167 ---- { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; + XLogRecPtr ackedpoint = {0, 0}; /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; *************** *** 206,211 **** WalReceiverMain(void) --- 209,215 ---- /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + rplMode = walrcv->rplMode; startpoint = walrcv->receivedUpto; SpinLockRelease(&walrcv->mutex); *************** *** 247,253 **** WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || ! 
walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* --- 251,257 ---- /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || ! walrcv_send == NULL || walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* *************** *** 261,267 **** WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); ! walrcv_connect(conninfo, startpoint); DisableWalRcvImmediateExit(); /* Loop until end-of-streaming or error */ --- 265,271 ---- /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); ! walrcv_connect(conninfo, startpoint, rplMode); DisableWalRcvImmediateExit(); /* Loop until end-of-streaming or error */ *************** *** 311,316 **** WalReceiverMain(void) --- 315,339 ---- */ XLogWalRcvFlush(); } + + /* + * If replication_mode is "replay", send the last WAL replay location + * to the primary, to acknowledge that replication has been completed + * up to that. This occurs only when WAL records were replayed since + * the last acknowledgement. + */ + if (rplMode == REPLICATION_MODE_REPLAY && + XLByteLT(ackedpoint, LogstreamResult.Flush)) + { + XLogRecPtr recptr; + + recptr = GetReplayRecPtr(); + if (XLByteLT(ackedpoint, recptr)) + { + XLogWalRcvSendRecPtr(recptr); + ackedpoint = recptr; + } + } } } *************** *** 406,411 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) --- 429,447 ---- buf += sizeof(WalDataMessageHeader); len -= sizeof(WalDataMessageHeader); + /* + * If replication_mode is "recv", send the last WAL receive + * location to the primary, to acknowledge that replication + * has been completed up to that. 
+ */ + if (rplMode == REPLICATION_MODE_RECV) + { + XLogRecPtr endptr = msghdr.dataStart; + + XLByteAdvance(endptr, len); + XLogWalRcvSendRecPtr(endptr); + } + XLogWalRcvWrite(buf, len, msghdr.dataStart); break; } *************** *** 523,528 **** XLogWalRcvFlush(void) --- 559,572 ---- LogstreamResult.Flush = LogstreamResult.Write; + /* + * If replication_mode is "fsync", send the last WAL flush + * location to the primary, to acknowledge that replication + * has been completed up to that. + */ + if (rplMode == REPLICATION_MODE_FSYNC) + XLogWalRcvSendRecPtr(LogstreamResult.Flush); + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); walrcv->latestChunkStart = walrcv->receivedUpto; *************** *** 541,543 **** XLogWalRcvFlush(void) --- 585,608 ---- } } } + + /* Send the lsn to the primary server */ + static void + XLogWalRcvSendRecPtr(XLogRecPtr recptr) + { + static char *msgbuf = NULL; + WalAckMessageData msgdata; + + /* + * Allocate buffer that will be used for each output message if first + * time through. We do this just once to reduce palloc overhead. + * The buffer must be made large enough for maximum-sized messages. + */ + if (msgbuf == NULL) + msgbuf = palloc(1 + sizeof(WalAckMessageData)); + + msgbuf[0] = 'l'; + msgdata.ackEnd = recptr; + memcpy(msgbuf + 1, &msgdata, sizeof(WalAckMessageData)); + walrcv_send(msgbuf, 1 + sizeof(WalAckMessageData)); + } *** a/src/backend/replication/walreceiverfuncs.c --- b/src/backend/replication/walreceiverfuncs.c *************** *** 168,178 **** ShutdownWalRcv(void) /* * Request postmaster to start walreceiver. * ! * recptr indicates the position where streaming should begin, and conninfo ! * is a libpq connection string to use. */ void ! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; --- 168,178 ---- /* * Request postmaster to start walreceiver. * ! 
* recptr indicates the position where streaming should begin, conninfo is ! * a libpq connection string to use, and mode is a replication mode. */ void ! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo, int mode) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; *************** *** 196,201 **** RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) --- 196,202 ---- strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; + walrcv->rplMode = mode; walrcv->walRcvState = WALRCV_STARTING; walrcv->startTime = now; *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 65,73 **** bool am_walsender = false; /* Am I a walsender process ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ ! #define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles ! * (100ms) */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, --- 65,73 ---- /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ + int quorum = 0; /* the maximum number of synchronous walsenders */ ! #define NAPTIME_PER_CYCLE 100L /* max sleep time between cycles (100ms) */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, *************** *** 84,89 **** static uint32 sendOff = 0; --- 84,96 ---- */ static XLogRecPtr sentPtr = {0, 0}; + /* + * How far have we completed replication already? This is also + * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous + * replication case. 
+ */ + static XLogRecPtr ackdPtr = {0, 0}; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t shutdown_requested = false; *************** *** 101,107 **** static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(char *msgbuf, bool *caughtup); ! static void CheckClosedConnection(void); /* Main entry point for walsender process */ --- 108,114 ---- static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(char *msgbuf, bool *caughtup); ! static void ProcessStreamMsgs(StringInfo inMsg); /* Main entry point for walsender process */ *************** *** 255,262 **** WalSndHandshake(void) ReadyForQuery(DestRemote); /* ReadyForQuery did pq_flush for us */ } ! else if (sscanf(query_string, "START_REPLICATION %X/%X", ! &recptr.xlogid, &recptr.xrecoff) == 2) { StringInfoData buf; --- 262,269 ---- ReadyForQuery(DestRemote); /* ReadyForQuery did pq_flush for us */ } ! else if (sscanf(query_string, "START_REPLICATION %X/%X %d", ! 
&recptr.xlogid, &recptr.xrecoff, &rplMode) == 3) { StringInfoData buf; *************** *** 277,282 **** WalSndHandshake(void) --- 284,318 ---- (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); + /* Verify that the specified replication mode is valid */ + switch (rplMode) + { + case REPLICATION_MODE_ASYNC: + break; + case REPLICATION_MODE_RECV: + case REPLICATION_MODE_FSYNC: + case REPLICATION_MODE_REPLAY: + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSndCtlData *walsndctl = WalSndCtl; + + /* + * Update the current number of synchronous standbys + * if replication mode is "synchronous" + */ + SpinLockAcquire(&walsndctl->info_lck); + walsndctl->num_sync_sbys++; + Assert(walsndctl->num_sync_sbys > 0); + SpinLockRelease(&walsndctl->info_lck); + break; + } + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid replication mode: %d", rplMode))); + } + MyWalSnd->rplMode = rplMode; + /* Send a CopyOutResponse message, and start streaming */ pq_beginmessage(&buf, 'H'); pq_sendbyte(&buf, 0); *************** *** 285,294 **** WalSndHandshake(void) pq_flush(); /* ! * Initialize position to the received one, then the * xlog records begin to be shipped from that position */ ! sentPtr = recptr; /* break out of the loop */ replication_started = true; --- 321,330 ---- pq_flush(); /* ! * Initialize positions to the received one, then the * xlog records begin to be shipped from that position */ ! sentPtr = ackdPtr = recptr; /* break out of the loop */ replication_started = true; *************** *** 322,364 **** WalSndHandshake(void) } /* ! * Check if the remote end has closed the connection. */ static void ! CheckClosedConnection(void) { ! unsigned char firstchar; ! int r; ! r = pq_getbyte_if_available(&firstchar); ! if (r < 0) ! { ! /* unexpected error or EOF */ ! ereport(COMMERROR, ! (errcode(ERRCODE_PROTOCOL_VIOLATION), ! 
errmsg("unexpected EOF on standby connection"))); ! proc_exit(0); ! } ! if (r == 0) { ! /* no data available without blocking */ ! return; ! } - /* Handle the very limited subset of commands expected in this phase */ - switch (firstchar) - { /* * 'X' means that the standby is closing down the socket. */ ! case 'X': ! proc_exit(0); ! default: ! ereport(FATAL, ! (errcode(ERRCODE_PROTOCOL_VIOLATION), ! errmsg("invalid standby closing message type %d", ! firstchar))); } } --- 358,466 ---- } /* ! * Process every message from the standby that can be read without blocking. ! * An 'l' ack advances ackdPtr (local, then shared); 'X' or EOF exits. ! * ereports on error. */ static void ! ProcessStreamMsgs(StringInfo inMsg) { ! bool acked = false; ! /* Loop to process successive complete messages available */ ! for (;;) { ! unsigned char firstchar; ! int r; ! ! r = pq_getbyte_if_available(&firstchar); ! if (r < 0) ! { ! /* unexpected error or EOF */ ! ereport(COMMERROR, ! (errcode(ERRCODE_PROTOCOL_VIOLATION), ! errmsg("unexpected EOF on standby connection"))); ! proc_exit(0); ! } ! if (r == 0) ! { ! /* no data available without blocking */ ! break; ! } ! ! /* Handle the very limited subset of commands expected in this phase */ ! switch (firstchar) ! { ! case 'd': /* CopyData message */ ! { ! unsigned char rpltype; ! ! /* ! * Read the message contents. This is expected to be done without ! * blocking because we've been able to get message type code. ! */ ! if (pq_getmessage(inMsg, 0)) ! proc_exit(0); /* suitable message already logged */ ! ! /* Read the replication message type from CopyData message */ ! rpltype = pq_getmsgbyte(inMsg); ! switch (rpltype) ! { ! case 'l': ! { ! WalAckMessageData msgdata; ! ! pq_copymsgbytes(inMsg, (char *) &msgdata, sizeof(WalAckMessageData)); ! ! /* ! * Update local status. msgdata is a copy because the payload ! * starts one byte into the message and may not be aligned. ! * The ackd ptr received from standby should not ! * go backwards. ! */ ! if (XLByteLE(ackdPtr, msgdata.ackEnd)) ! ackdPtr = msgdata.ackEnd; ! else ! ereport(FATAL, ! (errmsg("replication completion location went back from " ! "%X/%X to %X/%X", ! 
ackdPtr.xlogid, ackdPtr.xrecoff, ! msgdata.ackEnd.xlogid, msgdata.ackEnd.xrecoff))); ! ! acked = true; /* also need to update shared position */ ! break; ! } ! default: ! ereport(FATAL, ! (errcode(ERRCODE_PROTOCOL_VIOLATION), ! errmsg("invalid replication message type %d", ! rpltype))); ! } ! break; ! } /* * 'X' means that the standby is closing down the socket. */ ! case 'X': ! proc_exit(0); ! default: ! ereport(FATAL, ! (errcode(ERRCODE_PROTOCOL_VIOLATION), ! errmsg("invalid standby closing message type %d", ! firstchar))); ! } ! } ! ! if (acked) ! { ! /* use volatile pointer to prevent code rearrangement */ ! volatile WalSnd *walsnd = MyWalSnd; ! ! SpinLockAcquire(&walsnd->mutex); ! walsnd->ackdPtr = ackdPtr; ! SpinLockRelease(&walsnd->mutex); } } *************** *** 366,374 **** CheckClosedConnection(void) --- 468,479 ---- static int WalSndLoop(void) { + StringInfoData input_message; char *output_message; bool caughtup = false; + initStringInfo(&input_message); + /* * Allocate buffer that will be used for each output message. We do this * just once to reduce palloc overhead. The buffer must be made large *************** *** 428,443 **** WalSndLoop(void) */ if (caughtup) { ! remain = WalSndDelay * 1000L; while (remain > 0) { /* Check for interrupts */ if (got_SIGHUP || shutdown_requested || ready_to_stop) break; ! /* Sleep and check that the connection is still alive */ ! pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); ! CheckClosedConnection(); remain -= NAPTIME_PER_CYCLE; } --- 533,562 ---- */ if (caughtup) { ! remain = WalSndDelay; while (remain > 0) { + int res; + /* Check for interrupts */ if (got_SIGHUP || shutdown_requested || ready_to_stop) break; ! /* ! * Check to see whether a message from the standby or an interrupt ! * from other processes has arrived. ! */ ! res = pq_wait(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); ! if (res < 0) ! { ! /* unexpected error or EOF */ ! ereport(COMMERROR, ! 
(errcode(ERRCODE_PROTOCOL_VIOLATION), ! errmsg("unexpected EOF on standby connection"))); ! proc_exit(0); ! } ! if (res > 0) ! ProcessStreamMsgs(&input_message); remain -= NAPTIME_PER_CYCLE; } *************** *** 496,501 **** InitWalSnd(void) --- 615,621 ---- MyWalSnd = (WalSnd *) walsnd; walsnd->pid = MyProcPid; MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr)); + MemSet(&MyWalSnd->ackdPtr, 0, sizeof(XLogRecPtr)); SpinLockRelease(&walsnd->mutex); break; } *************** *** 523,528 **** WalSndKill(int code, Datum arg) --- 643,663 ---- */ MyWalSnd->pid = 0; + /* + * Update the current number of synchronous standbys if replication + * mode is "synchronous" + */ + if (rplMode >= REPLICATION_MODE_RECV) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSndCtlData *walsndctl = WalSndCtl; + + SpinLockAcquire(&walsndctl->info_lck); + Assert(walsndctl->num_sync_sbys > 0); + walsndctl->num_sync_sbys--; + SpinLockRelease(&walsndctl->info_lck); + } + /* WalSnd struct isn't mine anymore */ MyWalSnd = NULL; } *************** *** 884,889 **** WalSndShmemInit(void) --- 1019,1025 ---- { /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); + SpinLockInit(&WalSndCtl->info_lck); for (i = 0; i < max_wal_senders; i++) { *************** *** 895,937 **** WalSndShmemInit(void) } /* ! * This isn't currently used for anything. Monitoring tools might be ! * interested in the future, and we'll need something like this in the ! * future for synchronous replication. */ ! #ifdef NOT_USED ! /* ! * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr ! * if none. ! */ ! XLogRecPtr ! GetOldestWALSendPointer(void) { ! XLogRecPtr oldest = {0, 0}; ! int i; ! bool found = false; ! for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ ! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; ! XLogRecPtr recptr; ! if (walsnd->pid == 0) ! continue; ! SpinLockAcquire(&walsnd->mutex); ! 
recptr = walsnd->sentPtr; ! SpinLockRelease(&walsnd->mutex); ! if (recptr.xlogid == 0 && recptr.xrecoff == 0) ! continue; ! if (!found || XLByteLT(recptr, oldest)) ! oldest = recptr; ! found = true; } - return oldest; } - - #endif --- 1031,1102 ---- } /* ! * Ensure that all xlog records through the given position are ! * replicated to the standby */ ! void ! WaitXLogSend(XLogRecPtr record) { ! Assert(max_wal_senders > 0); ! Assert(quorum > 0); ! for (;;) { /* use volatile pointer to prevent code rearrangement */ ! volatile WalSndCtlData *walsndctl = WalSndCtl; ! int i; ! int already_acked = 0; ! bool unacked = false; ! /* Don't need to wait if there are no synchronous standbys */ ! if (walsndctl->num_sync_sbys == 0) ! return; ! /* ! * Count walsenders which have already received the ACK meaning ! * completion of replication up to the given position. If the ! * sum is more than or equal to the "quorum", the backend breaks ! * out of this loop and returns a "success" of the transaction ! * to a client. ! */ ! for (i = 0; i < max_wal_senders; i++) ! { ! /* use volatile pointer to prevent code rearrangement */ ! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; ! XLogRecPtr recptr; ! /* Don't count inactive or asynchronous walsenders */ ! if (walsnd->pid == 0 || ! walsnd->rplMode == REPLICATION_MODE_ASYNC) ! continue; ! SpinLockAcquire(&walsnd->mutex); ! recptr = walsnd->ackdPtr; ! SpinLockRelease(&walsnd->mutex); ! ! if ((recptr.xlogid == 0 && recptr.xrecoff == 0) || ! XLByteLT(recptr, record)) ! { ! unacked = true; ! continue; ! } ! ! if (++already_acked >= quorum) ! return; ! } ! ! /* ! * If no synchronous walsender was found in the WalSnd array, ! * we no longer need to wait. This can happen if all synchronous ! * walsenders are terminated while searching the array. ! * ! * If all synchronous walsenders have already received the ACK, ! * we no longer need to wait either. This can happen when the ! * "quorum" is more than max_wal_senders. ! */ ! if (!unacked) ! 
return; ! ! pg_usleep(100000L); /* 100ms */ } } *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 1756,1761 **** static struct config_int ConfigureNamesInt[] = --- 1756,1770 ---- }, { + {"quorum", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Sets the maximum number of synchronous standby servers."), + NULL + }, + &quorum, + 0, 0, INT_MAX / 4, NULL, NULL + }, + + { {"commit_delay", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " "flushing WAL to disk."), *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 188,193 **** --- 188,194 ---- #max_wal_senders = 0 # max number of walsender processes #wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables + #quorum = 0 # max number of synchronous standbys #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 261,266 **** typedef struct CheckpointStatsData --- 261,299 ---- extern CheckpointStatsData CheckpointStats; + /* + * Synchronization mode of replication. These modes identify how far + * we should wait for replication. + */ + typedef enum + { + /* + * doesn't make transaction commit wait for replication, i.e., + * asynchronous replication. + */ + REPLICATION_MODE_ASYNC, + + /* + * makes transaction commit wait for XLOG records to be received + * on the standby server. + */ + REPLICATION_MODE_RECV, + + /* + * makes transaction commit wait for XLOG records to be received + * and fsync'd on the standby server. + */ + REPLICATION_MODE_FSYNC, + + /* + * makes transaction commit wait for XLOG records to be received, + * fsync'd and replayed on the standby server. 
+ */ + REPLICATION_MODE_REPLAY + } ReplicationMode; + extern int rplMode; + + extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern void XLogBackgroundFlush(void); *************** *** 298,303 **** extern void XLogPutNextOid(Oid nextOid); --- 331,337 ---- extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); extern XLogRecPtr GetFlushRecPtr(void); + extern XLogRecPtr GetReplayRecPtr(void); extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch); extern TimeLineID GetRecoveryTargetTLI(void); *** a/src/include/libpq/libpq.h --- b/src/include/libpq/libpq.h *************** *** 58,63 **** extern int pq_getmessage(StringInfo s, int maxlen); --- 58,64 ---- extern int pq_getbyte(void); extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char *c); + extern int pq_wait(int timeout_ms); extern int pq_putbytes(const char *s, size_t len); extern int pq_flush(void); extern int pq_putmessage(char msgtype, const char *s, size_t len); *************** *** 74,78 **** extern int secure_open_server(Port *port); --- 75,80 ---- extern void secure_close(Port *port); extern ssize_t secure_read(Port *port, void *ptr, size_t len); extern ssize_t secure_write(Port *port, void *ptr, size_t len); + extern int secure_poll(Port *port, int timeout_ms); #endif /* LIBPQ_H */ *** a/src/include/replication/walprotocol.h --- b/src/include/replication/walprotocol.h *************** *** 50,53 **** typedef struct --- 50,63 ---- */ #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) + /* + * Body for a WAL acknowledgment message (message type 'l'). This is wrapped + * within a CopyData message at the FE/BE protocol level. 
+ */ + typedef struct + { + /* End of WAL replicated to the standby */ + XLogRecPtr ackEnd; + } WalAckMessageData; + #endif /* _WALPROTOCOL_H */ *** a/src/include/replication/walreceiver.h --- b/src/include/replication/walreceiver.h *************** *** 71,89 **** typedef struct */ char conninfo[MAXCONNINFO]; slock_t mutex; /* locks shared variables shown above */ } WalRcvData; extern WalRcvData *WalRcv; /* libpqwalreceiver hooks */ ! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_disconnect_type) (void); extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; --- 71,99 ---- */ char conninfo[MAXCONNINFO]; + /* + * replication mode; controls how long transaction commit on the primary + * server waits for replication. + */ + int rplMode; + slock_t mutex; /* locks shared variables shown above */ } WalRcvData; extern WalRcvData *WalRcv; /* libpqwalreceiver hooks */ ! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint, ! int mode); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; + typedef void (*walrcv_send_type) (const char *buffer, int nbytes); + extern PGDLLIMPORT walrcv_send_type walrcv_send; + typedef void (*walrcv_disconnect_type) (void); extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; *************** *** 93,99 **** extern void WalRcvShmemInit(void); extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); ! 
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); #endif /* _WALRECEIVER_H */ --- 103,110 ---- extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); ! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo, ! int mode); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); #endif /* _WALRECEIVER_H */ *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 22,27 **** typedef struct WalSnd --- 22,30 ---- { pid_t pid; /* this walsender's process id, or 0 */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + XLogRecPtr ackdPtr; /* WAL has been replicated up to this point */ + + int rplMode; /* replication mode */ slock_t mutex; /* locks shared variables shown above */ } WalSnd; *************** *** 29,34 **** typedef struct WalSnd --- 32,41 ---- /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + int num_sync_sbys; /* current # of synchronous standbys */ + + slock_t info_lck; /* protects the variable shown above */ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; *************** *** 40,49 **** extern bool am_walsender; --- 47,58 ---- /* user-settable parameters */ extern int WalSndDelay; extern int max_wal_senders; + extern int quorum; extern int WalSenderMain(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); + extern void WaitXLogSend(XLogRecPtr recptr); #endif /* _WALSENDER_H */ *** a/src/interfaces/libpq/fe-exec.c --- b/src/interfaces/libpq/fe-exec.c *************** *** 2002,2007 **** PQnotifies(PGconn *conn) --- 2002,2010 ---- /* * PQputCopyData - send some data to the backend during COPY IN * + * Note that this function might be called by walreceiver even during + * COPY OUT to 
send a message to XLOG stream. + * * Returns 1 if successful, 0 if data could not be sent (only possible * in nonblock mode), or -1 if an error occurs. */ *************** *** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes) { if (!conn) return -1; ! if (conn->asyncStatus != PGASYNC_COPY_IN) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); --- 2013,2020 ---- { if (!conn) return -1; ! if (conn->asyncStatus != PGASYNC_COPY_IN && ! conn->asyncStatus != PGASYNC_COPY_OUT) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n"));