From 0cd28537b9153c9386a8ca7ccfe66b7515a47a1b Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 20 Nov 2020 11:17:18 +0530 Subject: [PATCH v2] Bug fix in handling streaming transaction in tablesync worker Tablesync worker applies all the changes under a single transaction but in streaming mode we were committing the transaction on stream stop and stream commit so avoid that if it is tablesync worker --- src/backend/replication/logical/worker.c | 71 +++++++++++++++++--------------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0468491..cf646a7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -696,19 +696,13 @@ apply_handle_begin(StringInfo s) } /* - * Handle COMMIT message. + * Internal helper function to Handle COMMIT message. * - * TODO, support tracking of multiple origins + * Helper function for apply_handle_commit and apply_handle_stream_commit */ static void -apply_handle_commit(StringInfo s) +apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) { - LogicalRepCommitData commit_data; - - logicalrep_read_commit(s, &commit_data); - - Assert(commit_data.commit_lsn == remote_final_lsn); - /* The synchronization worker runs in single transaction. */ if (IsTransactionState() && !am_tablesync_worker()) { @@ -716,13 +710,13 @@ apply_handle_commit(StringInfo s) * Update origin state so we can restart streaming from correct * position in case of crash. */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(commit_data.end_lsn); + store_flush_position(commit_data->end_lsn); } else { @@ -734,7 +728,24 @@ apply_handle_commit(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + process_syncing_tables(commit_data->end_lsn); +} + +/* + * Handle COMMIT message. + * + * TODO, support tracking of multiple origins + */ +static void +apply_handle_commit(StringInfo s) +{ + LogicalRepCommitData commit_data; + + logicalrep_read_commit(s, &commit_data); + + Assert(commit_data.commit_lsn == remote_final_lsn); + + apply_handle_commit_internal(s, &commit_data); pgstat_report_activity(STATE_IDLE, NULL); } @@ -825,8 +836,12 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* Commit the per-stream transaction */ - CommitTransactionCommand(); + /* The synchronization worker runs in single transaction. */ + if (!am_tablesync_worker()) + { + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + } in_streamed_transaction = false; @@ -902,7 +917,9 @@ apply_handle_stream_abort(StringInfo s) { /* Cleanup the subxact info */ cleanup_subxact_info(); - CommitTransactionCommand(); + + if (!am_tablesync_worker()) + CommitTransactionCommand(); return; } @@ -928,7 +945,10 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - CommitTransactionCommand(); + + /* The synchronization worker runs in single transaction */ + if (!am_tablesync_worker()) + CommitTransactionCommand(); } } @@ -1048,28 +1068,13 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); pfree(s2.data); - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); - elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - in_remote_transaction = false; - - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); -- 1.8.3.1