From 45b6899aa45b07fd0d42bf113c679895eebb4acd Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 17 Nov 2020 12:31:13 +1100 Subject: [PATCH v1] Degugging tablesync worker. This patch adds some LOGs and sleeps to help debugging the tablesync/apply worker initialization behaviour. --- src/backend/replication/logical/tablesync.c | 5 +++++ src/backend/replication/logical/worker.c | 17 +++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a91b00e..8a1fa86 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -529,6 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { + elog(LOG, "!!>> %s worker: called process_syncing_tables", am_tablesync_worker() ? "tablesync" : "apply"); + if (am_tablesync_worker()) process_syncing_tables_for_sync(current_lsn); else @@ -955,6 +957,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Finally, wait until the main apply worker tells us to catch up and then * return to let LogicalRepApplyLoop do it. */ + elog(LOG, "!!>> tablesync worker: wait for CATCHUP state notification"); wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + elog(LOG, "!!>> tablesync worker: received CATCHUP state notification"); + return slotname; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0468491..498bb2c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2086,6 +2086,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool endofstream = false; long wait_time; + elog(LOG, "!!>> %s worker: LogicalRepApplyLoop", am_tablesync_worker() ? "tablesync" : "apply"); + CHECK_FOR_INTERRUPTS(); MemoryContextSwitchTo(ApplyMessageContext); @@ -3005,8 +3007,21 @@ ApplyWorkerMain(Datum main_arg) (errmsg("logical replication apply worker for subscription \"%s\" has started", MySubscription->name))); + CommitTransactionCommand(); + elog(LOG, "!!>> The %s worker process has PID = %d", + am_tablesync_worker() ? "tablesync" : "apply", + getpid()); + + if (am_tablesync_worker()) + { + elog(LOG, "!!>>\n\n\nSleeping 30 secs. For debugging, attach to process %d now!\n\n\n", getpid()); + sleep(30); + } + + + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -3016,7 +3031,9 @@ ApplyWorkerMain(Datum main_arg) char *syncslotname; /* This is table synchronization worker, call initial sync. */ + elog(LOG, "!!>> tablesync worker: About to call LogicalRepSyncTableStart to do initial syncing"); syncslotname = LogicalRepSyncTableStart(&origin_startpos); + elog(LOG, "!!>> tablesync worker: Returned from LogicalRepSyncTableStart"); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); -- 1.8.3.1