Re: Fix for non-blocking connections in libpq - Mailing list pgsql-patches
From | Bernhard Herzog |
---|---|
Subject | Re: Fix for non-blocking connections in libpq |
Date | |
Msg-id | 6qelj9bynl.fsf@abnoba.intevation.de Whole thread Raw |
In response to | Re: Fix for non-blocking connections in libpq (Bruce Momjian <pgman@candle.pha.pa.us>) |
Responses | Re: Fix for non-blocking connections in libpq |
List | pgsql-patches |
Bruce Momjian <pgman@candle.pha.pa.us> writes: > Bernard, just checking. Is this the most recent version of your patch? In principle, yes. However, I've ported it to the CVS version in the meantime. Here's a patch against current CVS HEAD: Index: src/interfaces/libpq/fe-exec.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v retrieving revision 1.113 diff -c -r1.113 fe-exec.c *** src/interfaces/libpq/fe-exec.c 2001/10/25 05:50:13 1.113 --- src/interfaces/libpq/fe-exec.c 2002/02/25 10:21:06 *************** *** 2340,2342 **** --- 2340,2350 ---- return (pqFlush(conn)); } + + /* try to force data out, really only useful for non-blocking users. + * This implementation actually works for non-blocking connections */ + int + PQsendSome(PGconn *conn) + { + return pqSendSome(conn); + } Index: src/interfaces/libpq/fe-misc.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v retrieving revision 1.65 diff -c -r1.65 fe-misc.c *** src/interfaces/libpq/fe-misc.c 2001/12/03 00:28:24 1.65 --- src/interfaces/libpq/fe-misc.c 2002/02/25 10:21:06 *************** *** 110,164 **** static int pqPutBytes(const char *s, size_t nbytes, PGconn *conn) { ! size_t avail = Max(conn->outBufSize - conn->outCount, 0); ! ! /* ! * if we are non-blocking and the send queue is too full to buffer ! * this request then try to flush some and return an error */ ! if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn)) { ! /* ! * even if the flush failed we may still have written some data, ! * recalculate the size of the send-queue relative to the amount ! * we have to send, we may be able to queue it afterall even ! * though it's not sent to the database it's ok, any routines that ! * check the data coming from the database better call pqFlush() ! * anyway. ! */ ! if (nbytes > Max(conn->outBufSize - conn->outCount, 0)) ! { ! 
printfPQExpBuffer(&conn->errorMessage, ! libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"), ! (int) Max(conn->outBufSize - conn->outCount, 0), ! (int) nbytes); ! return EOF; ! } ! /* fixup avail for while loop */ avail = Max(conn->outBufSize - conn->outCount, 0); ! } ! /* ! * is the amount of data to be sent is larger than the size of the ! * output buffer then we must flush it to make more room. ! * ! * the code above will make sure the loop conditional is never true for ! * non-blocking connections ! */ ! while (nbytes > avail) ! { ! memcpy(conn->outBuffer + conn->outCount, s, avail); ! conn->outCount += avail; ! s += avail; ! nbytes -= avail; ! if (pqFlush(conn)) ! return EOF; ! avail = conn->outBufSize; ! } ! memcpy(conn->outBuffer + conn->outCount, s, nbytes); ! conn->outCount += nbytes; return 0; } --- 110,184 ---- static int pqPutBytes(const char *s, size_t nbytes, PGconn *conn) { ! /* Strategy to handle blocking and non-blocking connections: Fill ! * the output buffer and flush it repeatedly until either all data ! * has been sent or is at least queued in the buffer. ! * ! * For non-blocking connections, grow the buffer if not all data ! * fits into it and the buffer can't be sent because the socket ! * would block. */ ! ! while (nbytes) { ! size_t avail, remaining; ! ! /* fill the output buffer */ avail = Max(conn->outBufSize - conn->outCount, 0); ! remaining = Min(avail, nbytes); ! memcpy(conn->outBuffer + conn->outCount, s, remaining); ! conn->outCount += remaining; ! s += remaining; ! nbytes -= remaining; ! ! /* if the data didn't fit completely into the buffer, try to ! * flush the buffer */ ! if (nbytes) ! { ! int send_result = pqSendSome(conn); ! /* if there were errors, report them */ ! if (send_result < 0) ! return EOF; ! /* if not all data could be sent, increase the output ! * buffer, put the rest of s into it and return ! * successfully. This case will only happen in a ! * non-blocking connection ! */ ! 
if (send_result > 0) ! { ! /* try to grow the buffer. ! * FIXME: The new size could be chosen more ! * intelligently. ! */ ! size_t buflen = conn->outCount + nbytes; ! if (buflen > conn->outBufSize) ! { ! char * newbuf = realloc(conn->outBuffer, buflen); ! if (!newbuf) ! { ! /* realloc failed. Probably out of memory */ ! printfPQExpBuffer(&conn->errorMessage, ! "cannot allocate memory for output buffer\n"); ! return EOF; ! } ! conn->outBuffer = newbuf; ! conn->outBufSize = buflen; ! } ! /* put the data into it */ ! memcpy(conn->outBuffer + conn->outCount, s, nbytes); ! conn->outCount += nbytes; + /* report success. */ + return 0; + } + } + + /* pqSendSome was able to send all data. Continue with the next + * chunk of s. */ + } /* while */ + return 0; } *************** *** 604,613 **** } /* ! * pqFlush: send any data waiting in the output buffer */ int ! pqFlush(PGconn *conn) { char *ptr = conn->outBuffer; int len = conn->outCount; --- 624,636 ---- } /* ! * pqSendSome: send any data waiting in the output buffer. ! * ! * Return 0 on sucess, -1 on failure and 1 when data remains because the ! * socket would block and the connection is non-blocking. */ int ! pqSendSome(PGconn *conn) { char *ptr = conn->outBuffer; int len = conn->outCount; *************** *** 616,622 **** { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); ! return EOF; } /* --- 639,645 ---- { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); ! return -1; } /* *************** *** 674,680 **** printfPQExpBuffer(&conn->errorMessage, libpq_gettext( "server closed the connection unexpectedly\n" ! "\tThis probably means the server terminated abnormally\n" "\tbefore or while processing the request.\n")); /* --- 697,703 ---- printfPQExpBuffer(&conn->errorMessage, libpq_gettext( "server closed the connection unexpectedly\n" ! 
"\tThis probably means the server terminated abnormally\n" "\tbefore or while processing the request.\n")); /* *************** *** 685,698 **** * the socket open until pqReadData finds no more data * can be read. */ ! return EOF; default: printfPQExpBuffer(&conn->errorMessage, libpq_gettext("could not send data to server: %s\n"), SOCK_STRERROR(SOCK_ERRNO)); /* We don't assume it's a fatal error... */ ! return EOF; } } else --- 708,721 ---- * the socket open until pqReadData finds no more data * can be read. */ ! return -1; default: printfPQExpBuffer(&conn->errorMessage, libpq_gettext("could not send data to server: %s\n"), SOCK_STRERROR(SOCK_ERRNO)); /* We don't assume it's a fatal error... */ ! return -1; } } else *************** *** 707,713 **** /* * if the socket is in non-blocking mode we may need to abort ! * here */ #ifdef USE_SSL /* can't do anything for our SSL users yet */ --- 730,736 ---- /* * if the socket is in non-blocking mode we may need to abort ! * here and return 1 to indicate that data is still pending. */ #ifdef USE_SSL /* can't do anything for our SSL users yet */ *************** *** 719,732 **** /* shift the contents of the buffer */ memmove(conn->outBuffer, ptr, len); conn->outCount = len; ! return EOF; } #ifdef USE_SSL } #endif if (pqWait(FALSE, TRUE, conn)) ! return EOF; } } --- 742,755 ---- /* shift the contents of the buffer */ memmove(conn->outBuffer, ptr, len); conn->outCount = len; ! return 1; } #ifdef USE_SSL } #endif if (pqWait(FALSE, TRUE, conn)) ! return -1; } } *************** *** 735,740 **** --- 758,783 ---- if (conn->Pfdebug) fflush(conn->Pfdebug); + return 0; + } + + + + /* + * pqFlush: send any data waiting in the output buffer + * + * Implemented in terms of pqSendSome to recreate the old behavior which + * returned 0 if all data was sent or EOF. EOF was sent regardless of + * whether an error occurred or not all data was sent on a non-blocking + * socket. 
+ */ + int + pqFlush(PGconn *conn) + { + if (pqSendSome(conn)) + { + return EOF; + } return 0; } Index: src/interfaces/libpq/libpq-fe.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v retrieving revision 1.80 diff -c -r1.80 libpq-fe.h *** src/interfaces/libpq/libpq-fe.h 2001/11/08 20:37:52 1.80 --- src/interfaces/libpq/libpq-fe.h 2002/02/25 10:21:06 *************** *** 279,284 **** --- 279,285 ---- /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); + extern int PQsendSome(PGconn *conn); /* * "Fast path" interface --- not really recommended for application Index: src/interfaces/libpq/libpq-int.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v retrieving revision 1.44 diff -c -r1.44 libpq-int.h *** src/interfaces/libpq/libpq-int.h 2001/11/05 17:46:38 1.44 --- src/interfaces/libpq/libpq-int.h 2002/02/25 10:21:06 *************** *** 323,328 **** --- 323,329 ---- extern int pqPutInt(int value, size_t bytes, PGconn *conn); extern int pqReadData(PGconn *conn); extern int pqFlush(PGconn *conn); + extern int pqSendSome(PGconn *conn); extern int pqWait(int forRead, int forWrite, PGconn *conn); extern int pqReadReady(PGconn *conn); extern int pqWriteReady(PGconn *conn); Bernhard -- Intevation GmbH http://intevation.de/ Sketch http://sketch.sourceforge.net/ MapIt! http://mapit.de/
pgsql-patches by date: