From 4e9706bf9421ec24d7fe2e4b985d44d456fa5e3e Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Tue, 24 May 2022 15:37:34 +0000 Subject: [PATCH 11/12] Remove non-transactional ddl message decoding because the use case is unclear and it has unresolved issues under concurrency. Some code cleanup. --- .../test_decoding/expected/ddlmessages.out | 42 +-- contrib/test_decoding/test_decoding.c | 25 +- .../access/rmgrdesc/logicalddlmsgdesc.c | 3 +- src/backend/commands/tablecmds.c | 8 +- src/backend/replication/logical/ddlmessage.c | 12 +- src/backend/replication/logical/decode.c | 13 +- src/backend/replication/logical/logical.c | 26 +- src/backend/replication/logical/proto.c | 12 +- .../replication/logical/reorderbuffer.c | 64 +--- src/backend/replication/logical/worker.c | 309 ++++++++++-------- src/backend/replication/pgoutput/pgoutput.c | 17 +- src/backend/tcop/utility.c | 35 +- src/include/replication/ddlmessage.h | 3 +- src/include/replication/logicalproto.h | 4 +- src/include/replication/output_plugin.h | 2 - src/include/replication/reorderbuffer.h | 6 +- 16 files changed, 264 insertions(+), 317 deletions(-) diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out index 823029d03d..56cbafa4cf 100644 --- a/contrib/test_decoding/expected/ddlmessages.out +++ b/contrib/test_decoding/expected/ddlmessages.out @@ -24,14 +24,14 @@ CREATE TABLE test_ddlmessage (id serial unique, data int); ALTER TABLE test_ddlmessage add c3 varchar; COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; + data +----------------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; (6 rows) -- Test logging DDL in function @@ -52,20 +52,20 @@ SELECT func_ddl ('tab_from_func'); (1 row) SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) + - RETURNS VOID AS $$ + - BEGIN + - execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + - execute format('ALTER TABLE %I ADD c3 int', tname); + - execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + - execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); + - END; + - $$ + + data +-------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) + + RETURNS VOID AS $$ + + BEGIN + + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + + execute format('ALTER TABLE %I ADD c3 int', tname); + + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); + + END; + + $$ + LANGUAGE plpgsql; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int BEGIN table public.tab_from_func: INSERT: id[integer]:1 name[character varying]:'a' c3[integer]:null table public.tab_from_func: INSERT: id[integer]:2 name[character varying]:'b' c3[integer]:22 diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a44e1f79e3..eb3dd76782 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -78,7 +78,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, @@ -123,7 +123,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, @@ -771,13 +771,13 @@ pg_decode_message(LogicalDecodingContext *ctx, static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + ReorderBufferTXN *txn, XLogRecPtr lsn, const char *prefix, const char *role, const char *search_path, Size sz, const char *message) { OutputPluginPrepareWrite(ctx, true); - appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", - transactional, prefix, role, search_path, sz); + appendStringInfo(ctx->out, "DDL message: prefix: %s role: %s, search_path: %s, sz: %zu content:", + prefix, role, search_path, sz); appendBinaryStringInfo(ctx->out, message, sz); OutputPluginWrite(ctx, true); } @@ -989,23 +989,14 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, */ static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + ReorderBufferTXN *txn, XLogRecPtr lsn, const char *prefix, const char * role, const char * search_path, Size sz, const char *message) { OutputPluginPrepareWrite(ctx, true); - if (transactional) - { - appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu", - transactional, prefix, role, search_path, sz); - } - else - { - appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", - transactional, prefix, role, search_path, sz); - appendBinaryStringInfo(ctx->out, message, sz); - } + appendStringInfo(ctx->out, "streaming DDL message: prefix: %s role: %s, search_path: %s, sz: %zu", + prefix, role, search_path, sz); OutputPluginWrite(ctx, true); } diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c index 7a352d540a..0aaebf8a08 100644 --- a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -32,8 +32,7 @@ logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) Assert(prefix[xlrec->prefix_size] != '\0'); - appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", - xlrec->transactional ? "transactional" : "non-transactional", + appendStringInfo(buf, "prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", prefix, role, search_path, xlrec->message_size); /* Write message payload as a series of hex bytes */ for (int cnt = 0; cnt < xlrec->message_size; cnt++) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 65386f2641..94e350b80d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -1516,13 +1516,11 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery) /* Log the Drop command for logical replication */ if (ddlxlog) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), pstate->p_sourcetext, - strlen(pstate->p_sourcetext), - transactional); + strlen(pstate->p_sourcetext)); } performMultipleDeletions(objects, drop->behavior, flags); @@ -3876,13 +3874,11 @@ RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery) if (ddlxlog && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), pstate->p_sourcetext, - strlen(pstate->p_sourcetext), - transactional); + strlen(pstate->p_sourcetext)); } /* Do the work */ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c index f93573079a..5f89afec49 100644 --- a/src/backend/replication/logical/ddlmessage.c +++ b/src/backend/replication/logical/ddlmessage.c @@ -47,7 +47,7 @@ */ XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, - size_t size, bool transactional) + size_t size) { xl_logical_ddl_message xlrec; const char *role; @@ -55,16 +55,12 @@ LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, role = GetUserNameFromId(roleoid, false); /* - * Force xid to be allocated if we're emitting a transactional message. + * Force xid to be allocated since we're emitting a transactional message. */ - if (transactional) - { - Assert(IsTransactionState()); - GetCurrentTransactionId(); - } + Assert(IsTransactionState()); + GetCurrentTransactionId(); xlrec.dbId = MyDatabaseId; - xlrec.transactional = transactional; /* trailing zero is critical; see logicalddlmsg_desc */ xlrec.prefix_size = strlen(prefix) + 1; xlrec.role_size = strlen(role) + 1; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 034c7f2413..ce5b595326 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -615,7 +615,6 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; xl_logical_ddl_message *message; if (info != XLOG_LOGICAL_DDL_MESSAGE) @@ -637,17 +636,7 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) FilterByOrigin(ctx, origin_id)) return; - if (message->transactional && - !SnapBuildProcessChange(builder, xid, buf->origptr)) - return; - else if (!message->transactional && - (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || - SnapBuildXactNeedsSkip(builder, buf->origptr))) - return; - - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); - ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr, - message->transactional, + ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr, message->message, /* first part of message is prefix */ message->message + message->prefix_size, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3004f02433..c02ea6fb99 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -74,8 +74,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, const char *search_path, + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, Size message_size, const char *message); /* streaming callbacks */ @@ -95,8 +95,8 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, const char *search_path, + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, Size message_size, const char *message); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -1233,10 +1233,9 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, - const char *search_path, Size message_size, - const char *message) + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, + Size message_size, const char *message) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -1262,7 +1261,7 @@ ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->write_location = message_lsn; /* do the actual work: call callback */ - ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix, role, search_path, message_size, message); /* Pop the error context stack */ @@ -1586,10 +1585,9 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, - const char* search_path, Size message_size, - const char *message) + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char* search_path, + Size message_size, const char *message) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -1619,7 +1617,7 @@ stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->write_location = message_lsn; /* do the actual work: call callback */ - ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix, role, search_path, message_size, message); /* Pop the error context stack */ diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3cfa94dd8c..2072207647 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -670,16 +670,12 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, const char **role, const char **search_path, - bool *transactional, Size *sz) { - uint8 flags; const char *msg; //TODO double check when do we need to get TransactionId. - flags = pq_getmsgint(in, 1); - *transactional = (flags & MESSAGE_TRANSACTIONAL) > 0; *lsn = pq_getmsgint64(in); *prefix = pq_getmsgstring(in); *role = pq_getmsgstring(in); @@ -695,22 +691,16 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, */ void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message) { - uint8 flags = 0; pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE); - /* encode and send message flags */ - if (transactional) - flags |= MESSAGE_TRANSACTIONAL; - /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) pq_sendint32(out, xid); - pq_sendint8(out, flags); pq_sendint64(out, lsn); pq_sendstring(out, prefix); pq_sendstring(out, role); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ca01336604..f5a3247348 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -881,61 +881,33 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } /* - * A transactional DDL message is queued to be processed upon commit and a - * non-transactional DDL message gets processed immediately. + * A transactional DDL message is queued to be processed upon commit */ void ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, - Snapshot snapshot, XLogRecPtr lsn, - bool transactional, const char *prefix, + XLogRecPtr lsn, const char *prefix, const char *role, const char *search_path, Size message_size, const char *message) { - if (transactional) - { - MemoryContext oldcontext; - ReorderBufferChange *change; - - Assert(xid != InvalidTransactionId); - - oldcontext = MemoryContextSwitchTo(rb->context); - - change = ReorderBufferGetChange(rb); - change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; - change->data.ddlmsg.prefix = pstrdup(prefix); - change->data.ddlmsg.role = pstrdup(role); - change->data.ddlmsg.search_path = pstrdup(search_path); - change->data.ddlmsg.message_size = message_size; - change->data.ddlmsg.message = palloc(message_size); - memcpy(change->data.ddlmsg.message, message, message_size); + MemoryContext oldcontext; + ReorderBufferChange *change; - ReorderBufferQueueChange(rb, xid, lsn, change, false); + Assert(xid != InvalidTransactionId); - MemoryContextSwitchTo(oldcontext); - } - else - { - ReorderBufferTXN *txn = NULL; - volatile Snapshot snapshot_now = snapshot; + oldcontext = MemoryContextSwitchTo(rb->context); - if (xid != InvalidTransactionId) - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; + change->data.ddlmsg.prefix = pstrdup(prefix); + change->data.ddlmsg.role = pstrdup(role); + change->data.ddlmsg.search_path = pstrdup(search_path); + change->data.ddlmsg.message_size = message_size; + change->data.ddlmsg.message = palloc(message_size); + memcpy(change->data.ddlmsg.message, message, message_size); - /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); - PG_TRY(); - { - rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message); + ReorderBufferQueueChange(rb, xid, lsn, change, false); - TeardownHistoricSnapshot(false); - } - PG_CATCH(); - { - TeardownHistoricSnapshot(true); - PG_RE_THROW(); - } - PG_END_TRY(); - } + MemoryContextSwitchTo(oldcontext); } /* @@ -2037,14 +2009,14 @@ ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming) { if (streaming) - rb->stream_ddlmessage(rb, txn, change->lsn, true, + rb->stream_ddlmessage(rb, txn, change->lsn, change->data.ddlmsg.prefix, change->data.ddlmsg.role, change->data.ddlmsg.search_path, change->data.ddlmsg.message_size, change->data.ddlmsg.message); else - rb->ddlmessage(rb, txn, change->lsn, true, + rb->ddlmessage(rb, txn, change->lsn, change->data.ddlmsg.prefix, change->data.ddlmsg.role, change->data.ddlmsg.search_path, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1108030179..ef79d10115 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2464,14 +2464,13 @@ static void apply_handle_ddlmessage(StringInfo s) { XLogRecPtr lsn; - bool transactional; Size sz; const char *prefix; const char *role; const char *search_path; const char *msg; - msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz); + msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &sz); apply_execute_sql_command(msg, role, search_path, true); } @@ -2486,98 +2485,41 @@ execute_sql_command_error_cb(void *arg) } /* - * Execute an SQL command. This can be multiple queries. - * This is modified based on pglogical_execute_sql_command(). + * Preprocess certain DDL commands before apply + * -Remove data population for table creation + * -Enable missing_ok for drop stmt + * -Disallow table rewrites using volatile functions */ static void -apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path, - bool isTopLevel) +preprocess_ddl(RawStmt *command, char **schemaname, char **relname, bool *is_partitioned_table) { - const char *save_debug_query_string = debug_query_string; - List *parsetree_list; - ListCell *parsetree_item; - MemoryContext oldcontext; - ErrorContextCallback errcallback; - int save_nestlevel; - - /* - * Switch to appropriate context for constructing parsetrees. - */ - oldcontext = MemoryContextSwitchTo(ApplyMessageContext); - begin_replication_step(); - - /* - * Set the current role to the user that executed the command on the - * publication server. - * Set the current search_path to the search_path on the publication - * server when the command was executed. - */ - save_nestlevel = NewGUCNestLevel(); - SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE); - SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE); - - errcallback.callback = execute_sql_command_error_cb; - errcallback.arg = (char *) cmdstr; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - debug_query_string = cmdstr; - - parsetree_list = pg_parse_query(cmdstr); - - /* - * Do a limited amount of safety checking against CONCURRENTLY commands - * executed in situations where they aren't allowed. The sender side should - * provide protection, but better be safe than sorry. - */ - isTopLevel = isTopLevel && (list_length(parsetree_list) == 1); - - /* - * Switch back to transaction context to enter the loop. - */ - MemoryContextSwitchTo(oldcontext); - - foreach(parsetree_item, parsetree_list) + switch(nodeTag(command->stmt)) { - List *plantree_list; - List *querytree_list; - RawStmt *command = (RawStmt *) lfirst(parsetree_item); - CommandTag commandTag; - MemoryContext per_parsetree_context = NULL; - Portal portal; - DestReceiver *receiver; - bool snapshot_set = false; - char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ - char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ - bool is_partitioned_table = false; - - commandTag = CreateCommandTag((Node *)command); - - /* The following DDL commands need special handling */ - - /* - * Remember the schemaname and relname if the cmd is going to create a table - * because we will need them for some post-processing after we - * execute the stmt. At that point, command->stmt may have been freeed up. - */ - if (commandTag == CMDTAG_CREATE_TABLE) + case T_CreateStmt: { + /* + * Remember the schemaname and relname if the cmd is going to create a table + * because we will need them for some post-processing after we + * execute the stmt. At that point, command->stmt may have been freeed up. + */ CreateStmt *cstmt = (CreateStmt *) command->stmt; RangeVar *rv = cstmt->relation; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; if (cstmt->inhRelations != NIL || cstmt->partspec != NULL) - is_partitioned_table = true; + *is_partitioned_table = true; + + break; } - else if (commandTag == CMDTAG_CREATE_TABLE_AS) + case T_CreateTableAsStmt: { CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt; if (castmt->objtype == OBJECT_TABLE) { RangeVar *rv = castmt->into->rel; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; /* * Force skipping data population to avoid data inconsistency. @@ -2585,17 +2527,18 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear */ castmt->into->skipData = true; } + break; } /* SELECT INTO */ - else if (commandTag == CMDTAG_SELECT) + case T_SelectStmt: { SelectStmt *sstmt = (SelectStmt *) command->stmt; if (sstmt->intoClause != NULL) { RangeVar *rv = sstmt->intoClause->rel; - schemaname = rv->schemaname; - relname = rv->relname; + *schemaname = rv->schemaname; + *relname = rv->relname; /* * Force skipping data population to avoid data inconsistency. @@ -2603,13 +2546,14 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear */ sstmt->intoClause->skipData = true; } + break; } /* * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported. * Until we support logical replication of table rewrite, see ATRewriteTables() * for details on table rewrite. */ - else if (commandTag == CMDTAG_ALTER_TABLE) + case T_AlterTableStmt: { AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt; ListCell *lc; @@ -2637,15 +2581,153 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear if (contain_volatile_functions(expr)) { elog(ERROR, - "Do not support replication of DDL statement that rewrites table using volatile functions: %s", - cmdstr); + "Do not support replication of DDL statement that rewrites table using volatile functions"); } } } } } + break; + } + case T_DropStmt: + { + DropStmt *dstmt = (DropStmt *) command->stmt; + dstmt->missing_ok = true; + break; + } + default: + break; + } +} + +/* +* Table created by DDL replication (database level) is automatically +* added to the subscription here. +* +* Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS +* command to set the relstate to SUBREL_STATE_INIT so DML changes on this +* new table can be replicated without having to manually run +* "alter subscription ... refresh publication" +*/ +static void +handle_create_table(char* relname, char* schemaname, bool is_partitioned_table) +{ + Oid relid; + Oid relnamespace = InvalidOid; + + if (schemaname != NULL) + relnamespace = get_namespace_oid(schemaname, false); + if (relnamespace != InvalidOid) + relid = get_relname_relid(relname, relnamespace); + else + { + /* + * Try to resolve unqualified relname. + * Notice we have set the search_path to the original search_path on the publisher + * at the beginning of this function. + */ + relid = RelnameGetRelid(relname); + } + + if (relid != InvalidOid) + { + bool subscribe_table = true; + + if (is_partitioned_table) + { + Relation rel = RelationIdGetRelation(relid); + char *table_name = RelationGetRelationName(rel); + char *schema_name = get_namespace_name(RelationGetNamespace(rel)); + /* + * Connect to the source DB and check whehter the partitioned table should be subscribed. + * Because it depends on the setting of publish_via_partition_root, which the subscription + * doesn't know. + */ + subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name); + RelationClose(rel); } + if (subscribe_table) + { + AddSubscriptionRelState(MySubscription->oid, relid, + SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s\" added to subscription \"%s\"", + relname, MySubscription->name))); + } + } +} + +/* + * Execute an SQL command. This can be multiple queries. + * This is modified based on pglogical_execute_sql_command(). + */ +static void +apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path, + bool isTopLevel) +{ + const char *save_debug_query_string = debug_query_string; + List *parsetree_list; + ListCell *parsetree_item; + MemoryContext oldcontext; + ErrorContextCallback errcallback; + int save_nestlevel; + + /* + * Switch to appropriate context for constructing parsetrees. + */ + oldcontext = MemoryContextSwitchTo(ApplyMessageContext); + begin_replication_step(); + + /* + * Set the current role to the user that executed the command on the + * publication server. + * Set the current search_path to the search_path on the publication + * server when the command was executed. + */ + save_nestlevel = NewGUCNestLevel(); + SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE); + SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE); + + errcallback.callback = execute_sql_command_error_cb; + errcallback.arg = (char *) cmdstr; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + debug_query_string = cmdstr; + + parsetree_list = pg_parse_query(cmdstr); + + /* + * Do a limited amount of safety checking against CONCURRENTLY commands + * executed in situations where they aren't allowed. The sender side should + * provide protection, but better be safe than sorry. + */ + isTopLevel = isTopLevel && (list_length(parsetree_list) == 1); + + /* + * Switch back to transaction context to enter the loop. + */ + MemoryContextSwitchTo(oldcontext); + + foreach(parsetree_item, parsetree_list) + { + List *plantree_list; + List *querytree_list; + RawStmt *command = (RawStmt *) lfirst(parsetree_item); + CommandTag commandTag; + MemoryContext per_parsetree_context = NULL; + Portal portal; + DestReceiver *receiver; + bool snapshot_set = false; + char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + bool is_partitioned_table = false; + + commandTag = CreateCommandTag((Node *)command); + preprocess_ddl(command, &schemaname, &relname, &is_partitioned_table); + /* * Set up a snapshot if parse analysis/planning will need one. */ @@ -2739,63 +2821,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear CommandCounterIncrement(); - /* - * Table created by DDL replication (database level) is automatically - * added to the subscription here. - * - * Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS - * command to set the relstate to SUBREL_STATE_INIT so DML changes on this - * new table can be replicated without having to manually run - * "alter subscription ... refresh publication" - */ if (relname != NULL) - { - Oid relid; - Oid relnamespace = InvalidOid; - - if (schemaname != NULL) - relnamespace = get_namespace_oid(schemaname, false); - if (relnamespace != InvalidOid) - relid = get_relname_relid(relname, relnamespace); - else - { - /* - * Try to resolve unqualified relname. - * Notice we have set the search_path to the original search_path on the publisher - * at the beginning of this function. - */ - relid = RelnameGetRelid(relname); - } - - if (relid != InvalidOid) - { - bool subscribe_table = true; - - if (is_partitioned_table) - { - Relation rel = RelationIdGetRelation(relid); - char *table_name = RelationGetRelationName(rel); - char *schema_name = get_namespace_name(RelationGetNamespace(rel)); - /* - * Connect to the source DB and check whehter the partitioned table should be subscribed. - * Because it depends on the setting of publish_via_partition_root, which the subscription - * doesn't know. - */ - subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name); - RelationClose(rel); - } - - if (subscribe_table) - { - AddSubscriptionRelState(MySubscription->oid, relid, - SUBREL_STATE_INIT, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s\" added to subscription \"%s\"", - relname, MySubscription->name))); - } - } - } + handle_create_table(relname, schemaname, is_partitioned_table); } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index db221c7e8d..a66367fe7a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,7 +56,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -1705,13 +1705,14 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, + XLogRecPtr message_lsn, const char *prefix, const char * role, const char *search_path, Size sz, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; ListCell *lc; + PGOutputTxnData *txndata; /* Reload publications if needed before use. */ if (!publications_valid) @@ -1745,20 +1746,16 @@ pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Output BEGIN if we haven't yet. Avoid for non-transactional * messages. */ - if (transactional) - { - PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + txndata = (PGOutputTxnData *) txn->output_plugin_private; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - } + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); OutputPluginPrepareWrite(ctx, true); logicalrep_write_ddlmessage(ctx->out, xid, message_lsn, - transactional, prefix, role, search_path, diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 1abd77a60e..7cf8fe0ba3 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1131,13 +1131,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } break; @@ -1173,13 +1171,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } default: break; @@ -1195,18 +1191,25 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ case T_AlterTableStmt: case T_IndexStmt: + break; + /* + * Rename of objects other than table is only allowed in database level + * replication. + * Rename of table is allowed in both table level and database level + * replication. + */ case T_RenameStmt: { RenameStmt *stmt = (RenameStmt *) parsetree; - if(!stmt->relation && ddl_need_xlog(InvalidOid, true)){ - bool transactional = true; + if(!stmt->relation && ddl_need_xlog(InvalidOid, true)) + { const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } + break; } case T_AlterOwnerStmt: /* TODO, it is data control case, save for later update */ break; @@ -1236,13 +1239,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } break; } @@ -1546,13 +1547,11 @@ ProcessUtilitySlow(ParseState *pstate, isCompleteQuery && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } /* ... and do it */ @@ -1783,13 +1782,11 @@ ProcessUtilitySlow(ParseState *pstate, isCompleteQuery && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } address = diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h index 1e8ef22296..0418d16dca 100644 --- a/src/include/replication/ddlmessage.h +++ b/src/include/replication/ddlmessage.h @@ -20,7 +20,6 @@ typedef struct xl_logical_ddl_message { Oid dbId; /* database Oid emitted from */ - bool transactional; /* is message transactional? */ Size prefix_size; /* length of prefix */ Size role_size; /* length of the role that executes the DDL command */ Size search_path_size; /* length of the search path */ @@ -36,7 +35,7 @@ typedef struct xl_logical_ddl_message #define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message, - size_t size, bool transactional); + size_t size); /* RMGR API*/ #define XLOG_LOGICAL_DDL_MESSAGE 0x00 diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 28ff562d62..cc0ba60905 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -233,11 +233,11 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, const char **role, const char **search_path, - bool *transactional, Size *sz); + Size *sz); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 5b1c245b72..444b75bef0 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -94,7 +94,6 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -219,7 +218,6 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dd89e08efc..cd8c69ceb8 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -445,7 +445,6 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -523,7 +522,6 @@ typedef void (*ReorderBufferStreamDDLMessageCB) ( ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -671,8 +669,8 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, +extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + const char *prefix, const char *role, const char *search_path, Size message_size, const char *message); extern void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, -- 2.32.0