From 0b9b3944b572165072e00cafb0bbc8f5a80554be Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 7 Aug 2023 12:26:20 +0530 Subject: [PATCH v1 2/2] Logs to measure creation of replication slot breakup. Logs to measure creation of replication slot breakup. --- src/backend/replication/logical/logical.c | 27 +++++++++++++++- src/backend/replication/logical/snapbuild.c | 14 +++++++- src/backend/replication/walsender.c | 36 +++++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..aa195c4aa9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -630,11 +630,20 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { ReplicationSlot *slot = ctx->slot; + int count = 0; + + instr_time start; + instr_time elapsed; + instr_time total_read; + instr_time total_decode; + + INSTR_TIME_SET_ZERO(total_read); + INSTR_TIME_SET_ZERO(total_decode); /* Initialize from where to start reading WAL. */ XLogBeginRead(ctx->reader, slot->data.restart_lsn); - elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", + elog(LOG, "searching for logical decoding starting point, starting at %X/%X", LSN_FORMAT_ARGS(slot->data.restart_lsn)); /* Wait for a consistent starting point */ @@ -642,16 +651,29 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { XLogRecord *record; char *err = NULL; + count++; + + INSTR_TIME_SET_CURRENT(start); /* the read_page callback waits for new WAL */ record = XLogReadRecord(ctx->reader, &err); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_read, elapsed); + if (err) elog(ERROR, "could not find logical decoding starting point: %s", err); if (!record) elog(ERROR, "could not find logical decoding starting point"); + INSTR_TIME_SET_CURRENT(start); + LogicalDecodingProcessRecord(ctx, ctx->reader); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_decode, elapsed); + /* only continue till we found a consistent spot */ if (DecodingContextReady(ctx)) break; @@ -664,6 +686,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) if (slot->data.two_phase) slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); + elog(LOG, "LOGICAL_XLOG_READ %ld", INSTR_TIME_GET_MICROSEC(total_read)); + elog(LOG, "LOGICAL_DECODE_PROCESS_RECORD %ld", INSTR_TIME_GET_MICROSEC(total_decode)); + elog(LOG, "FIND_DECODING_XLOG_RECORD_COUNT %d", count); } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 843ceba840..2ce302a597 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1209,6 +1209,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, * ----------------------------------- */ +extern instr_time total_wait; + /* * Process a running xacts record, and use its information to first build a * historic snapshot and later to release resources that aren't needed @@ -1227,8 +1229,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (builder->state < SNAPBUILD_CONSISTENT) { + instr_time start; + instr_time elapsed; + bool result; + + INSTR_TIME_SET_CURRENT(start); + result = SnapBuildFindSnapshot(builder, lsn, running); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_wait, elapsed); + /* returns false if there's no point in performing cleanup just yet */ - if (!SnapBuildFindSnapshot(builder, lsn, running)) + if (!result) return; } else diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..13831bdc6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -197,6 +197,10 @@ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; +static TimestampTz start = 0; +static long secs = 0; +static int microsecs = 0; + /* A sample associating a WAL location with the time it was written. */ typedef struct { @@ -1034,6 +1038,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, } } +instr_time total_wait; + /* * Create a new replication slot. */ @@ -1052,6 +1058,15 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Datum values[4]; bool nulls[4] = {0}; + instr_time begin; + instr_time elapsed; + instr_time total_create; + + INSTR_TIME_SET_ZERO(total_create); + INSTR_TIME_SET_ZERO(total_wait); + + INSTR_TIME_SET_CURRENT(begin); + Assert(!MyReplicationSlot); parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); @@ -1083,6 +1098,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) LogicalDecodingContext *ctx; bool need_full_snapshot = false; + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, begin); + INSTR_TIME_ADD(total_create, elapsed); + + elog(LOG, "LOGICAL_SLOT_CREATION %ld", INSTR_TIME_GET_MICROSEC(total_create)); + /* * Do options check early so that we can bail before calling the * DecodingContextFindStartpoint which can take long time. @@ -1131,6 +1152,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) need_full_snapshot = true; } + start = GetCurrentTimestamp(); ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, @@ -1139,6 +1161,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); + TimestampDifference(start, GetCurrentTimestamp(), &secs, µsecs); + elog(LOG, "INIT_DECODING_CONTEXT %d", ((int) secs * 1000000 + microsecs)); + start = GetCurrentTimestamp(); + /* * Signal that we don't need the timeout mechanism. We're just * creating the replication slot and don't yet accept feedback @@ -1151,6 +1177,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); + TimestampDifference(start, GetCurrentTimestamp(), &secs, µsecs); + elog(LOG, "FIND_DECODING_STARTPOINT %d", ((int) secs * 1000000 + microsecs)); + start = GetCurrentTimestamp(); + + elog(LOG, "LOGICAL_WAIT_TRANSACTION %ld", INSTR_TIME_GET_MICROSEC(total_wait)); + /* * Export or use the snapshot if we've been asked to do so. * @@ -1169,6 +1201,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) RestoreTransactionSnapshot(snap, MyProc); } + TimestampDifference(start, GetCurrentTimestamp(), &secs, µsecs); + elog(LOG, "SNAPSHOT_BUILD %d", ((int) secs * 1000000 + microsecs)); + start = GetCurrentTimestamp(); + /* don't need the decoding context anymore */ FreeDecodingContext(ctx); -- 2.34.1