From 176ee0b2408a62faac18fea06e3e21f9665fb49f Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 20 Nov 2020 11:17:18 +0530 Subject: [PATCH v1] Bug fix in handling streaming transaction in tablesync worker Tablesync worker applies all the changes under a single transaction but in streaming mode we were committing the transaction on stream stop and stream commit so avoid that if it is tablesync worker --- src/backend/replication/logical/worker.c | 45 +++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0468491..aadc46a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -825,8 +825,12 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* Commit the per-stream transaction */ - CommitTransactionCommand(); + /* The synchronization worker runs in single transaction. */ + if (!am_tablesync_worker()) + { + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + } in_streamed_transaction = false; @@ -902,7 +906,9 @@ apply_handle_stream_abort(StringInfo s) { /* Cleanup the subxact info */ cleanup_subxact_info(); - CommitTransactionCommand(); + + if (!am_tablesync_worker()) + CommitTransactionCommand(); return; } @@ -928,7 +934,8 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - CommitTransactionCommand(); + if (!am_tablesync_worker()) + CommitTransactionCommand(); } } @@ -1048,20 +1055,28 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); pfree(s2.data); - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); + /* The synchronization worker runs in single transaction. */ + if (!am_tablesync_worker()) + { + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = commit_data.end_lsn; + replorigin_session_origin_timestamp = commit_data.committime; + CommitTransactionCommand(); + pgstat_report_stat(false); + store_flush_position(commit_data.end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); -- 1.8.3.1