From c1fceb64203468cfb4532e2117e4da7247a3b6ba Mon Sep 17 00:00:00 2001 From: Vignesh Date: Tue, 25 Mar 2025 09:23:48 +0530 Subject: [PATCH v20250503 3/5] Reorganize tablesync code and introduce syncutils Reorganize the tablesync code by moving the code that will be shared with the sequence synchronization worker into a new file, syncutils.c. This refactoring will facilitate the development of the sequence synchronization worker. This commit keeps code reorganization separate from functional changes, so that reviewers can see that existing code has only been moved and renamed, with no functional change intended. The renames include process_syncing_tables() to SyncProcessRelations(), FetchTableStates() to SyncFetchRelationStates(), finish_sync_worker() to SyncFinishWorker(), invalidate_syncing_table_states() to SyncInvalidateRelationStates() and HasSubscriptionRelations() to HasSubscriptionTables(). In addition, table_states_not_ready and the functions now named ProcessSyncingTablesForSync() and ProcessSyncingTablesForApply() are no longer static, so that syncutils.c can use them. The changes in this patch can be merged with subsequent patches during the commit process. --- src/backend/catalog/pg_subscription.c | 4 +- src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/syncutils.c | 190 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 186 ++--------------- src/backend/replication/logical/worker.c | 18 +- src/include/catalog/pg_subscription_rel.h | 2 +- src/include/replication/worker_internal.h | 13 +- src/tools/pgindent/typedefs.list | 2 +- 10 files changed, 232 insertions(+), 187 deletions(-) create mode 100644 src/backend/replication/logical/syncutils.c diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..1c71161e723 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -488,13 +488,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid) } /* - * Does the subscription have any relations? + * Does the subscription have any tables? * * Use this function only to know true/false, and when you have no need for the * List returned by GetSubscriptionRelations. 
*/ bool -HasSubscriptionRelations(Oid subid) +HasSubscriptionTables(Oid subid) { Relation rel; ScanKeyData skey[1]; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb..c62c8c67521 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -28,6 +28,7 @@ OBJS = \ reorderbuffer.o \ slotsync.o \ snapbuild.o \ + syncutils.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..d2b663267ad 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + SyncInvalidateRelationStates, (Datum) 0); set_apply_error_context_origin(originname); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 6f19614c79d..9283e996ef4 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -14,6 +14,7 @@ backend_sources += files( 'reorderbuffer.c', 'slotsync.c', 'snapbuild.c', + 'syncutils.c', 'tablesync.c', 'worker.c', ) diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c new file mode 100644 index 00000000000..3d405ff2dc6 --- /dev/null +++ b/src/backend/replication/logical/syncutils.c @@ -0,0 +1,190 @@ +/*------------------------------------------------------------------------- + * syncutils.c + * PostgreSQL logical replication: common synchronization code + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/syncutils.c + * + * NOTES + * This file contains code common to table synchronization workers, 
and + * the sequence synchronization worker. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/pg_subscription_rel.h" +#include "pgstat.h" +#include "replication/logicallauncher.h" +#include "replication/origin.h" +#include "replication/slot.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +/* + * Enum for phases of the subscription relations state. + * + * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations + * state is no longer valid, and the subscription relations should be rebuilt. + * + * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription + * relations state is being rebuilt. + * + * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is + * up-to-date and valid. + */ +typedef enum +{ + SYNC_RELATIONS_STATE_NEEDS_REBUILD, + SYNC_RELATIONS_STATE_REBUILD_STARTED, + SYNC_RELATIONS_STATE_VALID, +} SyncingRelationsState; + +static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; + +/* + * Exit routine for synchronization worker. + */ +pg_noreturn void +SyncFinishWorker(void) +{ + /* + * Commit any outstanding transaction. This is the usual case, unless + * there was nothing to do for the table. + */ + if (IsTransactionState()) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + CommitTransactionCommand(); + + /* Find the leader apply worker and signal it. 
*/ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + /* Stop gracefully */ + proc_exit(0); +} + +/* + * Callback from syscache invalidation. + */ +void +SyncInvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue) +{ + relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; +} + +/* + * Process possible state change(s) of relations that are being synchronized. + */ +void +SyncProcessRelations(XLogRecPtr current_lsn) +{ + switch (MyLogicalRepWorker->type) + { + case WORKERTYPE_PARALLEL_APPLY: + /* + * Skip for parallel apply workers because they only operate on + * tables that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + break; + + case WORKERTYPE_TABLESYNC: + ProcessSyncingTablesForSync(current_lsn); + break; + + case WORKERTYPE_APPLY: + ProcessSyncingTablesForApply(current_lsn); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); + } +} + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +bool +SyncFetchRelationStates(bool *started_tx) +{ + static bool has_subtables = false; + + *started_tx = false; + + if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch tables that are in non-ready state. 
*/ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY tables found then we know it does. But if + * table_states_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subtables = (table_states_not_ready != NIL) || + HasSubscriptionTables(MySubscription->oid); + + /* + * If the subscription relation cache has been invalidated since we + * entered this routine, we still use and return the relations we just + * finished constructing, to avoid infinite loops, but we leave the + * table states marked as stale so that we'll rebuild it again on next + * access. Otherwise, we mark the table states as valid. 
+ */ + if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED) + relation_states_validity = SYNC_RELATIONS_STATE_VALID; + } + + return has_subtables; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8e1e8762f62..9bd51ceef48 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -117,58 +117,15 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" -#include "utils/memutils.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/usercontext.h" -typedef enum -{ - SYNC_TABLE_STATE_NEEDS_REBUILD, - SYNC_TABLE_STATE_REBUILD_STARTED, - SYNC_TABLE_STATE_VALID, -} SyncingTablesState; - -static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -static List *table_states_not_ready = NIL; -static bool FetchTableStates(bool *started_tx); +List *table_states_not_ready = NIL; static StringInfo copybuf = NULL; -/* - * Exit routine for synchronization worker. - */ -pg_noreturn static void -finish_sync_worker(void) -{ - /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. - */ - if (IsTransactionState()) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } - - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - - /* Stop gracefully */ - proc_exit(0); -} - /* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. 
@@ -180,7 +137,7 @@ finish_sync_worker(void) * CATCHUP state to SYNCDONE. */ static bool -wait_for_relation_state_change(Oid relid, char expected_state) +wait_for_table_state_change(Oid relid, char expected_state) { char state; @@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state) return false; } -/* - * Callback from syscache invalidation. - */ -void -invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) -{ - table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -} - /* * Handle table synchronization cooperation from the synchronization * worker. @@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * predetermined synchronization point in the WAL stream, mark the table as * SYNCDONE and finish. */ -static void -process_syncing_tables_for_sync(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForSync(XLogRecPtr current_lsn) { SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -348,9 +296,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. + * This transaction will be ended within SyncFinishWorker(). Even if + * we fail to remove the origin tracking here, the apply worker will + * make sure to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. 
* Otherwise, if an error occurs while performing the database @@ -386,7 +334,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + SyncFinishWorker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -413,8 +361,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. */ -static void -process_syncing_tables_for_apply(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { struct tablesync_start_time_mapping { @@ -429,7 +377,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchTableStates(&started_tx); + SyncFetchRelationStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -567,8 +515,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; - wait_for_relation_state_change(rstate->relid, - SUBREL_STATE_SYNCDONE); + wait_for_table_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } else LWLockRelease(LogicalRepWorkerLock); @@ -659,37 +607,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } -/* - * Process possible state change(s) of tables that are being synchronized. - */ -void -process_syncing_tables(XLogRecPtr current_lsn) -{ - switch (MyLogicalRepWorker->type) - { - case WORKERTYPE_PARALLEL_APPLY: - - /* - * Skip for parallel apply workers because they only operate on - * tables that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). 
- */ - break; - - case WORKERTYPE_TABLESYNC: - process_syncing_tables_for_sync(current_lsn); - break; - - case WORKERTYPE_APPLY: - process_syncing_tables_for_apply(current_lsn); - break; - - case WORKERTYPE_UNKNOWN: - /* Should never happen. */ - elog(ERROR, "Unknown worker type"); - } -} - /* * Create list of columns for COPY based on logical relation mapping. */ @@ -1326,7 +1243,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + SyncFinishWorker(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1567,77 +1484,6 @@ copy_table_done: return slotname; } -/* - * Common code to fetch the up-to-date sync state info into the static lists. - * - * Returns true if subscription has 1 or more tables, else false. - * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. - */ -static bool -FetchTableStates(bool *started_tx) -{ - static bool has_subrels = false; - - *started_tx = false; - - if (table_states_validity != SYNC_TABLE_STATE_VALID) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED; - - /* Clean the old lists. */ - list_free_deep(table_states_not_ready); - table_states_not_ready = NIL; - - if (!IsTransactionState()) - { - StartTransactionCommand(); - *started_tx = true; - } - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - /* Allocate the tracking info in a permanent memory context. 
*/ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); - } - MemoryContextSwitchTo(oldctx); - - /* - * Does the subscription have tables? - * - * If there were not-READY relations found then we know it does. But - * if table_states_not_ready was empty we still need to check again to - * see if there are 0 tables. - */ - has_subrels = (table_states_not_ready != NIL) || - HasSubscriptionRelations(MySubscription->oid); - - /* - * If the subscription relation cache has been invalidated since we - * entered this routine, we still use and return the relations we just - * finished constructing, to avoid infinite loops, but we leave the - * table states marked as stale so that we'll rebuild it again on next - * access. Otherwise, we mark the table states as valid. - */ - if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED) - table_states_validity = SYNC_TABLE_STATE_VALID; - } - - return has_subrels; -} - /* * Execute the initial sync with error handling. Disable the subscription, * if it's required. @@ -1723,7 +1569,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + SyncFinishWorker(); } /* @@ -1741,7 +1587,7 @@ AllTablesyncsReady(void) bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = SyncFetchRelationStates(&started_tx); if (started_tx) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4151a4b2a96..765754bfc3c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -91,7 +91,7 @@ * behave as if two_phase = off. 
When the apply worker detects that all * tablesyncs have become READY (while the tri-state was PENDING) it will * restart the apply worker process. This happens in - * process_syncing_tables_for_apply. + * ProcessSyncingTablesForApply. * * When the (re-started) apply worker finds that all tablesyncs are READY for a * two_phase tri-state of PENDING it start streaming messages with the @@ -1030,7 +1030,7 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + SyncProcessRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1152,7 +1152,7 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + SyncProcessRelations(prepare_data.end_lsn); /* * Since we have already prepared the transaction, in a case where the @@ -1208,7 +1208,7 @@ apply_handle_commit_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + SyncProcessRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1274,7 +1274,7 @@ apply_handle_rollback_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(rollback_data.rollback_end_lsn); + SyncProcessRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1409,7 +1409,7 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. 
*/ - process_syncing_tables(prepare_data.end_lsn); + SyncProcessRelations(prepare_data.end_lsn); /* * Similar to prepare case, the subskiplsn could be left in a case of @@ -2251,7 +2251,7 @@ apply_handle_stream_commit(StringInfo s) } /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + SyncProcessRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -3728,7 +3728,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(last_received); + SyncProcessRelations(last_received); } /* Cleanup the memory. */ @@ -4797,7 +4797,7 @@ SetupApplyOrSyncWorker(int worker_slot) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + SyncInvalidateRelationStates, (Datum) 0); } diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index c91797c869c..ea869588d84 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -extern bool HasSubscriptionRelations(Oid subid); +extern bool HasSubscriptionTables(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..082e2b3d86c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,8 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +extern PGDLLIMPORT List *table_states_not_ready; + extern void 
logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -259,9 +261,14 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, extern bool AllTablesyncsReady(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); -extern void process_syncing_tables(XLogRecPtr current_lsn); -extern void invalidate_syncing_table_states(Datum arg, int cacheid, - uint32 hashvalue); +extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); +extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); + +pg_noreturn extern void SyncFinishWorker(void); +extern void SyncInvalidateRelationStates(Datum arg, int cacheid, + uint32 hashvalue); +extern void SyncProcessRelations(XLogRecPtr current_lsn); +extern bool SyncFetchRelationStates(bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 74dad46568a..82af9d8a741 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2902,7 +2902,7 @@ SyncRepStandbyData SyncRequestHandler SyncRequestType SyncStandbySlotsConfigData -SyncingTablesState +SyncingRelationsState SysFKRelationship SysScanDesc SyscacheCallbackFunction -- 2.43.0