diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 0abde43..40db40d 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -26,6 +26,7 @@ #include "storage/shmem.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" +#include "utils/cmdstatus.h" /* @@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_CMDSTATUS_INFO)) + HandleCmdStatusInfoInterrupt(); + if (set_latch_on_sigusr1) SetLatch(MyLatch); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index ce4bdaf..5d5df58 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -67,6 +67,7 @@ #include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" +#include "utils/cmdstatus.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -2991,6 +2992,9 @@ ProcessInterrupts(void) if (ParallelMessagePending) HandleParallelMessages(); + + if (CmdStatusInfoRequested) + ProcessCmdStatusInfoRequest(); } diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile index 3ed0b44..2c8687c 100644 --- a/src/backend/utils/adt/Makefile +++ b/src/backend/utils/adt/Makefile @@ -18,7 +18,7 @@ endif # keep this list arranged alphabetically or it gets to be a mess OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \ array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \ - bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \ + bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \ encode.o enum.o expandeddatum.o \ float.o format_type.o formatting.o genfile.o \ geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \ diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c new file mode 100644 index 0000000..5f31a2d --- /dev/null +++ b/src/backend/utils/adt/cmdstatus.c @@ -0,0 +1,567 @@ +/*------------------------------------------------------------------------- + * + * cmdstatus.c + * Definitions for pg_cmdstatus function. + * + * Copyright (c) 1996-2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/utils/adt/cmdstatus.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" + +#include "access/htup_details.h" +#include "commands/explain.h" +#include "lib/stringinfo.h" +#include "storage/latch.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "tcop/dest.h" +#include "tcop/pquery.h" +#include "utils/builtins.h" +#include "utils/cmdstatus.h" + + +typedef enum { + CMD_STATUS_REQUEST_EXPLAIN = 1, + CMD_STATUS_REQUEST_QUERY_TEXT = 2, + CMD_STATUS_REQUEST_PROGRESS_TAG = 3, + CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4 +} CmdStatusInfoRequestType; + +#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE + +typedef enum { + CMD_STATUS_RESULT_FAILURE = -1, + CMD_STATUS_RESULT_SUCCESS = 0, + CMD_STATUS_RESULT_BACKEND_IDLE, + CMD_STATUS_RESULT_NO_COMMAND_TAG +} CmdStatusInfoResultCode; + +typedef struct { + LWLock *lock; + pid_t target_pid; + pid_t sender_pid; + CmdStatusInfoRequestType request_type; + CmdStatusInfoResultCode result_code; + char buffer[FLEXIBLE_ARRAY_MEMBER]; +} CmdStatusInfo; + +#define BUFFER_SIZE 8192 + +/* + * These structs are allocated on the program stack as local variables in the + * ExecutorRun hook. The top of stack is current_query_stack, see below. + */ +typedef struct CmdInfoStack { + QueryDesc *query_desc; + struct CmdInfoStack *parent; +} CmdInfoStack; + + +bool CmdStatusInfoEnabled = true; +bool CmdStatusInfoRequested = false; + +static CmdStatusInfo *cmd_status_info = NULL; +static CmdInfoStack *current_query_stack = NULL; +static int query_stack_size = 0; + +static ExecutorRun_hook_type prev_ExecutorRun = NULL; + +static void attach_shmem(void); +static void cmdstatus_ExecutorRun(QueryDesc *queryDesc, + ScanDirection direction, long count); + + +void +InstallCmdStatusInfoHooks(void) +{ + /*elog(LOG, "InstallCmdStatusInfoHooks");*/ + + prev_ExecutorRun = ExecutorRun_hook; + ExecutorRun_hook = cmdstatus_ExecutorRun; +} + + +static void +attach_shmem(void) +{ + bool found; + + Assert(sizeof(CmdStatusInfo) < BUFFER_SIZE); + + if (cmd_status_info == NULL) + { + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + cmd_status_info = (CmdStatusInfo *) ShmemInitStruct("cmdstatusinfo", + BUFFER_SIZE, &found); + if (!found) + { + cmd_status_info->lock = LWLockAssign(); + + cmd_status_info->target_pid = 0; + cmd_status_info->sender_pid = 0; + cmd_status_info->request_type = 0; + cmd_status_info->result_code = 0; + } + + LWLockRelease(AddinShmemInitLock); + } +} + + +static void +cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) +{ + CmdInfoStack current; + + /*ereport(LOG, + (errmsg("ExecutorRun : %p\t%d", queryDesc, query_stack_size), + errhidecontext(true), + errhidestmt(true)));*/ + + current.query_desc = queryDesc; + current.parent = current_query_stack; + + current_query_stack = ¤t; + query_stack_size++; + + PG_TRY(); + { + if (prev_ExecutorRun) + prev_ExecutorRun(queryDesc, direction, count); + else + standard_ExecutorRun(queryDesc, direction, count); + + Assert(current_query_stack == ¤t); + Assert(query_stack_size > 0); + + query_stack_size--; + current_query_stack = current.parent; + } + PG_CATCH(); + { + Assert(current_query_stack == ¤t); + Assert(query_stack_size > 0); + + query_stack_size--; + current_query_stack = current.parent; + + PG_RE_THROW(); + } + PG_END_TRY(); +} + + +static StringInfo +explain_query(QueryDesc *queryDesc) +{ + ExplainState *es; + + es = NewExplainState(); + es->analyze = false; + es->verbose = false; + es->buffers = false; + es->format = EXPLAIN_FORMAT_TEXT; + + ExplainBeginOutput(es); + /* XXX: appendStringInfo(es->str, "#%d ", depth); ? */ + ExplainQueryText(es, queryDesc); + ExplainPrintPlan(es, queryDesc); + ExplainEndOutput(es); + + /* Remove last line break. */ + if (es->str->len > 0 && es->str->data[es->str->len - 1] == '\n') + es->str->data[--es->str->len] = '\0'; + + return es->str; +} + +static void +send_explain_plan(shm_mq_handle *shmq, const char *str) +{ + /* Break plan into lines and send each as a separate message. */ + while (*str) + { + const char *q = strchr(str, '\n'); + Size len = q ? q - str : strlen(str); + + shm_mq_send(shmq, len, str, false); + + if (!q) + break; + + str += len + 1; + } +} + +static void +explain_one_query_desc(shm_mq_handle *shmq, QueryDesc *query_desc) +{ + StringInfo str; + + str = explain_query(query_desc); + + send_explain_plan(shmq, str->data); + + pfree(str->data); + pfree(str); +} + + +/* signal handler for PROCSIG_CMDSTATUS_INFO */ +void +HandleCmdStatusInfoInterrupt(void) +{ + if (CmdStatusInfoEnabled && !CmdStatusInfoRequested) + { + InterruptPending = true; + CmdStatusInfoRequested = true; + + SetLatch(MyLatch); + } +} + +void +ProcessCmdStatusInfoRequest(void) +{ + shm_mq *buffer = NULL; + shm_mq_handle *buffer_handle = NULL; + + if (!(CmdStatusInfoEnabled && CmdStatusInfoRequested)) + return; + + CmdStatusInfoEnabled = false; + CmdStatusInfoRequested = false; + + PG_TRY(); + { + attach_shmem(); + + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + if (cmd_status_info->target_pid == 0) + { + LWLockRelease(cmd_status_info->lock); + + elog(LOG, "target backend PID is not set"); + return; + } + else if (cmd_status_info->target_pid != MyProcPid) + { + pid_t target_pid = cmd_status_info->target_pid; + + LWLockRelease(cmd_status_info->lock); + + elog(LOG, "target backend PID doesn't match: expected %d, but seen %d", + MyProcPid, target_pid); + return; + } + + buffer = (shm_mq *) &cmd_status_info->buffer; + shm_mq_set_sender(buffer, MyProc); + + buffer_handle = shm_mq_attach(buffer, NULL, NULL); + + /* Show some optimism. */ + cmd_status_info->result_code = CMD_STATUS_RESULT_SUCCESS; + + /* + * It should be safe to release the lock now, since the IO + * synchronization is performed on the shared memory queue itself. + */ + LWLockRelease(cmd_status_info->lock); + + if (ActivePortal) + { + switch (cmd_status_info->request_type) + { + case CMD_STATUS_REQUEST_EXPLAIN: + explain_one_query_desc(buffer_handle, ActivePortal->queryDesc); + break; + + case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE: + { + CmdInfoStack *query; + + if (current_query_stack != NULL) + { + for (query = current_query_stack; + query != NULL; + query = query->parent) + { + explain_one_query_desc(buffer_handle, query->query_desc); + } + } + else + { + /* XXX */ + cmd_status_info->result_code = CMD_STATUS_RESULT_BACKEND_IDLE; + } + break; + } + + case CMD_STATUS_REQUEST_QUERY_TEXT: + /* XXX: how useful is this one? we already have pg_stat_activity */ + shm_mq_send(buffer_handle, strlen(ActivePortal->sourceText), + ActivePortal->sourceText, false); + break; + + case CMD_STATUS_REQUEST_PROGRESS_TAG: + if (ActivePortal->commandTag != NULL) + { + if (ActivePortal->queryDesc != NULL && + ActivePortal->queryDesc->estate != NULL) + { + char completionTag[COMPLETION_TAG_BUFSIZE]; + + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %u", + ActivePortal->commandTag, + ActivePortal->queryDesc->estate->es_processed); + + shm_mq_send(buffer_handle, strlen(completionTag), + completionTag, false); + } + else + { + /* no progress available, at least show the command tag */ + shm_mq_send(buffer_handle, strlen(ActivePortal->commandTag), + ActivePortal->commandTag, false); + } + } + else + { + cmd_status_info->result_code = CMD_STATUS_RESULT_NO_COMMAND_TAG; + } + break; + } + } + else + { + cmd_status_info->result_code = CMD_STATUS_RESULT_BACKEND_IDLE; + } + + shm_mq_detach(buffer); + + CmdStatusInfoEnabled = true; + } + PG_CATCH(); + { + if (buffer_handle) + shm_mq_detach(buffer); + + CmdStatusInfoEnabled = true; + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +static void +report_result_code(CmdStatusInfoResultCode result, pid_t target_pid) +{ + switch (result) + { + case CMD_STATUS_RESULT_BACKEND_IDLE: + elog(INFO, "no command is currently running in backend with PID %d", + target_pid); + break; + + case CMD_STATUS_RESULT_NO_COMMAND_TAG: + elog(WARNING, "no command tag found for the query in backend with PID %d", + target_pid); + break; + + default: + elog(ERROR, "general command status request failure"); + break; + } +} + +/* + * try to get status of command in another process + * + * FUNCTION get_cmdstatus(pid, request_type) + * RETURNS SETOF text + */ +Datum +pg_cmdstatus(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Tuplestorestate *tupstore; + TupleDesc tupdesc; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + pid_t target_pid = (pid_t) PG_GETARG_INT32(0); + int request_type = PG_GETARG_INT32(1); + PGPROC *proc; + + shm_mq *buffer = NULL; + shm_mq_handle *buffer_handle = NULL; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unknown command status request"))); + + /* need to build tuplestore in query context */ + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc); + tupstore = tuplestore_begin_heap(false, false, work_mem); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + bool done; + + if (target_pid == MyProcPid) + ereport(ERROR, + (errmsg("backend cannot query command status of itself"))); + + /* verify access to target_pid */ + proc = BackendPidGetProc(target_pid); + + if (proc == NULL) + ereport(ERROR, + (errmsg("PID %d is not a PostgreSQL server process", target_pid))); + + if (!(superuser() || proc->roleId == GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser or have the same role to cancel queries running in other server processes")))); + + attach_shmem(); + + /* wait before the shared status struct is free to be used */ + for (;;) + { + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + if (cmd_status_info->target_pid == 0) + { + cmd_status_info->target_pid = target_pid; + cmd_status_info->sender_pid = MyProcPid; + cmd_status_info->request_type = request_type; + cmd_status_info->result_code = CMD_STATUS_RESULT_FAILURE; + + buffer = shm_mq_create(&cmd_status_info->buffer, + BUFFER_SIZE - offsetof(CmdStatusInfo, buffer)); + + shm_mq_set_receiver(buffer, MyProc); + + buffer_handle = shm_mq_attach(buffer, NULL, NULL); + + LWLockRelease(cmd_status_info->lock); + break; + } + else + { + LWLockRelease(cmd_status_info->lock); + pg_usleep(1000L); + } + } + + if (SendProcSignal(target_pid, PROCSIG_CMDSTATUS_INFO, InvalidBackendId) < 0) + elog(ERROR, "could not signal backend with PID %d", target_pid); + + done = false; + while (!done) + { + char *data; + Size len; + shm_mq_result res; + + res = shm_mq_receive(buffer_handle, &len, (void **) &data, + /* nowait = */ false); + switch (res) + { + case SHM_MQ_SUCCESS: + { + Datum value; + HeapTuple tuple; + bool isnull = false; + + value = PointerGetDatum(cstring_to_text_with_len(data, len)); + + tuple = heap_form_tuple(tupdesc, &value, &isnull); + tuplestore_puttuple(tupstore, tuple); + break; + } + + case SHM_MQ_DETACHED: + if (cmd_status_info->result_code != CMD_STATUS_RESULT_SUCCESS) + { + report_result_code(cmd_status_info->result_code, target_pid); + } + done = true; + break; + + default: + Assert(!"unknown shm_mq_result code"); + } + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + shm_mq_detach(buffer); + + /* clean the status struct for the next user */ + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + cmd_status_info->target_pid = 0; + cmd_status_info->sender_pid = 0; + cmd_status_info->request_type = 0; + cmd_status_info->result_code = 0; + + LWLockRelease(cmd_status_info->lock); + } + PG_CATCH(); + { + if (buffer_handle) + shm_mq_detach(buffer); + + if (cmd_status_info) + { + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + cmd_status_info->target_pid = 0; + cmd_status_info->sender_pid = 0; + cmd_status_info->request_type = 0; + cmd_status_info->result_code = 0; + + LWLockRelease(cmd_status_info->lock); + } + + PG_RE_THROW(); + } + PG_END_TRY(); + + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + return (Datum) 0; +} diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 7b19714..19e7009 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -50,6 +50,7 @@ #include "storage/smgr.h" #include "tcop/tcopprot.h" #include "utils/acl.h" +#include "utils/cmdstatus.h" #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -1017,6 +1018,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, /* initialize client encoding */ InitializeClientEncoding(); + InstallCmdStatusInfoHooks(); + /* report this backend in the PgBackendStatus array */ if (!bootstrap) pgstat_bestart(); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index ddf7c67..d083aaf 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any"); DATA(insert OID = 2855 ( pg_is_other_temp_schema PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ )); DESCR("is schema another session's temp schema?"); +DATA(insert OID = 4099 ( pg_cmdstatus PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ )); +DESCR("returns information about another process"); DATA(insert OID = 2171 ( pg_cancel_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ )); DESCR("cancel a server process' current query"); DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ )); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index af1a0cd..ab1698c 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -41,6 +41,9 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + /* cmd status info */ + PROCSIG_CMDSTATUS_INFO, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index fc1679e..605612e 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS); /* catalog/objectaddress.c */ extern Datum pg_get_object_address(PG_FUNCTION_ARGS); +/* utils/adt/cmdstatus.c */ +extern Datum pg_cmdstatus(PG_FUNCTION_ARGS); + /* commands/constraint.c */ extern Datum unique_key_recheck(PG_FUNCTION_ARGS); diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h new file mode 100644 index 0000000..52ddf6a --- /dev/null +++ b/src/include/utils/cmdstatus.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * cmdstatus.h + * Declarations for command status interrupt handling. + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * + * src/include/utils/cmdstatus.h + * + *------------------------------------------------------------------------- + */ +#ifndef CMDSTATUS_H +#define CMDSTATUS_H + +extern bool CmdStatusInfoRequested; + +extern void InstallCmdStatusInfoHooks(void); + +extern void HandleCmdStatusInfoInterrupt(void); +extern void ProcessCmdStatusInfoRequest(void); + +#endif /* CMDSTATUS_H */