[PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ... - Mailing list pgsql-hackers
From | Daniel Farina |
---|---|
Subject | [PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ... |
Date | |
Msg-id | 1259012082-6196-4-git-send-email-dfarina@truviso.com Whole thread Raw |
In response to | [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..." (Daniel Farina <dfarina@truviso.com>) |
List | pgsql-hackers |
This patch enables dblink to be used for the purpose of efficient bulk-loading via COPY and libpq in combination with the COPY TO FUNCTION patch. The following functions were added to accomplish this: dblink_connection_reset: useful when handling errors and one just wants to restore a connection to a known state, rolling back as many transactions as necessary. dblink_copy_end: completes the COPY. dblink_copy_open: puts a connection into the COPY state. Accepts connection name, relation name, and binary mode flag. dblink_copy_write: writes a row to the last connection put in the COPY state by dblink_copy_open. Generally speaking, code that uses this will look like the following (presuming a named connection has already been made): try: SELECT dblink_copy_open('myconn', 'relation_name', true); COPY bar TO FUNCTION dblink_copy_write; -- since the dblink connection is still in the COPY state, one -- can even copy some more data in multiple steps... COPY bar_2 TO FUNCTION dblink_copy_write; SELECT dblink_copy_end(); finally: SELECT dblink_connection_reset('myconn'); Signed-off-by: Daniel Farina <dfarina@truviso.com> --- contrib/dblink/dblink.c | 190 +++++++++++++++++++++++++++++++++++ contrib/dblink/dblink.h | 5 + contrib/dblink/dblink.sql.in | 20 ++++ contrib/dblink/uninstall_dblink.sql | 8 ++ 4 files changed, 223 insertions(+), 0 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 72fdf56..d32aeec 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1722,6 +1722,196 @@ dblink_get_notify(PG_FUNCTION_ARGS) * internal functions */ +/* + * Attempts to take the connection into a known state by rolling back + * transactions. If unable to restore the connection to a known idle state, + * raises an error. 
+ */ +PG_FUNCTION_INFO_V1(dblink_connection_reset); +Datum +dblink_connection_reset(PG_FUNCTION_ARGS) +{ + PGresult *res = NULL; + PGconn *conn = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + + bool triedonce = false; + + DBLINK_INIT; + + /* must be text */ + Assert(PG_NARGS() == 1); + DBLINK_GET_NAMED_CONN; + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + while (!triedonce) + { + switch (PQtransactionStatus(conn)) + { + case PQTRANS_IDLE: + /* Everything is okay */ + goto finish; + case PQTRANS_ACTIVE: + case PQTRANS_INTRANS: + case PQTRANS_INERROR: + res = PQexec(conn, "ROLLBACK;"); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("%s: could not issue ROLLBACK command", + PG_FUNCNAME_MACRO))); + + PQclear(res); + triedonce = true; + break; + case PQTRANS_UNKNOWN: + elog(ERROR, "%s: connection in unknown transaction state", + PG_FUNCNAME_MACRO); + } + } + +finish: + PG_RETURN_VOID(); +} + +/* + * dblink COPY support, procedures and variables + */ +static PGconn *dblink_copy_current = NULL; + +/* + * dblink_copy_open + * + * Start a COPY into a given relation on the named remote connection. + */ +PG_FUNCTION_INFO_V1(dblink_copy_open); +Datum +dblink_copy_open(PG_FUNCTION_ARGS) +{ + PGresult *res = NULL; + PGconn *conn = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + + const char *copy_stmt = "COPY %s FROM STDIN%s;"; + const char *with_binary = " WITH BINARY"; + const char *quoted_remoted_relname; + bool isbinary; + int snprintf_retcode; + + /* + * Should be large enough to contain any formatted output. Formed by + * counting the characters in the static formatting sections plus the + * bounded length of identifiers. Some modest padding was added for + * paranoia's sake, although all uses of this buffer are checked for + * over-length formats anyway. 
+ */ + char buf[64 + NAMEDATALEN]; + + DBLINK_INIT; + + /* must be text,text,bool */ + Assert(PG_NARGS() == 3); + DBLINK_GET_NAMED_CONN; + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + /* Read the procedure arguments into primitive values */ + quoted_remoted_relname = NameListToQuotedString( + textToQualifiedNameList(PG_GETARG_TEXT_P(1))); + isbinary = PG_GETARG_BOOL(2); + + /* + * Query parameterization only handles value-parameters -- of which + * identifiers are not considered one of -- so format the string the old + * fashioned way. It is very important to quote identifiers for this + * reason, as performed previously. + */ + snprintf_retcode = snprintf(buf, sizeof buf, copy_stmt, + quoted_remoted_relname, + isbinary ? with_binary : ""); + + if (snprintf_retcode < 0) + elog(ERROR, "could not format dblink COPY query"); + else if (snprintf_retcode >= sizeof buf) + /* + * Should not be able to happen, see documentation of the "buf" value + * in this procedure. + */ + elog(ERROR, "could not fit formatted dblink COPY query into buffer"); + + /* + * Run the created query, and check to ensure that PGRES_COPY_IN state has + * been achieved. + */ + res = PQexec(conn, buf); + if (!res || PQresultStatus(res) != PGRES_COPY_IN) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not start COPY FROM on remote node"))); + PQclear(res); + + /* + * Everything went well; finally bind the global dblink_copy_current to the + * connection ready to accept copy data. + */ + dblink_copy_current = conn; + PG_RETURN_TEXT_P(cstring_to_text("OK")); +} + +/* + * dblink_copy_write + * + * Write the provided StringInfo to the currently open COPY. 
+ */ +PG_FUNCTION_INFO_V1(dblink_copy_write); +Datum +dblink_copy_write(PG_FUNCTION_ARGS) +{ + StringInfo copybuf = (void *) PG_GETARG_POINTER(0); + + if (PQputCopyData(dblink_copy_current, copybuf->data, copybuf->len) != 1) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("could not send row to remote node"))); + + PG_RETURN_VOID(); +} + +/* + * dblink_copy_end + * + * Finish the currently open COPY. + */ +PG_FUNCTION_INFO_V1(dblink_copy_end); +Datum +dblink_copy_end(PG_FUNCTION_ARGS) +{ + PGresult *res; + + /* Check to ensure that termination data was sent successfully */ + if (PQputCopyEnd(dblink_copy_current, NULL) != 1) + elog(ERROR, "COPY end failed"); + + do + { + res = PQgetResult(dblink_copy_current); + if (res == NULL) + break; + if (PQresultStatus(res) != PGRES_COMMAND_OK) + elog(ERROR, "COPY failed: %s", + PQerrorMessage(dblink_copy_current)); + PQclear(res); + } while (true); + + dblink_copy_current = NULL; + PG_RETURN_TEXT_P(cstring_to_text("OK")); +}/* * get_pkey_attnames diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 255f5d0..8a2faee 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -59,4 +59,9 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);extern Datum dblink_current_query(PG_FUNCTION_ARGS);externDatum dblink_get_notify(PG_FUNCTION_ARGS); +extern Datum dblink_connection_reset(PG_FUNCTION_ARGS); + +extern Datum dblink_copy_open(PG_FUNCTION_ARGS); +extern Datum dblink_copy_write(PG_FUNCTION_ARGS); +extern Datum dblink_copy_end(PG_FUNCTION_ARGS);#endif /* DBLINK_H */ diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in index da5dd65..aedca34 100644 --- a/contrib/dblink/dblink.sql.in +++ b/contrib/dblink/dblink.sql.in @@ -221,3 +221,23 @@ CREATE OR REPLACE FUNCTION dblink_get_notify(RETURNS setof recordAS 'MODULE_PATHNAME', 'dblink_get_notify'LANGUAGEC STRICT; + +CREATE OR REPLACE FUNCTION dblink_connection_reset (text) +RETURNS void +AS 
'MODULE_PATHNAME','dblink_connection_reset' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_copy_open (text, text, boolean) +RETURNS text +AS 'MODULE_PATHNAME','dblink_copy_open' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_copy_write (internal) +RETURNS void +AS 'MODULE_PATHNAME','dblink_copy_write' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_copy_end () +RETURNS text +AS 'MODULE_PATHNAME','dblink_copy_end' +LANGUAGE C STRICT; diff --git a/contrib/dblink/uninstall_dblink.sql b/contrib/dblink/uninstall_dblink.sql index 45cf13c..465beb7 100644 --- a/contrib/dblink/uninstall_dblink.sql +++ b/contrib/dblink/uninstall_dblink.sql @@ -11,6 +11,14 @@ DROP FUNCTION dblink_build_sql_delete (text, int2vector, int4, _text);DROP FUNCTION dblink_build_sql_insert(text, int2vector, int4, _text, _text); +DROP FUNCTION dblink_copy_end (); + +DROP FUNCTION dblink_copy_open (text, text, boolean); + +DROP FUNCTION dblink_copy_write (internal); + +DROP FUNCTION dblink_connection_reset (text); +DROP FUNCTION dblink_get_pkey (text);DROP TYPE dblink_pkey_results; -- 1.6.5.3
pgsql-hackers by date: