From 192ed8b4a75ae856365041bba319176ad129742e Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 8 Dec 2022 12:28:23 +0800 Subject: [PATCH v58 2/2] Serialize partial changes to a file when the attempt to send data times out. In patch 0001 if the leader apply worker times out while attempting to send a message to the parallel apply worker it results in an ERROR. This patch (0002) modifies that behaviour, so instead of erroring it will switch to "partial serialize" mode - in this mode the leader serializes all remaining changes to a file and notifies the parallel apply workers to read and apply them at the end of the transaction. --- .../replication/logical/applyparallelworker.c | 176 +++++++- src/backend/replication/logical/worker.c | 499 ++++++++++++++++----- src/include/replication/worker_internal.h | 53 ++- src/tools/pgindent/typedefs.list | 1 + 4 files changed, 601 insertions(+), 128 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1cc0576..79667fb 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -137,9 +137,11 @@ * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to * wait to send messages, and this wait doesn't appear in lmgr. * - * To resolve this issue, we use non-blocking write and wait with a timeout. If - * the timeout is exceeded, the LA reports an error and restarts logical - * replication. + * To avoid this wait, we use a non-blocking write and wait with a timeout. If + * the timeout is exceeded, the LA will serialize the message to a file and + * indicate PA-2 that it needs to read that file for the remaining messages. + * Then LA will start waiting for commit which will detect deadlock if any. + * See pa_send_data() and enum TransApplyAction. 
* * 4) Lock types * @@ -494,9 +496,11 @@ pa_allocate_worker(TransactionId xid) SpinLockAcquire(&winfo->shared->mutex); winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; winfo->shared->xid = xid; + winfo->shared->fileset_state = FS_UNKNOWN; SpinLockRelease(&winfo->shared->mutex); winfo->in_use = true; + winfo->serialize_changes = false; entry->winfo = winfo; entry->xid = xid; } @@ -570,8 +574,17 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); - /* Stop the worker if there are enough workers in the pool. */ - if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) + /* + * Stop the worker if there are enough workers in the pool. + * + * XXX The worker is also stopped if the leader apply worker needed to + * serialize part of the transaction data due to a send timeout. This is + * because the message could be partially written to the queue but there is + * no way to clean the queue other than resending the message until it + * succeeds. Directly stopping the worker avoids needing this complexity. + */ + if (winfo->serialize_changes || + napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) { int slot_no; uint16 generation; @@ -589,11 +602,15 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) } winfo->in_use = false; + winfo->serialize_changes = false; return false; } -/* Free the parallel apply worker information. */ +/* + * Free the parallel apply worker information and unlink the files with + * serialized changes if any. + */ static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo) { @@ -605,6 +622,10 @@ pa_free_worker_info(ParallelApplyWorkerInfo *winfo) if (winfo->error_mq_handle) shm_mq_detach(winfo->error_mq_handle); + /* Unlink the files with serialized changes. 
*/ + if (winfo->serialize_changes) + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); + if (winfo->dsm_seg) dsm_detach(winfo->dsm_seg); @@ -633,6 +654,63 @@ pa_detach_all_error_mq(void) } /* + * Replay the spooled messages in the parallel apply worker if the leader apply + * worker has finished serializing changes to the file. + */ +static void +pa_spooled_messages(void) +{ + PartialFileSetState fileset_state; + + /* + * Check if changes have been serialized to a file. If so, read and apply + * them. + */ + SpinLockAcquire(&MyParallelShared->mutex); + fileset_state = MyParallelShared->fileset_state; + SpinLockRelease(&MyParallelShared->mutex); + + /* + * If the leader apply worker is busy serializing the partial changes then + * acquire the stream lock now and wait for the leader worker to finish + * serializing the changes. Otherwise, the parallel apply worker won't get + * a chance to receive a STREAM_STOP (and acquire the stream lock) until + * the leader had serialized all changes which can lead to undetected + * deadlock. + * + * XXX It is possible that immediately after we have waited for a lock in + * the FS_SERIALIZE_IN_PROGRESS state, the fileset state becomes + * FS_SERIALIZE_DONE but re-checking it again doesn't seem worth it. + * Anyway, next time when this function is invoked, we will set the state + * to FS_READY. + */ + if (fileset_state == FS_SERIALIZE_IN_PROGRESS) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + } + + /* + * We cannot read the file immediately after the leader has serialized all + * changes to the file because there may still be messages in the memory + * queue. We will apply all spooled messages the next time we call this + * function, which should ensure that there are no messages left in the + * memory queue. 
+ */ + else if (fileset_state == FS_SERIALIZE_DONE) + { + pa_set_fileset_state(MyParallelShared, FS_READY); + } + else if (fileset_state == FS_READY) + { + apply_spooled_messages(&MyParallelShared->fileset, + MyParallelShared->xid, + InvalidXLogRecPtr); + pa_set_fileset_state(MyParallelShared, FS_UNKNOWN); + } +} + +/* * Interrupt handler for main loop of parallel apply worker. */ static void @@ -716,6 +794,11 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) /* * Ignore statistics fields that have been updated by the leader * apply worker. + * + * XXX We can avoid sending the statistics fields from the leader + * apply worker but for that, it needs to rebuild the entire + * message by removing these fields which could be more work than + * simply ignoring these fields in the parallel apply worker. */ s.cursor += SIZE_STATS_MESSAGE; @@ -727,6 +810,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) { int rc; + /* Check if changes have been serialized to a file. */ + pa_spooled_messages(); + MemoryContextReset(ApplyMessageContext); MemoryContextSwitchTo(oldcxt); @@ -1063,15 +1149,24 @@ HandleParallelApplyMessages(void) /* * Send the data to the specified parallel apply worker via shared-memory * queue. + * + * If the attempt to send data via shared memory times out, then we will switch + * to "PARTIAL_SERIALIZE mode" for the current transaction -- this means that + * the current data and any subsequent data for this transaction will be + * serialized to a file. This is done to prevent possible deadlocks with + * another parallel apply worker (refer to the comments atop + * applyparallelworker.c for details). 
*/ void -pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) +pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data, + bool stream_locked) { int rc; shm_mq_result result; TimestampTz startTime = 0; Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) @@ -1101,19 +1196,46 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) CHECK_FOR_INTERRUPTS(); } - /* - * If the attempt to send data via shared memory times out, we restart - * the logical replication to prevent possible deadlocks with another - * parallel apply worker. Refer to the comments atop this file for - * details. - */ if (startTime == 0) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication parallel apply worker due to timeout"))); + { + StringInfoData msg; + LogicalRepMsgType action; + + /* + * The parallel apply worker could be stuck for some reason (say + * waiting on some lock by other backend), so stop trying to send + * data directly to it and instead start to serialize data to + * file instead. + */ + winfo->serialize_changes = true; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + + /* Skip first byte and statistics fields. */ + msg.cursor += 1 + SIZE_STATS_MESSAGE; + + /* Initialize the stream fileset. */ + stream_start_internal(winfo->shared->xid, true); + + /* Write this message to a file. */ + action = pq_getmsgbyte(&msg); + stream_write_change(action, &msg); + + /* + * Acquires the stream lock if not already to make sure that the + * parallel apply worker will wait for the leader to release the + * stream lock until the end of the transaction. 
+ */ + if (!stream_locked) + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS); + break; + } } } @@ -1370,6 +1492,28 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) } /* + * Set the fileset state for the given parallel apply worker. The fileset + * will be set once the leader worker serialized all changes to the file + * so that it can be used by parallel apply worker. + */ +void +pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->fileset_state = fileset_state; + + if (fileset_state == FS_SERIALIZE_DONE) + { + Assert(am_leader_apply_worker()); + Assert(MyLogicalRepWorker->stream_fileset); + wshared->fileset = *MyLogicalRepWorker->stream_fileset; + } + + SpinLockRelease(&wshared->mutex); +} + +/* * Helper functions to acquire and release a lock for each stream block. * * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4b0c81f..4b760a6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -256,6 +256,18 @@ typedef struct ApplyErrorCallbackArg * worker. Changes are written to temporary files and then applied when the * final commit arrives. * + * TRANS_LEADER_PARTIAL_SERIALIZE: + * This action means that we are in the leader apply worker and have sent some + * changes directly to the parallel apply worker and the remaining changes are + * serialized to a file, due to timeout while sending data. The parallel apply + * worker will apply these serialized changes when the final commit arrives. 
+ * + * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to + * serializing changes, the leader worker also needs to serialize the + * STREAM_XXX message to a file, and wait for the parallel apply worker to + * finish the transaction when processing the transaction finish command. So + * this new action was introduced to keep the code and logic clear. + * * TRANS_LEADER_SEND_TO_PARALLEL: * This action means that we are in the leader apply worker and need to send * the changes to the parallel apply worker. @@ -271,6 +283,7 @@ typedef enum /* Actions for streaming transactions. */ TRANS_LEADER_SERIALIZE, + TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY } TransApplyAction; @@ -353,7 +366,6 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; static inline void subxact_filename(char *path, Oid subid, TransactionId xid); -static inline void changes_filename(char *path, Oid subid, TransactionId xid); /* * Information about subtransactions of a given toplevel transaction. @@ -366,10 +378,9 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. 
*/ -static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first_segment); -static void stream_write_change(char action, StringInfo s); +static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -401,9 +412,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -/* Common streaming function to apply all the spooled messages */ -static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -566,6 +574,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransactionId current_xid; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg; apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -576,6 +585,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) Assert(TransactionIdIsValid(stream_xid)); /* + * The parallel apply worker needs the xid in this message to decide + * whether to define a savepoint, so save the original message that has not + * moved the cursor after the xid. We will serialize this message to a file + * in PARTIAL_SERIALIZE mode. + */ + original_msg = *s; + + /* * We should have received XID of the subxact as the first part of the * message, so extract it. 
*/ @@ -598,10 +615,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) stream_write_change(action, s); return true; + case TRANS_LEADER_PARTIAL_SERIALIZE: case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - pa_send_data(winfo, s->len, s->data); + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL) + pa_send_data(winfo, s->len, s->data, false); + else + stream_write_change(action, &original_msg); /* * XXX The publisher side doesn't always send relation/type update @@ -1249,6 +1270,9 @@ apply_handle_stream_prepare(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1273,7 +1297,8 @@ apply_handle_stream_prepare(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, + prepare_data.xid, prepare_data.prepare_lsn); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1288,20 +1313,23 @@ apply_handle_stream_prepare(StringInfo s) stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); break; - case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + stream_open_and_write_change(prepare_data.xid, + LOGICAL_REP_MSG_STREAM_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + /* * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. + * apply spooled changes. */ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); - /* Send STREAM PREPARE message to the parallel apply worker. 
*/ - pa_send_data(winfo, s->len, s->data); - /* - * After sending the data to the parallel apply worker, wait for + * After serializing the STREAM PREPARE message to a file, wait for * that worker to finish. This is necessary to maintain commit * order which avoids failures due to transaction dependencies and * deadlocks. @@ -1315,7 +1343,51 @@ apply_handle_stream_prepare(StringInfo s) in_remote_transaction = false; break; + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + pa_send_data(winfo, s->len, s->data, true); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + { + stream_stop_internal(winfo->shared->xid); + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + } + + /* + * Unlock the shared object lock so that parallel apply worker can + * continue to receive and apply changes. + */ + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + + /* + * After sending the STREAM PREPARE message to the parallel apply + * worker, wait for that worker to finish. This is necessary to + * maintain commit order which avoids failures due to transaction + * dependencies and deadlocks. + */ + pa_wait_for_xact_finish(winfo); + + store_flush_position(prepare_data.end_lsn, winfo->shared->last_commit_end); + + (void) pa_free_worker(winfo, prepare_data.xid); + + in_remote_transaction = false; + break; + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before preparing. + */ + if (stream_fd) + stream_close_file(); + begin_replication_step(); /* Mark the transaction as prepared. */ @@ -1378,6 +1450,47 @@ apply_handle_origin(StringInfo s) } /* + * Initialize fileset (if not already done). + * + * Create a new file when first_segment is true, otherwise open the existing + * file. 
+ */ +void +stream_start_internal(TransactionId xid, bool first_segment) +{ + begin_replication_step(); + + /* + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. + */ + if (!MyLogicalRepWorker->stream_fileset) + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); + } + + /* Open the spool file for this transaction. */ + stream_open_file(MyLogicalRepWorker->subid, xid, first_segment); + + /* If this is not the first segment, open existing subxact file. */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, xid); + + end_replication_step(); +} + +/* * Handle STREAM START message. */ static void @@ -1387,6 +1500,9 @@ apply_handle_stream_start(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1416,43 +1532,25 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SERIALIZE: /* - * Start a transaction on stream start, this transaction will be - * committed on the stream stop unless it is a tablesync worker in - * which case it will be committed after processing all the - * messages. We need the transaction for handling the buffile, - * used for serializing the streaming data and subxact info. + * Function stream_start_internal starts a transaction. 
This + * transaction will be committed on the stream stop unless it is a + * tablesync worker in which case it will be committed after + * processing all the messages. We need this transaction for + * handling the BufFile, used for serializing the streaming data + * and subxact info. */ - begin_replication_step(); + stream_start_internal(stream_xid, first_segment); + break; + + case TRANS_LEADER_PARTIAL_SERIALIZE: /* - * Initialize the worker's stream_fileset if we haven't yet. This - * will be used for the entire duration of the worker so create it - * in a permanent context. We create this on the very first - * streaming message from any transaction and then use it for this - * and other streaming transactions. Now, we could create a - * fileset at the start of the worker as well but then we won't be - * sure that it will ever be used. + * The message spool file was already created when entering + * PARTIAL_SERIALIZE mode. The transaction started in + * stream_start_internal will be committed on the stream stop. */ - if (!MyLogicalRepWorker->stream_fileset) - { - MemoryContext oldctx; - - oldctx = MemoryContextSwitchTo(ApplyContext); - - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); - - MemoryContextSwitchTo(oldctx); - } - - /* Open the spool file for this transaction. */ - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); - - /* If this is not the first segment, open existing subxact file. 
*/ - if (!first_segment) - subxact_info_read(MyLogicalRepWorker->subid, stream_xid); - - end_replication_step(); + stream_start_internal(stream_xid, false); + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1471,7 +1569,7 @@ apply_handle_stream_start(StringInfo s) */ pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); - pa_send_data(winfo, s->len, s->data); + pa_send_data(winfo, s->len, s->data, false); /* Cache the parallel apply worker for this transaction. */ pa_set_stream_apply_worker(winfo); @@ -1497,6 +1595,33 @@ apply_handle_stream_start(StringInfo s) } /* + * Update the information about subxacts and close the file. + * + * This function should be called when the stream_start_internal function has + * been called. + */ +void +stream_stop_internal(TransactionId xid) +{ + /* + * Serialize information about subxacts for the toplevel transaction, then + * close the stream messages spool file. + */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + stream_close_file(); + + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle STREAM STOP message. */ static void @@ -1515,24 +1640,13 @@ apply_handle_stream_stop(StringInfo s) switch (apply_action) { case TRANS_LEADER_SERIALIZE: + stream_stop_internal(stream_xid); + break; - /* - * Close the file with serialized changes, and serialize - * information about subxacts for the toplevel transaction. 
- */ - subxact_info_write(MyLogicalRepWorker->subid, stream_xid); - stream_close_file(); - - /* We must be in a valid transaction state */ - Assert(IsTransactionState()); - - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - - /* Reset per-stream context */ - MemoryContextReset(LogicalStreamingContext); - - pgstat_report_activity(STATE_IDLE, NULL); + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s); + stream_stop_internal(stream_xid); + pa_set_stream_apply_worker(NULL); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1546,7 +1660,14 @@ apply_handle_stream_stop(StringInfo s) */ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); - pa_send_data(winfo, s->len, s->data); + pa_send_data(winfo, s->len, s->data, true); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + stream_stop_internal(winfo->shared->xid); pa_set_stream_apply_worker(NULL); @@ -1679,6 +1800,9 @@ apply_handle_stream_abort(StringInfo s) LogicalRepStreamAbortData abort_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; bool toplevel_xact; if (in_streamed_transaction) @@ -1709,14 +1833,30 @@ apply_handle_stream_abort(StringInfo s) stream_abort_internal(xid, subxid); break; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Parallel apply worker might have applied some changes, so write + * the STREAM_ABORT message so that it can rollback the + * subtransaction if needed. 
+ */ + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT, + &original_msg); + + if (toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + (void) pa_free_worker(winfo, xid); + } + + break; + case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); /* - * For the case of aborting the toplevel transaction, unlock the - * shared object lock so that parallel apply worker can continue - * to receive and apply changes. - * * XXX For the case of aborting the subtransaction, we only * increment the number of streaming blocks without releasing the * lock. This may slightly delay the processing of STREAM_ABORT @@ -1726,20 +1866,43 @@ apply_handle_stream_abort(StringInfo s) * set of changes after processing the STREAM_ABORT message if it * is not already waiting for STREAM_STOP message. */ - if (toplevel_xact) - pa_unlock_stream(xid, AccessExclusiveLock); - else + if (!toplevel_xact) pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); /* Send STREAM ABORT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + pa_send_data(winfo, s->len, s->data, true); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. + */ + if (winfo->serialize_changes) + stream_stop_internal(winfo->shared->xid); if (toplevel_xact) + { + if (winfo->serialize_changes) + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* + * Unlock the shared object lock so that parallel apply worker + * can continue to receive and apply changes. + */ + pa_unlock_stream(xid, AccessExclusiveLock); (void) pa_free_worker(winfo, xid); + } break; case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before aborting. 
+ */ + if (toplevel_xact && stream_fd) + stream_close_file(); + pa_stream_abort(&abort_data); /* @@ -1773,19 +1936,55 @@ apply_handle_stream_abort(StringInfo s) } /* - * Common spoolfile processing. + * Ensure that the passed location is fileset's end. */ static void -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) +ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, + off_t offset) +{ + char path[MAXPGPATH]; + BufFile *fd; + int last_fileno; + off_t last_offset; + + Assert(!IsTransactionState()); + + begin_replication_step(); + + changes_filename(path, MyLogicalRepWorker->subid, xid); + + fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + BufFileSeek(fd, 0, 0, SEEK_END); + BufFileTell(fd, &last_fileno, &last_offset); + + BufFileClose(fd); + + end_replication_step(); + + if (last_fileno != fileno || last_offset != offset) + elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"", + path); +} + +/* + * Common spoolfile processing. + */ +void +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn) { StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; MemoryContext oldcxt; - BufFile *fd; + ResourceOwner oldowner; + int fileno; + off_t offset; - maybe_start_skipping_changes(lsn); + if (!am_parallel_apply_worker()) + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -1801,8 +2000,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, - false); + /* + * Make sure the file is owned by the toplevel transaction so that the file + * will not be accidentally closed when aborting a subtransaction. 
+ */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + CurrentResourceOwner = oldowner; buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -1833,7 +2040,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) CHECK_FOR_INTERRUPTS(); /* read length of the on-disk record */ - nbytes = BufFileRead(fd, &len, sizeof(len)); + nbytes = BufFileRead(stream_fd, &len, sizeof(len)); /* have we reached end of the file? */ if (nbytes == 0) @@ -1854,12 +2061,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) buffer = repalloc(buffer, len); /* and finally read the data into the buffer */ - if (BufFileRead(fd, buffer, len) != len) + if (BufFileRead(stream_fd, buffer, len) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from streaming transaction's changes file \"%s\": %m", path))); + BufFileTell(stream_fd, &fileno, &offset); + /* copy the buffer to the stringinfo and call apply_dispatch */ resetStringInfo(&s2); appendBinaryStringInfo(&s2, buffer, len); @@ -1875,15 +2084,24 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) nchanges++; + /* + * It is possible the file has been closed because we have processed + * the transaction end message like stream_commit in which case that + * must be the last message. + */ + if (!stream_fd) + { + ensure_last_message(stream_fileset, xid, fileno, offset); + break; + } + if (nchanges % 1000 == 0) elog(DEBUG1, "replayed %d changes from file \"%s\"", nchanges, path); } - BufFileClose(fd); - - pfree(buffer); - pfree(s2.data); + if (stream_fd) + stream_close_file(); elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); @@ -1902,6 +2120,9 @@ apply_handle_stream_commit(StringInfo s) ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + /* Save the message before it is consumed. 
*/ + StringInfoData original_msg = *s; + if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1920,7 +2141,8 @@ apply_handle_stream_commit(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(xid, commit_data.commit_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, + commit_data.commit_lsn); apply_handle_commit_internal(&commit_data); @@ -1928,20 +2150,22 @@ apply_handle_stream_commit(StringInfo s) stream_cleanup_files(MyLogicalRepWorker->subid, xid); break; - case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + /* * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. + * apply spooled changes. */ pa_unlock_stream(xid, AccessExclusiveLock); - /* Send STREAM COMMIT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); - /* - * After sending the data to the parallel apply worker, wait for + * After serializing the STREAM COMMIT message to a file, wait for * that worker to finish. This is necessary to maintain commit * order which avoids failures due to transaction dependencies and * deadlocks. @@ -1949,12 +2173,54 @@ apply_handle_stream_commit(StringInfo s) pa_wait_for_xact_finish(winfo); store_flush_position(commit_data.end_lsn, winfo->shared->last_commit_end); + (void) pa_free_worker(winfo, xid); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + pa_send_data(winfo, s->len, s->data, true); + + /* + * It is possible that while sending this change to parallel apply + * worker we need to switch to serialize mode. 
+ */ + if (winfo->serialize_changes) + { + stream_stop_internal(winfo->shared->xid); + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + } + + /* + * Unlock the shared object lock so that parallel apply worker can + * continue to receive and apply changes. + */ + pa_unlock_stream(xid, AccessExclusiveLock); + + /* + * After sending the STREAM COMMIT message to the parallel apply + * worker, wait for that worker to finish. This is necessary to + * maintain commit order which avoids failures due to transaction + * dependencies and deadlocks. + */ + pa_wait_for_xact_finish(winfo); + + store_flush_position(commit_data.end_lsn, winfo->shared->last_commit_end); (void) pa_free_worker(winfo, xid); break; case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + apply_handle_commit_internal(&commit_data); MyParallelShared->last_commit_end = XactLastCommitEnd; @@ -3853,7 +4119,7 @@ subxact_filename(char *path, Oid subid, TransactionId xid) } /* format filename for file containing serialized changes */ -static inline void +void changes_filename(char *path, Oid subid, TransactionId xid) { snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); @@ -3867,7 +4133,7 @@ changes_filename(char *path, Oid subid, TransactionId xid) * toplevel transaction. Each subscription has a separate set of files * for any toplevel transaction. */ -static void +void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; @@ -3890,9 +4156,6 @@ stream_cleanup_files(Oid subid, TransactionId xid) * by stream_xid (global variable). If it's the first chunk of streamed * changes for this transaction, create the buffile, otherwise open the * previously created file. - * - * This can only be called at the beginning of a "streaming" block, i.e. - * between stream_start/stream_stop messages from the upstream. 
*/ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) @@ -3900,7 +4163,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; MemoryContext oldcxt; - Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); @@ -3939,15 +4201,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) /* * stream_close_file * Close the currently open file with streamed changes. - * - * This can only be called at the end of a streaming block, i.e. at stream_stop - * message from the upstream. */ static void stream_close_file(void) { - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); BufFileClose(stream_fd); @@ -3964,13 +4221,11 @@ stream_close_file(void) * the length), action code (identifying the message type) and message * contents (without the subxact TransactionId value). */ -static void +void stream_write_change(char action, StringInfo s) { int len; - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); /* total on-disk size, including the action type character */ @@ -3989,6 +4244,23 @@ stream_write_change(char action, StringInfo s) } /* + * stream_open_and_write_change + * Serialize a message to a file for the given transaction. + * + * This function is similar to stream_write_change except that it will open the + * target file before writing the message and close the file at the end. + */ +static void +stream_open_and_write_change(TransactionId xid, char action, StringInfo s) +{ + Assert(!in_streamed_transaction); + + stream_start_internal(xid, false); + stream_write_change(action, s); + stream_stop_internal(xid); +} + +/* * Cleanup the memory for subxacts and reset the related variables. 
*/ static inline void @@ -4664,7 +4936,8 @@ set_apply_error_context_origin(char *originname) /* * Return the action to take for the given transaction. *winfo is assigned to * the destination parallel worker info (if the action is - * TRANS_LEADER_SEND_TO_PARALLEL), otherwise *winfo is assigned NULL. + * TRANS_LEADER_SEND_TO_PARALLEL or TRANS_LEADER_PARTIAL_SERIALIZE), otherwise + * *winfo is assigned NULL. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4686,13 +4959,17 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) */ *winfo = pa_find_worker(xid); - if (*winfo) + if (!*winfo) { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_SERIALIZE; + } + else if ((*winfo)->serialize_changes) + { + return TRANS_LEADER_PARTIAL_SERIALIZE; + } else { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 639fca5..2756590 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "miscadmin.h" #include "replication/logicalrelation.h" +#include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" #include "storage/shm_mq.h" @@ -95,6 +96,22 @@ typedef enum ParallelTransState } ParallelTransState; /* + * State of fileset in leader apply worker. + * + * FS_SERIALIZE_IN_PROGRESS means that the leader is serializing changes to the + * file. FS_SERIALIZE_DONE means that the leader has serialized all changes to + * the file. FS_READY means that it is now ok for a parallel apply worker to read + * the file. 
+ */ +typedef enum PartialFileSetState +{ + FS_UNKNOWN, + FS_SERIALIZE_IN_PROGRESS, + FS_SERIALIZE_DONE, + FS_READY +} PartialFileSetState; + +/* * Struct for sharing information between leader apply worker and parallel * apply workers. */ @@ -129,6 +146,20 @@ typedef struct ParallelApplyWorkerShared * update the lsn_mappings by leader worker. */ XLogRecPtr last_commit_end; + + + /* + * After entering PARTIAL_SERIALIZE mode, the leader apply worker will + * serialize changes to the file, and share the fileset with the parallel + * apply worker when processing the transaction finish command. Then the + * parallel apply worker will apply all the spooled messages. + * + * FileSet is used here instead of SharedFileSet because we need it to + * survive after releasing the shared memory so that the leader apply + * worker can re-use the same fileset for the next streaming transaction. + */ + PartialFileSetState fileset_state; + FileSet fileset; } ParallelApplyWorkerShared; /* @@ -147,6 +178,13 @@ typedef struct ParallelApplyWorkerInfo dsm_segment *dsm_seg; /* + * Indicates whether the leader apply worker needs to serialize the + * remaining changes to a file due to timeout when attempting to send data + * to the parallel apply worker via shared memory. + */ + bool serialize_changes; + + /* * True if the worker is being used to process a parallel apply * transaction. False indicates this worker is available for re-use. 
*/ @@ -199,10 +237,21 @@ extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); +extern void stream_start_internal(TransactionId xid, bool first_segment); +extern void stream_stop_internal(TransactionId xid); + +/* Common streaming function to apply all the spooled messages */ +extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn); + extern void apply_dispatch(StringInfo s); extern void maybe_reread_subscription(void); +extern void changes_filename(char *path, Oid subid, TransactionId xid); +extern void stream_write_change(char action, StringInfo s); +extern void stream_cleanup_files(Oid subid, TransactionId xid); + extern void InitializeApplyWorker(void); /* Function for apply error callback */ @@ -218,7 +267,7 @@ extern bool pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid); extern void pa_detach_all_error_mq(void); extern void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, - const void *data); + const void *data, bool stream_locked); extern void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *wshared); extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared, @@ -228,6 +277,8 @@ extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo); extern void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid); extern void pa_reset_subtrans(void); extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data); +extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state); extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a611520..7f7a702 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1388,6 +1388,7 @@ LagTracker 
LargeObjectDesc LastAttnumInfo Latch +PartialFileSetState LerpFunc LexDescr LexemeEntry -- 2.7.2.windows.1