From 8a1f169281c691ca0ba9ef09fc25c240c4029778 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sun, 4 Dec 2022 11:25:29 +0800 Subject: [PATCH v56 2/3] Serialize partial changes to a file when the attempt to send data times out. In patch 0001 if the leader apply worker times out while attempting to send a message to the parallel apply worker it results in an ERROR. This patch (0002) modifies that behaviour, so instead of erroring it will switch to "partial serialize" mode - in this mode the leader serializes all remaining changes to a file and notifies the parallel apply workers to read and apply them at the end of the transaction. --- .../replication/logical/applyparallelworker.c | 155 ++++++- src/backend/replication/logical/worker.c | 446 ++++++++++++++++----- src/include/replication/worker_internal.h | 49 +++ src/tools/pgindent/typedefs.list | 1 + 4 files changed, 526 insertions(+), 125 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d420aea..d91563d 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -137,9 +137,11 @@ * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to * wait to send messages, and this wait doesn't appear in lmgr. * - * To resolve this issue, we use non-blocking write and wait with a timeout. If - * the timeout is exceeded, the LA reports an error and restarts logical - * replication. + * To avoid this wait, we use a non-blocking write and wait with a timeout. If + * the timeout is exceeded, the LA will serialize the message to a file and + * indicate PA-2 that it needs to read that file for the remaining messages. + * Then LA will start waiting for commit which will detect deadlock if any. + * See pa_send_data() and enum TransApplyAction. 
* * 4) Lock types * @@ -276,9 +278,9 @@ pa_can_start(TransactionId xid) /* * Don't start a new parallel worker if user has set skiplsn as it's * possible that user want to skip the streaming transaction. For - * streaming transaction, we need to spill the transaction to disk so that - * we can get the last LSN of the transaction to judge whether to skip - * before starting to apply the change. + * streaming transaction, we need to serialize the transaction to a file + * so that we can get the last LSN of the transaction to judge whether to + * skip before starting to apply the change. */ if (!XLogRecPtrIsInvalid(MySubscription->skiplsn)) return false; @@ -495,9 +497,11 @@ pa_allocate_worker(TransactionId xid) SpinLockAcquire(&winfo->shared->mutex); winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; winfo->shared->xid = xid; + winfo->shared->fileset_state = FS_UNKNOWN; SpinLockRelease(&winfo->shared->mutex); winfo->in_use = true; + winfo->serialize_changes = false; entry->winfo = winfo; entry->xid = xid; } @@ -571,8 +575,18 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); - /* Stop the worker if there are enough workers in the pool. */ - if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) + /* + * Stop the worker if there are enough workers in the pool. + * + * XXX The worker is also stopped if the leader apply worker needed to + * serialize part of the transaction data due to a send timeout. This is + * because the message could be partially written to the queue due to send + * timeout and there is no way to clean the queue other than resending the + * message until it succeeds. To avoid complexity, we directly stop the + * worker in this case. 
+ */ + if (winfo->serialize_changes || + napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) { int slot_no; uint16 generation; @@ -590,11 +604,15 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) } winfo->in_use = false; + winfo->serialize_changes = false; return false; } -/* Free the parallel apply worker information. */ +/* + * Free the parallel apply worker information and unlink the files with + * serialized changes if any. + */ static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo) { @@ -606,6 +624,10 @@ pa_free_worker_info(ParallelApplyWorkerInfo *winfo) if (winfo->error_mq_handle) shm_mq_detach(winfo->error_mq_handle); + /* Unlink the files with serialized changes. */ + if (winfo->serialize_changes) + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); + if (winfo->dsm_seg) dsm_detach(winfo->dsm_seg); @@ -634,6 +656,44 @@ pa_detach_all_error_mq(void) } /* + * Replay the spooled messages in the parallel apply worker if the leader apply + * worker has finished serializing changes to the file. + */ +static void +pa_spooled_messages(void) +{ + PartialFileSetState fileset_state; + + /* + * Check if changes have been serialized to a file. if so, read and apply + * them. + */ + SpinLockAcquire(&MyParallelShared->mutex); + fileset_state = MyParallelShared->fileset_state; + SpinLockRelease(&MyParallelShared->mutex); + + /* + * If the leader apply worker is busy serializing the partial changes then + * acquire the stream lock now. Otherwise, the parallel apply worker won't + * get a chance to receive a STREAM_STOP (and acquire the stream lock) + * until the leader had serialized all changes which can lead to undetected + * deadlock. 
+ */ + if (fileset_state == FS_BUSY) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + } + else if (fileset_state == FS_READY) + { + apply_spooled_messages(&MyParallelShared->fileset, + MyParallelShared->xid, + InvalidXLogRecPtr); + pa_set_fileset_state(MyParallelShared, FS_UNKNOWN); + } +} + +/* * Interrupt handler for main loop of parallel apply worker. */ static void @@ -728,6 +788,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) { int rc; + /* Check if changes have been serialized to a file. */ + pa_spooled_messages(); + if (!is_in_streamed_transaction()) { /* @@ -1074,6 +1137,12 @@ HandleParallelApplyMessages(void) /* * Send the data to the specified parallel apply worker via shared-memory * queue. + * + * If the attempt to send data via shared memory times out, then we will switch + * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible + * deadlocks with another parallel apply worker (refer to the comments atop + * applyparallelworker.c for details). This means that the current data and any + * subsequent data for this transaction will be serialized to a file. */ void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) @@ -1083,6 +1152,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) TimestampTz startTime = 0; Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) @@ -1112,19 +1182,45 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) CHECK_FOR_INTERRUPTS(); } - /* - * If the attempt to send data via shared memory times out, we restart - * the logical replication to prevent possible deadlocks with another - * parallel apply worker. Refer to the comments atop this file for - * details. 
- */ if (startTime == 0) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication parallel apply worker due to timeout"))); + { + StringInfoData msg; + LogicalRepMsgType action; + + /* + * The parallel apply worker could be stuck for some reason (say + * waiting on some lock by other backend), so stop trying to send + * data directly to it and instead start to serialize data to + * file instead. + */ + winfo->serialize_changes = true; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + + /* Skip first byte and statistics fields. */ + msg.cursor += 1 + SIZE_STATS_MESSAGE; + + /* Initialize the stream fileset. */ + serialize_stream_start(winfo->shared->xid, true); + + /* Write this message to a file. */ + action = pq_getmsgbyte(&msg); + stream_write_change(action, &msg); + + /* + * Take the stream lock to make sure that the parallel apply worker + * will wait for the leader to release the stream lock until the + * end of the transaction. + */ + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + + pa_set_fileset_state(winfo->shared, FS_BUSY); + break; + } } } @@ -1372,6 +1468,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) RollbackToSavepoint(spname); CommitTransactionCommand(); subxactlist = list_truncate(subxactlist, i + 1); + break; } } @@ -1381,6 +1478,28 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) } /* + * Set the fileset state for the given parallel apply worker. The fileset + * will be set once the leader worker reached the ready state so that it can + * be used by parallel apply worker. 
+ */ +void +pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->fileset_state = fileset_state; + + if (fileset_state == FS_READY) + { + Assert(am_leader_apply_worker()); + Assert(MyLogicalRepWorker->stream_fileset); + wshared->fileset = *MyLogicalRepWorker->stream_fileset; + } + + SpinLockRelease(&wshared->mutex); +} + +/* * Helper functions to acquire and release a lock for each stream block. * * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index be1af1e..51877a4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -255,6 +255,18 @@ typedef struct ApplyErrorCallbackArg * worker. Changes are written to temporary files and then applied when the * final commit arrives. * + * TRANS_LEADER_PARTIAL_SERIALIZE: + * This action means that we are in the leader apply worker and have sent some + * changes directly to the parallel apply worker and the remaining changes are + * serialized to a file, due to timeout while sending data. The parallel apply + * worker will apply these serialized changes when the final commit arrives. + * + * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to + * serializing changes, the leader worker also needs to serialize the + * STREAM_XXX message to a file, and wait for the parallel apply worker to + * finish the transaction when processing the transaction finish command. So + * this new action was introduced to keep the code and logic clear. + * * TRANS_LEADER_SEND_TO_PARALLEL: * This action means that we are in the leader apply worker and need to send * the changes to the parallel apply worker. @@ -270,6 +282,7 @@ typedef enum /* Actions for streaming transactions. 
*/ TRANS_LEADER_SERIALIZE, + TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY } TransApplyAction; @@ -352,7 +365,6 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; static inline void subxact_filename(char *path, Oid subid, TransactionId xid); -static inline void changes_filename(char *path, Oid subid, TransactionId xid); /* * Information about subtransactions of a given toplevel transaction. @@ -365,10 +377,9 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. */ -static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first_segment); -static void stream_write_change(char action, StringInfo s); +static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -400,9 +411,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -/* Common streaming function to apply all the spooled messages */ -static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -565,6 +573,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransactionId current_xid; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg; apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -575,6 +584,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) Assert(TransactionIdIsValid(stream_xid)); /* + * The parallel apply worker needs the xid in 
this message to decide + * whether to define a savepoint, so save the original message that has not + * moved the cursor after the xid. We will serailize this message to a file + * in PARTIAL_SERIALIZE mode. + */ + original_msg = *s; + + /* * We should have received XID of the subxact as the first part of the * message, so extract it. */ @@ -597,10 +614,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) stream_write_change(action, s); return true; + case TRANS_LEADER_PARTIAL_SERIALIZE: case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - pa_send_data(winfo, s->len, s->data); + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL) + pa_send_data(winfo, s->len, s->data); + else + stream_write_change(action, &original_msg); /* * XXX The publisher side doesn't always send relation/type update @@ -1245,6 +1266,7 @@ apply_handle_stream_prepare(StringInfo s) LogicalRepPreparedTxnData prepare_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (is_in_streamed_transaction()) ereport(ERROR, @@ -1270,7 +1292,8 @@ apply_handle_stream_prepare(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, + prepare_data.xid, prepare_data.prepare_lsn); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1285,20 +1308,23 @@ apply_handle_stream_prepare(StringInfo s) stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); break; - case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + stream_open_and_write_change(prepare_data.xid, + LOGICAL_REP_MSG_STREAM_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_READY); + /* * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. 
+ * apply spooled changes. */ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); - /* Send STREAM PREPARE message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); - /* - * After sending the data to the parallel apply worker, wait for + * After serializing the STREAM PREPARE message to a file, wait for * that worker to finish. This is necessary to maintain commit * order which avoids failures due to transaction dependencies and * deadlocks. @@ -1312,7 +1338,51 @@ apply_handle_stream_prepare(StringInfo s) in_remote_transaction = false; break; + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + pa_send_data(winfo, s->len, s->data); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + { + serialize_stream_stop(winfo->shared->xid); + pa_set_fileset_state(winfo->shared, FS_READY); + } + + /* + * Unlock the shared object lock so that parallel apply worker can + * continue to receive and apply changes. + */ + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + + /* + * After sending the STREAM PREPARE message to the parallel apply + * worker, wait for that worker to finish. This is necessary to + * maintain commit order which avoids failures due to transaction + * dependencies and deadlocks. + */ + pa_wait_for_xact_finish(winfo); + + store_flush_position(prepare_data.end_lsn, winfo->shared->last_commit_end); + + (void) pa_free_worker(winfo, prepare_data.xid); + + in_remote_transaction = false; + break; + case TRANS_PARALLEL_APPLY: + + /* + * Close the file before committing if the parallel apply worker + * is applying spooled messages. + */ + if (stream_fd) + stream_close_file(); + begin_replication_step(); /* Mark the transaction as prepared. */ @@ -1375,6 +1445,47 @@ apply_handle_origin(StringInfo s) } /* + * Initialize fileset (if not already done). 
+ * + * Create a new file when first_segment is true, otherwise open the existing + * file. + */ +void +serialize_stream_start(TransactionId xid, bool first_segment) +{ + begin_replication_step(); + + /* + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. + */ + if (!MyLogicalRepWorker->stream_fileset) + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); + } + + /* Open the spool file for this transaction. */ + stream_open_file(MyLogicalRepWorker->subid, xid, first_segment); + + /* If this is not the first segment, open existing subxact file. */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, xid); + + end_replication_step(); +} + +/* * Handle STREAM START message. */ static void @@ -1383,6 +1494,7 @@ apply_handle_stream_start(StringInfo s) bool first_segment; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (is_in_streamed_transaction()) ereport(ERROR, @@ -1413,43 +1525,26 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SERIALIZE: /* - * Start a transaction on stream start, this transaction will be - * committed on the stream stop unless it is a tablesync worker in - * which case it will be committed after processing all the - * messages. We need the transaction for handling the buffile, - * used for serializing the streaming data and subxact info. + * Function serialize_stream_start starts a transaction. 
This + * transaction will be committed on the stream stop unless it is a + * tablesync worker in which case it will be committed after + * processing all the messages. We need this transaction for + * handling the BufFile, used for serializing the streaming data + * and subxact info. */ - begin_replication_step(); + serialize_stream_start(stream_xid, first_segment); + break; + + case TRANS_LEADER_PARTIAL_SERIALIZE: /* - * Initialize the worker's stream_fileset if we haven't yet. This - * will be used for the entire duration of the worker so create it - * in a permanent context. We create this on the very first - * streaming message from any transaction and then use it for this - * and other streaming transactions. Now, we could create a - * fileset at the start of the worker as well but then we won't be - * sure that it will ever be used. + * The message spool file was already created when entering + * PARTIAL_SERIALIZE mode. The transaction started in + * serialize_stream_start will be committed on the stream stop. */ - if (!MyLogicalRepWorker->stream_fileset) - { - MemoryContext oldctx; - - oldctx = MemoryContextSwitchTo(ApplyContext); - - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); - - MemoryContextSwitchTo(oldctx); - } - - /* Open the spool file for this transaction. */ - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); - - /* If this is not the first segment, open existing subxact file. */ - if (!first_segment) - subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + serialize_stream_start(stream_xid, false); + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg); - end_replication_step(); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1494,6 +1589,34 @@ apply_handle_stream_start(StringInfo s) } /* + * Update the information about subxacts and close the file. 
+ * + * This function should be called when the serialize_stream_start function has + * been called. + */ +void +serialize_stream_stop(TransactionId xid) +{ + /* + * Serialize information about subxacts for the toplevel transaction, then + * close the stream messages spool file. + */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle STREAM STOP message. */ static void @@ -1512,24 +1635,13 @@ apply_handle_stream_stop(StringInfo s) switch (apply_action) { case TRANS_LEADER_SERIALIZE: + serialize_stream_stop(stream_xid); + break; - /* - * Close the file with serialized changes, and serialize - * information about subxacts for the toplevel transaction. - */ - subxact_info_write(MyLogicalRepWorker->subid, stream_xid); - stream_close_file(); - - /* We must be in a valid transaction state */ - Assert(IsTransactionState()); - - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - - /* Reset per-stream context */ - MemoryContextReset(LogicalStreamingContext); - - pgstat_report_activity(STATE_IDLE, NULL); + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s); + serialize_stream_stop(stream_xid); + pa_set_stream_apply_worker(NULL); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1545,6 +1657,13 @@ apply_handle_stream_stop(StringInfo s) pa_send_data(winfo, s->len, s->data); + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. 
+ */ + if (winfo->serialize_changes) + serialize_stream_stop(winfo->shared->xid); + pa_set_stream_apply_worker(NULL); pgstat_report_activity(STATE_IDLE, NULL); @@ -1583,7 +1702,7 @@ apply_handle_stream_stop(StringInfo s) } /* - * Handle STREAM ABORT message when the transaction was spilled to disk. + * Handle STREAM ABORT message when the transaction was serialized to file. */ static void serialize_stream_abort(TransactionId xid, TransactionId subxid) @@ -1676,6 +1795,7 @@ apply_handle_stream_abort(StringInfo s) LogicalRepStreamAbortData abort_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; bool toplevel_xact; if (is_in_streamed_transaction()) @@ -1706,14 +1826,30 @@ apply_handle_stream_abort(StringInfo s) serialize_stream_abort(xid, subxid); break; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Parallel apply worker might have applied some changes, so write + * the STREAM_ABORT message so that it can rollback the + * subtransaction if needed. + */ + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT, + &original_msg); + + if (toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); + pa_set_fileset_state(winfo->shared, FS_READY); + (void) pa_free_worker(winfo, xid); + } + + break; + case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); /* - * For the case of aborting the toplevel transaction, unlock the - * shared object lock so that parallel apply worker can continue - * to receive and apply changes. - * * XXX For the case of aborting the subtransaction, we only * increment the number of streaming blocks without releasing the * lock. This may slightly delay the processing of STREAM_ABORT @@ -1723,16 +1859,31 @@ apply_handle_stream_abort(StringInfo s) * set of changes after processing the STREAM_ABORT message if it * is not already waiting for STREAM_STOP message. 
*/ - if (toplevel_xact) - pa_unlock_stream(xid, AccessExclusiveLock); - else + if (!toplevel_xact) pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); /* Send STREAM ABORT message to the parallel apply worker. */ pa_send_data(winfo, s->len, s->data); + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + serialize_stream_stop(winfo->shared->xid); + if (toplevel_xact) + { + if (winfo->serialize_changes) + pa_set_fileset_state(winfo->shared, FS_READY); + + /* + * Unlock the shared object lock so that parallel apply worker + * can continue to receive and apply changes. + */ + pa_unlock_stream(xid, AccessExclusiveLock); (void) pa_free_worker(winfo, xid); + } break; @@ -1772,17 +1923,19 @@ apply_handle_stream_abort(StringInfo s) /* * Common spoolfile processing. */ -static void -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) +void +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn) { StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; MemoryContext oldcxt; - BufFile *fd; + ResourceOwner oldowner; - maybe_start_skipping_changes(lsn); + if (!am_parallel_apply_worker()) + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -1798,8 +1951,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, - false); + /* + * Make sure the file is owned by the toplevel transaction so that the file + * will not be accidentally closed when aborting a subtransaction. 
+ */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + CurrentResourceOwner = oldowner; buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -1830,7 +1991,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) CHECK_FOR_INTERRUPTS(); /* read length of the on-disk record */ - nbytes = BufFileRead(fd, &len, sizeof(len)); + nbytes = BufFileRead(stream_fd, &len, sizeof(len)); /* have we reached end of the file? */ if (nbytes == 0) @@ -1851,7 +2012,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) buffer = repalloc(buffer, len); /* and finally read the data into the buffer */ - if (BufFileRead(fd, buffer, len) != len) + if (BufFileRead(stream_fd, buffer, len) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from streaming transaction's changes file \"%s\": %m", @@ -1872,16 +2033,30 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) nchanges++; - if (nchanges % 1000 == 0) + /* + * Break the loop if the parallel apply worker has finished applying + * the transaction. The parallel apply worker should have closed the + * file before committing. + */ + if (am_parallel_apply_worker() && + MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED) + goto done; + + /* + * No need to output the DEBUG message here in the parallel apply + * worker as similar messages will be output when handling STREAM_STOP + * message. 
+ */ + if (!am_parallel_apply_worker() && nchanges % 1000 == 0) elog(DEBUG1, "replayed %d changes from file \"%s\"", nchanges, path); } - BufFileClose(fd); - + stream_close_file(); pfree(buffer); pfree(s2.data); +done: elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); @@ -1898,6 +2073,7 @@ apply_handle_stream_commit(StringInfo s) LogicalRepCommitData commit_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (is_in_streamed_transaction()) ereport(ERROR, @@ -1917,7 +2093,8 @@ apply_handle_stream_commit(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(xid, commit_data.commit_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, + commit_data.commit_lsn); apply_handle_commit_internal(&commit_data); @@ -1925,20 +2102,22 @@ apply_handle_stream_commit(StringInfo s) stream_cleanup_files(MyLogicalRepWorker->subid, xid); break; - case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_READY); + /* * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. + * apply spooled changes. */ pa_unlock_stream(xid, AccessExclusiveLock); - /* Send STREAM COMMIT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); - /* - * After sending the data to the parallel apply worker, wait for + * After serializing the STREAM COMMIT message to a file, wait for * that worker to finish. This is necessary to maintain commit * order which avoids failures due to transaction dependencies and * deadlocks. 
@@ -1946,12 +2125,54 @@ apply_handle_stream_commit(StringInfo s) pa_wait_for_xact_finish(winfo); store_flush_position(commit_data.end_lsn, winfo->shared->last_commit_end); + (void) pa_free_worker(winfo, xid); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + pa_send_data(winfo, s->len, s->data); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + { + serialize_stream_stop(winfo->shared->xid); + pa_set_fileset_state(winfo->shared, FS_READY); + } + + /* + * Unlock the shared object lock so that parallel apply worker can + * continue to receive and apply changes. + */ + pa_unlock_stream(xid, AccessExclusiveLock); + + /* + * After sending the STREAM COMMIT message to the parallel apply + * worker, wait for that worker to finish. This is necessary to + * maintain commit order which avoids failures due to transaction + * dependencies and deadlocks. + */ + pa_wait_for_xact_finish(winfo); + + store_flush_position(commit_data.end_lsn, winfo->shared->last_commit_end); (void) pa_free_worker(winfo, xid); break; case TRANS_PARALLEL_APPLY: + + /* + * Close the file before committing if the parallel apply worker + * is applying spooled messages. + */ + if (stream_fd) + stream_close_file(); + apply_handle_commit_internal(&commit_data); MyParallelShared->last_commit_end = XactLastCommitEnd; @@ -3843,7 +4064,7 @@ subxact_filename(char *path, Oid subid, TransactionId xid) } /* format filename for file containing serialized changes */ -static inline void +void changes_filename(char *path, Oid subid, TransactionId xid) { snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); @@ -3857,7 +4078,7 @@ changes_filename(char *path, Oid subid, TransactionId xid) * toplevel transaction. Each subscription has a separate set of files * for any toplevel transaction. 
*/ -static void +void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; @@ -3880,9 +4101,6 @@ stream_cleanup_files(Oid subid, TransactionId xid) * by stream_xid (global variable). If it's the first chunk of streamed * changes for this transaction, create the buffile, otherwise open the * previously created file. - * - * This can only be called at the beginning of a "streaming" block, i.e. - * between stream_start/stream_stop messages from the upstream. */ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) @@ -3890,7 +4108,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) char path[MAXPGPATH]; MemoryContext oldcxt; - Assert(is_in_streamed_transaction()); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); @@ -3929,15 +4146,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) /* * stream_close_file * Close the currently open file with streamed changes. - * - * This can only be called at the end of a streaming block, i.e. at stream_stop - * message from the upstream. */ static void stream_close_file(void) { - Assert(is_in_streamed_transaction()); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); BufFileClose(stream_fd); @@ -3954,13 +4166,11 @@ stream_close_file(void) * the length), action code (identifying the message type) and message * contents (without the subxact TransactionId value). */ -static void +void stream_write_change(char action, StringInfo s) { int len; - Assert(is_in_streamed_transaction()); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); /* total on-disk size, including the action type character */ @@ -3979,6 +4189,23 @@ stream_write_change(char action, StringInfo s) } /* + * stream_open_and_write_change + * Serialize a message to a file for the given transaction. 
+ * + * This function is similar to stream_write_change except that it will open the + * target file before writing the message and close the file at the end. + */ +static void +stream_open_and_write_change(TransactionId xid, char action, StringInfo s) +{ + Assert(!is_in_streamed_transaction()); + + serialize_stream_start(xid, false); + stream_write_change(action, s); + serialize_stream_stop(xid); +} + +/* * Cleanup the memory for subxacts and reset the related variables. */ static inline void @@ -4649,7 +4876,8 @@ set_apply_error_context_origin(char *originname) /* * Return the action to take for the given transaction. *winfo is assigned to * the destination parallel worker info (if the action is - * TRANS_LEADER_SEND_TO_PARALLEL), otherwise *winfo is assigned NULL. + * TRANS_LEADER_SEND_TO_PARALLEL or TRANS_LEADER_PARTIAL_SERIALIZE), otherwise + * *winfo is assigned NULL. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4671,13 +4899,17 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) */ *winfo = pa_find_worker(xid); - if (*winfo) + if (!*winfo) { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_SERIALIZE; + } + else if ((*winfo)->serialize_changes) + { + return TRANS_LEADER_PARTIAL_SERIALIZE; } else { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4779c75..626b997 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "miscadmin.h" #include "replication/logicalrelation.h" +#include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" #include "storage/shm_mq.h" @@ -95,6 +96,20 @@ typedef enum ParallelTransState } ParallelTransState; /* + * State of fileset in leader apply worker. 
+ * + * FS_BUSY means that the leader is serializing changes to the file. FS_READY + * means that the leader has serialized all changes to the file and the file is + * ready to be read by a parallel apply worker. + */ +typedef enum PartialFileSetState +{ + FS_UNKNOWN, + FS_BUSY, + FS_READY +} PartialFileSetState; + +/* * Struct for sharing information between leader apply worker and parallel * apply workers. */ @@ -129,6 +144,20 @@ typedef struct ParallelApplyWorkerShared * update the lsn_mappings by leader worker. */ XLogRecPtr last_commit_end; + + + /* + * After entering PARTIAL_SERIALIZE mode, the leader apply worker will + * serialize changes to the file, and share the fileset with the parallel + * apply worker when processing the transaction finish command. Then the + * parallel apply worker will apply all the spooled messages. + * + * FileSet is used here instead of SharedFileSet because we need it to + * survive after releasing the shared memory so that the leader apply + * worker can re-use the same fileset for the next streaming transaction. + */ + PartialFileSetState fileset_state; + FileSet fileset; } ParallelApplyWorkerShared; /* @@ -147,6 +176,13 @@ typedef struct ParallelApplyWorkerInfo dsm_segment *dsm_seg; /* + * Indicates whether the leader apply worker needs to serialize the + * remaining changes to a file due to timeout when attempting to send data + * to the parallel apply worker via shared memory. + */ + bool serialize_changes; + + /* * True if the worker is being used to process a parallel apply * transaction. False indicates this worker is available for re-use. 
*/ @@ -199,10 +235,21 @@ extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); +extern void serialize_stream_start(TransactionId xid, bool first_segment); +extern void serialize_stream_stop(TransactionId xid); + +/* Common streaming function to apply all the spooled messages */ +extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn); + extern void apply_dispatch(StringInfo s); extern void maybe_reread_subscription(void); +extern void changes_filename(char *path, Oid subid, TransactionId xid); +extern void stream_write_change(char action, StringInfo s); +extern void stream_cleanup_files(Oid subid, TransactionId xid); + extern void InitializeApplyWorker(void); /* Function for apply error callback */ @@ -229,6 +276,8 @@ extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo); extern void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid); extern void pa_clean_subtrans(void); extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data); +extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state); extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9334bc5..d0f287b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1388,6 +1388,7 @@ LagTracker LargeObjectDesc LastAttnumInfo Latch +PartialFileSetState LerpFunc LexDescr LexemeEntry -- 2.7.2.windows.1