From 1073207ed7838061a648b9b157f9422a08e010aa Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 11 Jun 2025 11:42:26 +0800 Subject: [PATCH vPOC] Apply non-streaming transactions in parallel apply worker --- src/backend/executor/execReplication.c | 39 +- .../replication/logical/applyparallelworker.c | 427 +++++++++- src/backend/replication/logical/proto.c | 35 + src/backend/replication/logical/relation.c | 36 +- src/backend/replication/logical/worker.c | 747 ++++++++++++++++-- src/include/executor/executor.h | 9 +- src/include/replication/logicalproto.h | 3 + src/include/replication/logicalrelation.h | 3 + src/include/replication/worker_internal.h | 28 +- src/test/subscription/t/010_truncate.pl | 2 +- src/test/subscription/t/026_stats.pl | 1 + src/test/subscription/t/027_nosuperuser.pl | 1 + 12 files changed, 1203 insertions(+), 128 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 53ddd25c42d..fdf73ba264f 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -139,6 +139,12 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd) { case TM_Ok: break; + case TM_SelfModified: + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be updated was already modified in the current transaction"))); + refetch = true; + break; case TM_Updated: /* XXX: Improve handling here */ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) @@ -179,7 +185,8 @@ bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, - TupleTableSlot *outslot) + TupleTableSlot *outslot, + bool locktuple) { ScanKeyData skey[INDEX_MAX_KEYS]; int skey_attoff; @@ -246,7 +253,7 @@ retry: } /* Found tuple, try to lock it in the lockmode. */ - if (found) + if (found && locktuple) { TM_FailureData tmfd; TM_Result res; @@ -353,7 +360,8 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, */ bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, - TupleTableSlot *searchslot, TupleTableSlot *outslot) + TupleTableSlot *searchslot, TupleTableSlot *outslot, + bool locktuple) { TupleTableSlot *scanslot; TableScanDesc scan; @@ -404,7 +412,7 @@ retry: } /* Found tuple, try to lock it in the lockmode. */ - if (found) + if (found && locktuple) { TM_FailureData tmfd; TM_Result res; @@ -431,6 +439,29 @@ retry: return found; } +bool +RelationLockTuple(Relation rel, LockTupleMode lockmode, + TupleTableSlot *searchslot, TupleTableSlot *outslot, + CommandId cid) +{ + TM_FailureData tmfd; + TM_Result res; + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), + outslot, + cid, + lockmode, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + return !should_refetch_tuple(res, &tmfd); +} + /* * Build additional index information necessary for conflict detection. */ diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..0dd001e3623 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -209,6 +209,8 @@ #define PARALLEL_APPLY_LOCK_STREAM 0 #define PARALLEL_APPLY_LOCK_XACT 1 +#define PARALLEL_APPLY_INIT_RELATION 'r' + /* * Hash table entry to map xid to the parallel apply worker state. */ @@ -216,8 +218,14 @@ typedef struct ParallelApplyWorkerEntry { TransactionId xid; /* Hash key -- must be first */ ParallelApplyWorkerInfo *winfo; + XLogRecPtr local_end; } ParallelApplyWorkerEntry; +typedef struct ParallelApplyCommitSeq +{ + pg_atomic_uint64 committable_seq_num; +} ParallelApplyCommitSeq; + /* * A hash table used to cache the state of streaming transactions being applied * by the parallel apply workers. @@ -254,9 +262,18 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL; /* A list to maintain subtransactions, if any. */ static List *subxactlist = NIL; +static dsm_segment *commit_seq_seg = NULL; +static ParallelApplyCommitSeq *pa_commit_seq = NULL; + +static TransactionId last_parallelized_xid = InvalidTransactionId; +static uint64 last_parallelized_commit_seq = 0; + +static dsm_handle pa_get_dsm_for_commit_seq(void); static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); +static bool pa_transaction_committed(uint64 commit_seq_num); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); +static void pa_init_relmap_cache(StringInfo s); /* * Returns true if it is OK to start a parallel apply worker, false otherwise. @@ -334,6 +351,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) shm_mq *mq; Size queue_size = DSM_QUEUE_SIZE; Size error_queue_size = DSM_ERROR_QUEUE_SIZE; + dsm_handle commit_seq_handle; + + commit_seq_handle = pa_get_dsm_for_commit_seq(); + + if (commit_seq_handle == DSM_HANDLE_INVALID) + return false; /* * Estimate how much shared memory we need. @@ -364,11 +387,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) /* Set up the header region. */ shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared)); SpinLockInit(&shared->mutex); - + shared->xid = InvalidTransactionId; shared->xact_state = PARALLEL_TRANS_UNKNOWN; pg_atomic_init_u32(&(shared->pending_stream_count), 0); shared->last_commit_end = InvalidXLogRecPtr; shared->fileset_state = FS_EMPTY; + shared->commit_seq_handle = commit_seq_handle; shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared); @@ -396,6 +420,24 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) return true; } +static dsm_handle +pa_get_dsm_for_commit_seq(void) +{ + if (commit_seq_seg) + return dsm_segment_handle(commit_seq_seg); + + /* Create the shared memory segment and establish a table of contents. */ + commit_seq_seg = dsm_create(sizeof(ParallelApplyCommitSeq), 0); + if (!commit_seq_seg) + return DSM_HANDLE_INVALID; + + pa_commit_seq = (ParallelApplyCommitSeq *) dsm_segment_address(commit_seq_seg); + + pg_atomic_init_u64(&(pa_commit_seq->committable_seq_num), 0); + + return dsm_segment_handle(commit_seq_seg); +} + /* * Try to get a parallel apply worker from the pool. If none is available then * start a new one. @@ -413,10 +455,29 @@ pa_launch_parallel_worker(void) { winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - if (!winfo->in_use) + if (!winfo->stream_txn && + pa_transaction_committed(winfo->shared->commit_seq_num)) + { + /* + * Save the local commit lsn of the last transaction that was + * applied by this worker. We need to collect this info to + * determine the flush position to reply to the publisher (See + * get_flush_position()). + */ + (void) pa_get_last_commit_end(winfo->shared->xid, false, NULL); + return winfo; + } + + if (winfo->stream_txn && !winfo->in_use) return winfo; } + pa_get_dsm_for_commit_seq(); + + if (list_length(ParallelApplyWorkerPool) == + max_parallel_apply_workers_per_subscription) + return NULL; + /* * Start a new parallel apply worker. * @@ -445,10 +506,23 @@ pa_launch_parallel_worker(void) if (launched) { + StringInfoData out; + + elog(DEBUG1, "started new pa worker"); ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); + + initStringInfo(&out); + appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION); + logicalrep_write_all_rels(&out); + + if (out.len > 1) + pa_send_data(winfo, out.len, out.data); + + pfree(out.data); } else { + elog(DEBUG1, "failed to start pa worker"); pa_free_worker_info(winfo); winfo = NULL; } @@ -467,7 +541,7 @@ pa_launch_parallel_worker(void) * streaming changes. */ void -pa_allocate_worker(TransactionId xid) +pa_allocate_worker(TransactionId xid, bool stream_txn) { bool found; ParallelApplyWorkerInfo *winfo = NULL; @@ -504,11 +578,21 @@ pa_allocate_worker(TransactionId xid) SpinLockAcquire(&winfo->shared->mutex); winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; winfo->shared->xid = xid; + winfo->shared->commit_seq_num = last_parallelized_commit_seq; + winfo->shared->wait_for_xid = last_parallelized_xid; SpinLockRelease(&winfo->shared->mutex); winfo->in_use = true; winfo->serialize_changes = false; + winfo->stream_txn = stream_txn; entry->winfo = winfo; + entry->local_end = InvalidXLogRecPtr; + + if (!stream_txn) + { + last_parallelized_xid = xid; + last_parallelized_commit_seq++; + } } /* @@ -542,6 +626,57 @@ pa_find_worker(TransactionId xid) return NULL; } +XLogRecPtr +pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write) +{ + bool found; + ParallelApplyWorkerEntry *entry; + ParallelApplyWorkerInfo *winfo; + + Assert(TransactionIdIsValid(xid)); + + if (skipped_write) + *skipped_write = false; + + /* Find an entry for the requested transaction. */ + entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found); + + if (!found) + return InvalidXLogRecPtr; + + winfo = entry->winfo; + + if (winfo == NULL) + { + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + + if (skipped_write) + *skipped_write = XLogRecPtrIsInvalid(entry->local_end); + + return entry->local_end; + } + + if (!pa_transaction_committed(winfo->shared->commit_seq_num)) + return InvalidXLogRecPtr; + + entry->local_end = winfo->shared->last_commit_end; + entry->winfo = NULL; + + if (skipped_write) + *skipped_write = XLogRecPtrIsInvalid(entry->local_end); + + elog(DEBUG1, "store local commit %X/%X end to txn entry: %u", + LSN_FORMAT_ARGS(entry->local_end), xid); + + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + + return entry->local_end; +} + /* * Makes the worker available for reuse. * @@ -557,7 +692,8 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) { Assert(!am_parallel_apply_worker()); Assert(winfo->in_use); - Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); + Assert(!winfo->stream_txn || + pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL)) elog(ERROR, "hash table corrupted"); @@ -573,9 +709,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) * been serialized and then letting the parallel apply worker deal with * the spurious message, we stop the worker. */ - if (winfo->serialize_changes || - list_length(ParallelApplyWorkerPool) > - (max_parallel_apply_workers_per_subscription / 2)) + if (winfo->serialize_changes) { logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); @@ -705,6 +839,106 @@ pa_process_spooled_messages_if_required(void) return true; } +void +pa_advance_committable_seq_num(void) +{ + Assert(am_parallel_apply_worker()); + pg_atomic_add_fetch_u64(&pa_commit_seq->committable_seq_num, 1); +} + +static bool +pa_transaction_committable(uint64 commit_seq_num) +{ + uint64 shared_seq_num; + + shared_seq_num = pg_atomic_read_u64(&pa_commit_seq->committable_seq_num); + + Assert(shared_seq_num <= commit_seq_num); + + return commit_seq_num == shared_seq_num; +} + +static bool +pa_transaction_committed(uint64 commit_seq_num) +{ + return commit_seq_num < + pg_atomic_read_u64(&pa_commit_seq->committable_seq_num); +} + +static void +pa_wait_for_transaction(TransactionId wait_for_xid, uint64 commit_seq_num) +{ + int count = 0; + + if (!TransactionIdIsValid(wait_for_xid) || commit_seq_num == 0) + return; + + elog(DEBUG1, "plan to wait for commit seq: %lld, remote_xid %u to finish", + (long long int) commit_seq_num, wait_for_xid); + + for (;;) + { + if (pa_transaction_committable(commit_seq_num)) + break; + + if (count > 100000) + { + pa_lock_transaction(wait_for_xid, AccessShareLock); + pa_unlock_transaction(wait_for_xid, AccessShareLock); + } + else + count++; + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } + + elog(DEBUG1, "finished wait for commit seq: %lld, remote_xid %u to finish", + (long long int) commit_seq_num, wait_for_xid); +} + +void +pa_wait_until_committable(void) +{ + TransactionId wait_for_xid = InvalidTransactionId; + uint64 commit_seq_num = 0; + + if (am_parallel_apply_worker()) + { + SpinLockAcquire(&MyParallelShared->mutex); + wait_for_xid = MyParallelShared->wait_for_xid; + commit_seq_num = MyParallelShared->commit_seq_num; + SpinLockRelease(&MyParallelShared->mutex); + } + else + { + wait_for_xid = last_parallelized_xid; + commit_seq_num = last_parallelized_commit_seq; + } + + pa_wait_for_transaction(wait_for_xid, commit_seq_num); +} + +bool +pa_cur_transaction_committable(void) +{ + uint64 commit_seq_num = 0; + + if (am_parallel_apply_worker()) + { + SpinLockAcquire(&MyParallelShared->mutex); + commit_seq_num = MyParallelShared->commit_seq_num; + SpinLockRelease(&MyParallelShared->mutex); + } + else + { + commit_seq_num = last_parallelized_commit_seq; + } + + return pa_transaction_committable(commit_seq_num); +} + + /* * Interrupt handler for main loop of parallel apply worker. */ @@ -745,6 +979,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) "ApplyMessageContext", ALLOCSET_DEFAULT_SIZES); + ApplyBufferContext = AllocSetContextCreate(ApplyContext, + "ApplyBufferContext", + ALLOCSET_DEFAULT_SIZES); + /* * Push apply error context callback. Fields will be filled while applying * a change. @@ -775,26 +1013,35 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) initReadOnlyStringInfo(&s, data, len); - /* - * The first byte of messages sent from leader apply worker to - * parallel apply workers can only be 'w'. - */ c = pq_getmsgbyte(&s); - if (c != 'w') - elog(ERROR, "unexpected message \"%c\"", c); - - /* - * Ignore statistics fields that have been updated by the leader - * apply worker. - * - * XXX We can avoid sending the statistics fields from the leader - * apply worker but for that, it needs to rebuild the entire - * message by removing these fields which could be more work than - * simply ignoring these fields in the parallel apply worker. - */ - s.cursor += SIZE_STATS_MESSAGE; + if (c == 'w') + { + /* + * Ignore statistics fields that have been updated by the + * leader apply worker. + * + * XXX We can avoid sending the statistics fields from the + * leader apply worker but for that, it needs to rebuild the + * entire message by removing these fields which could be more + * work than simply ignoring these fields in the parallel apply + * worker. + */ + s.cursor += SIZE_STATS_MESSAGE; - apply_dispatch(&s); + apply_dispatch(&s); + } + else if (c == PARALLEL_APPLY_INIT_RELATION) + { + pa_init_relmap_cache(&s); + } + else + { + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w' or 'r'. + */ + elog(ERROR, "unexpected message \"%c\"", c); + } } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -811,6 +1058,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) if (rc & WL_LATCH_SET) ResetLatch(MyLatch); + + if (!IsTransactionState()) + pgstat_report_stat(true); } } else @@ -848,6 +1098,7 @@ pa_shutdown(int code, Datum arg) INVALID_PROC_NUMBER); dsm_detach((dsm_segment *) DatumGetPointer(arg)); + dsm_detach(commit_seq_seg); } /* @@ -913,6 +1164,18 @@ ParallelApplyWorkerMain(Datum main_arg) */ logicalrep_worker_attach(worker_slot); + /* + * Attach to the dynamic shared memory segment for the parallel apply + * commit sequence. + */ + commit_seq_seg = dsm_attach(MyParallelShared->commit_seq_handle); + if (!commit_seq_seg) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + pa_commit_seq = dsm_segment_address(commit_seq_seg); + /* * Register the shutdown callback after we are attached to the worker * slot. This is to ensure that MyLogicalRepWorker remains valid when this @@ -1149,7 +1412,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) shm_mq_result result; TimestampTz startTime = 0; - Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); /* @@ -1201,6 +1463,51 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) } } +void +pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel) +{ + List *workers_stopped = NIL; + StringInfoData out; + + if (!ParallelApplyWorkerPool) + return; + + initStringInfo(&out); + appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION); + logicalrep_write_remote_rel(&out, rel); + + foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool) + { + if (winfo == stream_apply_worker) + continue; + + if (winfo->serialize_changes) + continue; + + elog(DEBUG1, "distributing schema changes to pa workers"); + + if (pa_send_data(winfo, out.len, out.data)) + continue; + + elog(DEBUG1, "failed to distribute, will stop that worker instead"); + + /* cannot distribute to this worker, stop this worker */ + pa_wait_for_transaction(winfo->shared->xid, + winfo->shared->commit_seq_num + 1); + + pa_get_last_commit_end(winfo->shared->xid, false, NULL); + + logicalrep_pa_worker_stop(winfo); + + workers_stopped = lappend(workers_stopped, winfo); + } + + pfree(out.data); + + foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped) + pa_free_worker_info(winfo); +} + /* * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means * that the current data and any subsequent data for this transaction will be @@ -1282,9 +1589,9 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED); /* - * Wait for the transaction lock to be released. This is required to - * detect deadlock among leader and parallel apply workers. Refer to the - * comments atop this file. + * Wait for the transaction lock to be released. This is required to detect + * deadlock among leader and parallel apply. Refer to the comments atop + * this file. */ pa_lock_transaction(winfo->shared->xid, AccessShareLock); pa_unlock_transaction(winfo->shared->xid, AccessShareLock); @@ -1298,6 +1605,7 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("lost connection to the logical replication parallel apply worker"))); + } /* @@ -1361,6 +1669,9 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp) void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) { + if (!TransactionIdIsValid(top_xid)) + return; + if (current_xid != top_xid && !list_member_xid(subxactlist, current_xid)) { @@ -1617,23 +1928,63 @@ pa_decr_and_wait_stream_block(void) void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) { - Assert(am_leader_apply_worker()); + XLogRecPtr local_lsn = InvalidXLogRecPtr; + TransactionId pa_remote_xid = winfo->shared->xid; - /* - * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. - */ - pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + Assert(am_leader_apply_worker()); /* + * Unlock the shared object lock taken for transactions so that parallel + * apply worker can continue to receive and apply changes. + * * Wait for that worker to finish. This is necessary to maintain commit * order which avoids failures due to transaction dependencies and * deadlocks. */ - pa_wait_for_xact_finish(winfo); + if (winfo->serialize_changes || winfo->stream_txn) + { + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + pa_wait_for_xact_finish(winfo); + + local_lsn = winfo->shared->last_commit_end; + pa_remote_xid = InvalidTransactionId; + + pa_free_worker(winfo); + } if (!XLogRecPtrIsInvalid(remote_lsn)) - store_flush_position(remote_lsn, winfo->shared->last_commit_end); + store_flush_position(remote_lsn, local_lsn, pa_remote_xid); + + pa_set_stream_apply_worker(NULL); +} + +static void +pa_init_relmap_cache(StringInfo s) +{ + for (;;) + { + LogicalRepRelation *rel = logicalrep_read_rel(s); + + logicalrep_relmap_update(rel); + + elog(DEBUG1, "pa worker init relmap for %s", rel->relname); + + if (s->cursor == s->len) + break; + } +} + +void +pa_update_commit_seq(ParallelApplyWorkerInfo *winfo) +{ + Assert(am_leader_apply_worker()); + Assert(winfo->stream_txn); + + SpinLockAcquire(&winfo->shared->mutex); + winfo->shared->commit_seq_num = last_parallelized_commit_seq; + winfo->shared->wait_for_xid = last_parallelized_xid; + SpinLockRelease(&winfo->shared->mutex); - pa_free_worker(winfo); + elog(DEBUG1, "updated wait event for pa worker, seq: %lld, wait xid: %u", + (long long int) last_parallelized_commit_seq, last_parallelized_xid); } diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1a352b542dc..c072c44795f 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -691,6 +691,41 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_attrs(out, rel, columns, include_gencols_type); } +void +logicalrep_write_remote_rel(StringInfo out, LogicalRepRelation *rel) +{ + pq_sendint32(out, rel->remoteid); + + /* Write relation name */ + pq_sendstring(out, rel->nspname); + pq_sendstring(out, rel->relname); + + /* Write the replica identity. */ + pq_sendbyte(out, rel->replident); + + /* Write attribute description */ + pq_sendint16(out, rel->natts); + + for (int i = 0; i < rel->natts; i++) + { + uint8 flags = 0; + + if (bms_is_member(i, rel->attkeys)) + flags |= LOGICALREP_IS_REPLICA_IDENTITY; + + pq_sendbyte(out, flags); + + /* attribute name */ + pq_sendstring(out, rel->attnames[i]); + + /* attribute type id */ + pq_sendint32(out, rel->atttyps[i]); + + /* ignore attribute mode for now */ + pq_sendint32(out, 0); + } +} + /* * Read the relation info from stream and return as LogicalRepRelation. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index f59046ad620..35392cf9aac 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -133,6 +133,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) { LogicalRepRelation *remoterel; + if (entry->refcount) + elog(ERROR, "cannot free the entry when it is sill being used"); + remoterel = &entry->remoterel; pfree(remoterel->nspname); @@ -366,6 +369,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) remoterel = &entry->remoterel; /* Ensure we don't leak a relcache refcount. */ + if (entry->refcount > 0) + { + Assert(entry->localrel); + entry->refcount++; + return entry; + } + if (entry->localrel) elog(ERROR, "remote relation ID %u is already open", remoteid); @@ -494,6 +504,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) entry->localreloid, &entry->statelsn); + entry->refcount++; + return entry; } @@ -503,8 +515,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) { - table_close(rel->localrel, lockmode); - rel->localrel = NULL; + rel->refcount--; + + if (!rel->refcount) + { + table_close(rel->localrel, lockmode); + rel->localrel = NULL; + } } /* @@ -946,3 +963,18 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, return InvalidOid; } + +void +logicalrep_write_all_rels(StringInfo out) +{ + LogicalRepRelMapEntry *entry; + HASH_SEQ_STATUS status; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&status, LogicalRepRelMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + logicalrep_write_remote_rel(out, &entry->remoterel); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c5fb627aa56..9130f893787 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -193,6 +193,7 @@ typedef struct FlushPosition dlist_node node; XLogRecPtr local_end; XLogRecPtr remote_end; + TransactionId pa_remote_xid; } FlushPosition; static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); @@ -209,6 +210,19 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +typedef struct ApplyBufferChange +{ + LogicalRepMsgType action; + ApplyExecutionData *edata; + TupleTableSlot *old_slot; + TupleTableSlot *new_slot; + TupleTableSlot *local_slot; + LogicalRepTupleData *tuple; +} ApplyBufferChange; + +static bool buffering_changes = false; +static List *buffered_changes = NIL; + /* Struct for saving and restoring apply errcontext information */ typedef struct ApplyErrorCallbackArg { @@ -283,6 +297,7 @@ ErrorContextCallback *apply_error_context_stack = NULL; MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; +MemoryContext ApplyBufferContext = NULL; /* per stream context for streaming transactions */ static MemoryContext LogicalStreamingContext = NULL; @@ -296,6 +311,7 @@ static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +static TransactionId remote_xid = InvalidTransactionId; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -364,11 +380,7 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. */ -static void stream_open_file(Oid subid, TransactionId xid, - bool first_segment); static void stream_write_change(char action, StringInfo s); -static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); -static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -380,11 +392,13 @@ static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - Oid localindexoid); + Oid localindexoid, + TupleTableSlot *bufferedlocalslot); static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - Oid localindexoid); + Oid localindexoid, + TupleTableSlot *bufferedlocalslot); static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, @@ -394,6 +408,14 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation); +static void apply_buffered_changes(void); +static LogicalRepTupleData *apply_buffer_copy_tuple(LogicalRepTupleData *tupleData); +static void apply_buffer_add_change(LogicalRepMsgType action, + ApplyExecutionData *edata, + TupleTableSlot *old_slot, + TupleTableSlot *new_slot, + LogicalRepTupleData *tuple); +static void apply_buffer_store_local_slot(TupleTableSlot *local_slot); /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); @@ -508,7 +530,8 @@ begin_replication_step(void) maybe_reread_subscription(); } - PushActiveSnapshot(GetTransactionSnapshot()); + if (!buffering_changes || !ActiveSnapshotSet()) + PushActiveSnapshot(GetTransactionSnapshot()); MemoryContextSwitchTo(ApplyMessageContext); } @@ -523,6 +546,10 @@ begin_replication_step(void) static void end_replication_step(void) { + if (buffering_changes) + return; + + Assert(ActiveSnapshotSet()); PopActiveSnapshot(); CommandCounterIncrement(); @@ -556,14 +583,16 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransApplyAction apply_action; StringInfoData original_msg; - apply_action = get_transaction_apply_action(stream_xid, &winfo); + Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid)); + + apply_action = get_transaction_apply_action(in_streamed_transaction + ? stream_xid : remote_xid, + &winfo); /* not in streaming mode */ if (apply_action == TRANS_LEADER_APPLY) return false; - Assert(TransactionIdIsValid(stream_xid)); - /* * The parallel apply worker needs the xid in this message to decide * whether to define a savepoint, so save the original message that has @@ -574,9 +603,12 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* * We should have received XID of the subxact as the first part of the - * message, so extract it. + * message in streaming transactions, so extract it. */ - current_xid = pq_getmsgint(s, 4); + if (in_streamed_transaction) + current_xid = pq_getmsgint(s, 4); + else + current_xid = remote_xid; if (!TransactionIdIsValid(current_xid)) ereport(ERROR, @@ -685,10 +717,17 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel) estate->es_opened_result_relations = lappend(estate->es_opened_result_relations, resultRelInfo); - estate->es_output_cid = GetCurrentCommandId(true); + if (!buffering_changes) + { + estate->es_output_cid = GetCurrentCommandId(true); - /* Prepare to catch AFTER triggers. */ - AfterTriggerBeginQuery(); + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + } + else + { + estate->es_output_cid = GetCurrentCommandId(false); + } /* other fields of edata remain NULL for now */ @@ -985,17 +1024,50 @@ static void apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; /* There must not be an active streaming transaction. */ Assert(!TransactionIdIsValid(stream_xid)); logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); + + remote_xid = begin_data.xid; + + set_apply_error_context_xact(remote_xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; maybe_start_skipping_changes(begin_data.final_lsn); + pa_allocate_worker(remote_xid, false); + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + elog(DEBUG1, "new remote_xid %u", remote_xid); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + buffering_changes = !pa_cur_transaction_committable(); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + pa_send_data(winfo, s->len, s->data); + pa_set_stream_apply_worker(winfo); + break; + + case TRANS_PARALLEL_APPLY: + /* Hold the lock until the end of the transaction. */ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + buffering_changes = !pa_cur_transaction_committable(); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -1010,6 +1082,11 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; logicalrep_read_commit(s, &commit_data); @@ -1020,7 +1097,79 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + switch (apply_action) + { + case TRANS_LEADER_APPLY: + if (buffering_changes) + { + pa_wait_until_committable(); + apply_buffered_changes(); + } + + apply_handle_commit_internal(&commit_data); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + + pa_wait_until_committable(); + apply_buffered_changes(); + apply_handle_commit_internal(&commit_data); + + MyParallelShared->last_commit_end = XactLastCommitEnd; + + pa_advance_committable_seq_num(); + pa_unlock_transaction(remote_xid, AccessExclusiveLock); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + + remote_xid = InvalidTransactionId; + in_remote_transaction = false; + + buffering_changes = false; + + elog(DEBUG1, "reset remote_xid %u", remote_xid); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -1140,7 +1289,8 @@ apply_handle_prepare(StringInfo s) * XactLastCommitEnd, and adding it for this purpose doesn't seems worth * it. */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -1185,6 +1335,8 @@ apply_handle_commit_prepared(StringInfo s) /* There is no transaction when COMMIT PREPARED is called */ begin_replication_step(); + /* TODO wait for xid to finish */ + /* * Update origin state so we can restart streaming from correct position * in case of crash. @@ -1197,7 +1349,8 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd, + InvalidTransactionId); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1263,7 +1416,8 @@ apply_handle_rollback_prepared(StringInfo s) * transaction because we always flush the WAL record for it. See * apply_handle_prepare. */ - store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); + store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1322,7 +1476,8 @@ apply_handle_stream_prepare(StringInfo s) * It is okay not to set the local_end LSN for the prepare because * we always flush the prepare record. See apply_handle_prepare. */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -1501,6 +1656,8 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; + buffering_changes = false; + /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); @@ -1513,7 +1670,7 @@ apply_handle_stream_start(StringInfo s) /* Try to allocate a worker for the streaming transaction. */ if (first_segment) - pa_allocate_worker(stream_xid); + pa_allocate_worker(stream_xid, true); apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -1535,6 +1692,8 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + pa_update_commit_seq(winfo); + /* * Once we start serializing the changes, the parallel apply * worker will wait for the leader to release the stream lock @@ -1571,6 +1730,12 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + /* + * TODO, the pa worker could start to wait too soon when processing + * some old stream start + */ + pa_update_commit_seq(winfo); + /* * Open the spool file unless it was already opened when switching * to serialize mode. The transaction started in @@ -1599,6 +1764,8 @@ apply_handle_stream_start(StringInfo s) logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); } + pa_wait_until_committable(); + parallel_stream_nchanges = 0; break; @@ -2069,6 +2236,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, end_replication_step(); + Assert(!buffering_changes); + /* * Read the entries one by one and pass them through the same logic as in * apply_dispatch. @@ -2294,7 +2463,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) pgstat_report_stat(false); - store_flush_position(commit_data->end_lsn, XactLastCommitEnd); + store_flush_position(commit_data->end_lsn, XactLastCommitEnd, + InvalidTransactionId); } else { @@ -2318,15 +2488,31 @@ static void apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; rel = logicalrep_read_rel(s); + + apply_action = get_transaction_apply_action(in_streamed_transaction + ? stream_xid : remote_xid, + &winfo); + + if (apply_action == TRANS_LEADER_APPLY) + { + pa_wait_until_committable(); + apply_buffered_changes(); + } + logicalrep_relmap_update(rel); /* Also reset all entries in the partition map that refer to remoterel. */ logicalrep_partmap_reset_relmap(rel); + + if (am_leader_apply_worker()) + pa_distribute_schema_changes_to_workers(rel); } /* @@ -2394,6 +2580,7 @@ apply_handle_insert(StringInfo s) ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; + MemoryContext applyctx; MemoryContext oldctx; bool run_as_owner; @@ -2405,6 +2592,9 @@ apply_handle_insert(StringInfo s) handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; + if (buffering_changes && pa_cur_transaction_committable()) + apply_buffered_changes(); + begin_replication_step(); relid = logicalrep_read_insert(s, &newtup); @@ -2431,6 +2621,9 @@ apply_handle_insert(StringInfo s) /* Set relation for error callback */ apply_error_callback_arg.rel = rel; + if (buffering_changes) + applyctx = MemoryContextSwitchTo(ApplyBufferContext); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2444,20 +2637,84 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); + if (buffering_changes) + { + apply_buffer_add_change(LOGICAL_REP_MSG_INSERT, edata, NULL, + remoteslot, NULL); + MemoryContextSwitchTo(applyctx); + } + else + { + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_INSERT); + else + { + ResultRelInfo *relinfo = edata->targetRelInfo; + + ExecOpenIndices(relinfo, false); + apply_handle_insert_internal(edata, relinfo, remoteslot); + ExecCloseIndices(relinfo); + } + + finish_edata(edata); + + logicalrep_rel_close(rel, NoLock); + } + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + end_replication_step(); +} + +static void +apply_handle_buffered_insert(ApplyBufferChange *change) +{ + LogicalRepRelMapEntry *rel = change->edata->targetRel; + UserContext ucxt; + bool run_as_owner; + + Assert(!buffering_changes); + + begin_replication_step(); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + + change->edata->estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + /* For a partitioned table, insert the tuple into a partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(edata, - remoteslot, NULL, CMD_INSERT); + apply_handle_tuple_routing(change->edata, + change->new_slot, NULL, CMD_INSERT); else { - ResultRelInfo *relinfo = edata->targetRelInfo; + ResultRelInfo *relinfo = change->edata->targetRelInfo; ExecOpenIndices(relinfo, false); - apply_handle_insert_internal(edata, relinfo, remoteslot); + apply_handle_insert_internal(change->edata, relinfo, change->new_slot); ExecCloseIndices(relinfo); } - finish_edata(edata); + finish_edata(change->edata); + + logicalrep_rel_close(rel, NoLock); /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; @@ -2465,8 +2722,6 @@ apply_handle_insert(StringInfo s) if (!run_as_owner) RestoreUserContext(&ucxt); - logicalrep_rel_close(rel, NoLock); - end_replication_step(); } @@ -2554,6 +2809,7 @@ apply_handle_update(StringInfo s) bool has_oldtup; TupleTableSlot *remoteslot; RTEPermissionInfo *target_perminfo; + MemoryContext applyctx; MemoryContext oldctx; bool run_as_owner; @@ -2565,6 +2821,9 @@ apply_handle_update(StringInfo s) handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; + if (buffering_changes && pa_cur_transaction_committable()) + apply_buffered_changes(); + begin_replication_step(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, @@ -2595,6 +2854,9 @@ apply_handle_update(StringInfo s) if (!run_as_owner) SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + if (buffering_changes) + applyctx = MemoryContextSwitchTo(ApplyBufferContext); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2632,15 +2894,34 @@ apply_handle_update(StringInfo s) has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); - /* For a partitioned table, apply update to correct partition. */ - if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(edata, - remoteslot, &newtup, CMD_UPDATE); + if (buffering_changes) + { + apply_buffer_add_change(LOGICAL_REP_MSG_UPDATE, edata, + has_oldtup ? remoteslot : NULL, + has_oldtup ? NULL : remoteslot, + apply_buffer_copy_tuple(&newtup)); + MemoryContextSwitchTo(applyctx); + + if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup, + rel->localindexoid, NULL); + } else - apply_handle_update_internal(edata, edata->targetRelInfo, - remoteslot, &newtup, rel->localindexoid); + { + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, &newtup, CMD_UPDATE); + else + apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup, rel->localindexoid, + NULL); + + finish_edata(edata); - finish_edata(edata); + logicalrep_rel_close(rel, NoLock); + } /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; @@ -2648,8 +2929,68 @@ apply_handle_update(StringInfo s) if (!run_as_owner) RestoreUserContext(&ucxt); + end_replication_step(); +} + +static void +apply_handle_buffered_update(ApplyBufferChange *change) +{ + LogicalRepRelMapEntry *rel = change->edata->targetRel; + UserContext ucxt; + bool run_as_owner; + CommandId original_cid; + + Assert(!buffering_changes); + + begin_replication_step(); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + + original_cid = change->edata->estate->es_output_cid; + change->edata->estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(change->edata, + change->new_slot ? change->new_slot : change->old_slot, + change->tuple, CMD_UPDATE); + else + { + bool locked = false; + + if (change->local_slot) + locked = RelationLockTuple(rel->localrel, LockTupleExclusive, + change->new_slot ? change->new_slot : change->old_slot, + change->local_slot, original_cid); + + apply_handle_update_internal(change->edata, change->edata->targetRelInfo, + change->new_slot ? change->new_slot : change->old_slot, + change->tuple, rel->localindexoid, + locked ? change->local_slot : NULL); + } + + finish_edata(change->edata); + logicalrep_rel_close(rel, NoLock); + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + if (!run_as_owner) + RestoreUserContext(&ucxt); + end_replication_step(); } @@ -2663,25 +3004,35 @@ apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - Oid localindexoid) + Oid localindexoid, + TupleTableSlot *bufferedlocalslot) { EState *estate = edata->estate; LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; - TupleTableSlot *localslot = NULL; + TupleTableSlot *localslot = bufferedlocalslot; ConflictTupleInfo conflicttuple = {0}; - bool found; + bool found = (bufferedlocalslot != NULL); MemoryContext oldctx; + if (!found) + { + found = FindReplTupleInLocalRel(edata, localrel, + &relmapentry->remoterel, + localindexoid, + remoteslot, &localslot); + } + + if (buffering_changes) + { + apply_buffer_store_local_slot(found ? localslot : NULL); + return; + } + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, false); - found = FindReplTupleInLocalRel(edata, localrel, - &relmapentry->remoterel, - localindexoid, - remoteslot, &localslot); - /* * Tuple found. * @@ -2759,6 +3110,7 @@ apply_handle_delete(StringInfo s) ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; + MemoryContext applyctx; MemoryContext oldctx; bool run_as_owner; @@ -2770,6 +3122,9 @@ apply_handle_delete(StringInfo s) handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; + if (buffering_changes && pa_cur_transaction_committable()) + apply_buffered_changes(); + begin_replication_step(); relid = logicalrep_read_delete(s, &oldtup); @@ -2799,6 +3154,9 @@ apply_handle_delete(StringInfo s) if (!run_as_owner) SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + if (buffering_changes) + applyctx = MemoryContextSwitchTo(ApplyBufferContext); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2811,21 +3169,99 @@ apply_handle_delete(StringInfo s) slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); + if (buffering_changes) + { + apply_buffer_add_change(LOGICAL_REP_MSG_DELETE, edata, remoteslot, + NULL, NULL); + MemoryContextSwitchTo(applyctx); + + if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + apply_handle_delete_internal(edata, edata->targetRelInfo, + remoteslot, rel->localindexoid, NULL); + } + else + { + /* For a partitioned table, apply delete to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_DELETE); + else + { + ResultRelInfo *relinfo = edata->targetRelInfo; + + ExecOpenIndices(relinfo, false); + apply_handle_delete_internal(edata, relinfo, + remoteslot, rel->localindexoid, NULL); + ExecCloseIndices(relinfo); + } + + finish_edata(edata); + + logicalrep_rel_close(rel, NoLock); + } + + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + end_replication_step(); +} + +static void +apply_handle_buffered_delete(ApplyBufferChange *change) +{ + LogicalRepRelMapEntry *rel = change->edata->targetRel; + UserContext ucxt; + bool run_as_owner; + CommandId original_cid; + + Assert(!buffering_changes); + + begin_replication_step(); + + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + + original_cid = change->edata->estate->es_output_cid; + change->edata->estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + /* For a partitioned table, apply delete to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(edata, - remoteslot, NULL, CMD_DELETE); + apply_handle_tuple_routing(change->edata, + change->old_slot, NULL, CMD_DELETE); else { - ResultRelInfo *relinfo = edata->targetRelInfo; + ResultRelInfo *relinfo = change->edata->targetRelInfo; + bool locked = false; + + if (change->local_slot) + locked = RelationLockTuple(rel->localrel, LockTupleExclusive, + change->old_slot, change->local_slot, + original_cid); ExecOpenIndices(relinfo, false); - apply_handle_delete_internal(edata, relinfo, - remoteslot, rel->localindexoid); + apply_handle_delete_internal(change->edata, relinfo, + change->old_slot, rel->localindexoid, + locked ? change->local_slot : NULL); ExecCloseIndices(relinfo); } - finish_edata(edata); + finish_edata(change->edata); + + logicalrep_rel_close(rel, NoLock); /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; @@ -2833,8 +3269,6 @@ apply_handle_delete(StringInfo s) if (!run_as_owner) RestoreUserContext(&ucxt); - logicalrep_rel_close(rel, NoLock); - end_replication_step(); } @@ -2847,25 +3281,33 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - Oid localindexoid) + Oid localindexoid, + TupleTableSlot *bufferedlocalslot) { EState *estate = edata->estate; Relation localrel = relinfo->ri_RelationDesc; LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; - TupleTableSlot *localslot; + TupleTableSlot *localslot = bufferedlocalslot; ConflictTupleInfo conflicttuple = {0}; - bool found; + bool found = (bufferedlocalslot != NULL); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + if (!found) + found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid, + remoteslot, &localslot); + + if (buffering_changes) + { + apply_buffer_store_local_slot(found ? localslot : NULL); + return; + } /* Caller should have opened indexes already. */ Assert(relinfo->ri_IndexRelationDescs != NULL || !localrel->rd_rel->relhasindex || RelationGetIndexList(localrel) == NIL); - found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid, - remoteslot, &localslot); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); /* If found delete it. */ if (found) @@ -2919,6 +3361,7 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, TupleTableSlot **localslot) { EState *estate = edata->estate; + MemoryContext oldctx; bool found; /* @@ -2927,8 +3370,14 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, */ TargetPrivilegesCheck(localrel, ACL_SELECT); + if (buffering_changes) + oldctx = MemoryContextSwitchTo(ApplyBufferContext); + *localslot = table_slot_create(localrel, &estate->es_tupleTable); + if (buffering_changes) + MemoryContextSwitchTo(oldctx); + Assert(OidIsValid(localidxoid) || (remoterel->replident == REPLICA_IDENTITY_FULL)); @@ -2947,11 +3396,13 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, found = RelationFindReplTupleByIndex(localrel, localidxoid, LockTupleExclusive, - remoteslot, *localslot); + remoteslot, *localslot, + !buffering_changes); } else found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, - remoteslot, *localslot); + remoteslot, *localslot, + !buffering_changes); return found; } @@ -3048,7 +3499,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, case CMD_DELETE: apply_handle_delete_internal(edata, partrelinfo, remoteslot_part, - part_entry->localindexoid); + part_entry->localindexoid, + NULL); break; case CMD_UPDATE: @@ -3250,6 +3702,8 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; + elog(LOG, "truncate"); + /* * Quick return if we are skipping data modification changes or handling * streamed transactions. @@ -3258,6 +3712,9 @@ apply_handle_truncate(StringInfo s) handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; + pa_wait_until_committable(); + apply_buffered_changes(); + begin_replication_step(); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); @@ -3471,6 +3928,115 @@ apply_dispatch(StringInfo s) apply_error_callback_arg.command = saved_command; } +static void +apply_buffered_changes(void) +{ + buffering_changes = false; + elog(DEBUG1, "started apply buffered changes %d", list_length(buffered_changes)); + + if (!buffered_changes) + return; + + foreach_ptr(ApplyBufferChange, change, buffered_changes) + { + LogicalRepMsgType saved_command; + + /* + * Set the current command being applied. Since this function can be + * called recursively when applying spooled changes, save the current + * command. + */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = change->action; + + switch (change->action) + { + case LOGICAL_REP_MSG_INSERT: + apply_handle_buffered_insert(change); + break; + + case LOGICAL_REP_MSG_UPDATE: + apply_handle_buffered_update(change); + break; + + case LOGICAL_REP_MSG_DELETE: + apply_handle_buffered_delete(change); + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"??? (%d)\"", change->action))); + } + + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; + } + + Assert(ActiveSnapshotSet()); + PopActiveSnapshot(); + Assert(!ActiveSnapshotSet()); + MemoryContextReset(ApplyBufferContext); + buffered_changes = NIL; +} + +static LogicalRepTupleData * +apply_buffer_copy_tuple(LogicalRepTupleData *tupleData) +{ + LogicalRepTupleData *res; + int ncols = tupleData->ncols; + + Assert(CurrentMemoryContext == ApplyBufferContext); + + res = palloc0_object(LogicalRepTupleData); + + /* Allocate space for per-column values; zero out unused StringInfoDatas */ + res->colvalues = (StringInfoData *) palloc0(ncols * sizeof(StringInfoData)); + res->colstatus = (char *) palloc(ncols * sizeof(char)); + res->ncols = ncols; + + for (int i = 0; i < ncols; i++) + { + initStringInfo(&res->colvalues[i]); + appendBinaryStringInfo(&res->colvalues[i], + tupleData->colvalues[i].data, + tupleData->colvalues[i].len); + res->colstatus[i] = tupleData->colstatus[i]; + } + + return res; +} + +static void +apply_buffer_add_change(LogicalRepMsgType action, ApplyExecutionData *edata, + TupleTableSlot *old_slot, TupleTableSlot *new_slot, + LogicalRepTupleData *tuple) +{ + ApplyBufferChange *change; + + Assert(CurrentMemoryContext == ApplyBufferContext); + + change = palloc0_object(ApplyBufferChange); + change->action = action; + change->edata = edata; + change->new_slot = new_slot; + change->old_slot = old_slot; + change->tuple = tuple; + + buffered_changes = lappend(buffered_changes, change); +} + +static void +apply_buffer_store_local_slot(TupleTableSlot *local_slot) +{ + ApplyBufferChange *change; + + Assert(buffered_changes != NIL); + + change = (ApplyBufferChange *) llast(buffered_changes); + change->local_slot = local_slot; +} + /* * Figure out which write/flush positions to report to the walsender process. * @@ -3499,6 +4065,36 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur); + /* + * Get the parallel apply worker that holds the local commit lsn of the + * transaction. + */ + if (TransactionIdIsValid(pos->pa_remote_xid) && + XLogRecPtrIsInvalid(pos->local_end)) + { + bool skipped_write; + + pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true, + &skipped_write); + + elog(DEBUG1, + "got commit end from parallel apply worker, " + "txn: %u, remote_end %X/%X, local_end %X/%X", + pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end), + LSN_FORMAT_ARGS(pos->local_end)); + + /* Return if the worker has not finished applying */ + if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end)) + { + *have_pending_txes = true; + return; + } + } + + /* + * Worker has finished applying or the transaction was applied in the + * leader apply worker + */ *write = pos->remote_end; if (pos->local_end <= local_flush) @@ -3507,19 +4103,6 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, dlist_delete(iter.cur); pfree(pos); } - else - { - /* - * Don't want to uselessly iterate over the rest of the list which - * could potentially be long. Instead get the last element and - * grab the write position from there. - */ - pos = dlist_tail_element(FlushPosition, node, - &lsn_mapping); - *write = pos->remote_end; - *have_pending_txes = true; - return; - } } *have_pending_txes = !dlist_is_empty(&lsn_mapping); @@ -3529,7 +4112,8 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, * Store current remote/local lsn pair in the tracking list. */ void -store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) +store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId remote_xid) { FlushPosition *flushpos; @@ -3547,6 +4131,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); flushpos->local_end = local_lsn; flushpos->remote_end = remote_lsn; + flushpos->pa_remote_xid = remote_xid; dlist_push_tail(&lsn_mapping, &flushpos->node); MemoryContextSwitchTo(ApplyMessageContext); @@ -3594,6 +4179,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "LogicalStreamingContext", ALLOCSET_DEFAULT_SIZES); + ApplyBufferContext = AllocSetContextCreate(ApplyContext, + "ApplyBufferContext", + ALLOCSET_DEFAULT_SIZES); + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -4324,7 +4913,7 @@ stream_cleanup_files(Oid subid, TransactionId xid) * changes for this transaction, create the buffile, otherwise open the * previously created file. */ -static void +void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; @@ -4369,7 +4958,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * stream_close_file * Close the currently open file with streamed changes. */ -static void +void stream_close_file(void) { Assert(stream_fd != NULL); @@ -4417,7 +5006,7 @@ stream_write_change(char action, StringInfo s) * target file if not already before writing the message and close the file at * the end. */ -static void +void stream_open_and_write_change(TransactionId xid, char action, StringInfo s) { Assert(!in_streamed_transaction); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 104b059544d..ed9ba8fdb9c 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -756,9 +756,14 @@ extern void check_exclusion_constraint(Relation heap, Relation index, extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, - TupleTableSlot *outslot); + TupleTableSlot *outslot, + bool locktuple); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, - TupleTableSlot *searchslot, TupleTableSlot *outslot); + TupleTableSlot *searchslot, TupleTableSlot *outslot, + bool locktuple); +extern bool RelationLockTuple(Relation rel, LockTupleMode lockmode, + TupleTableSlot *searchslot, + TupleTableSlot *outslot, CommandId cid); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index b261c60d3fa..ce880f179dc 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -64,6 +64,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_INTERNAL_RELATION = 'r', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', @@ -251,6 +252,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type); +extern void logicalrep_write_remote_rel(StringInfo out, + LogicalRepRelation *rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 7a561a8e8d8..d35bd00e93c 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -37,6 +37,8 @@ typedef struct LogicalRepRelMapEntry /* Sync state. */ char state; XLogRecPtr statelsn; + + int refcount; } LogicalRepRelMapEntry; extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); @@ -50,5 +52,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap); extern Oid GetRelationIdentityOrPK(Relation rel); +extern void logicalrep_write_all_rels(StringInfo out); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..e0c20da7299 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared */ PartialFileSetState fileset_state; FileSet fileset; + + dsm_handle commit_seq_handle; + + uint64 commit_seq_num; + TransactionId wait_for_xid; } ParallelApplyWorkerShared; /* @@ -214,7 +219,11 @@ typedef struct ParallelApplyWorkerInfo */ bool in_use; + bool stream_txn; + ParallelApplyWorkerShared *shared; + + bool collected_local_end; } ParallelApplyWorkerInfo; /* Main memory context for apply worker. Permanent during worker lifetime. */ @@ -222,6 +231,8 @@ extern PGDLLIMPORT MemoryContext ApplyContext; extern PGDLLIMPORT MemoryContext ApplyMessageContext; +extern PGDLLIMPORT MemoryContext ApplyBufferContext; + extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack; extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared; @@ -275,6 +286,10 @@ extern void apply_dispatch(StringInfo s); extern void maybe_reread_subscription(void); extern void stream_cleanup_files(Oid subid, TransactionId xid); +extern void stream_open_file(Oid subid, TransactionId xid, bool first_segment); +extern void stream_close_file(void); +extern void stream_open_and_write_change(TransactionId xid, char action, + StringInfo s); extern void set_stream_options(WalRcvStreamOptions *options, char *slotname, @@ -288,19 +303,27 @@ extern void SetupApplyOrSyncWorker(int worker_slot); extern void DisableSubscriptionAndExit(void); -extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); +extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId remote_xid); /* Function for apply error callback */ extern void apply_error_callback(void *arg); extern void set_apply_error_context_origin(char *originname); /* Parallel apply worker setup and interactions */ -extern void pa_allocate_worker(TransactionId xid); +extern void pa_allocate_worker(TransactionId xid, bool stream_txn); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); +extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry, + bool *skipped_write); extern void pa_detach_all_error_mq(void); +extern void pa_advance_committable_seq_num(void); +extern void pa_wait_until_committable(void); +extern bool pa_cur_transaction_committable(void); + extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); +extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked); @@ -325,6 +348,7 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void pa_update_commit_seq(ParallelApplyWorkerInfo *winfo); #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 3d16c2a800d..c2fba0b9a9c 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -17,7 +17,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', - qq(max_logical_replication_workers = 6)); + qq(max_logical_replication_workers = 7)); $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 00a1c2fcd48..6842476c8b0 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node. my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_subscriber->start; diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index 36af1c16e7f..aec039d565b 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -87,6 +87,7 @@ $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_publisher->init(allows_streaming => 'logical'); $node_subscriber->init; $node_publisher->start; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_subscriber->start; $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my %remainder_a = ( -- 2.49.0.windows.1