Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: Speed dblink using alternate libpq tuple storage |
Date | |
Msg-id | 20120216.174934.80061891.horiguchi.kyotaro@oss.ntt.co.jp Whole thread Raw |
In response to | Re: Speed dblink using alternate libpq tuple storage (Marko Kreen <markokr@gmail.com>) |
Responses |
Re: Speed dblink using alternate libpq tuple storage
|
List | pgsql-hackers |
Hello, I added the function PQskipRemainingResult() and use it in dblink. This reduces the number of executing try-catch block from the number of rows to one per query in dblink. - fe-exec.c : new function PQskipRemainingResult. - dblink.c : using PQskipRemainingResult in dblink_record_internal(). - libpq.sgml: documentation for PQskipRemainingResult and related change in RowProcessor. > Instead I added simple feature: rowProcessor can return '2', > in which case getAnotherTuple() does early exit without setting > any error state. In user side it appears as PQisBusy() returned > with TRUE result. All pointers stay valid, so callback can just > stuff them into some temp area. ... > It's included in main patch, but I also attached it as separate patch > so that it can be examined separately and reverted if not acceptable. This patch is based on the patch above and composed in the same manner - main three patches include all modifications and the '2' patch separately. This patch is not rebased to the HEAD because the HEAD yields error about the symbol LEAKPROOF... regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/interfaces/libpq/#fe-protocol3.c# b/src/interfaces/libpq/#fe-protocol3.c# new file mode 100644 index 0000000..8b7eed2 --- /dev/null +++ b/src/interfaces/libpq/#fe-protocol3.c# @@ -0,0 +1,1967 @@ +/*------------------------------------------------------------------------- + * + * fe-protocol3.c + * functions that are specific to frontend/backend protocol version 3 + * + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/interfaces/libpq/fe-protocol3.c + * + *------------------------------------------------------------------------- + */ +#include "postgres_fe.h" + +#include <ctype.h> +#include <fcntl.h> + +#include "libpq-fe.h" +#include "libpq-int.h" + +#include "mb/pg_wchar.h" + +#ifdef WIN32 +#include "win32.h" +#else +#include <unistd.h> +#include <netinet/in.h> +#ifdef HAVE_NETINET_TCP_H +#include <netinet/tcp.h> +#endif +#include <arpa/inet.h> +#endif + + +/* + * This macro lists the backend message types that could be "long" (more + * than a couple of kilobytes). + */ +#define VALID_LONG_MESSAGE_TYPE(id) \ + ((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'V' || \ + (id) == 'E' || (id) == 'N' || (id) == 'A') + + +static void handleSyncLoss(PGconn *conn, char id, int msgLength); +static int getRowDescriptions(PGconn *conn); +static int getParamDescriptions(PGconn *conn); +static int getAnotherTuple(PGconn *conn, int msgLength); +static int getParameterStatus(PGconn *conn); +static int getNotify(PGconn *conn); +static int getCopyStart(PGconn *conn, ExecStatusType copytype); +static int getReadyForQuery(PGconn *conn); +static void reportErrorPosition(PQExpBuffer msg, const char *query, + int loc, int encoding); +static int build_startup_packet(const PGconn *conn, char *packet, + const PQEnvironmentOption *options); + + +/* + * parseInput: if appropriate, parse input data from backend + * until input is exhausted or a stopping state is reached. + * Note that this function will NOT attempt to read more data from the backend. + */ +void +pqParseInput3(PGconn *conn) +{ + char id; + int msgLength; + int avail; + + /* + * Loop to parse successive complete messages available in the buffer. + */ + for (;;) + { + /* + * Try to read a message. First get the type code and length. Return + * if not enough data. + */ + conn->inCursor = conn->inStart; + if (pqGetc(&id, conn)) + return; + if (pqGetInt(&msgLength, 4, conn)) + return; + + /* + * Try to validate message type/length here. A length less than 4 is + * definitely broken. Large lengths should only be believed for a few + * message types. + */ + if (msgLength < 4) + { + handleSyncLoss(conn, id, msgLength); + return; + } + if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id)) + { + handleSyncLoss(conn, id, msgLength); + return; + } + + /* + * Can't process if message body isn't all here yet. + */ + msgLength -= 4; + avail = conn->inEnd - conn->inCursor; + if (avail < msgLength) + { + /* + * Before returning, enlarge the input buffer if needed to hold + * the whole message. This is better than leaving it to + * pqReadData because we can avoid multiple cycles of realloc() + * when the message is large; also, we can implement a reasonable + * recovery strategy if we are unable to make the buffer big + * enough. + */ + if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength, + conn)) + { + /* + * XXX add some better recovery code... plan is to skip over + * the message using its length, then report an error. For the + * moment, just treat this like loss of sync (which indeed it + * might be!) + */ + handleSyncLoss(conn, id, msgLength); + } + return; + } + + /* + * NOTIFY and NOTICE messages can happen in any state; always process + * them right away. + * + * Most other messages should only be processed while in BUSY state. + * (In particular, in READY state we hold off further parsing until + * the application collects the current PGresult.) + * + * However, if the state is IDLE then we got trouble; we need to deal + * with the unexpected message somehow. + * + * ParameterStatus ('S') messages are a special case: in IDLE state we + * must process 'em (this case could happen if a new value was adopted + * from config file due to SIGHUP), but otherwise we hold off until + * BUSY state. + */ + if (id == 'A') + { + if (getNotify(conn)) + return; + } + else if (id == 'N') + { + if (pqGetErrorNotice3(conn, false)) + return; + } + else if (conn->asyncStatus != PGASYNC_BUSY) + { + /* If not IDLE state, just wait ... */ + if (conn->asyncStatus != PGASYNC_IDLE) + return; + + /* + * Unexpected message in IDLE state; need to recover somehow. + * ERROR messages are displayed using the notice processor; + * ParameterStatus is handled normally; anything else is just + * dropped on the floor after displaying a suitable warning + * notice. (An ERROR is very possibly the backend telling us why + * it is about to close the connection, so we don't want to just + * discard it...) + */ + if (id == 'E') + { + if (pqGetErrorNotice3(conn, false /* treat as notice */ )) + return; + } + else if (id == 'S') + { + if (getParameterStatus(conn)) + return; + } + else + { + pqInternalNotice(&conn->noticeHooks, + "message type 0x%02x arrived from server while idle", + id); + /* Discard the unexpected message */ + conn->inCursor += msgLength; + } + } + else + { + /* + * In BUSY state, we can process everything. + */ + switch (id) + { + case 'C': /* command complete */ + if (pqGets(&conn->workBuffer, conn)) + return; + if (conn->result == NULL) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_COMMAND_OK); + if (!conn->result) + return; + } + strncpy(conn->result->cmdStatus, conn->workBuffer.data, + CMDSTATUS_LEN); + conn->asyncStatus = PGASYNC_READY; + break; + case 'E': /* error return */ + if (pqGetErrorNotice3(conn, true)) + return; + conn->asyncStatus = PGASYNC_READY; + break; + case 'Z': /* backend is ready for new query */ + if (getReadyForQuery(conn)) + return; + conn->asyncStatus = PGASYNC_IDLE; + break; + case 'I': /* empty query */ + if (conn->result == NULL) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_EMPTY_QUERY); + if (!conn->result) + return; + } + conn->asyncStatus = PGASYNC_READY; + break; + case '1': /* Parse Complete */ + /* If we're doing PQprepare, we're done; else ignore */ + if (conn->queryclass == PGQUERY_PREPARE) + { + if (conn->result == NULL) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_COMMAND_OK); + if (!conn->result) + return; + } + conn->asyncStatus = PGASYNC_READY; + } + break; + case '2': /* Bind Complete */ + case '3': /* Close Complete */ + /* Nothing to do for these message types */ + break; + case 'S': /* parameter status */ + if (getParameterStatus(conn)) + return; + break; + case 'K': /* secret key data from the backend */ + + /* + * This is expected only during backend startup, but it's + * just as easy to handle it as part of the main loop. + * Save the data and continue processing. + */ + if (pqGetInt(&(conn->be_pid), 4, conn)) + return; + if (pqGetInt(&(conn->be_key), 4, conn)) + return; + break; + case 'T': /* Row Description */ + if (conn->result == NULL || + conn->queryclass == PGQUERY_DESCRIBE) + { + /* First 'T' in a query sequence */ + if (getRowDescriptions(conn)) + return; + + /* + * If we're doing a Describe, we're ready to pass the + * result back to the client. + */ + if (conn->queryclass == PGQUERY_DESCRIBE) + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* + * A new 'T' message is treated as the start of + * another PGresult. (It is not clear that this is + * really possible with the current backend.) We stop + * parsing until the application accepts the current + * result. + */ + conn->asyncStatus = PGASYNC_READY; + return; + } + break; + case 'n': /* No Data */ + + /* + * NoData indicates that we will not be seeing a + * RowDescription message because the statement or portal + * inquired about doesn't return rows. + * + * If we're doing a Describe, we have to pass something + * back to the client, so set up a COMMAND_OK result, + * instead of TUPLES_OK. Otherwise we can just ignore + * this message. + */ + if (conn->queryclass == PGQUERY_DESCRIBE) + { + if (conn->result == NULL) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_COMMAND_OK); + if (!conn->result) + return; + } + conn->asyncStatus = PGASYNC_READY; + } + break; + case 't': /* Parameter Description */ + if (getParamDescriptions(conn)) + return; + break; + case 'D': /* Data Row */ + if (conn->result != NULL && + conn->result->resultStatus == PGRES_TUPLES_OK) + { + /* Read another tuple of a normal query response */ + if (getAnotherTuple(conn, msgLength)) + return; + } + else if (conn->result != NULL && + conn->result->resultStatus == PGRES_FATAL_ERROR) + { + /* + * We've already choked for some reason. Just discard + * tuples till we get to the end of the query. + */ + conn->inCursor += msgLength; + } + else + { + /* Set up to report error at end of query */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("server sent data (\"D\" message) without prior row description(\"T\" message)\n")); + pqSaveErrorResult(conn); + /* Discard the unexpected message */ + conn->inCursor += msgLength; + } + break; + case 'G': /* Start Copy In */ + if (getCopyStart(conn, PGRES_COPY_IN)) + return; + conn->asyncStatus = PGASYNC_COPY_IN; + break; + case 'H': /* Start Copy Out */ + if (getCopyStart(conn, PGRES_COPY_OUT)) + return; + conn->asyncStatus = PGASYNC_COPY_OUT; + conn->copy_already_done = 0; + break; + case 'W': /* Start Copy Both */ + if (getCopyStart(conn, PGRES_COPY_BOTH)) + return; + conn->asyncStatus = PGASYNC_COPY_BOTH; + conn->copy_already_done = 0; + break; + case 'd': /* Copy Data */ + + /* + * If we see Copy Data, just silently drop it. This would + * only occur if application exits COPY OUT mode too + * early. + */ + conn->inCursor += msgLength; + break; + case 'c': /* Copy Done */ + + /* + * If we see Copy Done, just silently drop it. This is + * the normal case during PQendcopy. We will keep + * swallowing data, expecting to see command-complete for + * the COPY command. + */ + break; + default: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "unexpected response from server; first received character was \"%c\"\n"), + id); + /* build an error result holding the error message */ + pqSaveErrorResult(conn); + /* not sure if we will see more, so go to ready state */ + conn->asyncStatus = PGASYNC_READY; + /* Discard the unexpected message */ + conn->inCursor += msgLength; + break; + } /* switch on protocol character */ + } + /* Successfully consumed this message */ + if (conn->inCursor == conn->inStart + 5 + msgLength) + { + /* Normal case: parsing agrees with specified length */ + conn->inStart = conn->inCursor; + } + else + { + /* Trouble --- report it */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("message contents do not agree with length in message type \"%c\"\n"), + id); + /* build an error result holding the error message */ + pqSaveErrorResult(conn); + conn->asyncStatus = PGASYNC_READY; + /* trust the specified message length as what to skip */ + conn->inStart += 5 + msgLength; + } + } +} + +/* + * handleSyncLoss: clean up after loss of message-boundary sync + * + * There isn't really a lot we can do here except abandon the connection. + */ +static void +handleSyncLoss(PGconn *conn, char id, int msgLength) +{ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "lost synchronization with server: got message type \"%c\", length %d\n"), + id, msgLength); + /* build an error result holding the error message */ + pqSaveErrorResult(conn); + conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ + + pqsecure_close(conn); + closesocket(conn->sock); + conn->sock = -1; + conn->status = CONNECTION_BAD; /* No more connection to backend */ +} + +/* + * parseInput subroutine to read a 'T' (row descriptions) message. + * We'll build a new PGresult structure (unless called for a Describe + * command for a prepared statement) containing the attribute data. + * Returns: 0 if completed message, EOF if not enough data yet. + * + * Note that if we run out of data, we have to release the partially + * constructed PGresult, and rebuild it again next time. Fortunately, + * that shouldn't happen often, since 'T' messages usually fit in a packet. + */ +static int +getRowDescriptions(PGconn *conn) +{ + PGresult *result; + int nfields; + int i; + + /* + * When doing Describe for a prepared statement, there'll already be a + * PGresult created by getParamDescriptions, and we should fill data into + * that. Otherwise, create a new, empty PGresult. + */ + if (conn->queryclass == PGQUERY_DESCRIBE) + { + if (conn->result) + result = conn->result; + else + result = PQmakeEmptyPGresult(conn, PGRES_COMMAND_OK); + } + else + result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK); + if (!result) + goto failure; + + /* parseInput already read the 'T' label and message length. */ + /* the next two bytes are the number of fields */ + if (pqGetInt(&(result->numAttributes), 2, conn)) + goto failure; + nfields = result->numAttributes; + + /* allocate space for the attribute descriptors */ + if (nfields > 0) + { + result->attDescs = (PGresAttDesc *) + pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); + if (!result->attDescs) + goto failure; + MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); + } + + /* result->binary is true only if ALL columns are binary */ + result->binary = (nfields > 0) ? 1 : 0; + + /* get type info */ + for (i = 0; i < nfields; i++) + { + int tableid; + int columnid; + int typid; + int typlen; + int atttypmod; + int format; + + if (pqGets(&conn->workBuffer, conn) || + pqGetInt(&tableid, 4, conn) || + pqGetInt(&columnid, 2, conn) || + pqGetInt(&typid, 4, conn) || + pqGetInt(&typlen, 2, conn) || + pqGetInt(&atttypmod, 4, conn) || + pqGetInt(&format, 2, conn)) + { + goto failure; + } + + /* + * Since pqGetInt treats 2-byte integers as unsigned, we need to + * coerce these results to signed form. + */ + columnid = (int) ((int16) columnid); + typlen = (int) ((int16) typlen); + format = (int) ((int16) format); + + result->attDescs[i].name = pqResultStrdup(result, + conn->workBuffer.data); + if (!result->attDescs[i].name) + goto failure; + result->attDescs[i].tableid = tableid; + result->attDescs[i].columnid = columnid; + result->attDescs[i].format = format; + result->attDescs[i].typid = typid; + result->attDescs[i].typlen = typlen; + result->attDescs[i].atttypmod = atttypmod; + + if (format != 1) + result->binary = 0; + } + + /* Success! */ + conn->result = result; + return 0; + +failure: + + /* + * Discard incomplete result, unless it's from getParamDescriptions. + * + * Note that if we hit a bufferload boundary while handling the + * describe-statement case, we'll forget any PGresult space we just + * allocated, and then reallocate it on next try. This will bloat the + * PGresult a little bit but the space will be freed at PQclear, so it + * doesn't seem worth trying to be smarter. + */ + if (result != conn->result) + PQclear(result); + return EOF; +} + +/* + * parseInput subroutine to read a 't' (ParameterDescription) message. + * We'll build a new PGresult structure containing the parameter data. + * Returns: 0 if completed message, EOF if not enough data yet. + * + * Note that if we run out of data, we have to release the partially + * constructed PGresult, and rebuild it again next time. Fortunately, + * that shouldn't happen often, since 't' messages usually fit in a packet. + */ +static int +getParamDescriptions(PGconn *conn) +{ + PGresult *result; + int nparams; + int i; + + result = PQmakeEmptyPGresult(conn, PGRES_COMMAND_OK); + if (!result) + goto failure; + + /* parseInput already read the 't' label and message length. */ + /* the next two bytes are the number of parameters */ + if (pqGetInt(&(result->numParameters), 2, conn)) + goto failure; + nparams = result->numParameters; + + /* allocate space for the parameter descriptors */ + if (nparams > 0) + { + result->paramDescs = (PGresParamDesc *) + pqResultAlloc(result, nparams * sizeof(PGresParamDesc), TRUE); + if (!result->paramDescs) + goto failure; + MemSet(result->paramDescs, 0, nparams * sizeof(PGresParamDesc)); + } + + /* get parameter info */ + for (i = 0; i < nparams; i++) + { + int typid; + + if (pqGetInt(&typid, 4, conn)) + goto failure; + result->paramDescs[i].typid = typid; + } + + /* Success! */ + conn->result = result; + return 0; + +failure: + PQclear(result); + return EOF; +} + +/* + * parseInput subroutine to read a 'D' (row data) message. + * We add another tuple to the existing PGresult structure. + * Returns: 0 if completed message, EOF if error or not enough data yet. + * + * Note that if we run out of data, we have to suspend and reprocess + * the message after more data is received. We keep a partially constructed + * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + */ +static int +getAnotherTuple(PGconn *conn, int msgLength) +{ + PGresult *result = conn->result; + int nfields = result->numAttributes; + PGrowValue rowval[result->numAttributes + 1]; + int tupnfields; /* # fields from tuple */ + int vlen; /* length of the current field value */ + int i; + + /* Allocate tuple space if first time for this data message */ + /* Get the field count and make sure it's what we expect */ + if (pqGetInt(&tupnfields, 2, conn)) + return EOF; + + if (tupnfields != nfields) + { + /* Replace partially constructed result with an error result */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("unexpected field count in \"D\" message\n")); + pqSaveErrorResult(conn); + /* Discard the failed message by pretending we read it */ + conn->inCursor = conn->inStart + 5 + msgLength; + return 0; + } + + /* Scan the fields */ + for (i = 0; i < nfields; i++) + { + /* get the value length */ + if (pqGetInt(&vlen, 4, conn)) + return EOF; + if (vlen == -1) + vlen = NULL_LEN; + else if (vlen < 0) + vlen = 0; + + /* + * Buffer content may be shifted on reloading additional + * data. So we must set all pointers on every scan. + * + * rowval[i].value always points to the next address of the + * length field even if the value length is zero or the value + * is NULL for the access safety. + */ + rowval[i].value = conn->inBuffer + conn->inCursor; + rowval[i].len = vlen; + + /* Skip to the next length field */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + return EOF; + } + + /* + * Set rowval[nfields] for the access safety. We can estimate the + * length of the buffer to store by + * + * rowval[nfields].value - rowval[0].value - 4 * nfields. + */ + rowval[nfields].value = conn->inBuffer + conn->inCursor; + rowval[nfields].len = NULL_LEN; + + /* Success! Pass the completed row values to rowProcessor */ + if (!result->rowProcessor(result, result->rowProcessorParam, rowval)) + goto rowProcessError; + + /* Free garbage error message. */ + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } + + return 0; + +rowProcessError: + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->rowProcessorErrMes ? + result->rowProcessorErrMes : + "out of memory for query result\n")); + if (result->rowProcessorErrMes) + { +o free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } + pqSaveErrorResult(conn); + + /* Discard the failed message by pretending we read it */ + conn->inCursor = conn->inStart + 5 + msgLength; + return 0; +} + + +/* + * Attempt to read an Error or Notice response message. + * This is possible in several places, so we break it out as a subroutine. + * Entry: 'E' or 'N' message type and length have already been consumed. + * Exit: returns 0 if successfully consumed message. + * returns EOF if not enough data. + */ +int +pqGetErrorNotice3(PGconn *conn, bool isError) +{ + PGresult *res = NULL; + PQExpBufferData workBuf; + char id; + const char *val; + const char *querytext = NULL; + int querypos = 0; + + /* + * Since the fields might be pretty long, we create a temporary + * PQExpBuffer rather than using conn->workBuffer. workBuffer is intended + * for stuff that is expected to be short. We shouldn't use + * conn->errorMessage either, since this might be only a notice. + */ + initPQExpBuffer(&workBuf); + + /* + * Make a PGresult to hold the accumulated fields. We temporarily lie + * about the result status, so that PQmakeEmptyPGresult doesn't uselessly + * copy conn->errorMessage. + */ + res = PQmakeEmptyPGresult(conn, PGRES_EMPTY_QUERY); + if (!res) + goto fail; + res->resultStatus = isError ? PGRES_FATAL_ERROR : PGRES_NONFATAL_ERROR; + + /* + * Read the fields and save into res. + */ + for (;;) + { + if (pqGetc(&id, conn)) + goto fail; + if (id == '\0') + break; /* terminator found */ + if (pqGets(&workBuf, conn)) + goto fail; + pqSaveMessageField(res, id, workBuf.data); + } + + /* + * Now build the "overall" error message for PQresultErrorMessage. + * + * Also, save the SQLSTATE in conn->last_sqlstate. + */ + resetPQExpBuffer(&workBuf); + val = PQresultErrorField(res, PG_DIAG_SEVERITY); + if (val) + appendPQExpBuffer(&workBuf, "%s: ", val); + val = PQresultErrorField(res, PG_DIAG_SQLSTATE); + if (val) + { + if (strlen(val) < sizeof(conn->last_sqlstate)) + strcpy(conn->last_sqlstate, val); + if (conn->verbosity == PQERRORS_VERBOSE) + appendPQExpBuffer(&workBuf, "%s: ", val); + } + val = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); + if (val) + appendPQExpBufferStr(&workBuf, val); + val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION); + if (val) + { + if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL) + { + /* emit position as a syntax cursor display */ + querytext = conn->last_query; + querypos = atoi(val); + } + else + { + /* emit position as text addition to primary message */ + /* translator: %s represents a digit string */ + appendPQExpBuffer(&workBuf, libpq_gettext(" at character %s"), + val); + } + } + else + { + val = PQresultErrorField(res, PG_DIAG_INTERNAL_POSITION); + if (val) + { + querytext = PQresultErrorField(res, PG_DIAG_INTERNAL_QUERY); + if (conn->verbosity != PQERRORS_TERSE && querytext != NULL) + { + /* emit position as a syntax cursor display */ + querypos = atoi(val); + } + else + { + /* emit position as text addition to primary message */ + /* translator: %s represents a digit string */ + appendPQExpBuffer(&workBuf, libpq_gettext(" at character %s"), + val); + } + } + } + appendPQExpBufferChar(&workBuf, '\n'); + if (conn->verbosity != PQERRORS_TERSE) + { + if (querytext && querypos > 0) + reportErrorPosition(&workBuf, querytext, querypos, + conn->client_encoding); + val = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); + if (val) + appendPQExpBuffer(&workBuf, libpq_gettext("DETAIL: %s\n"), val); + val = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); + if (val) + appendPQExpBuffer(&workBuf, libpq_gettext("HINT: %s\n"), val); + val = PQresultErrorField(res, PG_DIAG_INTERNAL_QUERY); + if (val) + appendPQExpBuffer(&workBuf, libpq_gettext("QUERY: %s\n"), val); + val = PQresultErrorField(res, PG_DIAG_CONTEXT); + if (val) + appendPQExpBuffer(&workBuf, libpq_gettext("CONTEXT: %s\n"), val); + } + if (conn->verbosity == PQERRORS_VERBOSE) + { + const char *valf; + const char *vall; + + valf = PQresultErrorField(res, PG_DIAG_SOURCE_FILE); + vall = PQresultErrorField(res, PG_DIAG_SOURCE_LINE); + val = PQresultErrorField(res, PG_DIAG_SOURCE_FUNCTION); + if (val || valf || vall) + { + appendPQExpBufferStr(&workBuf, libpq_gettext("LOCATION: ")); + if (val) + appendPQExpBuffer(&workBuf, libpq_gettext("%s, "), val); + if (valf && vall) /* unlikely we'd have just one */ + appendPQExpBuffer(&workBuf, libpq_gettext("%s:%s"), + valf, vall); + appendPQExpBufferChar(&workBuf, '\n'); + } + } + + /* + * Either save error as current async result, or just emit the notice. + */ + if (isError) + { + res->errMsg = pqResultStrdup(res, workBuf.data); + if (!res->errMsg) + goto fail; + pqClearAsyncResult(conn); + conn->result = res; + appendPQExpBufferStr(&conn->errorMessage, workBuf.data); + } + else + { + /* We can cheat a little here and not copy the message. */ + res->errMsg = workBuf.data; + if (res->noticeHooks.noticeRec != NULL) + (*res->noticeHooks.noticeRec) (res->noticeHooks.noticeRecArg, res); + PQclear(res); + } + + termPQExpBuffer(&workBuf); + return 0; + +fail: + PQclear(res); + termPQExpBuffer(&workBuf); + return EOF; +} + +/* + * Add an error-location display to the error message under construction. + * + * The cursor location is measured in logical characters; the query string + * is presumed to be in the specified encoding. + */ +static void +reportErrorPosition(PQExpBuffer msg, const char *query, int loc, int encoding) +{ +#define DISPLAY_SIZE 60 /* screen width limit, in screen cols */ +#define MIN_RIGHT_CUT 10 /* try to keep this far away from EOL */ + + char *wquery; + int slen, + cno, + i, + *qidx, + *scridx, + qoffset, + scroffset, + ibeg, + iend, + loc_line; + bool mb_encoding, + beg_trunc, + end_trunc; + + /* Convert loc from 1-based to 0-based; no-op if out of range */ + loc--; + if (loc < 0) + return; + + /* Need a writable copy of the query */ + wquery = strdup(query); + if (wquery == NULL) + return; /* fail silently if out of memory */ + + /* + * Each character might occupy multiple physical bytes in the string, and + * in some Far Eastern character sets it might take more than one screen + * column as well. We compute the starting byte offset and starting + * screen column of each logical character, and store these in qidx[] and + * scridx[] respectively. + */ + + /* we need a safe allocation size... */ + slen = strlen(wquery) + 1; + + qidx = (int *) malloc(slen * sizeof(int)); + if (qidx == NULL) + { + free(wquery); + return; + } + scridx = (int *) malloc(slen * sizeof(int)); + if (scridx == NULL) + { + free(qidx); + free(wquery); + return; + } + + /* We can optimize a bit if it's a single-byte encoding */ + mb_encoding = (pg_encoding_max_length(encoding) != 1); + + /* + * Within the scanning loop, cno is the current character's logical + * number, qoffset is its offset in wquery, and scroffset is its starting + * logical screen column (all indexed from 0). "loc" is the logical + * character number of the error location. We scan to determine loc_line + * (the 1-based line number containing loc) and ibeg/iend (first character + * number and last+1 character number of the line containing loc). Note + * that qidx[] and scridx[] are filled only as far as iend. + */ + qoffset = 0; + scroffset = 0; + loc_line = 1; + ibeg = 0; + iend = -1; /* -1 means not set yet */ + + for (cno = 0; wquery[qoffset] != '\0'; cno++) + { + char ch = wquery[qoffset]; + + qidx[cno] = qoffset; + scridx[cno] = scroffset; + + /* + * Replace tabs with spaces in the writable copy. (Later we might + * want to think about coping with their variable screen width, but + * not today.) + */ + if (ch == '\t') + wquery[qoffset] = ' '; + + /* + * If end-of-line, count lines and mark positions. Each \r or \n + * counts as a line except when \r \n appear together. + */ + else if (ch == '\r' || ch == '\n') + { + if (cno < loc) + { + if (ch == '\r' || + cno == 0 || + wquery[qidx[cno - 1]] != '\r') + loc_line++; + /* extract beginning = last line start before loc. */ + ibeg = cno + 1; + } + else + { + /* set extract end. */ + iend = cno; + /* done scanning. */ + break; + } + } + + /* Advance */ + if (mb_encoding) + { + int w; + + w = pg_encoding_dsplen(encoding, &wquery[qoffset]); + /* treat any non-tab control chars as width 1 */ + if (w <= 0) + w = 1; + scroffset += w; + qoffset += pg_encoding_mblen(encoding, &wquery[qoffset]); + } + else + { + /* We assume wide chars only exist in multibyte encodings */ + scroffset++; + qoffset++; + } + } + /* Fix up if we didn't find an end-of-line after loc */ + if (iend < 0) + { + iend = cno; /* query length in chars, +1 */ + qidx[iend] = qoffset; + scridx[iend] = scroffset; + } + + /* Print only if loc is within computed query length */ + if (loc <= cno) + { + /* If the line extracted is too long, we truncate it. */ + beg_trunc = false; + end_trunc = false; + if (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE) + { + /* + * We first truncate right if it is enough. This code might be + * off a space or so on enforcing MIN_RIGHT_CUT if there's a wide + * character right there, but that should be okay. + */ + if (scridx[ibeg] + DISPLAY_SIZE >= scridx[loc] + MIN_RIGHT_CUT) + { + while (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE) + iend--; + end_trunc = true; + } + else + { + /* Truncate right if not too close to loc. */ + while (scridx[loc] + MIN_RIGHT_CUT < scridx[iend]) + { + iend--; + end_trunc = true; + } + + /* Truncate left if still too long. */ + while (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE) + { + ibeg++; + beg_trunc = true; + } + } + } + + /* truncate working copy at desired endpoint */ + wquery[qidx[iend]] = '\0'; + + /* Begin building the finished message. */ + i = msg->len; + appendPQExpBuffer(msg, libpq_gettext("LINE %d: "), loc_line); + if (beg_trunc) + appendPQExpBufferStr(msg, "..."); + + /* + * While we have the prefix in the msg buffer, compute its screen + * width. + */ + scroffset = 0; + for (; i < msg->len; i += pg_encoding_mblen(encoding, &msg->data[i])) + { + int w = pg_encoding_dsplen(encoding, &msg->data[i]); + + if (w <= 0) + w = 1; + scroffset += w; + } + + /* Finish up the LINE message line. */ + appendPQExpBufferStr(msg, &wquery[qidx[ibeg]]); + if (end_trunc) + appendPQExpBufferStr(msg, "..."); + appendPQExpBufferChar(msg, '\n'); + + /* Now emit the cursor marker line. */ + scroffset += scridx[loc] - scridx[ibeg]; + for (i = 0; i < scroffset; i++) + appendPQExpBufferChar(msg, ' '); + appendPQExpBufferChar(msg, '^'); + appendPQExpBufferChar(msg, '\n'); + } + + /* Clean up. */ + free(scridx); + free(qidx); + free(wquery); +} + + +/* + * Attempt to read a ParameterStatus message. + * This is possible in several places, so we break it out as a subroutine. + * Entry: 'S' message type and length have already been consumed. + * Exit: returns 0 if successfully consumed message. + * returns EOF if not enough data. + */ +static int +getParameterStatus(PGconn *conn) +{ + PQExpBufferData valueBuf; + + /* Get the parameter name */ + if (pqGets(&conn->workBuffer, conn)) + return EOF; + /* Get the parameter value (could be large) */ + initPQExpBuffer(&valueBuf); + if (pqGets(&valueBuf, conn)) + { + termPQExpBuffer(&valueBuf); + return EOF; + } + /* And save it */ + pqSaveParameterStatus(conn, conn->workBuffer.data, valueBuf.data); + termPQExpBuffer(&valueBuf); + return 0; +} + + +/* + * Attempt to read a Notify response message. + * This is possible in several places, so we break it out as a subroutine. + * Entry: 'A' message type and length have already been consumed. + * Exit: returns 0 if successfully consumed Notify message. + * returns EOF if not enough data. + */ +static int +getNotify(PGconn *conn) +{ + int be_pid; + char *svname; + int nmlen; + int extralen; + PGnotify *newNotify; + + if (pqGetInt(&be_pid, 4, conn)) + return EOF; + if (pqGets(&conn->workBuffer, conn)) + return EOF; + /* must save name while getting extra string */ + svname = strdup(conn->workBuffer.data); + if (!svname) + return EOF; + if (pqGets(&conn->workBuffer, conn)) + { + free(svname); + return EOF; + } + + /* + * Store the strings right after the PQnotify structure so it can all be + * freed at once. We don't use NAMEDATALEN because we don't want to tie + * this interface to a specific server name length. + */ + nmlen = strlen(svname); + extralen = strlen(conn->workBuffer.data); + newNotify = (PGnotify *) malloc(sizeof(PGnotify) + nmlen + extralen + 2); + if (newNotify) + { + newNotify->relname = (char *) newNotify + sizeof(PGnotify); + strcpy(newNotify->relname, svname); + newNotify->extra = newNotify->relname + nmlen + 1; + strcpy(newNotify->extra, conn->workBuffer.data); + newNotify->be_pid = be_pid; + newNotify->next = NULL; + if (conn->notifyTail) + conn->notifyTail->next = newNotify; + else + conn->notifyHead = newNotify; + conn->notifyTail = newNotify; + } + + free(svname); + return 0; +} + +/* + * getCopyStart - process CopyInResponse, CopyOutResponse or + * CopyBothResponse message + * + * parseInput already read the message type and length. + */ +static int +getCopyStart(PGconn *conn, ExecStatusType copytype) +{ + PGresult *result; + int nfields; + int i; + + result = PQmakeEmptyPGresult(conn, copytype); + if (!result) + goto failure; + + if (pqGetc(&conn->copy_is_binary, conn)) + goto failure; + result->binary = conn->copy_is_binary; + /* the next two bytes are the number of fields */ + if (pqGetInt(&(result->numAttributes), 2, conn)) + goto failure; + nfields = result->numAttributes; + + /* allocate space for the attribute descriptors */ + if (nfields > 0) + { + result->attDescs = (PGresAttDesc *) + pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); + if (!result->attDescs) + goto failure; + MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); + } + + for (i = 0; i < nfields; i++) + { + int format; + + if (pqGetInt(&format, 2, conn)) + goto failure; + + /* + * Since pqGetInt treats 2-byte integers as unsigned, we need to + * coerce these results to signed form. + */ + format = (int) ((int16) format); + result->attDescs[i].format = format; + } + + /* Success! */ + conn->result = result; + return 0; + +failure: + PQclear(result); + return EOF; +} + +/* + * getReadyForQuery - process ReadyForQuery message + */ +static int +getReadyForQuery(PGconn *conn) +{ + char xact_status; + + if (pqGetc(&xact_status, conn)) + return EOF; + switch (xact_status) + { + case 'I': + conn->xactStatus = PQTRANS_IDLE; + break; + case 'T': + conn->xactStatus = PQTRANS_INTRANS; + break; + case 'E': + conn->xactStatus = PQTRANS_INERROR; + break; + default: + conn->xactStatus = PQTRANS_UNKNOWN; + break; + } + + return 0; +} + +/* + * getCopyDataMessage - fetch next CopyData message, process async messages + * + * Returns length word of CopyData message (> 0), or 0 if no complete + * message available, -1 if end of copy, -2 if error. + */ +static int +getCopyDataMessage(PGconn *conn) +{ + char id; + int msgLength; + int avail; + + for (;;) + { + /* + * Do we have the next input message? To make life simpler for async + * callers, we keep returning 0 until the next message is fully + * available, even if it is not Copy Data. + */ + conn->inCursor = conn->inStart; + if (pqGetc(&id, conn)) + return 0; + if (pqGetInt(&msgLength, 4, conn)) + return 0; + if (msgLength < 4) + { + handleSyncLoss(conn, id, msgLength); + return -2; + } + avail = conn->inEnd - conn->inCursor; + if (avail < msgLength - 4) + { + /* + * Before returning, enlarge the input buffer if needed to hold + * the whole message. See notes in parseInput. + */ + if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength - 4, + conn)) + { + /* + * XXX add some better recovery code... plan is to skip over + * the message using its length, then report an error. For the + * moment, just treat this like loss of sync (which indeed it + * might be!) + */ + handleSyncLoss(conn, id, msgLength); + return -2; + } + return 0; + } + + /* + * If it's a legitimate async message type, process it. (NOTIFY + * messages are not currently possible here, but we handle them for + * completeness.) Otherwise, if it's anything except Copy Data, + * report end-of-copy. + */ + switch (id) + { + case 'A': /* NOTIFY */ + if (getNotify(conn)) + return 0; + break; + case 'N': /* NOTICE */ + if (pqGetErrorNotice3(conn, false)) + return 0; + break; + case 'S': /* ParameterStatus */ + if (getParameterStatus(conn)) + return 0; + break; + case 'd': /* Copy Data, pass it back to caller */ + return msgLength; + default: /* treat as end of copy */ + return -1; + } + + /* Drop the processed message and loop around for another */ + conn->inStart = conn->inCursor; + } +} + +/* + * PQgetCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH + * + * If successful, sets *buffer to point to a malloc'd row of data, and + * returns row length (always > 0) as result. + * Returns 0 if no row available yet (only possible if async is true), + * -1 if end of copy (consult PQgetResult), or -2 if error (consult + * PQerrorMessage). + */ +int +pqGetCopyData3(PGconn *conn, char **buffer, int async) +{ + int msgLength; + + for (;;) + { + /* + * Collect the next input message. To make life simpler for async + * callers, we keep returning 0 until the next message is fully + * available, even if it is not Copy Data. + */ + msgLength = getCopyDataMessage(conn); + if (msgLength < 0) + { + /* + * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller + * read status with PQgetResult(). The normal case is that it's + * Copy Done, but we let parseInput read that. If error, we + * expect the state was already changed. + */ + if (msgLength == -1) + conn->asyncStatus = PGASYNC_BUSY; + return msgLength; /* end-of-copy or error */ + } + if (msgLength == 0) + { + /* Don't block if async read requested */ + if (async) + return 0; + /* Need to load more data */ + if (pqWait(TRUE, FALSE, conn) || + pqReadData(conn) < 0) + return -2; + continue; + } + + /* + * Drop zero-length messages (shouldn't happen anyway). Otherwise + * pass the data back to the caller. + */ + msgLength -= 4; + if (msgLength > 0) + { + *buffer = (char *) malloc(msgLength + 1); + if (*buffer == NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return -2; + } + memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength); + (*buffer)[msgLength] = '\0'; /* Add terminating null */ + + /* Mark message consumed */ + conn->inStart = conn->inCursor + msgLength; + + return msgLength; + } + + /* Empty, so drop it and loop around for another */ + conn->inStart = conn->inCursor; + } +} + +/* + * PQgetline - gets a newline-terminated string from the backend. + * + * See fe-exec.c for documentation. + */ +int +pqGetline3(PGconn *conn, char *s, int maxlen) +{ + int status; + + if (conn->sock < 0 || + conn->asyncStatus != PGASYNC_COPY_OUT || + conn->copy_is_binary) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("PQgetline: not doing text COPY OUT\n")); + *s = '\0'; + return EOF; + } + + while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0) + { + /* need to load more data */ + if (pqWait(TRUE, FALSE, conn) || + pqReadData(conn) < 0) + { + *s = '\0'; + return EOF; + } + } + + if (status < 0) + { + /* End of copy detected; gin up old-style terminator */ + strcpy(s, "\\."); + return 0; + } + + /* Add null terminator, and strip trailing \n if present */ + if (s[status - 1] == '\n') + { + s[status - 1] = '\0'; + return 0; + } + else + { + s[status] = '\0'; + return 1; + } +} + +/* + * PQgetlineAsync - gets a COPY data row without blocking. + * + * See fe-exec.c for documentation. + */ +int +pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize) +{ + int msgLength; + int avail; + + if (conn->asyncStatus != PGASYNC_COPY_OUT) + return -1; /* we are not doing a copy... */ + + /* + * Recognize the next input message. To make life simpler for async + * callers, we keep returning 0 until the next message is fully available + * even if it is not Copy Data. This should keep PQendcopy from blocking. + * (Note: unlike pqGetCopyData3, we do not change asyncStatus here.) + */ + msgLength = getCopyDataMessage(conn); + if (msgLength < 0) + return -1; /* end-of-copy or error */ + if (msgLength == 0) + return 0; /* no data yet */ + + /* + * Move data from libpq's buffer to the caller's. In the case where a + * prior call found the caller's buffer too small, we use + * conn->copy_already_done to remember how much of the row was already + * returned to the caller. + */ + conn->inCursor += conn->copy_already_done; + avail = msgLength - 4 - conn->copy_already_done; + if (avail <= bufsize) + { + /* Able to consume the whole message */ + memcpy(buffer, &conn->inBuffer[conn->inCursor], avail); + /* Mark message consumed */ + conn->inStart = conn->inCursor + avail; + /* Reset state for next time */ + conn->copy_already_done = 0; + return avail; + } + else + { + /* We must return a partial message */ + memcpy(buffer, &conn->inBuffer[conn->inCursor], bufsize); + /* The message is NOT consumed from libpq's buffer */ + conn->copy_already_done += bufsize; + return bufsize; + } +} + +/* + * PQendcopy + * + * See fe-exec.c for documentation. + */ +int +pqEndcopy3(PGconn *conn) +{ + PGresult *result; + + if (conn->asyncStatus != PGASYNC_COPY_IN && + conn->asyncStatus != PGASYNC_COPY_OUT) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("no COPY in progress\n")); + return 1; + } + + /* Send the CopyDone message if needed */ + if (conn->asyncStatus == PGASYNC_COPY_IN) + { + if (pqPutMsgStart('c', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + return 1; + + /* + * If we sent the COPY command in extended-query mode, we must issue a + * Sync as well. + */ + if (conn->queryclass != PGQUERY_SIMPLE) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + return 1; + } + } + + /* + * make sure no data is waiting to be sent, abort if we are non-blocking + * and the flush fails + */ + if (pqFlush(conn) && pqIsnonblocking(conn)) + return 1; + + /* Return to active duty */ + conn->asyncStatus = PGASYNC_BUSY; + resetPQExpBuffer(&conn->errorMessage); + + /* + * Non blocking connections may have to abort at this point. If everyone + * played the game there should be no problem, but in error scenarios the + * expected messages may not have arrived yet. (We are assuming that the + * backend's packetizing will ensure that CommandComplete arrives along + * with the CopyDone; are there corner cases where that doesn't happen?) + */ + if (pqIsnonblocking(conn) && PQisBusy(conn)) + return 1; + + /* Wait for the completion response */ + result = PQgetResult(conn); + + /* Expecting a successful result */ + if (result && result->resultStatus == PGRES_COMMAND_OK) + { + PQclear(result); + return 0; + } + + /* + * Trouble. For backwards-compatibility reasons, we issue the error + * message as if it were a notice (would be nice to get rid of this + * silliness, but too many apps probably don't handle errors from + * PQendcopy reasonably). Note that the app can still obtain the error + * status from the PGconn object. + */ + if (conn->errorMessage.len > 0) + { + /* We have to strip the trailing newline ... pain in neck... */ + char svLast = conn->errorMessage.data[conn->errorMessage.len - 1]; + + if (svLast == '\n') + conn->errorMessage.data[conn->errorMessage.len - 1] = '\0'; + pqInternalNotice(&conn->noticeHooks, "%s", conn->errorMessage.data); + conn->errorMessage.data[conn->errorMessage.len - 1] = svLast; + } + + PQclear(result); + + return 1; +} + + +/* + * PQfn - Send a function call to the POSTGRES backend. + * + * See fe-exec.c for documentation. + */ +PGresult * +pqFunctionCall3(PGconn *conn, Oid fnid, + int *result_buf, int *actual_result_len, + int result_is_int, + const PQArgBlock *args, int nargs) +{ + bool needInput = false; + ExecStatusType status = PGRES_FATAL_ERROR; + char id; + int msgLength; + int avail; + int i; + + /* PQfn already validated connection state */ + + if (pqPutMsgStart('F', false, conn) < 0 || /* function call msg */ + pqPutInt(fnid, 4, conn) < 0 || /* function id */ + pqPutInt(1, 2, conn) < 0 || /* # of format codes */ + pqPutInt(1, 2, conn) < 0 || /* format code: BINARY */ + pqPutInt(nargs, 2, conn) < 0) /* # of args */ + { + pqHandleSendFailure(conn); + return NULL; + } + + for (i = 0; i < nargs; ++i) + { /* len.int4 + contents */ + if (pqPutInt(args[i].len, 4, conn)) + { + pqHandleSendFailure(conn); + return NULL; + } + if (args[i].len == -1) + continue; /* it's NULL */ + + if (args[i].isint) + { + if (pqPutInt(args[i].u.integer, args[i].len, conn)) + { + pqHandleSendFailure(conn); + return NULL; + } + } + else + { + if (pqPutnchar((char *) args[i].u.ptr, args[i].len, conn)) + { + pqHandleSendFailure(conn); + return NULL; + } + } + } + + if (pqPutInt(1, 2, conn) < 0) /* result format code: BINARY */ + { + pqHandleSendFailure(conn); + return NULL; + } + + if (pqPutMsgEnd(conn) < 0 || + pqFlush(conn)) + { + pqHandleSendFailure(conn); + return NULL; + } + + for (;;) + { + if (needInput) + { + /* Wait for some data to arrive (or for the channel to close) */ + if (pqWait(TRUE, FALSE, conn) || + pqReadData(conn) < 0) + break; + } + + /* + * Scan the message. If we run out of data, loop around to try again. + */ + needInput = true; + + conn->inCursor = conn->inStart; + if (pqGetc(&id, conn)) + continue; + if (pqGetInt(&msgLength, 4, conn)) + continue; + + /* + * Try to validate message type/length here. A length less than 4 is + * definitely broken. Large lengths should only be believed for a few + * message types. + */ + if (msgLength < 4) + { + handleSyncLoss(conn, id, msgLength); + break; + } + if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id)) + { + handleSyncLoss(conn, id, msgLength); + break; + } + + /* + * Can't process if message body isn't all here yet. + */ + msgLength -= 4; + avail = conn->inEnd - conn->inCursor; + if (avail < msgLength) + { + /* + * Before looping, enlarge the input buffer if needed to hold the + * whole message. See notes in parseInput. + */ + if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength, + conn)) + { + /* + * XXX add some better recovery code... plan is to skip over + * the message using its length, then report an error. For the + * moment, just treat this like loss of sync (which indeed it + * might be!) + */ + handleSyncLoss(conn, id, msgLength); + break; + } + continue; + } + + /* + * We should see V or E response to the command, but might get N + * and/or A notices first. We also need to swallow the final Z before + * returning. + */ + switch (id) + { + case 'V': /* function result */ + if (pqGetInt(actual_result_len, 4, conn)) + continue; + if (*actual_result_len != -1) + { + if (result_is_int) + { + if (pqGetInt(result_buf, *actual_result_len, conn)) + continue; + } + else + { + if (pqGetnchar((char *) result_buf, + *actual_result_len, + conn)) + continue; + } + } + /* correctly finished function result message */ + status = PGRES_COMMAND_OK; + break; + case 'E': /* error return */ + if (pqGetErrorNotice3(conn, true)) + continue; + status = PGRES_FATAL_ERROR; + break; + case 'A': /* notify message */ + /* handle notify and go back to processing return values */ + if (getNotify(conn)) + continue; + break; + case 'N': /* notice */ + /* handle notice and go back to processing return values */ + if (pqGetErrorNotice3(conn, false)) + continue; + break; + case 'Z': /* backend is ready for new query */ + if (getReadyForQuery(conn)) + continue; + /* consume the message and exit */ + conn->inStart += 5 + msgLength; + /* if we saved a result object (probably an error), use it */ + if (conn->result) + return pqPrepareAsyncResult(conn); + return PQmakeEmptyPGresult(conn, status); + case 'S': /* parameter status */ + if (getParameterStatus(conn)) + continue; + break; + default: + /* The backend violates the protocol. */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("protocol error: id=0x%x\n"), + id); + pqSaveErrorResult(conn); + /* trust the specified message length as what to skip */ + conn->inStart += 5 + msgLength; + return pqPrepareAsyncResult(conn); + } + /* Completed this message, keep going */ + /* trust the specified message length as what to skip */ + conn->inStart += 5 + msgLength; + needInput = false; + } + + /* + * We fall out of the loop only upon failing to read data. + * conn->errorMessage has been set by pqWait or pqReadData. We want to + * append it to any already-received error message. + */ + pqSaveErrorResult(conn); + return pqPrepareAsyncResult(conn); +} + + +/* + * Construct startup packet + * + * Returns a malloc'd packet buffer, or NULL if out of memory + */ +char * +pqBuildStartupPacket3(PGconn *conn, int *packetlen, + const PQEnvironmentOption *options) +{ + char *startpacket; + + *packetlen = build_startup_packet(conn, NULL, options); + startpacket = (char *) malloc(*packetlen); + if (!startpacket) + return NULL; + *packetlen = build_startup_packet(conn, startpacket, options); + return startpacket; +} + +/* + * Build a startup packet given a filled-in PGconn structure. + * + * We need to figure out how much space is needed, then fill it in. + * To avoid duplicate logic, this routine is called twice: the first time + * (with packet == NULL) just counts the space needed, the second time + * (with packet == allocated space) fills it in. Return value is the number + * of bytes used. + */ +static int +build_startup_packet(const PGconn *conn, char *packet, + const PQEnvironmentOption *options) +{ + int packet_len = 0; + const PQEnvironmentOption *next_eo; + const char *val; + + /* Protocol version comes first. */ + if (packet) + { + ProtocolVersion pv = htonl(conn->pversion); + + memcpy(packet + packet_len, &pv, sizeof(ProtocolVersion)); + } + packet_len += sizeof(ProtocolVersion); + + /* Add user name, database name, options */ + +#define ADD_STARTUP_OPTION(optname, optval) \ + do { \ + if (packet) \ + strcpy(packet + packet_len, optname); \ + packet_len += strlen(optname) + 1; \ + if (packet) \ + strcpy(packet + packet_len, optval); \ + packet_len += strlen(optval) + 1; \ + } while(0) + + if (conn->pguser && conn->pguser[0]) + ADD_STARTUP_OPTION("user", conn->pguser); + if (conn->dbName && conn->dbName[0]) + ADD_STARTUP_OPTION("database", conn->dbName); + if (conn->replication && conn->replication[0]) + ADD_STARTUP_OPTION("replication", conn->replication); + if (conn->pgoptions && conn->pgoptions[0]) + ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->send_appname) + { + /* Use appname if present, otherwise use fallback */ + val = conn->appname ? conn->appname : conn->fbappname; + if (val && val[0]) + ADD_STARTUP_OPTION("application_name", val); + } + + if (conn->client_encoding_initial && conn->client_encoding_initial[0]) + ADD_STARTUP_OPTION("client_encoding", conn->client_encoding_initial); + + /* Add any environment-driven GUC settings needed */ + for (next_eo = options; next_eo->envName; next_eo++) + { + if ((val = getenv(next_eo->envName)) != NULL) + { + if (pg_strcasecmp(val, "default") != 0) + ADD_STARTUP_OPTION(next_eo->pgName, val); + } + } + + /* Add trailing terminator */ + if (packet) + packet[packet_len] = '\0'; + packet_len++; + + return packet_len; +} diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1af8df6..23cf729 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,3 +160,6 @@ PQconnectStartParams 157PQping 158PQpingParams 159PQlibVersion 160 +PQsetRowProcessor 161 +PQsetRowProcessorErrMsg 162 +PQskipRemainingResults 163 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 27a9805..4605e49 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2693,6 +2693,9 @@ makeEmptyPGconn(void) conn->wait_ssl_try = false;#endif + /* set default row processor */ + PQsetRowProcessor(conn, NULL, NULL); + /* * We try to send at least 8K at a time, which is the usual size of pipe * buffers on Unix systems. Thatway, when we are sending a large amount @@ -2711,8 +2714,13 @@ makeEmptyPGconn(void) initPQExpBuffer(&conn->errorMessage); initPQExpBuffer(&conn->workBuffer); + /* set up initial row buffer */ + conn->rowBufLen = 32; + conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue)); + if (conn->inBuffer == NULL || conn->outBuffer == NULL || + conn->rowBuf == NULL || PQExpBufferBroken(&conn->errorMessage) || PQExpBufferBroken(&conn->workBuffer)) { @@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn) free(conn->inBuffer); if (conn->outBuffer) free(conn->outBuffer); + if (conn->rowBuf) + free(conn->rowBuf); termPQExpBuffer(&conn->errorMessage); termPQExpBuffer(&conn->workBuffer); @@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev;} + diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b743566..d7f3ae9 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);static int check_field_number(const PGresult *res, int field_num); +static int pqAddRow(PGresult *res, void *param, PGrowValue *columns);/* ---------------- @@ -160,6 +161,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->rowProcessorErrMsg = NULL; if (conn) { @@ -701,7 +703,6 @@ pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result =NULL; - conn->curTuple = NULL;}/* @@ -756,7 +757,6 @@ pqPrepareAsyncResult(PGconn *conn) */ res = conn->result; conn->result = NULL; /* handingover ownership to caller */ - conn->curTuple = NULL; /* just in case */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -828,6 +828,73 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)}/* + * PQsetRowProcessor + * Set function that copies column data out from network buffer. + */ +void +PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param) +{ + conn->rowProcessor = (func ? func : pqAddRow); + conn->rowProcessorParam = param; +} + +/* + * PQsetRowProcessorErrMsg + * Set the error message pass back to the caller of RowProcessor. + * + * You can replace the previous message by alternative mes, or clear + * it with NULL. + */ +void +PQsetRowProcessorErrMsg(PGresult *res, char *msg) +{ + if (msg) + res->rowProcessorErrMsg = pqResultStrdup(res, msg); + else + res->rowProcessorErrMsg = NULL; +} + +/* + * pqAddRow + * add a row to the PGresult structure, growing it if necessary + * Returns TRUE if OK, FALSE if not enough memory to add the row. + */ +static int +pqAddRow(PGresult *res, void *param, PGrowValue *columns) +{ + PGresAttValue *tup; + int nfields = res->numAttributes; + int i; + + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) + return FALSE; + + for (i = 0 ; i < nfields ; i++) + { + tup[i].len = columns[i].len; + if (tup[i].len == NULL_LEN) + { + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary); + if (tup[i].value == NULL) + return FALSE; + + memcpy(tup[i].value, columns[i].value, tup[i].len); + /* We have to terminate this ourselves */ + tup[i].value[tup[i].len] = '\0'; + } + } + + return pqAddTuple(res, tup); +} + +/* * pqAddTuple * add a row pointer to the PGresult structure, growing it if necessary * Returns TRUE if OK, FALSEif not enough memory to add the row @@ -1223,7 +1290,6 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result= NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; @@ -1832,6 +1898,22 @@ PQexecFinish(PGconn *conn)}/* + * Exaust remaining Data Rows in curret conn. + */ +PGresult * +PQskipRemainingResults(PGconn *conn) +{ + pqClearAsyncResult(conn); + + /* conn->result is set NULL in pqClearAsyncResult() */ + pqSaveErrorResult(conn); + + /* Skip over remaining Data Row messages */ + PQgetResult(conn); +} + + +/* * PQdescribePrepared * Obtain information about a previously prepared statement * diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index ce0eac3..d11cb3c 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/* + * pqGetnchar: + * skip len bytes in input buffer. + */ +int +pqSkipnchar(size_t len, PGconn *conn) +{ + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + conn->inCursor += len; + + if (conn->Pfdebug) + fprintf(conn->Pfdebug, "From backend (%lu skipped)\n", + (unsigned long) len); + + return 0; +} + +/* * pqPutnchar: * write exactly len bytes to the current message */ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..ae4d7b0 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, FALSE)) return; + /* getAnotherTuple moves inStart itself */ + continue; } else { @@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, TRUE)) return; + /* getAnotherTuple moves inStart itself */ + continue; } else { @@ -703,52 +707,51 @@ failure:/* * parseInput subroutine to read a 'B' or 'D' (row data) message. - * We add another tuple to the existing PGresult structure. + * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error ornot enough data yet. * * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * the message after more data is received. */static intgetAnotherTuple(PGconn *conn, bool binary){ PGresult *result= conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGrowValue *rowbuf; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64];/* used unless it doesn't fit */ char *bitmap = std_bitmap; int i; + int rp; size_t nbytes; /* the number of bytes in bitmap */ char bmap; /* One byte of the bitmap */ int bitmap_index; /* Its index */ int bitcnt; /* number of bits examined in current byte */ int vlen; /* length of the current field value*/ + /* resize row buffer if needed */ + if (nfields > conn->rowBufLen) + { + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) + goto rowProcessError; + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + else + { + rowbuf = conn->rowBuf; + } + result->binary = binary; - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) + if (binary) { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - - /* - * If it's binary, fix the column format indicators. We assume the - * backend will consistently send either B or D, not a mix. - */ - if (binary) - { - for (i = 0; i < nfields; i++) - result->attDescs[i].format = 1; - } + for (i = 0; i < nfields; i++) + result->attDescs[i].format = 1; } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; @@ -757,7 +760,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto rowProcessError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -771,34 +774,29 @@ getAnotherTuple(PGconn *conn, bool binary) for (i = 0; i < nfields; i++) { if (!(bmap& 0200)) - { - /* if the field value is absent, make it a null string */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - } + vlen = NULL_LEN; + else if (pqGetInt(&vlen, 4, conn)) + goto EOFexit; else { - /* get the value length (the first four bytes are for length) */ - if (pqGetInt(&vlen, 4, conn)) - goto EOFexit; if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } + + /* + * rowbuf[i].value always points to the next address of the + * length field even if the value is NULL, to allow safe + * size estimates and data copy. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + rowbuf[i].len = vlen; + + /* Skip the value */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + goto EOFexit; + /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) @@ -811,33 +809,56 @@ getAnotherTuple(PGconn *conn, bool binary) bmap <<= 1; } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; - if (bitmap != std_bitmap) free(bitmap); - return 0; + bitmap = NULL; + + /* tag the row as parsed */ + conn->inStart = conn->inCursor; + + /* Pass the completed row values to rowProcessor */ + rp= conn->rowProcessor(result, conn->rowProcessorParam, rowbuf); + if (rp == 1) + return 0; + else if (rp == 2 && pqIsnonblocking(conn)) + /* processor requested early exit */ + return EOF; + else if (rp != 0) + PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value from row processor\n")); + +rowProcessError: -outOfMemory: /* Replace partially constructed result with an error result */ - /* - * we do NOT use pqSaveErrorResult() here, because of the likelihood that - * there's not enough memory to concatenate messages... - */ - pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + if (result->rowProcessorErrMsg) + { + printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg); + pqSaveErrorResult(conn); + } + else + { + /* + * we do NOT use pqSaveErrorResult() here, because of the likelihood that + * there's not enough memory to concatenate messages... + */ + pqClearAsyncResult(conn); + resetPQExpBuffer(&conn->errorMessage); - /* - * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can - * do to recover... - */ - conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); + /* + * If error message is passed from RowProcessor, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory for query result\n")); + + /* + * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can + * do to recover... + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); + } conn->asyncStatus = PGASYNC_READY; + /* Discard the failed message --- good idea? */ conn->inStart = conn->inEnd; diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..0260ba6 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, msgLength)) return; + + /* getAnotherTuple() moves inStart itself */ + continue; } else if (conn->result != NULL && conn->result->resultStatus == PGRES_FATAL_ERROR) @@ -613,33 +616,22 @@ failure:/* * parseInput subroutine to read a 'D' (row data) message. - * We add another tuple to the existing PGresult structure. + * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error ornot enough data yet. * * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * the message after more data is received. */static intgetAnotherTuple(PGconn *conn, int msgLength){ PGresult *result= conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGrowValue *rowbuf; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; - - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; + int rp; /* Get the field count and make sure it's what we expect */ if (pqGetInt(&tupnfields, 2, conn)) @@ -652,52 +644,88 @@ getAnotherTuple(PGconn *conn, int msgLength) libpq_gettext("unexpected field countin \"D\" message\n")); pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it*/ - conn->inCursor = conn->inStart + 5 + msgLength; + conn->inStart += 5 + msgLength; return 0; } + /* resize row buffer if needed */ + rowbuf = conn->rowBuf; + if (nfields > conn->rowBufLen) + { + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) + { + goto outOfMemory1; + } + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + /* Scan the fields */ for (i = 0; i < nfields; i++) { /* get the value length */ if (pqGetInt(&vlen,4, conn)) - return EOF; + goto protocolError; if (vlen == -1) - { - /* null field */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - continue; - } - if (vlen < 0) + vlen = NULL_LEN; + else if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - bool isbinary = (result->attDescs[i].format != 0); - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - return EOF; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; + /* + * rowbuf[i].value always points to the next address of the + * length field even if the value is NULL, to allow safe + * size estimates and data copy. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + rowbuf[i].len = vlen; + + /* Skip to the next length field */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + goto protocolError; } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + /* tag the row as parsed, check if correctly */ + conn->inStart += 5 + msgLength; + if (conn->inCursor != conn->inStart) + goto protocolError; + /* Pass the completed row values to rowProcessor */ + rp = conn->rowProcessor(result, conn->rowProcessorParam, rowbuf); + if (rp == 1) + { + /* everything is good */ + return 0; + } + if (rp == 2 && pqIsnonblocking(conn)) + { + /* processor requested early exit */ + return EOF; + } + + /* there was some problem */ + if (rp == 0) + { + if (result->rowProcessorErrMsg == NULL) + goto outOfMemory2; + + /* use supplied error message */ + printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg); + } + else + { + /* row processor messed up */ + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("invalid return value from row processor\n")); + } + pqSaveErrorResult(conn); return 0; -outOfMemory: +outOfMemory1: + /* Discard the failed message by pretending we read it */ + conn->inStart += 5 + msgLength; +outOfMemory2: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. @@ -706,9 +734,14 @@ outOfMemory: printfPQExpBuffer(&conn->errorMessage, libpq_gettext("out of memoryfor query result\n")); pqSaveErrorResult(conn); + return 0; +protocolError: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("invalid row contents\n")); + pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ - conn->inCursor = conn->inStart + 5 + msgLength; + conn->inStart += 5 + msgLength; return 0;} diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..2d913c4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -149,6 +149,17 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGrowValue points a column value of in network buffer. + * Value is a string without null termination and length len. + * NULL is represented as len < 0, value points then to place + * where value would have been. + */ +typedef struct pgRowValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ +} PGrowValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -367,6 +378,8 @@ extern PGresult *PQexecPrepared(PGconn *conn, const int *paramFormats, intresultFormat); +extern PGresult *PQskipRemainingResults(PGconn *conn); +/* Interface for multiple-result or asynchronous queries */extern int PQsendQuery(PGconn *conn, const char *query);externint PQsendQueryParams(PGconn *conn, @@ -416,6 +429,37 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative row processor. + * + * Columns array will contain PQnfields() entries, each one + * pointing to particular column data in network buffer. + * This function is supposed to copy data out from there + * and store somewhere. NULL is signified with len<0. + * + * This function must return 1 for success and must return 0 for + * failure and may set error message by PQsetRowProcessorErrMsg. It + * is assumed by caller as out of memory when the error message is not + * set on failure. This function is assumed not to throw any + * exception. + */ +typedef int (*PQrowProcessor)(PGresult *res, void *param, + PGrowValue *columns); + +/* + * Set alternative row data processor for PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it for rows one by one. + * + * func is row processor function. See the typedef RowProcessor. + * + * rowProcessorParam is the contextual variable that passed to + * RowProcessor. + */ +extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, + void *rowProcessorParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +498,7 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void PQsetRowProcessorErrMsg(PGresult *res, char *msg);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 987311e..1fc5aab 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -209,6 +209,9 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + /* temp etorage for message from row processor callback */ + char *rowProcessorErrMsg;};/* PGAsyncStatusType defines the state of the query-execution state machine */ @@ -398,7 +401,6 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ - PGresAttValue *curTuple; /* tuple currently being read */#ifdef USE_SSL bool allow_ssl_try; /* Allowedto try SSL negotiation */ @@ -443,6 +445,14 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* + * Read column data from network buffer. + */ + PQrowProcessor rowProcessor;/* Function pointer */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */ + PGrowValue *rowBuf; /* Buffer for passing values to rowProcessor */ + int rowBufLen; /* Number of columns allocated in rowBuf */};/* PGcancel stores all data necessary tocancel a connection. A copy of this @@ -560,6 +570,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);extern int pqGets_append(PQExpBuffer buf, PGconn*conn);extern int pqPuts(const char *s, PGconn *conn);extern int pqGetnchar(char *s, size_t len, PGconn *conn); +extern int pqSkipnchar(size_t len, PGconn *conn);extern int pqPutnchar(const char *s, size_t len, PGconn *conn);externint pqGetInt(int *result, size_t bytes, PGconn *conn);extern int pqPutInt(int value, size_t bytes, PGconn*conn); diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..9cb6d65 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,274 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-altrowprocessor"> + <title>Alternative row processor</title> + + <indexterm zone="libpq-altrowprocessor"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, rows are stored into <type>PQresult</type> + until full resultset is received. Then such completely-filled + <type>PQresult</type> is passed to user. This behaviour can be + changed by registering alternative row processor function, + that will see each row data as soon as it is received + from network. It has the option of processing the data + immediately, or storing it into custom container. + </para> + + <para> + Note - as row processor sees rows as they arrive, it cannot know + whether the SQL statement actually finishes successfully on server + or not. So some care must be taken to get proper + transactionality. + </para> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessor"> + <term> + <function>PQsetRowProcessor</function> + <indexterm> + <primary>PQsetRowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to process each row. +<synopsis> +void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the row processor function. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default processor. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqrowprocessor"> + <term> + <type>PQrowProcessor</type> + <indexterm> + <primary>PQrowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the row processor callback function. +<synopsis> +int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns); + +typedef struct +{ + int len; /* length in bytes of the value, -1 if NULL */ + char *value; /* actual value, without null termination */ +} PGrowValue; +</synopsis> + </para> + + <para> + The <parameter>columns</parameter> array will have PQnfields() + elements, each one pointing to column value in network buffer. + The <parameter>len</parameter> field will contain number of + bytes in value. If the field value is NULL then + <parameter>len</parameter> will be -1 and value will point + to position where the value would have been in buffer. + This allows estimating row size by pointer arithmetic. + </para> + + <para> + This function must process or copy row values away from network + buffer before it returns, as next row might overwrite them. + </para> + + <para> + This function must return 1 for success, and 0 for failure. + On failure this function should set the error message + with <function>PGsetRowProcessorErrMsg</function> if the cause + is other than out of memory. + When non-blocking API is in use, it can also return 2 + for early exit from <function>PQisBusy</function> function. + The supplied <parameter>res</parameter> and <parameter>columns</parameter> + values will stay valid so row can be processed outside of callback. + Caller is resposible for tracking whether the <parameter>PQisBusy</parameter> + returned early from callback or for other reasons. + Usually this should happen via setting cached values to NULL + before calling <function>PQisBusy</function>. + </para> + + <para> + The function is allowed to exit via exception (setjmp/longjmp). + The connection and row are guaranteed to be in valid state. + The connection can later be closed via <function>PQfinish</function>. + Processing can also be continued without closing the connection, + call <function>getResult</function> on syncronous mode, + <function>PQisBusy</function> on asynchronous connection. Then + processing will continue with new row, previous row that got + exception will be skipped. Or you can discard all remaing rows + by calling <function>PQskipRemainingResults</function> without + closing connection. + </para> + + <variablelist> + <varlistentry> + + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>param</parameter></term> + <listitem> + <para> + Extra parameter that was given to <function>PQsetRowProcessor</function>. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>columns</parameter></term> + <listitem> + <para> + Column values of the row to process. Column values + are located in network buffer, the processor must + copy them out from there. + </para> + <para> + Column values are not null-terminated, so processor cannot + use C string functions on them directly. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqskipremaingresults"> + <term> + <function>PQskipRemainigResults</function> + <indexterm> + <primary>PQskipRemainigResults</primary> + </indexterm> + </term> + <listitem> + <para> + Discard all the remaining row data + after <function>PQexec</function> + or <function>PQgetResult</function> exits by the exception raised + in <type>RowProcessor</type> without closing connection. + </para> + <para> + Do not call this function when the functions above return normally. +<synopsis> +void PQskipRemainingResults(PGconn *conn) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessorerrmsg"> + <term> + <function>PQsetRowProcessorErrMsg</function> + <indexterm> + <primary>PQsetRowProcessorErrMsg</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>PQrowProcessor</type>. If this message is not set, the + caller assumes the error to be out of memory. +<synopsis> +void PQsetRowProcessorErrMsg(PGresult *res, char *msg) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>PQrowProcessor</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>msg</parameter></term> + <listitem> + <para> + Error message. This will be copied internally so there is + no need to care of the scope. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it will be overwritten. Set NULL to cancel the the custom + message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title> diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..b1c171a 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,24 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char** valbuf; + int *valbuflen; + char **cstrs; + bool error_occurred; + bool nummismatch; + ErrorData *edata; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static int storeHandler(PGresult *res, void *param, PGrowValue *columns); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +755,249 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); - else + PG_TRY(); { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; + if (!is_async) + res = PQexec(conn, sql); + else + res = PQgetResult(conn); + } + PG_CATCH(); + { + /* Skip remaining results when storeHandler raises exception. */ + PQskipRemainingResults(conn); + storeinfo.error_occurred = TRUE; } + PG_END_TRY(); - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + int i; - Assert(rsinfo->returnMode == SFRM_Materialize); + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } - PG_TRY(); + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->edata = NULL; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->valbuf = NULL; + sinfo->valbuflen = NULL; + + /* Preallocate memory of same size with c string array for values. */ + sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*)); + if (sinfo->valbuf) + sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int)); + if (sinfo->valbuflen) + sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*)); + + if (sinfo->cstrs == NULL) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + if (sinfo->valbuf) + free(sinfo->valbuf); + if (sinfo->valbuflen) + free(sinfo->valbuflen); - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbuflen[i] = -1; + } - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + if (sinfo->valbuf) + { + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + free(sinfo->valbuf[i]); } + free(sinfo->valbuf); + sinfo->valbuf = NULL; + } - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); + if (sinfo->valbuflen) + { + free(sinfo->valbuflen); + sinfo->valbuflen = NULL; + } - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->cstrs) + { + free(sinfo->cstrs); + sinfo->cstrs = NULL; + } - values = (char **) palloc(nfields * sizeof(char *)); + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) - { - HeapTuple tuple; +static int +storeHandler(PGresult *res, void *param, PGrowValue *columns) +{ + storeInfo *sinfo = (storeInfo *)param; + HeapTuple tuple; + int fields = PQnfields(res); + int i; + char **cstrs = sinfo->cstrs; - if (!is_sql_cmd) - { - int i; + if (sinfo->error_occurred) + return FALSE; - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); + + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. */ + return FALSE; + } + + /* + * value input functions assumes that the input string is + * terminated by zero. We should make the values to be so. + */ + for(i = 0 ; i < fields ; i++) + { + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + char *tmp = sinfo->valbuf[i]; + int tmplen = sinfo->valbuflen[i]; - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); + /* + * Divide calls to malloc and realloc so that things will + * go fine even on the systems of which realloc() does not + * accept NULL as old memory block. + * + * Also try to (re)allocate in bigger steps to + * avoid flood of allocations on weird data. + */ + if (tmp == NULL) + { + tmplen = len + 1; + if (tmplen < 64) + tmplen = 64; + tmp = (char *)malloc(tmplen); + } + else if (tmplen < len + 1) + { + if (len + 1 > tmplen * 2) + tmplen = len + 1; + else + tmplen = tmplen * 2; + tmp = (char *)realloc(tmp, tmplen); } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * sinfo->valbuf[n] will be freed in finishStoreInfo() + * when realloc returns NULL. + */ + if (tmp == NULL) + return FALSE; - PQclear(res); - } - PG_CATCH(); - { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + sinfo->valbuf[i] = tmp; + sinfo->valbuflen[i] = tmplen; + + cstrs[i] = sinfo->valbuf[i]; + memcpy(cstrs[i], columns[i].value, len); + cstrs[i][len] = '\0'; + } } - PG_END_TRY(); + + /* + * These functions may throw exception. It will be caught in + * dblink_record_internal() + */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); + + return TRUE;}/* diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml index de95ea8..9cb6d65 100644 --- b/doc/src/sgml/libpq.sgml +++ a/doc/src/sgml/libpq.sgml @@ -7352,6 +7352,14 @@ typedef struct On failure this function should set the error message with <function>PGsetRowProcessorErrMsg</function>if the cause is other than out of memory. + When non-blocking API is in use, it can also return 2 + for early exit from <function>PQisBusy</function> function. + The supplied <parameter>res</parameter> and <parameter>columns</parameter> + values will stay valid so row can be processed outside of callback. + Caller is resposible for tracking whether the <parameter>PQisBusy</parameter> + returned early from callback or for other reasons. + Usually this should happen via setting cached values to NULL + before calling <function>PQisBusy</function>. </para> <para> diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c index 7498580..ae4d7b0 100644 --- b/src/interfaces/libpq/fe-protocol2.c +++ a/src/interfaces/libpq/fe-protocol2.c @@ -820,6 +820,9 @@ getAnotherTuple(PGconn *conn, bool binary) rp= conn->rowProcessor(result, conn->rowProcessorParam,rowbuf); if (rp == 1) return 0; + else if (rp == 2 && pqIsnonblocking(conn)) + /* processor requested early exit */ + return EOF; else if (rp != 0) PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value fromrow processor\n")); diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c index a67e3ac..0260ba6 100644 --- b/src/interfaces/libpq/fe-protocol3.c +++ a/src/interfaces/libpq/fe-protocol3.c @@ -697,6 +697,11 @@ getAnotherTuple(PGconn *conn, int msgLength) /* everything is good */ return 0; } + if (rp == 2 && pqIsnonblocking(conn)) + { + /* processor requested early exit */ + return EOF; + } /* there was some problem */ if (rp == 0)
pgsql-hackers by date: