From 165f7bc6888968db3e2f952585e3a5e6524d2cda Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 6 Jun 2024 12:35:41 +1000 Subject: [PATCH v1] DEBUG LOGGING --- src/backend/access/transam/xlog.c | 2 + src/backend/access/transam/xlogreader.c | 144 +++++++++++++++++++++++++++++++- src/backend/replication/walsender.c | 127 ++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 330e058..d1b5fc2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6554,12 +6554,14 @@ ShutdownXLOG(int code, Datum arg) /* * Signal walsenders to move to stopping state. */ + elog(LOG, ">>> ShutdownXLOG: calling WalSndInitStopping"); WalSndInitStopping(); /* * Wait for WAL senders to be in stopping state. This prevents commands * from writing new WAL. */ + elog(LOG, ">>> ShutdownXLOG: calling WalSndWaitStopping"); WalSndWaitStopping(); if (RecoveryInProgress()) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 37d2a57..434cd2e 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -32,13 +32,15 @@ #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" #include "replication/origin.h" - #ifndef FRONTEND #include "pgstat.h" +#include "utils/elog.h" #else #include "common/logging.h" #endif +bool ps_readpageinternal2_failing = false; + static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3); static void allocate_recordbuf(XLogReaderState *state, uint32 reclength); @@ -594,12 +596,23 @@ restart: * byte to cover the whole record header, or at least the part of it that * fits on the same page. 
*/ +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: before ReadPageInternal-1"); + } +#endif readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) + { +#ifndef FRONTEND + elog(LOG, ">>> XLogDecodeNextRecord: called ReadPageInternal-1, which returned %d", readOff); +#endif goto err; + } /* * ReadPageInternal always returns at least the page header, so we can @@ -723,6 +736,12 @@ restart: targetPagePtr += XLOG_BLCKSZ; /* Wait for the next page to become available */ +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: before ReadPageInternal-2"); + } +#endif readOff = ReadPageInternal(state, targetPagePtr, Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); @@ -730,7 +749,13 @@ restart: if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) + { +#ifndef FRONTEND + elog(LOG, ">>> XLogDecodeNextRecord: called ReadPageInternal-2, which returned %d", readOff); + ps_readpageinternal2_failing = true; +#endif goto err; + } Assert(SizeOfXLogShortPHD <= readOff); @@ -779,8 +804,22 @@ restart: pageHeaderSize = XLogPageHeaderSize(pageHeader); if (readOff < pageHeaderSize) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: before ReadPageInternal-3"); + } +#endif readOff = ReadPageInternal(state, targetPagePtr, pageHeaderSize); +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: called ReadPageInternal-3, which returned %d", readOff); + } +#endif + } Assert(pageHeaderSize <= readOff); @@ -790,8 +829,22 @@ restart: len = pageHeader->xlp_rem_len; if (readOff < pageHeaderSize + len) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: before ReadPageInternal-4"); + } +#endif readOff = 
ReadPageInternal(state, targetPagePtr, pageHeaderSize + len); +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: called ReadPageInternal-4, which returned %d", readOff); + } +#endif + } memcpy(buffer, (char *) contdata, len); buffer += len; @@ -843,12 +896,26 @@ restart: else { /* Wait for the record data to become available */ +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: before ReadPageInternal-5"); + } +#endif readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> XLogDecodeNextRecord: called ReadPageInternal-5, which returned %d", readOff); + } +#endif goto err; + } /* Record does not cross a page boundary */ if (!ValidXLogRecord(state, record, RecPtr)) @@ -1012,7 +1079,15 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* check whether we have all the requested data already */ if (targetSegNo == state->seg.ws_segno && targetPageOff == state->segoff && reqLen <= state->readLen) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: return #1 readLen=%d", state->readLen); + } +#endif return state->readLen; + } /* * Invalidate contents of internal buffer before read attempt. 
Just set @@ -1037,9 +1112,22 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: before calling routine.page_read #1"); + } +#endif readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, state->readBuf); +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: routine.page_read #1 returned readLen %d", readLen); + } +#endif + if (readLen == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readLen < 0) @@ -1050,16 +1138,36 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, state->readBuf)) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: XLogReadValidatePageHeader was false"); + } +#endif goto err; + } } /* * First, read the requested data length, but at least a short page header * so that we can validate it. */ +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: before calling routine.page_read #2"); + } +#endif readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, state->readBuf); +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: routine.page_read #2 returned readLen %d", readLen); + } +#endif if (readLen == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readLen < 0) @@ -1069,7 +1177,15 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* Do we have enough data to check the header length? 
*/ if (readLen <= SizeOfXLogShortPHD) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: readLen (%d) <= SizeOfXLogShortPHD (%ld)", readLen, SizeOfXLogShortPHD); + } +#endif goto err; + } Assert(readLen >= reqLen); @@ -1078,9 +1194,27 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* still not enough */ if (readLen < XLogPageHeaderSize(hdr)) { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: still not enough"); + } +#endif +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: before calling routine.page_read #3"); + } +#endif readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, state->readBuf); +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: routine.page_read #3 returned readLen %d", readLen); + } +#endif if (readLen == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readLen < 0) @@ -1091,7 +1225,15 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * Now that we know we have the full header, validate it. 
*/ if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) + { +#ifndef FRONTEND + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> ReadPageInternal: invalid page header"); + } +#endif goto err; + } /* update read state information */ state->seg.ws_segno = targetSegNo; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c623b07..a2470fa 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -272,6 +272,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); +static const char *WalSndGetStateString(WalSndState state); /* Initialize walsender process before entering the main command loop */ void @@ -1039,6 +1040,8 @@ StartReplication(StartReplicationCmd *cmd) EndReplicationCommand("START_STREAMING"); } +extern bool ps_readpageinternal2_failing; /* set in xlogreader.c (XLogDecodeNextRecord) once its second ReadPageInternal call fails */ + /* * XLogReaderRoutine->page_read callback for logical decoding contexts, as a * walsender process. @@ -1062,7 +1065,18 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req * timeline. This is needed to determine am_cascading_walsender accurately * which is needed to determine the current timeline. */ + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> logical_read_xlog_page: calling WalSndWaitForWal"); + } flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> logical_read_xlog_page: just called WalSndWaitForWal. 
flushptr=%X/%X, targetPagePtr=%X/%X, reqLen=%d", + LSN_FORMAT_ARGS(flushptr), + LSN_FORMAT_ARGS(targetPagePtr), + reqLen); + } /* * Since logical decoding is also permitted on a standby server, we need @@ -1085,12 +1099,30 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) + { + elog(LOG, ">>> logical_read_xlog_page: returning -1. flushptr=%X/%X, targetPagePtr=%X/%X, reqLen=%d", + LSN_FORMAT_ARGS(flushptr), + LSN_FORMAT_ARGS(targetPagePtr), + reqLen); return -1; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + { count = XLOG_BLCKSZ; /* more than one block available */ + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> logical_read_xlog_page: set count = XLOG_BLCKSZ =%d (%X)", count, count); + } + } else + { count = flushptr - targetPagePtr; /* part of the page available */ + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> logical_read_xlog_page: set count = part page = %d (%X)", count, count); + } + } /* now actually read the data, we know it's there */ if (!WALRead(state, @@ -1113,6 +1145,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); + if (ps_readpageinternal2_failing) + { + elog(LOG, ">>> logical_read_xlog_page: returning count=%d", count); + } return count; } @@ -1820,6 +1856,7 @@ WalSndWaitForWal(XLogRecPtr loc) int wakeEvents; uint32 wait_event = 0; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + bool ps_logging = ps_readpageinternal2_failing; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1829,7 +1866,12 @@ WalSndWaitForWal(XLogRecPtr loc) */ if (!XLogRecPtrIsInvalid(RecentFlushPtr) && !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: return #1 %X/%X", 
LSN_FORMAT_ARGS(RecentFlushPtr)); + return RecentFlushPtr; + } /* * Within the loop, we wait for the necessary WALs to be flushed to disk @@ -1863,7 +1905,11 @@ WalSndWaitForWal(XLogRecPtr loc) * written, because walwriter has shut down already. */ if (got_STOPPING) + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: calling XLogBackgroundFlush"); XLogBackgroundFlush(); + } /* * To avoid the scenario where standbys need to catch up to a newer @@ -1874,9 +1920,17 @@ WalSndWaitForWal(XLogRecPtr loc) if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) { if (!RecoveryInProgress()) + { RecentFlushPtr = GetFlushRecPtr(NULL); + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: 1. RecentFlushPtr assigned %X/%X", LSN_FORMAT_ARGS(RecentFlushPtr)); + } else + { RecentFlushPtr = GetXLogReplayRecPtr(NULL); + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: 2. RecentFlushPtr assigned %X/%X", LSN_FORMAT_ARGS(RecentFlushPtr)); + } } /* @@ -1890,9 +1944,17 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) { if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event)) + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: got_STOPPING; wait_for_standby_at_stop=true"); wait_for_standby_at_stop = true; + } else + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: got_STOPPING; break #1"); break; + } } /* @@ -1914,7 +1976,11 @@ WalSndWaitForWal(XLogRecPtr loc) */ if (!wait_for_standby_at_stop && !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: break #2"); break; + } /* * Waiting for new WAL or waiting for standbys to catch up. 
Since we @@ -1935,7 +2001,11 @@ WalSndWaitForWal(XLogRecPtr loc) */ if (streamingDoneReceiving && streamingDoneSending && !pq_is_send_pending()) + { + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: break #3"); break; + } /* die if timeout was reached */ WalSndCheckTimeOut(); @@ -1964,6 +2034,8 @@ WalSndWaitForWal(XLogRecPtr loc) /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); + if (ps_logging) + elog(LOG, ">>> WalSndWaitForWal: return at bottom of function %X/%X", LSN_FORMAT_ARGS(RecentFlushPtr)); return RecentFlushPtr; } @@ -2762,6 +2834,8 @@ WalSndCheckTimeOut(void) } } +static bool ps_logging = false; + /* Main loop of walsender process that streams the WAL over Copy messages. */ static void WalSndLoop(WalSndSendDataCallback send_data) @@ -2779,6 +2853,20 @@ WalSndLoop(WalSndSendDataCallback send_data) */ for (;;) { + ps_logging = got_STOPPING || got_SIGUSR2; + if (ps_logging) + { + static bool prev_got_STOPPING = false, prev_got_SIGUSR2 = false; + + if (got_STOPPING != prev_got_STOPPING || got_SIGUSR2 != prev_got_SIGUSR2) + { + elog(LOG, ">>> WalSndLoop: flags have changed. got_STOPPING flag is %d, and got_SIGUSR2 flag is %d", + got_STOPPING, got_SIGUSR2); + prev_got_STOPPING = got_STOPPING; + prev_got_SIGUSR2 = got_SIGUSR2; + } + } + /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -2822,6 +2910,8 @@ WalSndLoop(WalSndSendDataCallback send_data) /* If nothing remains to be sent right now ... */ if (WalSndCaughtUp && !pq_is_send_pending()) { + elog(LOG, ">>> WalSndLoop: \tall caught up..."); + /* * If we're in catchup state, move to streaming. This is an * important state change for users to know about, since before @@ -2846,7 +2936,10 @@ WalSndLoop(WalSndSendDataCallback send_data) * is not sure which. */ if (got_SIGUSR2) + { + elog(LOG, ">>> WalSndLoop: \tcalling WalSndDone"); WalSndDone(send_data); + } } /* Check for replication timeout. 
*/ @@ -3386,6 +3479,11 @@ XLogSendLogical(void) */ static XLogRecPtr flushPtr = InvalidXLogRecPtr; + if (ps_logging) + { + elog(LOG, ">>> XLogSendLogical:"); + } + /* * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to * true in WalSndWaitForWal, if we're actually waiting. We also set to @@ -3430,13 +3528,29 @@ XLogSendLogical(void) if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) WalSndCaughtUp = true; + if (ps_logging) + { + elog(LOG, ">>> XLogSendLogical: EndRecPtr is %X/%X, flushPtr is %X/%X, WalSndCaughtUp=%d", + LSN_FORMAT_ARGS(logical_decoding_ctx->reader->EndRecPtr), + LSN_FORMAT_ARGS(flushPtr), + WalSndCaughtUp); + } + + if (ps_logging && WalSndCaughtUp) + { + elog(LOG, ">>> XLogSendLogical: flags WalSndCaughtUp is %d, got_STOPPING is %d", WalSndCaughtUp, got_STOPPING); + } + /* * If we're caught up and have been requested to stop, have WalSndLoop() * terminate the connection in an orderly manner, after writing out all * the pending data. */ if (WalSndCaughtUp && got_STOPPING) + { got_SIGUSR2 = true; + elog(LOG, ">>> XLogSendLogical: just set the got_SIGUSR2 flag to true"); + } /* Update shared memory status */ { @@ -3571,6 +3685,8 @@ HandleWalSndInitStopping(void) kill(MyProcPid, SIGTERM); else got_STOPPING = true; + + elog(LOG, ">>> HandleWalSndInitStopping: got_STOPPING=%d", got_STOPPING); } /* @@ -3746,6 +3862,7 @@ WalSndInitStopping(void) { int i; + elog(LOG, ">>> WalSndInitStopping: tell active walsenders to stop"); for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; @@ -3755,9 +3872,12 @@ WalSndInitStopping(void) pid = walsnd->pid; SpinLockRelease(&walsnd->mutex); + elog(LOG, ">>> WalSndInitStopping: \tpid[%d] is %d", i, pid); + if (pid == 0) continue; + elog(LOG, ">>> WalSndInitStopping: \tsending PROCSIG_WALSND_INIT_STOPPING signal to pid %d", pid); SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER); } } @@ -3775,6 +3895,7 @@ WalSndWaitStopping(void) int i; bool all_stopped 
= true; + elog(LOG, ">>> WalSndWaitStopping: waiting for all %d walsenders...", max_wal_senders); for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; @@ -3784,6 +3905,7 @@ WalSndWaitStopping(void) if (walsnd->pid == 0) { SpinLockRelease(&walsnd->mutex); + elog(LOG, ">>> WalSndWaitStopping: \twalsnd[%d] pid is 0", i); continue; } @@ -3791,6 +3913,7 @@ WalSndWaitStopping(void) { all_stopped = false; SpinLockRelease(&walsnd->mutex); + elog(LOG, ">>> WalSndWaitStopping: \twalsnd[%d] has state %d (%s)", i, walsnd->state, WalSndGetStateString(walsnd->state)); break; } SpinLockRelease(&walsnd->mutex); @@ -3798,8 +3921,12 @@ WalSndWaitStopping(void) /* safe to leave if confirmation is done for all WAL senders */ if (all_stopped) + { + elog(LOG, ">>> WalSndWaitStopping: \tALL STOPPED!!"); return; + } + elog(LOG, ">>> WalSndWaitStopping: \tsleep 10s before checking again"); pg_usleep(10000L); /* wait for 10 msec */ } } -- 1.8.3.1