diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index f9dda6daee..7c2b0e0b5e 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -64,12 +64,13 @@ * etc.) of the publisher and subscriber could be different, applying * transactions in parallel mode on the subscriber side can cause some * deadlocks that do not occur on the publisher side which is expected and can - * happen even without parallel mode. In order to detect the deadlocks among - * leader and parallel apply workers, we need to ensure that we wait using lmgr - * locks, otherwise, such deadlocks won't be detected. The other approach was - * to not allow parallelism when the schema of tables is different between the - * publisher and subscriber but that would be too restrictive and would require - * the publisher to send much more information than it is currently sending. + * happen even without parallel mode when there are concurrent operations on + * the subscriber. In order to detect the deadlocks among leader and parallel + * apply workers, we need to ensure that we wait using lmgr locks, otherwise, + * such deadlocks won't be detected. The other approach was to not allow + * parallelism when the schema of tables is different between the publisher + * and subscriber but that would be too restrictive and would require the + * publisher to send much more information than it is currently sending. * * Consider a case where the subscribed table does not have a unique key on the * publisher and has a unique key on the subscriber. @@ -1308,6 +1309,10 @@ pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) void pa_clean_subtrans(void) { + /* + * We don't need to free this explicitly as the allocated memory will be + * freed at the transaction end. + */ subxactlist = NIL; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 997a1ef2d3..9510dc5683 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -484,6 +484,7 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) return MyLogicalRepWorker->relid == rel->localreloid; else if (am_parallel_apply_worker()) { + /* We don't synchronize rel's that are in unknown state. */ if (rel->state != SUBREL_STATE_READY && rel->state != SUBREL_STATE_UNKNOWN) ereport(ERROR, @@ -1945,6 +1946,7 @@ apply_handle_stream_commit(StringInfo s) pa_wait_for_xact_finish(winfo); store_flush_position(commit_data.end_lsn, winfo->shared->last_commit_end); + (void) pa_free_worker(winfo, xid); pgstat_report_stat(false); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9b2ca9461b..c10b503e76 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -124,7 +124,10 @@ typedef struct ParallelApplyWorkerShared */ pg_atomic_uint32 pending_stream_count; - /* XactLastCommitEnd from the parallel apply worker. */ + /* + * XactLastCommitEnd from the parallel apply worker. This is required to + * update the lsn_mappings by leader worker. + */ XLogRecPtr last_commit_end; } ParallelApplyWorkerShared;