From 76cea7d3a261cde0492547db75e4c95ea787ad32 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sun, 29 Dec 2019 15:41:26 +0530 Subject: [PATCH v4 18/19] Review comment fix and refactoring --- src/backend/replication/logical/reorderbuffer.c | 995 ++++++------------------ 1 file changed, 237 insertions(+), 758 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 789b425..8b3f112 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -149,28 +149,6 @@ typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; } ReorderBufferIterTXNState; -/* - * k-way in-order change iteration support structures - * - * This is a simplified version for streaming, which does not require - * serialization to files and only reads changes that are currently in - * memory. - */ -typedef struct ReorderBufferStreamIterTXNEntry -{ - XLogRecPtr lsn; - ReorderBufferChange *change; - ReorderBufferTXN *txn; -} ReorderBufferStreamIterTXNEntry; - -typedef struct ReorderBufferStreamIterTXNState -{ - binaryheap *heap; - Size nr_txns; - dlist_head old_change; - ReorderBufferStreamIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; -} ReorderBufferStreamIterTXNState; - /* toast datastructures */ typedef struct ReorderBufferToastEnt { @@ -235,20 +213,6 @@ static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); - -/* iterator for streaming (only get data from memory) */ -static ReorderBufferStreamIterTXNState * ReorderBufferStreamIterTXNInit( - ReorderBuffer *rb, - ReorderBufferTXN *txn); - -static ReorderBufferChange *ReorderBufferStreamIterTXNNext( - ReorderBuffer *rb, - ReorderBufferStreamIterTXNState * state); - -static void ReorderBufferStreamIterTXNFinish( - ReorderBuffer *rb, - 
ReorderBufferStreamIterTXNState * state); - /* * --------------------------------------- * Disk serialization support functions @@ -1324,210 +1288,6 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, } /* - * Binary heap comparison function (streaming iterator). - */ -static int -ReorderBufferStreamIterCompare(Datum a, Datum b, void *arg) -{ - ReorderBufferStreamIterTXNState *state = (ReorderBufferStreamIterTXNState *) arg; - XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; - XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; - - if (pos_a < pos_b) - return 1; - else if (pos_a == pos_b) - return 0; - return -1; -} - -/* - * Allocate & initialize an iterator which iterates in lsn order over a - * transaction and all its subtransactions. This version is meant for - * streaming of incomplete transactions. - */ -static ReorderBufferStreamIterTXNState * -ReorderBufferStreamIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) -{ - Size nr_txns = 0; - ReorderBufferStreamIterTXNState *state; - dlist_iter cur_txn_i; - int32 off; - - /* Check ordering of changes in the toplevel transaction. */ - AssertChangeLsnOrder(rb, txn); - - /* - * Calculate the size of our heap: one element for every transaction that - * contains changes. (Besides the transactions already in the reorder - * buffer, we count the one we were directly passed.) - */ - if (txn->nentries > 0) - nr_txns++; - - dlist_foreach(cur_txn_i, &txn->subtxns) - { - ReorderBufferTXN *cur_txn; - - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - - /* Check ordering of changes in this subtransaction. */ - AssertChangeLsnOrder(rb, cur_txn); - - if (cur_txn->nentries > 0) - nr_txns++; - } - - /* - * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no - * need to allocate/build a heap then. 
- */ - - /* allocate iteration state */ - state = (ReorderBufferStreamIterTXNState *) - MemoryContextAllocZero(rb->context, - sizeof(ReorderBufferStreamIterTXNState) + - sizeof(ReorderBufferStreamIterTXNEntry) * nr_txns); - - state->nr_txns = nr_txns; - dlist_init(&state->old_change); - - /* allocate heap */ - state->heap = binaryheap_allocate(state->nr_txns, - ReorderBufferStreamIterCompare, - state); - - /* - * Now insert items into the binary heap, in an unordered fashion. (We - * will run a heap assembly step at the end; this is more efficient.) - */ - - off = 0; - - /* add toplevel transaction if it contains changes */ - if (txn->nentries > 0) - { - ReorderBufferChange *cur_change; - - cur_change = dlist_head_element(ReorderBufferChange, node, - &txn->changes); - - state->entries[off].lsn = cur_change->lsn; - state->entries[off].change = cur_change; - state->entries[off].txn = txn; - - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); - } - - /* add subtransactions if they contain changes */ - dlist_foreach(cur_txn_i, &txn->subtxns) - { - ReorderBufferTXN *cur_txn; - - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - - if (cur_txn->nentries > 0) - { - ReorderBufferChange *cur_change; - - cur_change = dlist_head_element(ReorderBufferChange, node, - &cur_txn->changes); - - state->entries[off].lsn = cur_change->lsn; - state->entries[off].change = cur_change; - state->entries[off].txn = cur_txn; - - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); - } - } - - Assert(off == nr_txns); - - /* assemble a valid binary heap */ - binaryheap_build(state->heap); - - return state; -} - -/* - * Return the next change when iterating over a transaction and its - * subtransactions. - * - * Returns NULL when no further changes exist. 
- */ -static ReorderBufferChange * -ReorderBufferStreamIterTXNNext(ReorderBuffer *rb, ReorderBufferStreamIterTXNState * state) -{ - ReorderBufferChange *change; - ReorderBufferStreamIterTXNEntry *entry; - int32 off; - - /* nothing there anymore */ - if (state->heap->bh_size == 0) - return NULL; - - off = DatumGetInt32(binaryheap_first(state->heap)); - entry = &state->entries[off]; - - /* free memory we might have "leaked" in the previous *Next call */ - if (!dlist_is_empty(&state->old_change)) - { - change = dlist_container(ReorderBufferChange, node, - dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); - Assert(dlist_is_empty(&state->old_change)); - } - - change = entry->change; - - /* - * update heap with information about which transaction has the next - * relevant change in LSN order - */ - - /* there are in-memory changes */ - if (dlist_has_next(&entry->txn->changes, &entry->change->node)) - { - dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); - ReorderBufferChange *next_change = - dlist_container(ReorderBufferChange, node, next); - - /* txn stays the same */ - state->entries[off].lsn = next_change->lsn; - state->entries[off].change = next_change; - - binaryheap_replace_first(state->heap, Int32GetDatum(off)); - return change; - } - - /* ok, no changes there anymore, remove */ - binaryheap_remove_first(state->heap); - - return change; -} - -/* - * Deallocate the iterator - */ -static void -ReorderBufferStreamIterTXNFinish(ReorderBuffer *rb, - ReorderBufferStreamIterTXNState * state) -{ - /* free memory we might have "leaked" in the last *Next call */ - if (!dlist_is_empty(&state->old_change)) - { - ReorderBufferChange *change; - - change = dlist_container(ReorderBufferChange, node, - dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); - Assert(dlist_is_empty(&state->old_change)); - } - - binaryheap_free(state->heap); - pfree(state); -} - -/* * Cleanup the contents of a 
transaction, usually after the transaction * committed or aborted. */ @@ -1854,6 +1614,11 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) SnapBuildSnapDecRefcount(snap); } +/* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + */ static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) { @@ -1868,86 +1633,38 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Perform the replay of a transaction and its non-aborted subtransactions. - * - * Subtransactions previously have to be processed by - * ReorderBufferCommitChild(), even if previously assigned to the toplevel - * transaction with ReorderBufferAssignChild. + * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN * - * We currently can only decode a transaction's contents when its commit - * record is read because that's the only place where we know about cache - * invalidations. Thus, once a toplevel commit is read, we iterate over the top - * and subtransactions (using a k-way merge) and replay the changes in lsn - * order. + * Send data of a transaction (and its subtransactions) to the + * output plugin. If streaming is true then data will be sent using stream API. 
*/ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn, + volatile Snapshot snapshot_now, + volatile CommandId command_id, + bool streaming) { - ReorderBufferTXN *txn; - volatile Snapshot snapshot_now; - volatile CommandId command_id = FirstCommandId; bool using_subtxn; + Size streamed = 0; + MemoryContext ccxt = CurrentMemoryContext; ReorderBufferIterTXNState *volatile iterstate = NULL; + volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; + /* + * build data to be able to lookup the CommandIds of catalog tuples + */ + ReorderBufferBuildTupleCidHash(rb, txn); - txn->final_lsn = commit_lsn; - txn->end_lsn = end_lsn; - txn->commit_time = commit_time; - txn->origin_id = origin_id; - txn->origin_lsn = origin_lsn; + /* setup the initial snapshot */ + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, txn->xid); /* - * If the transaction was (partially) streamed, we need to commit it in a - * 'streamed' way. That is, we first stream the remaining part of the - * transaction, and then invoke stream_commit message. - * - * XXX Called after everything (origin ID and LSN, ...) is stored in the - * transaction, so we don't pass that directly. - * - * XXX Somewhat hackish redirection, perhaps needs to be refactored? - */ - if (rbtxn_is_streamed(txn)) - { - ReorderBufferStreamCommit(rb, txn); - return; - } - - /* - * If this transaction has no snapshot, it didn't make any changes to the - * database, so there's nothing to decode. Note that - * ReorderBufferCommitChild will have transferred any snapshots from - * subtransactions if there were any. 
- */ - if (txn->base_snapshot == NULL) - { - Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); - return; - } - - snapshot_now = txn->base_snapshot; - - /* build data to be able to lookup the CommandIds of catalog tuples */ - ReorderBufferBuildTupleCidHash(rb, txn); - - /* setup the initial snapshot */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); - - /* - * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around to - * keep track of those. The easiest way is to simply use a transaction - * internally. That also allows us to easily enforce that nothing writes - * to the database by checking for xid assignments. + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state around to + * keep track of those. The easiest way is to simply use a transaction + * internally. That also allows us to easily enforce that nothing writes + * to the database by checking for xid assignments. * * When we're called via the SQL SRF there's already a transaction * started, so start an explicit subtransaction there. @@ -1961,11 +1678,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferChange *specinsert = NULL; if (using_subtxn) - BeginInternalSubTransaction("replay"); + BeginInternalSubTransaction("stream"); else StartTransactionCommand(); - rb->begin(rb, txn); + /* start streaming this chunk of transaction */ + if (streaming) + rb->stream_start(rb, txn); + else + rb->begin(rb, txn); iterstate = ReorderBufferIterTXNInit(rb, txn); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -1978,11 +1699,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * subtransactions. The changes may have the same LSN due to * MULTI_INSERT xlog records. 
*/ - if (prev_lsn != InvalidXLogRecPtr) - Assert(prev_lsn <= change->lsn); + Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); prev_lsn = change->lsn; + if (streaming) + { + /* + * Set CheckXidAlive to the (sub)xid to which this change belongs, + * so that we can detect a concurrent abort while we are + * decoding. + */ + CheckXidAlive = change->txn->xid; + + /* Increment the stream count. */ + streamed++; + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1992,8 +1725,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * use as a normal record. It'll be cleaned up at the end * of INSERT processing. */ - if (specinsert == NULL) - elog(ERROR, "invalid ordering of speculative insertion changes"); Assert(specinsert->data.tp.oldtuple == NULL); change = specinsert; change->action = REORDER_BUFFER_CHANGE_INSERT; @@ -2059,7 +1790,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (!IsToastRelation(relation)) { ReorderBufferToastReplace(rb, txn, relation, change); - rb->apply_change(rb, txn, relation, change); + if (streaming) + { + rb->stream_change(rb, txn, relation, change); + + /* Remember that we have sent some data for this xid. */ + change->txn->any_data_sent = true; + } + else + rb->apply_change(rb, txn, relation, change); /* * Only clear reassembled toast chunks if we're sure @@ -2080,8 +1819,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * freed/reused while restoring spooled data from * disk.
*/ - Assert(change->data.tp.newtuple != NULL); - dlist_delete(&change->node); ReorderBufferToastAppendChunk(rb, txn, relation, change); @@ -2099,7 +1836,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = NULL; } - if (relation != NULL) + if (RelationIsValid(relation)) { RelationClose(relation); relation = NULL; @@ -2157,7 +1894,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relations[nrelations++] = relation; } - rb->apply_truncate(rb, txn, nrelations, relations, change); + if (streaming) + { + rb->stream_truncate(rb, txn, nrelations, relations, change); + + /* Remember that we have sent some data. */ + change->txn->any_data_sent = true; + } + else + rb->apply_truncate(rb, txn, nrelations, relations, change); for (i = 0; i < nrelations; i++) RelationClose(relations[i]); @@ -2166,10 +1911,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } case REORDER_BUFFER_CHANGE_MESSAGE: - rb->message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); + if (streaming) + rb->stream_message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + else + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -2200,9 +1951,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now = change->data.snapshot; } - /* and continue with the new one */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, + txn->xid); break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -2222,7 +1973,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now->curcid = command_id; TeardownHistoricSnapshot(false); - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid); + 
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, + txn->xid); } break; @@ -2260,16 +2012,46 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + /* + * Done with current changes, call stream_stop callback for streaming + * transaction, commit callback otherwise. + */ + if (streaming) + { + /* + * Set the last LSN of the stream as the final LSN before calling + * stream stop. + */ + txn->final_lsn = prev_lsn; + rb->stream_stop(rb, txn); + } + else + rb->commit(rb, txn, commit_lsn); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) elog(ERROR, "output plugin used XID %u", GetCurrentTransactionId()); - /* cleanup */ - TeardownHistoricSnapshot(false); + /* + * Remember the command ID and snapshot if the transaction is + * streaming; otherwise free the snapshot if we have copied it. + */ + if (streaming) + { + txn->command_id = command_id; + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + } + else if (snapshot_now->copied) + ReorderBufferFreeSnap(rb, snapshot_now); + + /* + * Destroy the (relfilenode, ctid) hashtable, so that we don't leak + * any memory. We could also keep the hash table and update it with + * new ctid values, but this seems simpler and good enough for now.
+ */ + ReorderBufferDestroyTupleCidHash(rb, txn); /* * Aborting the current (sub-)transaction as a whole has the right @@ -2285,14 +2067,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); - - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + /* + * If we are streaming the in-progress transaction then discard the + * changes that we just streamed, and mark the transactions as streamed + * (if they contained changes). Otherwise, remove all the changes and + * deallocate the ReorderBufferTXN. + */ + if (streaming) + ReorderBufferTruncateTXN(rb, txn); + else + ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); @@ -2311,18 +2101,117 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); - - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + if (streaming) + { + /* Discard the changes that we just streamed. */ + ReorderBufferTruncateTXN(rb, txn); - PG_RE_THROW(); + /* re-throw only if it's not an abort */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + { + /* remember the command ID and snapshot for the streaming run */ + txn->command_id = command_id; + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + /* + * Set the last LSN of the stream as the final LSN before + * calling stream stop.
+ */ + txn->final_lsn = prev_lsn; + rb->stream_stop(rb, txn); + + FlushErrorState(); + } + } + else + { + ReorderBufferCleanupTXN(rb, txn); + PG_RE_THROW(); + } } PG_END_TRY(); } /* + * Perform the replay of a transaction and its non-aborted subtransactions. + * + * Subtransactions previously have to be processed by + * ReorderBufferCommitChild(), even if previously assigned to the toplevel + * transaction with ReorderBufferAssignChild. + * + * We currently can only decode a transaction's contents when its commit + * record is read because that's the only place where we know about cache + * invalidations. Thus, once a toplevel commit is read, we iterate over the top + * and subtransactions (using a k-way merge) and replay the changes in lsn + * order. + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + volatile Snapshot snapshot_now; + volatile CommandId command_id = FirstCommandId; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + /* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + * + * XXX Called after everything (origin ID and LSN, ...) is stored in the + * transaction, so we don't pass that directly. + * + * XXX Somewhat hackish redirection, perhaps needs to be refactored? 
+ */ + if (rbtxn_is_streamed(txn)) + { + ReorderBufferStreamCommit(rb, txn); + return; + } + + /* + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + ReorderBufferCleanupTXN(rb, txn); + return; + } + + snapshot_now = txn->base_snapshot; + + /* + * Access the main routine to decode the changes and send to output plugin. + */ + ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now, + command_id, false); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -3264,10 +3153,6 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { volatile Snapshot snapshot_now; volatile CommandId command_id; - bool using_subtxn; - Size streamed = 0; - MemoryContext ccxt = CurrentMemoryContext; - ReorderBufferStreamIterTXNState *volatile iterstate = NULL; /* * If this is a subxact, we need to stream the top-level transaction @@ -3305,16 +3190,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferTXN *subtxn; subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); - - if (subtxn->base_snapshot != NULL && - (txn->base_snapshot == NULL || - txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) - { - txn->base_snapshot = subtxn->base_snapshot; - txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; - subtxn->base_snapshot = NULL; - subtxn->base_snapshot_lsn = InvalidXLogRecPtr; - } + ReorderBufferTransferSnapToParent(txn, subtxn); } command_id = FirstCommandId; @@ -3344,407 +3220,10 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * build data to be able to lookup the CommandIds of catalog tuples - */ - ReorderBufferBuildTupleCidHash(rb, txn); - - /* setup 
the initial snapshot */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, txn->xid); - - /* - * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around to - * keep track of those. The easiest way is to simply use a transaction - * internally. That also allows us to easily enforce that nothing writes - * to the database by checking for xid assignments. - * - * When we're called via the SQL SRF there's already a transaction - * started, so start an explicit subtransaction there. + * Access the main routine to decode the changes and send to output plugin. */ - using_subtxn = IsTransactionOrTransactionBlock(); - - PG_TRY(); - { - XLogRecPtr prev_lsn = InvalidXLogRecPtr; - ReorderBufferChange *change; - ReorderBufferChange *specinsert = NULL; - - if (using_subtxn) - BeginInternalSubTransaction("stream"); - else - StartTransactionCommand(); - - /* start streaming this chunk of transaction */ - rb->stream_start(rb, txn); - - iterstate = ReorderBufferStreamIterTXNInit(rb, txn); - while ((change = ReorderBufferStreamIterTXNNext(rb, iterstate)) != NULL) - { - Relation relation = NULL; - Oid reloid; - - /* - * Enforce correct ordering of changes, merged from multiple - * subtransactions. The changes may have the same LSN due to - * MULTI_INSERT xlog records. - */ - if (prev_lsn != InvalidXLogRecPtr) - Assert(prev_lsn <= change->lsn); - - prev_lsn = change->lsn; - - /* we're going to stream this change */ - streamed++; - - /* - * Set the CheckXidAlive to the current (sub)xid for which this - * change belongs to so that we can detect the abort while we are - * decoding. - */ - CheckXidAlive = change->txn->xid; - - switch (change->action) - { - case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: - - /* - * Confirmation for speculative insertion arrived. Simply - * use as a normal record. It'll be cleaned up at the end - * of INSERT processing. 
- */ - Assert(specinsert->data.tp.oldtuple == NULL); - change = specinsert; - change->action = REORDER_BUFFER_CHANGE_INSERT; - - /* intentionally fall through */ - case REORDER_BUFFER_CHANGE_INSERT: - case REORDER_BUFFER_CHANGE_UPDATE: - case REORDER_BUFFER_CHANGE_DELETE: - Assert(snapshot_now); - - reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, - change->data.tp.relnode.relNode); - - /* - * Catalog tuple without data, emitted while catalog was - * in the process of being rewritten. - */ - if (reloid == InvalidOid && - change->data.tp.newtuple == NULL && - change->data.tp.oldtuple == NULL) - goto change_done; - else if (reloid == InvalidOid) - elog(ERROR, "could not map filenode \"%s\" to relation OID", - relpathperm(change->data.tp.relnode, - MAIN_FORKNUM)); - - relation = RelationIdGetRelation(reloid); - - if (relation == NULL) - elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", - reloid, - relpathperm(change->data.tp.relnode, - MAIN_FORKNUM)); - - if (!RelationIsLogicallyLogged(relation)) - goto change_done; - - /* - * For now ignore sequence changes entirely. Most of the - * time they don't log changes using records we - * understand, so it doesn't make sense to handle the few - * cases we do. - */ - if (relation->rd_rel->relkind == RELKIND_SEQUENCE) - goto change_done; - - /* user-triggered change */ - if (!IsToastRelation(relation)) - { - ReorderBufferToastReplace(rb, txn, relation, change); - rb->stream_change(rb, txn, relation, change); - - /* Remember that we have sent some data for this txn.*/ - if (!change->txn->any_data_sent) - change->txn->any_data_sent = true; - - /* - * Only clear reassembled toast chunks if we're sure - * they're not required anymore. The creator of the - * tuple tells us. 
- */ - if (change->data.tp.clear_toast_afterwards) - ReorderBufferToastReset(rb, txn); - } - /* we're not interested in toast deletions */ - else if (change->action == REORDER_BUFFER_CHANGE_INSERT) - { - /* - * Need to reassemble the full toasted Datum in - * memory, to ensure the chunks don't get reused till - * we're done remove it from the list of this - * transaction's changes. Otherwise it will get - * freed/reused while restoring spooled data from - * disk. - */ - dlist_delete(&change->node); - ReorderBufferToastAppendChunk(rb, txn, relation, - change); - } - - change_done: - - /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. - */ - if (specinsert != NULL) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } - - if (relation != NULL) - { - RelationClose(relation); - relation = NULL; - } - break; - - case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: - - /* - * Speculative insertions are dealt with by delaying the - * processing of the insert until the confirmation record - * arrives. For that we simply unlink the record from the - * chain, so it does not get freed/reused while restoring - * spooled data from disk. - * - * This is safe in the face of concurrent catalog changes - * because the relevant relation can't be changed between - * speculative insertion and confirmation due to - * CheckTableNotInUse() and locking. 
- */ - - /* clear out a pending (and thus failed) speculation */ - if (specinsert != NULL) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } - - /* and memorize the pending insertion */ - dlist_delete(&change->node); - specinsert = change; - break; - - case REORDER_BUFFER_CHANGE_TRUNCATE: - { - int i; - int nrelids = change->data.truncate.nrelids; - int nrelations = 0; - Relation *relations; - - relations = palloc0(nrelids * sizeof(Relation)); - for (i = 0; i < nrelids; i++) - { - Oid relid = change->data.truncate.relids[i]; - Relation relation; - - relation = RelationIdGetRelation(relid); - - if (relation == NULL) - elog(ERROR, "could not open relation with OID %u", relid); - - if (!RelationIsLogicallyLogged(relation)) - continue; - - relations[nrelations++] = relation; - } - - rb->stream_truncate(rb, txn, nrelations, relations, change); - - for (i = 0; i < nrelations; i++) - RelationClose(relations[i]); - - break; - } - - case REORDER_BUFFER_CHANGE_MESSAGE: - - rb->stream_message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); - break; - - case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: - /* get rid of the old */ - TeardownHistoricSnapshot(false); - - if (snapshot_now->copied) - { - ReorderBufferFreeSnap(rb, snapshot_now); - snapshot_now = - ReorderBufferCopySnap(rb, change->data.snapshot, - txn, command_id); - } - - /* - * Restored from disk, need to be careful not to double - * free. We could introduce refcounting for that, but for - * now this seems infrequent enough not to care. 
- */ - else if (change->data.snapshot->copied) - { - snapshot_now = - ReorderBufferCopySnap(rb, change->data.snapshot, - txn, command_id); - } - else - { - snapshot_now = change->data.snapshot; - } - - /* and continue with the new one */ - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, - txn->xid); - break; - - case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - Assert(change->data.command_id != InvalidCommandId); - - if (command_id < change->data.command_id) - { - command_id = change->data.command_id; - - if (!snapshot_now->copied) - { - /* we don't use the global one anymore */ - snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, - txn, command_id); - } - - snapshot_now->curcid = command_id; - - TeardownHistoricSnapshot(false); - SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, - txn->xid); - } - - break; - - case REORDER_BUFFER_CHANGE_INVALIDATION: - - /* - * Execute the invalidation message locally. - * - * XXX Do we need to care about relcacheInitFileInval and - * the other fields added to ReorderBufferChange, or just - * about the message itself? - */ - LocalExecuteInvalidationMessage(&change->data.inval.msg); - break; - - case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: - elog(ERROR, "tuplecid value in changequeue"); - break; - } - } - - /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. 
- */ - if (specinsert) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } - - /* clean up the iterator */ - ReorderBufferStreamIterTXNFinish(rb, iterstate); - iterstate = NULL; - - /* call stream_stop callback */ - rb->stream_stop(rb, txn); - - /* this is just a sanity check against bad output plugin behaviour */ - if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) - elog(ERROR, "output plugin used XID %u", - GetCurrentTransactionId()); - - /* remember the command ID and snapshot for the streaming run */ - txn->command_id = command_id; - txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, - txn, command_id); - - /* cleanup */ - TeardownHistoricSnapshot(false); - - /* - * Destroy the (relfilenode, ctid) hashtable, so that we don't leak - * any memory. We could also keep the hash table and update it with - * new ctid values, but this seems simpler and good enough for now. - */ - ReorderBufferDestroyTupleCidHash(rb, txn); - - /* - * Aborting the current (sub-)transaction as a whole has the right - * semantics. We want all locks acquired in here to be released, not - * reassigned to the parent and we do not want any database access - * have persistent effects. - */ - AbortCurrentTransaction(); - - /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - } - PG_CATCH(); - { - MemoryContext ecxt = MemoryContextSwitchTo(ccxt); - ErrorData *errdata = CopyErrorData(); - - /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ - if (iterstate) - ReorderBufferStreamIterTXNFinish(rb, iterstate); - - TeardownHistoricSnapshot(true); - - /* - * Force cache invalidation to happen outside of a valid transaction - * to prevent catalog access as we just caught an error. 
- */ - AbortCurrentTransaction(); - - /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - - /* re-throw only if it's not an abort */ - if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) - { - MemoryContextSwitchTo(ecxt); - PG_RE_THROW(); - } - else - { - /* remember the command ID and snapshot for the streaming run */ - txn->command_id = command_id; - txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, - txn, command_id); - rb->stream_stop(rb, txn); - - FlushErrorState(); - } - } - PG_END_TRY(); + ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, + command_id, true); /* * Update the stream statistics. -- 1.8.3.1