From b9962108dbd00e9ca385cbae7a418772c433d267 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 21 Dec 2022 22:20:14 +0530 Subject: [PATCH v55 3/5] DDL messaging infrastructure for DDL replication. DDL messaging infrastructure for DDL replication. --- src/backend/access/rmgrdesc/Makefile | 1 + .../access/rmgrdesc/logicalddlmsgdesc.c | 52 ++++++++++++ src/backend/access/rmgrdesc/meson.build | 1 + src/backend/access/transam/rmgr.c | 1 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/ddlmessage.c | 84 +++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/rmgrlist.h | 1 + src/include/replication/ddlmessage.h | 60 +++++++++++++ 10 files changed, 203 insertions(+) create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c create mode 100644 src/backend/replication/logical/ddlmessage.c create mode 100644 src/include/replication/ddlmessage.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..2ff01e69bf 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -18,6 +18,7 @@ OBJS = \ gistdesc.o \ hashdesc.o \ heapdesc.o \ + logicalddlmsgdesc.o \ logicalmsgdesc.o \ mxactdesc.o \ nbtdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c new file mode 100644 index 0000000000..05e930c90c --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -0,0 +1,52 @@ +/*------------------------------------------------------------------------- + * + * logicalddlmsgdesc.c + * rmgr descriptor routines for replication/logical/ddlmessage.c + * + * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalddlmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/ddlmessage.h" + +void +logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_DDL_MESSAGE) + { + xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec; + char *prefix = xlrec->message; + char *message = xlrec->message + xlrec->prefix_size; + char *sep = ""; + + Assert(prefix[xlrec->prefix_size] != '\0'); + + appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ", + prefix, xlrec->message_size); + appendStringInfo(buf, "relid %u cmdtype %u", xlrec->relid, xlrec->cmdtype); + /* Write message payload as a series of hex bytes */ + for (int cnt = 0; cnt < xlrec->message_size; cnt++) + { + appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]); + sep = " "; + } + } +} + +const char * +logicalddlmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE) + return "DDL"; + + return NULL; +} diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build index a7a7baecd1..30520b05e0 100644 --- a/src/backend/access/rmgrdesc/meson.build +++ b/src/backend/access/rmgrdesc/meson.build @@ -11,6 +11,7 @@ rmgr_desc_sources = files( 'gistdesc.c', 'hashdesc.c', 'heapdesc.c', + 'logicalddlmsgdesc.c', 'logicalmsgdesc.c', 'mxactdesc.c', 'nbtdesc.c', diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 7d67eda5f7..678e81ae01 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -27,6 +27,7 @@ #include "fmgr.h" #include "funcapi.h" #include "miscadmin.h" +#include "replication/ddlmessage.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb71..d4f29f8ffc 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ + ddlmessage.o\ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c new file mode 100644 index 0000000000..e11e56e5a3 --- /dev/null +++ b/src/backend/replication/logical/ddlmessage.c @@ -0,0 +1,84 @@ +/*------------------------------------------------------------------------- + * + * ddlmessage.c + * Logical DDL messages. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/ddlmessage.c + * + * NOTES + * + * Logical DDL messages allow XLOG logging of DDL command strings that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * Unlike generic logical messages, these DDL messages have only transactional + * mode. Note by default DDLs in PostgreSQL are transactional. + * + * These messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/ddlmessage.h" +#include "utils/memutils.h" + +/* + * Write logical decoding DDL message into XLog. + */ +XLogRecPtr +LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype, + const char *message, size_t size) +{ + xl_logical_ddl_message xlrec; + + /* Ensure we have a valid transaction id. */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + xlrec.dbId = MyDatabaseId; + /* Trailing zero is critical; see logicalddlmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + xlrec.relid = relid; + xlrec.cmdtype = cmdtype; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, message), size); + + /* Allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding ddl messages. + */ +void +logicalddlmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1b9f072edc..07756e90ae 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -1,6 +1,7 @@ # Copyright (c) 2022, PostgreSQL Global Development Group backend_sources += files( + 'ddlmessage.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..daf1730252 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -26,6 +26,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/ddlmessage.h" #include "replication/message.h" #include "replication/origin.h" #include "rmgrdesc.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 000bcbfdaf..eefc13a1e1 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h new file mode 100644 index 0000000000..77df6ea11a --- /dev/null +++ b/src/include/replication/ddlmessage.h @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * ddlmessage.h + * Exports from replication/logical/ddlmessage.c + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * src/include/replication/ddlmessage.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_DDL_MESSAGE_H +#define PG_LOGICAL_DDL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "nodes/nodes.h" + + +/* + * Support for keeping track of deparsed commands. + */ +typedef enum DeparsedCommandType +{ + DCT_SimpleCmd, + DCT_TableDropStart, + DCT_TableDropEnd, + DCT_TableAlter, + DCT_ObjectCreate, + DCT_ObjectDrop +} DeparsedCommandType; + +/* + * Generic logical decoding DDL message wal record. + */ +typedef struct xl_logical_ddl_message +{ + Oid dbId; /* database Oid emitted from */ + Size prefix_size; /* length of prefix including null terminator */ + Oid relid; /* id of the table */ + DeparsedCommandType cmdtype; /* type of sql command */ + Size message_size; /* size of the message */ + + /* + * payload, including null-terminated prefix of length prefix_size + */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_ddl_message; + +#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) + +extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype, + const char *ddl_message, size_t size); + +/* RMGR API*/ +#define XLOG_LOGICAL_DDL_MESSAGE 0x00 +void logicalddlmsg_redo(XLogReaderState *record); +void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalddlmsg_identify(uint8 info); + +#endif -- 2.34.1