Re: NOTIFY does not work as expected - Mailing list pgsql-bugs
From | Tom Lane |
---|---|
Subject | Re: NOTIFY does not work as expected |
Date | |
Msg-id | 29914.1539902374@sss.pgh.pa.us |
In response to | Re: NOTIFY does not work as expected (Andrey <parihaaraka@gmail.com>) |
Responses | Re: NOTIFY does not work as expected |
List | pgsql-bugs |
Andrey <parihaaraka@gmail.com> writes:
> Hello. I beg your pardon, but the problem is still in 10.5. May we expect it to be fixed in 11?

Nope :-(. However, I got around to looking at this problem, and I concur with Jeff's diagnosis: the code around ProcessClientReadInterrupt is buggy because it does not account for the possibility that the process latch was cleared some time ago while unhandled interrupt-pending flags remain set. There are some other issues too:

1. ProcessClientWriteInterrupt has the same problem.

2. I don't believe the "blocked" vs "not-blocked" distinction one bit. At best, it creates race-condition-like changes in behavior depending on exactly when a signal arrives vs when data arrives or is sent. At worst, I think it creates the same problem it's purporting to solve, i.e. failure to respond to ProcDiePending at all. I think the before/during/after calls to ProcessClientXXXInterrupt should just all behave the same and always be willing to execute ProcDiePending.

3. We've got bugs on the client side too. The documentation is pretty clear that libpq users ought to call PQconsumeInput before PQnotifies, but psql had not read the manual at all. Also, most callers were calling PQconsumeInput only once and then looping on PQnotifies, which assumes not-very-safely that we could only see at most one TCP packet's worth of notify messages at a time. That's even less safe now that we have "payload" strings than it was before. So we ought to adjust the code and documentation to recommend doing another PQconsumeInput inside the loop. (Congratulations to dblink for getting this right.)

In short, I think we need something like the attached. With these patches, psql consistently reports the notification promptly (for me anyway).
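Point 3 can be illustrated with a toy C model of the client-side buffering. It is not libpq code. `ToyConn`, `toy_consume_input()` and `toy_notifies()` are hypothetical stand-ins for `PGconn`, `PQconsumeInput()` and `PQnotifies()`. The rule that one consume absorbs at most `TOY_PACKET_MSGS` notifications is an assumption that stands in for one packet's worth of data. Like the real `PQnotifies()`, `toy_notifies()` only hands out messages that were already absorbed and never reads the socket.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Toy model only: ToyConn, toy_consume_input() and toy_notifies() are
 * hypothetical stand-ins for PGconn, PQconsumeInput() and PQnotifies().
 */
#define TOY_PACKET_MSGS 2   /* assumed: notifications absorbed per read */
#define TOY_MAX_MSGS 16

typedef struct
{
    int    wire[TOY_MAX_MSGS];     /* notifications not yet read from socket */
    size_t wire_len;
    size_t wire_pos;
    int    buffered[TOY_MAX_MSGS]; /* absorbed into the input buffer */
    size_t buf_len;
    size_t buf_pos;
} ToyConn;

/* Like PQconsumeInput(): absorb whatever one socket read returns. */
static void
toy_consume_input(ToyConn *c)
{
    for (int n = 0; n < TOY_PACKET_MSGS && c->wire_pos < c->wire_len; n++)
        c->buffered[c->buf_len++] = c->wire[c->wire_pos++];
}

/* Like PQnotifies(): never touches the socket; -1 means nothing buffered. */
static int
toy_notifies(ToyConn *c)
{
    if (c->buf_pos < c->buf_len)
        return c->buffered[c->buf_pos++];
    return -1;
}

/* Pre-patch loop shape: consume once, then loop on notifies. */
static int
drain_consume_once(ToyConn *c)
{
    int count = 0;

    toy_consume_input(c);
    while (toy_notifies(c) != -1)
        count++;
    return count;
}

/* Patched loop shape: consume again after handling each notification. */
static int
drain_consume_in_loop(ToyConn *c)
{
    int count = 0;

    toy_consume_input(c);
    while (toy_notifies(c) != -1)
    {
        count++;
        toy_consume_input(c);
    }
    return count;
}

/* Builds a connection with n notifications waiting on the socket. */
static ToyConn
toy_conn_with_pending(size_t n)
{
    ToyConn c = {0};

    assert(n <= TOY_MAX_MSGS);
    for (size_t i = 0; i < n; i++)
        c.wire[i] = (int) i;
    c.wire_len = n;
    return c;
}
```

Suppose five notifications are queued and two are absorbed per read. A single consume then yields 2 and leaves 3 on the socket, while consuming inside the loop yields all 5. In real client code, this is the shape the patch gives psql, ecpg and testlibpq2: call PQconsumeInput() before the first PQnotifies(), and call it again after each PQfreemem(notify).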
regards, tom lane

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index d349d7c..2880736 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -145,6 +145,16 @@ secure_read(Port *port, void *ptr, size_t len)
 	ssize_t		n;
 	int			waitfor;
 
+	/*
+	 * We might already have a pending interrupt condition to deal with. If
+	 * the process latch is set, that would cause us to fall through the read
+	 * and handle the condition below --- but the latch might have been
+	 * cleared since the condition interrupt happened. This is cheap enough
+	 * (if there's nothing to do) that it's not worth being too tense about
+	 * avoiding it.
+	 */
+	ProcessClientReadInterrupt();
+
 retry:
 #ifdef USE_SSL
 	waitfor = 0;
@@ -197,7 +207,7 @@ retry:
 		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessClientReadInterrupt(true);
+			ProcessClientReadInterrupt();
 
 			/*
 			 * We'll retry the read. Most likely it will return immediately
@@ -209,11 +219,10 @@ retry:
 	}
 
 	/*
-	 * Process interrupts that happened while (or before) receiving. Note that
-	 * we signal that we're not blocking, which will prevent some types of
-	 * interrupts from being processed.
+	 * Process interrupts that happened during a successful (or hard-failed)
+	 * read.
 	 */
-	ProcessClientReadInterrupt(false);
+	ProcessClientReadInterrupt();
 
 	return n;
 }
@@ -248,6 +257,16 @@ secure_write(Port *port, void *ptr, size_t len)
 	ssize_t		n;
 	int			waitfor;
 
+	/*
+	 * We might already have a pending interrupt condition to deal with. If
+	 * the process latch is set, that would cause us to fall through the write
+	 * and handle the condition below --- but the latch might have been
+	 * cleared since the condition interrupt happened. This is cheap enough
+	 * (if there's nothing to do) that it's not worth being too tense about
+	 * avoiding it.
+	 */
+	ProcessClientWriteInterrupt();
+
 retry:
 	waitfor = 0;
 #ifdef USE_SSL
@@ -283,7 +302,7 @@ retry:
 		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessClientWriteInterrupt(true);
+			ProcessClientWriteInterrupt();
 
 			/*
 			 * We'll retry the write. Most likely it will return immediately
@@ -295,11 +314,10 @@ retry:
 	}
 
 	/*
-	 * Process interrupts that happened while (or before) sending. Note that
-	 * we signal that we're not blocking, which will prevent some types of
-	 * interrupts from being processed.
+	 * Process interrupts that happened during a successful (or hard-failed)
+	 * write.
 	 */
-	ProcessClientWriteInterrupt(false);
+	ProcessClientWriteInterrupt();
 
 	return n;
 }
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index e4c6e3d..57f8075 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -315,7 +315,7 @@ interactive_getc(void)
 
 	c = getc(stdin);
 
-	ProcessClientReadInterrupt(true);
+	ProcessClientReadInterrupt();
 
 	return c;
 }
@@ -520,13 +520,12 @@ ReadCommand(StringInfo inBuf)
 /*
  * ProcessClientReadInterrupt() - Process interrupts specific to client reads
  *
- * This is called just after low-level reads. That might be after the read
- * finished successfully, or it was interrupted via interrupt.
+ * This is called just before and after low-level reads.
  *
  * Must preserve errno!
  */
 void
-ProcessClientReadInterrupt(bool blocked)
+ProcessClientReadInterrupt(void)
 {
 	int			save_errno = errno;
 
@@ -543,7 +542,7 @@ ProcessClientReadInterrupt(bool blocked)
 		if (notifyInterruptPending)
 			ProcessNotifyInterrupt();
 	}
-	else if (ProcDiePending && blocked)
+	else if (ProcDiePending)
 	{
 		/*
 		 * We're dying. It's safe (and sane) to handle that now.
@@ -557,25 +556,16 @@ ProcessClientReadInterrupt(bool blocked)
 /*
  * ProcessClientWriteInterrupt() - Process interrupts specific to client writes
  *
- * This is called just after low-level writes. That might be after the read
- * finished successfully, or it was interrupted via interrupt. 'blocked' tells
- * us whether the
+ * This is called just before and after low-level writes.
  *
  * Must preserve errno!
  */
 void
-ProcessClientWriteInterrupt(bool blocked)
+ProcessClientWriteInterrupt(void)
 {
 	int			save_errno = errno;
 
-	/*
-	 * We only want to process the interrupt here if socket writes are
-	 * blocking to increase the chance to get an error message to the client.
-	 * If we're not blocked there'll soon be a CHECK_FOR_INTERRUPTS(). But if
-	 * we're blocked we'll never get out of that situation if the client has
-	 * died.
-	 */
-	if (ProcDiePending && blocked)
+	if (ProcDiePending)
 	{
 		/*
 		 * We're dying. It's safe (and sane) to handle that now. But we don't
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..8051d9a 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -71,8 +71,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS);
 extern void FloatExceptionHandler(SIGNAL_ARGS) pg_attribute_noreturn();
 extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from SIGUSR1
 																 * handler */
-extern void ProcessClientReadInterrupt(bool blocked);
-extern void ProcessClientWriteInterrupt(bool blocked);
+extern void ProcessClientReadInterrupt(void);
+extern void ProcessClientWriteInterrupt(void);
 
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 06d909e..82a4405 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5284,7 +5284,7 @@ typedef struct pgNotify
   <para>
    <function>PQnotifies</function> does not actually read data from the
    server; it just returns messages previously absorbed by another
-   <application>libpq</application> function. In prior releases of
+   <application>libpq</application> function. In ancient releases of
    <application>libpq</application>, the only way to ensure timely receipt
    of <command>NOTIFY</command> messages was to constantly submit commands,
    even empty ones, and then check <function>PQnotifies</function> after each
@@ -8711,6 +8711,7 @@ main(int argc, char **argv)
                     notify->relname, notify->be_pid);
             PQfreemem(notify);
             nnotifies++;
+            PQconsumeInput(conn);
         }
     }
 
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index b569959..62c2928 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -836,7 +836,8 @@ PrintNotifications(void)
 {
 	PGnotify   *notify;
 
-	while ((notify = PQnotifies(pset.db)))
+	PQconsumeInput(pset.db);
+	while ((notify = PQnotifies(pset.db)) != NULL)
 	{
 		/* for backward compatibility, only show payload if nonempty */
 		if (notify->extra[0])
@@ -847,6 +848,7 @@ PrintNotifications(void)
 					notify->relname, notify->be_pid);
 		fflush(pset.queryFout);
 		PQfreemem(notify);
+		PQconsumeInput(pset.db);
 	}
 }
 
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index ff24449..42640ba 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -1722,12 +1722,13 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
 	}
 
 	/* check for asynchronous returns */
-	notify = PQnotifies(stmt->connection->connection);
-	if (notify)
+	PQconsumeInput(stmt->connection->connection);
+	while ((notify = PQnotifies(stmt->connection->connection)) != NULL)
 	{
 		ecpg_log("ecpg_process_output on line %d: asynchronous notification of \"%s\" from backend PID %d received\n",
 				 stmt->lineno, notify->relname, notify->be_pid);
 		PQfreemem(notify);
+		PQconsumeInput(stmt->connection->connection);
 	}
 
 	return status;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index e8b28d9..6aed8c8 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2265,6 +2265,9 @@ sendFailed:
  * no unhandled async notification from the backend
  *
  * the CALLER is responsible for FREE'ing the structure returned
+ *
+ * Note that this function does not read any new data from the socket;
+ * so usually, caller should call PQconsumeInput() first.
  */
 PGnotify *
 PQnotifies(PGconn *conn)
diff --git a/src/test/examples/testlibpq2.c b/src/test/examples/testlibpq2.c
index 62ecd68..6cdf8c8 100644
--- a/src/test/examples/testlibpq2.c
+++ b/src/test/examples/testlibpq2.c
@@ -140,6 +140,7 @@ main(int argc, char **argv)
 					notify->relname, notify->be_pid);
 			PQfreemem(notify);
 			nnotifies++;
+			PQconsumeInput(conn);
 		}
 	}
 
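The core server-side change can be illustrated with a toy, single-threaded C model. It is not backend code. `ToyBackend`, `toy_wait()`, `toy_read_old()` and `toy_read_new()` are hypothetical names standing in for the process latch, `notifyInterruptPending`, the client socket and the wait inside `secure_read()`. The old shape only services interrupts after the wait reports a latch wakeup. So a flag left set after the latch was cleared is never handled while the client sends nothing. The new shape mirrors the extra `ProcessClientReadInterrupt()` call the patch adds before the `retry:` label.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model only: these fields are hypothetical stand-ins for the process
 * latch, notifyInterruptPending, and the client socket's readability.
 */
typedef struct
{
    bool latch_set;
    bool notify_pending;
    bool socket_readable;
    int  notifies_sent;     /* notifications delivered to the client */
} ToyBackend;

typedef enum
{
    WOKE_LATCH,
    WOKE_SOCKET,
    WOKE_NEVER              /* stands in for "blocks indefinitely" */
} ToyWake;

/* Stand-in for ProcessClientReadInterrupt(): service pending flags. */
static void
toy_process_interrupts(ToyBackend *b)
{
    if (b->notify_pending)
    {
        b->notify_pending = false;
        b->notifies_sent++;
    }
}

/* Stand-in for waiting on latch + socket: report which event fires. */
static ToyWake
toy_wait(const ToyBackend *b)
{
    if (b->latch_set)
        return WOKE_LATCH;
    if (b->socket_readable)
        return WOKE_SOCKET;
    return WOKE_NEVER;
}

/*
 * Pre-patch shape: interrupts are serviced only after a latch wakeup.
 * Models a single wait; the real loop would go back and retry the read.
 */
static ToyWake
toy_read_old(ToyBackend *b)
{
    ToyWake w = toy_wait(b);

    if (w == WOKE_LATCH)
    {
        b->latch_set = false;           /* like ResetLatch(MyLatch) */
        toy_process_interrupts(b);
    }
    return w;
}

/* Patched shape: service already-pending flags before waiting at all. */
static ToyWake
toy_read_new(ToyBackend *b)
{
    toy_process_interrupts(b);
    return toy_read_old(b);
}

/*
 * The reported situation: a notify interrupt set its flag and the latch,
 * the latch was cleared some time ago without the flag being serviced,
 * and the client is now idle, waiting for the notification.
 */
static ToyBackend
toy_stale_flag_scenario(void)
{
    ToyBackend b = {.latch_set = true, .notify_pending = true};

    b.latch_set = false;
    return b;
}
```

In the stale-flag scenario, the old shape blocks with the notification still pending, while the new shape delivers it before waiting. The same reasoning applies to ProcessClientWriteInterrupt() on the secure_write() side (point 1).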