From 0a7cbaeaa179bde9dd5146e60f7b5c52b6c34899 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Mon, 17 Dec 2018 15:43:13 +0300 Subject: [PATCH] Fix worker, historic MVCC visibility rules, subxacts, schema send, tests --- doc/src/sgml/logicaldecoding.sgml | 2 +- .../replication/logical/reorderbuffer.c | 61 +++++++++--- src/backend/replication/logical/worker.c | 25 ++--- src/backend/replication/pgoutput/pgoutput.c | 20 +++- src/backend/replication/walsender.c | 2 +- src/backend/utils/time/tqual.c | 16 ++- src/include/replication/reorderbuffer.h | 5 + ..._stream_simple.pl => 011_stream_simple.pl} | 2 +- ...tream_subxact.pl => 012_stream_subxact.pl} | 2 +- .../{011_stream_ddl.pl => 013_stream_ddl.pl} | 2 +- .../subscription/t/014_stream_tough_ddl.pl | 98 +++++++++++++++++++ ...t_abort.pl => 015_stream_subxact_abort.pl} | 2 +- ...ort.pl => 016_stream_subxact_ddl_abort.pl} | 2 +- 13 files changed, 200 insertions(+), 39 deletions(-) rename src/test/subscription/t/{009_stream_simple.pl => 011_stream_simple.pl} (98%) rename src/test/subscription/t/{010_stream_subxact.pl => 012_stream_subxact.pl} (98%) rename src/test/subscription/t/{011_stream_ddl.pl => 013_stream_ddl.pl} (98%) create mode 100644 src/test/subscription/t/014_stream_tough_ddl.pl rename src/test/subscription/t/{012_stream_subxact_abort.pl => 015_stream_subxact_abort.pl} (97%) rename src/test/subscription/t/{013_stream_subxact_ddl_abort.pl => 016_stream_subxact_ddl_abort.pl} (97%) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 3571f96a8d..dbec2e4ef7 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -1057,7 +1057,7 @@ stream_commit_cb(...); <-- commit of the streamed transaction - Similarly to spill-to-disk behavior, sStreaming is triggered when the total + Similarly to spill-to-disk behavior, streaming is triggered when the total amount of changes decoded from the WAL (for all in-progress transactions) exceeds limit defined by logical_work_mem setting. At that point the largest toplevel transaction (measured by amount of memory diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1c394296ac..2200332999 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1744,9 +1744,10 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) } else { + // TOCHECK: Is the second assert actually necessary? Assert(ent->cmin == change->data.tuplecid.cmin); - Assert(ent->cmax == InvalidCommandId || - ent->cmax == change->data.tuplecid.cmax); + // Assert(ent->cmax == InvalidCommandId || + // ent->cmax == change->data.tuplecid.cmax); /* * if the tuple got valid in this transaction and now got deleted @@ -2881,6 +2882,9 @@ ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) for (i = 0; i < txn->ninvalidations; i++) LocalExecuteInvalidationMessage(&txn->invalidations[i]); + + /* Invalidate current schema as well */ + txn->is_schema_sent = false; } /* @@ -2895,6 +2899,23 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * We read catalog changes from WAL, which are not yet sent, so + * invalidate current schema in order output plugin can resend + * schema again. + */ + txn->is_schema_sent = false; + + /* + * TOCHECK: Mark toplevel transaction as having catalog changes too + * if one of its children has. + */ + if (txn->toptxn != NULL) + { + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + txn->toptxn->is_schema_sent = false; + } } /* @@ -3476,7 +3497,15 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * using snapshot half-way through the subxact. */ command_id = txn->command_id; - snapshot_now = txn->snapshot_now; + + /* + * TOCHECK: We have to rebuild historic snapshot to be sure it includes all + * information about subtransactions, which could arrive after streaming start. + */ + if (!txn->is_schema_sent) + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + // snapshot_now = txn->snapshot_now; } /* @@ -3522,7 +3551,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Enforce correct ordering of changes, merged from multiple * subtransactions. The changes may have the same LSN due to - * MULTI_INSERT xllog records. + * MULTI_INSERT xlog records. */ if (prev_lsn != InvalidXLogRecPtr) Assert(prev_lsn <= change->lsn); @@ -3731,6 +3760,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) snapshot_now = change->data.snapshot; } + /* + * TOCHECK: Snapshot changed, then invalidate current schema to reflect + * possible catalog changes. + */ + txn->is_schema_sent = false; /* and continue with the new one */ SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, @@ -3868,7 +3902,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) rb->streamTxns += (rbtxn_is_streamed(txn)) ? 1 : 0; rb->streamBytes += txn->size; - elog(WARNING, "updating stream stats %p %ld %ld %ld", + elog(INFO, "updating stream stats %p %ld %ld %ld", rb, rb->streamCount, rb->streamTxns, txn->size); /* @@ -4950,7 +4984,7 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, CommandId *cmin, CommandId *cmax) { ReorderBufferTupleCidKey key; - ReorderBufferTupleCidEnt *ent; + ReorderBufferTupleCidEnt *ent = NULL; ForkNumber forkno; BlockNumber blockno; bool updated_mapping = false; @@ -4974,11 +5008,16 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, &key.tid); restart: - ent = (ReorderBufferTupleCidEnt *) - hash_search(tuplecid_data, - (void *) &key, - HASH_FIND, - NULL); + /* + * TOCHECK: If tuplecid_data is NULL, then we are not able to resolve cmin/cmax, + * so try to update mappings and return false. + */ + if (tuplecid_data != NULL) + ent = (ReorderBufferTupleCidEnt *) + hash_search(tuplecid_data, + (void *) &key, + HASH_FIND, + NULL); /* * failed to find a mapping, check whether the table was rewritten and diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eaefba2049..adf69a5a38 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2444,10 +2444,21 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) { MemoryContext oldcxt; - stream_cleanup_files(subid, xid); + /* + * TOCHECK: If nxids=0, then we have nothing to clean up. + */ + if (nxids > 0) + stream_cleanup_files(subid, xid); oldcxt = MemoryContextSwitchTo(TopMemoryContext); + /* TOCHECK: Initialize xids array if it is the first run. */ + if (xids == NULL) + { + maxnxids = 64; + xids = palloc(maxnxids * sizeof(TransactionId)); + } + /* * We need to remember the XIDs we spilled to files, so that we can * remove them at worker exit (e.g. after DROP SUBSCRIPTION). @@ -2462,16 +2473,8 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) */ if (nxids == maxnxids) /* array of XIDs is full */ { - if (!xids) - { - maxnxids = 64; - xids = palloc(maxnxids * sizeof(TransactionId)); - } - else - { - maxnxids = 2 * maxnxids; - xids = repalloc(xids, maxnxids * sizeof(TransactionId)); - } + maxnxids = 2 * maxnxids; + xids = repalloc(xids, maxnxids * sizeof(TransactionId)); } xids[nxids++] = xid; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 04432ffb57..d5a3cf6308 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -370,7 +370,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void maybe_send_schema(LogicalDecodingContext *ctx, TransactionId topxid, TransactionId xid, - Relation relation, RelationSyncEntry *relentry) + Relation relation, RelationSyncEntry *relentry, ReorderBufferTXN *txn) { bool schema_sent = relentry->schema_sent; @@ -381,7 +381,15 @@ maybe_send_schema(LogicalDecodingContext *ctx, * that we don't know at this point. */ if (in_streaming) - schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); + { + /* + * TOCHECK: We have to send schema after each catalog change and it may + * occur when streaming already started, so we have to track new catalog + * changes somehow. + */ + schema_sent = txn->is_schema_sent; + // schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); + } if (!schema_sent) { @@ -415,7 +423,9 @@ maybe_send_schema(LogicalDecodingContext *ctx, relentry->xid = xid; if (in_streaming) - set_schema_sent_in_streamed_txn(relentry, topxid); + /* TOCHECK: Maybe change flag location? */ + txn->is_schema_sent = true; + // set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; } @@ -479,7 +489,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, topxid, xid, relation, relentry); + maybe_send_schema(ctx, topxid, xid, relation, relentry, txn); /* Send the data */ switch (change->action) @@ -569,7 +579,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; - maybe_send_schema(ctx, topxid, xid, relation, relentry); + maybe_send_schema(ctx, topxid, xid, relation, relentry, txn); } if (nrelids > 0) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index be92f3e5f5..de709250c0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3621,7 +3621,7 @@ UpdateSpillStats(LogicalDecodingContext *ctx) MyWalSnd->streamCount = rb->streamCount; MyWalSnd->streamBytes = rb->streamBytes; - elog(WARNING, "UpdateSpillStats: updating stats %p %ld %ld %ld %ld %ld %ld", + elog(INFO, "UpdateSpillStats: updating stats %p %ld %ld %ld %ld %ld %ld", rb, rb->spillTxns, rb->spillCount, rb->spillBytes, rb->streamTxns, rb->streamCount, rb->streamBytes); diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index f7c4c9188c..b905077164 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -1692,8 +1692,12 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); + /* + * TOCHECK: If we accidentally see a tuple from our transaction, but cannot resolve its + * cmin, so probably it is from the future, thus drop it. + */ if (!resolved) - elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + return false; Assert(cmin != InvalidCommandId); @@ -1763,10 +1767,12 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); - if (!resolved) - elog(ERROR, "could not resolve combocid to cmax"); - - Assert(cmax != InvalidCommandId); + /* + * TOCHECK: If we accidentally see a tuple from our transaction, but cannot resolve its + * cmax or cmax == InvalidCommandId, so probably it is still valid, thus accept it. + */ + if (!resolved || cmax == InvalidCommandId) + return true; if (cmax >= snapshot->curcid) return true; /* deleted after scan started */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bf5766c1e4..29dba6673c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -220,6 +220,11 @@ typedef struct ReorderBufferTXN /* In case of 2PC we need to pass GID to output plugin */ char *gid; + /* + * Do we need to send schema for this transaction in output plugin? + */ + bool is_schema_sent; + /* * Toplevel transaction for this subxact (NULL for top-level). */ diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/011_stream_simple.pl similarity index 98% rename from src/test/subscription/t/009_stream_simple.pl rename to src/test/subscription/t/011_stream_simple.pl index 4d01f7e5ec..f0aae1041a 100644 --- a/src/test/subscription/t/009_stream_simple.pl +++ b/src/test/subscription/t/011_stream_simple.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/012_stream_subxact.pl similarity index 98% rename from src/test/subscription/t/010_stream_subxact.pl rename to src/test/subscription/t/012_stream_subxact.pl index 1a8b8ffe9e..00dd60b91f 100644 --- a/src/test/subscription/t/010_stream_subxact.pl +++ b/src/test/subscription/t/012_stream_subxact.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/013_stream_ddl.pl similarity index 98% rename from src/test/subscription/t/011_stream_ddl.pl rename to src/test/subscription/t/013_stream_ddl.pl index 04af0900ac..ecaf4383b1 100644 --- a/src/test/subscription/t/011_stream_ddl.pl +++ b/src/test/subscription/t/013_stream_ddl.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/014_stream_tough_ddl.pl b/src/test/subscription/t/014_stream_tough_ddl.pl new file mode 100644 index 0000000000..02969c7260 --- /dev/null +++ b/src/test/subscription/t/014_stream_tough_ddl.pl @@ -0,0 +1,98 @@ +# Test streaming of large transaction with DDL, subtransactions and rollbacks. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d text, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 1000) s(i); +SAVEPOINT s2; +ALTER TABLE test_tab ADD COLUMN c INT; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(1001, 2000) s(i); +SAVEPOINT s3; +ALTER TABLE test_tab ADD COLUMN d text; +SAVEPOINT s4; +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text) FROM generate_series(2001, 3000) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(3001, 4000) s(i); +SAVEPOINT s10; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(4001, 5000) s(i); +ALTER TABLE test_tab ADD COLUMN d text; +ROLLBACK TO SAVEPOINT s10; +RELEASE SAVEPOINT s10; +SAVEPOINT s10; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(5001, 6000) s(i); +SAVEPOINT s6; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(6001, 7000) s(i); +SAVEPOINT s7; +ALTER TABLE test_tab ADD COLUMN d text; +INSERT INTO test_tab (a, b, c, d, e) SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(7001, 8000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(a), count(b), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(7000|7000|7000|6000|4000|4000), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/015_stream_subxact_abort.pl similarity index 97% rename from src/test/subscription/t/012_stream_subxact_abort.pl rename to src/test/subscription/t/015_stream_subxact_abort.pl index 6fecfe6fe7..dcec081fa1 100644 --- a/src/test/subscription/t/012_stream_subxact_abort.pl +++ b/src/test/subscription/t/015_stream_subxact_abort.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/016_stream_subxact_ddl_abort.pl similarity index 97% rename from src/test/subscription/t/013_stream_subxact_ddl_abort.pl rename to src/test/subscription/t/016_stream_subxact_ddl_abort.pl index 50990c170c..41ad2b668c 100644 --- a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/016_stream_subxact_ddl_abort.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" ); wait_for_caught_up($node_publisher, $appname); -- 2.17.1