From 1b14cc43a16a3bd0296f7b1564707b9ac44b9de4 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Thu, 24 Mar 2022 17:46:55 +0000 Subject: [PATCH 05/10] Support replication of CREATE ... AS .. and SELECT ... INTO ... statements. The idea is to force skipping the direct data population (which could potentially cause data mismatch compared to the publisher) with these command on the subscriber by force setting the skipData flag in the intoClause of the parsetree after the logical replication worker parses the command. The data sync will be taken care of by DML replication after the replication of the DDL command. --- src/backend/replication/logical/worker.c | 51 ++++++++++++++++++++---- src/backend/tcop/utility.c | 41 +++++++++++-------- src/test/subscription/t/030_rep_ddls.pl | 24 +++++++++++ 3 files changed, 92 insertions(+), 24 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c25e3a4cfe..905f47faf5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2542,23 +2542,58 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear Portal portal; DestReceiver *receiver; bool snapshot_set = false; - char *schemaname = NULL; /* For CREATE TABLE stmt only */ - char *relname = NULL; /* For CREATE TABLE stmt only */ + char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ commandTag = CreateCommandTag((Node *)command); /* - * Remember the schemaname and relname if it's a CREATE TABLE stmt + * Remember the schemaname and relname if the cmd is going to create a table * because we will need them for some post-processing after we - * execute the stmt. At that point, CreateStmt may have been freeed up. + * execute the stmt. At that point, command->stmt may have been freeed up. */ if (commandTag == CMDTAG_CREATE_TABLE) { - CreateStmt *cstmt = (CreateStmt *)command->stmt; + CreateStmt *cstmt = (CreateStmt *) command->stmt; RangeVar *rv = cstmt->relation; schemaname = rv->schemaname; relname = rv->relname; } + else if (commandTag == CMDTAG_CREATE_TABLE_AS) + { + CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt; + + if (castmt->objtype == OBJECT_TABLE) + { + RangeVar *rv = castmt->into->rel; + schemaname = rv->schemaname; + relname = rv->relname; + + /* + * Force skipping data population to avoid data inconsistency. + * Data should be replicated from the publisher instead. + */ + castmt->into->skipData = true; + } + } + /* SELECT INTO */ + else if (commandTag == CMDTAG_SELECT) + { + SelectStmt *sstmt = (SelectStmt *) command->stmt; + + if (sstmt->intoClause != NULL) + { + RangeVar *rv = sstmt->intoClause->rel; + schemaname = rv->schemaname; + relname = rv->relname; + + /* + * Force skipping data population to avoid data inconsistency. + * Data should be replicated from the publisher instead. + */ + sstmt->intoClause->skipData = true; + } + } /* * Set up a snapshot if parse analysis/planning will need one. @@ -2657,12 +2692,12 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear * Table created by DDL replication (database level) is automatically * added to the subscription here. * - * Call AddSubscriptionRelState for CREATE TABEL command to set - * the relstate to SUBREL_STATE_INIT so DML changes on this + * Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS + * command to set the relstate to SUBREL_STATE_INIT so DML changes on this * new table can be replicated without having to manually run * "alter subscription ... refresh publication" */ - if (commandTag == CMDTAG_CREATE_TABLE) + if (relname != NULL) { Oid relid; Oid relnamespace = InvalidOid; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index fdda1385c3..34c7db8cc9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1142,24 +1142,31 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) /* * CreateTableAsStmt can create either a table a materialized view - * and they are handled differently. */ case T_CreateTableAsStmt: { CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree; + switch(stmt->objtype) { + /* + * Either CREATE TABLE AS or SELECT ... INTO stmt + * The statement is logged as is, but when we apply the + * CREATE TABLE AS or SELECT ... INTO statemtns on the logical + * replication worker, we will force the skipData flag in the + * intoClause. This way we avoid direct data population on the + * subsriber with the execution of these commands which can + * potentially cause data mismatch bewteen the publisher. + * + * The data sync will be handled by DML replication after the + * target table has been created. + */ case OBJECT_TABLE: - /* - * FIXME CREATE TABLE AS stmt needs to be broken down into two parts - * 1. A normal CREATE TABLE string that get's logged and replicated via - * DDL replication. - * 2. Insertions that get replicated by DML replication. - */ - break; + + /* CREATE MATERIALIZED VIEW */ case OBJECT_MATVIEW: /* - * Log CREATE MATERIALIZED VIEW AS stmt for logical replication if + * Log these stmt for logical replication if * there is any FOR ALL TABLES publication with pubddl_database on. * i.e. Database level DDL replication is on for some publication. */ @@ -1304,8 +1311,17 @@ ProcessUtilitySlow(ParseState *pstate, PG_TRY(); { if (isCompleteQuery) + { EventTriggerDDLCommandStart(parsetree); + /* + * Consider logging the DDL command if logical logging is enabled and this is + * a complete top level query. + */ + if (XLogLogicalInfoActive() && isTopLevel) + LogLogicalDDLCommand(parsetree, queryString); + } + switch (nodeTag(parsetree)) { /* @@ -2135,13 +2151,6 @@ ProcessUtilitySlow(ParseState *pstate, if (isCompleteQuery) { - /* - * Consider logging the DDL command if logical logging is enabled and this is - * a complete top level query. - */ - if (XLogLogicalInfoActive() && isTopLevel) - LogLogicalDDLCommand(parsetree, queryString); - EventTriggerSQLDrop(parsetree); EventTriggerDDLCommandEnd(parsetree); } diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl index 562efe2cf7..3b15c6d9f0 100644 --- a/src/test/subscription/t/030_rep_ddls.pl +++ b/src/test/subscription/t/030_rep_ddls.pl @@ -244,6 +244,30 @@ $node_publisher->wait_for_catchup('mysub'); $result_sub = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.view1;"); is($result, qq($result_sub), 'CREATE of s1.view1 is replicated'); +# TEST CREATE TABLE AS stmt +$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t3 AS SELECT a, b from s1.t1;"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t3;"); +is($result, qq(2), 'CREATE TABLE s1.t3 AS is replicated with data'); + +# TEST CREATE TABLE AS stmt ... WITH NO DATA +$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t4 AS SELECT a, b from s1.t1 WITH NO DATA;"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t4;"); +is($result, qq(0), 'CREATE TABLE s1.t4 AS is replicated with no data'); + +# TEST SELECT INTO stmt +$node_publisher->safe_psql('postgres', "SELECT b into s1.t6 from s1.t1 where a > 1"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t6;"); +is($result, qq(1), 'SELECT INTO s1.t6 is replicated with data'); + #TODO TEST certain DDLs are not replicated pass "DDL replication tests passed!"; -- 2.32.0