Re: Rework LogicalOutputPluginWriterUpdateProgress - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: Rework LogicalOutputPluginWriterUpdateProgress |
Date | |
Msg-id | CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com |
In response to | RE: Rework LogicalOutputPluginWriterUpdateProgress ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>) |
Responses | RE: Rework LogicalOutputPluginWriterUpdateProgress |
List | pgsql-hackers |
Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the common code, we removed the LogicalDecodingContext->end_xact introduced in commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC, it removes the "unnecessary" member, but only by replacing it everywhere with a boolean parameter passed to update_progress_and_keepalive(). So the end result is not really less code, and it is less readable, because you now need to know what the true/false parameter means. I wonder if it would have been better just to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some statements but left the blank lines. The result is 2 blank lines remaining instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
    ctx->write_location = last_lsn;

    ctx->end_xact = false;

    /* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
    ctx->write_location = last_lsn;


    /* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

    +    if (is_skip_threshold_change(ctx))
    +        update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which guard the update_progress_and_keepalive() call with is_skip_threshold_change():
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, all those conditions could instead be pushed down *into* the wrapper, thereby making every caller simpler. For example, make the wrapper function code look similar to the current UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
    {
        if (!ctx->update_progress_and_keepalive)
            return;

        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }

AFTER
    {
        if (!ctx->update_progress_and_keepalive)
            return;

        if (finished_xact || is_skip_threshold_change(ctx))
        {
            ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                               ctx->write_xid, ctx->did_write,
                                               finished_xact);
        }
    }

~~~

4. StartupDecodingContext

    @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
       XLogReaderRoutine *xl_routine,
       LogicalOutputPluginWriterPrepareWrite prepare_write,
       LogicalOutputPluginWriterWrite do_write,
    -  LogicalOutputPluginWriterUpdateProgress update_progress)
    +  LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name ('update_progress_and_keepalive') is identical to the static function name in the same C source file. It introduces a kind of unnecessary shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the function like 'do_update_keepalive'.

~~~

5.

    @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
       XLogReaderRoutine *xl_routine,
       LogicalOutputPluginWriterPrepareWrite prepare_write,
       LogicalOutputPluginWriterWrite do_write,
    -  LogicalOutputPluginWriterUpdateProgress update_progress)
    +  LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name ('update_progress_and_keepalive') is identical to the static function name in the same C source file. It introduces a kind of unnecessary shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

    @@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
       XLogReaderRoutine *xl_routine,
       LogicalOutputPluginWriterPrepareWrite prepare_write,
       LogicalOutputPluginWriterWrite do_write,
    -  LogicalOutputPluginWriterUpdateProgress update_progress)
    +  LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name ('update_progress_and_keepalive') is identical to the static function name in the same C source file. It introduces a kind of unnecessary shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

    @@ -662,7 +657,7 @@ void
     OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     {
         if (!ctx->accept_writes)
    -        elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
    +        elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");

This error message seems confusing. Can it be worded better?

Also, I noticed this flag is never used except in this one place where it throws an error, so would an "Assert" be more appropriate here?

~~~

8. rollback_prepared_cb_wrapper

         /*
          * If the plugin support two-phase commits then rollback prepared callback
          * is mandatory
    +     *
    +     * FIXME: This should have been caught much earlier.
          */
         if (ctx->callbacks.rollback_prepared_cb == NULL)

~

Is this FIXME related to the current patch, or should this be an entirely different topic?

~~~

9. is_skip_threshold_change

The current usage for this function is like:

    if (is_skip_threshold_change(ctx))
    +    update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be something like 'is_change_threshold_exceeded()' (or 'is_keepalive_threshold_exceeded()', etc.), because it seems more readable to say:

    if (is_change_threshold_exceeded())
        do_something();

~~~

10. is_skip_threshold_change

    static bool
    is_skip_threshold_change(struct LogicalDecodingContext *ctx)
    {
        static int changes_count = 0;  /* used to accumulate the number of
                                        * changes */

        /* If the change was published, reset the counter and return false */
        if (ctx->did_write)
        {
            changes_count = 0;
            return false;
        }

        /*
         * It is possible that the data is not sent to downstream for a long time
         * either because the output plugin filtered it or there is a DDL that
         * generates a lot of data that is not processed by the plugin. So, in
         * such cases, the downstream can timeout. To avoid that we try to send a
         * keepalive message if required. Trying to send a keepalive message
         * after every change has some overhead, but testing showed there is no
         * noticeable overhead if we do it after every ~100 changes.
         */
    #define CHANGES_THRESHOLD 100
        if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
        {
            changes_count = 0;
            return true;
        }

        return false;
    }

~

That 2nd condition, if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD), does not seem right. There is no need to check ctx->did_write there; it must be false, because it was already checked earlier in the function:

BEFORE
    if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
    Assert(!ctx->did_write);
    if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
    if (++changes_count >= CHANGES_THRESHOLD)

~~~

11. update_progress_and_keepalive

    /*
     * Update progress tracking and send keep alive (if required).
     */
    static void
    update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
                                  bool finished_xact)
    {
        if (!ctx->update_progress_and_keepalive)
            return;

        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }

~

Maybe it's simpler to code this without the return. e.g.

    if (ctx->update_progress_and_keepalive)
    {
        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }

(This is just generic suggested code as an example; some of my other review comments overlap with this one.)

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

    +    UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
    +                                       xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
    +

I didn't really recognise how "!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean 'finished_xact' param. Can this call have an explanatory comment about how it works?

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

    -    if (pending_writes || (!end_xact &&
    +    if (pending_writes || (!finished_xact &&
                                wal_sender_timeout > 0 &&
                                now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                                  wal_sender_timeout / 2)))
    -        ProcessPendingWrites();
    +        WalSndSendPending();

Is WalSndSendPending a good name for this new function? From this code, we can see it can also be called in other scenarios, even when there is nothing "pending" at all.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
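Taken together, review comments #3, #9, #10 and #11 above suggest one possible shape for the wrapper and its threshold helper. The following is only a rough, self-contained C sketch of that combination, not the patch's actual code. The cut-down LogicalDecodingContext, the XLogRecPtr/TransactionId typedefs, the callback typedef UpdateProgressAndKeepaliveCB and the counting callback count_keepalive are all hypothetical stand-ins so that it compiles outside the PostgreSQL tree, and the standard assert() stands in for PostgreSQL's Assert().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the PostgreSQL types (not the real definitions). */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

struct LogicalDecodingContext;

/* Hypothetical callback type, mirroring the call made in the patch. */
typedef void (*UpdateProgressAndKeepaliveCB) (struct LogicalDecodingContext *ctx,
                                             XLogRecPtr lsn,
                                             TransactionId xid,
                                             bool did_write,
                                             bool finished_xact);

/* Hypothetical cut-down context: only the members the review talks about. */
typedef struct LogicalDecodingContext
{
    XLogRecPtr  write_location;
    TransactionId write_xid;
    bool        did_write;
    UpdateProgressAndKeepaliveCB update_progress_and_keepalive;
} LogicalDecodingContext;

#define CHANGES_THRESHOLD 100

/* Name from comment #9; redundant did_write test dropped per comment #10. */
static bool
is_change_threshold_exceeded(LogicalDecodingContext *ctx)
{
    static int  changes_count = 0;  /* non-writing changes since last reset */

    /* If the change was published, reset the counter and return false. */
    if (ctx->did_write)
    {
        changes_count = 0;
        return false;
    }

    assert(!ctx->did_write);        /* SUGGESTION1 from comment #10 */
    if (++changes_count >= CHANGES_THRESHOLD)
    {
        changes_count = 0;
        return true;
    }

    return false;
}

/*
 * Threshold check pushed down into the wrapper (comment #3), written without
 * the early return (comment #11). When finished_xact is true, the || operator
 * skips the threshold check, so the change counter is left untouched.
 */
static void
update_progress_and_keepalive(LogicalDecodingContext *ctx, bool finished_xact)
{
    if (ctx->update_progress_and_keepalive != NULL &&
        (finished_xact || is_change_threshold_exceeded(ctx)))
    {
        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }
}

/* Hypothetical demo callback: it only counts how often it is invoked. */
static int  keepalive_calls = 0;

static void
count_keepalive(struct LogicalDecodingContext *ctx, XLogRecPtr lsn,
                TransactionId xid, bool did_write, bool finished_xact)
{
    (void) ctx;
    (void) lsn;
    (void) xid;
    (void) did_write;
    (void) finished_xact;
    keepalive_calls++;
}
```

With CHANGES_THRESHOLD at 100, 250 consecutive non-writing changes would invoke the callback twice, while a call with finished_xact = true always invokes it. Every caller then reduces to a single update_progress_and_keepalive(ctx, ...) line, which is the simplification comment #3 is after.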