From a5811a850ebc66b6b6267afe341c3929cbb57b17 Mon Sep 17 00:00:00 2001 From: Amul Sul Date: Fri, 6 Jan 2017 18:13:37 +0530 Subject: [PATCH 2/2] pg_background as client of BackgroundSession-v1 --- contrib/Makefile | 1 + contrib/pg_background/Makefile | 20 ++ contrib/pg_background/expected/pg_background.out | 75 +++++ contrib/pg_background/pg_background--1.0.sql | 37 +++ contrib/pg_background/pg_background.c | 397 +++++++++++++++++++++++ contrib/pg_background/pg_background.control | 4 + contrib/pg_background/sql/pg_background.sql | 30 ++ 7 files changed, 564 insertions(+) create mode 100644 contrib/pg_background/Makefile create mode 100644 contrib/pg_background/expected/pg_background.out create mode 100644 contrib/pg_background/pg_background--1.0.sql create mode 100644 contrib/pg_background/pg_background.c create mode 100644 contrib/pg_background/pg_background.control create mode 100644 contrib/pg_background/sql/pg_background.sql diff --git a/contrib/Makefile b/contrib/Makefile index 25263c0..04ec28a 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -29,6 +29,7 @@ SUBDIRS = \ oid2name \ pageinspect \ passwordcheck \ + pg_background \ pg_buffercache \ pg_freespacemap \ pg_prewarm \ diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile new file mode 100644 index 0000000..085fbff --- /dev/null +++ b/contrib/pg_background/Makefile @@ -0,0 +1,20 @@ +# contrib/pg_background/Makefile + +MODULE_big = pg_background +OBJS = pg_background.o + +EXTENSION = pg_background +DATA = pg_background--1.0.sql + +REGRESS = pg_background + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_background +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out new file mode 100644 index 0000000..23679e6 --- /dev/null +++ b/contrib/pg_background/expected/pg_background.out @@ -0,0 +1,75 @@ +CREATE EXTENSION pg_background; +--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively +CREATE TABLE input AS + SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n + FROM generate_series(0,.5,0.1) x + ORDER BY x DESC; +CREATE TABLE output(place int,value float); +--display active background workers +SELECT count(*) as active_background_workers FROM pg_background_show(); + active_background_workers +--------------------------- + 6 +(1 row) + +--run sql query +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_run(input.pid, + format($$ + SELECT pg_sleep(%s)::text; + INSERT INTO output VALUES (%s, %s); + $$, + x, n, x + ) + ) ON (true); + worker_num +------------ + 6 + 5 + 4 + 3 + 2 + 1 +(6 rows) + +--wait until everyone finishes +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true); + worker_num +------------ + 6 + 5 + 4 + 3 + 2 + 1 +(6 rows) + +--output results +SELECT * FROM output ORDER BY place; + place | value +-------+------- + 1 | 0 + 2 | 0.1 + 3 | 0.2 + 4 | 0.3 + 5 | 0.4 + 6 | 0.5 +(6 rows) + +--cleanup +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_close(input.pid) ON (true); + worker_num +------------ + 6 + 5 + 4 + 3 + 2 + 1 +(6 rows) + +DROP TABLE input; +DROP TABLE output; +DROP EXTENSION pg_background; diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql new file mode 100644 index 0000000..62a9d18 --- /dev/null +++ b/contrib/pg_background/pg_background--1.0.sql @@ -0,0 +1,37 @@ +/* contrib/pg_background/pg_background--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_background" to load this file. \quit + +CREATE FUNCTION pg_background_show() + RETURNS TABLE(background_worker_id pg_catalog.int4, + num_of_results_waiting_to_read pg_catalog.int4) STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION pg_background_start() + RETURNS pg_catalog.int4 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION pg_background_run(pid pg_catalog.int4, sql pg_catalog.text) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION pg_background_result(pid pg_catalog.int4) + RETURNS SETOF pg_catalog.record STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION pg_background_close(pid pg_catalog.int4) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +REVOKE ALL ON FUNCTION pg_background_show() + FROM public; +REVOKE ALL ON FUNCTION pg_background_start() + FROM public; +REVOKE ALL ON FUNCTION pg_background_run(pid pg_catalog.int4, + sql pg_catalog.text) + FROM public; +REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4) + FROM public; +REVOKE ALL ON FUNCTION pg_background_close(pg_catalog.int4) + FROM public; diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c new file mode 100644 index 0000000..e5a5b35 --- /dev/null +++ b/contrib/pg_background/pg_background.c @@ -0,0 +1,397 @@ +/*-------------------------------------------------------------------------- + * + * pg_background.c + * Run SQL commands using a background worker. + * + * Copyright (C) 2017, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/pg_background/pg_background.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "tcop/bgsession.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + +/* Private state maintained by the launching backend for IPC. */ +typedef struct pg_background_worker_info +{ + int32 pid; + Oid current_user_id; + BackgroundSession *session; + uint32 result_count; +} pg_background_worker_info; + +/* Private state maintained across calls to pg_background_result. */ +typedef struct pg_background_result_state +{ + pg_background_worker_info *info; + BackgroundSessionResult *result; +} pg_background_result_state; + +static HTAB *worker_hash = NULL; + +static void remove_worker_info(int32 pid); +static pg_background_worker_info *find_worker_info(int32 pid); +static void save_worker_info(int32 pid, BackgroundSession *session); +static void check_rights(pg_background_worker_info *info); + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(pg_background_show); +PG_FUNCTION_INFO_V1(pg_background_start); +PG_FUNCTION_INFO_V1(pg_background_run); +PG_FUNCTION_INFO_V1(pg_background_result); +PG_FUNCTION_INFO_V1(pg_background_close); + +/* + * Display the list of background worker previously launched in this session. + */ +Datum +pg_background_show(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + HASH_SEQ_STATUS *hash_seq; + TupleDesc tupdesc; + pg_background_worker_info *info; + + /* First-time setup. */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + funcctx = SRF_FIRSTCALL_INIT(); + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* Construct a tuple descriptor for the result rows. */ + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "results", + INT4OID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + if (worker_hash) + { + hash_seq = palloc(sizeof(HASH_SEQ_STATUS)); + + hash_seq_init(hash_seq, worker_hash); + funcctx->user_fctx = hash_seq; + } + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx; + + while (hash_seq && (info = hash_seq_search(hash_seq)) != NULL) + { + Datum values[2]; + bool nulls[2]; + HeapTuple tuple; + + values[0] = Int32GetDatum(info->pid); + nulls[0] = false; + values[1] = Int32GetDatum(info->result_count); + nulls[1] = false; + + /* Build and return the tuple. */ + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Start a dynamic background worker. + */ +Datum +pg_background_start(PG_FUNCTION_ARGS) +{ + BackgroundSession *session; + int32 pid; + + session = BackgroundSessionStart(TopMemoryContext); + pid = session->pid; + + /* Save worker info */ + save_worker_info(pid, session); + + /* Return the worker's PID. */ + PG_RETURN_INT32(pid); +} + +/* + * Run a user-specified SQL command. + */ +Datum +pg_background_run(PG_FUNCTION_ARGS) +{ + char *sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + int32 pid = PG_GETARG_INT32(0); + pg_background_worker_info *info; + + /* See if we have a connection to the specified PID. */ + if ((info = find_worker_info(pid)) == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("PID %d is not attached to this session", pid))); + check_rights(info); + + /* Execute give SQL query */ + BackgroundSessionSend(info->session, sql); + info->result_count++; + + PG_RETURN_VOID(); +} + +/* + * Retrieve the results of a background query previously launched in this + * session. + */ +Datum +pg_background_result(PG_FUNCTION_ARGS) +{ + int32 pid = PG_GETARG_INT32(0); + FuncCallContext *funcctx; + pg_background_result_state *state; + TupleDesc tupdesc; + BackgroundSessionResult *result; + + /* First-time setup. */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + pg_background_worker_info *info; + + funcctx = SRF_FIRSTCALL_INIT(); + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* See if we have a connection to the specified PID. */ + if ((info = find_worker_info(pid)) == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("PID %d is not attached to this session", pid))); + check_rights(info); + + /* Can't read results twice. */ + if (info->result_count <= 0) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("results for PID %d have already been consumed", pid))); + info->result_count--; + + /* Set up tuple-descriptor based on column definition list. */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"), + errhint("Try calling the function in the FROM clause " + "using a column definition list."))); + result = BackgroundSessionGetResult(info->session); + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + if (result->tupdesc) + { + bool datatype_mismatch = false; + + if (tupdesc->natts != result->tupdesc->natts) + datatype_mismatch = true; + else + { + int i; + for (i = 0; i < tupdesc->natts; ++i) + if(tupdesc->attrs[i]->atttypid != + result->tupdesc->attrs[i]->atttypid) + { + datatype_mismatch = true; + break; + } + } + + if (datatype_mismatch) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match the specified FROM clause rowtype"))); + } + + /* Cache state that will be needed on every call. */ + state = palloc0(sizeof(pg_background_result_state)); + state->info = info; + state->result = result; + + funcctx->user_fctx = state; + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + tupdesc = funcctx->tuple_desc; + state = funcctx->user_fctx; + result = state->result; + + if (result->tupdesc) + { + if (result->tuples != NIL && funcctx->call_cntr > 0) + result->tuples = list_delete_first(result->tuples); + + if (result->tuples != NIL) + { + HeapTuple tuple = (HeapTuple) linitial(result->tuples); + + /* + * Set the tuple type ID information fields correctly because + * BackgroundSessionFetchResult returns it as an anonymous record + * type. + */ + HeapTupleHeaderSetTypeId(tuple->t_data, tupdesc->tdtypeid); + HeapTupleHeaderSetTypMod(tuple->t_data, tupdesc->tdtypmod); + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + } + else /* If no data rows, return the command tags instead. */ + { + if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query did not return a result set, but result rowtype is not a single text column"))); + + if (result->command != NULL) + { + bool isnull = false; + Datum value = PointerGetDatum(cstring_to_text(result->command)); + HeapTuple tuple = heap_form_tuple(tupdesc, &value, &isnull); + + result->command = NULL; + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * End background session and remove hashtable entry. + */ +Datum +pg_background_close(PG_FUNCTION_ARGS) +{ + int32 pid = PG_GETARG_INT32(0); + pg_background_worker_info *info; + + info = find_worker_info(pid); + if (info == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("PID %d is not attached to this session", pid))); + + check_rights(info); + remove_worker_info(pid); + BackgroundSessionEnd(info->session); + + PG_RETURN_VOID(); +} + +static void +remove_worker_info(int32 pid) +{ + bool found; + + /* Remove the hashtable entry. */ + hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found); + if (!found) + elog(ERROR, "pg_background worker_hash table corrupted"); +} + +/* + * Find the background worker information for the worker with a given PID. + */ +static pg_background_worker_info * +find_worker_info(int32 pid) +{ + pg_background_worker_info *info = NULL; + + if (worker_hash != NULL) + info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL); + + return info; +} + +/* + * Save worker info. + */ +static void +save_worker_info(int32 pid, BackgroundSession *session) +{ + pg_background_worker_info *info; + Oid current_user_id; + int sec_context; + + /* If the hash table hasn't been set up yet, do that now. */ + if (worker_hash == NULL) + { + HASHCTL ctl; + + ctl.keysize = sizeof(int32); + ctl.entrysize = sizeof(pg_background_worker_info); + worker_hash = hash_create("pg_background worker_hash", 8, &ctl, + HASH_ELEM); + } + + /* Get current authentication information. */ + GetUserIdAndSecContext(¤t_user_id, &sec_context); + + /* + * In the unlikely event that there's an older worker with this PID, + * just detach it - unless it has a different user ID than the + * currently-active one, in which case someone might be trying to pull + * a fast one. Let's kill the backend to make sure we don't break + * anyone's expectations. + */ + if ((info = find_worker_info(pid)) != NULL) + { + if (current_user_id != info->current_user_id) + ereport(FATAL, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("background worker with PID \"%d\" already exists", + pid))); + } + + /* Create a new entry for this worker. */ + info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL); + info->session = session; + info->result_count = 0; + info->current_user_id = current_user_id; +} + +/* + * Check whether the current user has rights to manipulate the background + * worker with the given PID. + */ +static void +check_rights(pg_background_worker_info *info) +{ + Oid current_user_id; + int sec_context; + + GetUserIdAndSecContext(¤t_user_id, &sec_context); + if (!has_privs_of_role(current_user_id, info->current_user_id)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for background worker with PID \"%d\"", + info->pid))); +} diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control new file mode 100644 index 0000000..733d0e1 --- /dev/null +++ b/contrib/pg_background/pg_background.control @@ -0,0 +1,4 @@ +comment = 'Run SQL queries in the background' +default_version = '1.0' +module_pathname = '$libdir/pg_background' +relocatable = true diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql new file mode 100644 index 0000000..be49dd5 --- /dev/null +++ b/contrib/pg_background/sql/pg_background.sql @@ -0,0 +1,30 @@ +CREATE EXTENSION pg_background; +--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively +CREATE TABLE input AS + SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n + FROM generate_series(0,.5,0.1) x + ORDER BY x DESC; +CREATE TABLE output(place int,value float); +--display active background workers +SELECT count(*) as active_background_workers FROM pg_background_show(); +--run sql query +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_run(input.pid, + format($$ + SELECT pg_sleep(%s)::text; + INSERT INTO output VALUES (%s, %s); + $$, + x, n, x + ) + ) ON (true); +--wait until everyone finishes +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true); +--output results +SELECT * FROM output ORDER BY place; +--cleanup +SELECT n as worker_num +FROM input JOIN LATERAL pg_background_close(input.pid) ON (true); +DROP TABLE input; +DROP TABLE output; +DROP EXTENSION pg_background; -- 2.6.2