*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1932,1937 **** SET ENABLE_SEQSCAN TO OFF;
--- 1932,1962 ----
+
+ quorum (integer)
+
+ quorum> configuration parameter
+
+
+
+ Specifies how many standby servers transaction commit will wait
+ for WAL records to be replicated to, before the command returns
+ a success> indication to the client. The default value
+ is zero, which always doesn't make transaction commit wait for
+ replication without regard to .
+ Also transaction commit always doesn't wait for replication to
+ asynchronous standby (i.e., its replication_mode> is
+ set to async>) without regard to this parameter.
+
+
+ This parameter can be changed at any time; the behavior for any
+ one transaction is determined by the setting in effect when it
+ commits. It is therefore possible, and useful, to have some
+ transactions replicate synchronously and others asynchronously.
+
+
+
+
vacuum_defer_cleanup_age (integer)
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 148,161 **** protocol to make nodes agree on a serializable transactional order.
stream of write-ahead log (WAL>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This is asynchronous and
! can only be done for the entire database server.
A PITR standby server can be implemented using file-based log shipping
() or streaming replication (see
! ), or a combination of both. For
! information on hot standby, see .
--- 148,163 ----
stream of write-ahead log (WAL>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This can only be done for
! the entire database server.
A PITR standby server can be implemented using file-based log shipping
() or streaming replication (see
! ), or a combination of both.
! While file-based log shipping is asynchronous, synchronization mode can
! be chosen in streaming replication (see ).
! For information on hot standby, see .
***************
*** 348,354 **** protocol to make nodes agree on a serializable transactional order.
No master server overhead
•
! •
•
--- 350,356 ----
No master server overhead
•
! Optional
•
***************
*** 359,365 **** protocol to make nodes agree on a serializable transactional order.
No waiting for multiple servers
•
! •
•
•
--- 361,367 ----
No waiting for multiple servers
•
! Optional
•
•
***************
*** 370,376 **** protocol to make nodes agree on a serializable transactional order.
Master failure will never lose data
•
•
!
•
--- 372,378 ----
Master failure will never lose data
•
•
! Optional
•
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1337,1347 **** The commands accepted in walsender mode are:
! START_REPLICATION XXX>/XXX>
Instructs server to start streaming WAL, starting at
! WAL position XXX>/XXX>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
--- 1337,1348 ----
! START_REPLICATION XXX>/XXX> N>
Instructs server to start streaming WAL, starting at
! WAL position XXX>/XXX>
! with the replication mode N>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
***************
*** 1360,1365 **** The commands accepted in walsender mode are:
--- 1361,1401 ----
+ XLogRecPtr (F)
+
+
+
+
+
+
+ Byte1('l')
+
+
+
+ Identifies the message as an acknowledgment of replication.
+
+
+
+
+
+ Byte8
+
+
+
+ The end of the WAL data replicated to the standby, given in
+ XLogRecPtr format.
+
+
+
+
+
+
+
+
+
+
+
+
XLogData (B)
*** a/doc/src/sgml/recovery-config.sgml
--- b/doc/src/sgml/recovery-config.sgml
***************
*** 280,285 **** restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
--- 280,343 ----
+
+ replication_mode (string)
+
+ replication_mode> recovery parameter
+
+
+
+ Specifies the replication mode which can control how long transaction
+ commit on the master server waits for replication before the command
+ returns a success> indication to the client. Valid modes are:
+
+
+
+ async> (doesn't make transaction commit wait for replication,
+ i.e., asynchronous replication)
+
+
+
+
+ recv> (makes transaction commit wait until the standby has
+ received WAL records)
+
+
+
+
+ fsync> (makes transaction commit wait until the standby has
+ received and flushed WAL records to disk)
+
+
+
+
+ replay> (makes transaction commit wait until the standby has
+ replayed WAL records after receiving and flushing them to disk)
+
+
+
+
+
+ In asynchronous replication, there can be a delay between when a
+ success> is reported to the client and when the transaction
+ is really guaranteed to be safe against a failover. Setting this
+ parameter to async> does not create any risk of database
+ inconsistency: a crash at the master server might result in some recent
+ allegedly-committed transactions being lost at the standby server,
+ but the database state of the standby will be just the same as if
+ those transactions had been aborted cleanly. So, turning
+ replication_mode> async> can be a useful
+ alternative when performance is more important than exact certainty
+ about the durability of a transaction.
+ The default setting is async>.
+
+
+ If is set to zero in the master server,
+ transaction commit always doesn't wait for replication without regard
+ to this parameter.
+
+
+
trigger_file (string)
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 100,105 ****
--- 100,110 ----
#primary_conninfo = '' # e.g. 'host=localhost port=5432'
#
#
+ # Specifies the synchronization mode of replication.
+ #
+ #replication_mode = 'async' # 'async', 'recv', 'fsync' or 'replay'
+ #
+ #
# By default, a standby server keeps streaming XLOG records from the
# primary indefinitely. If you want to stop streaming and finish recovery,
# opening up the system in read/write mode, specify path to a trigger file.
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 36,41 ****
--- 36,42 ----
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1088,1093 **** RecordTransactionCommit(void)
--- 1089,1106 ----
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for WAL to be replicated up to the COMMIT record if replication
+ * is enabled and quorum > 0. This operation has to be performed after
+ * the COMMIT record is generated and before other transactions know that
+ * this one has been committed.
+ *
+ * XXX: Since the caller prevents cancel/die interrupt, we cannot
+ * process that while waiting. Should we remove this restriction?
+ */
+ if (max_wal_senders > 0 && quorum > 0)
+ WaitXLogSend(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 189,194 **** static TimestampTz recoveryTargetTime;
--- 189,195 ----
static bool StandbyMode = false;
static char *PrimaryConnInfo = NULL;
static char *TriggerFile = NULL;
+ int rplMode = REPLICATION_MODE_ASYNC;
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
static TransactionId recoveryStopXid;
***************
*** 5258,5263 **** readRecoveryCommandFile(void)
--- 5259,5282 ----
(errmsg("trigger_file = '%s'",
TriggerFile)));
}
+ else if (strcmp(tok1, "replication_mode") == 0)
+ {
+ if (strcmp(tok2, "async") == 0)
+ rplMode = REPLICATION_MODE_ASYNC;
+ else if (strcmp(tok2, "recv") == 0)
+ rplMode = REPLICATION_MODE_RECV;
+ else if (strcmp(tok2, "fsync") == 0)
+ rplMode = REPLICATION_MODE_FSYNC;
+ else if (strcmp(tok2, "replay") == 0)
+ rplMode = REPLICATION_MODE_REPLAY;
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for parameter \"replication_mode\": \"%s\"",
+ tok2)));
+ ereport(DEBUG2,
+ (errmsg("replication_mode = '%s'", tok2)));
+ }
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
***************
*** 6867,6872 **** GetFlushRecPtr(void)
--- 6886,6908 ----
}
/*
+ * GetReplayRecPtr -- Returns the last replay position.
+ */
+ XLogRecPtr
+ GetReplayRecPtr(void)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->recoveryLastRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return recptr;
+ }
+
+ /*
* Get the time of the last xlog segment switch
*/
pg_time_t
***************
*** 8824,8837 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! SpinLockAcquire(&xlogctl->info_lck);
! recptr = xlogctl->recoveryLastRecPtr;
! SpinLockRelease(&xlogctl->info_lck);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
--- 8860,8869 ----
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! recptr = GetReplayRecPtr();
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
***************
*** 9463,9469 **** retry:
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo);
continue;
}
}
--- 9495,9501 ----
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo, rplMode);
continue;
}
}
*** a/src/backend/libpq/be-secure.c
--- b/src/backend/libpq/be-secure.c
***************
*** 71,76 ****
--- 71,86 ----
#endif
#endif /* USE_SSL */
+ #ifdef HAVE_POLL_H
+ #include
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include
+ #endif
+
#include "libpq/libpq.h"
#include "tcop/tcopprot.h"
***************
*** 397,402 **** wloop:
--- 407,472 ----
return n;
}
+ /*
+ * Checks a socket, using poll or select, for data to be read.
+ * Returns >0 if there is data to read, 0 if it timed out, -1
+ * if an error occurred (including the interrupt).
+ *
+ * Timeout is specified in millisec. Timeout is infinite if
+ * timeout_ms is negative. Timeout is immediate (no blocking)
+ * if timeout_ms is 0.
+ *
+ * If SSL is in use, the SSL buffer is checked prior to
+ * checking the socket for read data directly.
+ *
+ * This function is based on pqSocketCheck and pqSocketPoll.
+ */
+ int
+ secure_poll(Port *port, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ /* Check for SSL library buffering read bytes */
+ if (port->ssl && SSL_pending(port->ssl) > 0)
+ {
+ /* short-circuit the select */
+ return 1;
+ }
+ #endif
+
+ {
+ /* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = port->sock;
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ return poll(&input_fd, 1, timeout_ms);
+ #else /* !HAVE_POLL */
+
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *ptr_timeout;
+
+ FD_ZERO(&input_mask);
+ FD_SET(port->sock, &input_mask);
+
+ if (timeout_ms < 0)
+ ptr_timeout = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
+ }
+
+ return select(port->sock + 1, &input_mask,
+ NULL, NULL, ptr_timeout);
+ #endif /* HAVE_POLL */
+ }
+ }
+
/* ------------------------------------------------------------ */
/* SSL specific code */
/* ------------------------------------------------------------ */
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,62 ----
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_wait - wait until we can read connection
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 911,916 **** pq_getbyte_if_available(unsigned char *c)
--- 912,959 ----
}
/* --------------------------------
+ * pq_wait - wait until we can read the connection socket.
+ *
+ * returns >0 if there is data to read, 0 if it timed out or
+ * interrupted, -1 if an error occurred.
+ *
+ * this function is based on pqSocketCheck.
+ * --------------------------------
+ */
+ int
+ pq_wait(int timeout_ms)
+ {
+ int result;
+
+ if (!MyProcPort)
+ return -1;
+ if (MyProcPort->sock < 0)
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+ return -1;
+ }
+
+ result = secure_poll(MyProcPort, timeout_ms);
+ if (result < 0)
+ {
+ if (errno == EINTR)
+ return 0; /* interrupted */
+
+ /*
+ * XXX: Should we suppress duplicate log messages also here,
+ * like internal_flush?
+ */
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ }
+
+ return result;
+ }
+
+ /* --------------------------------
* pq_getbytes - get a known number of bytes from connection
*
* returns 0 if OK, EOF if trouble
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 47,55 **** static bool justconnected = false;
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
--- 47,57 ----
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint,
! int mode);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_disconnect = libpqrcv_disconnect;
}
--- 66,76 ----
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
+ walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
***************
*** 75,81 **** _PG_init(void)
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
--- 78,84 ----
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, int mode)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
***************
*** 154,161 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
! startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
--- 157,164 ----
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X %d",
! startpoint.xlogid, startpoint.xrecoff, mode);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 401,418 ----
return true;
}
+
+ /*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send data to WAL stream: %s",
+ PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool am_walreceiver;
--- 57,63 ----
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
***************
*** 113,118 **** static void WalRcvDie(int code, Datum arg);
--- 114,120 ----
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+ static void XLogWalRcvSendRecPtr(XLogRecPtr recptr);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
***************
*** 159,164 **** WalReceiverMain(void)
--- 161,167 ----
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
+ XLogRecPtr ackedpoint = {0, 0};
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 206,211 **** WalReceiverMain(void)
--- 209,215 ----
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ rplMode = walrcv->rplMode;
startpoint = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
***************
*** 247,253 **** WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
--- 251,257 ----
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
***************
*** 261,267 **** WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
--- 265,271 ----
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint, rplMode);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
***************
*** 311,316 **** WalReceiverMain(void)
--- 315,339 ----
*/
XLogWalRcvFlush();
}
+
+ /*
+ * If replication_mode is "replay", send the last WAL replay location
+ * to the primary, to acknowledge that replication has been completed
+ * up to that. This occurs only when WAL records were replayed since
+ * the last acknowledgement.
+ */
+ if (rplMode == REPLICATION_MODE_REPLAY &&
+ XLByteLT(ackedpoint, LogstreamResult.Flush))
+ {
+ XLogRecPtr recptr;
+
+ recptr = GetReplayRecPtr();
+ if (XLByteLT(ackedpoint, recptr))
+ {
+ XLogWalRcvSendRecPtr(recptr);
+ ackedpoint = recptr;
+ }
+ }
}
}
***************
*** 406,411 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 429,447 ----
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
+ /*
+ * If replication_mode is "recv", send the last WAL receive
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_RECV)
+ {
+ XLogRecPtr endptr = msghdr.dataStart;
+
+ XLByteAdvance(endptr, len);
+ XLogWalRcvSendRecPtr(endptr);
+ }
+
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
***************
*** 523,528 **** XLogWalRcvFlush(void)
--- 559,572 ----
LogstreamResult.Flush = LogstreamResult.Write;
+ /*
+ * If replication_mode is "fsync", send the last WAL flush
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_FSYNC)
+ XLogWalRcvSendRecPtr(LogstreamResult.Flush);
+
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
walrcv->latestChunkStart = walrcv->receivedUpto;
***************
*** 541,543 **** XLogWalRcvFlush(void)
--- 585,608 ----
}
}
}
+
+ /* Send the lsn to the primary server */
+ static void
+ XLogWalRcvSendRecPtr(XLogRecPtr recptr)
+ {
+ static char *msgbuf = NULL;
+ WalAckMessageData msgdata;
+
+ /*
+ * Allocate buffer that will be used for each output message if first
+ * time through. We do this just once to reduce palloc overhead.
+ * The buffer must be made large enough for maximum-sized messages.
+ */
+ if (msgbuf == NULL)
+ msgbuf = palloc(1 + sizeof(WalAckMessageData));
+
+ msgbuf[0] = 'l';
+ msgdata.ackEnd = recptr;
+ memcpy(msgbuf + 1, &msgdata, sizeof(WalAckMessageData));
+ walrcv_send(msgbuf, 1 + sizeof(WalAckMessageData));
+ }
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 168,178 **** ShutdownWalRcv(void)
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, and conninfo
! * is a libpq connection string to use.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
--- 168,178 ----
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, conninfo is
! * a libpq connection string to use, and mode is a replication mode.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo, int mode)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 196,201 **** RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
--- 196,202 ----
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
+ walrcv->rplMode = mode;
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 65,73 **** bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
! #define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
! * (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
--- 65,73 ----
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
+ int quorum = 0; /* the maximum number of synchronous walsenders */
! #define NAPTIME_PER_CYCLE 100L /* max sleep time between cycles (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 84,89 **** static uint32 sendOff = 0;
--- 84,96 ----
*/
static XLogRecPtr sentPtr = {0, 0};
+ /*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+ static XLogRecPtr ackdPtr = {0, 0};
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
***************
*** 101,107 **** static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void CheckClosedConnection(void);
/* Main entry point for walsender process */
--- 108,114 ----
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void ProcessStreamMsgs(StringInfo inMsg);
/* Main entry point for walsender process */
***************
*** 255,262 **** WalSndHandshake(void)
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X",
! &recptr.xlogid, &recptr.xrecoff) == 2)
{
StringInfoData buf;
--- 262,269 ----
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X %d",
! &recptr.xlogid, &recptr.xrecoff, &rplMode) == 3)
{
StringInfoData buf;
***************
*** 277,282 **** WalSndHandshake(void)
--- 284,318 ----
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
+ /* Verify that the specified replication mode is valid */
+ switch (rplMode)
+ {
+ case REPLICATION_MODE_ASYNC:
+ break;
+ case REPLICATION_MODE_RECV:
+ case REPLICATION_MODE_FSYNC:
+ case REPLICATION_MODE_REPLAY:
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ /*
+ * Update the current number of synchronous standbys
+ * if replication mode is "synchronous"
+ */
+ SpinLockAcquire(&walsndctl->info_lck);
+ walsndctl->num_sync_sbys++;
+ Assert(walsndctl->num_sync_sbys > 0);
+ SpinLockRelease(&walsndctl->info_lck);
+ break;
+ }
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication mode: %d", rplMode)));
+ }
+ MyWalSnd->rplMode = rplMode;
+
/* Send a CopyOutResponse message, and start streaming */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0);
***************
*** 285,294 **** WalSndHandshake(void)
pq_flush();
/*
! * Initialize position to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = recptr;
/* break out of the loop */
replication_started = true;
--- 321,330 ----
pq_flush();
/*
! * Initialize positions to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = ackdPtr = recptr;
/* break out of the loop */
replication_started = true;
***************
*** 322,364 **** WalSndHandshake(void)
}
/*
! * Check if the remote end has closed the connection.
*/
static void
! CheckClosedConnection(void)
{
! unsigned char firstchar;
! int r;
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
{
! /* no data available without blocking */
! return;
! }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
}
}
--- 358,466 ----
}
/*
! * Process messages received from the standby.
! *
! * ereports on error.
*/
static void
! ProcessStreamMsgs(StringInfo inMsg)
{
! bool acked = false;
! /* Loop to process successive complete messages available */
! for (;;)
{
! unsigned char firstchar;
! int r;
!
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
! {
! /* no data available without blocking */
! break;
! }
!
! /* Handle the very limited subset of commands expected in this phase */
! switch (firstchar)
! {
! case 'd': /* CopyData message */
! {
! unsigned char rpltype;
!
! /*
! * Read the message contents. This is expected to be done without
! * blocking because we've been able to get message type code.
! */
! if (pq_getmessage(inMsg, 0))
! proc_exit(0); /* suitable message already logged */
!
! /* Read the replication message type from CopyData message */
! rpltype = pq_getmsgbyte(inMsg);
! switch (rpltype)
! {
! case 'l':
! {
! WalAckMessageData *msgdata;
!
! msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
!
! /*
! * Update local status.
! *
! * The ackd ptr received from standby should not
! * go backwards.
! */
! if (XLByteLE(ackdPtr, msgdata->ackEnd))
! ackdPtr = msgdata->ackEnd;
! else
! ereport(FATAL,
! (errmsg("replication completion location went back from "
! "%X/%X to %X/%X",
! ackdPtr.xlogid, ackdPtr.xrecoff,
! msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
!
! acked = true; /* also need to update shared position */
! break;
! }
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid replication message type %d",
! rpltype)));
! }
! break;
! }
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
! }
! }
!
! if (acked)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = MyWalSnd;
!
! SpinLockAcquire(&walsnd->mutex);
! walsnd->ackdPtr = ackdPtr;
! SpinLockRelease(&walsnd->mutex);
}
}
***************
*** 366,374 **** CheckClosedConnection(void)
--- 468,479 ----
static int
WalSndLoop(void)
{
+ StringInfoData input_message;
char *output_message;
bool caughtup = false;
+ initStringInfo(&input_message);
+
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
***************
*** 428,443 **** WalSndLoop(void)
*/
if (caughtup)
{
! remain = WalSndDelay * 1000L;
while (remain > 0)
{
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /* Sleep and check that the connection is still alive */
! pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE;
}
--- 533,562 ----
*/
if (caughtup)
{
! remain = WalSndDelay;
while (remain > 0)
{
+ int res;
+
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /*
! * Check to see whether a message from the standby or an interrupt
! * from other processes has arrived.
! */
! res = pq_wait(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! if (res < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (res > 0)
! ProcessStreamMsgs(&input_message);
remain -= NAPTIME_PER_CYCLE;
}
***************
*** 496,501 **** InitWalSnd(void)
--- 615,621 ----
MyWalSnd = (WalSnd *) walsnd;
walsnd->pid = MyProcPid;
MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&MyWalSnd->ackdPtr, 0, sizeof(XLogRecPtr));
SpinLockRelease(&walsnd->mutex);
break;
}
***************
*** 523,528 **** WalSndKill(int code, Datum arg)
--- 643,663 ----
*/
MyWalSnd->pid = 0;
+ /*
+ * Update the current number of synchronous standbys if replication
+ * mode is "synchronous"
+ */
+ if (rplMode >= REPLICATION_MODE_RECV)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ SpinLockAcquire(&walsndctl->info_lck);
+ Assert(walsndctl->num_sync_sbys > 0);
+ walsndctl->num_sync_sbys--;
+ SpinLockRelease(&walsndctl->info_lck);
+ }
+
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
}
***************
*** 884,889 **** WalSndShmemInit(void)
--- 1019,1025 ----
{
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ SpinLockInit(&WalSndCtl->info_lck);
for (i = 0; i < max_wal_senders; i++)
{
***************
*** 895,937 **** WalSndShmemInit(void)
}
/*
! * This isn't currently used for anything. Monitoring tools might be
! * interested in the future, and we'll need something like this in the
! * future for synchronous replication.
*/
! #ifdef NOT_USED
! /*
! * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
! * if none.
! */
! XLogRecPtr
! GetOldestWALSendPointer(void)
{
! XLogRecPtr oldest = {0, 0};
! int i;
! bool found = false;
! for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! if (walsnd->pid == 0)
! continue;
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->sentPtr;
! SpinLockRelease(&walsnd->mutex);
! if (recptr.xlogid == 0 && recptr.xrecoff == 0)
! continue;
! if (!found || XLByteLT(recptr, oldest))
! oldest = recptr;
! found = true;
}
- return oldest;
}
-
- #endif
--- 1031,1102 ----
}
/*
! * Ensure that all xlog records through the given position is
! * replicated to the standby
*/
! void
! WaitXLogSend(XLogRecPtr record)
{
! Assert(max_wal_senders > 0);
! Assert(quorum > 0);
! for (;;)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSndCtlData *walsndctl = WalSndCtl;
! int i;
! int already_acked = 0;
! bool unacked = false;
! /* Don't need to wait if there is no synchronous standbys */
! if (walsndctl->num_sync_sbys == 0)
! return;
! /*
! * Count walsenders which have already received the ACK meaning
! * completion of replication up to the given position. If the
! * sum is more than or equal to the "quorum", the backend breaks
! * out of this loop and returns a "success" of the transaction
! * to a client.
! */
! for (i = 0; i < max_wal_senders; i++)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! /* Don't count inactive or asynchronous walsenders */
! if (walsnd->pid == 0 ||
! walsnd->rplMode == REPLICATION_MODE_ASYNC)
! continue;
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->ackdPtr;
! SpinLockRelease(&walsnd->mutex);
!
! if ((recptr.xlogid == 0 && recptr.xrecoff == 0) ||
! XLByteLT(recptr, record))
! {
! unacked = true;
! continue;
! }
!
! if (++already_acked >= quorum)
! return;
! }
!
! /*
! * If synchronous walsender was not found in the WalSnd array,
! * we no longer need to wait. This can happen if all synchronous
! * walsenders are terminated while searching the array.
! *
! * If all synchronous walsenders have already received the ACK,
! * we no longer need to wait, too. This can happen when the
! * "quorum" is more than max_wal_senders.
! */
! if (!unacked)
! return;
!
! pg_usleep(100000L); /* 100ms */
}
}
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1756,1761 **** static struct config_int ConfigureNamesInt[] =
--- 1756,1770 ----
},
{
+ {"quorum", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Sets the maximum number of synchronous standby servers."),
+ NULL
+ },
+ &quorum,
+ 0, 0, INT_MAX / 4, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 188,193 ****
--- 188,194 ----
#max_wal_senders = 0 # max number of walsender processes
#wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
+ #quorum = 0 # max number of synchronous standbys
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
# - Standby Servers -
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 261,266 **** typedef struct CheckpointStatsData
--- 261,299 ----
extern CheckpointStatsData CheckpointStats;
+ /*
+ * Synchronization mode of replication. These modes identify how far
+ * we should wait for replication.
+ */
+ typedef enum
+ {
+ /*
+ * doesn't make transaction commit wait for replication, i.e.,
+ * asynchronous replication.
+ */
+ REPLICATION_MODE_ASYNC,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received
+ * on the standby server.
+ */
+ REPLICATION_MODE_RECV,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received
+ * and fsync'd on the standby server.
+ */
+ REPLICATION_MODE_FSYNC,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received,
+ * fsync'd and replayed on the standby server.
+ */
+ REPLICATION_MODE_REPLAY
+ } ReplicationMode;
+ extern int rplMode;
+
+
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern void XLogBackgroundFlush(void);
***************
*** 298,303 **** extern void XLogPutNextOid(Oid nextOid);
--- 331,337 ----
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
+ extern XLogRecPtr GetReplayRecPtr(void);
extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
extern TimeLineID GetRecoveryTargetTLI(void);
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 58,63 **** extern int pq_getmessage(StringInfo s, int maxlen);
--- 58,64 ----
extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
+ extern int pq_wait(int timeout_ms);
extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
***************
*** 74,78 **** extern int secure_open_server(Port *port);
--- 75,80 ----
extern void secure_close(Port *port);
extern ssize_t secure_read(Port *port, void *ptr, size_t len);
extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+ extern int secure_poll(Port *port, int timeout_ms);
#endif /* LIBPQ_H */
*** a/src/include/replication/walprotocol.h
--- b/src/include/replication/walprotocol.h
***************
*** 50,53 **** typedef struct
--- 50,63 ----
*/
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
+ /*
+ * Body for a WAL acknowledgment message (message type 'l'). This is wrapped
+ * within a CopyData message at the FE/BE protocol level.
+ */
+ typedef struct
+ {
+ /* End of WAL replicated to the standby */
+ XLogRecPtr ackEnd;
+ } WalAckMessageData;
+
#endif /* _WALPROTOCOL_H */
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 71,89 **** typedef struct
*/
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
--- 71,99 ----
*/
char conninfo[MAXCONNINFO];
+ /*
+ * replication mode; controls how long transaction commit on the primary
+ * server waits for replication.
+ */
+ int rplMode;
+
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint,
! int mode);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
***************
*** 93,99 **** extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
--- 103,110 ----
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo,
! int mode);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 22,27 **** typedef struct WalSnd
--- 22,30 ----
{
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ XLogRecPtr ackdPtr; /* WAL has been replicated up to this point */
+
+ int rplMode; /* replication mode */
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
***************
*** 29,34 **** typedef struct WalSnd
--- 32,41 ----
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ int num_sync_sbys; /* current # of synchronous standbys */
+
+ slock_t info_lck; /* protects the variable shown above */
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
***************
*** 40,49 **** extern bool am_walsender;
--- 47,58 ----
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+ extern int quorum;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+ extern void WaitXLogSend(XLogRecPtr recptr);
#endif /* _WALSENDER_H */
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 2002,2007 **** PQnotifies(PGconn *conn)
--- 2002,2010 ----
/*
* PQputCopyData - send some data to the backend during COPY IN
*
+ * Note that this function might be called by walreceiver even during
+ * COPY OUT to send a message to XLOG stream.
+ *
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
*/
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
--- 2013,2020 ----
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN &&
! conn->asyncStatus != PGASYNC_COPY_OUT)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));