Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: Speed dblink using alternate libpq tuple storage |
Date | |
Msg-id | 20120327.112042.74201667.horiguchi.kyotaro@oss.ntt.co.jp Whole thread Raw |
In response to | Re: Speed dblink using alternate libpq tuple storage (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>) |
Responses |
Re: Speed dblink using alternate libpq tuple storage
|
List | pgsql-hackers |
I'm sorry, I coded a silly bug: the previous patch calculated the realloc size incorrectly. Also, the separation of the 'connname' patch was incomplete in the regression tests. Both problems are fixed in this patch. regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..4de28ef 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char* valbuf; + int valbuflen; + char **cstrs; + bool error_occurred; + bool nummismatch; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static int storeHandler(PGresult *res, PGrowValue *columns, void *param); +/* Global */static remoteConn *pconn = NULL; @@ -111,6 +127,9 @@ typedef struct remoteConnHashEnt/* initial number of connection hashes */#define NUMCONN 16 +/* Initial block size for value buffer in storeHandler */ +#define INITBUFLEN 64 +/* general utility */#define xpfree(var_) \ do { \ @@ -503,6 +522,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +579,51 @@ 
dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ - res = PQexec(conn, buf.data); + PG_TRY(); + { + res = PQexec(conn, buf.data); + } + PG_CATCH(); + { + ErrorData *edata; + + finishStoreInfo(&storeinfo); + edata = CopyErrorData(); + FlushErrorState(); + + /* Skip remaining results when storeHandler raises exception. */ + PQskipResult(conn, TRUE); + ReThrowError(edata); + } + PG_END_TRY(); + + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +635,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +696,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -660,6 +717,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { /* text,text,bool*/ DBLINK_GET_CONN; + conname = 
text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -715,164 +773,234 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; - /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); - else + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + + PG_TRY(); { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; + /* synchronous query, or async result retrieval */ + if (!is_async) + res = PQexec(conn, sql); + else + res = PQgetResult(conn); } + PG_CATCH(); + { + ErrorData *edata; - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); + edata = CopyErrorData(); + FlushErrorState(); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* Skip remaining results when storeHandler raises exception. */ + PQskipResult(conn, TRUE); + ReThrowError(edata); + } + PG_END_TRY(); + + finishStoreInfo(&storeinfo); + + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + /* + * exclude mismatch of the numbers of the colums here so as to + * behave as before. + */ + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK && + !storeinfo.nummismatch)) + { + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } + + /* Set command return status when the query was a command. 
*/ + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + char *values[1]; + HeapTuple tuple; + AttInMetadata *attinmeta; + ReturnSetInfo *rcinfo = (ReturnSetInfo*)fcinfo->resultinfo; + + values[0] = PQcmdStatus(res); + attinmeta = TupleDescGetAttInMetadata(rcinfo->setDesc); + tuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(rcinfo->setResult, tuple); + } + else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD) + { + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + } + + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } } - materializeResult(fcinfo, res); + if (res) + PQclear(res); + return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. 
- */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; - - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ + case TYPEFUNC_COMPOSITE: + tupdesc = CreateTupleDescCopy(tupdesc); + sinfo->nattrs = tupdesc->natts; + break; + case TYPEFUNC_RECORD: tupdesc = CreateTemplateTupleDesc(1, false); TupleDescInitEntry(tupdesc,(AttrNumber) 1, "status", TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + sinfo->nattrs = 1; + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } - is_sql_cmd = false; + /* make sure we have a persistent copy of the tupdesc */ - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->valbuflen = 
INITBUFLEN; + sinfo->valbuf = (char *)palloc(sinfo->valbuflen); + sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *)); - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); - } + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); +static void +finishStoreInfo(storeInfo *sinfo) +{ + if (sinfo->valbuf) + { + pfree(sinfo->valbuf); + sinfo->valbuf = NULL; + } - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->cstrs) + { + pfree(sinfo->cstrs); + sinfo->cstrs = NULL; + } - values = (char **) palloc(nfields * sizeof(char *)); + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) - { - HeapTuple tuple; +/* Prototype of this function is PQrowProcessor */ +static int +storeHandler(PGresult *res, PGrowValue *columns, void *param) +{ + storeInfo *sinfo = (storeInfo *)param; + HeapTuple tuple; + int newbuflen; + int fields = PQnfields(res); + int i; + char **cstrs = sinfo->cstrs; + char *pbuf; + + if (sinfo->error_occurred) + return -1; + + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - if (!is_sql_cmd) - { - int i; + /* This error will be processed in 
dblink_record_internal() */ + return -1; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + /* + * value input functions assumes that the input string is + * terminated by zero. We should make the values to be so. + */ - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - } + /* + * The length of the buffer for each field is value length + 1 for + * zero-termination + */ + newbuflen = fields; + for(i = 0 ; i < fields ; i++) + newbuflen += columns[i].len; + + if (newbuflen > sinfo->valbuflen) + { + int tmplen = sinfo->valbuflen * 2; + /* + * Try to (re)allocate in bigger steps to avoid flood of allocations + * on weird data. + */ + while (newbuflen > tmplen && tmplen >= 0) + tmplen *= 2; - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* Check if the integer was wrap-rounded. */ + if (tmplen < 0) + elog(ERROR, "Buffer size for one row exceeds integer limit"); - PQclear(res); + sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen); + sinfo->valbuflen = tmplen; } - PG_CATCH(); + + pbuf = sinfo->valbuf; + for(i = 0 ; i < fields ; i++) { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + cstrs[i] = pbuf; + memcpy(pbuf, columns[i].value, len); + pbuf += len; + *pbuf++ = '\0'; + } } - PG_END_TRY(); + + /* + * These functions may throw exception. 
It will be caught in + * dblink_record_internal() + */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); + + return 1;}/* diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 4de28ef..05d7e98 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -733,6 +733,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else { DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } @@ -763,6 +764,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else /* shouldn't happen*/ elog(ERROR, "wrong number of arguments"); + + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); } if (!conn) diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index 511dd5e..2dcba15 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -371,7 +371,7 @@ SELECT *FROM dblink('myconn','SELECT * FROM foobar',false) AS t(a int, b text, c text[])WHERE t.a > 7;NOTICE: relation "foobar" does not exist -CONTEXT: Error occurred on dblink connection named "unnamed": could not execute query. +CONTEXT: Error occurred on dblink connection named "myconn": could not execute query. a | b | c ---+---+---(0 rows)
pgsql-hackers by date: