From ce7e6a0087e626d2279fe0ca3472d7eea45fa851 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Sun, 12 Jun 2022 05:28:02 +0000 Subject: [PATCH 12/12] Change the logging format of logical ddlmessage to that of the serialization of the parsetree of the DDL command using nodeToString(), the apply worker deserializes the ddlmessage by calling stringToNode(). Detailed changes: - Adding serialization/deserialization functions in outfuncs.c/readfuncs.c for CreateTableStmt, AlterTableStmt, DropStmt, CreateFunctionStmt and AlterFunctionStmt. - Modified the serialization process to always schema qualify object names, this is done by outQualifiedName() and a change in _outRangeVar(). - Change the input of LogLogicalDDLMessage() to use nodeToString(parsetree). - Change the apply worker to call stringToNode(ddlmessage) to deserialize the parsetree and then directly execute the parsetree. Purpose of the commit is to explore the new logging format, lot of the existing tests in 030_rep_ddls.pl fails due to missing serialization/ deserialization support of their DDL parsetree types. Only CreateTableStmt, AlterTableStmt, DropStmt, CreateFunctionStmt are tested to be working at the moment. --- src/backend/commands/tablecmds.c | 10 +- src/backend/nodes/outfuncs.c | 181 ++++++++- src/backend/nodes/readfuncs.c | 452 +++++++++++++++++++++++ src/backend/replication/logical/worker.c | 345 ++++++++--------- src/backend/tcop/utility.c | 49 ++- 5 files changed, 846 insertions(+), 191 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 94e350b80d..87978b7523 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -1517,10 +1517,11 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery) if (ddlxlog) { const char* prefix = ""; + char* parsetree_str = nodeToString((Node *) drop); LogLogicalDDLMessage(prefix, GetUserId(), - pstate->p_sourcetext, - strlen(pstate->p_sourcetext)); + parsetree_str, + strlen(parsetree_str)); } performMultipleDeletions(objects, drop->behavior, flags); @@ -3875,10 +3876,11 @@ RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery) ddl_need_xlog(relid, false)) { const char* prefix = ""; + char* parsetree_str = nodeToString((Node *) stmt); LogLogicalDDLMessage(prefix, GetUserId(), - pstate->p_sourcetext, - strlen(pstate->p_sourcetext)); + parsetree_str, + strlen(parsetree_str)); } /* Do the work */ diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ce12915592..d302c63923 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -29,12 +29,14 @@ #include +#include "catalog/namespace.h" #include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/extensible.h" #include "nodes/pathnodes.h" #include "nodes/plannodes.h" #include "utils/datum.h" +#include "utils/lsyscache.h" #include "utils/rel.h" static void outChar(StringInfo str, char c); @@ -210,6 +212,34 @@ outChar(StringInfo str, char c) outToken(str, in); } +/* + * Convert a possibly qualified name (list of String nodes) + * We'll qualify the name if the input is not qualified. + */ +static void +outQualifiedName(StringInfo str, List *names) +{ + appendStringInfoChar(str, '('); + if (list_length(names) > 0) + { + Oid namespaceId; + char *schemaname; + char *objname; + + namespaceId = QualifiedNameGetCreationNamespace(names, &objname); + schemaname = get_namespace_name(namespaceId); + + appendStringInfoChar(str, '"'); + appendStringInfoString(str, schemaname); + appendStringInfoChar(str, '"'); + appendStringInfoChar(str, ' '); + appendStringInfoChar(str, '"'); + appendStringInfoString(str, objname); + appendStringInfoChar(str, '"'); + } + appendStringInfoChar(str, ')'); +} + static void _outList(StringInfo str, const List *node) { @@ -1090,7 +1120,16 @@ _outRangeVar(StringInfo str, const RangeVar *node) * we deliberately ignore catalogname here, since it is presently not * semantically meaningful */ - WRITE_STRING_FIELD(schemaname); + if (node->schemaname == NULL) + { + Oid schema_oid = RangeVarGetCreationNamespace(node); + char *schema_name = get_namespace_name(schema_oid); + + appendStringInfoString(str, " :schemaname "); + appendStringInfoString(str, schema_name); + } + else + WRITE_STRING_FIELD(schemaname); WRITE_STRING_FIELD(relname); WRITE_BOOL_FIELD(inh); WRITE_CHAR_FIELD(relpersistence); @@ -2888,6 +2927,18 @@ _outCreateStmt(StringInfo str, const CreateStmt *node) _outCreateStmtInfo(str, (const CreateStmt *) node); } +static void +_outCreateTableAsStmt(StringInfo str, const CreateTableAsStmt *node) +{ + WRITE_NODE_TYPE("CREATETABLEASSTMT"); + + WRITE_NODE_FIELD(query); + WRITE_NODE_FIELD(into); + WRITE_ENUM_FIELD(objtype, ObjectType); + WRITE_BOOL_FIELD(is_select_into); + WRITE_BOOL_FIELD(if_not_exists); +} + static void _outCreateForeignTableStmt(StringInfo str, const CreateForeignTableStmt *node) { @@ -2899,6 +2950,106 @@ _outCreateForeignTableStmt(StringInfo str, const CreateForeignTableStmt *node) WRITE_NODE_FIELD(options); } +static void +_outAlterTableStmt(StringInfo str, const AlterTableStmt *node) +{ + WRITE_NODE_TYPE("ALTERTABLESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(cmds); + WRITE_ENUM_FIELD(objtype, ObjectType); + WRITE_BOOL_FIELD(missing_ok); +} + +static void +_outAlterTableCmd(StringInfo str, const AlterTableCmd *node) +{ + WRITE_NODE_TYPE("ALTERTABLECMD"); + + WRITE_ENUM_FIELD(subtype, AlterTableType); + WRITE_STRING_FIELD(name); + WRITE_INT_FIELD(num); + WRITE_NODE_FIELD(newowner); + WRITE_NODE_FIELD(def); + WRITE_ENUM_FIELD(behavior, DropBehavior); + WRITE_BOOL_FIELD(missing_ok); +} + +static void +_outDropStmt(StringInfo str, const DropStmt *node) +{ + WRITE_NODE_TYPE("DROPSTMT"); + + //WRITE_NODE_FIELD(objects); + appendStringInfoString(str, " :objects "); + /* Qualify and OUT object names */ + if (node->objects) + { + ListCell *lc; + + appendStringInfoChar(str, '('); + foreach(lc, node->objects) + { + List *names = (List *) lfirst(lc); + outQualifiedName(str, names); + } + appendStringInfoChar(str, ')'); + } + else + appendStringInfoString(str, "<>"); + + WRITE_ENUM_FIELD(removeType, ObjectType); + WRITE_ENUM_FIELD(behavior, DropBehavior); + WRITE_BOOL_FIELD(missing_ok); + WRITE_BOOL_FIELD(concurrent); +} + +static void +_outCreateFunctionStmt(StringInfo str, const CreateFunctionStmt *node) +{ + WRITE_NODE_TYPE("CREATEFUNCTIONSTMT"); + + WRITE_BOOL_FIELD(is_procedure); + WRITE_BOOL_FIELD(replace); + appendStringInfoString(str, " :funcname "); + outQualifiedName(str, node->funcname); + WRITE_NODE_FIELD(parameters); + WRITE_NODE_FIELD(returnType); + WRITE_NODE_FIELD(options); + WRITE_NODE_FIELD(sql_body); +} + +static void +_outFunctionParameter(StringInfo str, const FunctionParameter *node) +{ + WRITE_NODE_TYPE("FUNCTIONPARAMETER"); + + WRITE_STRING_FIELD(name); + WRITE_NODE_FIELD(argType); + WRITE_ENUM_FIELD(mode, FunctionParameterMode); + WRITE_NODE_FIELD(defexpr); +} + +static void +_outAlterFunctionStmt(StringInfo str, const AlterFunctionStmt *node) +{ + WRITE_NODE_TYPE("ALTERFUNCTIONSTMT"); + + WRITE_ENUM_FIELD(objtype, ObjectType); + WRITE_NODE_FIELD(func); + WRITE_NODE_FIELD(actions); +} + +static void +_outRoleSpec(StringInfo str, const RoleSpec *node) +{ + WRITE_NODE_TYPE("ROLESPEC"); + + WRITE_ENUM_FIELD(type, RoleSpecType); + WRITE_STRING_FIELD(rolename); + WRITE_INT_FIELD(location); +} + static void _outImportForeignSchemaStmt(StringInfo str, const ImportForeignSchemaStmt *node) { @@ -3134,7 +3285,9 @@ _outTypeName(StringInfo str, const TypeName *node) { WRITE_NODE_TYPE("TYPENAME"); - WRITE_NODE_FIELD(names); + //WRITE_NODE_FIELD(names); + appendStringInfoString(str, " :names "); + outQualifiedName(str, node->names); WRITE_OID_FIELD(typeOid); WRITE_BOOL_FIELD(setof); WRITE_BOOL_FIELD(pct_type); @@ -4543,9 +4696,33 @@ outNode(StringInfo str, const void *obj) case T_CreateStmt: _outCreateStmt(str, obj); break; + case T_CreateTableAsStmt: + _outCreateTableAsStmt(str, obj); + break; case T_CreateForeignTableStmt: _outCreateForeignTableStmt(str, obj); break; + case T_AlterTableStmt: + _outAlterTableStmt(str, obj); + break; + case T_AlterTableCmd: + _outAlterTableCmd(str, obj); + break; + case T_DropStmt: + _outDropStmt(str, obj); + break; + case T_CreateFunctionStmt: + _outCreateFunctionStmt(str, obj); + break; + case T_FunctionParameter: + _outFunctionParameter(str, obj); + break; + case T_AlterFunctionStmt: + _outAlterFunctionStmt(str, obj); + break; + case T_RoleSpec: + _outRoleSpec(str, obj); + break; case T_ImportForeignSchemaStmt: _outImportForeignSchemaStmt(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 6a05b69415..71556d33a8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2938,6 +2938,426 @@ _readPartitionRangeDatum(void) READ_DONE(); } +/* + * _readCreateStmt + */ +static CreateStmt* +_readCreateStmt(void) +{ + READ_LOCALS(CreateStmt); + + READ_NODE_FIELD(relation); + READ_NODE_FIELD(tableElts); + READ_NODE_FIELD(inhRelations); + READ_NODE_FIELD(partspec); + READ_NODE_FIELD(partbound); + READ_NODE_FIELD(ofTypename); + READ_NODE_FIELD(constraints); + READ_NODE_FIELD(options); + READ_ENUM_FIELD(oncommit, OnCommitAction); + READ_STRING_FIELD(tablespacename); + READ_STRING_FIELD(accessMethod); + READ_BOOL_FIELD(if_not_exists); + + READ_DONE(); +} + +/* + * _readCreateTableAsStmt + */ +static CreateTableAsStmt* +_readCreateTableAsStmt(void) +{ + READ_LOCALS(CreateTableAsStmt); + + READ_NODE_FIELD(query); + READ_NODE_FIELD(into); + READ_ENUM_FIELD(objtype, ObjectType); + READ_BOOL_FIELD(is_select_into); + READ_BOOL_FIELD(if_not_exists); + + READ_DONE(); +} + +/* + * _readAlterTableStmt + */ +static AlterTableStmt* +_readAlterTableStmt(void) +{ + READ_LOCALS(AlterTableStmt); + + READ_NODE_FIELD(relation); + READ_NODE_FIELD(cmds); + READ_ENUM_FIELD(objtype, ObjectType); + READ_BOOL_FIELD(missing_ok); + + READ_DONE(); +} + +/* + * _readAlterTableCmd + */ +static AlterTableCmd* +_readAlterTableCmd(void) +{ + READ_LOCALS(AlterTableCmd); + + READ_ENUM_FIELD(subtype, AlterTableType); + READ_STRING_FIELD(name); + READ_INT_FIELD(num); + READ_NODE_FIELD(newowner); + READ_NODE_FIELD(def); + READ_ENUM_FIELD(behavior, DropBehavior); + READ_BOOL_FIELD(missing_ok); + + READ_DONE(); +} + +/* + * _readDropStmt + */ +static DropStmt* +_readDropStmt(void) +{ + READ_LOCALS(DropStmt); + + READ_NODE_FIELD(objects); + READ_ENUM_FIELD(removeType, ObjectType); + READ_ENUM_FIELD(behavior, DropBehavior); + READ_BOOL_FIELD(missing_ok); + READ_BOOL_FIELD(concurrent); + + READ_DONE(); +} + +/* + * _readCreateFunctionStmt + */ +static CreateFunctionStmt* +_readCreateFunctionStmt(void) +{ + READ_LOCALS(CreateFunctionStmt); + + READ_BOOL_FIELD(is_procedure); + READ_BOOL_FIELD(replace); + READ_NODE_FIELD(funcname); + READ_NODE_FIELD(parameters); + READ_NODE_FIELD(returnType); + READ_NODE_FIELD(options); + READ_NODE_FIELD(sql_body); + + READ_DONE(); +} + +/* + * _readFunctionParameter + */ +static FunctionParameter* +_readFunctionParameter(void) +{ + READ_LOCALS(FunctionParameter); + + READ_STRING_FIELD(name); + READ_NODE_FIELD(argType); + READ_ENUM_FIELD(mode, FunctionParameterMode); + READ_NODE_FIELD(defexpr); + + READ_DONE(); +} + +/* + * _readAlterFunctionStmt + */ +static AlterFunctionStmt* +_readAlterFunctionStmt(void) +{ + READ_LOCALS(AlterFunctionStmt); + + READ_ENUM_FIELD(objtype, ObjectType); + READ_NODE_FIELD(func); + READ_NODE_FIELD(actions); + + READ_DONE(); +} + +/* + * _readRoleSpec + */ +static RoleSpec* +_readRoleSpec(void) +{ + READ_LOCALS(RoleSpec); + + READ_ENUM_FIELD(type, RoleSpecType); + READ_STRING_FIELD(rolename); + READ_INT_FIELD(location); + + READ_DONE(); +} + +/* + * _readColumnDef + */ +static ColumnDef* +_readColumnDef(void) +{ + READ_LOCALS(ColumnDef); + + READ_STRING_FIELD(colname); + READ_NODE_FIELD(typeName); + READ_STRING_FIELD(compression); + READ_INT_FIELD(inhcount); + READ_BOOL_FIELD(is_local); + READ_BOOL_FIELD(is_not_null); + READ_BOOL_FIELD(is_from_type); + READ_CHAR_FIELD(storage); + READ_NODE_FIELD(raw_default); + READ_NODE_FIELD(cooked_default); + READ_CHAR_FIELD(identity); + READ_NODE_FIELD(identitySequence); + READ_CHAR_FIELD(generated); + READ_NODE_FIELD(collClause); + READ_OID_FIELD(collOid); + READ_NODE_FIELD(constraints); + READ_NODE_FIELD(fdwoptions); + READ_LOCATION_FIELD(location); + + READ_DONE(); +} + +/* + * _readPartitionSpec + */ +static PartitionSpec* +_readPartitionSpec(void) +{ + READ_LOCALS(PartitionSpec); + + READ_STRING_FIELD(strategy); + READ_NODE_FIELD(partParams); + READ_LOCATION_FIELD(location); + + READ_DONE(); +} + +/* + * _readPartitionElem + */ +static PartitionElem* +_readPartitionElem(void) +{ + READ_LOCALS(PartitionElem); + + READ_STRING_FIELD(name); + READ_NODE_FIELD(expr); + READ_NODE_FIELD(collation); + READ_NODE_FIELD(opclass); + READ_LOCATION_FIELD(location); + + READ_DONE(); +} + +/* + * _readTypeName + */ +static TypeName* +_readTypeName(void) +{ + READ_LOCALS(TypeName); + + READ_NODE_FIELD(names); + READ_OID_FIELD(typeOid); + READ_BOOL_FIELD(setof); + READ_BOOL_FIELD(pct_type); + READ_NODE_FIELD(typmods); + READ_INT_FIELD(typemod); + READ_NODE_FIELD(arrayBounds); + READ_LOCATION_FIELD(location); + + READ_DONE(); +} + +/* + * _readConstraint + */ +static Constraint* +_readConstraint(void) +{ + READ_LOCALS(Constraint); + READ_STRING_FIELD(conname); + READ_BOOL_FIELD(deferrable); + READ_BOOL_FIELD(initdeferred); + READ_LOCATION_FIELD(location); + + /* read :contype CONTYPE */ + token = pg_strtok(&length); + token = pg_strtok(&length); + +#define MATCH(tokname, namelen) \ + (length == namelen && memcmp(token, tokname, namelen) == 0) + + if (MATCH("NULL", 4)) + { + local_node->contype = CONSTR_NULL; + } + else if (MATCH("NOT_NULL", 8)) + { + local_node->contype = CONSTR_NOTNULL; + } + else if (MATCH("DEFAULT", 7)) + { + local_node->contype = CONSTR_DEFAULT; + READ_NODE_FIELD(raw_expr); + READ_STRING_FIELD(cooked_expr); + } + else if (MATCH("IDENTITY", 8)) + { + local_node->contype = CONSTR_IDENTITY; + READ_NODE_FIELD(raw_expr); + READ_STRING_FIELD(cooked_expr); + READ_CHAR_FIELD(generated_when); + } + else if (MATCH("GENERATED", 9)) + { + local_node->contype = CONSTR_GENERATED; + READ_NODE_FIELD(raw_expr); + READ_STRING_FIELD(cooked_expr); + READ_CHAR_FIELD(generated_when); + } + else if (MATCH("CHECK", 5)) + { + local_node->contype = CONSTR_CHECK; + READ_BOOL_FIELD(is_no_inherit); + READ_NODE_FIELD(raw_expr); + READ_STRING_FIELD(cooked_expr); + } + else if (MATCH("PRIMARY_KEY", 11)) + { + local_node->contype = CONSTR_PRIMARY; + READ_NODE_FIELD(keys); + READ_NODE_FIELD(including); + READ_NODE_FIELD(options); + READ_STRING_FIELD(indexname); + READ_STRING_FIELD(indexspace); + READ_BOOL_FIELD(reset_default_tblspc); + /* access_method and where_clause not currently used */ + } + else if (MATCH("UNIQUE", 6)) + { + local_node->contype = CONSTR_UNIQUE; + READ_BOOL_FIELD(nulls_not_distinct); + READ_NODE_FIELD(keys); + READ_NODE_FIELD(including); + READ_NODE_FIELD(options); + READ_STRING_FIELD(indexname); + READ_STRING_FIELD(indexspace); + READ_BOOL_FIELD(reset_default_tblspc); + /* access_method and where_clause not currently used */ + } + else if (MATCH("EXCLUSION", 9)) + { + local_node->contype = CONSTR_EXCLUSION; + READ_NODE_FIELD(exclusions); + READ_NODE_FIELD(including); + READ_NODE_FIELD(options); + READ_STRING_FIELD(indexname); + READ_STRING_FIELD(indexspace); + READ_BOOL_FIELD(reset_default_tblspc); + READ_STRING_FIELD(access_method); + READ_NODE_FIELD(where_clause); + } + else if (MATCH("FOREIGN_KEY", 11)) + { + local_node->contype = CONSTR_FOREIGN; + READ_NODE_FIELD(pktable); + READ_NODE_FIELD(fk_attrs); + READ_NODE_FIELD(pk_attrs); + READ_CHAR_FIELD(fk_matchtype); + READ_CHAR_FIELD(fk_upd_action); + READ_CHAR_FIELD(fk_del_action); + READ_NODE_FIELD(fk_del_set_cols); + READ_NODE_FIELD(old_conpfeqop); + READ_OID_FIELD(old_pktable_oid); + READ_BOOL_FIELD(skip_validation); + READ_BOOL_FIELD(initially_valid); + } + else if (MATCH("ATTR_DEFERRABLE", 15)) + { + local_node->contype = CONSTR_ATTR_DEFERRABLE; + } + else if (MATCH("ATTR_NOT_DEFERRABLE", 19)) + { + local_node->contype = CONSTR_ATTR_NOT_DEFERRABLE; + } + else if (MATCH("ATTR_DEFERRED", 13)) + { + local_node->contype = CONSTR_ATTR_DEFERRED; + } + else if (MATCH("ATTR_IMMEDIATE", 14)) + { + local_node->contype = CONSTR_ATTR_IMMEDIATE; + } + + READ_DONE(); +} + +/* + * _readIndexStmt + */ +static IndexStmt* +_readIndexStmt(void) +{ + READ_LOCALS(IndexStmt); + + READ_STRING_FIELD(idxname); + READ_NODE_FIELD(relation); + READ_STRING_FIELD(accessMethod); + READ_STRING_FIELD(tableSpace); + READ_NODE_FIELD(indexParams); + READ_NODE_FIELD(indexIncludingParams); + READ_NODE_FIELD(options); + READ_NODE_FIELD(whereClause); + READ_NODE_FIELD(excludeOpNames); + READ_STRING_FIELD(idxcomment); + READ_OID_FIELD(indexOid); + READ_OID_FIELD(oldNode); + READ_UINT_FIELD(oldCreateSubid); + READ_UINT_FIELD(oldFirstRelfilenodeSubid); + READ_BOOL_FIELD(unique); + READ_BOOL_FIELD(nulls_not_distinct); + READ_BOOL_FIELD(primary); + READ_BOOL_FIELD(isconstraint); + READ_BOOL_FIELD(deferrable); + READ_BOOL_FIELD(initdeferred); + READ_BOOL_FIELD(transformed); + READ_BOOL_FIELD(concurrent); + READ_BOOL_FIELD(if_not_exists); + READ_BOOL_FIELD(reset_default_tblspc); + + READ_DONE(); +} + +/* + * _readIndexElem + */ +static IndexElem* +_readIndexElem(void) +{ + READ_LOCALS(IndexElem); + + READ_STRING_FIELD(name); + READ_NODE_FIELD(expr); + READ_STRING_FIELD(indexcolname); + READ_NODE_FIELD(collation); + READ_NODE_FIELD(opclass); + READ_NODE_FIELD(opclassopts); + READ_ENUM_FIELD(ordering, SortByDir); + READ_ENUM_FIELD(nulls_ordering, SortByNulls); + + READ_DONE(); +} + /* * parseNodeString * @@ -3235,6 +3655,38 @@ parseNodeString(void) return_value = _readJsonTableParent(); else if (MATCH("JSONTABLESIBLING", 16)) return_value = _readJsonTableSibling(); + else if (MATCH("CREATESTMT", 10)) + return_value = _readCreateStmt(); + else if (MATCH("CREATETABLEASSTMT", 17)) + return_value = _readCreateTableAsStmt(); + else if (MATCH("ALTERTABLESTMT", 14)) + return_value = _readAlterTableStmt(); + else if (MATCH("ALTERTABLECMD", 13)) + return_value = _readAlterTableCmd(); + else if (MATCH("DROPSTMT", 8)) + return_value = _readDropStmt(); + else if (MATCH("CREATEFUNCTIONSTMT", 18)) + return_value = _readCreateFunctionStmt(); + else if (MATCH("FUNCTIONPARAMETER", 17)) + return_value = _readFunctionParameter(); + else if (MATCH("ALTERFUNCTIONSTMT", 17)) + return_value = _readAlterFunctionStmt(); + else if (MATCH("ROLESPEC", 8)) + return_value = _readRoleSpec(); + else if (MATCH("COLUMNDEF", 9)) + return_value = _readColumnDef(); + else if (MATCH("TYPENAME", 8)) + return_value = _readTypeName(); + else if (MATCH("PARTITIONELEM", 13)) + return_value = _readPartitionElem(); + else if (MATCH("PARTITIONSPEC", 13)) + return_value = _readPartitionSpec(); + else if (MATCH("CONSTRAINT", 10)) + return_value = _readConstraint(); + else if (MATCH("INDEXSTMT", 9)) + return_value = _readIndexStmt(); + else if (MATCH("INDEXELEM", 9)) + return_value = _readIndexElem(); else { elog(ERROR, "badly formatted node string \"%.32s\"...", token); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c61b3564a7..da72f41921 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -351,7 +351,7 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation); -static void apply_execute_sql_command(const char *cmdstr, +static void apply_execute_sql_command(const char *message, const char* role, const char* search_path, bool isTopLevel); @@ -2482,130 +2482,70 @@ execute_sql_command_error_cb(void *arg) } /* - * Execute an SQL command. This can be multiple queries. - * This is modified based on pglogical_execute_sql_command(). + * Preprocess certain DDL commands before apply + * -Remove data population for table creation + * -Enable missing_ok for drop stmt + * -Disallow table rewrites using volatile functions */ static void -apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path, - bool isTopLevel) +preprocess_ddl(RawStmt *command, char **schemaname, char **relname, bool *is_partitioned_table) { - const char *save_debug_query_string = debug_query_string; - List *parsetree_list; - ListCell *parsetree_item; - MemoryContext oldcontext; - ErrorContextCallback errcallback; - int save_nestlevel; - - /* - * Switch to appropriate context for constructing parsetrees. - */ - oldcontext = MemoryContextSwitchTo(ApplyMessageContext); - begin_replication_step(); - - /* - * Set the current role to the user that executed the command on the - * publication server. - * Set the current search_path to the search_path on the publication - * server when the command was executed. - */ - save_nestlevel = NewGUCNestLevel(); - SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE); - SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE); - - errcallback.callback = execute_sql_command_error_cb; - errcallback.arg = (char *) cmdstr; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - debug_query_string = cmdstr; - - parsetree_list = pg_parse_query(cmdstr); - - /* - * Do a limited amount of safety checking against CONCURRENTLY commands - * executed in situations where they aren't allowed. The sender side should - * provide protection, but better be safe than sorry. - */ - isTopLevel = isTopLevel && (list_length(parsetree_list) == 1); - - /* - * Switch back to transaction context to enter the loop. - */ - MemoryContextSwitchTo(oldcontext); - - foreach(parsetree_item, parsetree_list) + switch(nodeTag(command->stmt)) { - List *plantree_list; - List *querytree_list; - RawStmt *command = (RawStmt *) lfirst(parsetree_item); - CommandTag commandTag; - MemoryContext per_parsetree_context = NULL; - Portal portal; - DestReceiver *receiver; - bool snapshot_set = false; - char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ - char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ - bool is_partitioned_table = false; - - commandTag = CreateCommandTag((Node *)command); - - /* The following DDL commands need special handling */ - - /* - * Remember the schemaname and relname if the cmd is going to create a table - * because we will need them for some post-processing after we - * execute the stmt. At that point, command->stmt may have been freeed up. - */ - if (commandTag == CMDTAG_CREATE_TABLE) + case T_CreateStmt: { + /* + * Remember the schemaname and relname if the cmd is going to create a table + * because we will need them for some post-processing after we + * execute the stmt. At that point, command->stmt may have been freeed up. + */ CreateStmt *cstmt = (CreateStmt *) command->stmt; RangeVar *rv = cstmt->relation; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; if (cstmt->inhRelations != NIL || cstmt->partspec != NULL) - is_partitioned_table = true; + *is_partitioned_table = true; + + break; } - else if (commandTag == CMDTAG_CREATE_TABLE_AS) + case T_CreateTableAsStmt: { CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt; if (castmt->objtype == OBJECT_TABLE) { RangeVar *rv = castmt->into->rel; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; /* - * Force skipping data population to avoid data inconsistency. - * Data should be replicated from the publisher instead. - */ + * Force skipping data population to avoid data inconsistency. + * Data should be replicated from the publisher instead. + */ castmt->into->skipData = true; } + break; } /* SELECT INTO */ - else if (commandTag == CMDTAG_SELECT) + case T_SelectStmt: { SelectStmt *sstmt = (SelectStmt *) command->stmt; if (sstmt->intoClause != NULL) { RangeVar *rv = sstmt->intoClause->rel; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; /* - * Force skipping data population to avoid data inconsistency. - * Data should be replicated from the publisher instead. - */ + * Force skipping data population to avoid data inconsistency. + * Data should be replicated from the publisher instead. + */ sstmt->intoClause->skipData = true; } + break; } - /* - * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported. - * Until we support logical replication of table rewrite, see ATRewriteTables() - * for details on table rewrite. - */ - else if (commandTag == CMDTAG_ALTER_TABLE) + case T_AlterTableStmt: { AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt; ListCell *lc; @@ -2633,14 +2573,151 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear if (contain_volatile_functions(expr)) { elog(ERROR, - "Do not support replication of DDL statement that rewrites table using volatile functions: %s", - cmdstr); + "Do not support replication of DDL statement that rewrites table using volatile functions"); } } } } } + break; + } + case T_DropStmt: + { + DropStmt *dstmt = (DropStmt *) command->stmt; + dstmt->missing_ok = true; + break; + } + default: + break; + } +} + +/* +* Table created by DDL replication (database level) is automatically +* added to the subscription here. +* +* Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS +* command to set the relstate to SUBREL_STATE_INIT so DML changes on this +* new table can be replicated without having to manually run +* "alter subscription ... refresh publication" +*/ +static void +handle_create_table(char* relname, char* schemaname, bool is_partitioned_table) +{ + Oid relid; + Oid relnamespace = InvalidOid; + + if (schemaname != NULL) + relnamespace = get_namespace_oid(schemaname, false); + if (relnamespace != InvalidOid) + relid = get_relname_relid(relname, relnamespace); + else + { + /* + * Try to resolve unqualified relname. + * Notice we have set the search_path to the original search_path on the publisher + * at the beginning of this function. + */ + relid = RelnameGetRelid(relname); + } + + if (relid != InvalidOid) + { + bool subscribe_table = true; + + if (is_partitioned_table) + { + Relation rel = RelationIdGetRelation(relid); + char *table_name = RelationGetRelationName(rel); + char *schema_name = get_namespace_name(RelationGetNamespace(rel)); + /* + * Connect to the source DB and check whehter the partitioned table should be subscribed. + * Because it depends on the setting of publish_via_partition_root, which the subscription + * doesn't know. + */ + subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name); + RelationClose(rel); + } + + if (subscribe_table) + { + AddSubscriptionRelState(MySubscription->oid, relid, + SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s\" added to subscription \"%s\"", + relname, MySubscription->name))); } + } +} + +/* + * Deserialize a ddlmessage into a raw parsetree by calling stringToNode() and + * then execute it. + * + * This is modified based on pglogical_execute_sql_command(). + */ +static void +apply_execute_sql_command(const char *message, const char *role, const char *search_path, + bool isTopLevel) +{ + const char *save_debug_query_string = debug_query_string; + const char *cmdstr = ""; + MemoryContext oldcontext; + ErrorContextCallback errcallback; + int save_nestlevel; + RawStmt *command = makeNode(RawStmt); + + /* + * Switch to appropriate context for constructing parsetrees. + */ + oldcontext = MemoryContextSwitchTo(ApplyMessageContext); + begin_replication_step(); + + /* + * Set the current role to the user that executed the command on the + * publication server. + * Set the current search_path to the search_path on the publication + * server when the command was executed. + */ + save_nestlevel = NewGUCNestLevel(); + SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE); + //SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE); + + errcallback.callback = execute_sql_command_error_cb; + errcallback.arg = (char *) message; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + debug_query_string = message; + + command->stmt = stringToNode(message); + + /* + * Do a limited amount of safety checking against CONCURRENTLY commands + * executed in situations where they aren't allowed. The sender side should + * provide protection, but better be safe than sorry. + */ + //isTopLevel = isTopLevel && (list_length(parsetree_list) == 1); + + /* + * Switch back to transaction context to enter the loop. + */ + MemoryContextSwitchTo(oldcontext); + + if (command->stmt) + { + List *plantree_list; + List *querytree_list; + CommandTag commandTag = CreateCommandTag((Node *)command); + Portal portal; + DestReceiver *receiver; + bool snapshot_set = false; + char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + bool is_partitioned_table = false; + + preprocess_ddl(command, &schemaname, &relname, &is_partitioned_table); /* * Set up a snapshot if parse analysis/planning will need one. @@ -2656,23 +2733,10 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear * * Switch to appropriate context for constructing query and plan trees * (these can't be in the transaction context, as that will get reset - * when the command is COMMIT/ROLLBACK). If we have multiple - * parsetrees, we use a separate context for each one, so that we can - * free that memory before moving on to the next one. But for the - * last (or only) parsetree, just use MessageContext, which will be - * reset shortly after completion anyway. In event of an error, the - * per_parsetree_context will be deleted when MessageContext is reset. + * when the command is COMMIT/ROLLBACK). */ - if (lnext(parsetree_list, parsetree_item) != NULL) - { - per_parsetree_context = - AllocSetContextCreate(MessageContext, - "per-parsetree message context", - ALLOCSET_DEFAULT_SIZES); - oldcontext = MemoryContextSwitchTo(per_parsetree_context); - } - else - oldcontext = MemoryContextSwitchTo(ApplyMessageContext); + + oldcontext = MemoryContextSwitchTo(ApplyMessageContext); querytree_list = pg_analyze_and_rewrite_fixedparams( command, @@ -2735,63 +2799,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear CommandCounterIncrement(); - /* - * Table created by DDL replication (database level) is automatically - * added to the subscription here. - * - * Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS - * command to set the relstate to SUBREL_STATE_INIT so DML changes on this - * new table can be replicated without having to manually run - * "alter subscription ... refresh publication" - */ - if (relname != NULL) - { - Oid relid; - Oid relnamespace = InvalidOid; - - if (schemaname != NULL) - relnamespace = get_namespace_oid(schemaname, false); - if (relnamespace != InvalidOid) - relid = get_relname_relid(relname, relnamespace); - else - { - /* - * Try to resolve unqualified relname. - * Notice we have set the search_path to the original search_path on the publisher - * at the beginning of this function. - */ - relid = RelnameGetRelid(relname); - } - - if (relid != InvalidOid) - { - bool subscribe_table = true; - - if (is_partitioned_table) - { - Relation rel = RelationIdGetRelation(relid); - char *table_name = RelationGetRelationName(rel); - char *schema_name = get_namespace_name(RelationGetNamespace(rel)); - /* - * Connect to the source DB and check whehter the partitioned table should be subscribed. - * Because it depends on the setting of publish_via_partition_root, which the subscription - * doesn't know. - */ - subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name); - RelationClose(rel); - } - - if (subscribe_table) - { - AddSubscriptionRelState(MySubscription->oid, relid, - SUBREL_STATE_INIT, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s\" added to subscription \"%s\"", - relname, MySubscription->name))); - } - } - } + if (relname) + handle_create_table(relname, schemaname, is_partitioned_table); } /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 08278c945c..5f41d66604 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1132,10 +1132,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) if (ddl_need_xlog(InvalidOid, true)) { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, GetUserId(), - queryString, - strlen(queryString)); + parsetree_str, + strlen(parsetree_str)); } break; @@ -1172,10 +1173,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) if (ddl_need_xlog(InvalidOid, true)) { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, GetUserId(), - queryString, - strlen(queryString)); + parsetree_str, + strlen(parsetree_str)); } default: break; @@ -1191,16 +1193,26 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ case T_AlterTableStmt: case T_IndexStmt: + break; + /* + * Rename of objects other than table is only allowed in database level + * replication. + * Rename of table is allowed in both table level and database level + * replication. + */ case T_RenameStmt: { RenameStmt *stmt = (RenameStmt *) parsetree; - if(!stmt->relation && ddl_need_xlog(InvalidOid, true)){ + if(!stmt->relation && ddl_need_xlog(InvalidOid, true)) + { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, - GetUserId(), - queryString, - strlen(queryString)); + GetUserId(), + parsetree_str, + strlen(parsetree_str)); } + break; } case T_AlterOwnerStmt: /* TODO, it is data control case, save for later update */ break; @@ -1218,11 +1230,9 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) /* Drop of sequence is by logical replication of sequences separately */ case OBJECT_SEQUENCE: break; - /* Drop of other objects are allowed in Database level DDL replication only */ + /* Drop of VIEW and MATVIEW are allowed in Database level DDL replication only */ case OBJECT_VIEW: case OBJECT_MATVIEW: - case OBJECT_FOREIGN_TABLE: - default: /* * Log these DropStmt for logical replication if * there is any FOR ALL TABLES publication with pubddl_database on. @@ -1231,12 +1241,15 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) if (ddl_need_xlog(InvalidOid, true)) { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, GetUserId(), - queryString, - strlen(queryString)); + parsetree_str, + strlen(parsetree_str)); } break; + default: + break; } } /* @@ -1539,10 +1552,11 @@ ProcessUtilitySlow(ParseState *pstate, ddl_need_xlog(relid, false)) { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, GetUserId(), - queryString, - strlen(queryString)); + parsetree_str, + strlen(parsetree_str)); } /* ... and do it */ @@ -1774,10 +1788,11 @@ ProcessUtilitySlow(ParseState *pstate, ddl_need_xlog(relid, false)) { const char* prefix = ""; + char* parsetree_str = nodeToString(parsetree); LogLogicalDDLMessage(prefix, GetUserId(), - queryString, - strlen(queryString)); + parsetree_str, + strlen(parsetree_str)); } address = -- 2.32.0