From 72df2b4e2f76294692ce57553349b61ff141e00c Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Sat, 6 Sep 2025 11:29:02 -0300 Subject: [PATCH v1] Make AsyncQueueEntry's self contained Previously the asyncQueueProcessPageEntries() use the TransactionIdDidCommit() to check if the transaction that a notification belongs is committed or not. Although this work for almost all scenarios we may have some cases where if a notification is keep for to long on the queue and the VACUUM FREEZE is executed during this time it may remove clog files that is needed to check the transaction status of these notifications which will cause errors to listener backends when reading the async queue. This commit fix this issue by making the AsyncQueueEntry self contained by adding the "committed" boolean field so asyncQueueProcessPageEntries() can use this to check if the transaction of the notification is committed or not. We set committed as true when adding the entry on the SLRU page buffer cache when PreCommit_Notify() is called and if an error occur before AtCommit_Notify() the AtAbort_Notify() will be called which will mark the committed field as false. A new global List pendingAsyncQueueEntries is created to keep track of pending notification entries that is on shared async queue page buffer but it's not fully committed yet so we can mark these entries as not committed in case of transaction abort. Also this commit include TAP tests to exercise the VACUUM FREEZE issue and also the scenario of an error being occur between the PreCommit_Notify() and AtCommit_Notify() calls by using the injection points extension. --- src/backend/access/transam/xact.c | 3 + src/backend/commands/async.c | 92 ++++++++++++++++++- src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_listen_notify/Makefile | 17 ++++ .../modules/test_listen_notify/meson.build | 14 +++ .../test_listen_notify/t/001_xid_freeze.pl | 66 +++++++++++++ .../test_listen_notify/t/002_transaction.pl | 57 ++++++++++++ src/tools/pgindent/typedefs.list | 1 + 9 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 src/test/modules/test_listen_notify/Makefile create mode 100644 src/test/modules/test_listen_notify/meson.build create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl create mode 100644 src/test/modules/test_listen_notify/t/002_transaction.pl diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b46e7e9c2a6..34b46ab4d4a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -64,6 +64,7 @@ #include "utils/builtins.h" #include "utils/combocid.h" #include "utils/guc.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/memutils.h" #include "utils/relmapper.h" @@ -2340,6 +2341,8 @@ CommitTransaction(void) */ PreCommit_Notify(); + INJECTION_POINT("commit-transaction-pos-pre-commit-notify", NULL); + /* * Mark serializable transaction as complete for predicate locking * purposes. This should be done as late as we can put it and still allow diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..a7c211fc09a 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -180,6 +180,8 @@ typedef struct AsyncQueueEntry Oid dboid; /* sender's database OID */ TransactionId xid; /* sender's XID */ int32 srcPid; /* sender's PID */ + bool committed; /* Is transaction that the entry belongs + * committed? */ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; } AsyncQueueEntry; @@ -401,8 +403,30 @@ struct NotificationHash Notification *event; /* => the actual Notification struct */ }; +/* + * Struct representing an AsyncQueueEntry that is added on global async queue + * but it is not yet fully committed. + * + * This struct is used to find AsyncQueueEntry's already added on the global + * async queue and mark then as not committed when an error between the + * Pre_CommitNotify() and At_CommitNotify() is raised. + */ +typedef struct PendingAsyncQueueEntry +{ + /* Page which the entry was added */ + int64 page; + + /* slot number where the entry was added */ + int slotno; + + /* offset of the entry on page buffer */ + int offset; +} PendingAsyncQueueEntry; + static NotificationList *pendingNotifies = NULL; +static List *pendingAsyncQueueEntries = NIL; + /* * Inbound notifications are initially processed by HandleNotifyInterrupt(), * called from inside a signal handler. That just sets the @@ -1398,12 +1422,30 @@ asyncQueueAddEntries(ListCell *nextNotify) while (nextNotify != NULL) { Notification *n = (Notification *) lfirst(nextNotify); + PendingAsyncQueueEntry *pendingEntry = palloc(sizeof(PendingAsyncQueueEntry)); /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); + /* + * Mark the entry as committed. If the transaction that this + * notification belongs fails to commit the AtAbort_Notify() will mark + * this entry as not committed. + */ + qe.committed = true; + offset = QUEUE_POS_OFFSET(queue_head); + /* + * Store information from AsyncQueueEntry so AtAbort_Notify() can + * lookup the entry on shared page buffer if the transaction fails to + * commit + */ + pendingEntry->page = pageno; + pendingEntry->slotno = slotno; + pendingEntry->offset = offset; + pendingAsyncQueueEntries = lappend(pendingAsyncQueueEntries, pendingEntry); + /* Check whether the entry really fits on the current page */ if (offset + qe.length <= QUEUE_PAGESIZE) { @@ -1670,6 +1712,8 @@ SignalBackends(void) void AtAbort_Notify(void) { + ListCell *lc; + /* * If we LISTEN but then roll back the transaction after PreCommit_Notify, * we have registered as a listener but have not made any entry in @@ -1678,6 +1722,51 @@ AtAbort_Notify(void) if (amRegisteredListener && listenChannels == NIL) asyncQueueUnregister(); + /* + * At this stage if we have pendingAsyncQueueEntries we already have added + * these notifications on the shared async queue, so we need to lookup + * these notifications and mark then as not committed, so when a backend + * is processing this notification it can skip without checking the + * transaction status on clog files that may already have be truncated by + * VACUUM FREEZE. + */ + if (pendingAsyncQueueEntries != NIL) + { + /* + * We can not have pending async queue entries without pending + * notifications + */ + Assert(pendingNotifies != NULL); + + foreach(lc, pendingAsyncQueueEntries) + { + PendingAsyncQueueEntry *pendingEntry = (PendingAsyncQueueEntry *) lfirst(lc); + int slotno = pendingEntry->slotno; + int offset = pendingEntry->offset; + int64 page = pendingEntry->page; + LWLock *banklock = SimpleLruGetBankLock(NotifyCtl, page); + AsyncQueueEntry *qe; + + LWLockAcquire(banklock, LW_EXCLUSIVE); + + qe = (AsyncQueueEntry *) (NotifyCtl->shared->page_buffer[slotno] + offset); + + /* + * The entry should already be marked as committed when adding on + * into the shared async queue. + */ + Assert(qe->committed); + + /* + * Mark notification entry as not committed so it's not visible to + * other backends + */ + qe->committed = false; + + LWLockRelease(banklock); + } + } + /* And clean up */ ClearPendingActionsAndNotifies(); } @@ -2066,7 +2155,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, reachedStop = true; break; } - else if (TransactionIdDidCommit(qe->xid)) + else if (qe->committed) { /* qe->data is the null-terminated channel name */ char *channel = qe->data; @@ -2385,6 +2474,7 @@ ClearPendingActionsAndNotifies(void) */ pendingActions = NULL; pendingNotifies = NULL; + pendingAsyncQueueEntries = NIL; } /* diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 903a8ac151a..4c0160df341 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -28,6 +28,7 @@ SUBDIRS = \ test_int128 \ test_integerset \ test_json_parser \ + test_listen_notify \ test_lfind \ test_misc \ test_oat_hooks \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 93be0f57289..144379b619b 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -27,6 +27,7 @@ subdir('test_ginpostinglist') subdir('test_int128') subdir('test_integerset') subdir('test_json_parser') +subdir('test_listen_notify') subdir('test_lfind') subdir('test_misc') subdir('test_oat_hooks') diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile new file mode 100644 index 00000000000..da1bf5bb1b7 --- /dev/null +++ b/src/test/modules/test_listen_notify/Makefile @@ -0,0 +1,17 @@ +# src/test/modules/test_listen_notify/Makefile + +MODULE = test_listen_notify +PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support" + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_listen_notify +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build new file mode 100644 index 00000000000..f0a2b5058e4 --- /dev/null +++ b/src/test/modules/test_listen_notify/meson.build @@ -0,0 +1,14 @@ +# Copyright (c) 2022-2025, PostgreSQL Global Development Group + +tests += { + 'name': 'test_listen_notify', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_xid_freeze.pl', + 't/002_transaction.pl', + ], + }, +} + diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl new file mode 100644 index 00000000000..0a5130a042e --- /dev/null +++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl @@ -0,0 +1,66 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use File::Path qw(mkpath); +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +# Setup +$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound'); +$node->safe_psql('postgres', + 'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;' +); +$node->safe_psql('postgres', + 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true'); + +# --- Start Session 1 and leave it idle in transaction +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe('listen s;', "Session 1 listens to 's'"); +$psql_session1->query_safe('begin;', "Session 1 starts a transaction"); + +# --- Session 2, multiple notify's, and commit --- +for my $i (1 .. 10) +{ + $node->safe_psql( + 'postgres', " + BEGIN; + NOTIFY s, '$i'; + COMMIT;"); +} + +# Consume enough XIDs to trigger truncation +$node->safe_psql('postgres', 'select consume_xids(10000000);'); + +# Execute update so the frozen xid of "t" table is updated to a xid greater +# than consume_xids() result +$node->safe_psql('postgres', 'UPDATE t SET a = a+b;'); + +# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced. +my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); + +# Execute vacuum freeze on all databases +$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ], + "vacuumdb --all --freeze"); + +# Get the new datfrozenxid after vacuum freeze to ensure that is advanced but +# we can still get the notification status of the notification +my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); +ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced'); + +# On Session 1, commit and ensure that the all notifications is received +my $res = $psql_session1->query_safe('commit;', "commit listen s;"); +my $notifications_count = 0; +foreach my $i (split('\n', $res)) +{ + $notifications_count++; + like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/); +} +is($notifications_count, 10, 'received all committed notifications'); + +done_testing(); diff --git a/src/test/modules/test_listen_notify/t/002_transaction.pl b/src/test/modules/test_listen_notify/t/002_transaction.pl new file mode 100644 index 00000000000..825ae9cb1a9 --- /dev/null +++ b/src/test/modules/test_listen_notify/t/002_transaction.pl @@ -0,0 +1,57 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use File::Path qw(mkpath); +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +if (!$node->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Use injection_points extension to test that if an error occur between +# Pre_CommitNotify() and At_CommitNotify() the notification is not processed by +# a backend listener. +$node->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Start session 1, create a listener and leave it idle in transaction +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe('listen c1;', "Session 1 listens to 'c1'"); +$psql_session1->query_safe('begin;', "Session 1 starts a transaction"); + +# Start session 2 to send a notification inside a transaction block +my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0); +$psql_session2->query_safe('BEGIN;'); +$psql_session2->query_safe('NOTIFY c1;'); + +# Add injection point to fail to commit the notify c1 transaction. +$psql_session2->query_safe("SELECT injection_points_attach('commit-transaction-pos-pre-commit-notify', 'error');"); + +# Commit the NOTIFY transaction which will raise an error due to injection point +$psql_session2->query('COMMIT;'); + +# detach the injection point on the open transaction to make it complete. +$psql_session1->query_safe("SELECT injection_points_detach('commit-transaction-pos-pre-commit-notify');"); + +# Send another NOTIFY after injection point is detached to signal the psql_session1 backend. +$node->safe_psql('postgres', 'NOTIFY c1') ; + +# Commit the listener transaction - It should not see the notification that fails to commit. +my $res = $psql_session1->query_safe('commit;', "commit listen s2;"); + +my $notifications_count = 0; +foreach my $i (split('\n', $res)) +{ + $notifications_count++; + like($i, qr/Asynchronous notification "c1" received/); +} +is($notifications_count, 1, 'received only committed notifications'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e90af5b2ad3..6228faa56a5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2148,6 +2148,7 @@ PatternInfo PatternInfoArray Pattern_Prefix_Status Pattern_Type +PendingAsyncQueueEntry PendingFsyncEntry PendingRelDelete PendingRelSync -- 2.39.5 (Apple Git-154)