From f204faddba7d7f634f21dbc7e0caf7a1dd2ddde8 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Fri, 18 Mar 2022 16:57:23 +0000 Subject: [PATCH v3 2/3] Support logical logging and decoding of DDL command string. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A new WAL record type xl_logical_ddl_message is introduced to support logical logging of DDL commands. xl_logical_ddl_message is similar to the existing xl_logical_message for generic message logging. The reason for not using xl_logical_message directly, as proposed initially, is that we need to log more information (such as user role, search path and potentially more in the future) than just one string, and we don’t want to make too many changes to the existing xl_logical_message, which could break its current consumers. The logging of the DDL command string is done in the function LogLogicalDDLCommand. We categorize DDL command types into three categories in this function: 1. replicated in database level replication only (such as CREATE TABLE, CREATE FUNCTION). 2. replicated in database or table level replication depending on the configuration (such as ALTER TABLE). 3. not supported for replication or pending investigation. Support logical decoding of the new WAL record xl_logical_ddl_message. This is similar to the logical decoding of xl_logical_message. Tests for this change are added in the test_decoding plugin. 
--- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/ddlmessages.out | 50 +++++ contrib/test_decoding/sql/ddlmessages.sql | 31 +++ contrib/test_decoding/test_decoding.c | 61 +++++- src/backend/access/rmgrdesc/Makefile | 1 + .../access/rmgrdesc/logicalddlmsgdesc.c | 54 +++++ src/backend/access/transam/rmgr.c | 1 + src/backend/catalog/pg_publication.c | 52 +++++ src/backend/commands/tablecmds.c | 43 +++- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/ddlmessage.c | 99 +++++++++ src/backend/replication/logical/decode.c | 56 +++++ src/backend/replication/logical/logical.c | 91 ++++++++ .../replication/logical/reorderbuffer.c | 204 +++++++++++++++++- src/backend/tcop/utility.c | 201 ++++++++++++++++- src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/rmgrlist.h | 1 + src/include/commands/tablecmds.h | 3 +- src/include/replication/ddlmessage.h | 47 ++++ src/include/replication/decode.h | 1 + src/include/replication/output_plugin.h | 29 +++ src/include/replication/reorderbuffer.h | 39 ++++ 22 files changed, 1055 insertions(+), 13 deletions(-) create mode 100644 contrib/test_decoding/expected/ddlmessages.out create mode 100644 contrib/test_decoding/sql/ddlmessages.sql create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c create mode 100644 src/backend/replication/logical/ddlmessage.c create mode 100644 src/include/replication/ddlmessage.h diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 36929dd97d..d5091c568d 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -4,7 +4,7 @@ MODULES = test_decoding PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ - decoding_into_rel binary prepared replorigin time messages \ + decoding_into_rel binary prepared replorigin time messages ddlmessages\ spill slot truncate stream stats twophase twophase_stream \ sequence ISOLATION 
= mxact delayed_startup ondisk_startup concurrent_ddl_dml \ diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out new file mode 100644 index 0000000000..40fd07f079 --- /dev/null +++ b/contrib/test_decoding/expected/ddlmessages.out @@ -0,0 +1,50 @@ +-- predictability +SET synchronous_commit = on; +-- turn on logical ddl message logging +CREATE publication mypub FOR ALL TABLES with (ddl = 'database'); +-- SET USER +CREATE ROLE ddl_replication_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'ddl_replication_user'; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +ALTER TABLE tab1 drop c3; +DROP TABLE tab1; +BEGIN; +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +ROLLBACK; +BEGIN; +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + sequence public.tab1_id_seq: transactional:1 last_value: 1 log_cnt: 0 is_called:0 + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 47 content:CREATE TABLE tab1 (id serial unique, data int); + COMMIT + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 32 content:ALTER TABLE tab1 add c3 varchar; + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 25 content:ALTER TABLE tab1 drop c3; + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 16 content:DROP 
TABLE tab1; + BEGIN + sequence public.tab1_id_seq: transactional:1 last_value: 1 log_cnt: 0 is_called:0 + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 47 content:CREATE TABLE tab1 (id serial unique, data int); + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 32 content:ALTER TABLE tab1 add c3 varchar; + COMMIT +(12 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +DROP TABLE tab1; +DROP publication mypub; diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql new file mode 100644 index 0000000000..132361fbd5 --- /dev/null +++ b/contrib/test_decoding/sql/ddlmessages.sql @@ -0,0 +1,31 @@ +-- predictability +SET synchronous_commit = on; +-- turn on logical ddl message logging +CREATE publication mypub FOR ALL TABLES with (ddl = 'database'); + +-- SET USER +CREATE ROLE ddl_replication_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'ddl_replication_user'; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +ALTER TABLE tab1 drop c3; +DROP TABLE tab1; + +BEGIN; +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +ROLLBACK; + +BEGIN; +CREATE TABLE tab1 (id serial unique, data int); +ALTER TABLE tab1 add c3 varchar; +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('regression_slot'); +DROP TABLE tab1; +DROP publication mypub; + diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ea22649e41..5e02b274c5 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -77,6 +77,11 @@ static void 
pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size sz, const char *message); static void pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, Relation rel, bool transactional, @@ -121,6 +126,11 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size sz, const char *message); static void pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, Relation rel, bool transactional, @@ -150,6 +160,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->ddlmessage_cb = pg_decode_ddlmessage; cb->sequence_cb = pg_decode_sequence; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; @@ -163,6 +174,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; + cb->stream_ddlmessage_cb = pg_decode_stream_ddlmessage; cb->stream_sequence_cb = pg_decode_stream_sequence; cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -758,7 +770,8 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void 
pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, - const char *prefix, Size sz, const char *message) + const char *prefix, Size sz, + const char *message) { OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", @@ -799,6 +812,19 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pg_decode_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", + transactional, prefix, role, search_path, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) @@ -979,7 +1005,8 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, - const char *prefix, Size sz, const char *message) + const char *prefix, Size sz, + const char *message) { OutputPluginPrepareWrite(ctx, true); @@ -991,7 +1018,35 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, else { appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", - transactional, prefix, sz); + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + } + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the contents for transactional messages + * as the transaction can abort at a later point in time. We don't want users to + * see the message contents until the transaction is committed. 
+ */ +static void +pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + + if (transactional) + { + appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu", + transactional, prefix, role, search_path, sz); + } + else + { + appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", + transactional, prefix, role, search_path, sz); appendBinaryStringInfo(ctx->out, message, sz); } diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..b8e29e8df3 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -19,6 +19,7 @@ OBJS = \ hashdesc.o \ heapdesc.o \ logicalmsgdesc.o \ + logicalddlmsgdesc.o \ mxactdesc.o \ nbtdesc.o \ relmapdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c new file mode 100644 index 0000000000..7a352d540a --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * logicalddlmsgdesc.c + * rmgr descriptor routines for replication/logical/ddlmessage.c + * + * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalddlmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/ddlmessage.h" + +void +logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_DDL_MESSAGE) + { + 
xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec; + char *prefix = xlrec->message; + char *role = xlrec->message + xlrec->prefix_size; + char *search_path = xlrec->message + xlrec->prefix_size + xlrec->role_size; + char *message = xlrec->message + xlrec->prefix_size + xlrec->role_size + xlrec->search_path_size; + char *sep = ""; + + Assert(prefix[xlrec->prefix_size - 1] == '\0'); /* prefix_size includes the terminating NUL, which %s below relies on */ + + appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", + xlrec->transactional ? "transactional" : "non-transactional", + prefix, role, search_path, xlrec->message_size); + /* Write message payload as a series of hex bytes */ + for (int cnt = 0; cnt < xlrec->message_size; cnt++) + { + appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]); + sep = " "; + } + } +} + +const char * +logicalddlmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE) + return "DDL MESSAGE"; + + return NULL; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aeb..b761662d28 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -26,6 +26,7 @@ #include "commands/tablespace.h" #include "replication/decode.h" #include "replication/message.h" +#include "replication/ddlmessage.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 00b5673b8f..f2d11c3847 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1007,3 +1007,55 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Checks if DDL on relation (relid) need xlog for logical replication + */ +bool +ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel) +{ + List *allTablePubs = NIL; + List *tablePubs = NIL; + ListCell *lc; + + /* Only replicate toplevel DDL command */ + 
if (!isTopLevel) + return false; + if (relid == InvalidOid && !forAllTabPubOnly) + return false; + + /* + * Log the DDL command if + * there is any FOR ALL TABLES publication with pubddl_database on + * or + * this TABLE belongs to any non FOR ALL publications with pubddl_table on + */ + allTablePubs = GetAllTablesPublications(); + foreach(lc, allTablePubs) + { + Oid pubid = lfirst_oid(lc); + Publication *pub = GetPublication(pubid); + + if (pub->pubactions.pubddl_database) + return true; + } + + /* + * If forAllTabPubOnly is true (i.e. database level replication is required for the DDL + * to be logged), we can bail now since no publication has been found with pubddl_database on + */ + if (forAllTabPubOnly) + return false; + + tablePubs = GetRelationPublications(relid); + foreach(lc, tablePubs) + { + Oid pubid = lfirst_oid(lc); + Publication *pub = GetPublication(pubid); + + if (pub->pubactions.pubddl_table) + return true; + } + + return false; +} diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 80faae985e..f8be71cbeb 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -80,6 +80,7 @@ #include "partitioning/partbounds.h" #include "partitioning/partdesc.h" #include "pgstat.h" +#include "replication/ddlmessage.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" @@ -1332,13 +1333,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind) * DROP MATERIALIZED VIEW, DROP FOREIGN TABLE */ void -RemoveRelations(DropStmt *drop) +RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel) { ObjectAddresses *objects; char relkind; ListCell *cell; int flags = 0; LOCKMODE lockmode = AccessExclusiveLock; + bool ddlxlog = XLogLogicalInfoActive(); /* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */ if (drop->concurrent) @@ -1437,10 +1439,37 @@ RemoveRelations(DropStmt *drop) /* Not there? 
*/ if (!OidIsValid(relOid)) { + ddlxlog = false; DropErrorMsgNonExistent(rel, relkind, drop->missing_ok); continue; } + /* + * Only log DROP RELATION cmd for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on or + * every relation to be dropped belongs to any non FOR ALL publications with pubddl_table on + */ + if (ddlxlog) + { + Oid tableOid = InvalidOid; + + if (relkind == RELKIND_RELATION) + tableOid = relOid; + else if (relkind == RELKIND_INDEX) + tableOid = IndexGetRelation(relOid, true); + /* + * Other relation types require database level ddl replication and are + * already logged in LogLogicalDDLCommand() if needed. + */ + else + ddlxlog = false; + + /* DROP RELATION or INDEX are allowed in table level DDL replication */ + if (tableOid != InvalidOid && + !ddl_need_xlog(tableOid, false, isTopLevel)) + ddlxlog = false; + } + /* * Decide if concurrent mode needs to be used here or not. The * callback retrieved the rel's persistence for us. @@ -1484,6 +1513,18 @@ RemoveRelations(DropStmt *drop) add_exact_object_address(&obj, objects); } + /* Log the Drop command for logical replication */ + if (ddlxlog) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + pstate->p_sourcetext, + strlen(pstate->p_sourcetext), + transactional); + } + performMultipleDeletions(objects, drop->behavior, flags); free_object_addresses(objects); diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb71..f3eeb67312 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ decode.o \ + ddlmessage.o\ launcher.o \ logical.o \ logicalfuncs.o \ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c new file mode 100644 index 0000000000..f93573079a --- /dev/null +++ 
b/src/backend/replication/logical/ddlmessage.c @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * ddlmessage.c + * Logical DDL messages. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/ddlmessage.c + * + * NOTES + * + * Logical DDL messages allow XLOG logging of DDL command strings that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are the same as a NOOP. + * + * Similar to the generic logical messages, these DDL messages can be either + * transactional or non-transactional. Note that by default DDLs in PostgreSQL + * are transactional. + * Transactional messages are part of the current transaction and will be sent + * to the decoding plugin in the same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. This also means that transactional + * messages won't be delivered if the transaction was rolled back but the + * non-transactional ones will always be delivered. + * + * Every message carries a prefix to avoid conflicts between different decoding + * plugins. Plugin authors must take extra care to use a unique prefix; + * a good option is, for example, the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/ddlmessage.h" +#include "utils/memutils.h" + +/* + * Write logical decoding DDL message into XLog. 
+ */ +XLogRecPtr +LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, + size_t size, bool transactional) +{ + xl_logical_ddl_message xlrec; + const char *role; + + role = GetUserNameFromId(roleoid, false); + + /* + * Force xid to be allocated if we're emitting a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.dbId = MyDatabaseId; + xlrec.transactional = transactional; + /* trailing zero is critical; see logicalddlmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.role_size = strlen(role) + 1; + xlrec.search_path_size = strlen(namespace_search_path) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, role), xlrec.role_size); + XLogRegisterData(namespace_search_path, xlrec.search_path_size); + XLogRegisterData(unconstify(char *, message), size); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding ddl messages. + */ +void +logicalddlmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. 
*/ +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 77bc7aea7a..8c1188f128 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -36,6 +36,7 @@ #include "access/xlogutils.h" #include "catalog/pg_control.h" #include "replication/decode.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/origin.h" @@ -605,6 +606,61 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } +/* + * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +void +logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); + Snapshot snapshot; + xl_logical_ddl_message *message; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have a snapshot or we are just fast-forwarding, there is no + * point in decoding DDL messages. 
+ */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + message = (xl_logical_ddl_message *) XLogRecGetData(r); + + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!message->transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr, + message->transactional, + /* first part of message is the prefix */ + message->message, + /* second part of message is the role */ + message->message + message->prefix_size, + /* third part of message is the search_path */ + message->message + message->prefix_size + message->role_size, + message->message_size, + message->message + message->prefix_size + + message->role_size + message->search_path_size); +} + /* * Consolidated commit record handling between the different form of commit * records. 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 934aa13f2d..4e84705007 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size message_size, const char *message); static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, Relation rel, bool transactional, @@ -94,6 +98,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size message_size, const char *message); static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, Relation rel, bool transactional, @@ -226,6 +234,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->ddlmessage = ddlmessage_cb_wrapper; ctx->reorder->sequence = sequence_cb_wrapper; /* @@ -243,6 +252,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || 
(ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_ddlmessage_cb != NULL) || (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); @@ -261,6 +271,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper; ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1250,6 +1261,44 @@ sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, + const char *search_path, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? 
txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + role, search_path, message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) @@ -1596,6 +1645,48 @@ stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, + const char* search_path, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? 
txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + role, search_path, message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c2d9be81fa..599dcef6bf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -555,6 +555,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + if (change->data.ddlmsg.prefix != NULL) + pfree(change->data.ddlmsg.prefix); + change->data.ddlmsg.prefix = NULL; + if (change->data.ddlmsg.role != NULL) + pfree(change->data.ddlmsg.role); + change->data.ddlmsg.role = NULL; + if (change->data.ddlmsg.search_path != NULL) + pfree(change->data.ddlmsg.search_path); + change->data.ddlmsg.search_path = NULL; + if (change->data.ddlmsg.message != NULL) + pfree(change->data.ddlmsg.message); + change->data.ddlmsg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INVALIDATION: if (change->data.inval.invalidations) pfree(change->data.inval.invalidations); @@ -1140,6 +1154,64 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, } } +/* + * A transactional DDL message is queued to be processed upon commit and a + * non-transactional DDL message gets processed immediately. 
+ */ +void +ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size message_size, const char *message) +{ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; + change->data.ddlmsg.prefix = pstrdup(prefix); + change->data.ddlmsg.role = pstrdup(role); + change->data.ddlmsg.search_path = pstrdup(search_path); + change->data.ddlmsg.message_size = message_size; + change->data.ddlmsg.message = palloc(message_size); + memcpy(change->data.ddlmsg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + PG_TRY(); + { + rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message); + + TeardownHistoricSnapshot(false); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -2234,6 +2306,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying the DDL message. 
+ */ +static inline void +ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_ddlmessage(rb, txn, change->lsn, true, + change->data.ddlmsg.prefix, + change->data.ddlmsg.role, + change->data.ddlmsg.search_path, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); + else + rb->ddlmessage(rb, txn, change->lsn, true, + change->data.ddlmsg.prefix, + change->data.ddlmsg.role, + change->data.ddlmsg.search_path, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); +} + /* * Helper function for ReorderBufferProcessTXN for applying sequences. */ @@ -2635,6 +2730,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferApplyMessage(rb, txn, change, streaming); break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + ReorderBufferApplyDDLMessage(rb, txn, change, streaming); + break; + case REORDER_BUFFER_CHANGE_INVALIDATION: /* Execute the invalidation messages locally */ ReorderBufferExecuteInvalidations( @@ -4034,6 +4133,53 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + Size role_size = strlen(change->data.ddlmsg.role) + 1; + Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1; + + sz += prefix_size + role_size + search_path_size + + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.prefix, + 
prefix_size); + data += prefix_size; + + /* write the role including the size */ + memcpy(data, &role_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.role, + role_size); + data += role_size; + + /* write the search_path including the size */ + memcpy(data, &search_path_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.search_path, + search_path_size); + data += search_path_size; + + /* write the message including the size */ + memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.message, + change->data.ddlmsg.message_size); + data += change->data.ddlmsg.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4381,6 +4527,18 @@ ReorderBufferChangeSize(ReorderBufferChange *change) sz += prefix_size + change->data.msg.message_size + sizeof(Size) + sizeof(Size); + break; + } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + Size role_size = strlen(change->data.ddlmsg.role) + 1; + Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1; + + sz += prefix_size + role_size + search_path_size + + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size); + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4657,8 +4815,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* read prefix */ memcpy(&prefix_size, data, sizeof(Size)); data += sizeof(Size); - change->data.msg.prefix = MemoryContextAlloc(rb->context, - prefix_size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size); memcpy(change->data.msg.prefix, data, prefix_size); Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); data += prefix_size; @@ -4672,6 +4829,49 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + 
case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size; + Size role_size; + Size search_path_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size); + memcpy(change->data.ddlmsg.prefix, data, prefix_size); + Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0'); + data += prefix_size; + + /* read role */ + memcpy(&role_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.role = MemoryContextAlloc(rb->context, + role_size); + memcpy(change->data.ddlmsg.role, data, role_size); + Assert(change->data.ddlmsg.role[role_size - 1] == '\0'); + data += role_size; + + /* read search_path */ + memcpy(&search_path_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.search_path = MemoryContextAlloc(rb->context, + search_path_size); + memcpy(change->data.ddlmsg.search_path, data, search_path_size); + Assert(change->data.ddlmsg.search_path[search_path_size - 1] == '\0'); + data += search_path_size; + + /* read the message; must use ddlmsg, data.msg is a different union member */ + memcpy(&change->data.ddlmsg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.message = MemoryContextAlloc(rb->context, + change->data.ddlmsg.message_size); + memcpy(change->data.ddlmsg.message, data, + change->data.ddlmsg.message_size); + data += change->data.ddlmsg.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 3780c6e812..5881371aac 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -62,6 +62,7 @@ #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" +#include "replication/ddlmessage.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteRemove.h" #include "storage/fd.h" @@ -86,7 +87,7 @@ static void ProcessUtilitySlow(ParseState *pstate, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc); -static
void ExecDropStmt(DropStmt *stmt, bool isTopLevel); +static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel); /* * CommandIsReadOnly: is an executable query read-only? @@ -986,7 +987,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, context, params, queryEnv, dest, qc); else - ExecDropStmt(stmt, isTopLevel); + ExecDropStmt(pstate, stmt, isTopLevel); } break; @@ -1086,6 +1087,155 @@ standard_ProcessUtility(PlannedStmt *pstmt, CommandCounterIncrement(); } +/* + * Log a DDL command for logical replication + * Some DDLs are only replicated in Database Level DDL replication + * Some can be replicated in Table Level DDL replication. + * + * Currently we focus on supporting Database Level DDL replication + */ +static void +LogLogicalDDLCommand(Node *parsetree, const char *queryString) +{ + switch (nodeTag(parsetree)) + { + /* Firstly, commands that are only supported in Database level DDL replication */ + case T_CreateSchemaStmt: + case T_CreateStmt: + case T_CreateForeignTableStmt: + case T_AlterDomainStmt: + case T_DefineStmt: + case T_CompositeTypeStmt: + case T_CreateEnumStmt: + case T_CreateRangeStmt: + case T_AlterEnumStmt: + case T_ViewStmt: + case T_CreateFunctionStmt: + case T_AlterFunctionStmt: + case T_CreateTrigStmt: + case T_CreateDomainStmt: + case T_CreateCastStmt: + case T_CreateOpClassStmt: + case T_CreateOpFamilyStmt: + case T_AlterOpFamilyStmt: + case T_AlterOperatorStmt: + case T_AlterTypeStmt: + case T_GrantStmt: + case T_AlterCollationStmt: + /* + * Log these stmt for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on. + * i.e. Database level DDL replication is on for some publication. + */ + if (ddl_need_xlog(InvalidOid, true, true)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + break; + + /* + * Secondly, commands that may be allowed in Table level DDL replication.
+ * These are currently handled in the later execution path of the command. + * Because we need to get the relation id which is readily available in later + * code path. + */ + case T_AlterTableStmt: + case T_IndexStmt: + case T_RenameStmt: /* TODO */ + case T_AlterOwnerStmt: /* TODO */ + break; + + /* DropStmt depends on the removeType */ + case T_DropStmt: + { + DropStmt* stmt = (DropStmt*) parsetree; + switch (stmt->removeType) + { + /* Maybe allowed in Table level DDL replication, handled in later code path */ + case OBJECT_INDEX: + case OBJECT_TABLE: + break; + /* Drop of sequence is handled separately by logical replication of sequences */ + case OBJECT_SEQUENCE: + break; + /* Drop of other objects are allowed in Database level DDL replication only */ + case OBJECT_VIEW: + case OBJECT_MATVIEW: + case OBJECT_FOREIGN_TABLE: + default: + /* + * Log these DropStmt for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on. + * i.e. Database level DDL replication is on for some publication. + */ + if (ddl_need_xlog(InvalidOid, true, true)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + break; + } + } + break; + /* + * Lastly, rule out DDLs we don't replicate yet in DDL replication + * Some of these can be supported, we just need to investigate and run tests.
+ */ + case T_CreateExtensionStmt: + case T_AlterExtensionStmt: + case T_AlterExtensionContentsStmt: + case T_CreateFdwStmt: + case T_AlterFdwStmt: + case T_CreateForeignServerStmt: + case T_AlterForeignServerStmt: + case T_CreateUserMappingStmt: + case T_AlterUserMappingStmt: + case T_DropUserMappingStmt: + case T_ImportForeignSchemaStmt: + case T_RuleStmt: + case T_CreateSeqStmt: + case T_AlterSeqStmt: + case T_CreateTableAsStmt: + case T_RefreshMatViewStmt: + case T_CreatePLangStmt: + case T_CreateConversionStmt: + case T_CreateTransformStmt: + case T_AlterTSDictionaryStmt: + case T_AlterTSConfigurationStmt: + case T_AlterTableMoveAllStmt: + case T_AlterObjectDependsStmt: + case T_AlterObjectSchemaStmt: + case T_CommentStmt: + case T_DropOwnedStmt: + case T_AlterDefaultPrivilegesStmt: + case T_CreatePolicyStmt: + case T_AlterPolicyStmt: + case T_SecLabelStmt: + case T_CreateAmStmt: + case T_CreatePublicationStmt: + case T_AlterPublicationStmt: + case T_CreateSubscriptionStmt: + case T_AlterSubscriptionStmt: + case T_DropSubscriptionStmt: + case T_CreateStatsStmt: + case T_AlterStatsStmt: + break; + default: + break; + } +} + /* * The "Slow" variant of ProcessUtility should only receive statements * supported by the event triggers facility. Therefore, we always @@ -1320,6 +1469,23 @@ ProcessUtilitySlow(ParseState *pstate, EventTriggerAlterTableStart(parsetree); EventTriggerAlterTableRelid(relid); + /* + * Log the ALTER TABLE command if + * There is any publication with database level ddl on or + * this TABLE belongs to any publication with table level ddl on + */ + if (XLogLogicalInfoActive() && + ddl_need_xlog(relid, false, isTopLevel)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + /* ... and do it */ AlterTable(atstmt, lockmode, &atcontext); @@ -1538,6 +1704,24 @@ ProcessUtilitySlow(ParseState *pstate, /* ... 
and do it */ EventTriggerAlterTableStart(parsetree); + + /* + * Log CREATE INDEX cmd for logical replication if + * there is any publication with database level ddl on or + * this TABLE belongs to any publication with table level ddl on. + */ + if (XLogLogicalInfoActive() && + ddl_need_xlog(relid, false, isTopLevel)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + address = DefineIndex(relid, /* OID of heap relation */ stmt, @@ -1754,7 +1938,7 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_DropStmt: - ExecDropStmt((DropStmt *) parsetree, isTopLevel); + ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel); /* no commands stashed for DROP */ commandCollected = true; break; @@ -1912,6 +2096,13 @@ ProcessUtilitySlow(ParseState *pstate, if (isCompleteQuery) { + /* + * Consider logging the DDL command if logical logging is enabled and this is + * a complete top level query. 
+ */ + if (XLogLogicalInfoActive() && isTopLevel) + LogLogicalDDLCommand(parsetree, queryString); + EventTriggerSQLDrop(parsetree); EventTriggerDDLCommandEnd(parsetree); } @@ -1975,7 +2166,7 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context) * Dispatch function for DropStmt */ static void -ExecDropStmt(DropStmt *stmt, bool isTopLevel) +ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel) { switch (stmt->removeType) { @@ -1990,7 +2181,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel) case OBJECT_VIEW: case OBJECT_MATVIEW: case OBJECT_FOREIGN_TABLE: - RemoveRelations(stmt); + RemoveRelations(pstate, stmt, isTopLevel); break; default: RemoveObjects(stmt); diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6a4ebd1310..c38c163f28 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -27,6 +27,7 @@ #include "commands/sequence.h" #include "commands/tablespace.h" #include "replication/message.h" +#include "replication/ddlmessage.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index cf8b6d4819..5ffdc16cca 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode) diff --git a/src/include/commands/tablecmds.h 
b/src/include/commands/tablecmds.h index 5d4037f26e..68781365de 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -18,6 +18,7 @@ #include "catalog/dependency.h" #include "catalog/objectaddress.h" #include "nodes/parsenodes.h" +#include "parser/parse_node.h" #include "storage/lock.h" #include "utils/relcache.h" @@ -27,7 +28,7 @@ struct AlterTableUtilityContext; /* avoid including tcop/utility.h here */ extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, ObjectAddress *typaddress, const char *queryString); -extern void RemoveRelations(DropStmt *drop); +extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel); extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode); diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h new file mode 100644 index 0000000000..1e8ef22296 --- /dev/null +++ b/src/include/replication/ddlmessage.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * ddlmessage.h + * Exports from replication/logical/ddlmessage.c + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * src/include/replication/ddlmessage.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_DDL_MESSAGE_H +#define PG_LOGICAL_DDL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding DDL message wal record. + */ +typedef struct xl_logical_ddl_message +{ + Oid dbId; /* database Oid emitted from */ + bool transactional; /* is message transactional? 
*/ + Size prefix_size; /* length of prefix */ + Size role_size; /* length of the role that executes the DDL command */ + Size search_path_size; /* length of the search path */ + Size message_size; /* size of the message */ + /* + * payload, including null-terminated prefix of length prefix_size + * and null-terminated role of length role_size + * and null-terminated search_path of length search_path_size + */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_ddl_message; + +#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) + +extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message, + size_t size, bool transactional); + +/* RMGR API*/ +#define XLOG_LOGICAL_DDL_MESSAGE 0x00 +void logicalddlmsg_redo(XLogReaderState *record); +void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalddlmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_DDL_MESSAGE_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 8e07bb7409..ea0f7e5cb9 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -28,6 +28,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index a16bebf76c..f0fd1ea895 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size 
message_size, const char *message); +/* + * Called for the logical decoding DDL messages. + */ +typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size message_size, + const char *message); + /* * Called for the generic logical decoding sequences. */ @@ -211,6 +224,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx Size message_size, const char *message); +/* + * Callback for streaming logical decoding DDL messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size message_size, + const char *message); + /* * Called for the streaming generic logical decoding sequences from in-progress * transactions. 
@@ -244,6 +271,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeDDLMessageCB ddlmessage_cb; LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -263,6 +291,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb; LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0bcc150b33..0819361d25 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -56,6 +56,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_DDLMESSAGE, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, @@ -131,6 +132,16 @@ typedef struct ReorderBufferChange char *message; } msg; + /* DDL Message. 
*/ + struct + { + char *prefix; + char *role; + char *search_path; + Size message_size; + char *message; + } ddlmsg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -438,6 +449,17 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* DDL message callback signature */ +typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size sz, + const char *message); + /* sequence callback signature */ typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -513,6 +535,18 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream DDL message callback signature */ +typedef void (*ReorderBufferStreamDDLMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size sz, + const char *message); + /* stream sequence callback signature */ typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -573,6 +607,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferDDLMessageCB ddlmessage; ReorderBufferSequenceCB sequence; /* @@ -593,6 +628,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamDDLMessageCB stream_ddlmessage; ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; @@ -669,6 +705,9 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool 
transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, const char *role, + const char *search_path, Size message_size, const char *message); void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, RelFileNode rnode, bool transactional, bool created, -- 2.25.1