From dd86acbf56e329cb0be653ac2dede08012b44631 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 29 Jun 2022 17:04:32 +0800 Subject: [PATCH] Support CREATE TABLE AS SELECT INTO The main idea of replicating CREATE TABLE AS is that we deparse the CREATE TABLE AS into a simple CREATE TABLE (without subquery) command, WAL log it after creating the table and before writing data into the table, and replicate the incoming writes later as normal INSERTs. In this approach, we don't execute the subquery on the subscriber, so we don't need to make sure that all the objects referenced in the subquery also exist on the subscriber. This approach also works for all kinds of commands (e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]). Introduce a new type of event trigger, "table_init_write", which is fired for CREATE TABLE AS/SELECT INTO after creating the table and before any other modification. We deparse the command in the table_init_write event trigger and WAL log the deparsed JSON string. The walsender will send the string to the subscriber, and incoming INSERTs will also be replicated. 
--- src/backend/commands/createas.c | 10 ++ src/backend/commands/ddl_deparse.c | 24 +++++ src/backend/commands/event_trigger.c | 172 ++++++++++++++++++++++++++++++++- src/backend/commands/publicationcmds.c | 9 ++ src/backend/tcop/utility.c | 2 + src/backend/utils/cache/evtcache.c | 2 + src/include/catalog/pg_proc.dat | 3 + src/include/commands/event_trigger.h | 4 + src/include/tcop/deparse_utility.h | 9 +- src/include/utils/evtcache.h | 3 +- 10 files changed, 232 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 9abbb6b..989e894 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -34,6 +34,7 @@ #include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/event_trigger.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -143,6 +144,15 @@ create_ctas_internal(List *attrList, IntoClause *into) StoreViewQuery(intoRelationAddr.objectId, query, false); CommandCounterIncrement(); } + else + { + /* + * Fire the trigger for table_init_write after creating the table so + * that we can access the catalog info about the newly created table in + * the trigger function. + */ + EventTriggerTableInitWrite((Node *) create, intoRelationAddr); + } return intoRelationAddr; } diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index cbaa146..aceb815 100644 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -3409,6 +3409,27 @@ deparse_AlterTableStmt(CollectedCommand *cmd) } /* + * Deparse CREATE TABLE AS command. + * + * deparse_CreateStmt does the actual work as we deparse the final CreateStmt for + * CREATE TABLE AS command. 
+ */ +static ObjTree * +deparse_CreateTableAsStmt(CollectedCommand *cmd) +{ + Oid objectId; + Node *parsetree; + + Assert(cmd->type == SCT_CreateTableAs); + + parsetree = cmd->d.ctas.real_create; + objectId = cmd->d.ctas.address.objectId; + + return deparse_CreateStmt(objectId, parsetree); +} + + +/* * Handle deparsing of simple commands. * * This function should cover all cases handled in ProcessUtilitySlow. @@ -3545,6 +3566,9 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) case SCT_AlterTable: tree = deparse_AlterTableStmt(cmd); break; + case SCT_CreateTableAs: + tree = deparse_CreateTableAsStmt(cmd); + break; default: elog(ERROR, "unexpected deparse node type %d", cmd->type); } diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index bd6a359..ed6cc63 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) if (strcmp(stmt->eventname, "ddl_command_start") != 0 && strcmp(stmt->eventname, "ddl_command_end") != 0 && strcmp(stmt->eventname, "sql_drop") != 0 && - strcmp(stmt->eventname, "table_rewrite") != 0) + strcmp(stmt->eventname, "table_rewrite") != 0 && + strcmp(stmt->eventname, "table_init_write") != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized event name \"%s\"", @@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) /* Validate tag list, if any. 
*/ if ((strcmp(stmt->eventname, "ddl_command_start") == 0 || strcmp(stmt->eventname, "ddl_command_end") == 0 || - strcmp(stmt->eventname, "sql_drop") == 0) + strcmp(stmt->eventname, "sql_drop") == 0 || + strcmp(stmt->eventname, "table_init_write") == 0) && tags != NULL) validate_ddl_tags("tag", tags); else if (strcmp(stmt->eventname, "table_rewrite") == 0 @@ -585,7 +587,8 @@ EventTriggerCommonSetup(Node *parsetree, dbgtag = CreateCommandTag(parsetree); if (event == EVT_DDLCommandStart || event == EVT_DDLCommandEnd || - event == EVT_SQLDrop) + event == EVT_SQLDrop || + event == EVT_TableInitWrite) { if (!command_tag_event_trigger_ok(dbgtag)) elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag)); @@ -868,6 +871,163 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason) CommandCounterIncrement(); } + +/* + * EventTriggerTableInitWriteStart + * Prepare to receive data on a CREATE TABLE AS/SELECT INTO command about + * to be executed. + */ +void +EventTriggerTableInitWriteStart(Node *parsetree) +{ + MemoryContext oldcxt; + CollectedCommand *command; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + command = palloc(sizeof(CollectedCommand)); + + command->type = SCT_CreateTableAs; + command->in_extension = creating_extension; + command->d.ctas.address = InvalidObjectAddress; + command->d.ctas.real_create = NULL; + command->parsetree = copyObject(parsetree); + + command->parent = currentEventTriggerState->currentCommand; + currentEventTriggerState->currentCommand = command; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerTableInitWriteEnd + * Finish up saving a CREATE TABLE AS/SELECT INTO command. + * + * FIXME this API isn't considering the possibility that an xact/subxact is + * aborted partway through. 
Probably it's best to add an + * AtEOSubXact_EventTriggers() to fix this. + */ +void +EventTriggerTableInitWriteEnd(void) +{ + CollectedCommand *parent; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + parent = currentEventTriggerState->currentCommand->parent; + + pfree(currentEventTriggerState->currentCommand); + + currentEventTriggerState->currentCommand = parent; +} + +/* + * publication_deparse_table_init_write + * + * Deparse the CREATE TABLE collected for CREATE TABLE AS/SELECT INTO (kept in d.ctas) and WAL log it. + */ +Datum +publication_deparse_table_init_write(PG_FUNCTION_ARGS) +{ + char relpersist; + CollectedCommand *cmd; + char *json_string; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + elog(ERROR, "not fired by event trigger manager"); + + cmd = currentEventTriggerState->currentCommand; + Assert(cmd); + + relpersist = get_rel_persistence(cmd->d.ctas.address.objectId); + + /* + * Do not generate wal log for commands whose target table is a temporary + * table. + * + * We will generate wal logs for unlogged tables so that unlogged tables + * can also be created and altered on the subscriber side. This makes it + * possible to directly replay the SET LOGGED command and the incoming + * rewrite message without creating a new table. + */ + if (relpersist == RELPERSISTENCE_TEMP) + return PointerGetDatum(NULL); + + /* Deparse the DDL command and WAL log it to allow decoding of the same. */ + json_string = deparse_utility_command(cmd, false); + + if (json_string != NULL) + LogLogicalDDLMessage("deparse", cmd->d.ctas.address.objectId, DCT_SimpleCmd, + json_string, strlen(json_string) + 1); + + return PointerGetDatum(NULL); +} + +/* + * Fire table_init_write triggers. 
+ */ +void +EventTriggerTableInitWrite(Node *real_create, ObjectAddress address) +{ + List *runlist; + EventTriggerData trigdata; + CollectedCommand *command; + + /* + * See EventTriggerDDLCommandStart for a discussion about why event + * triggers are disabled in single user mode. + */ + if (!IsUnderPostmaster) + return; + + /* + * Also do nothing if our state isn't set up, which it won't be if there + * weren't any relevant event triggers at the start of the current DDL + * command. This test might therefore seem optional, but it's + * *necessary*, because EventTriggerCommonSetup might find triggers that + * didn't exist at the time the command started. + */ + if (!currentEventTriggerState) + return; + + /* Do nothing if no command was collected. NOTE(review): command->type is not checked to be SCT_CreateTableAs before d.ctas is written below -- confirm. */ + if (!currentEventTriggerState->currentCommand) + return; + + command = currentEventTriggerState->currentCommand; + + runlist = EventTriggerCommonSetup(command->parsetree, + EVT_TableInitWrite, + "table_init_write", + &trigdata); + if (runlist == NIL) + return; + + /* Set the real CreateTable statement and object address */ + command->d.ctas.real_create = real_create; + command->d.ctas.address = address; + + /* Run the triggers. */ + EventTriggerInvoke(runlist, &trigdata); + + /* Cleanup. */ + list_free(runlist); + + /* + * Make sure anything the event triggers did will be visible to the main + * command. + */ + CommandCounterIncrement(); +} + /* * Invoke each event trigger in a list of event triggers. 
*/ @@ -1148,7 +1308,8 @@ trackDroppedObjectsNeeded(void) */ return list_length(EventCacheLookup(EVT_SQLDrop)) > 0 || list_length(EventCacheLookup(EVT_TableRewrite)) > 0 || - list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0; + list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0 || + list_length(EventCacheLookup(EVT_TableInitWrite)) > 0; } /* @@ -1872,6 +2033,7 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) case SCT_AlterOpFamily: case SCT_CreateOpClass: case SCT_AlterTSConfig: + case SCT_CreateTableAs: { char *identity; char *type; @@ -1889,6 +2051,8 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) addr = cmd->d.createopc.address; else if (cmd->type == SCT_AlterTSConfig) addr = cmd->d.atscfg.address; + else if (cmd->type == SCT_CreateTableAs) + addr = cmd->d.ctas.address; /* * If an object was dropped in the same command we may end diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 12d4b2e..ff75999 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -966,6 +966,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CMDTAG_ALTER_INDEX }; + CommandTag init_commands[] = { + CMDTAG_CREATE_TABLE_AS, + CMDTAG_SELECT_INTO + }; + /* Create the ddl_command_end event trigger */ CreateDDLReplicaEventTrigger("ddl_command_end", end_commands, lengthof(end_commands), myself, puboid); @@ -977,6 +982,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Create the table_rewrite event trigger */ CreateDDLReplicaEventTrigger("table_rewrite", rewrite_commands, lengthof(rewrite_commands), myself, puboid); + + /* Create the table_init_write event trigger */ + CreateDDLReplicaEventTrigger("table_init_write", init_commands, + lengthof(init_commands), myself, puboid); } table_close(rel, RowExclusiveLock); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6b0a865..490b73b 100644 --- a/src/backend/tcop/utility.c +++ 
b/src/backend/tcop/utility.c @@ -1665,8 +1665,10 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_CreateTableAsStmt: + EventTriggerTableInitWriteStart(parsetree); address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree, params, queryEnv, qc); + EventTriggerTableInitWriteEnd(); break; case T_RefreshMatViewStmt: diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index f7f7165..7fb8cb2 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -167,6 +167,8 @@ BuildEventTriggerCache(void) event = EVT_SQLDrop; else if (strcmp(evtevent, "table_rewrite") == 0) event = EVT_TableRewrite; + else if (strcmp(evtevent, "table_init_write") == 0) + event = EVT_TableInitWrite; else continue; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 571b9b6..a4adc9a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11901,4 +11901,7 @@ { oid => '4646', descr => 'trigger for ddl command deparse table rewrite', proname => 'publication_deparse_table_rewrite', prorettype => 'event_trigger', proargtypes => '', prosrc => 'publication_deparse_table_rewrite' }, +{ oid => '4647', descr => 'trigger for ddl command deparse table init', + proname => 'publication_deparse_table_init_write', prorettype => 'event_trigger', + proargtypes => '', prosrc => 'publication_deparse_table_init_write' }, ] diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h index fd2ee7f..a9e0f71 100644 --- a/src/include/commands/event_trigger.h +++ b/src/include/commands/event_trigger.h @@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree); extern void EventTriggerSQLDrop(Node *parsetree); extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason); +extern void EventTriggerTableInitWriteStart(Node *parsetree); +extern void EventTriggerTableInitWrite(Node *real_create, ObjectAddress address); +extern 
void EventTriggerTableInitWriteEnd(void); + extern bool EventTriggerBeginCompleteQuery(void); extern void EventTriggerEndCompleteQuery(void); extern bool trackDroppedObjectsNeeded(void); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index b53294b..3d294a0 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -29,7 +29,8 @@ typedef enum CollectedCommandType SCT_AlterOpFamily, SCT_AlterDefaultPrivileges, SCT_CreateOpClass, - SCT_AlterTSConfig + SCT_AlterTSConfig, + SCT_CreateTableAs } CollectedCommandType; /* @@ -101,6 +102,12 @@ typedef struct CollectedCommand { ObjectType objtype; } defprivs; + + struct + { + ObjectAddress address; + Node *real_create; + } ctas; } d; struct CollectedCommand *parent; /* when nested */ diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h index ddb67a6..1e64831 100644 --- a/src/include/utils/evtcache.h +++ b/src/include/utils/evtcache.h @@ -22,7 +22,8 @@ typedef enum EVT_DDLCommandStart, EVT_DDLCommandEnd, EVT_SQLDrop, - EVT_TableRewrite + EVT_TableRewrite, + EVT_TableInitWrite } EventTriggerEvent; typedef struct -- 2.7.2.windows.1