From 4fd60ba2487e6f57d435cc57449ad2a8b5287632 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 21 Jan 2020 10:57:10 +0530 Subject: [PATCH v8 13/13] Bugfix handling of incomplete toast tuple --- contrib/test_decoding/logical.conf | 1 + src/backend/replication/logical/reorderbuffer.c | 159 +++++++++++++++++++++--- src/include/replication/reorderbuffer.h | 8 ++ 3 files changed, 152 insertions(+), 16 deletions(-) diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf index 07c4d3d..f748994 100644 --- a/contrib/test_decoding/logical.conf +++ b/contrib/test_decoding/logical.conf @@ -1,3 +1,4 @@ wal_level = logical max_replication_slots = 4 logical_decoding_work_mem = 64kB +logging_collector=on diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index beb6cd2..acede78 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -664,6 +664,16 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn->nentries++; txn->nentries_mem++; + /* + * If we have detected the toast chunk without the main table change while + * sending the previous stream, then reset the flag if we have got any + * insert/update so that we can retry to stream this. + */ + if (txn->incomplte_toast_chunks && + (change->action == REORDER_BUFFER_CHANGE_INSERT || + change->action == REORDER_BUFFER_CHANGE_UPDATE)) + txn->incomplte_toast_chunks = false; + /* update memory accounting information */ ReorderBufferChangeMemoryUpdate(rb, change, true); @@ -1682,6 +1692,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, MemoryContext ccxt = CurrentMemoryContext; ReorderBufferIterTXNState *volatile iterstate = NULL; volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; + dlist_head toast_change; + Oid toastrelid = InvalidOid; /* * build data to be able to lookup the CommandIds of catalog tuples @@ -1691,6 +1703,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* setup the initial snapshot */ SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, txn->xid); + /* Initialize the local toast change list. */ + dlist_init(&toast_change); + /* * Decoding needs access to syscaches et al., which in turn use * heavyweight locks and such. Thus we need to have enough state around to @@ -1734,6 +1749,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; + Relation toastrel = NULL; Oid reloid; /* @@ -1838,6 +1854,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* user-triggered change */ if (!IsToastRelation(relation)) { + /* + * If we have the toast changes list then reassemble the + * toast chunks in the hash table. + */ + if (!dlist_is_empty(&toast_change)) + { + dlist_mutable_iter iter; + + Assert(streaming); + + /* open the toast relation. */ + toastrel = RelationIdGetRelation(toastrelid); + + dlist_foreach_modify(iter, &toast_change) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, + node, iter.cur); + dlist_delete(&change->node); + ReorderBufferToastAppendChunk(rb, txn, toastrel, + change); + } + RelationClose(toastrel); + } + ReorderBufferToastReplace(rb, txn, relation, change); if (streaming) { @@ -1869,8 +1911,25 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * disk. */ dlist_delete(&change->node); - ReorderBufferToastAppendChunk(rb, txn, relation, - change); + + /* + * For the streaming run don't directly reassemble the + * chunks in the hash table instead collect those + * changes in the list because there is a possibility + * that we might not get the change for the main table + * in this stream. So we will assemble the chunks when + * we actually get the change for the main table. + * Otherwise we will attach the list back to the main + * changes list. + */ + if (streaming) + { + dlist_push_tail(&toast_change, &change->node); + toastrelid = reloid; + } + else + ReorderBufferToastAppendChunk(rb, txn, relation, + change); } change_done: @@ -2125,7 +2184,36 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * deallocate the ReorderBufferTXN. */ if (streaming) + { ReorderBufferTruncateTXN(rb, txn); + + /* + * If we could not stream the toasted chunks then append them back + * to the main txn list. + */ + if (!dlist_is_empty(&toast_change)) + { + dlist_mutable_iter iter; + + Assert(streaming); + dlist_foreach_modify(iter, &toast_change) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* + * Remove from temp list and add it back to the txn changes + * list. + */ + dlist_delete(&change->node); + dlist_push_tail(&txn->changes, &change->node); + txn->nentries_mem++; + txn->nentries++; + } + txn->incomplte_toast_chunks = true; + } + } else ReorderBufferCleanupTXN(rb, txn); } @@ -2177,6 +2265,33 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, rb->stream_stop(rb, txn); FlushErrorState(); + + /* + * If we could not stream the toasted chunks then append them + * back to the main txn list. + */ + if (!dlist_is_empty(&toast_change)) + { + dlist_mutable_iter iter; + + Assert(streaming); + dlist_foreach_modify(iter, &toast_change) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* + * Remove from temp list and add it back to the txn + * changes list. + */ + dlist_delete(&change->node); + dlist_push_tail(&txn->changes, &change->node); + txn->nentries_mem++; + txn->nentries++; + } + txn->incomplte_toast_chunks = true; + } } } else @@ -2523,6 +2638,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { Size sz; ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2538,7 +2654,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* if subxact, and streaming supported, use the toplevel instead */ if (txn->toptxn && ReorderBufferCanStream(rb)) - txn = txn->toptxn; + toptxn = txn->toptxn; sz = ReorderBufferChangeSize(change); @@ -2546,12 +2662,16 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { txn->size += sz; rb->size += sz; + if (toptxn) + toptxn->size += sz; } else { Assert((rb->size >= sz) && (txn->size >= sz)); txn->size -= sz; rb->size -= sz; + if (toptxn) + toptxn->size -= sz; } Assert(txn->size <= rb->size); @@ -2805,14 +2925,11 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) txn = dlist_container(ReorderBufferTXN, node, iter.cur); /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + if (((!largest) || (txn->size > largest->size)) && + (!txn->incomplte_toast_chunks) && (txn->size > 0)) + largest = txn; } - Assert(largest); - Assert(largest->size > 0); - Assert(largest->size <= rb->size); - return largest; } @@ -2834,11 +2951,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) if (rb->size < logical_decoding_work_mem * 1024L) return; +retry: /* * Pick the largest transaction (or subtransaction) and evict it from * memory by streaming, if supported. Otherwise spill to disk. */ - if (ReorderBufferCanStream(rb)) + if (ReorderBufferCanStream(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) { /* * Pick the largest toplevel transaction and evict it from memory by @@ -2877,8 +2996,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * streaming. But for streaming we should really check nentries_mem for * all subtransactions too. */ - Assert(txn->size == 0); - Assert(txn->nentries_mem == 0); + Assert(txn->incomplte_toast_chunks || txn->specinsert || txn->size == 0); + Assert(txn->incomplte_toast_chunks || txn->nentries_mem == 0); /* * And furthermore, evicting the transaction should get us below the @@ -2890,7 +3009,16 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * the memory limit). So by evicting it we're definitely back below the * memory limit. */ - Assert(rb->size < logical_decoding_work_mem * 1024L); + Assert(txn->incomplte_toast_chunks || + rb->size < logical_decoding_work_mem * 1024L); + + /* + * For streaming transaction it's possible that we are not yet below the + * memory limit due to incomplete toast tuple. So we retry with some other + * transaction. + */ + if (rb->size >= logical_decoding_work_mem * 1024L) + goto retry; } /* @@ -3277,9 +3405,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Don't consider already streamed transaction. */ rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1; - Assert(dlist_is_empty(&txn->changes)); - Assert(txn->nentries == 0); - Assert(txn->nentries_mem == 0); + Assert(txn->incomplte_toast_chunks || txn->nentries == 0); + Assert(txn->incomplte_toast_chunks || txn->nentries_mem == 0); } /* diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 02650c3..1348cde 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -251,6 +251,14 @@ typedef struct ReorderBufferTXN bool any_data_sent; /* + * While sending the last stream we have found that the transaction has some + * toast tuples but haven't yet got the change of the main table so we could + * not stream it. Don't try to stream it again until we get any new change + * for the transaction. + */ + bool incomplte_toast_chunks; + + /* * Toplevel transaction for this subxact (NULL for top-level). */ struct ReorderBufferTXN *toptxn; -- 1.8.3.1