From e3a1bc7182992de50a922dc4e81655aa9fff1aef Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 21 Nov 2022 20:10:11 +0800 Subject: [PATCH v51 2/4] Serialize partial changes to disk if the shm_mq buffer is full When the leader apply worker times out while sending a message to the parallel apply worker, do not error out. Instead, switch to partial serialize mode: the leader serializes all remaining changes to a file and notifies the parallel apply worker to read and apply them at the end of the transaction. --- .../replication/logical/applyparallelworker.c | 182 ++++++++++- src/backend/replication/logical/worker.c | 342 +++++++++++++++------ src/include/replication/worker_internal.h | 44 +++ 3 files changed, 462 insertions(+), 106 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ec48a94..7a406b8 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -74,8 +74,10 @@ * messages, and this wait doesn't appear in lmgr. * * To resolve this issue, we use non-blocking write and wait with a timeout. If - * timeout is exceeded, the LA reports an error and restarts logical - * replication. + * timeout is exceeded, the LA will write to file and indicate PA-2 that it + * needs to read file for remaining messages. Then LA will start waiting for + * commit which will detect deadlock if any.
(See pa_send_data() and typedef + * enum TransApplyAction) *------------------------------------------------------------------------- */ @@ -311,9 +313,11 @@ pa_allocate_worker(TransactionId xid) SpinLockAcquire(&winfo->shared->mutex); winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; winfo->shared->xid = xid; + winfo->shared->fileset_state = LEADER_FILESET_UNKNOWN; SpinLockRelease(&winfo->shared->mutex); winfo->in_use = true; + winfo->serialize_changes = false; entry->winfo = winfo; entry->xid = xid; } @@ -387,8 +391,18 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); - /* Stop the worker if there are enough workers in the pool. */ - if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) + /* + * Stop the worker if there are enough workers in the pool. + * + * XXX we also need to stop the worker if the leader apply worker + * serialized part of the transaction data to a file due to send timeout. + * This is because the message could be partially written to the queue due + * to send timeout and there is no way to clean the queue other than + * resending the message until it succeeds. To avoid complexity, we + * directly stop the worker in this case. + */ + if (winfo->serialize_changes || + napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) { int slot_no; uint16 generation; @@ -406,11 +420,15 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) } winfo->in_use = false; + winfo->serialize_changes = false; return false; } -/* Free the parallel apply worker information. */ +/* + * Free the parallel apply worker information and unlink the files with + * serialized changes if any. 
+ */ static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo) { @@ -422,6 +440,10 @@ pa_free_worker_info(ParallelApplyWorkerInfo *winfo) if (winfo->error_mq_handle) shm_mq_detach(winfo->error_mq_handle); + /* Unlink the files with serialized changes. */ + if (winfo->serialize_changes) + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); + if (winfo->dsm_seg) dsm_detach(winfo->dsm_seg); @@ -432,6 +454,43 @@ pa_free_worker_info(ParallelApplyWorkerInfo *winfo) } /* + * Replay the spooled messages in the parallel apply worker if leader apply + * worker has finished serializing changes to the file. + */ +static void +pa_spooled_messages(void) +{ + LeaderFileSetState fileset_state; + + /* + * Check if changes have been serialized to disk. if so, read and apply + * them. + */ + SpinLockAcquire(&MyParallelShared->mutex); + fileset_state = MyParallelShared->fileset_state; + SpinLockRelease(&MyParallelShared->mutex); + + /* + * Acquire the stream lock if the leader apply worker is serializing + * changes to the file, because the parallel apply worker will no longer + * have a chance to receive a STREAM_STOP and acquire the lock until the + * leader serialize all changes to the file. + */ + if (fileset_state == LEADER_FILESET_BUSY) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + } + else if (fileset_state == LEADER_FILESET_ACCESSIBLE) + { + apply_spooled_messages(&MyParallelShared->fileset, + MyParallelShared->xid, + InvalidXLogRecPtr); + pa_set_fileset_state(MyParallelShared, LEADER_FILESET_UNKNOWN); + } +} + +/* * Interrupt handler for main loops of parallel apply worker. */ static void @@ -526,6 +585,8 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) { int rc; + pa_spooled_messages(); + if (!in_streamed_transaction) { /* @@ -984,6 +1045,10 @@ pa_init_and_launch_worker(void) /* * Send the data to the specified parallel apply worker via shared-memory * queue. 
+ * + * When sending data times out, data will be serialized to disk. And the + * current streaming transaction will enter PARTIAL_SERIALIZE mode, which means + * that subsequent data will also be serialized to disk. */ void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) @@ -993,6 +1058,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) TimestampTz startTime = 0; Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS 10000 @@ -1027,9 +1093,56 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication parallel apply worker due to timeout"))); + { + StringInfoData msg; + LogicalRepMsgType action; + + /* + * The parallel apply worker might be stuck for some reason, so + * stop sending data to parallel worker and start to serialize + * data to files. + */ + winfo->serialize_changes = true; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + + /* Skip first byte and statistics fields. */ + msg.cursor += SIZE_STATS_MESSAGE + 1; + + /* Initialize the stream fileset. */ + serialize_stream_start(winfo->shared->xid, true); + + /* Write this message to a file. */ + action = pq_getmsgbyte(&msg); + stream_write_change(action, &msg); + + /* + * The stream lock is released when processing changes in a + * streaming block, so the leader needs to acquire the lock here + * before entering PARTIAL_SERIALIZE mode to ensure that the + * parallel apply worker will wait for the leader to release the + * stream lock. 
+ */ + if (in_streamed_transaction && + action != LOGICAL_REP_MSG_STREAM_STOP) + { + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + } + + /* + * Close the stream file if not in a streaming block or the + * message being processed is the last message of a streaming + * block. + */ + else + { + serialize_stream_stop(winfo->shared->xid); + } + + pa_set_fileset_state(winfo->shared, LEADER_FILESET_BUSY); + break; + } } } @@ -1260,9 +1373,40 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) if (xid_tmp == subxid) { + bool reopen_stream_fd = false; + int fileno; + off_t offset; + + /* + * If the parallel apply worker is applying the spooled + * messages, we save the current file position and close the + * file to prevent the file from being accidentally closed on + * rollback. + */ + if (stream_fd) + { + BufFileTell(stream_fd, &fileno, &offset); + BufFileClose(stream_fd); + reopen_stream_fd = true; + } + RollbackToSavepoint(spname); CommitTransactionCommand(); subxactlist = list_truncate(subxactlist, i + 1); + + /* + * Reopen the file and set the file position to the saved + * position. + */ + if (reopen_stream_fd) + { + char path[MAXPGPATH]; + + changes_filename(path, MyLogicalRepWorker->subid, xid); + stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset, + path, O_RDONLY, false); + BufFileSeek(stream_fd, fileno, offset, SEEK_SET); + } break; } } @@ -1272,6 +1416,28 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) } /* + * Set the fileset_state flag for the given parallel apply worker. The + * stream_fileset of the leader apply worker will be written into the shared + * memory if the fileset_state is LEADER_FILESET_ACCESSIBLE. 
+ */ +void +pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + LeaderFileSetState fileset_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->fileset_state = fileset_state; + + if (fileset_state == LEADER_FILESET_ACCESSIBLE) + { + Assert(am_leader_apply_worker()); + Assert(MyLogicalRepWorker->stream_fileset); + wshared->fileset = *MyLogicalRepWorker->stream_fileset; + } + + SpinLockRelease(&wshared->mutex); +} + +/* * Helper functions to acquire and release a lock for each stream block. * * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a21315f..27c76c8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -219,7 +219,6 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" -#include "storage/buffile.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -278,6 +277,19 @@ typedef struct ApplyExecutionData * Changes are written to temporary files and then applied when the final * commit arrives. * + * TRANS_LEADER_PARTIAL_SERIALIZE: + * The action means that we are in the leader apply worker and have sent some + * changes to the parallel apply worker, but the remaining changes need to be + * serialized to disk due to timeout while sending data, and the parallel apply + * worker will apply these changes when the final commit arrives. + * + * One might think we can use LEADER_SERIALIZE directly. But in partial + * serialize mode, in addition to serializing changes to file, the leader + * worker needs to write the STREAM_XXX message to disk, and needs to wait for + * parallel apply worker to finish the transaction when processing the + * transaction finish command. So a new action was introduced to make the logic + * clearer. 
+ * * TRANS_LEADER_SEND_TO_PARALLEL: * The action means that we are in the leader apply worker and need to send the * changes to the parallel apply worker. @@ -293,6 +305,7 @@ typedef enum /* Actions for streaming transactions. */ TRANS_LEADER_SERIALIZE, + TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY } TransApplyAction; @@ -354,7 +367,7 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) /* BufFile handle of the current streaming file */ -static BufFile *stream_fd = NULL; +BufFile *stream_fd = NULL; typedef struct SubXactInfo { @@ -375,7 +388,7 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; static inline void subxact_filename(char *path, Oid subid, TransactionId xid); -static inline void changes_filename(char *path, Oid subid, TransactionId xid); +inline void changes_filename(char *path, Oid subid, TransactionId xid); /* * Information about subtransactions of a given toplevel transaction. @@ -388,10 +401,9 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. 
*/ -static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first_segment); -static void stream_write_change(char action, StringInfo s); +static void stream_write_message(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -423,9 +435,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -/* Common streaming function to apply all the spooled messages */ -static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -586,6 +595,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransactionId current_xid; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg; apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -595,6 +605,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) Assert(TransactionIdIsValid(stream_xid)); + original_msg = *s; + /* * We should have received XID of the subxact as the first part of the * message, so extract it. 
@@ -618,10 +630,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) stream_write_change(action, s); return true; + case TRANS_LEADER_PARTIAL_SERIALIZE: case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - pa_send_data(winfo, s->len, s->data); + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL) + pa_send_data(winfo, s->len, s->data); + else + stream_write_change(action, &original_msg); /* * XXX The publisher side doesn't always send relation/type update @@ -1266,6 +1282,7 @@ apply_handle_stream_prepare(StringInfo s) LogicalRepPreparedTxnData prepare_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, @@ -1291,7 +1308,8 @@ apply_handle_stream_prepare(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, + prepare_data.xid, prepare_data.prepare_lsn); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -1307,6 +1325,7 @@ apply_handle_stream_prepare(StringInfo s) break; case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); /* @@ -1316,13 +1335,21 @@ apply_handle_stream_prepare(StringInfo s) pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); /* Send STREAM PREPARE message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL) + pa_send_data(winfo, s->len, s->data); + else + stream_write_message(prepare_data.xid, + LOGICAL_REP_MSG_STREAM_PREPARE, + &original_msg); + + if (winfo->serialize_changes) + pa_set_fileset_state(winfo->shared, LEADER_FILESET_ACCESSIBLE); /* - * After sending the data to the parallel apply worker, wait for - * that worker to finish. 
This is necessary to maintain commit - * order which avoids failures due to transaction dependencies and - * deadlocks. + * After sending the data to the parallel apply worker or + * serializing data to file, wait for that worker to finish. This + * is necessary to maintain commit order which avoids failures due + * to transaction dependencies and deadlocks. */ pa_wait_for_xact_finish(winfo); (void) pa_free_worker(winfo, prepare_data.xid); @@ -1333,6 +1360,14 @@ apply_handle_stream_prepare(StringInfo s) break; case TRANS_PARALLEL_APPLY: + + /* + * Close the file before committing if the parallel apply is + * applying spooled changes. + */ + if (stream_fd) + BufFileClose(stream_fd); + begin_replication_step(); list_free(subxactlist); @@ -1393,7 +1428,46 @@ apply_handle_origin(StringInfo s) errmsg_internal("ORIGIN message sent out of order"))); } +/* + * Initialize fileset (if not already done). + * + * Create a new file when first_segment is true, otherwise open the existing + * file. + */ +void +serialize_stream_start(TransactionId xid, bool first_segment) +{ + begin_replication_step(); + + /* + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. + */ + if (!MyLogicalRepWorker->stream_fileset) + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); + } + + /* Open the spool file for this transaction. 
*/ + stream_open_file(MyLogicalRepWorker->subid, xid, first_segment); + + /* If this is not the first segment, open existing subxact file. */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, xid); + + end_replication_step(); +} /* * Handle STREAM START message. @@ -1404,6 +1478,7 @@ apply_handle_stream_start(StringInfo s) bool first_segment; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, @@ -1434,43 +1509,26 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SERIALIZE: /* - * Start a transaction on stream start, this transaction will be - * committed on the stream stop unless it is a tablesync worker in - * which case it will be committed after processing all the - * messages. We need the transaction for handling the buffile, - * used for serializing the streaming data and subxact info. + * serialize_stream_start will start a transaction, this + * transaction will be committed on the stream stop unless it is a + * tablesync worker in which case it will be committed after + * processing all the messages. We need the transaction for + * handling the buffile, used for serializing the streaming data + * and subxact info. */ - begin_replication_step(); + serialize_stream_start(stream_xid, first_segment); + break; + case TRANS_LEADER_PARTIAL_SERIALIZE: /* - * Initialize the worker's stream_fileset if we haven't yet. This - * will be used for the entire duration of the worker so create it - * in a permanent context. We create this on the very first - * streaming message from any transaction and then use it for this - * and other streaming transactions. Now, we could create a - * fileset at the start of the worker as well but then we won't be - * sure that it will ever be used. + * The file should have been created when entering + * PARTIAL_SERIALIZE mode so no need to create it again. 
The + * transaction started in serialize_stream_start will be committed + * on the stream stop. */ - if (!MyLogicalRepWorker->stream_fileset) - { - MemoryContext oldctx; - - oldctx = MemoryContextSwitchTo(ApplyContext); - - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); - - MemoryContextSwitchTo(oldctx); - } + serialize_stream_start(stream_xid, false); + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg); - /* Open the spool file for this transaction. */ - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); - - /* If this is not the first segment, open existing subxact file. */ - if (!first_segment) - subxact_info_read(MyLogicalRepWorker->subid, stream_xid); - - end_replication_step(); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1515,6 +1573,34 @@ apply_handle_stream_start(StringInfo s) } /* + * Update the information about subxacts and close the file. + * + * This function should be called when the serialize_stream_start function has + * been called. + */ +void +serialize_stream_stop(TransactionId xid) +{ + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle STREAM STOP message. */ static void @@ -1533,24 +1619,13 @@ apply_handle_stream_stop(StringInfo s) switch (apply_action) { case TRANS_LEADER_SERIALIZE: + serialize_stream_stop(stream_xid); + break; - /* - * Close the file with serialized changes, and serialize - * information about subxacts for the toplevel transaction. 
- */ - subxact_info_write(MyLogicalRepWorker->subid, stream_xid); - stream_close_file(); - - /* We must be in a valid transaction state */ - Assert(IsTransactionState()); - - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - - /* Reset per-stream context */ - MemoryContextReset(LogicalStreamingContext); - - pgstat_report_activity(STATE_IDLE, NULL); + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s); + serialize_stream_stop(stream_xid); + stream_apply_worker = NULL; break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1693,6 +1768,7 @@ apply_handle_stream_abort(StringInfo s) LogicalRepStreamAbortData abort_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; bool toplevel_xact; if (in_streamed_transaction) @@ -1723,6 +1799,26 @@ apply_handle_stream_abort(StringInfo s) serialize_stream_abort(xid, subxid); break; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Parallel apply worker might have applied some changes, so write + * the STREAM_ABORT message so that the parallel apply worker can + * rollback the subtransaction if needed. + */ + stream_write_message(xid, LOGICAL_REP_MSG_STREAM_ABORT, + &original_msg); + + if (toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); + pa_set_fileset_state(winfo->shared, LEADER_FILESET_ACCESSIBLE); + (void) pa_free_worker(winfo, xid); + } + + break; + case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); @@ -1747,7 +1843,13 @@ apply_handle_stream_abort(StringInfo s) pa_send_data(winfo, s->len, s->data); if (toplevel_xact) + { + if (winfo->serialize_changes) + pa_set_fileset_state(winfo->shared, + LEADER_FILESET_ACCESSIBLE); + (void) pa_free_worker(winfo, xid); + } break; @@ -1784,17 +1886,18 @@ apply_handle_stream_abort(StringInfo s) /* * Common spoolfile processing. 
*/ -static void -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) +void +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn) { StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; MemoryContext oldcxt; - BufFile *fd; - maybe_start_skipping_changes(lsn); + if (!am_parallel_apply_worker()) + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -1810,8 +1913,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, - false); + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + stream_xid = xid; buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -1842,7 +1945,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) CHECK_FOR_INTERRUPTS(); /* read length of the on-disk record */ - nbytes = BufFileRead(fd, &len, sizeof(len)); + nbytes = BufFileRead(stream_fd, &len, sizeof(len)); /* have we reached end of the file? */ if (nbytes == 0) @@ -1863,7 +1966,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) buffer = repalloc(buffer, len); /* and finally read the data into the buffer */ - if (BufFileRead(fd, buffer, len) != len) + if (BufFileRead(stream_fd, buffer, len) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from streaming transaction's changes file \"%s\": %m", @@ -1882,6 +1985,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) MemoryContextSwitchTo(oldcxt); + /* + * Break the loop if the parallel apply worker has finished applying + * the transaction. The parallel apply worker should have closed the + * file before committing. 
+ */ + if (am_parallel_apply_worker() && + (MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED || + MyParallelShared->xact_state == PARALLEL_TRANS_UNKNOWN)) + goto done; + nchanges++; if (nchanges % 1000 == 0) @@ -1889,11 +2002,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) nchanges, path); } - BufFileClose(fd); - + BufFileClose(stream_fd); pfree(buffer); pfree(s2.data); +done: + stream_fd = NULL; + stream_xid = InvalidTransactionId; + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); @@ -1910,6 +2026,7 @@ apply_handle_stream_commit(StringInfo s) LogicalRepCommitData commit_data; ParallelApplyWorkerInfo *winfo; TransApplyAction apply_action; + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, @@ -1929,7 +2046,8 @@ apply_handle_stream_commit(StringInfo s) * The transaction has been serialized to file, so replay all the * spooled operations. */ - apply_spooled_messages(xid, commit_data.commit_lsn); + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, + commit_data.commit_lsn); apply_handle_commit_internal(&commit_data); @@ -1938,6 +2056,7 @@ apply_handle_stream_commit(StringInfo s) break; case TRANS_LEADER_SEND_TO_PARALLEL: + case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); /* @@ -1947,13 +2066,21 @@ apply_handle_stream_commit(StringInfo s) pa_unlock_stream(xid, AccessExclusiveLock); /* Send STREAM COMMIT message to the parallel apply worker. */ - pa_send_data(winfo, s->len, s->data); + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL) + pa_send_data(winfo, s->len, s->data); + else + stream_write_message(xid, LOGICAL_REP_MSG_STREAM_COMMIT, + &original_msg); + + if (winfo->serialize_changes) + pa_set_fileset_state(winfo->shared, + LEADER_FILESET_ACCESSIBLE); /* - * After sending the data to the parallel apply worker, wait for - * that worker to finish. This is necessary to maintain commit - * order which avoids failures due to transaction dependencies and - * deadlocks. 
+ * After sending the data to the parallel apply worker or + * serializing data to file, wait for that worker to finish. This + * is necessary to maintain commit order which avoids failures due + * to transaction dependencies and deadlocks. */ pa_wait_for_xact_finish(winfo); @@ -1972,6 +2099,14 @@ apply_handle_stream_commit(StringInfo s) break; case TRANS_PARALLEL_APPLY: + + /* + * Close the file before committing if the parallel apply is + * applying spooled changes. + */ + if (stream_fd) + BufFileClose(stream_fd); + list_free(subxactlist); subxactlist = NIL; @@ -3860,7 +3995,7 @@ subxact_filename(char *path, Oid subid, TransactionId xid) } /* format filename for file containing serialized changes */ -static inline void +void changes_filename(char *path, Oid subid, TransactionId xid) { snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); @@ -3874,7 +4009,7 @@ changes_filename(char *path, Oid subid, TransactionId xid) * toplevel transaction. Each subscription has a separate set of files * for any toplevel transaction. */ -static void +void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; @@ -3897,9 +4032,6 @@ stream_cleanup_files(Oid subid, TransactionId xid) * by stream_xid (global variable). If it's the first chunk of streamed * changes for this transaction, create the buffile, otherwise open the * previously created file. - * - * This can only be called at the beginning of a "streaming" block, i.e. - * between stream_start/stream_stop messages from the upstream. 
*/ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) @@ -3907,7 +4039,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) char path[MAXPGPATH]; MemoryContext oldcxt; - Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); @@ -3946,15 +4077,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) /* * stream_close_file * Close the currently open file with streamed changes. - * - * This can only be called at the end of a streaming block, i.e. at stream_stop - * message from the upstream. */ static void stream_close_file(void) { - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); BufFileClose(stream_fd); @@ -3971,13 +4097,11 @@ stream_close_file(void) * the length), action code (identifying the message type) and message * contents (without the subxact TransactionId value). */ -static void +void stream_write_change(char action, StringInfo s) { int len; - Assert(in_streamed_transaction); - Assert(TransactionIdIsValid(stream_xid)); Assert(stream_fd != NULL); /* total on-disk size, including the action type character */ @@ -3996,6 +4120,23 @@ stream_write_change(char action, StringInfo s) } /* + * stream_write_message + * Serialize a message to a file for the given transaction. + * + * This function is similar to stream_write_change except that it will open the + * target file before writing the message and close file at the end. + */ +static void +stream_write_message(TransactionId xid, char action, StringInfo s) +{ + Assert(!in_streamed_transaction); + + serialize_stream_start(xid, false); + stream_write_change(action, s); + serialize_stream_stop(xid); +} + +/* * Cleanup the memory for subxacts and reset the related variables. */ static inline void @@ -4668,7 +4809,8 @@ reset_apply_error_context_info(void) /* * Return the action to take for the given transaction. 
*winfo is assigned to * the destination parallel worker info (if the action is - * TRANS_LEADER_SEND_TO_PARALLEL, otherwise *winfo is assigned NULL. + * TRANS_LEADER_SEND_TO_PARALLEL or TRANS_LEADER_PARTIAL_SERIALIZE), otherwise + * *winfo is assigned NULL. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4690,13 +4832,17 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) */ *winfo = pa_find_worker(xid); - if (*winfo) + if (!*winfo) { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_SERIALIZE; + } + else if ((*winfo)->serialize_changes) + { + return TRANS_LEADER_PARTIAL_SERIALIZE; } else { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 84d123a..7250f7a 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "miscadmin.h" #include "replication/logicalrelation.h" +#include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" #include "storage/shm_mq.h" @@ -107,6 +108,14 @@ typedef enum ParallelTransState PARALLEL_TRANS_FINISHED } ParallelTransState; +/* State of fileset in leader apply worker. */ +typedef enum LeaderFileSetState +{ + LEADER_FILESET_UNKNOWN, + LEADER_FILESET_BUSY, + LEADER_FILESET_ACCESSIBLE +} LeaderFileSetState; + /* * Struct for sharing information between leader apply worker and parallel * apply workers. @@ -136,6 +145,19 @@ typedef struct ParallelApplyWorkerShared * parallel apply worker will check it before starting to wait. */ pg_atomic_uint32 pending_stream_count; + + /* + * The leader apply worker will serialize changes to the file after + * entering PARTIAL_SERIALIZE mode and share the fileset with the parallel + * apply worker when processing the transaction finish command. 
And then + * the parallel apply worker will apply all the spooled messages. + * + * Don't use SharedFileSet here as we need the fileset to survive after + * releasing the shared memory so that the leader apply worker can re-use + * the fileset for next streaming transaction. + */ + LeaderFileSetState fileset_state; + FileSet fileset; } ParallelApplyWorkerShared; /* @@ -155,6 +177,13 @@ typedef struct ParallelApplyWorkerInfo dsm_segment *dsm_seg; /* + * Indicates whether the leader apply worker needs to serialize the + * remaining changes to disk due to timeout when sending data to the + * parallel apply worker. + */ + bool serialize_changes; + + /* * True if the worker is being used to process a parallel apply * transaction. False indicates this worker is available for re-use. */ @@ -177,6 +206,8 @@ extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared; extern PGDLLIMPORT List *subxactlist; +extern PGDLLIMPORT BufFile *stream_fd; + /* libpqreceiver connection */ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; @@ -214,10 +245,21 @@ extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); +extern void serialize_stream_start(TransactionId xid, bool first_segment); +extern void serialize_stream_stop(TransactionId xid); + +/* Common streaming function to apply all the spooled messages */ +extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn); + extern void apply_dispatch(StringInfo s); extern void maybe_reread_subscription(void); +extern void changes_filename(char *path, Oid subid, TransactionId xid); +extern void stream_write_change(char action, StringInfo s); +extern void stream_cleanup_files(Oid subid, TransactionId xid); + extern void InitializeApplyWorker(void); extern void apply_worker_clean_exit(void); @@ -238,6 +280,8 @@ extern void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, extern void 
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid); extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data); +extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + LeaderFileSetState fileset_state); extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode); -- 2.7.2.windows.1