From b3dfe88f330f0cefbb4e0612d93e7182d008f474 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Tue, 24 May 2022 15:37:34 +0000 Subject: [PATCH 11/12] Remove non-transactional ddl message decoding because the use case is unclear and it has unresolved issues under concurrency. --- .../test_decoding/expected/ddlmessages.out | 42 ++++++------ contrib/test_decoding/test_decoding.c | 25 +++----- .../access/rmgrdesc/logicalddlmsgdesc.c | 3 +- src/backend/commands/tablecmds.c | 8 +-- src/backend/replication/logical/ddlmessage.c | 12 ++-- src/backend/replication/logical/decode.c | 13 +--- src/backend/replication/logical/logical.c | 26 ++++---- src/backend/replication/logical/proto.c | 12 +--- .../replication/logical/reorderbuffer.c | 64 ++++++------------- src/backend/replication/logical/worker.c | 3 +- src/backend/replication/pgoutput/pgoutput.c | 17 ++--- src/backend/tcop/utility.c | 24 ++----- src/include/replication/ddlmessage.h | 3 +- src/include/replication/logicalproto.h | 4 +- src/include/replication/output_plugin.h | 2 - src/include/replication/reorderbuffer.h | 6 +- 16 files changed, 87 insertions(+), 177 deletions(-) diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out index 823029d03d..56cbafa4cf 100644 --- a/contrib/test_decoding/expected/ddlmessages.out +++ b/contrib/test_decoding/expected/ddlmessages.out @@ -24,14 +24,14 @@ CREATE TABLE test_ddlmessage (id serial unique, data int); ALTER TABLE test_ddlmessage add c3 varchar; COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; + data +----------------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; (6 rows) -- Test logging DDL in function @@ -52,20 +52,20 @@ SELECT func_ddl ('tab_from_func'); (1 row) SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) + - RETURNS VOID AS $$ + - BEGIN + - execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + - execute format('ALTER TABLE %I ADD c3 int', tname); + - execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + - execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); + - END; + - $$ + + data +-------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) + + RETURNS VOID AS $$ + + BEGIN + + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + + execute format('ALTER TABLE %I ADD c3 int', tname); + + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); + + END; + + $$ + LANGUAGE plpgsql; - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar); - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar); + DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int BEGIN table public.tab_from_func: INSERT: id[integer]:1 name[character varying]:'a' c3[integer]:null table public.tab_from_func: INSERT: id[integer]:2 name[character varying]:'b' c3[integer]:22 diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a44e1f79e3..eb3dd76782 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -78,7 +78,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, @@ -123,7 +123,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, @@ -771,13 +771,13 @@ pg_decode_message(LogicalDecodingContext *ctx, static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + ReorderBufferTXN *txn, XLogRecPtr lsn, const char *prefix, const char *role, const char *search_path, Size sz, const char *message) { OutputPluginPrepareWrite(ctx, true); - appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", - transactional, prefix, role, search_path, sz); + appendStringInfo(ctx->out, "DDL message: prefix: %s role: %s, search_path: %s, sz: %zu content:", + prefix, role, search_path, sz); appendBinaryStringInfo(ctx->out, message, sz); OutputPluginWrite(ctx, true); } @@ -989,23 +989,14 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, */ static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + ReorderBufferTXN *txn, XLogRecPtr lsn, const char *prefix, const char * role, const char * search_path, Size sz, const char *message) { OutputPluginPrepareWrite(ctx, true); - if (transactional) - { - appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu", - transactional, prefix, role, search_path, sz); - } - else - { - appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", - transactional, prefix, role, search_path, sz); - appendBinaryStringInfo(ctx->out, message, sz); - } + appendStringInfo(ctx->out, "streaming DDL message: prefix: %s role: %s, search_path: %s, sz: %zu", + prefix, role, search_path, sz); OutputPluginWrite(ctx, true); } diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c index 7a352d540a..0aaebf8a08 100644 --- a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -32,8 +32,7 @@ logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) Assert(prefix[xlrec->prefix_size] != '\0'); - appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", - xlrec->transactional ? "transactional" : "non-transactional", + appendStringInfo(buf, "prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", prefix, role, search_path, xlrec->message_size); /* Write message payload as a series of hex bytes */ for (int cnt = 0; cnt < xlrec->message_size; cnt++) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 65386f2641..94e350b80d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -1516,13 +1516,11 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery) /* Log the Drop command for logical replication */ if (ddlxlog) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), pstate->p_sourcetext, - strlen(pstate->p_sourcetext), - transactional); + strlen(pstate->p_sourcetext)); } performMultipleDeletions(objects, drop->behavior, flags); @@ -3876,13 +3874,11 @@ RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery) if (ddlxlog && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), pstate->p_sourcetext, - strlen(pstate->p_sourcetext), - transactional); + strlen(pstate->p_sourcetext)); } /* Do the work */ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c index f93573079a..5f89afec49 100644 --- a/src/backend/replication/logical/ddlmessage.c +++ b/src/backend/replication/logical/ddlmessage.c @@ -47,7 +47,7 @@ */ XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, - size_t size, bool transactional) + size_t size) { xl_logical_ddl_message xlrec; const char *role; @@ -55,16 +55,12 @@ LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, role = GetUserNameFromId(roleoid, false); /* - * Force xid to be allocated if we're emitting a transactional message. + * Force xid to be allocated since we're emitting a transactional message. */ - if (transactional) - { - Assert(IsTransactionState()); - GetCurrentTransactionId(); - } + Assert(IsTransactionState()); + GetCurrentTransactionId(); xlrec.dbId = MyDatabaseId; - xlrec.transactional = transactional; /* trailing zero is critical; see logicalddlmsg_desc */ xlrec.prefix_size = strlen(prefix) + 1; xlrec.role_size = strlen(role) + 1; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 034c7f2413..ce5b595326 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -615,7 +615,6 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; xl_logical_ddl_message *message; if (info != XLOG_LOGICAL_DDL_MESSAGE) @@ -637,17 +636,7 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) FilterByOrigin(ctx, origin_id)) return; - if (message->transactional && - !SnapBuildProcessChange(builder, xid, buf->origptr)) - return; - else if (!message->transactional && - (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || - SnapBuildXactNeedsSkip(builder, buf->origptr))) - return; - - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); - ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr, - message->transactional, + ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr, message->message, /* first part of message is prefix */ message->message + message->prefix_size, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3004f02433..c02ea6fb99 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -74,8 +74,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, const char *search_path, + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, Size message_size, const char *message); /* streaming callbacks */ @@ -95,8 +95,8 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, const char *search_path, + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, Size message_size, const char *message); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -1233,10 +1233,9 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, - const char *search_path, Size message_size, - const char *message) + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char *search_path, + Size message_size, const char *message) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -1262,7 +1261,7 @@ ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->write_location = message_lsn; /* do the actual work: call callback */ - ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix, role, search_path, message_size, message); /* Pop the error context stack */ @@ -1586,10 +1585,9 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, - const char *prefix, const char *role, - const char* search_path, Size message_size, - const char *message) + XLogRecPtr message_lsn, const char *prefix, + const char *role, const char* search_path, + Size message_size, const char *message) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -1619,7 +1617,7 @@ stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->write_location = message_lsn; /* do the actual work: call callback */ - ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix, role, search_path, message_size, message); /* Pop the error context stack */ diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3cfa94dd8c..2072207647 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -670,16 +670,12 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, const char **role, const char **search_path, - bool *transactional, Size *sz) { - uint8 flags; const char *msg; //TODO double check when do we need to get TransactionId. - flags = pq_getmsgint(in, 1); - *transactional = (flags & MESSAGE_TRANSACTIONAL) > 0; *lsn = pq_getmsgint64(in); *prefix = pq_getmsgstring(in); *role = pq_getmsgstring(in); @@ -695,22 +691,16 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, */ void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message) { - uint8 flags = 0; pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE); - /* encode and send message flags */ - if (transactional) - flags |= MESSAGE_TRANSACTIONAL; - /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) pq_sendint32(out, xid); - pq_sendint8(out, flags); pq_sendint64(out, lsn); pq_sendstring(out, prefix); pq_sendstring(out, role); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ca01336604..f5a3247348 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -881,61 +881,33 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } /* - * A transactional DDL message is queued to be processed upon commit and a - * non-transactional DDL message gets processed immediately. + * A transactional DDL message is queued to be processed upon commit */ void ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, - Snapshot snapshot, XLogRecPtr lsn, - bool transactional, const char *prefix, + XLogRecPtr lsn, const char *prefix, const char *role, const char *search_path, Size message_size, const char *message) { - if (transactional) - { - MemoryContext oldcontext; - ReorderBufferChange *change; - - Assert(xid != InvalidTransactionId); - - oldcontext = MemoryContextSwitchTo(rb->context); - - change = ReorderBufferGetChange(rb); - change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; - change->data.ddlmsg.prefix = pstrdup(prefix); - change->data.ddlmsg.role = pstrdup(role); - change->data.ddlmsg.search_path = pstrdup(search_path); - change->data.ddlmsg.message_size = message_size; - change->data.ddlmsg.message = palloc(message_size); - memcpy(change->data.ddlmsg.message, message, message_size); + MemoryContext oldcontext; + ReorderBufferChange *change; - ReorderBufferQueueChange(rb, xid, lsn, change, false); + Assert(xid != InvalidTransactionId); - MemoryContextSwitchTo(oldcontext); - } - else - { - ReorderBufferTXN *txn = NULL; - volatile Snapshot snapshot_now = snapshot; + oldcontext = MemoryContextSwitchTo(rb->context); - if (xid != InvalidTransactionId) - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; + change->data.ddlmsg.prefix = pstrdup(prefix); + change->data.ddlmsg.role = pstrdup(role); + change->data.ddlmsg.search_path = pstrdup(search_path); + change->data.ddlmsg.message_size = message_size; + change->data.ddlmsg.message = palloc(message_size); + memcpy(change->data.ddlmsg.message, message, message_size); - /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); - PG_TRY(); - { - rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message); + ReorderBufferQueueChange(rb, xid, lsn, change, false); - TeardownHistoricSnapshot(false); - } - PG_CATCH(); - { - TeardownHistoricSnapshot(true); - PG_RE_THROW(); - } - PG_END_TRY(); - } + MemoryContextSwitchTo(oldcontext); } /* @@ -2037,14 +2009,14 @@ ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming) { if (streaming) - rb->stream_ddlmessage(rb, txn, change->lsn, true, + rb->stream_ddlmessage(rb, txn, change->lsn, change->data.ddlmsg.prefix, change->data.ddlmsg.role, change->data.ddlmsg.search_path, change->data.ddlmsg.message_size, change->data.ddlmsg.message); else - rb->ddlmessage(rb, txn, change->lsn, true, + rb->ddlmessage(rb, txn, change->lsn, change->data.ddlmsg.prefix, change->data.ddlmsg.role, change->data.ddlmsg.search_path, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 610147ca77..c61b3564a7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2461,14 +2461,13 @@ static void apply_handle_ddlmessage(StringInfo s) { XLogRecPtr lsn; - bool transactional; Size sz; const char *prefix; const char *role; const char *search_path; const char *msg; - msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz); + msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &sz); apply_execute_sql_command(msg, role, search_path, true); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index db221c7e8d..a66367fe7a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,7 +56,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -1705,13 +1705,14 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, + XLogRecPtr message_lsn, const char *prefix, const char * role, const char *search_path, Size sz, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; ListCell *lc; + PGOutputTxnData *txndata; /* Reload publications if needed before use. */ if (!publications_valid) @@ -1745,20 +1746,16 @@ pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Output BEGIN if we haven't yet. Avoid for non-transactional * messages. */ - if (transactional) - { - PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + txndata = (PGOutputTxnData *) txn->output_plugin_private; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - } + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); OutputPluginPrepareWrite(ctx, true); logicalrep_write_ddlmessage(ctx->out, xid, message_lsn, - transactional, prefix, role, search_path, diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 1abd77a60e..08278c945c 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1131,13 +1131,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } break; @@ -1173,13 +1171,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } default: break; @@ -1199,13 +1195,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) { RenameStmt *stmt = (RenameStmt *) parsetree; if(!stmt->relation && ddl_need_xlog(InvalidOid, true)){ - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } } case T_AlterOwnerStmt: /* TODO, it is data control case, save for later update */ @@ -1236,13 +1230,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) */ if (ddl_need_xlog(InvalidOid, true)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } break; } @@ -1546,13 +1538,11 @@ ProcessUtilitySlow(ParseState *pstate, isCompleteQuery && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } /* ... and do it */ @@ -1783,13 +1773,11 @@ ProcessUtilitySlow(ParseState *pstate, isCompleteQuery && ddl_need_xlog(relid, false)) { - bool transactional = true; const char* prefix = ""; LogLogicalDDLMessage(prefix, GetUserId(), queryString, - strlen(queryString), - transactional); + strlen(queryString)); } address = diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h index 1e8ef22296..0418d16dca 100644 --- a/src/include/replication/ddlmessage.h +++ b/src/include/replication/ddlmessage.h @@ -20,7 +20,6 @@ typedef struct xl_logical_ddl_message { Oid dbId; /* database Oid emitted from */ - bool transactional; /* is message transactional? */ Size prefix_size; /* length of prefix */ Size role_size; /* length of the role that executes the DDL command */ Size search_path_size; /* length of the search path */ @@ -36,7 +35,7 @@ typedef struct xl_logical_ddl_message #define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message, - size_t size, bool transactional); + size_t size); /* RMGR API*/ #define XLOG_LOGICAL_DDL_MESSAGE 0x00 diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 28ff562d62..cc0ba60905 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -233,11 +233,11 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, + const char *prefix, const char *role, const char *search_path, Size sz, const char *message); extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, const char **role, const char **search_path, - bool *transactional, Size *sz); + Size *sz); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 5b1c245b72..444b75bef0 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -94,7 +94,6 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -219,7 +218,6 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dd89e08efc..cd8c69ceb8 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -445,7 +445,6 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -523,7 +522,6 @@ typedef void (*ReorderBufferStreamDDLMessageCB) ( ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, - bool transactional, const char *prefix, const char *role, const char *search_path, @@ -671,8 +669,8 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, - bool transactional, const char *prefix, const char *role, +extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + const char *prefix, const char *role, const char *search_path, Size message_size, const char *message); extern void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, -- 2.32.0