From a0b6c483c0180771e9fd0a21f13aa3ce924fb0db Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 22 Nov 2019 12:43:38 +0530 Subject: [PATCH v2 13/13] Extend handling of concurrent aborts for streaming transaction --- src/backend/replication/logical/reorderbuffer.c | 38 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 5 ++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 410da36..710a22f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2350,9 +2350,9 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* * When the (sub)transaction was streamed, notify the remote node - * about the abort. + * about the abort only if we have sent any data for this transaction. */ - if (rbtxn_is_streamed(txn)) + if (rbtxn_is_streamed(txn) && txn->any_data_sent) rb->stream_abort(rb, txn, lsn); /* cosmetic... */ @@ -3281,6 +3281,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) volatile CommandId command_id; bool using_subtxn; Size streamed = 0; + MemoryContext ccxt = CurrentMemoryContext; ReorderBufferStreamIterTXNState *volatile iterstate = NULL; /* @@ -3411,6 +3412,13 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* we're going to stream this change */ streamed++; + /* + * Set the CheckXidAlive to the current (sub)xid for which this + * change belongs to so that we can detect the abort while we are + * decoding. + */ + CheckXidAlive = change->txn->xid; + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -3472,6 +3480,10 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferToastReplace(rb, txn, relation, change); rb->stream_change(rb, txn, relation, change); + /* Remember that we have sent some data for this txn.*/ + if (!change->txn->any_data_sent) + change->txn->any_data_sent = true; + /* * Only clear reassembled toast chunks if we're sure * they're not required anymore. The creator of the @@ -3654,6 +3666,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the message itself? */ LocalExecuteInvalidationMessage(&change->data.inval.msg); + + /* Invalidate current schema as well */ txn->is_schema_sent = false; break; @@ -3717,6 +3731,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } PG_CATCH(); { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ if (iterstate) ReorderBufferStreamIterTXNFinish(rb, iterstate); @@ -3735,7 +3752,22 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - PG_RE_THROW(); + /* re-throw only if it's not an abort */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + { + /* remember the command ID and snapshot for the streaming run */ + txn->command_id = command_id; + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + rb->stream_stop(rb, txn); + + FlushErrorState(); + } } PG_END_TRY(); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index c183fce..1db6da6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -242,6 +242,11 @@ typedef struct ReorderBufferTXN bool is_schema_sent; /* + * Have we sent any changes for this transaction in output plugin? + */ + bool any_data_sent; + + /* * Toplevel transaction for this subxact (NULL for top-level). */ struct ReorderBufferTXN *toptxn; -- 1.8.3.1