From 9f2d1ff2a181136efe2d5db0e6ac43bec909a1f1 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 3 Dec 2020 14:18:19 +0530 Subject: [PATCH v1] Allow more than one transaction in tablesync worker. --- src/backend/replication/logical/tablesync.c | 9 ++++++++- src/backend/replication/logical/worker.c | 19 +++++-------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1904f34..886298e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -270,7 +270,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - Assert(IsTransactionState()); + if (!IsTransactionState()) + StartTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -294,6 +295,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) } else SpinLockRelease(&MyLogicalRepWorker->relmutex); + + if (IsTransactionState()) + CommitTransactionCommand(); } /* @@ -943,6 +947,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Make the copy visible. */ CommandCounterIncrement(); + CommitTransactionCommand(); + StartTransactionCommand(); + /* * We are done with the initial data synchronization, update the state. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8c7fad8..af6a98a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* The synchronization worker runs in single transaction. */ - if (!am_tablesync_worker()) - { - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - } + /* Commit the per-stream transaction */ + CommitTransactionCommand(); in_streamed_transaction = false; @@ -888,10 +884,7 @@ apply_handle_stream_abort(StringInfo s) { /* Cleanup the subxact info */ cleanup_subxact_info(); - - /* The synchronization worker runs in single transaction */ - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); return; } @@ -918,8 +911,7 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); } } @@ -1062,8 +1054,7 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data) { - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct -- 1.8.3.1