From ed3e8e3ba693908873a3a7c8b89af1bb2969c884 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Fri, 18 Mar 2022 17:17:39 +0000 Subject: [PATCH v3 3/3] Logical replication of DDL messages Integration with pgoutput: Supports sending and receiving the DDL message using the logical replication wire protocol. A new LogicalRepMsgType is introduced for this purpose: LOGICAL_REP_MSG_DDLMESSAGE = 'L'. Logical replication worker change: Supports execution of the DDL command in the original user role and search_path. For any new table created this way, we also set its srsubstate in the pg_subscription_rel catalog to SUBREL_STATE_INIT, So that DML replication can progress on this new table without manually running "ALTER SUBSCRIPTION ... REFRESH PUBLICATION". TAP test: A new TAP test 030_rep_ddl.pl is added. We mainly focused on testing the happy path of database level replication so far. Corner case DDLs and table level DDL replication are still to be carefully tested. --- src/backend/replication/logical/proto.c | 63 ++++- src/backend/replication/logical/worker.c | 264 ++++++++++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 69 ++++- src/include/replication/logicalproto.h | 10 +- src/test/subscription/t/004_sync.pl | 2 +- src/test/subscription/t/006_rewrite.pl | 2 +- src/test/subscription/t/008_diff_schema.pl | 2 +- src/test/subscription/t/009_matviews.pl | 2 +- src/test/subscription/t/012_collation.pl | 2 +- src/test/subscription/t/013_partition.pl | 8 +- src/test/subscription/t/030_rep_ddls.pl | 237 ++++++++++++++++++ 11 files changed, 646 insertions(+), 15 deletions(-) create mode 100644 src/test/subscription/t/030_rep_ddls.pl diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index c9b0eeefd7..762b897546 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -626,8 +626,8 @@ logicalrep_read_truncate(StringInfo in, */ void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, - const char *message) + bool transactional, const char *prefix, + Size sz, const char *message) { uint8 flags = 0; @@ -648,6 +648,63 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, pq_sendbytes(out, message, sz); } +/* + * Read DDL MESSAGE from stream + */ +const char * +logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, + const char **prefix, + const char **role, + const char **search_path, + bool *transactional, + Size *sz) +{ + uint8 flags; + const char *msg; + + //TODO double check when do we need to get TransactionId. + + flags = pq_getmsgint(in, 1); + *transactional = (flags & MESSAGE_TRANSACTIONAL) > 0; + *lsn = pq_getmsgint64(in); + *prefix = pq_getmsgstring(in); + *role = pq_getmsgstring(in); + *search_path = pq_getmsgstring(in); + *sz = pq_getmsgint(in, 4); + msg = pq_getmsgbytes(in, *sz); + + return msg; +} + +/* + * Write DDL MESSAGE to stream + */ +void +logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, const char *role, + const char *search_path, Size sz, const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendstring(out, role); + pq_sendstring(out, search_path); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + /* * Write relation description to the output stream. */ @@ -1185,6 +1242,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "TYPE"; case LOGICAL_REP_MSG_MESSAGE: return "MESSAGE"; + case LOGICAL_REP_MSG_DDLMESSAGE: + return "DDL"; case LOGICAL_REP_MSG_BEGIN_PREPARE: return "BEGIN PREPARE"; case LOGICAL_REP_MSG_PREPARE: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db..5972748b5e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -156,6 +156,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/optimizer.h" +#include "parser/analyze.h" #include "pgstat.h" #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" @@ -180,6 +181,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "tcop/tcopprot.h" +#include "tcop/pquery.h" +#include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -346,6 +349,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation); +static void apply_execute_sql_command(const char *cmdstr, + const char* role, + const char* search_path, + bool isTopLevel); /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); @@ -2446,6 +2453,259 @@ apply_handle_truncate(StringInfo s) end_replication_step(); } +/* + * Handle generic messages. + */ +static void +apply_handle_ddlmessage(StringInfo s) +{ + XLogRecPtr lsn; + bool transactional; + Size sz; + const char *prefix; + const char *role; + const char *search_path; + const char *msg; + + msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz); + + apply_execute_sql_command(msg, role, search_path, true); +} + +/* + * Add context to the errors produced by apply_execute_sql_command(). + */ +static void +execute_sql_command_error_cb(void *arg) +{ + errcontext("during execution of SQL statement: %s", (char *) arg); +} + +/* + * Execute an SQL command. This can be multiple queries. + * This is modified based on pglogical_execute_sql_command(). + */ +static void +apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path, + bool isTopLevel) +{ + const char *save_debug_query_string = debug_query_string; + List *parsetree_list; + ListCell *parsetree_item; + MemoryContext oldcontext; + ErrorContextCallback errcallback; + int save_nestlevel; + + /* + * Switch to appropriate context for constructing parsetrees. + */ + oldcontext = MemoryContextSwitchTo(ApplyMessageContext); + begin_replication_step(); + + /* + * Set the current role to the user that executed the command on the + * publication server. + * Set the current search_path to the search_path on the publication + * server when the command was executed. + */ + save_nestlevel = NewGUCNestLevel(); + SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE); + SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE); + + errcallback.callback = execute_sql_command_error_cb; + errcallback.arg = (char *) cmdstr; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + debug_query_string = cmdstr; + + parsetree_list = pg_parse_query(cmdstr); + + /* + * Do a limited amount of safety checking against CONCURRENTLY commands + * executed in situations where they aren't allowed. The sender side should + * provide protection, but better be safe than sorry. + */ + isTopLevel = isTopLevel && (list_length(parsetree_list) == 1); + + /* + * Switch back to transaction context to enter the loop. + */ + MemoryContextSwitchTo(oldcontext); + + foreach(parsetree_item, parsetree_list) + { + List *plantree_list; + List *querytree_list; + RawStmt *command = (RawStmt *) lfirst(parsetree_item); + CommandTag commandTag; + MemoryContext per_parsetree_context = NULL; + Portal portal; + DestReceiver *receiver; + bool snapshot_set = false; + char *schemaname = NULL; /* For CREATE TABLE stmt only */ + char *relname = NULL; /* For CREATE TABLE stmt only */ + + commandTag = CreateCommandTag((Node *)command); + + /* + * Remember the schemaname and relname if it's a CREATE TABLE stmt + * because we will need them for some post-processing after we + * execute the stmt. At that point, CreateStmt may have been freeed up. + */ + if (commandTag == CMDTAG_CREATE_TABLE) + { + CreateStmt *cstmt = (CreateStmt *)command->stmt; + RangeVar *rv = cstmt->relation; + schemaname = rv->schemaname; + relname = rv->relname; + } + + /* + * Set up a snapshot if parse analysis/planning will need one. + */ + if (analyze_requires_snapshot(command)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + /* + * OK to analyze, rewrite, and plan this query. + * + * Switch to appropriate context for constructing query and plan trees + * (these can't be in the transaction context, as that will get reset + * when the command is COMMIT/ROLLBACK). If we have multiple + * parsetrees, we use a separate context for each one, so that we can + * free that memory before moving on to the next one. But for the + * last (or only) parsetree, just use MessageContext, which will be + * reset shortly after completion anyway. In event of an error, the + * per_parsetree_context will be deleted when MessageContext is reset. + */ + if (lnext(parsetree_list, parsetree_item) != NULL) + { + per_parsetree_context = + AllocSetContextCreate(MessageContext, + "per-parsetree message context", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(per_parsetree_context); + } + else + oldcontext = MemoryContextSwitchTo(ApplyMessageContext); + + querytree_list = pg_analyze_and_rewrite_fixedparams( + command, + cmdstr, + NULL, 0, NULL); + + plantree_list = pg_plan_queries( + querytree_list, cmdstr, 0, NULL); + + /* + * Done with the snapshot used for parsing/planning. + * + * While it looks promising to reuse the same snapshot for query + * execution (at least for simple protocol), unfortunately it causes + * execution to use a snapshot that has been acquired before locking + * any of the tables mentioned in the query. This creates user- + * visible anomalies, so refrain. Refer to + * https://postgr.es/m/flat/5075D8DF.6050500@fuzzy.cz for details. + */ + if (snapshot_set) + PopActiveSnapshot(); + + portal = CreatePortal("logical replication", true, true); + + /* + * We don't have to copy anything into the portal, because everything + * we are passing here is in MessageContext or the + * per_parsetree_context, and so will outlive the portal anyway. + */ + PortalDefineQuery(portal, + NULL, + cmdstr, + commandTag, + plantree_list, + NULL); + + /* + * Start the portal. No parameters here. + */ + PortalStart(portal, NULL, 0, InvalidSnapshot); + + /* DestNone for logical replication */ + receiver = CreateDestReceiver(DestNone); + + /* + * Switch back to transaction context for execution. + */ + MemoryContextSwitchTo(oldcontext); + + (void) PortalRun(portal, + FETCH_ALL, + isTopLevel, + true, + receiver, + receiver, + NULL); + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + + CommandCounterIncrement(); + + /* + * Table created by DDL replication (database level) is automatically + * added to the subscription here. + * + * Call AddSubscriptionRelState for CREATE TABEL command to set + * the relstate to SUBREL_STATE_INIT so DML changes on this + * new table can be replicated without having to manually run + * "alter subscription ... refresh publication" + */ + if (commandTag == CMDTAG_CREATE_TABLE) + { + Oid relid; + Oid relnamespace = InvalidOid; + + if (schemaname != NULL) + relnamespace = get_namespace_oid(schemaname, false); + if (relnamespace != InvalidOid) + relid = get_relname_relid(relname, relnamespace); + else + { + /* + * Try to resolve unqualified relname. + * Notice we have set the search_path to the original search_path on the publisher + * at the beginning of this function. + */ + relid = RelnameGetRelid(relname); + } + + if (relid != InvalidOid) + { + AddSubscriptionRelState(MySubscription->oid, relid, + SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s\" added to subscription \"%s\"", + relname, MySubscription->name))); + } + } + } + + /* + * Restore the GUC variables we set above. + */ + AtEOXact_GUC(true, save_nestlevel); + + /* protect against stack resets during CONCURRENTLY processing */ + if (error_context_stack == &errcallback) + error_context_stack = errcallback.previous; + + debug_query_string = save_debug_query_string; + end_replication_step(); +} /* * Logical replication protocol message dispatcher. @@ -2511,6 +2771,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_DDLMESSAGE: + apply_handle_ddlmessage(s); + break; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5fddab3a3d..ae9c92af2c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -53,6 +53,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, const char *role, + const char *search_path, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -208,6 +212,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; + cb->ddlmessage_cb = pgoutput_ddlmessage; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -224,6 +229,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; + cb->stream_ddlmessage_cb = pgoutput_ddlmessage; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; @@ -1413,8 +1419,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, - const char *message) + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size sz, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; @@ -1440,6 +1446,57 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char * role, + const char *search_path, Size sz, const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + ListCell *lc; + + /* Reload publications if needed before use. */ + if (!publications_valid) + { + MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext); + if (data->publications) + list_free_deep(data->publications); + + data->publications = LoadPublications(data->publication_names); + MemoryContextSwitchTo(oldctx); + publications_valid = true; + } + + /* Check if ddl replication is turned on for the publications */ + foreach(lc, data->publications) + { + Publication *pub = (Publication *) lfirst(lc); + /* TODO need to check relid for table level DDLs */ + if (!pub->pubactions.pubddl_database && !pub->pubactions.pubddl_table) + return; + } + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_ddlmessage(ctx->out, + xid, + message_lsn, + transactional, + prefix, + role, + search_path, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ @@ -1725,7 +1782,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = + entry->pubactions.pubddl_database = + entry->pubactions.pubddl_table = false; entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); @@ -1780,6 +1839,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->pubactions.pubddl_database = false; + entry->pubactions.pubddl_table = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). @@ -1889,6 +1950,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + entry->pubactions.pubddl_database |= pub->pubactions.pubddl_database; + entry->pubactions.pubddl_table |= pub->pubactions.pubddl_table; /* * We want to publish the changes as the top-most ancestor diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4d2c881644..862ed467a6 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_DDLMESSAGE = 'L', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -229,7 +230,14 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, const char *message); + bool transactional, const char *prefix, + Size sz, const char *message); +extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, const char *role, + const char *search_path, Size sz, const char *message); +extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, + const char **role, const char **search_path, + bool *transactional, Size *sz); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index cf61fc1e0f..698c5114e6 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -33,7 +33,7 @@ $node_subscriber->safe_psql('postgres', # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + "CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl index c924ff35f7..6c9b055eb0 100644 --- a/src/test/subscription/t/006_rewrite.pl +++ b/src/test/subscription/t/006_rewrite.pl @@ -23,7 +23,7 @@ $node_subscriber->safe_psql('postgres', $ddl); my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION mypub FOR ALL TABLES;"); + "CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl index 67b4026afa..2902e6fc34 100644 --- a/src/test/subscription/t/008_diff_schema.pl +++ b/src/test/subscription/t/008_diff_schema.pl @@ -32,7 +32,7 @@ $node_subscriber->safe_psql('postgres', # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + "CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl index 1ce696d4a4..6c586877a1 100644 --- a/src/test/subscription/t/009_matviews.pl +++ b/src/test/subscription/t/009_matviews.pl @@ -19,7 +19,7 @@ $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION mypub FOR ALL TABLES;"); + "CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl index 2182f7948e..e1b5050663 100644 --- a/src/test/subscription/t/012_collation.pl +++ b/src/test/subscription/t/012_collation.pl @@ -76,7 +76,7 @@ $node_subscriber->safe_psql('postgres', # set up publication, subscription $node_publisher->safe_psql('postgres', - q{CREATE PUBLICATION pub1 FOR ALL TABLES}); + q{CREATE PUBLICATION pub1 FOR ALL TABLES WITH (ddl = '')}); $node_subscriber->safe_psql('postgres', qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)} diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 66e63e755e..de967eaaf2 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -25,9 +25,9 @@ $node_subscriber2->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; # publisher -$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 WITH (ddl = '')"); $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub_all FOR ALL TABLES"); + "CREATE PUBLICATION pub_all FOR ALL TABLES WITH (ddl = '')"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_publisher->safe_psql('postgres', @@ -424,12 +424,12 @@ $node_publisher->safe_psql('postgres', # and child tables are present but changes will be replicated via the parent's # identity and only once. $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)" + "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true, ddl = '')" ); # for tab4, we publish changes through the "middle" partitioned table $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)" + "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true, ddl = '')" ); # prepare data for the initial sync diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl new file mode 100644 index 0000000000..c88c4ea1c0 --- /dev/null +++ b/src/test/subscription/t/030_rep_ddls.pl @@ -0,0 +1,237 @@ + +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Regression tests for logical replication of DDLs +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'autovacuum = off'); +$node_publisher->start; + +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', 'autovacuum = off'); +$node_subscriber->start; + +my $node_subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->append_conf('postgresql.conf', 'autovacuum = off'); +$node_subscriber2->start; + +my $ddl = "CREATE TABLE test_rep(id int primary key, name varchar);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (1, 'data1');"); +$node_subscriber->safe_psql('postgres', $ddl); +$node_subscriber2->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# mypub has pubddl_database on +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); +# mypub2 has pubddl_database off +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub2 FOR ALL TABLES with (ddl = '');"); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' PUBLICATION mypub2;" +); + +$node_publisher->wait_for_catchup('mysub'); + +# Test simple CREATE TABLE command is replicated to subscriber +# Test smae simple CREATE TABLE command is not replicated to subscriber2 (ddl off) +# Test ALTER TABLE command is replicated on table test_rep +# Test CREATE INDEX is replicated to subscriber +# Test CREATE FUNCTION command is replicated to subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);"); +$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD c3 int;"); +$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (2, 'data2', 2);"); +$node_publisher->safe_psql('postgres', "CREATE INDEX nameindex on test_rep (name)"); +$node_publisher->safe_psql('postgres', qq{CREATE OR REPLACE FUNCTION totalRecords() +RETURNS integer AS \$total\$ +declare + total integer; +BEGIN + SELECT count(*) into total FROM test_rep; + RETURN total; +END; +\$total\$ LANGUAGE plpgsql;}); + +$node_publisher->wait_for_catchup('mysub'); + +my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t1"); +is($result, qq(0), 'CREATE of t1 replicated to subscriber'); +$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't1';"); +is($result, qq(0), 'CREATE of t1 is not replicated to subscriber2'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep WHERE c3 =2;"); +is($result, qq(1), 'ALTER test_rep ADD replicated'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'"); +is($result, qq(1), 'CREATE INDEX nameindex replicated'); +$result = $node_subscriber->safe_psql('postgres', "SELECT totalRecords();"); +is($result, qq(2), 'CREATE of function totalRecords replicated to subscriber'); +$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';"); +is($result, qq(0), 'CREATE FUNCTION totalrecords is not replicated to subscriber2'); + +# Test ALTER TABLE DROP +# Test DROP INDEX +# Test DROP FUNCTION +$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;"); +$node_publisher->safe_psql('postgres', "DELETE FROM test_rep where id = 2;"); +$node_publisher->safe_psql('postgres', "DROP INDEX nameindex;"); +$node_publisher->safe_psql('postgres', "DROP FUNCTION totalRecords;"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;"); +is($result, qq(1), 'ALTER test_rep DROP replicated'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'"); +is($result, qq(0), 'DROP INDEX nameindex replicated'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';"); +is($result, qq(0), 'DROP FUNCTION totalrecords replicated'); + + +# TODO figure out how to set ON_ERROR_STOP = 0 in this test +# Test failed CREATE/ALTER TABLE on publisher doesn't break replication +# Table t1 already exits so expect the command to fail +#$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);"); +#$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;"); +#$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (103, 'data103', 1013);"); + +#$node_publisher->wait_for_catchup('mysub'); +# Verify replication still works +#$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;"); +#is($result, qq(1), 'DELETE from test_rep replicated'); + +# Test DDLs inside txn block +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +CREATE TABLE t2 (a int, b varchar); +ALTER TABLE test_rep ADD c3 int; +INSERT INTO test_rep VALUES (3, 'data3', 3); +COMMIT;}); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t2;"); +is($result, qq(0), 'CREATE t2 replicated'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep;"); +is($result, qq(2), 'ALTER test_rep ADD replicated'); + +# Test toggling pubddl_database option off +$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = '');"); +$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';"); +is($result, qq(f|f), 'pubddl_database turned off on mypub'); +$node_publisher->safe_psql('postgres', "CREATE TABLE t3 (a int, b varchar);"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't3';"); +is($result, qq(0), 'CREATE t3 is not replicated'); + +# Test toggling pubddl_database option on +$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = 'database');"); +$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';"); +is($result, qq(t|t), 'pubddl_database turned on on mypub'); + +$node_publisher->safe_psql('postgres', "CREATE TABLE t4 (a int, b varchar);"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't4';"); +is($result, qq(1), 'CREATE t4 is replicated'); + +# Test DML changes on the new table t4 are replicated +$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (1, 'a')"); +$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (2, 'b')"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t4;"); +is($result, qq(2), 'DML Changes to t4 are replicated'); + +# A somewhat complicated test in plpgsql block with trigger +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +CREATE TABLE foo (a int); +CREATE INDEX foo_idx ON foo (a); +ALTER TABLE foo ADD COLUMN b timestamptz; +CREATE FUNCTION foo_ts() +RETURNS trigger AS $$ +BEGIN +NEW.b := current_timestamp; +RETURN NEW; +END; +$$ +LANGUAGE plpgsql; +CREATE TRIGGER foo_ts BEFORE INSERT OR UPDATE ON foo +FOR EACH ROW EXECUTE FUNCTION foo_ts(); +INSERT INTO foo VALUES (1); +COMMIT;}); +$result = $node_publisher->safe_psql('postgres', "SELECT b from foo where a = 1;"); + +$node_publisher->wait_for_catchup('mysub'); + +my $result_sub = $node_subscriber->safe_psql('postgres', "SELECT b from foo where a = 1;"); +is($result, qq($result_sub), 'timestamp of insert matches'); + +# Test CREATE SCHEMA stmt is replicated +$node_publisher->safe_psql('postgres', "CREATE SCHEMA s1"); +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 's1';"); +is($result, qq(1), 'CREATE SCHEMA s1 is replicated'); + +# Test CREATE TABLE in new schema s1 followed by insert +$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t1 (a int, b varchar);"); +$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (1, 'a');"); +$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (2, 'b');"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t1;"); +is($result, qq(2), 'CREATE TABLE s1.t1 is replicated'); + +# Test replication works as expected with mismatched search_path on publisher and subscriber +$node_publisher->append_conf('postgresql.conf', 'search_path = \'s1, public\''); +$node_publisher->restart; +# CREATE unqualified table t2, it is s1.t2 under the modified search_path +$node_publisher->safe_psql('postgres', "CREATE TABLE t2 (a int, b varchar);"); +$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (1, 'a');"); +$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (2, 'b');"); +$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (3, 'c');"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t2;"); +is($result, qq(3), 'CREATE TABLE s1.t2 is replicated'); + +# Test owner of new table on subscriber matches the owner on publisher +$node_publisher->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;"); + +$node_subscriber->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;"); + +$node_publisher->safe_psql('postgres', "SET SESSION AUTHORIZATION 'ddl_replication_user'; CREATE TABLE t5 (a int, b varchar);"); +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 't5';"); +is($result, qq(ddl_replication_user), 'Owner of t5 is correct'); + +#TODO TEST certain DDLs are not replicated + +pass "DDL replication tests passed!"; + +$node_subscriber->stop; +$node_subscriber2->stop; +$node_publisher->stop; + +done_testing(); -- 2.25.1