From 55367abdf4ce81e022211cb94e8d036e916069c9 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Wed, 24 May 2017 02:27:38 +0200 Subject: [PATCH 1/3] Fix signal handling in logical workers --- src/backend/replication/logical/launcher.c | 22 +++++++----- src/backend/replication/logical/tablesync.c | 23 +++++++----- src/backend/replication/logical/worker.c | 54 ++++++++++++++++++++--------- src/backend/tcop/postgres.c | 5 +++ src/include/replication/logicalworker.h | 3 ++ src/include/replication/worker_internal.h | 10 ------ 6 files changed, 74 insertions(+), 43 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4e2c350..2e80b4c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); /* Flags set by signal handlers */ -volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGTERM = false; static bool on_commit_launcher_wakeup = false; @@ -614,12 +614,18 @@ logicalrep_launcher_onexit(int code, Datum arg) static void logicalrep_worker_onexit(int code, Datum arg) { + /* Disconnect gracefully from the remote side. 
*/ + if (wrconn) + walrcv_disconnect(wrconn); + logicalrep_worker_detach(); + + ApplyLauncherWakeup(); } /* SIGTERM: set flag to exit at next convenient time */ -void -logicalrep_worker_sigterm(SIGNAL_ARGS) +static void +logicalrep_launcher_sigterm(SIGNAL_ARGS) { int save_errno = errno; @@ -632,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) } /* SIGHUP: set flag to reload configuration at next convenient time */ -void -logicalrep_worker_sighup(SIGNAL_ARGS) +static void +logicalrep_launcher_sighup(SIGNAL_ARGS) { int save_errno = errno; @@ -793,8 +799,8 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); /* Establish signal handlers. */ - pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGHUP, logicalrep_launcher_sighup); + pqsignal(SIGTERM, logicalrep_launcher_sigterm); BackgroundWorkerUnblockSignals(); /* Make it easy to identify our processes. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1e3753b..b92cc85 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -97,6 +97,7 @@ #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" +#include "replication/logicalworker.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" @@ -137,7 +138,6 @@ finish_sync_worker(void) (errmsg("logical replication synchronization worker finished processing"))); /* Stop gracefully */ - walrcv_disconnect(wrconn); proc_exit(0); } @@ -152,10 +152,12 @@ wait_for_sync_status_change(Oid relid, char origstate) int rc; char state = origstate; - while (!got_SIGTERM) + for (;;) { LogicalRepWorker *worker; + CHECK_FOR_INTERRUPTS(); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false); @@ -476,7 +478,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) 
void process_syncing_tables(XLogRecPtr current_lsn) { - if (am_tablesync_worker()) + if (IsTablesyncWorker()) process_syncing_tables_for_sync(current_lsn); else process_syncing_tables_for_apply(current_lsn); @@ -533,7 +535,7 @@ copy_read_data(void *outbuf, int minread, int maxread) bytesread += avail; } - while (!got_SIGTERM && maxread > 0 && bytesread < minread) + while (maxread > 0 && bytesread < minread) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -587,10 +589,6 @@ copy_read_data(void *outbuf, int minread, int maxread) ResetLatch(&MyProc->procLatch); } - /* Check for exit condition. */ - if (got_SIGTERM) - proc_exit(0); - return bytesread; } @@ -910,3 +908,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) return slotname; } + +/* + * Is current process a tablesync worker? + */ +bool +IsTablesyncWorker(void) +{ + return MyLogicalRepWorker != NULL && OidIsValid(MyLogicalRepWorker->relid); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7d1787d..971f76b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -72,6 +72,8 @@ #include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" + #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/datum.h" @@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void reread_subscription(void); +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; + /* * Should this worker apply changes for given relation. 
* @@ -134,7 +139,7 @@ static void reread_subscription(void); static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { - if (am_tablesync_worker()) + if (IsTablesyncWorker()) return MyLogicalRepWorker->relid == rel->localreloid; else return (rel->state == SUBREL_STATE_READY || @@ -444,7 +449,7 @@ apply_handle_commit(StringInfo s) Assert(commit_data.commit_lsn == remote_final_lsn); /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState() && !IsTablesyncWorker()) { /* * Update origin state so we can restart streaming from correct @@ -480,7 +485,7 @@ apply_handle_origin(StringInfo s) * actual writes. */ if (!in_remote_transaction || - (IsTransactionState() && !am_tablesync_worker())) + (IsTransactionState() && !IsTablesyncWorker())) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("ORIGIN message sent out of order"))); @@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!got_SIGTERM) + for (;;) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + CHECK_FOR_INTERRUPTS(); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1129,7 +1136,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimeLineID tli; walrcv_endstreaming(wrconn, &tli); - break; + proc_exit(0); } /* @@ -1329,7 +1336,6 @@ reread_subscription(void) "stop because the subscription was removed", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1344,7 +1350,6 @@ reread_subscription(void) "stop because the subscription was disabled", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1359,7 +1364,6 @@ reread_subscription(void) "restart because the connection information 
was changed", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1374,7 +1378,6 @@ reread_subscription(void) "restart because subscription was renamed", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1392,7 +1395,6 @@ reread_subscription(void) "restart because the replication slot name was changed", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1407,7 +1409,6 @@ reread_subscription(void) "restart because subscription's publications were changed", MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1443,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +logicalrep_worker_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} /* Logical Replication Apply worker entry point */ void @@ -1460,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg) /* Setup signal handling */ pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* Initialise stats to a sanish value */ @@ -1515,7 +1529,7 @@ ApplyWorkerMain(Datum main_arg) subscription_change_cb, (Datum) 0); - if (am_tablesync_worker()) + if (IsTablesyncWorker()) elog(LOG, "logical replication sync for subscription %s, table %s started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)); else @@ -1528,7 +1542,7 @@ ApplyWorkerMain(Datum main_arg) elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) + if (IsTablesyncWorker()) { char *syncslotname; @@ -1608,8 +1622,14 @@ ApplyWorkerMain(Datum main_arg) /* Run the main loop. 
*/ LogicalRepApplyLoop(origin_startpos); - walrcv_disconnect(wrconn); + /* Not reached. */ +} - /* We should only get here if we received SIGTERM */ - proc_exit(0); +/* + * Is current process an apply worker? Also true for tablesync workers. + */ +bool +IsApplyWorker(void) +{ + return MyLogicalRepWorker != NULL; } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 75c2d9a..1b1134c8 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -2845,6 +2846,10 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating autovacuum process due to administrator command"))); + else if (IsApplyWorker()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating logical replication worker due to administrator command"))); else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 3e0affa..6c71343 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,4 +14,7 @@ extern void ApplyWorkerMain(Datum main_arg); +extern bool IsApplyWorker(void); +extern bool IsTablesyncWorker(void); + #endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 0654461..c310ca5 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -67,8 +67,6 @@ extern Subscription *MySubscription; extern LogicalRepWorker *MyLogicalRepWorker; extern bool in_remote_transaction; -extern volatile sig_atomic_t got_SIGHUP; -extern volatile sig_atomic_t got_SIGTERM; extern void 
logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, @@ -81,17 +79,9 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void logicalrep_worker_sighup(SIGNAL_ARGS); -extern void logicalrep_worker_sigterm(SIGNAL_ARGS); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); -static inline bool -am_tablesync_worker(void) -{ - return OidIsValid(MyLogicalRepWorker->relid); -} - #endif /* WORKER_INTERNAL_H */ -- 2.7.4