RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | kuroda.hayato@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | TYAPR01MB5866C7ED7046BBFC1B8FDD26F55C9@TYAPR01MB5866.jpnprd01.prod.outlook.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
Dear Hou,

Here are my comments on v35-0001.

01. catalog.sgml

```
+       Controls how to handle the streaming of in-progress transactions:
+       <literal>f</literal> = disallow streaming of in-progress transactions,
+       <literal>t</literal> = spill the changes of in-progress transactions to
+       disk and apply at once after the transaction is committed on the
+       publisher,
+       <literal>p</literal> = apply changes directly using a parallel apply
+       worker if available (same as 't' if no worker is available)
```

I'm not sure why 't' means "spill the changes to file". Is it for compatibility?

~~~

02. applyworker.c - parallel_apply_stream_abort

The argument abort_data is not modified in the function. Maybe a "const" modifier should be added. (Other functions should also be checked...)

~~~

03. applyparallelworker.c - parallel_apply_find_worker

```
+   ParallelApplyWorkerEntry *entry = NULL;
```

This may not have to be initialized here.

~~~

04. applyparallelworker.c - HandleParallelApplyMessages

```
+   static MemoryContext hpm_context = NULL;
```

I think "hpm" means "handle parallel message", so it should be "hpam".

~~~

05. launcher.c - logicalrep_worker_launch()

```
    if (is_subworker)
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
    else
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
```

I'm not sure why there are only two bgw_type values even though there are three types of apply workers. Is it for compatibility?

~~~

06. launcher.c - logicalrep_worker_stop_by_slot

An assertion like Assert(slot_no >= 0 && slot_no < max_logical_replication_workers) should be added at the top of this function.

~~~

07. launcher.c - logicalrep_worker_stop_internal

```
+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
```

I think logicalrep_worker_stop_internal() may not be a "workhorse" for logicalrep_worker_detach(). In that function, the internal function is called only for parallel apply workers, and it is not the main part of the detach function.

~~~

08. worker.c - handle_streamed_transaction()

```
+   TransactionId current_xid = InvalidTransactionId;
```

This initialization is not needed. The variable is not used in non-streaming mode, and otherwise it is assigned before it is used.

~~~

09. worker.c - handle_streamed_transaction()

```
+       case TRANS_PARALLEL_APPLY:
+           /* Define a savepoint for a subxact if needed. */
+           parallel_apply_start_subtrans(current_xid, stream_xid);
+           return false;
```

Based on the other case blocks, Assert(am_parallel_apply_worker()) could be added at the top of this part. The same suggestion applies to the other switch-case statements.

~~~

10. worker.c - apply_handle_stream_start

```
+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
```

I can understand that the START message is not needed, but is STOP really removable? If the leader does not send STOP to its child, does it lose the chance to change the worker state to IDLE_IN_TRANSACTION?

~~~

11. worker.c - apply_handle_stream_start

Currently the number of received chunks is not counted, but it could be if a variable "nchunks" were defined and incremented in apply_handle_stream_start(). This information may be useful for determining an appropriate logical_decoding_work_mem for a workload. What do you think?

~~~

12. worker.c - get_transaction_apply_action

The {} are not needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED