From a1fd6f76c955482efa7e63ea6c25ae6350160b4d Mon Sep 17 00:00:00 2001 From: vignesh Date: Tue, 27 Apr 2021 10:56:02 +0530 Subject: [PATCH v3] Update replication statistics after every stream/spill. Currently, replication slot statistics are updated at prepare, commit, and rollback. Now, if the transaction is interrupted the stats might not get updated. Fixed this by updating replication statistics after every stream/spill. --- src/backend/replication/logical/decode.c | 6 +++--- src/backend/replication/logical/logical.c | 6 +++--- src/backend/replication/logical/reorderbuffer.c | 3 +++ src/include/replication/logical.h | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7924581cdc..2c009d51ec 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -750,7 +750,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * not clear that sending more or less frequently than this would be * better. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } /* @@ -832,7 +832,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * not clear that sending more or less frequently than this would be * better. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } @@ -889,7 +889,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* update the decoding stats */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(ctx->reorder); } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 00543ede45..0c3ef0f93f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1770,10 +1770,10 @@ ResetLogicalStreamingState(void) * Report stats for a slot. */ void -UpdateDecodingStats(LogicalDecodingContext *ctx) +UpdateDecodingStats(ReorderBuffer *rb) { - ReorderBuffer *rb = ctx->reorder; PgStat_StatReplSlotEntry repSlotStat; + ReplicationSlot *slot = MyReplicationSlot; /* Nothing to do if we don't have any replication stats to be sent. */ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) @@ -1790,7 +1790,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) (long long) rb->totalTxns, (long long) rb->totalBytes); - namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); + namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; repSlotStat.spill_bytes = rb->spillBytes; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c27f710053..f4d97c1de3 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3551,6 +3551,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* don't consider already serialized transactions */ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + UpdateDecodingStats(rb); } Assert(spilled == txn->nentries_mem); @@ -3920,6 +3921,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Don't consider already streamed transaction. */ rb->streamTxns += (txn_is_streamed) ? 0 : 1; + UpdateDecodingStats(rb); + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7dfcb7be18..8e03e055f6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -134,6 +134,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); -extern void UpdateDecodingStats(LogicalDecodingContext *ctx); +extern void UpdateDecodingStats(ReorderBuffer *rb); #endif -- 2.25.1