From 90c9e06283fd9e677631690ce3a884f0165aaaad Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 2 Sep 2025 16:59:39 +0530 Subject: [PATCH v20250908 3/7] Reorganize tablesync Code and Introduce syncutils Reorganized the tablesync code by creating a new syncutils file. This refactoring will facilitate the development of sequence synchronization worker code. This commit separates code reorganization from functional changes, making it clearer to reviewers that only existing code has been moved. The changes in this patch can be merged with subsequent patches during the commit process. Author: Vignesh C Reviewer: Amit Kapila, Shveta Malik, Dilip Kumar, Peter Smith, Nisha Moond Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/catalog/pg_subscription.c | 4 +- src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/syncutils.c | 190 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 186 ++--------------- src/backend/replication/logical/worker.c | 18 +- src/bin/pg_dump/common.c | 2 +- src/bin/pg_dump/pg_dump.c | 4 +- src/include/catalog/pg_subscription_rel.h | 2 +- src/include/replication/worker_internal.h | 12 +- src/tools/pgindent/typedefs.list | 2 +- 12 files changed, 234 insertions(+), 190 deletions(-) create mode 100644 src/backend/replication/logical/syncutils.c diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index b885890de37..e06587b0265 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid) } /* - * Does the subscription have any relations? + * Does the subscription have any tables? * * Use this function only to know true/false, and when you have no need for the * List returned by GetSubscriptionRelations. */ bool -HasSubscriptionRelations(Oid subid) +HasSubscriptionTables(Oid subid) { Relation rel; ScanKeyData skey[1]; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb..c62c8c67521 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -28,6 +28,7 @@ OBJS = \ reorderbuffer.o \ slotsync.o \ snapbuild.o \ + syncutils.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 31a92d1a24a..d3882b40a39 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -963,7 +963,7 @@ ParallelApplyWorkerMain(Datum main_arg) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + InvalidateRelationStates, (Datum) 0); set_apply_error_context_origin(originname); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 6f19614c79d..9283e996ef4 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -14,6 +14,7 @@ backend_sources += files( 'reorderbuffer.c', 'slotsync.c', 'snapbuild.c', + 'syncutils.c', 'tablesync.c', 'worker.c', ) diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c new file mode 100644 index 00000000000..5109b197805 --- /dev/null +++ b/src/backend/replication/logical/syncutils.c @@ -0,0 +1,190 @@ +/*------------------------------------------------------------------------- + * syncutils.c + * PostgreSQL logical replication: common synchronization code + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/syncutils.c + * + * NOTES + * This file contains code common to table synchronization workers, and + * the sequence synchronization worker. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/pg_subscription_rel.h" +#include "pgstat.h" +#include "replication/logicallauncher.h" +#include "replication/origin.h" +#include "replication/slot.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +/* + * Enum for phases of the subscription relations state. + * + * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations + * state is no longer valid, and the subscription relations should be rebuilt. + * + * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription + * relations state is being rebuilt. + * + * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is + * up-to-date and valid. + */ +typedef enum +{ + SYNC_RELATIONS_STATE_NEEDS_REBUILD, + SYNC_RELATIONS_STATE_REBUILD_STARTED, + SYNC_RELATIONS_STATE_VALID, +} SyncingRelationsState; + +static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; + +/* + * Exit routine for synchronization worker. + */ +pg_noreturn void +FinishSyncWorker(void) +{ + /* + * Commit any outstanding transaction. This is the usual case, unless + * there was nothing to do for the table. + */ + if (IsTransactionState()) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + CommitTransactionCommand(); + + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + /* Stop gracefully */ + proc_exit(0); +} + +/* + * Callback from syscache invalidation. + */ +void +InvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue) +{ + relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; +} + +/* + * Process possible state change(s) of relations that are being synchronized. + */ +void +ProcessSyncingRelations(XLogRecPtr current_lsn) +{ + switch (MyLogicalRepWorker->type) + { + case WORKERTYPE_PARALLEL_APPLY: + /* + * Skip for parallel apply workers because they only operate on + * tables that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + break; + + case WORKERTYPE_TABLESYNC: + ProcessSyncingTablesForSync(current_lsn); + break; + + case WORKERTYPE_APPLY: + ProcessSyncingTablesForApply(current_lsn); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); + } +} + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +bool +FetchRelationStates(bool *started_tx) +{ + static bool has_subtables = false; + + *started_tx = false; + + if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch tables that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY tables found then we know it does. But if + * table_states_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subtables = (table_states_not_ready != NIL) || + HasSubscriptionTables(MySubscription->oid); + + /* + * If the subscription relation cache has been invalidated since we + * entered this routine, we still use and return the relations we just + * finished constructing, to avoid infinite loops, but we leave the + * table states marked as stale so that we'll rebuild it again on next + * access. Otherwise, we mark the table states as valid. + */ + if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED) + relation_states_validity = SYNC_RELATIONS_STATE_VALID; + } + + return has_subtables; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..3c777363243 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -117,58 +117,15 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" -#include "utils/memutils.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/usercontext.h" -typedef enum -{ - SYNC_TABLE_STATE_NEEDS_REBUILD, - SYNC_TABLE_STATE_REBUILD_STARTED, - SYNC_TABLE_STATE_VALID, -} SyncingTablesState; - -static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -static List *table_states_not_ready = NIL; -static bool FetchTableStates(bool *started_tx); +List *table_states_not_ready = NIL; static StringInfo copybuf = NULL; -/* - * Exit routine for synchronization worker. - */ -pg_noreturn static void -finish_sync_worker(void) -{ - /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. - */ - if (IsTransactionState()) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } - - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - - /* Stop gracefully */ - proc_exit(0); -} - /* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. @@ -180,7 +137,7 @@ finish_sync_worker(void) * CATCHUP state to SYNCDONE. */ static bool -wait_for_relation_state_change(Oid relid, char expected_state) +wait_for_table_state_change(Oid relid, char expected_state) { char state; @@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state) return false; } -/* - * Callback from syscache invalidation. - */ -void -invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) -{ - table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -} - /* * Handle table synchronization cooperation from the synchronization * worker. @@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * predetermined synchronization point in the WAL stream, mark the table as * SYNCDONE and finish. */ -static void -process_syncing_tables_for_sync(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForSync(XLogRecPtr current_lsn) { SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. + * This transaction will be ended within the FinishSyncWorker(). Now, + * even, if we fail to remove this here, the apply worker will ensure + * to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. * Otherwise, if an error occurs while performing the database @@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + FinishSyncWorker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. */ -static void -process_syncing_tables_for_apply(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { struct tablesync_start_time_mapping { @@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchTableStates(&started_tx); + FetchRelationStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; - wait_for_relation_state_change(rstate->relid, - SUBREL_STATE_SYNCDONE); + wait_for_table_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } else LWLockRelease(LogicalRepWorkerLock); @@ -689,37 +637,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } -/* - * Process possible state change(s) of tables that are being synchronized. - */ -void -process_syncing_tables(XLogRecPtr current_lsn) -{ - switch (MyLogicalRepWorker->type) - { - case WORKERTYPE_PARALLEL_APPLY: - - /* - * Skip for parallel apply workers because they only operate on - * tables that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). - */ - break; - - case WORKERTYPE_TABLESYNC: - process_syncing_tables_for_sync(current_lsn); - break; - - case WORKERTYPE_APPLY: - process_syncing_tables_for_apply(current_lsn); - break; - - case WORKERTYPE_UNKNOWN: - /* Should never happen. */ - elog(ERROR, "Unknown worker type"); - } -} - /* * Create list of columns for COPY based on logical relation mapping. */ @@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + FinishSyncWorker(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1599,77 +1516,6 @@ copy_table_done: return slotname; } -/* - * Common code to fetch the up-to-date sync state info into the static lists. - * - * Returns true if subscription has 1 or more tables, else false. - * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. - */ -static bool -FetchTableStates(bool *started_tx) -{ - static bool has_subrels = false; - - *started_tx = false; - - if (table_states_validity != SYNC_TABLE_STATE_VALID) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED; - - /* Clean the old lists. */ - list_free_deep(table_states_not_ready); - table_states_not_ready = NIL; - - if (!IsTransactionState()) - { - StartTransactionCommand(); - *started_tx = true; - } - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); - } - MemoryContextSwitchTo(oldctx); - - /* - * Does the subscription have tables? - * - * If there were not-READY relations found then we know it does. But - * if table_states_not_ready was empty we still need to check again to - * see if there are 0 tables. - */ - has_subrels = (table_states_not_ready != NIL) || - HasSubscriptionRelations(MySubscription->oid); - - /* - * If the subscription relation cache has been invalidated since we - * entered this routine, we still use and return the relations we just - * finished constructing, to avoid infinite loops, but we leave the - * table states marked as stale so that we'll rebuild it again on next - * access. Otherwise, we mark the table states as valid. - */ - if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED) - table_states_validity = SYNC_TABLE_STATE_VALID; - } - - return has_subrels; -} - /* * Execute the initial sync with error handling. Disable the subscription, * if it's required. @@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + FinishSyncWorker(); } /* @@ -1773,7 +1619,7 @@ AllTablesyncsReady(void) bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchRelationStates(&started_tx); if (started_tx) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f1ebd63e792..d1493f36e04 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -91,7 +91,7 @@ * behave as if two_phase = off. When the apply worker detects that all * tablesyncs have become READY (while the tri-state was PENDING) it will * restart the apply worker process. This happens in - * process_syncing_tables_for_apply. + * ProcessSyncingTablesForApply. * * When the (re-started) apply worker finds that all tablesyncs are READY for a * two_phase tri-state of PENDING it start streaming messages with the @@ -1230,7 +1230,7 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1352,7 +1352,7 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); /* * Since we have already prepared the transaction, in a case where the @@ -1408,7 +1408,7 @@ apply_handle_commit_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1474,7 +1474,7 @@ apply_handle_rollback_prepared(StringInfo s) in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(rollback_data.rollback_end_lsn); + ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1609,7 +1609,7 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + ProcessSyncingRelations(prepare_data.end_lsn); /* * Similar to prepare case, the subskiplsn could be left in a case of @@ -2451,7 +2451,7 @@ apply_handle_stream_commit(StringInfo s) } /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4114,7 +4114,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(last_received); + ProcessSyncingRelations(last_received); } /* Cleanup the memory. */ @@ -5744,7 +5744,7 @@ SetupApplyOrSyncWorker(int worker_slot) * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, + InvalidateRelationStates, (Datum) 0); } diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index a1976fae607..bfd051cf198 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -244,7 +244,7 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading subscriptions"); getSubscriptions(fout); - pg_log_info("reading subscription membership of tables"); + pg_log_info("reading subscription membership of relations"); getSubscriptionTables(fout); free(inhinfo); /* not needed any longer */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 7522efe02e4..e815e1c73be 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5258,7 +5258,7 @@ getSubscriptions(Archive *fout) /* * getSubscriptionTables - * Get information about subscription membership for dumpable tables. This + * Get information about subscription membership for dumpable relations. This * will be used only in binary-upgrade mode for PG17 or later versions. */ void @@ -5316,7 +5316,7 @@ getSubscriptionTables(Archive *fout) tblinfo = findTableByOid(relid); if (tblinfo == NULL) - pg_fatal("failed sanity check, table with OID %u not found", + pg_fatal("failed sanity check, relation with OID %u not found", relid); /* OK, make a DumpableObject for this relationship */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 02f97a547dd..61b63c6bb7a 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -extern bool HasSubscriptionRelations(Oid subid); +extern bool HasSubscriptionTables(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 62ea1a00580..cfd0a223648 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +extern PGDLLIMPORT List *table_states_not_ready; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -274,9 +276,13 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, extern bool AllTablesyncsReady(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); -extern void process_syncing_tables(XLogRecPtr current_lsn); -extern void invalidate_syncing_table_states(Datum arg, int cacheid, - uint32 hashvalue); +extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); +extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); + +pg_noreturn extern void FinishSyncWorker(void); +extern void InvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue); +extern void ProcessSyncingRelations(XLogRecPtr current_lsn); +extern bool FetchRelationStates(bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 49af245ed8f..a7ff6601054 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2916,7 +2916,7 @@ SyncRepStandbyData SyncRequestHandler SyncRequestType SyncStandbySlotsConfigData -SyncingTablesState +SyncingRelationsState SysFKRelationship SysScanDesc SyscacheCallbackFunction -- 2.43.0