diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 7422521626..7a30828d44 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -138,10 +138,11 @@ * wait to send messages, and this wait doesn't appear in lmgr. * * To avoid this wait, we use a non-blocking write and wait with a timeout. If - * the timeout is exceeded, the LA will serialize the message to a file and - * indicate PA-2 that it needs to read that file for the remaining messages. - * Then LA will start waiting for commit which will detect deadlock if any. - * See pa_send_data() and enum TransApplyAction. + * the timeout is exceeded, the LA will serialize all the pending messages to + * a file and indicate to PA-2 that it needs to read that file for the + * remaining messages. Then LA will start waiting for commit as in the + * previous case which will detect deadlock if any. See pa_send_data() and + * enum TransApplyAction. * * 4) Lock types * @@ -575,11 +576,13 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) /* * Stop the worker if there are enough workers in the pool. * - * XXX The worker is also stopped if the leader apply worker needed to + * XXX Additionally, we stop the worker if the leader apply worker had to * serialize part of the transaction data due to a send timeout. This is - * because the message could be partially written to the queue but there is + * because the message could be partially written to the queue and there is * no way to clean the queue other than resending the message until it - * succeeds. Directly stopping the worker avoids needing this complexity. + * succeeds. Instead of trying to send the data which anyway would have + * been serialized and then letting the parallel apply worker deal with the + * spurious message, we stop the worker. 
*/ if (winfo->serialize_changes || list_length(ParallelApplyWorkerPool) > @@ -652,7 +655,7 @@ pa_detach_all_error_mq(void) } /* - * Check if the parallel apply worker is pending due to spooled messages. + * Check if there are any pending spooled messages. */ static bool pa_has_spooled_message_pending() @@ -665,8 +668,8 @@ pa_has_spooled_message_pending() } /* - * Replay the spooled messages in the parallel apply worker if the leader apply - * worker has finished serializing changes to the file. + * Replay the spooled messages once the leader apply worker has finished + * serializing changes to the file. */ static void pa_spooled_messages(void) @@ -699,8 +702,8 @@ pa_spooled_messages(void) * We cannot read the file immediately after the leader has serialized all * changes to the file because there may still be messages in the memory * queue. We will apply all spooled messages the next time we call this - * function, which should ensure that there are no messages left in the - * memory queue. + * function and that will ensure there are no messages left in the memory + * queue. */ else if (fileset_state == FS_SERIALIZE_DONE) { @@ -811,7 +814,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { - /* Check if changes have been serialized to a file. */ + /* Replay the changes from the file, if any. */ if (pa_has_spooled_message_pending()) { pa_spooled_messages(); @@ -1207,8 +1210,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means * that the current data and any subsequent data for this transaction will be * serialized to a file. This is done to prevent possible deadlocks with - * another parallel apply worker (refer to the comments atop - * applyparallelworker.c for details). + * another parallel apply worker (refer to the comments atop this file). 
*/ void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, @@ -1217,7 +1219,7 @@ pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, /* * The parallel apply worker could be stuck for some reason (say waiting on * some lock by other backend), so stop trying to send data directly to it - * and start to serialize data to file instead. + * and start serializing data to the file instead. */ winfo->serialize_changes = true; @@ -1488,7 +1490,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) } /* - * Set the fileset state for the given parallel apply worker. The fileset + * Set the fileset state for a particular parallel apply worker. The fileset * will be set once the leader worker serialized all changes to the file * so that it can be used by parallel apply worker. */