From cdd09f1ac681b3d0d5bf88c0bfa920896dd4b3be Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Thu, 2 Jun 2022 13:32:39 +0800 Subject: [PATCH] support CREATE TABLE AS/SELECT INTO The main idea of replicating CREATE TABLE AS is that we deparse the CREATE TABLE AS into a simple CREATE TABLE (without subquery) command, WAL log it after creating the table and before writing data into the table, and replicate the incoming writes later as normal INSERTs. In this approach, we don't execute the subquery on the subscriber, so we don't need to make sure that all the objects referenced in the subquery also exist on the subscriber. This approach works for all kinds of commands (e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]). Introduce a new type of event trigger, "table_create", which is fired for CREATE TABLE/CREATE TABLE AS/SELECT INTO after creating the table and before any other modification. We deparse the command in the table_create event trigger and WAL log the deparsed JSON string. The walsender will send the string to the subscriber, and incoming INSERTs will also be replicated. 
--- src/backend/commands/createas.c | 3 + src/backend/commands/event_trigger.c | 161 ++++++++++++++++++++++++++++++++- src/backend/commands/publicationcmds.c | 33 ++++++- src/backend/tcop/utility.c | 7 ++ src/backend/utils/cache/evtcache.c | 2 + src/include/catalog/pg_proc.dat | 3 + src/include/commands/event_trigger.h | 4 + src/include/utils/evtcache.h | 3 +- 8 files changed, 208 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 9abbb6b..ae25d2a 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -34,6 +34,7 @@ #include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/event_trigger.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -143,6 +144,8 @@ create_ctas_internal(List *attrList, IntoClause *into) StoreViewQuery(intoRelationAddr.objectId, query, false); CommandCounterIncrement(); } + else + EventTriggerTableCreate((Node *) create, intoRelationAddr); return intoRelationAddr; } diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 9bc2145..e77f01b 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) if (strcmp(stmt->eventname, "ddl_command_start") != 0 && strcmp(stmt->eventname, "ddl_command_end") != 0 && strcmp(stmt->eventname, "sql_drop") != 0 && - strcmp(stmt->eventname, "table_rewrite") != 0) + strcmp(stmt->eventname, "table_rewrite") != 0 && + strcmp(stmt->eventname, "table_create") != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized event name \"%s\"", @@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) /* Validate tag list, if any. 
*/ if ((strcmp(stmt->eventname, "ddl_command_start") == 0 || strcmp(stmt->eventname, "ddl_command_end") == 0 || - strcmp(stmt->eventname, "sql_drop") == 0) + strcmp(stmt->eventname, "sql_drop") == 0 || + strcmp(stmt->eventname, "table_create") == 0) && tags != NULL) validate_ddl_tags("tag", tags); else if (strcmp(stmt->eventname, "table_rewrite") == 0 @@ -586,7 +588,8 @@ EventTriggerCommonSetup(Node *parsetree, dbgtag = CreateCommandTag(parsetree); if (event == EVT_DDLCommandStart || event == EVT_DDLCommandEnd || - event == EVT_SQLDrop) + event == EVT_SQLDrop || + event == EVT_TableCreate) { if (!command_tag_event_trigger_ok(dbgtag)) elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag)); @@ -869,6 +872,155 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason) CommandCounterIncrement(); } + +/* + * EventTriggerCreateTableStart + * Prepare to receive data on a CREATE TABLE [AS] command about to be + * executed. + */ +void +EventTriggerCreateTableStart(Node *parsetree) +{ + MemoryContext oldcxt; + CollectedCommand *command; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + command = palloc(sizeof(CollectedCommand)); + + command->type = SCT_Simple; + command->in_extension = creating_extension; + command->d.simple.address = InvalidObjectAddress; + command->d.simple.secondaryObject = InvalidObjectAddress; + command->parsetree = copyObject(parsetree); + + command->parent = currentEventTriggerState->currentCommand; + currentEventTriggerState->currentCommand = command; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerCreateTableEnd + * Finish up saving a CREATE TABLE [AS] command. + * + * FIXME this API isn't considering the possibility that an xact/subxact is + * aborted partway through. 
Probably it's best to add an + * AtEOSubXact_EventTriggers() to fix this. + */ +void +EventTriggerCreateTableEnd(void) +{ + CollectedCommand *parent; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + parent = currentEventTriggerState->currentCommand->parent; + + pfree(currentEventTriggerState->currentCommand); + + currentEventTriggerState->currentCommand = parent; +} + +/* + * publication_ddl_deparse_table_create + * + * Deparse the table create DDL command and WAL log it. + */ +Datum +publication_ddl_deparse_table_create(PG_FUNCTION_ARGS) +{ + CollectedCommand *cmd; + char *json_string; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + elog(ERROR, "not fired by event trigger manager"); + + cmd = currentEventTriggerState->currentCommand; + Assert(cmd); + + /* Deparse the DDL command and WAL log it to allow decoding of the same. */ + json_string = deparse_utility_command(cmd); + + if (json_string != NULL) + LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_Create, + json_string, strlen(json_string) + 1); + + return PointerGetDatum(NULL); +} + +/* + * Fire table_create triggers. + */ +void +EventTriggerTableCreate(Node *parsetree, ObjectAddress address) +{ + List *runlist; + Node *old_parsetree; + EventTriggerData trigdata; + + /* + * See EventTriggerDDLCommandStart for a discussion about why event + * triggers are disabled in single user mode. + */ + if (!IsUnderPostmaster) + return; + + /* + * Also do nothing if our state isn't set up, which it won't be if there + * weren't any relevant event triggers at the start of the current DDL + * command. This test might therefore seem optional, but it's + * *necessary*, because EventTriggerCommonSetup might find triggers that + * didn't exist at the time the command started. 
+ */ + if (!currentEventTriggerState) + return; + + runlist = EventTriggerCommonSetup(currentEventTriggerState->currentCommand->parsetree, + EVT_TableCreate, + "table_create", + &trigdata); + if (runlist == NIL) + return; + + /* + * Use PG_TRY to ensure parsetree is reset even when one trigger + * fails. (This is perhaps not necessary, as the currentState variable will + * be removed shortly by our caller, but it seems better to play safe.) + */ + old_parsetree = currentEventTriggerState->currentCommand->parsetree; + currentEventTriggerState->currentCommand->d.simple.address = address; + currentEventTriggerState->currentCommand->parsetree = parsetree; + + /* Run the triggers. */ + PG_TRY(); + { + EventTriggerInvoke(runlist, &trigdata); + } + PG_FINALLY(); + { + currentEventTriggerState->currentCommand->parsetree = old_parsetree; + } + PG_END_TRY(); + + /* Cleanup. */ + list_free(runlist); + + /* + * Make sure anything the event triggers did will be visible to the main + * command. + */ + CommandCounterIncrement(); +} + /* * Invoke each event trigger in a list of event triggers. 
*/ @@ -1149,7 +1301,8 @@ trackDroppedObjectsNeeded(void) */ return list_length(EventCacheLookup(EVT_SQLDrop)) > 0 || list_length(EventCacheLookup(EVT_TableRewrite)) > 0 || - list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0; + list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0 || + list_length(EventCacheLookup(EVT_TableCreate)) > 0; } /* diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 63ffc62..187e326 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -762,7 +762,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) Relation rel; ObjectAddress myself; ObjectAddress referenced_start, referenced_end, - referenced_table_rewrite; + referenced_table_rewrite, + referenced_table_create; Oid puboid; bool nulls[Natts_pg_publication]; Datum values[Natts_pg_publication]; @@ -904,20 +905,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) if (pubactions.pubddl) { CreateEventTrigStmt *ddl_trigg_start, *ddl_trigg_end, - *ddl_trigg_table_rewrite; + *ddl_trigg_table_rewrite, + *ddl_trigg_table_create; Node *end_arg1 = NULL; Node *start_arg1 = NULL; Node *table_rewrite_arg1 = NULL; + Node *table_create_arg1 = NULL; Node *arg2 = NULL; Node *arg3 = NULL; List *end_tags = NIL; List *start_tags = NIL; List *table_rewrite_tags = NIL; + List *table_create_tags = NIL; Oid event_trig_start_id, event_trig_end_id, - event_trig_table_rewrite_id; + event_trig_table_rewrite_id, + event_trig_table_create_id; char trigger_name_start[NAMEDATALEN]; char trigger_name_end[NAMEDATALEN]; char trigger_name_table_rewrite[NAMEDATALEN]; + char trigger_name_table_create[NAMEDATALEN]; ddl_trigg_end = makeNode(CreateEventTrigStmt); @@ -969,6 +975,24 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) ddl_trigg_table_rewrite->whenclause = list_make1(makeDefElem("tag", (Node *) table_rewrite_tags, -1)); event_trig_table_rewrite_id = 
CreateEventTrigger(ddl_trigg_table_rewrite); + /* Trigger for table_create */ + ddl_trigg_table_create = makeNode(CreateEventTrigStmt); + + snprintf(trigger_name_table_create, sizeof(trigger_name_table_create), + "pg_deparse_trig_table_create_%u", puboid); + ddl_trigg_table_create->trigname = pstrdup(trigger_name_table_create); + ddl_trigg_table_create->eventname = "table_create"; + ddl_trigg_table_create->funcname = SystemFuncName("publication_ddl_deparse_table_create"); + + table_create_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_CREATE_TABLE_AS))); + table_create_tags = list_make1(table_create_arg1); + table_create_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_SELECT_INTO))); + table_create_tags = lappend(table_create_tags, table_create_arg1); + + ddl_trigg_table_create->whenclause = list_make1(makeDefElem("tag", (Node *) table_create_tags, -1)); + event_trig_table_create_id = CreateEventTrigger(ddl_trigg_table_create); + + /* * Register the event triggers as internally dependent on the * publication. 
@@ -981,6 +1005,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) ObjectAddressSet(referenced_table_rewrite, EventTriggerRelationId, event_trig_table_rewrite_id); recordDependencyOn(&referenced_table_rewrite, &myself, DEPENDENCY_INTERNAL); + + ObjectAddressSet(referenced_table_create, EventTriggerRelationId, event_trig_table_create_id); + recordDependencyOn(&referenced_table_create, &myself, DEPENDENCY_INTERNAL); } table_close(rel, RowExclusiveLock); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6a5bcde..c16f804 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1164,6 +1164,8 @@ ProcessUtilitySlow(ParseState *pstate, Datum toast_options; static char *validnsps[] = HEAP_RELOPT_NAMESPACES; + EventTriggerCreateTableStart(parsetree); + /* Remember transformed RangeVar for LIKE */ table_rv = cstmt->relation; @@ -1198,6 +1200,9 @@ ProcessUtilitySlow(ParseState *pstate, NewRelationCreateToastTable(address.objectId, toast_options); + + EventTriggerTableCreate((Node *) cstmt, address); + EventTriggerCreateTableEnd(); } else if (IsA(stmt, CreateForeignTableStmt)) { @@ -1665,8 +1670,10 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_CreateTableAsStmt: + EventTriggerCreateTableStart(parsetree); address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree, params, queryEnv, qc); + EventTriggerCreateTableEnd(); break; case T_RefreshMatViewStmt: diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index 3a9c9f0..f4c8b73 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -167,6 +167,8 @@ BuildEventTriggerCache(void) event = EVT_SQLDrop; else if (strcmp(evtevent, "table_rewrite") == 0) event = EVT_TableRewrite; + else if (strcmp(evtevent, "table_create") == 0) + event = EVT_TableCreate; else continue; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6396566..f55f455 100644 --- 
a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11899,4 +11899,7 @@ { oid => '4646', descr => 'trigger for ddl command deparse table rewrite', proname => 'publication_ddl_deparse_table_rewrite', prorettype => 'event_trigger', proargtypes => '', prosrc => 'publication_ddl_deparse_table_rewrite' }, +{ oid => '4647', descr => 'trigger for ddl command deparse table rewrite', + proname => 'publication_ddl_deparse_table_create', prorettype => 'event_trigger', + proargtypes => '', prosrc => 'publication_ddl_deparse_table_create' }, ] diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h index fd2ee7f..d617e94 100644 --- a/src/include/commands/event_trigger.h +++ b/src/include/commands/event_trigger.h @@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree); extern void EventTriggerSQLDrop(Node *parsetree); extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason); +extern void EventTriggerCreateTableStart(Node *parsetree); +extern void EventTriggerTableCreate(Node *parsetree, ObjectAddress address); +extern void EventTriggerCreateTableEnd(void); + extern bool EventTriggerBeginCompleteQuery(void); extern void EventTriggerEndCompleteQuery(void); extern bool trackDroppedObjectsNeeded(void); diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h index ddb67a6..070c6b9 100644 --- a/src/include/utils/evtcache.h +++ b/src/include/utils/evtcache.h @@ -22,7 +22,8 @@ typedef enum EVT_DDLCommandStart, EVT_DDLCommandEnd, EVT_SQLDrop, - EVT_TableRewrite + EVT_TableRewrite, + EVT_TableCreate } EventTriggerEvent; typedef struct -- 2.7.2.windows.1