From c70daa35f2308ea195e177e30b54e2a613f78811 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 3 Jul 2023 10:28:00 +0900 Subject: [PATCH v2] Skip decoding already-aborted transactions. Previously, we had a mechanism for detecting concurrent aborts only for streaming transactions. This commit enables us to check whether a large transaction has already aborted so that we can ignore further changes. This is helpful for a case where the transaction is quite large and already rolled back since we can avoid disk or network I/O. We perform the check only for large transactions, at eviction time, since checking CLOG is costly and could cause a slowdown with lots of small transactions, where most transactions commit. For testing purposes, we disable this check when logical_replication_mode is set to "immediate". Author: Reviewed-by: Discussion: https://postgr.es/m/ --- .../replication/logical/reorderbuffer.c | 94 ++++++++++++++++--- src/include/replication/reorderbuffer.h | 13 ++- 2 files changed, 90 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 26d252bd87..387d2e9131 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -100,6 +100,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -256,7 +257,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, - bool txn_prepared); + bool txn_prepared, bool streaming); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char 
*path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -780,11 +781,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * While streaming the previous changes we have detected that the - * transaction is aborted. So there is no point in collecting further - * changes for it. + * If we have detected that the transaction is aborted while streaming + * the previous changes or by checking its CLOG, there is no point in + * collecting further changes for it. */ - if (txn->concurrent_abort) + if (txn->aborted) { /* * We don't need to update memory accounting for this change as we @@ -1603,9 +1604,12 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * * 'txn_prepared' indicates that we have decoded the transaction at prepare * time. + * 'streaming' indicates that this function is called while streaming the transaction + * and we can mark the transaction as streamed. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared, + bool streaming) { dlist_mutable_iter iter; @@ -1624,7 +1628,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, streaming); } /* cleanup changes in the txn */ @@ -1658,7 +1662,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. 
*/ - if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) + if (streaming && + (!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; if (txn_prepared) @@ -1887,7 +1892,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * full cleanup will happen as part of the COMMIT PREPAREDs, so now * just truncate txn by removing changes and tuplecids. */ - ReorderBufferTruncateTXN(rb, txn, true); + ReorderBufferTruncateTXN(rb, txn, true, true); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2030,7 +2035,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2555,7 +2560,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2608,7 +2613,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, FlushErrorState(); FreeErrorData(errdata); errdata = NULL; - curtxn->concurrent_abort = true; + curtxn->aborted = true; /* Reset the TXN so that it is allowed to stream remaining data. */ ReorderBufferResetTXN(rb, txn, snapshot_now, @@ -2792,10 +2797,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, * when rollback prepared is decoded and sent, the downstream should be * able to rollback such a xact. See comments atop DecodePrepare. * - * Note, for the concurrent_abort + streaming case a stream_prepare was - * already sent within the ReorderBufferReplay call above. 
+ * Note, for the abort + streaming case a stream_prepare was already sent + * within the ReorderBufferReplay call above. */ - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + if (txn->aborted && !rbtxn_is_streamed(txn)) rb->prepare(rb, txn, txn->final_lsn); } @@ -3561,6 +3566,59 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) return largest; } +/* + * Check the transaction status of the given transaction. If the transaction + * already aborted, we discard all changes accumulated so far and ignore + * future changes, and return true. Otherwise return false. + * + * If logical_replication_mode is set to "immediate", we disable this check + * for regression tests. + */ +static bool +ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* + * If logical_replication_mode is "immediate", we don't check the + * transaction status so the caller always processes this transaction. + */ + if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)) + return false; + + if (txn->aborted) + return true; + + if (txn->committed) + return false; + + if (TransactionIdIsInProgress(txn->xid)) + return false; + + if (TransactionIdDidCommit(txn->xid)) + { + /* + * Remember the transaction is committed so that we can skip + * CLOG check next time, avoiding the pressure on CLOG lookup. + */ + txn->committed = true; + return false; + } + + /* + * The transaction aborted. We discard the changes we've collected + * so far, and free all resources allocated for toast reconstruction. + */ + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false); + ReorderBufferToastReset(rb, txn); + + /* + * Mark the transaction as aborted so we ignore future changes of this + * transaction. 
+ */ + txn->aborted = true; + + return true; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to @@ -3613,6 +3671,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); + if (ReorderBufferCheckTXNAbort(rb, txn)) + continue; + ReorderBufferStreamTXN(rb, txn); } else @@ -3628,6 +3689,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->size > 0); Assert(rb->size >= txn->size); + if (ReorderBufferCheckTXNAbort(rb, txn)) + continue; + ReorderBufferSerializeTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1b9db22acb..fae431ef95 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -428,8 +428,17 @@ typedef struct ReorderBufferTXN /* Size of top-transaction including sub-transactions. */ Size total_size; - /* If we have detected concurrent abort then ignore future changes. */ - bool concurrent_abort; + /* + * True if the transaction committed. Then we skip transaction status + * check for this transaction. + */ + bool committed; + + /* + * True if the transaction (concurrently) aborted. Then we ignore + * future changes. + */ + bool aborted; /* * Private data pointer of the output plugin. -- 2.31.1