Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: Speed dblink using alternate libpq tuple storage |
Date | |
Msg-id | 20120228.170444.122386056.horiguchi.kyotaro@oss.ntt.co.jp |
In response to | Re: Speed dblink using alternate libpq tuple storage (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>) |
Responses | Re: Speed dblink using alternate libpq tuple storage |
List | pgsql-hackers |
This is the new version of the patch. It is not rebased to HEAD because of a build error.

> It's better to restore old two-path error handling.

I restored the "OOM and save result" route. However, as the comment in the original code says, it seems necessary to win back whatever memory we can on a REAL OOM. So I restored the meaning of rp == 0 && errMsg == NULL as REAL OOM, in which case the async result is thrown away; the result is preserved if errMsg is not NULL. `unknown error' has been removed.

As a result, if the row processor returns 0, the parser skips to the end of the rows and returns the working result or an error result, depending on whether errMsg was set in the row processor.

> I don't think that should be required. Just use a dummy msg.

Considering the above, pqAddRow is also restored to leave errMsg NULL on OOM.

> There is still one EOF in v3 getAnotherTuple() -
> pqGetInt(tupnfields), please turn that one also to
> protocolerror.

pqGetInt() returns EOF only when it needs more data to be read from the network, provided the `bytes' parameter is appropriate. A non-zero return from it should therefore be handled as EOF, not as a protocol error. The one place I had modified in a buggy way has also been restored. As a consequence, the so-called 'protocol error' has disappeared.

> Instead use ("%s", errmsg) as argument there. libpq code
> is noisy enough, no need to add more.

done

Is there something left undone?

By the way, I noticed that dblink always reports the current connection as 'unnamed' in the error messages from dblink_record_internal@dblink. dblink_record_internal defines the local variable conname = NULL and passes it to dblink_res_error to display the error message, but nothing in the function ever assigns to it. The name was shown properly when I added code to set conname from PG_GETARG_TEXT_PP(0) if available, in other words right after the DBLINK_GET_CONN/DBLINK_GET_NAMED_CONN calls. That seems to be dblink's manner... This is not included in this patch.
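To make the row processor convention discussed earlier in this message concrete, here is a minimal, self-contained sketch of a processor that follows it: it returns 1 on success, returns 0 with no message on out of memory, and returns 0 with a message for any other failure. It does not link against libpq. PGrowValue is redeclared locally to mirror the struct in the patch, while RowCopy, its fields, max_len and copy_row are hypothetical names used only for illustration. Since PQresultSetErrMsg exists only in this patch, the sketch records the message in its own context at the point where a real processor would call it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Opaque here; this sketch never dereferences the result. */
typedef struct pg_result PGresult;

/* Local mirror of the PGrowValue struct that the patch adds to libpq-fe.h. */
typedef struct
{
    int   len;      /* length in bytes, negative if the column is NULL */
    char *value;    /* points into the network buffer, not null-terminated */
} PGrowValue;

/* Hypothetical context passed to the row processor as `param`. */
typedef struct
{
    int         nfields;  /* number of columns per row */
    int         max_len;  /* hypothetical limit, only to show the error path */
    char      **cstrs;    /* nfields slots, all initially NULL */
    const char *errmsg;   /* stands in for PQresultSetErrMsg(res, msg) */
} RowCopy;

/*
 * Copy one row out of the network buffer into null-terminated strings.
 * Returns 1 on success, 0 on failure. On failure, errmsg == NULL means
 * out of memory; any other failure sets errmsg.
 */
static int
copy_row(PGresult *res, PGrowValue *columns, void *param)
{
    RowCopy *ctx = (RowCopy *) param;
    int      i;

    (void) res;                 /* unused in this sketch */
    assert(ctx != NULL && columns != NULL);
    ctx->errmsg = NULL;

    for (i = 0; i < ctx->nfields; i++)
    {
        /* drop the copy kept from the previous row, if any */
        free(ctx->cstrs[i]);
        ctx->cstrs[i] = NULL;

        if (columns[i].len < 0)
            continue;           /* SQL NULL: leave the slot as NULL */

        if (columns[i].len > ctx->max_len)
        {
            /* a real processor would call PQresultSetErrMsg(res, ...) here */
            ctx->errmsg = "column value too long";
            return 0;
        }

        ctx->cstrs[i] = malloc((size_t) columns[i].len + 1);
        if (ctx->cstrs[i] == NULL)
            return 0;           /* errmsg stays NULL: treated as OOM */

        memcpy(ctx->cstrs[i], columns[i].value, (size_t) columns[i].len);
        /* the buffer value is not terminated, so terminate the copy */
        ctx->cstrs[i][columns[i].len] = '\0';
    }

    return 1;
}
```

With the patch applied, such a function would be registered with PQsetRowProcessor(conn, copy_row, &ctx); passing NULL as func restores the default pqAddRow.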
Furthermore, dblink_res_error looks only into the returned PGresult to display the error, and always says only `Error occurred on dblink connection..: could not execute query'.

Is it right to consider this as follows?

- dblink is wrong in error handling. A client of libpq should look at PGconn via PQerrorMessage() if (or regardless of whether?) PGresult says nothing about the error.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..239edb8 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams 157
 PQping 158
 PQpingParams 159
 PQlibVersion 160
+PQsetRowProcessor 161
+PQgetRowProcessor 162
+PQresultSetErrMsg 163
+PQskipResult 164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)
     conn->wait_ssl_try = false;
 #endif

+    /* set default row processor */
+    PQsetRowProcessor(conn, NULL, NULL);
+
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
      * buffers on Unix systems.
That way, when we are sending a large amount
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)
     initPQExpBuffer(&conn->errorMessage);
     initPQExpBuffer(&conn->workBuffer);

+    /* set up initial row buffer */
+    conn->rowBufLen = 32;
+    conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
     if (conn->inBuffer == NULL ||
         conn->outBuffer == NULL ||
+        conn->rowBuf == NULL ||
         PQExpBufferBroken(&conn->errorMessage) ||
         PQExpBufferBroken(&conn->workBuffer))
     {
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)
         free(conn->inBuffer);
     if (conn->outBuffer)
         free(conn->outBuffer);
+    if (conn->rowBuf)
+        free(conn->rowBuf);
     termPQExpBuffer(&conn->errorMessage);
     termPQExpBuffer(&conn->workBuffer);
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..ce58778 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int check_field_number(const PGresult *res, int field_num);
+static int pqAddRow(PGresult *res, PGrowValue *columns, void *param);


 /* ----------------
@@ -701,7 +702,6 @@ pqClearAsyncResult(PGconn *conn)
     if (conn->result)
         PQclear(conn->result);
     conn->result = NULL;
-    conn->curTuple = NULL;
 }

 /*
@@ -756,7 +756,6 @@ pqPrepareAsyncResult(PGconn *conn)
      */
     res = conn->result;
     conn->result = NULL;        /* handing over ownership to caller */
-    conn->curTuple = NULL;      /* just in case */
     if (!res)
         res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
     else
@@ -828,6 +827,87 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 }

 /*
+ * PQsetRowProcessor
+ *   Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ?
func : pqAddRow); + conn->rowProcessorParam = param; +} + +/* + * PQgetRowProcessor + * Get current row processor of conn. set pointer to current parameter for + * row processor to param if not NULL. + */ +PQrowProcessor +PQgetRowProcessor(PGconn *conn, void **param) +{ + if (param) + *param = conn->rowProcessorParam; + + return conn->rowProcessor; +} + +/* + * PQresultSetErrMsg + * Set the error message to PGresult. + * + * You can replace the previous message by alternative mes, or clear + * it with NULL. + */ +void +PQresultSetErrMsg(PGresult *res, const char *msg) +{ + if (msg) + res->errMsg = pqResultStrdup(res, msg); + else + res->errMsg = NULL; +} + +/* + * pqAddRow + * add a row to the PGresult structure, growing it if necessary + * Returns TRUE if OK, FALSE if not enough memory to add the row. + */ +static int +pqAddRow(PGresult *res, PGrowValue *columns, void *param) +{ + PGresAttValue *tup; + int nfields = res->numAttributes; + int i; + + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) + return FALSE; + + for (i = 0 ; i < nfields ; i++) + { + tup[i].len = columns[i].len; + if (tup[i].len == NULL_LEN) + { + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary); + if (tup[i].value == NULL) + return FALSE; + + memcpy(tup[i].value, columns[i].value, tup[i].len); + /* We have to terminate this ourselves */ + tup[i].value[tup[i].len] = '\0'; + } + } + + return pqAddTuple(res, tup); +} + +/* * pqAddTuple * add a row pointer to the PGresult structure, growing it if necessary * Returns TRUE if OK, FALSEif not enough memory to add the row @@ -1223,7 +1303,6 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result= NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; @@ -1831,6 +1910,55 @@ PQexecFinish(PGconn *conn) return 
lastResult;} + +/* + * Do-nothing row processor for PQskipResult + */ +static int +dummyRowProcessor(PGresult *res, PGrowValue *columns, void *param) +{ + return 1; +} + +/* + * Exaust remaining Data Rows in curret conn. + * + * Exaust current result if skipAll is false and all succeeding results if + * true. + */ +int +PQskipResult(PGconn *conn, int skipAll) +{ + PQrowProcessor savedRowProcessor; + void * savedRowProcParam; + PGresult *res; + int ret = 0; + + /* save the current row processor settings and set dummy processor */ + savedRowProcessor = PQgetRowProcessor(conn, &savedRowProcParam); + PQsetRowProcessor(conn, dummyRowProcessor, NULL); + + /* + * Throw away the remaining rows in current result, or all succeeding + * results if skipAll is not FALSE. + */ + if (skipAll) + { + while ((res = PQgetResult(conn)) != NULL) + PQclear(res); + } + else if ((res = PQgetResult(conn)) != NULL) + { + PQclear(res); + ret = 1; + } + + PQsetRowProcessor(conn, savedRowProcessor, savedRowProcParam); + + return ret; +} + +/* * PQdescribePrepared * Obtain information about a previously prepared statement diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index ce0eac3..d11cb3c 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/* + * pqGetnchar: + * skip len bytes in input buffer. 
+ */ +int +pqSkipnchar(size_t len, PGconn *conn) +{ + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + conn->inCursor += len; + + if (conn->Pfdebug) + fprintf(conn->Pfdebug, "From backend (%lu skipped)\n", + (unsigned long) len); + + return 0; +} + +/* * pqPutnchar: * write exactly len bytes to the current message */ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..36773cb 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, FALSE)) return; + /* getAnotherTuple moves inStart itself */ + continue; } else { @@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, TRUE)) return; + /* getAnotherTuple moves inStart itself */ + continue; } else { @@ -703,52 +707,55 @@ failure:/* * parseInput subroutine to read a 'B' or 'D' (row data) message. - * We add another tuple to the existing PGresult structure. + * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error ornot enough data yet. * * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * the message after more data is received. 
*/static intgetAnotherTuple(PGconn *conn, bool binary){ PGresult *result= conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGrowValue *rowbuf; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64];/* used unless it doesn't fit */ char *bitmap = std_bitmap; int i; + int rp; size_t nbytes; /* the number of bytes in bitmap */ char bmap; /* One byte of the bitmap */ int bitmap_index; /* Its index */ int bitcnt; /* number of bits examined in current byte */ int vlen; /* length of the current field value*/ + char *errmsg = libpq_gettext("unknown error\n"); - result->binary = binary; - - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) + /* resize row buffer if needed */ + if (nfields > conn->rowBufLen) { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - - /* - * If it's binary, fix the column format indicators. We assume the - * backend will consistently send either B or D, not a mix. 
- */ - if (binary) + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) { - for (i = 0; i < nfields; i++) - result->attDescs[i].format = 1; + errmsg = libpq_gettext("out of memory for query result\n"); + goto error_clearresult; } + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + else + { + rowbuf = conn->rowBuf; + } + + result->binary = binary; + + if (binary) + { + for (i = 0; i < nfields; i++) + result->attDescs[i].format = 1; } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; @@ -757,11 +764,15 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if(!bitmap) - goto outOfMemory; + { + errmsg = libpq_gettext("out of memory for query result\n"); + goto error_clearresult; + } } if (pqGetnchar(bitmap, nbytes, conn)) - goto EOFexit; + goto error_clearresult; + /* Scan the fields */ bitmap_index = 0; @@ -771,34 +782,29 @@ getAnotherTuple(PGconn *conn, bool binary) for (i = 0; i < nfields; i++) { if (!(bmap& 0200)) - { - /* if the field value is absent, make it a null string */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - } + vlen = NULL_LEN; + else if (pqGetInt(&vlen, 4, conn)) + goto EOFexit; else { - /* get the value length (the first four bytes are for length) */ - if (pqGetInt(&vlen, 4, conn)) - goto EOFexit; if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } + + /* + * rowbuf[i].value always points to the next address of the + * length field even if the value is NULL, to allow safe + * size estimates and data copy. 
+ */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + rowbuf[i].len = vlen; + + /* Skip the value */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + goto EOFexit; + /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) @@ -811,33 +817,64 @@ getAnotherTuple(PGconn *conn, bool binary) bmap <<= 1; } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; - if (bitmap != std_bitmap) free(bitmap); - return 0; + bitmap = NULL; -outOfMemory: - /* Replace partially constructed result with an error result */ + /* tag the row as parsed */ + conn->inStart = conn->inCursor; + + /* Pass the completed row values to rowProcessor */ + rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam); + if (rp == 1) + return 0; + else if (rp == 2 && pqIsnonblocking(conn)) + /* processor requested early exit */ + return EOF; + else if (rp == 0) + { + errmsg = result->errMsg; + result->errMsg = NULL; + if (errmsg == NULL) + { + /* If errmsg == NULL, we assume that the row processor + * notices out of memory. We should immediately free any + * space to go forward. */ + errmsg = "out of memory"; + goto error_clearresult; + } + /* + * We assume that some ancestor which has a relation with the + * row processor wants the result built halfway when row + * processor sets any errMsg for rp == 0. + */ + goto error_saveresult; + } + errmsg = libpq_gettext("invalid return value from row processor\n"); + /* FALL THROUGH */ +error_clearresult: /* * we do NOT use pqSaveErrorResult() here, because of the likelihood that * there's notenough memory to concatenate messages... */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); +error_saveresult: + /* + * If error message is passed from RowProcessor, set it into + * PGconn, assume out of memory if not. 
+ */ + printfPQExpBuffer(&conn->errorMessage, "%s", errmsg); + /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can * do to recover... */ conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); + conn->asyncStatus = PGASYNC_READY; + /* Discard the failed message --- good idea? */ conn->inStart = conn->inEnd; diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..2693ce0 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, msgLength)) return; + + /* getAnotherTuple() moves inStart itself */ + continue; } else if (conn->result != NULL && conn->result->resultStatus == PGRES_FATAL_ERROR) @@ -613,33 +616,23 @@ failure:/* * parseInput subroutine to read a 'D' (row data) message. - * We add another tuple to the existing PGresult structure. + * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error ornot enough data yet. * * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * the message after more data is received. 
*/static intgetAnotherTuple(PGconn *conn, int msgLength){ PGresult *result= conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGrowValue *rowbuf; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; - - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; + int rp; + char *errmsg = libpq_gettext("unknown error\n"); /* Get the field count and make sure it's what we expect*/ if (pqGetInt(&tupnfields, 2, conn)) @@ -647,13 +640,22 @@ getAnotherTuple(PGconn *conn, int msgLength) if (tupnfields != nfields) { - /* Replace partially constructed result with an error result */ - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("unexpected field count in \"D\" message\n")); - pqSaveErrorResult(conn); - /* Discard the failed message by pretending we read it */ - conn->inCursor = conn->inStart + 5 + msgLength; - return 0; + errmsg = libpq_gettext("unexpected field count in \"D\" message\n"); + goto error_and_forward; + } + + /* resize row buffer if needed */ + rowbuf = conn->rowBuf; + if (nfields > conn->rowBufLen) + { + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) + { + errmsg = libpq_gettext("out of memory for query result\n"); + goto error_and_forward; + } + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; } /* Scan the fields */ @@ -662,53 +664,88 @@ getAnotherTuple(PGconn *conn, int msgLength) /* get the value length */ if (pqGetInt(&vlen,4, conn)) return EOF; + if (vlen == -1) - { - /* null field */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - continue; - } - if (vlen < 0) + vlen = NULL_LEN; + else if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - bool 
isbinary = (result->attDescs[i].format != 0); - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - return EOF; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; + /* + * rowbuf[i].value always points to the next address of the + * length field even if the value is NULL, to allow safe + * size estimates and data copy. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + rowbuf[i].len = vlen; + + /* Skip to the next length field */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + return EOF; } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + /* tag the row as parsed, check if correctly */ + conn->inStart += 5 + msgLength; + if (conn->inCursor != conn->inStart) + { + errmsg = libpq_gettext("invalid row contents\n"); + goto error_clearresult; + } - return 0; + /* Pass the completed row values to rowProcessor */ + rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam); + if (rp == 1) + { + /* everything is good */ + return 0; + } + if (rp == 2 && pqIsnonblocking(conn)) + { + /* processor requested early exit */ + return EOF; + } + + /* there was some problem */ + if (rp == 0) + { + /* + * Unlink errMsg from result here to use it after + * pqClearAsyncResult() is called. + */ + errmsg = result->errMsg; + result->errMsg = NULL; + if (errmsg == NULL) + { + /* If errmsg == NULL, we assume that the row processor + * notices out of memory. We should immediately free any + * space to go forward. */ + errmsg = "out of memory"; + goto error_clearresult; + } + /* + * We assume that some ancestor which has a relation with the + * row processor wants the result built halfway when row + * processor sets any errMsg for rp == 0. 
+ */ + goto error_saveresult; + } -outOfMemory: + errmsg = libpq_gettext("invalid return value from row processor\n"); + goto error_clearresult; + +error_and_forward: + /* Discard the failed message by pretending we read it */ + conn->inCursor = conn->inStart + 5 + msgLength; +error_clearresult: + pqClearAsyncResult(conn); + +error_saveresult: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ - pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + printfPQExpBuffer(&conn->errorMessage, "%s", errmsg); pqSaveErrorResult(conn); - - /* Discard the failed message by pretending we read it */ - conn->inCursor = conn->inStart + 5 + msgLength; return 0;} diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..810b04e 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -149,6 +149,17 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGrowValue points a column value of in network buffer. + * Value is a string without null termination and length len. + * NULL is represented as len < 0, value points then to place + * where value would have been. + */ +typedef struct pgRowValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ +} PGrowValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -416,6 +427,38 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative row processor. + * + * Columns array will contain PQnfields() entries, each one + * pointing to particular column data in network buffer. 
+ * This function is supposed to copy data out from there + * and store somewhere. NULL is signified with len<0. + * + * This function must return 1 for success and must return 0 for + * failure and may set error message by PQresultSetErrMsg. It is assumed by + * caller as out of memory when the error message is not set on + * failure. This function is assumed not to throw any exception. + */ +typedef int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, + void *param); + +/* + * Set alternative row data processor for PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it for rows one by one. + * + * func is row processor function. See the typedef RowProcessor. + * + * rowProcessorParam is the contextual variable that passed to + * RowProcessor. + */ +extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, + void *rowProcessorParam); +extern PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param); +extern int PQskipResult(PGconn *conn, int skipAll); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +497,7 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void PQresultSetErrMsg(PGresult *res, const char *msg);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 987311e..9cabd20 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -398,7 +398,6 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ - PGresAttValue *curTuple; /* tuple currently being read */#ifdef USE_SSL bool allow_ssl_try; /* Allowedto try 
SSL negotiation */ @@ -443,6 +442,14 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* + * Read column data from network buffer. + */ + PQrowProcessor rowProcessor;/* Function pointer */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */ + PGrowValue *rowBuf; /* Buffer for passing values to rowProcessor */ + int rowBufLen; /* Number of columns allocated in rowBuf */};/* PGcancel stores all data necessary tocancel a connection. A copy of this @@ -560,6 +567,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);extern int pqGets_append(PQExpBuffer buf, PGconn*conn);extern int pqPuts(const char *s, PGconn *conn);extern int pqGetnchar(char *s, size_t len, PGconn *conn); +extern int pqSkipnchar(size_t len, PGconn *conn);extern int pqPutnchar(const char *s, size_t len, PGconn *conn);externint pqGetInt(int *result, size_t bytes, PGconn *conn);extern int pqPutInt(int value, size_t bytes, PGconn*conn); diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..5ef89e7 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,332 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-altrowprocessor"> + <title>Alternative row processor</title> + + <indexterm zone="libpq-altrowprocessor"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, rows are stored into <type>PGresult</type> + until full resultset is received. Then such completely-filled + <type>PGresult</type> is passed to user. This behavior can be + changed by registering alternative row processor function, + that will see each row data as soon as it is received + from network. It has the option of processing the data + immediately, or storing it into custom container. 
+ </para> + + <para> + Note - as row processor sees rows as they arrive, it cannot know + whether the SQL statement actually finishes successfully on server + or not. So some care must be taken to get proper + transactionality. + </para> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessor"> + <term> + <function>PQsetRowProcessor</function> + <indexterm> + <primary>PQsetRowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to process each row. +<synopsis> +void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the row processor function. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default processor. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqrowprocessor"> + <term> + <type>PQrowProcessor</type> + <indexterm> + <primary>PQrowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the row processor callback function. +<synopsis> +int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, void *param); + +typedef struct +{ + int len; /* length in bytes of the value, -1 if NULL */ + char *value; /* actual value, without null termination */ +} PGrowValue; +</synopsis> + </para> + + <para> + The <parameter>columns</parameter> array will have PQnfields() + elements, each one pointing to column value in network buffer. 
+ The <parameter>len</parameter> field will contain number of + bytes in value. If the field value is NULL then + <parameter>len</parameter> will be -1 and value will point + to position where the value would have been in buffer. + This allows estimating row size by pointer arithmetic. + </para> + + <para> + This function must process or copy row values away from network + buffer before it returns, as next row might overwrite them. + </para> + + <para> + This function must return 1 for success, and 0 for failure. On + failure the caller assumes the error as an out of memory and + releases the PGresult under construction. If you set any + message with <function>PQresultSetErrMsg</function>, it is set + as the PGconn's error message and the PGresult will be + preserved. When non-blocking API is in use, it can also return + 2 for early exit from <function>PQisBusy</function> function. + The supplied <parameter>res</parameter> + and <parameter>columns</parameter> values will stay valid so + row can be processed outside of callback. Caller is + responsible for tracking whether + the <parameter>PQisBusy</parameter> returned early from + callback or for other reasons. Usually this should happen via + setting cached values to NULL before + calling <function>PQisBusy</function>. + </para> + + <para> + The function is allowed to exit via exception (setjmp/longjmp). + The connection and row are guaranteed to be in valid state. + The connection can later be closed + via <function>PQfinish</function>. Processing can also be + continued without closing the connection, + call <function>getResult</function> on synchronous mode, + <function>PQisBusy</function> on asynchronous connection. Then + processing will continue with new row, previous row that got + exception will be skipped. Or you can discard all remaining + rows by calling <function>PQskipResult</function> without + closing connection. 
+ </para> + + <variablelist> + <varlistentry> + + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>columns</parameter></term> + <listitem> + <para> + Column values of the row to process. Column values + are located in network buffer, the processor must + copy them out from there. + </para> + <para> + Column values are not null-terminated, so processor cannot + use C string functions on them directly. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>param</parameter></term> + <listitem> + <para> + Extra parameter that was given to <function>PQsetRowProcessor</function>. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqskipresult"> + <term> + <function>PQskipResult</function> + <indexterm> + <primary>PQskipResult</primary> + </indexterm> + </term> + <listitem> + <para> + Discard all the remaining row data + after <function>PQexec</function> + or <function>PQgetResult</function> exits by the exception raised + in <type>RowProcessor</type> without closing connection. +<synopsis> +void PQskipResult(PGconn *conn, int skipAll) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><parameter>skipAll</parameter></term> + <listitem> + <para> + Skip remaining rows in current result + if <parameter>skipAll</parameter> is false(0). Skip + remaining rows in current result and all rows in + succeeding results if true(non-zero). 
+ </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqresultseterrmsg"> + <term> + <function>PQresultSetErrMsg</function> + <indexterm> + <primary>PQresultSetErrMsg</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for an error that occurred + in <type>PQrowProcessor</type>. If this message is not set, the + caller assumes the error to be an out-of-memory error. +<synopsis> +void PQresultSetErrMsg(PGresult *res, const char *msg) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>PQrowProcessor</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>msg</parameter></term> + <listitem> + <para> + Error message. This will be copied internally, so the caller + need not keep the string alive after the call. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it will be overwritten. Set NULL to cancel the custom + message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgetrowprcessor"> + <term> + <function>PQgetRowProcessor</function> + <indexterm> + <primary>PQgetRowProcessor</primary> + </indexterm> + </term> + <listitem> + <para> + Get the row processor and its context parameter currently set on + the connection. +<synopsis> +PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + Set the current row processor parameter of the + connection here if not NULL. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title> diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..8bf0759 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char** valbuf; + int *valbuflen; + char **cstrs; + bool error_occurred; + bool nummismatch; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static int storeHandler(PGresult *res, PGrowValue *columns, void *param); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +576,51 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", 
howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result returned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even though we are still in a short-lived * memory context. */ - res = PQexec(conn, buf.data); + PG_TRY(); + { + res = PQexec(conn, buf.data); + } + PG_CATCH(); + { + ErrorData *edata; + + finishStoreInfo(&storeinfo); + edata = CopyErrorData(); + FlushErrorState(); + + /* Skip remaining results when storeHandler raises exception. */ + PQskipResult(conn, FALSE); + ReThrowError(edata); + } + PG_END_TRY(); + + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +632,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +693,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -660,6 +714,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { /* text,text,bool*/ DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = 
text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -675,6 +730,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else { DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } @@ -705,6 +761,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else /* shouldn't happen*/ elog(ERROR, "wrong number of arguments"); + + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); } if (!conn) @@ -715,164 +773,257 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result returned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); - else + PG_TRY(); { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; + if (!is_async) + res = PQexec(conn, sql); + else + res = PQgetResult(conn); } + PG_CATCH(); + { + ErrorData *edata; - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); + edata = CopyErrorData(); + FlushErrorState(); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* Skip remaining results when storeHandler raises exception. 
*/ + PQskipResult(conn, FALSE); + ReThrowError(edata); + } + PG_END_TRY(); + + finishStoreInfo(&storeinfo); + + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. 
- */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } - Assert(rsinfo->returnMode == SFRM_Materialize); + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); - PG_TRY(); + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->valbuf = NULL; + sinfo->valbuflen = NULL; + + /* Preallocate memory of same size with c string array for values. 
*/ + sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*)); + if (sinfo->valbuf) + sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int)); + if (sinfo->valbuflen) + sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*)); + + if (sinfo->cstrs == NULL) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + if (sinfo->valbuf) + free(sinfo->valbuf); + if (sinfo->valbuflen) + free(sinfo->valbuflen); - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbuflen[i] = -1; + } - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + if (sinfo->valbuf) + { + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + free(sinfo->valbuf[i]); } + 
free(sinfo->valbuf); + sinfo->valbuf = NULL; + } - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); + if (sinfo->valbuflen) + { + free(sinfo->valbuflen); + sinfo->valbuflen = NULL; + } - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->cstrs) + { + free(sinfo->cstrs); + sinfo->cstrs = NULL; + } - values = (char **) palloc(nfields * sizeof(char *)); + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) - { - HeapTuple tuple; +static int +storeHandler(PGresult *res, PGrowValue *columns, void *param) +{ + storeInfo *sinfo = (storeInfo *)param; + HeapTuple tuple; + int fields = PQnfields(res); + int i; + char **cstrs = sinfo->cstrs; - if (!is_sql_cmd) - { - int i; + if (sinfo->error_occurred) + { + PQresultSetErrMsg(res, "storeHandler is called after error\n"); + return FALSE; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); + + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. 
*/ + + PQresultSetErrMsg(res, "unexpected field count in \"D\" message\n"); + return FALSE; + } - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); + /* + * value input functions assumes that the input string is + * terminated by zero. We should make the values to be so. + */ + for(i = 0 ; i < fields ; i++) + { + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + char *tmp = sinfo->valbuf[i]; + int tmplen = sinfo->valbuflen[i]; + + /* + * Divide calls to malloc and realloc so that things will + * go fine even on the systems of which realloc() does not + * accept NULL as old memory block. + * + * Also try to (re)allocate in bigger steps to + * avoid flood of allocations on weird data. + */ + if (tmp == NULL) + { + tmplen = len + 1; + if (tmplen < 64) + tmplen = 64; + tmp = (char *)malloc(tmplen); + } + else if (tmplen < len + 1) + { + if (len + 1 > tmplen * 2) + tmplen = len + 1; + else + tmplen = tmplen * 2; + tmp = (char *)realloc(tmp, tmplen); } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * sinfo->valbuf[n] will be freed in finishStoreInfo() + * when realloc returns NULL. + */ + if (tmp == NULL) + return FALSE; /* Inform out of memory to the caller */ - PQclear(res); - } - PG_CATCH(); - { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + sinfo->valbuf[i] = tmp; + sinfo->valbuflen[i] = tmplen; + + cstrs[i] = sinfo->valbuf[i]; + memcpy(cstrs[i], columns[i].value, len); + cstrs[i][len] = '\0'; + } } - PG_END_TRY(); + + /* + * These functions may throw exception. 
It will be caught in + * dblink_record_internal() + */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); + + return TRUE;}/* diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml index 1245e85..5ef89e7 100644 --- b/doc/src/sgml/libpq.sgml +++ a/doc/src/sgml/libpq.sgml @@ -7353,7 +7353,16 @@ typedef struct releases the PGresult under construction. If you set any message with <function>PQresultSetErrMsg</function>,it is set as the PGconn's error message and the PGresult will be - preserved. + preserved. When non-blocking API is in use, it can also return + 2 for early exit from <function>PQisBusy</function> function. + The supplied <parameter>res</parameter> + and <parameter>columns</parameter> values will stay valid so + row can be processed outside of callback. Caller is + responsible for tracking whether + the <parameter>PQisBusy</parameter> returned early from + callback or for other reasons. Usually this should happen via + setting cached values to NULL before + calling <function>PQisBusy</function>. 
</para> <para> diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c index 6555f85..36773cb 100644 --- b/src/interfaces/libpq/fe-protocol2.c +++ a/src/interfaces/libpq/fe-protocol2.c @@ -828,6 +828,9 @@ getAnotherTuple(PGconn *conn, bool binary) rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam); if (rp == 1) return 0; + else if (rp == 2 && pqIsnonblocking(conn)) + /* processor requested early exit */ + return EOF; else if (rp == 0) { errmsg = result->errMsg; diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c index 3725de2..2693ce0 100644 --- b/src/interfaces/libpq/fe-protocol3.c +++ a/src/interfaces/libpq/fe-protocol3.c @@ -698,6 +698,11 @@ getAnotherTuple(PGconn *conn, int msgLength) /* everything is good */ return 0; } + if (rp == 2 && pqIsnonblocking(conn)) + { + /* processor requested early exit */ + return EOF; + } /* there was some problem */ if (rp == 0)
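For readers following the storeHandler hunk in the dblink.c part of the patch: the per-column buffer handling there (allocate at least 64 bytes on first use, then grow to the larger of double the old size or the needed size, and terminate the copied value with a zero byte) can be looked at in isolation. The following is only a minimal sketch of that strategy under stated assumptions, not code from the patch: the names ColBuf and colbuf_store are hypothetical, and the tuplestore and libpq parts are left out entirely.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper type (not in the patch): one reusable buffer per column. */
typedef struct ColBuf
{
    char *buf;   /* NULL until the first value is stored */
    int   size;  /* allocated bytes; meaningful only when buf != NULL */
} ColBuf;

/*
 * Copy len bytes from src into cb and append a terminating zero byte,
 * since the column values handed to a row processor are not terminated.
 * Returns the terminated string, or NULL on out of memory; in that case
 * cb keeps its old buffer so the owner can still free it later.
 */
static char *
colbuf_store(ColBuf *cb, const char *src, int len)
{
    assert(len >= 0);   /* NULL columns (len < 0) are handled by the caller */

    if (cb->buf == NULL)
    {
        /* First use: separate malloc call, with a 64-byte minimum. */
        int   newsize = (len + 1 < 64) ? 64 : len + 1;
        char *p = malloc(newsize);

        if (p == NULL)
            return NULL;
        cb->buf = p;
        cb->size = newsize;
    }
    else if (cb->size < len + 1)
    {
        /* Grow in big steps: double, or jump straight to the needed size. */
        int   newsize = (len + 1 > cb->size * 2) ? len + 1 : cb->size * 2;
        char *p = realloc(cb->buf, newsize);

        if (p == NULL)
            return NULL;
        cb->buf = p;
        cb->size = newsize;
    }

    memcpy(cb->buf, src, len);
    cb->buf[len] = '\0';
    return cb->buf;
}
```

A caller would keep one ColBuf per result column, initialised to { NULL, 0 }, call colbuf_store for each non-NULL value of each row, and free the buffers once after the last row, which is roughly the role finishStoreInfo plays in the patch.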