From b6ee12ca3d2c606bc4c6294a35bbe6ba53f3a74f Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 19 Jan 2021 19:23:35 +1100 Subject: [PATCH v17] Tablesync extra logging. This patch only adds some extra logging which may be helpful for testing, but is not for committing. --- src/backend/replication/logical/tablesync.c | 62 ++++++++++++++++++++++++----- src/backend/tcop/postgres.c | 2 + 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ec85c08..388f0da 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -283,8 +283,8 @@ tablesync_cleanup_at_interrupt(void) Oid subid = MySubscription->oid; Oid relid = MyLogicalRepWorker->relid; - elog(DEBUG1, - "tablesync_cleanup_at_interrupt for relid = %d", + elog(LOG, + "!!>> tablesync_cleanup_at_interrupt for relid = %d", MyLogicalRepWorker->relid); /* @@ -322,7 +322,13 @@ tablesync_cleanup_at_interrupt(void) ReplicationSlotNameForTablesync(subid, relid, syncslotname); + elog(LOG, + "!!>> tablesync_cleanup_at_interrupt: dropping the tablesync slot \"%s\".", + syncslotname); ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok); + elog(LOG, + "!!>> tablesync_cleanup_at_interrupt: dropped the tablesync slot \"%s\".", + syncslotname); } /* @@ -339,8 +345,13 @@ tablesync_cleanup_at_interrupt(void) originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) { + elog(LOG, + "!!>> tablesync_cleanup_at_interrupt: dropping origin tracking for \"%s\"", + originname); replorigin_drop(originid, false); - + elog(LOG, + "!!>> tablesync_cleanup_at_interrupt: dropped origin tracking for \"%s\"", + originname); /* * CommitTransactionCommand would normally attempt to advance the * origin, but now that the origin has been dropped that would fail, @@ -387,10 +398,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ ReplicationSlotNameForTablesync(subid, relid, syncslotname); - elog(DEBUG1, - "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".", + elog(LOG, + "!!>> process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".", syncslotname); ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); + elog(LOG, + "!!>> process_syncing_tables_for_sync: dropped the tablesync slot \"%s\".", + syncslotname); /* * Change state to SYNCDONE. @@ -552,10 +566,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) originid = replorigin_by_name(originname, true); if (OidIsValid(originid)) { - elog(DEBUG1, - "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", + elog(LOG, + "!!>> process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", originname); replorigin_drop(originid, false); + elog(LOG, + "!!>> process_syncing_tables_for_apply: dropped tablesync origin tracking for \"%s\".", + originname); } } @@ -1054,14 +1071,22 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * The COPY phase was previously done, but tablesync then crashed/etc * before it was able to finish normally. */ + elog(LOG, + "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_FINISHEDCOPY."); StartTransactionCommand(); /* * The origin tracking name must already exist (missing_ok=false). */ originid = replorigin_by_name(originname, false); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", + originname); replorigin_session_setup(originid); replorigin_session_origin = originid; + elog(LOG, + "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", + originname); *origin_startpos = replorigin_session_get_progress(false); goto copy_table_done; @@ -1110,6 +1135,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ + elog(LOG, + "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", + slotname); walrcv_create_slot(wrconn, slotname, false, CRS_USE_SNAPSHOT, origin_startpos); @@ -1141,10 +1169,13 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * If something failed during copy table then cleanup the created * slot. */ - elog(DEBUG1, - "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", + elog(LOG, + "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname); ReplicationSlotDropAtPubNode(wrconn, slotname, false); + elog(LOG, + "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", + slotname); pfree(slotname); slotname = NULL; @@ -1165,11 +1196,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * replication origin from vanishing while advancing. */ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", + originname); originid = replorigin_create(originname); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", + originname); replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, true /* go backward */ , true /* WAL log */ ); UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + elog(LOG, + "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", + originname); replorigin_session_setup(originid); replorigin_session_origin = originid; } @@ -1192,8 +1232,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) copy_table_done: - elog(DEBUG1, - "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", + elog(LOG, + "!!>> LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", originname, (uint32) (*origin_startpos >> 32), (uint32) *origin_startpos); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 2a0565e..0f3e32a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3087,6 +3087,8 @@ ProcessInterrupts(void) errmsg("terminating autovacuum process due to administrator command"))); else if (IsLogicalWorker()) { + elog(LOG, "!!>> ProcessInterrupts: Hello, I am a LogicalWorker"); + /* Tablesync workers do their own cleanups. */ if (IsLogicalWorkerTablesync()) tablesync_cleanup_at_interrupt(); /* does not return. */ -- 1.8.3.1