dblink patches for comment - Mailing list pgsql-hackers
From | Joe Conway |
---|---|
Subject | dblink patches for comment |
Date | |
Msg-id | 4A1C9C8C.6030405@joeconway.com Whole thread Raw |
Responses |
Re: dblink patches for comment
|
List | pgsql-hackers |
The attached addresses items #2 and #3 as listed by Bruce here: http://momjian.us/cgi-bin/pgsql/joe I think it is consistent with the discussions we had at PGCon last week. Any objections to me committing this for 8.4? On a side note, should I try to address items #1 & #4 for 8.4 as well? Perhaps #4 yes since it is arguably a bug fix, but no to #1? Joe Index: dblink.c =================================================================== RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v retrieving revision 1.77 diff -c -r1.77 dblink.c *** dblink.c 1 Jan 2009 17:23:31 -0000 1.77 --- dblink.c 25 May 2009 22:57:22 -0000 *************** *** 46,51 **** --- 46,52 ---- #include "catalog/pg_type.h" #include "executor/executor.h" #include "executor/spi.h" + #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/execnodes.h" *************** *** 77,83 **** /* * Internal declarations */ ! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn * rconn); --- 78,84 ---- /* * Internal declarations */ ! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn * rconn); *************** *** 93,101 **** static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); static char *generate_relation_name(Oid relid); ! static void dblink_connstr_check(const char *connstr); ! 
static void dblink_security_check(PGconn *conn, remoteConn *rconn); static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); /* Global */ static remoteConn *pconn = NULL; --- 94,103 ---- static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); static char *generate_relation_name(Oid relid); ! static void dblink_connstr_check(const char *connstr, bool is_fdw); ! static void dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw); static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); + static char *get_connect_string(const char *servername); /* Global */ static remoteConn *pconn = NULL; *************** *** 165,172 **** } \ else \ { \ ! connstr = conname_or_str; \ ! dblink_connstr_check(connstr); \ conn = PQconnectdb(connstr); \ if (PQstatus(conn) == CONNECTION_BAD) \ { \ --- 167,180 ---- } \ else \ { \ ! bool is_fdw = true; \ ! connstr = get_connect_string(conname_or_str); \ ! if (connstr == NULL) \ ! { \ ! is_fdw = false; \ ! connstr = conname_or_str; \ ! } \ ! dblink_connstr_check(connstr, is_fdw); \ conn = PQconnectdb(connstr); \ if (PQstatus(conn) == CONNECTION_BAD) \ { \ *************** *** 177,183 **** errmsg("could not establish connection"), \ errdetail("%s", msg))); \ } \ ! dblink_security_check(conn, rconn); \ freeconn = true; \ } \ } while (0) --- 185,191 ---- errmsg("could not establish connection"), \ errdetail("%s", msg))); \ } \ ! dblink_security_check(conn, rconn, is_fdw); \ freeconn = true; \ } \ } while (0) *************** *** 210,237 **** Datum dblink_connect(PG_FUNCTION_ARGS) { char *connstr = NULL; char *connname = NULL; char *msg; PGconn *conn = NULL; remoteConn *rconn = NULL; DBLINK_INIT; if (PG_NARGS() == 2) { ! 
connstr = text_to_cstring(PG_GETARG_TEXT_PP(1)); connname = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else if (PG_NARGS() == 1) ! connstr = text_to_cstring(PG_GETARG_TEXT_PP(0)); if (connname) rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); /* check password in connection string if not superuser */ ! dblink_connstr_check(connstr); conn = PQconnectdb(connstr); if (PQstatus(conn) == CONNECTION_BAD) --- 218,255 ---- Datum dblink_connect(PG_FUNCTION_ARGS) { + char *conname_or_str = NULL; char *connstr = NULL; char *connname = NULL; char *msg; PGconn *conn = NULL; remoteConn *rconn = NULL; + bool is_fdw = true; DBLINK_INIT; if (PG_NARGS() == 2) { ! conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1)); connname = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else if (PG_NARGS() == 1) ! conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); if (connname) rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); + /* first check for valid foreign data server */ + connstr = get_connect_string(conname_or_str); + if (connstr == NULL) + { + is_fdw = false; + connstr = conname_or_str; + } + /* check password in connection string if not superuser */ ! dblink_connstr_check(connstr, is_fdw); conn = PQconnectdb(connstr); if (PQstatus(conn) == CONNECTION_BAD) *************** *** 248,254 **** } /* check password actually used if not superuser */ ! dblink_security_check(conn, rconn); if (connname) { --- 266,272 ---- } /* check password actually used if not superuser */ ! dblink_security_check(conn, rconn, is_fdw); if (connname) { *************** *** 689,713 **** Datum dblink_record(PG_FUNCTION_ARGS) { ! return dblink_record_internal(fcinfo, false, false); } PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { ! return dblink_record_internal(fcinfo, true, false); } PG_FUNCTION_INFO_V1(dblink_get_result); Datum dblink_get_result(PG_FUNCTION_ARGS) { ! 
return dblink_record_internal(fcinfo, true, true); } static Datum ! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get) { FuncCallContext *funcctx; TupleDesc tupdesc = NULL; --- 707,753 ---- Datum dblink_record(PG_FUNCTION_ARGS) { ! return dblink_record_internal(fcinfo, false); } PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { ! PGconn *conn = NULL; ! char *connstr = NULL; ! char *sql = NULL; ! remoteConn *rconn = NULL; ! char *msg; ! bool freeconn = false; ! int retval; ! ! if (PG_NARGS() == 2) ! { ! DBLINK_GET_CONN; ! sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); ! } ! else ! /* shouldn't happen */ ! elog(ERROR, "wrong number of arguments"); ! ! /* async query send */ ! retval = PQsendQuery(conn, sql); ! if (retval != 1) ! elog(NOTICE, "%s", PQerrorMessage(conn)); ! ! PG_RETURN_INT32(retval); } PG_FUNCTION_INFO_V1(dblink_get_result); Datum dblink_get_result(PG_FUNCTION_ARGS) { ! return dblink_record_internal(fcinfo, true); } static Datum ! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { FuncCallContext *funcctx; TupleDesc tupdesc = NULL; *************** *** 775,788 **** /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); } ! else if (is_async && do_get) { /* get async result */ if (PG_NARGS() == 2) { /* text,bool */ DBLINK_GET_CONN; ! fail = PG_GETARG_BOOL(2); } else if (PG_NARGS() == 1) { --- 815,828 ---- /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); } ! else /* is_async */ { /* get async result */ if (PG_NARGS() == 2) { /* text,bool */ DBLINK_GET_CONN; ! fail = PG_GETARG_BOOL(1); } else if (PG_NARGS() == 1) { *************** *** 793,929 **** /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); } - else - { - /* send async query */ - if (PG_NARGS() == 2) - { - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); - } if (!conn) DBLINK_CONN_NOT_AVAIL; ! 
if (!is_async || (is_async && do_get)) { ! /* synchronous query, or async result retrieval */ ! if (!is_async) ! res = PQexec(conn, sql); ! else { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - { - MemoryContextSwitchTo(oldcontext); - SRF_RETURN_DONE(funcctx); - } - } - - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) - { - dblink_res_error(conname, res, "could not execute query", fail); - if (freeconn) - PQfinish(conn); MemoryContextSwitchTo(oldcontext); SRF_RETURN_DONE(funcctx); } ! if (PQresultStatus(res) == PGRES_COMMAND_OK) ! { ! is_sql_cmd = true; ! ! /* need a tuple descriptor representing one TEXT column */ ! tupdesc = CreateTemplateTupleDesc(1, false); ! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", ! TEXTOID, -1, 0); ! ! /* ! * and save a copy of the command status string to return as ! * our result tuple ! */ ! sql_cmd_status = PQcmdStatus(res); ! funcctx->max_calls = 1; ! } ! else ! funcctx->max_calls = PQntuples(res); ! ! /* got results, keep track of them */ ! funcctx->user_fctx = res; ! ! /* if needed, close the connection to the database and cleanup */ if (freeconn) PQfinish(conn); ! if (!is_sql_cmd) ! { ! /* get a tuple descriptor for our result type */ ! switch (get_call_result_type(fcinfo, NULL, &tupdesc)) ! { ! case TYPEFUNC_COMPOSITE: ! /* success */ ! break; ! case TYPEFUNC_RECORD: ! /* failed to determine actual type of RECORD */ ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("function returning record called in context " ! "that cannot accept type record"))); ! break; ! default: ! /* result type isn't composite */ ! elog(ERROR, "return type must be a row type"); ! break; ! } ! /* make sure we have a persistent copy of the tupdesc */ ! tupdesc = CreateTupleDescCopy(tupdesc); ! } /* ! * check result and tuple descriptor have the same number of ! * columns */ ! if (PQnfields(res) != tupdesc->natts) ! 
ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("remote query result rowtype does not match " ! "the specified FROM clause rowtype"))); ! /* fast track when no results */ ! if (funcctx->max_calls < 1) { ! if (res) ! PQclear(res); ! MemoryContextSwitchTo(oldcontext); ! SRF_RETURN_DONE(funcctx); } ! /* store needed metadata for subsequent calls */ ! attinmeta = TupleDescGetAttInMetadata(tupdesc); ! funcctx->attinmeta = attinmeta; ! ! MemoryContextSwitchTo(oldcontext); } ! else { ! /* async query send */ MemoryContextSwitchTo(oldcontext); ! PG_RETURN_INT32(PQsendQuery(conn, sql)); } - } ! if (is_async && !do_get) ! { ! /* async query send -- should not happen */ ! elog(ERROR, "async query send called more than once"); } --- 833,942 ---- /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); } if (!conn) DBLINK_CONN_NOT_AVAIL; ! /* synchronous query, or async result retrieval */ ! if (!is_async) ! res = PQexec(conn, sql); ! else { ! res = PQgetResult(conn); ! /* NULL means we're all done with the async results */ ! if (!res) { MemoryContextSwitchTo(oldcontext); SRF_RETURN_DONE(funcctx); } + } ! if (!res || ! (PQresultStatus(res) != PGRES_COMMAND_OK && ! PQresultStatus(res) != PGRES_TUPLES_OK)) ! { ! dblink_res_error(conname, res, "could not execute query", fail); if (freeconn) PQfinish(conn); + MemoryContextSwitchTo(oldcontext); + SRF_RETURN_DONE(funcctx); + } ! if (PQresultStatus(res) == PGRES_COMMAND_OK) ! { ! is_sql_cmd = true; ! /* need a tuple descriptor representing one TEXT column */ ! tupdesc = CreateTemplateTupleDesc(1, false); ! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", ! TEXTOID, -1, 0); /* ! * and save a copy of the command status string to return as ! * our result tuple */ ! sql_cmd_status = PQcmdStatus(res); ! funcctx->max_calls = 1; ! } ! else ! funcctx->max_calls = PQntuples(res); ! /* got results, keep track of them */ ! funcctx->user_fctx = res; ! ! 
/* if needed, close the connection to the database and cleanup */ ! if (freeconn) ! PQfinish(conn); ! ! if (!is_sql_cmd) ! { ! /* get a tuple descriptor for our result type */ ! switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { ! case TYPEFUNC_COMPOSITE: ! /* success */ ! break; ! case TYPEFUNC_RECORD: ! /* failed to determine actual type of RECORD */ ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("function returning record called in context " ! "that cannot accept type record"))); ! break; ! default: ! /* result type isn't composite */ ! elog(ERROR, "return type must be a row type"); ! break; } ! /* make sure we have a persistent copy of the tupdesc */ ! tupdesc = CreateTupleDescCopy(tupdesc); } ! ! /* ! * check result and tuple descriptor have the same number of ! * columns ! */ ! if (PQnfields(res) != tupdesc->natts) ! ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("remote query result rowtype does not match " ! "the specified FROM clause rowtype"))); ! ! /* fast track when no results */ ! if (funcctx->max_calls < 1) { ! if (res) ! PQclear(res); MemoryContextSwitchTo(oldcontext); ! SRF_RETURN_DONE(funcctx); } ! /* store needed metadata for subsequent calls */ ! attinmeta = TupleDescGetAttInMetadata(tupdesc); ! funcctx->attinmeta = attinmeta; ! ! MemoryContextSwitchTo(oldcontext); } *************** *** 2249,2257 **** } static void ! dblink_security_check(PGconn *conn, remoteConn *rconn) { ! if (!superuser()) { if (!PQconnectionUsedPassword(conn)) { --- 2262,2270 ---- } static void ! dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw) { ! if (!superuser() && !is_fdw) { if (!PQconnectionUsedPassword(conn)) { *************** *** 2275,2283 **** * to be accessible to non-superusers. */ static void ! dblink_connstr_check(const char *connstr) { ! if (!superuser()) { PQconninfoOption *options; PQconninfoOption *option; --- 2288,2296 ---- * to be accessible to non-superusers. */ static void ! 
dblink_connstr_check(const char *connstr, bool is_fdw) { ! if (!superuser() && !is_fdw) { PQconninfoOption *options; PQconninfoOption *option; *************** *** 2358,2360 **** --- 2371,2431 ---- errcontext("Error occurred on dblink connection named \"%s\": %s.", dblink_context_conname, dblink_context_msg))); } + + /* + * Obtain connection string for a foreign server + */ + static char * + get_connect_string(const char *servername) + { + ForeignServer *foreign_server; + UserMapping *user_mapping; + ListCell *cell; + StringInfo buf = makeStringInfo(); + ForeignDataWrapper *fdw; + AclResult aclresult; + + /* first gather the server connstr options */ + foreign_server = GetForeignServerByName(servername, true); + + if (foreign_server) + { + Oid serverid = foreign_server->serverid; + Oid fdwid = foreign_server->fdwid; + Oid userid = GetUserId(); + + user_mapping = GetUserMapping(userid, serverid); + fdw = GetForeignDataWrapper(fdwid); + + /* Check permissions, user must have usage on the server. 
*/ + aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername); + + foreach (cell, fdw->options) + { + DefElem *def = lfirst(cell); + + appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg)); + } + + foreach (cell, foreign_server->options) + { + DefElem *def = lfirst(cell); + + appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg)); + } + + foreach (cell, user_mapping->options) + { + + DefElem *def = lfirst(cell); + + appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg)); + } + + return pstrdup(buf->data); + } + else + return NULL; + } Index: expected/dblink.out =================================================================== RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v retrieving revision 1.24 diff -c -r1.24 dblink.out *** expected/dblink.out 3 Jul 2008 03:56:57 -0000 1.24 --- expected/dblink.out 25 May 2009 23:14:03 -0000 *************** *** 784,786 **** --- 784,819 ---- OK (1 row) + -- test foreign data wrapper functionality + CREATE USER dblink_regression_test; + CREATE FOREIGN DATA WRAPPER postgresql; + CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression'); + CREATE USER MAPPING FOR public SERVER fdtest; + GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test; + \set ORIGINAL_USER :USER + \c - dblink_regression_test + SELECT dblink_connect_u('myconn', 'fdtest'); + dblink_connect_u + ------------------ + OK + (1 row) + + SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]); + a | b | c + ----+---+--------------- + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} + 6 | g | {a6,b6,c6} + 7 | h | {a7,b7,c7} + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} + (11 rows) + + \c - :ORIGINAL_USER + REVOKE 
EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test; + DROP USER dblink_regression_test; Index: sql/dblink.sql =================================================================== RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v retrieving revision 1.20 diff -c -r1.20 dblink.sql *** sql/dblink.sql 6 Apr 2008 16:54:48 -0000 1.20 --- sql/dblink.sql 25 May 2009 23:13:49 -0000 *************** *** 364,366 **** --- 364,383 ---- SELECT dblink_cancel_query('dtest1'); SELECT dblink_error_message('dtest1'); SELECT dblink_disconnect('dtest1'); + + -- test foreign data wrapper functionality + CREATE USER dblink_regression_test; + + CREATE FOREIGN DATA WRAPPER postgresql; + CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression'); + CREATE USER MAPPING FOR public SERVER fdtest; + GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test; + + \set ORIGINAL_USER :USER + \c - dblink_regression_test + SELECT dblink_connect_u('myconn', 'fdtest'); + SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]); + + \c - :ORIGINAL_USER + REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test; + DROP USER dblink_regression_test;
pgsql-hackers by date: