From 92223cbf9f45f4de5449f41ea65195c17fc86f1a Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sat, 8 Feb 2020 09:14:29 +1300 Subject: [PATCH 5/5] Reuse a WaitEventSet in walreceiver. To avoid repeatedly setting up and tearing down WaitEventSet objects and associated kernel objects, reuse a WaitEventSet. Recreate it whenever PQsocketChangeCount() indicates that the socket has changed since the last wait. XXX This code seems a bit ugly, changes the callback interface for libpqwalreceiver (what other code cares about that?) and will have to be duplicated in e.g. logical/worker.c. Can we find a nicer design? --- .../libpqwalreceiver/libpqwalreceiver.c | 10 ++-- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 4 +- src/backend/replication/walreceiver.c | 47 +++++++++++-------- src/include/replication/walreceiver.h | 7 +-- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..17a6f27a42 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -66,7 +66,7 @@ static bool libpqrcv_startstreaming(WalReceiverConn *conn, static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli); static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd); + pgsocket *wait_fd, int64 *wait_fd_change_count); static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes); static char *libpqrcv_create_slot(WalReceiverConn *conn, @@ -694,7 +694,9 @@ libpqrcv_disconnect(WalReceiverConn *conn) * until the next libpqrcv_* call. * * If no data was available immediately, returns 0, and *wait_fd is set to a - * socket descriptor which can be waited on before trying again. + * socket descriptor which can be waited on before trying again. 
If + * wait_fd_change_count is not NULL, then *wait_fd_change_count is set to + * a counter that can be used to detect changes in the socket between calls. * * -1 if the server ended the COPY. * @@ -702,7 +704,7 @@ libpqrcv_disconnect(WalReceiverConn *conn) */ static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd) + pgsocket *wait_fd, int64 *wait_fd_change_count) { int rawlen; @@ -726,6 +728,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { /* Tell caller to try again when our socket is ready. */ *wait_fd = PQsocket(conn->streamConn); + if (wait_fd_change_count) + *wait_fd_change_count = PQsocketChangeCount(conn->streamConn); return 0; } } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 44d8c275cc..f448e11409 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -580,7 +580,7 @@ copy_read_data(void *outbuf, int minread, int maxread) for (;;) { /* Try read the data. 
*/ - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(wrconn, &buf, &fd, NULL); CHECK_FOR_INTERRUPTS(); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7a5471f95c..485231ccb3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1149,7 +1149,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(wrconn, &buf, &fd, NULL); if (len != 0) { @@ -1229,7 +1229,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(wrconn, &buf, &fd, NULL); } } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a5e85d32f3..bff726144d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -310,6 +310,8 @@ WalReceiverMain(void) char *primary_sysid; char standby_sysid[32]; WalRcvStreamOptions options; + WaitEventSet *wes = NULL; + int64 wes_change_count = -1; /* * Check that we're connected to a valid server using the @@ -434,6 +436,8 @@ WalReceiverMain(void) int len; bool endofwal = false; pgsocket wait_fd = PGINVALID_SOCKET; + int64 wait_fd_change_count; + WaitEvent event; int rc; /* @@ -455,7 +459,8 @@ WalReceiverMain(void) } /* See if we can read data immediately */ - len = walrcv_receive(wrconn, &buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd, + &wait_fd_change_count); if (len != 0) { /* @@ -486,7 +491,8 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(wrconn, &buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd, + &wait_fd_change_count); } /* Let the master know that we received some data. 
*/ @@ -505,24 +511,27 @@ WalReceiverMain(void) break; /* - * Ideally we would reuse a WaitEventSet object repeatedly - * here to avoid the overheads of WaitLatchOrSocket on epoll - * systems, but we can't be sure that libpq (or any other - * walreceiver implementation) has the same socket (even if - * the fd is the same number, it may have been closed and - * reopened since the last time). In future, if there is a - * function for removing sockets from WaitEventSet, then we - * could add and remove just the socket each time, potentially - * avoiding some system calls. + * If we don't have a WaitEventSet yet, or the socket has + * changed, we need to create one. Otherwise it's safe + * to reuse the one we have, to cut down on system calls. */ Assert(wait_fd != PGINVALID_SOCKET); - rc = WaitLatchOrSocket(walrcv->latch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_LATCH_SET, - wait_fd, - NAPTIME_PER_CYCLE, - WAIT_EVENT_WAL_RECEIVER_MAIN); - if (rc & WL_LATCH_SET) + if (wes == NULL || wes_change_count != wait_fd_change_count) + { + if (wes != NULL) + FreeWaitEventSet(wes); + wes = CreateWaitEventSet(CurrentMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, + walrcv->latch, NULL); + AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, + PGINVALID_SOCKET, NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, wait_fd, NULL, + NULL); + wes_change_count = wait_fd_change_count; + } + rc = WaitEventSetWait(wes, NAPTIME_PER_CYCLE, &event, 1, + WAIT_EVENT_WAL_RECEIVER_MAIN); + if (rc == 1 && event.events == WL_LATCH_SET) { ResetLatch(walrcv->latch); ProcessWalRcvInterrupts(); @@ -540,7 +549,7 @@ WalReceiverMain(void) XLogWalRcvSendReply(true, false); } } - if (rc & WL_TIMEOUT) + if (rc == 0) { /* * We didn't receive anything new. 
If we haven't heard diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e08afc6548..f20bfc18cc 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -226,7 +226,8 @@ typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, TimeLineID *next_tli); typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd); + pgsocket *wait_fd, + int64 *wait_fd_change_count); typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, int nbytes); typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, @@ -279,8 +280,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_startstreaming(conn, options) #define walrcv_endstreaming(conn, next_tli) \ WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) -#define walrcv_receive(conn, buffer, wait_fd) \ - WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) +#define walrcv_receive(conn, buffer, wait_fd, wait_event_change_count) \ + WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd, wait_event_change_count) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) #define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ -- 2.23.0