diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 860e1d4b0bb..7a500e30c74 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -115,11 +115,14 @@ #include "utils/relfilenumbermap.h" /* - * Maximum number of distributed invalidation messages per transaction. - * Each message is ~16 bytes, this allows up to 8 MB of invalidation - * message data. + * Each transaction is limited to a maximum of 8MV of inval messages distributed + * from other transaction. Once the number of distributed inval messages reach + * this threshold, the transaction is marked as RBTXN_INVAL_OVERFLOWED, + * invalidating all caches instead as we have lost some inval messages and hence + * don't know what needs to be invalidated. */ -#define MAX_DISTR_INVAL_MSG_PER_TXN 524288 +#define MAX_DISTR_INVAL_MSG_PER_TXN \ + ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage)) /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -2675,8 +2678,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - if (rbtxn_inval_all_cache(txn)) + if (rbtxn_inval_overflowed(txn)) + { + Assert(txn->ninvalidations == 0 && txn->ninvalidations_distributed == 0); InvalidateSystemCaches(); + } else { ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); @@ -2731,8 +2737,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - if (rbtxn_inval_all_cache(txn)) + if (rbtxn_inval_overflowed(txn)) + { + Assert(txn->ninvalidations == 0 && txn->ninvalidations_distributed == 0); InvalidateSystemCaches(); + } else { ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); @@ -3470,30 +3479,32 @@ ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, } /* - * Accumulate the invalidations. + * A helper function for ReorderBufferAddInvalidations() and + * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation + * messages to the **invals_out. */ static void -ReorderBufferAccumulateInvalidations(uint32 *ninvalidations, - SharedInvalidationMessage **invalidations, - Size nmsgs, - SharedInvalidationMessage *msgs) +ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, + uint32 *ninvals_out, + SharedInvalidationMessage *msgs_new, + Size nmsgs_new) { - if (*ninvalidations == 0) + if (*ninvals_out == 0) { - *ninvalidations = nmsgs; - *invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(*invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); + *ninvals_out = nmsgs_new; + *invals_out = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs_new); + memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new); } else { - *invalidations = (SharedInvalidationMessage *) - repalloc(*invalidations, sizeof(SharedInvalidationMessage) * - (*ninvalidations + nmsgs)); - - memcpy(*invalidations + *ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - *ninvalidations += nmsgs; + /* Enlarge the array of inval messages */ + *invals_out = (SharedInvalidationMessage *) + repalloc(*invals_out, sizeof(SharedInvalidationMessage) * + (*ninvals_out + nmsgs_new)); + memcpy(*invals_out + *ninvals_out, msgs_new, + nmsgs_new * sizeof(SharedInvalidationMessage)); + *ninvals_out += nmsgs_new; } } @@ -3532,12 +3543,15 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, Assert(nmsgs > 0); /* - * If the complete cache will be invalidated, we don't need to accumulate - * the invalidations. + * Accumulate the invalidation messages unless it's overflowed. + * + * XXX: need to explain here why we don't try to mark it as overflowed + * unlike ReorderBufferAddDistributedInvalidations(). */ - if (!rbtxn_inval_all_cache(txn)) - ReorderBufferAccumulateInvalidations(&txn->ninvalidations, - &txn->invalidations, nmsgs, msgs); + if (!rbtxn_inval_overflowed(txn)) + ReorderBufferAccumulateInvalidations(&txn->invalidations, + &txn->ninvalidations, + msgs, nmsgs); ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); @@ -3545,11 +3559,15 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, } /* - * Accumulate the invalidations sent by committed transactions for executing - * them later. + * Accumulate the invalidations distributed by other committed transactions + * for executing them later. * - * This needs to be called by committed transactions to distribute the - * invalidations to the in-progress transactions. + * This function is similar to ReorderBufferAddInvalidations() but stores + * the given inval messages to the txn->invalidations_distributed with the + * overflow check. + * + * This needs to be called by committed transactions to distribute their + * inval messages to in-progress transactions. */ void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, @@ -3573,13 +3591,16 @@ ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, Assert(nmsgs > 0); /* - * If the number of invalidation messages is high, performing a full cache - * invalidation is more efficient than handling each message separately. + * Check the transaction has enough space for storing distributed + * invalidation messages. */ - if (((nmsgs + txn->ninvalidations_distributed) > MAX_DISTR_INVAL_MSG_PER_TXN) || - rbtxn_inval_all_cache(txn)) + if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN) { - txn->txn_flags |= RBTXN_INVAL_ALL_CACHE; + /* + * Mark the invalidation message as overflowed and free up the + * messages accumulated so far. + */ + txn->txn_flags |= RBTXN_INVAL_OVERFLOWED; if (txn->invalidations_distributed) { @@ -3595,11 +3616,13 @@ ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, txn->ninvalidations = 0; } } - else - ReorderBufferAccumulateInvalidations(&txn->ninvalidations_distributed, - &txn->invalidations_distributed, - nmsgs, msgs); + if (!rbtxn_inval_overflowed(txn)) + ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed, + &txn->ninvalidations_distributed, + msgs, nmsgs); + + /* Queue the invalidation messages into the transaction */ ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 3e052ea1d58..c334e550342 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -794,6 +794,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact * contents built by the current transaction even after its decoding, * which should have been invalidated due to concurrent catalog * changing transaction. + * + * XXX: need to explain here why we need to distribute only inval + * messages coming from the current committed transactions. */ if (txn->xid != xid) { diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 15d839f3d90..d4fbe5ff709 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -176,7 +176,7 @@ typedef struct ReorderBufferChange #define RBTXN_SENT_PREPARE 0x0200 #define RBTXN_IS_COMMITTED 0x0400 #define RBTXN_IS_ABORTED 0x0800 -#define RBTXN_INVAL_ALL_CACHE 0x1000 +#define RBTXN_INVAL_OVERFLOWED 0x1000 #define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE) @@ -266,10 +266,10 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ ) -/* Should the complete cache be invalidated? */ -#define rbtxn_inval_all_cache(txn) \ +/* Is the array of inval message overflowed? */ +#define rbtxn_inval_overflowed(txn) \ ( \ - ((txn)->txn_flags & RBTXN_INVAL_ALL_CACHE) != 0 \ + ((txn)->txn_flags & RBTXN_INVAL_OVERFLOWED) != 0 \ ) /* Is this a top-level transaction? */ @@ -430,8 +430,7 @@ typedef struct ReorderBufferTXN SharedInvalidationMessage *invalidations; /* - * Stores the cache invalidation messages distributed by the committing - * transaction. + * Stores cache invalidation messages distributed by other transactions. */ uint32 ninvalidations_distributed; SharedInvalidationMessage *invalidations_distributed;