Re: Support logical replication of DDLs - Mailing list pgsql-hackers
From: Peter Smith
Subject: Re: Support logical replication of DDLs
Msg-id: CAHut+PtOODRybaptKRKUWZnGw-PZuLF2BxaitnMSNeAiU8-yPg@mail.gmail.com
In response to: RE: Support logical replication of DDLs ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List: pgsql-hackers
Here are some more review comments for the patch 0002-2023_04_07-2.

This was a WIP review in parts because the patch was quite large:
WIP part 1 [1] was posted 17/4.
WIP part 2 [2] was posted 17/4.
WIP part 3 [3] was posted 19/4.
WIP part 4 is this post. (This is my final WIP part for this 0002 patch.)

======
contrib/test_decoding/sql/ddl.sql

1.
+SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394, 1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}');

I wasn't entirely sure what these tests are showing. They seem to do nothing but hardwire a bunch of random stuff and then print it out again. Am I missing something?

And are those just bogus content payloads? Maybe they are valid JSON, but AFAICT nobody is using them. What is the advantage of using this bogus payload data instead of just a simple string like "DDL message content goes here"?

======
contrib/test_decoding/test_decoding.c

2. _PG_output_plugin_init

  cb->message_cb = pg_decode_message;
+ cb->ddl_cb = pg_decode_ddl_message;
  cb->filter_prepare_cb = pg_decode_filter_prepare;

Where is the 'stream_ddl_cb' to match this?

~~~

3. pg_decode_ddl_message

+static void
+pg_decode_ddl_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                      XLogRecPtr message_lsn, const char *prefix, Oid relid,
+                      DeparsedCommandType cmdtype, Size sz, const char *message)
+{
+    OutputPluginPrepareWrite(ctx, true);
+    appendStringInfo(ctx->out, "message: prefix: %s, relid: %u, ",
+                     prefix, relid);

Should the appendStringInfo say "DDL message:" instead of "message:"? I can't tell if this was deliberate or a cut/paste error from the existing code.

~~~

4. pg_decode_ddl_message

+    appendStringInfo(ctx->out, "sz: %zu content:", sz);
+    appendBinaryStringInfo(ctx->out, message, sz);
+    OutputPluginWrite(ctx, true);

4a.
Should there be whitespace after that last 'content:' before the binary content?

~
4b.
Is it necessary to say this is 'Binary'? I thought this payload was only JSON text data.

======
src/backend/replication/logical/ddltrigger.c

5.
+/*-------------------------------------------------------------------------
+ *
+ * ddltrigger.c
+ *    Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/replication/logical/ddltrigger.c
+ *
+ * NOTES
+ *
+ * Deparse the ddl command and log it.
+ *
+ * ---------------------------------------------------------------------------
+ */

~
5a.
Just saying "Logical DDL messages" is the same header comment as in the other new file ddlmessages.c, so it looks like a cut/paste issue.

~
5b.
Should say 2023.

~~~

6. publication_deparse_ddl_command_start

+/*
+ * Deparse the ddl command and log it prior to
+ * execution. Currently only used for DROP TABLE command
+ * so that catalog can be accessed before being deleted.
+ * This is to check if the table is part of the publication
+ * or not.
+ */
+Datum
+publication_deparse_ddl_command_start(PG_FUNCTION_ARGS)
+{
+    EventTriggerData *trigdata;
+    char       *command = psprintf("Drop table command start");

Since the information about this only being for DROP is hardcoded and stated in the function comment, shouldn't this whole function be renamed to something DROP-specific? e.g. publication_deparse_ddl_drop_command_start()

~~~

7. publication_deparse_ddl_command_start

+    if (relation)
+        table_close(relation, NoLock);

I thought this check was not needed because the relation was already checked earlier in this function, so it cannot be NULL here.

~~~

8. publication_deparse_table_rewrite

+    char        relpersist;
+    CollectedCommand *cmd;
+    char       *json_string;

The json_string can be declared later, within the scope that uses it, instead of here at the top.

~~~

9.
publication_deparse_ddl_command_end

+    ListCell   *lc;
+    slist_iter  iter;
+    DeparsedCommandType type;
+    Oid         relid;
+    char        relkind;

9a.
Some of these variable declarations seem misplaced. I think the 'json_string' and the 'type' can be at a lower scope, can't they?

~
9b.
Also, IMO it is better to call 'type' 'cmdtype', so it has the same name as the variable in the other slist_foreach loop.

~~~

10. publication_deparse_ddl_command_end

+    foreach(lc, currentEventTriggerState->commandList)
+    {
+        char        relpersist = RELPERSISTENCE_PERMANENT;
+        CollectedCommand *cmd = lfirst(lc);
+        char       *json_string;

This json_string can be declared later, only in the scope that uses it.

~~~

11. publication_deparse_ddl_command_end

+        if (cmd->type == SCT_Simple &&
+            !OidIsValid(cmd->d.simple.address.objectId))
+            continue;
+
+        if (cmd->type == SCT_AlterTable)
+        {
+            relid = cmd->d.alterTable.objectId;
+            type = DCT_TableAlter;
+        }
+        else
+        {
+            /* Only SCT_Simple for now */
+            relid = cmd->d.simple.address.objectId;
+            type = DCT_SimpleCmd;
+        }

This code seemed structured a bit strangely to me; the comment /* Only SCT_Simple for now */ appears to be expecting something that may not be guaranteed. Perhaps the below-suggested code is closer to what was intended?

SUGGESTION (should it be like this?)

if (cmd->type == SCT_AlterTable)
{
    relid = cmd->d.alterTable.objectId;
    type = DCT_TableAlter;
}
else
{
    /* Only SCT_Simple for now */
    if (cmd->type != SCT_Simple)
        continue;

    if (!OidIsValid(cmd->d.simple.address.objectId))
        continue;

    relid = cmd->d.simple.address.objectId;
    type = DCT_SimpleCmd;
}

~~~

12. publication_deparse_ddl_command_end

+    slist_foreach(iter, &(currentEventTriggerState->SQLDropList))
+    {

I thought there should be some comment describing the purpose of this 2nd loop.

~~~

13. publication_deparse_ddl_command_end

+    return PointerGetDatum(NULL);
+}
+
+

Double blank lines.

~~~

14.
publication_deparse_table_init_write

+    /*
+     * Do not generate wal log for commands whose target table is a temporary
+     * table.
+     *
+     * We will generate wal logs for unlogged tables so that unlogged tables
+     * can also be created and altered on the subscriber side. This makes it
+     * possible to directly replay the SET LOGGED command and the incoming
+     * rewrite message without creating a new table.
+     */
+    if (relpersist != RELPERSISTENCE_PERMANENT)
+        return PointerGetDatum(NULL);
+
+    /* Deparse the DDL command and WAL log it to allow decoding of the same. */
+    json_string = deparse_utility_command(cmd, false);
+
+    if (json_string != NULL)
+        LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd,
+                             json_string, strlen(json_string) + 1);
+
+    return PointerGetDatum(NULL);

Some other code with the same logic to skip temporary tables is written differently to this. e.g. see publication_deparse_ddl_command_end, which looks like below:

+    if (relpersist == RELPERSISTENCE_PERMANENT)
+    {
+        /*
+         * Deparse the DDL command and WAL log it to allow decoding of the
+         * same.
+         */
+        json_string = deparse_utility_command(cmd, false);
+
+        if (json_string != NULL)
+            LogLogicalDDLMessage("deparse", relid, type, json_string,
+                                 strlen(json_string) + 1);
+    }

14a.
I thought this publication_deparse_table_init_write should be coded in a similar way, instead of having 2x return PointerGetDatum(NULL);

~
14b.
Also, move the 'json_string' into this new scope (similar to the previous review comments above).

======
src/backend/replication/logical/worker.c

15. General

IMO it might end up being tidier to rename all the DDL-related functions with 'ddl' in the name:

e.g. preprocess_create_table --> preprocess_ddl_create_table
e.g. handle_create_table --> handle_ddl_create_table

~~~

16. preprocess_create_table

+/*
+ * Special handling for CREATE TABLE AS and SELECT INTO
+ * to not populate data from the source table on the subscriber.
+ * Allow the data to be replicated through INSERTs on the publisher.
+ */
+static void
+preprocess_create_table(RawStmt *command)
+{
+    CommandTag  commandTag;
+
+    commandTag = CreateCommandTag((Node *) command);
+
+    switch (commandTag)
+    {
+        case CMDTAG_CREATE_TABLE_AS:
+        case CMDTAG_SELECT_INTO:
+            {
+                CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
+
+                if (castmt->objtype == OBJECT_TABLE)
+                {
+                    /*
+                     * Force skipping data population to avoid data
+                     * inconsistency. Data should be replicated from the
+                     * publisher instead.
+                     */
+                    castmt->into->skipData = true;
+                }
+            }
+            break;
+        case CMDTAG_SELECT:
+            {
+                SelectStmt *sstmt = (SelectStmt *) command->stmt;
+
+                if (sstmt->intoClause != NULL)
+                {
+                    /*
+                     * Force skipping data population to avoid data
+                     * inconsistency. Data should be replicated from the
+                     * publisher instead.
+                     */
+                    sstmt->intoClause->skipData = true;
+                }
+            }
+            break;
+        default:
+            break;
+    }
+}

16a.
Maybe just slightly reword the function-header comment.

SUGGESTION
CREATE TABLE AS and SELECT INTO require special handling to force them to skip populating data from the source table on the subscriber. Data should be replicated from the publisher instead.

~
16b.
I felt it was not really necessary to have those "Force skipping data..." comments for each of the cases because those comments are pretty much saying the same thing as the function-header comment. Then several '{}' can also be removed, so the whole function becomes much shorter.

~~~

17. handle_create_table

+/*
+ * Handle CREATE TABLE command
+ *
+ * Call AddSubscriptionRelState for CREATE LABEL command to set the relstate to
+ * SUBREL_STATE_READY so DML changes on this new table can be replicated without
+ * having to manually run "alter subscription ... refresh publication"
+ */
+static void
+handle_create_table(RawStmt *command)

17a.
/CREATE LABEL/CREATE TABLE/

~
17b.
"alter subscription ... refresh publication" --> "ALTER SUBSCRIPTION ... REFRESH PUBLICATION"

~
17c.
I think the function name is misleading because this is not really handling the task of table creation. IIUC the actual DDL for the CREATE TABLE is performed already in the apply_handle_ddl() function. If that is correct, then IMO a better name for this function is something like postprocess_create_table(). That kind of naming would also pair nicely with the existing preprocess_create_table().

See also review comment #15 above, so in the end these functions could be called like:
- preprocess_ddl_create_table
- postprocess_ddl_create_table

~~~

18. handle_create_table

+    commandTag = CreateCommandTag((Node *) command);
+    cstmt = (CreateStmt *) command->stmt;
+    rv = cstmt->relation;
+
+    if (commandTag == CMDTAG_CREATE_TABLE)
+    {
+        cstmt = (CreateStmt *) command->stmt;
+        rv = cstmt->relation;
+    }
+    else
+    {
+        return;
+    }
+
+    if (!rv)
+        return;

This seemed quite strangely coded. Also, the assignments to 'cstmt' and 'rv' are duplicated (??)

SUGGESTION

commandTag = CreateCommandTag((Node *) command);

if (commandTag != CMDTAG_CREATE_TABLE)
    return;

cstmt = (CreateStmt *) command->stmt;
rv = cstmt->relation;
if (!rv)
    return;

~~~

19. handle_create_table

+    if (relnamespace != InvalidOid)
+        relid = get_relname_relid(relname, relnamespace);
+    else
+        relid = RelnameGetRelid(relname);
+
+    if (OidIsValid(relid))

19a.
IMO 'relnamespace' could have an improved name like 'relnamespace_oid'.

~
19b.
+    if (relnamespace != InvalidOid)

should match the other check.

SUGGESTION
if (OidIsValid(relnamespace))

~~~

20. apply_handle_ddl

+/*
+ * Handle DDL replication messages.
+ */
+static void
+apply_handle_ddl(StringInfo s)

This is an important function for the DDL replication logic; I felt it should have some descriptive comment to say what it is doing.

~~~

21.
apply_handle_ddl

+    commandTag = CreateCommandTag((Node *) command);
+
+    /* If we got a cancel signal in parsing or prior command, quit */
+    CHECK_FOR_INTERRUPTS();
+
+    /* Remove data population from the command */
+    preprocess_create_table(command);

There seems to be an assumption here that the only kind of command processed here would be TABLE related. Maybe that is currently true, but shouldn't there be some error checking just to make sure it cannot execute unexpected commands?

======
src/backend/replication/pgoutput/pgoutput.c

22. PGOutputTxnData

typedef struct PGOutputTxnData
{
    bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
+   List       *deleted_relids;
} PGOutputTxnData;

I thought the struct comment should also have something to say about the new field 'deleted_relids', and why it is necessary.

~~~

23. _PG_output_plugin_init

@@ -254,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->change_cb = pgoutput_change;
    cb->truncate_cb = pgoutput_truncate;
    cb->message_cb = pgoutput_message;
+   cb->ddl_cb = pgoutput_ddl;
    cb->commit_cb = pgoutput_commit_txn;
    cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -270,6 +278,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->stream_commit_cb = pgoutput_stream_commit;
    cb->stream_change_cb = pgoutput_change;
    cb->stream_message_cb = pgoutput_message;
+   cb->stream_ddl_cb = pgoutput_ddl;

This is not a new issue -- but here are some more examples of what was already mentioned in one of my previous WIP reviews. This patch needs to decide if it is going to call these 'ddl_cb' or 'ddl_message_cb' (similarly for function names, comments, strings, variables, etc.) and then be consistent everywhere with whatever that decision is.

~~~

24. init_txn_data

+/* Initialize the per-transaction level variable for the given transaction.
 */
+static void
+init_txn_data(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

Maybe instead of 'level variable', call this something like:

SUGGESTION
Initialize the per-transaction private data for the given transaction.

~~~

25. clean_txn_data

(Same as previous comment #24.)

SUGGESTION
Clean up the per-transaction private data for the given transaction.

~~~

26. init_txn_data/clean_txn_data

Hmm, this refactoring to isolate the alloc/free of this private data and to delegate to these new functions from a number of places looked to me more like a bug-fix which is not really related to the DDL replication. I guess what has happened is that when more information (field 'deleted_relids') was added to the PGOutputTxnData it exposed this problem more visibly (??)

To summarize, I thought all this stuff about init_txn_data/clean_txn_data refactoring should probably be removed from this patch and instead pushed as a separate bug fix to HEAD. What do you think?

~~~

27. pgoutput_change

+    /*
+     * We don't publish table rewrite change unless we publish the rewrite ddl
+     * message.
+     */
+    if (table_rewrite && !relentry->pubactions.pubddl_table)
+        return;
+

/change/changes/

~~~

28. pgoutput_change

+    if (table_rewrite)
+        RelationClose(relation);
+

Something doesn't seem right. AFAICT this cleanup code has been added to match the new code at the top of the function, where the "actual relation" was fetched. Meanwhile, there are also some other return points where 'table_rewrite' is true, e.g.:

if (table_rewrite && !relentry->pubactions.pubddl_table)
    return;

So why is there no RelationClose(relation) for those other returns?

~~~

29. is_object_published

+/* Check if the given object is published. */
+static bool
+is_object_published(LogicalDecodingContext *ctx, Oid objid)
+{
+    Relation    relation = NULL;
+    RelationSyncEntry *relentry;
+    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+    /* First check the DDL command filter.
 */
+    switch (get_rel_relkind(objid))
+    {
+        case RELKIND_RELATION:
+            relation = RelationIdGetRelation(objid);
+            relentry = get_rel_sync_entry(data, relation);
+            RelationClose(relation);
+
+            /*
+             * Skip sending this ddl if we don't publish ddl message or the ddl
+             * need to be published via its root relation.
+             */
+            if (!relentry->pubactions.pubddl_table ||
+                relentry->publish_as_relid != objid)
+                return false;
+
+            break;
+        default:
+            /* unsupported objects */
+            return false;
+    }
+
+    return true;
+}

The function seems back-to-front. IMO it would be better/safer if the default (the last return) was false. So the "Skip sending this ddl if..." logic should be reversed to say "Only send this ddl if..." and return true only in that case.

~~~

30. pgoutput_ddl

+/*
+ * Send the decoded DDL over wire.
+ */
+static void
+pgoutput_ddl(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Should that comment be written more like "Send the decoded DDL message"?

~~~

31. pgoutput_ddl

+    switch (cmdtype)
+    {
+        case DCT_TableDropStart:
+            {
+                MemoryContext old;

Probably 'oldctx' would be a more meaningful/typical name for this instead of just 'old'.

~~~

32. pgoutput_ddl

+        case DCT_TableAlter:
+
+            /*
+             * For table rewrite ddl, we first send the original ddl message
+             * to subscriber, then convert the upcoming rewrite INSERT to
+             * UPDATE and send them to subscriber so that the data between
+             * publisher and subscriber can always be consistent.
+             *
+             * We do this way because of two reason:
+             *
+             * (1) The data before the rewrite ddl could already be different
+             * among publisher and subscriber. To make sure the extra data in
+             * subscriber which doesn't exist in publisher also get rewritten,
+             * we need to let the subscriber execute the original rewrite ddl
+             * to rewrite all the data at first.
+             *
+             * (2) the data after executing rewrite ddl could be different
+             * among publisher and subscriber(due to different
+             * functions/operators used during rewrite), so we need to
+             * replicate the rewrite UPDATEs to keep the data consistent.
+             *
+             * TO IMPROVE: We could improve this by letting the subscriber
+             * only rewrite the extra data instead of doing fully rewrite and
+             * use the upcoming rewrite UPDATEs to rewrite the rest data.
+             * Besides, we may not need to send rewrite changes for all type
+             * of rewrite ddl, for example, it seems fine to skip sending
+             * rewrite changes for ALTER TABLE SET LOGGED as the data in the
+             * table doesn't actually be changed.
+             */
+            break;

32a.
I think this giant comment is the same as the Commit Message. A previous WIP review ([2]?) already gave some suggestions for this text. Please make sure the text in both places matches.

~
32b.
IIUC this comment is referring to the pgoutput_change code for REORDER_BUFFER_CHANGE_INSERT, which converts the INSERT to an UPDATE for a table_rewrite. If that is correct, probably this comment should cross-reference that other function to give the reader more information.

~
32c.
Instead of "TO IMPROVE", I think it is more conventional to write "XXX:" in a code comment.

~~~

33. reload_publications

+/* Reload publications if needed. */
+static void
+reload_publications(PGOutputData *data)
+{
+    MemoryContext oldctx;
+
+    if (!publications_valid)
+    {
+        oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+        if (data->publications)
+        {
+            list_free_deep(data->publications);
+            data->publications = NIL;
+        }
+        data->publications = LoadPublications(data->publication_names);
+        MemoryContextSwitchTo(oldctx);
+        publications_valid = true;
+    }
+}
+
+

33a.
AFAICT this appears to be a general cleanup refactoring that is not really related to the DDL replication patch. So I felt this can be removed from this patch and applied as a separate patch to HEAD.

~
33b.
Double blank lines after this function.

~~~

34.
get_rel_sync_entry

    entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-       entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+       entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+       entry->pubactions.pubddl_table = false;
    entry->new_slot = NULL;
    entry->old_slot = NULL;
    memset(entry->exprstate, 0, sizeof(entry->exprstate));

Continually adding to these assignments has got a bit out of control... IMO the code would now be better written as:

memset(&entry->pubactions, 0, sizeof(entry->pubactions));

And doing this would also be consistent with the similar code for entry->exprstate (just a couple of lines below here).

~~~

35. get_rel_sync_entry

    entry->pubactions.pubupdate = false;
    entry->pubactions.pubdelete = false;
    entry->pubactions.pubtruncate = false;
+   entry->pubactions.pubddl_table = false;

(Same as review comment #34 above.) IMO all this should be written more simply as:

memset(&entry->pubactions, 0, sizeof(entry->pubactions));

------
[1] https://www.postgresql.org/message-id/CAHut%2BPtzpuuRFrLnjkQePq296ip_0WfmQ4CtydM9JDR6gEf%3DQw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAHut%2BPtMkVoweJrd%3DSLw7BfpW883skasdnemoj4N19NnyjrT3Q%40mail.gmail.com
[3] https://www.postgresql.org/message-id/CAHut+PuG8J8uA5V-F-o4TczhvFSWGG1B8qL+EZO0HjWWEEYG+g@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia