diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile index 21721b4..823d5c3 100644 --- a/src/backend/access/Makefile +++ b/src/backend/access/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/access top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist transam +SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc shmmq spgist transam include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c index baed981..1afac59 100644 --- a/src/backend/access/common/printtup.c +++ b/src/backend/access/common/printtup.c @@ -243,7 +243,19 @@ SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist, int16 *formats) pq_sendint(&buf, 0, 2); } } - pq_endmessage(&buf); + + /* + * Send the message via shared-memory tuple queue, if the same + * is enabled. + */ + if (is_tuple_shm_mq_enabled()) + { + mq_putmessage_direct(buf.cursor, buf.data, buf.len); + pfree(buf.data); + buf.data = NULL; + } + else + pq_endmessage(&buf); } /* @@ -371,7 +383,18 @@ printtup(TupleTableSlot *slot, DestReceiver *self) } } - pq_endmessage(&buf); + /* + * Send the message via shared-memory tuple queue, if the same + * is enabled. + */ + if (is_tuple_shm_mq_enabled()) + { + mq_putmessage_direct(buf.cursor, buf.data, buf.len); + pfree(buf.data); + buf.data = NULL; + } + else + pq_endmessage(&buf); /* Return to caller's context, and flush row's temporary memory */ MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/access/shmmq/Makefile b/src/backend/access/shmmq/Makefile new file mode 100644 index 0000000..aeae8d9 --- /dev/null +++ b/src/backend/access/shmmq/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/shmmq +# +# IDENTIFICATION +# src/backend/access/shmmq/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/shmmq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = shmmqam.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/shmmq/shmmqam.c b/src/backend/access/shmmq/shmmqam.c new file mode 100644 index 0000000..758d7e8 --- /dev/null +++ b/src/backend/access/shmmq/shmmqam.c @@ -0,0 +1,375 @@ +/*------------------------------------------------------------------------- + * + * shmmqam.c + * shared memory queue access method code + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/shmmq/shmmqam.c + * + * + * INTERFACE ROUTINES + * shm_getnext - retrieve next tuple in queue + * + * NOTES + * This file contains the shmmq_ routines which implement + * the POSTGRES shared memory access method used for all POSTGRES + * relations. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/shmmqam.h" +#include "access/tupdesc.h" +#include "fmgr.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/lsyscache.h" + + +static bool +HandleParallelTupleMessage(worker_result resultState, TupleDesc tupdesc, + StringInfo msg, int queueId); +static HeapTuple +form_result_tuple(worker_result resultState, TupleDesc tupdesc, + StringInfo msg, int queueId); + +/* + * shm_beginscan - + * Initializes the shared memory scan descriptor to retrieve tuples + * from worker backends. + */ +ShmScanDesc +shm_beginscan(int num_queues) +{ + ShmScanDesc shmscan; + + shmscan = palloc(sizeof(ShmScanDescData)); + + shmscan->num_shm_queues = num_queues; + shmscan->ss_cqueue = -1; + shmscan->shmscan_inited = false; + + return shmscan; +} + +/* + * ExecInitWorkerResult - + * Initializes the result state to retrieve tuples from worker backends. + */ +worker_result +ExecInitWorkerResult(TupleDesc tupdesc, int nWorkers) +{ + worker_result workerResult; + int i; + int natts = tupdesc->natts; + + workerResult = palloc0(sizeof(worker_result_state)); + workerResult->receive_functions = palloc(sizeof(FmgrInfo) * natts); + workerResult->typioparams = palloc(sizeof(Oid) * natts); + workerResult->num_shm_queues = nWorkers; + workerResult->has_row_description = palloc0(sizeof(bool) * nWorkers); + workerResult->queue_detached = palloc0(sizeof(bool) * nWorkers); + + for (i = 0; i < natts; ++i) + { + Oid receive_function_id; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &receive_function_id, + &workerResult->typioparams[i]); + fmgr_info(receive_function_id, &workerResult->receive_functions[i]); + } + + return workerResult; +} + + +/* + * shm_getnext - + * Get the next tuple from shared memory queue. This function + * is reponsible for fetching tuples from all the queues associated + * with worker backends used in parallel sequential scan. + */ +HeapTuple +shm_getnext(HeapScanDesc scanDesc, ShmScanDesc shmScan, + worker_result resultState, shm_mq_handle **responseq, + TupleDesc tupdesc, ScanDirection direction, bool *fromheap) +{ + shm_mq_result res; + Size nbytes; + void *data; + StringInfoData msg; + int queueId = 0; + + /* + * calculate next starting queue used for fetching tuples + */ + if(!shmScan->shmscan_inited) + { + shmScan->shmscan_inited = true; + Assert(shmScan->num_shm_queues > 0); + queueId = 0; + } + else + queueId = shmScan->ss_cqueue; + + /* Read and processes messages from the shared memory queues. */ + for(;;) + { + if (!resultState->all_queues_detached) + { + if (queueId == shmScan->num_shm_queues) + queueId = 0; + + /* + * Don't fetch from detached queue. This loop could continue + * forever, if we reach a situation such that all queue's are + * detached, however we won't reach here if that is the case. + */ + while (resultState->queue_detached[queueId]) + { + ++queueId; + if (queueId == shmScan->num_shm_queues) + queueId = 0; + } + + for (;;) + { + /* + * mark current queue used for fetching tuples, this is used + * to fetch consecutive tuples from queue used in previous + * fetch. + */ + shmScan->ss_cqueue = queueId; + + /* Get next message. */ + res = shm_mq_receive(responseq[queueId], &nbytes, &data, true); + if (res == SHM_MQ_DETACHED) + { + /* + * mark the queue that got detached, so that we don't + * try to fetch from it again. + */ + resultState->queue_detached[queueId] = true; + resultState->has_row_description[queueId] = false; + --resultState->num_shm_queues; + /* + * if we have exhausted data from all worker queues, then don't + * process data from queues. + */ + if (resultState->num_shm_queues <= 0) + resultState->all_queues_detached = true; + break; + } + else if (res == SHM_MQ_WOULD_BLOCK) + break; + else if (res == SHM_MQ_SUCCESS) + { + bool rettuple; + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + rettuple = HandleParallelTupleMessage(resultState, tupdesc, &msg, queueId); + pfree(msg.data); + if (rettuple) + { + *fromheap = false; + return resultState->tuple; + } + } + } + } + + /* + * if we have checked all the message queue's and didn't find + * any message or we have already fetched all the data from queue's, + * then it's time to fetch directly from heap. Reset the current + * queue as the first queue from which we need to receive tuples. + */ + if ((queueId == shmScan->num_shm_queues - 1 || + resultState->all_queues_detached) && + !resultState->all_heap_fetched) + { + HeapTuple tuple; + shmScan->ss_cqueue = 0; + tuple = heap_getnext(scanDesc, direction); + if (tuple) + { + *fromheap = true; + return tuple; + } + else if (tuple == NULL && resultState->all_queues_detached) + break; + else + resultState->all_heap_fetched = true; + } + else if (resultState->all_queues_detached && + resultState->all_heap_fetched) + break; + + /* check the data in next queue. */ + ++queueId; + } + + return NULL; +} + +/* + * HandleParallelTupleMessage - + * Handle a single tuple related protocol message received from + * a single parallel worker. + */ +static bool +HandleParallelTupleMessage(worker_result resultState, TupleDesc tupdesc, + StringInfo msg, int queueId) +{ + char msgtype; + bool rettuple = false; + + msgtype = pq_getmsgbyte(msg); + + /* Dispatch on message type. */ + switch (msgtype) + { + case 'T': + { + int16 natts = pq_getmsgint(msg, 2); + int16 i; + + if (resultState->has_row_description[queueId]) + elog(ERROR, "multiple RowDescription messages"); + resultState->has_row_description[queueId] = true; + if (natts != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("worker result rowtype does not match " + "the specified FROM clause rowtype"))); + + for (i = 0; i < natts; ++i) + { + Oid type_id; + + (void) pq_getmsgstring(msg); /* name */ + (void) pq_getmsgint(msg, 4); /* table OID */ + (void) pq_getmsgint(msg, 2); /* table attnum */ + type_id = pq_getmsgint(msg, 4); /* type OID */ + (void) pq_getmsgint(msg, 2); /* type length */ + (void) pq_getmsgint(msg, 4); /* typmod */ + (void) pq_getmsgint(msg, 2); /* format code */ + + if (type_id != tupdesc->attrs[i]->atttypid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + + pq_getmsgend(msg); + + break; + } + case 'D': + { + /* Handle DataRow message. */ + resultState->tuple = form_result_tuple(resultState, tupdesc, msg, queueId); + rettuple = true; + break; + } + case 'C': + { + /* + * Handle CommandComplete message. Ignore tags sent by + * worker backend as we are anyway going to use tag of + * master backend for sending the same to client. + */ + (void) pq_getmsgstring(msg); + break; + } + case 'G': + case 'H': + case 'W': + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY protocol not allowed in worker"))); + } + default: + elog(WARNING, "unknown message type: %c", msg->data[0]); + break; + } + + return rettuple; +} + +/* + * form_result_tuple - + * Parse a DataRow message and form a result tuple. + */ +static HeapTuple +form_result_tuple(worker_result resultState, TupleDesc tupdesc, + StringInfo msg, int queueId) +{ + /* Handle DataRow message. */ + int16 natts = pq_getmsgint(msg, 2); + int16 i; + Datum *values = NULL; + bool *isnull = NULL; + HeapTuple tuple; + StringInfoData buf; + + if (!resultState->has_row_description[queueId]) + elog(ERROR, "DataRow not preceded by RowDescription"); + if (natts != tupdesc->natts) + elog(ERROR, "malformed DataRow"); + if (natts > 0) + { + values = palloc(natts * sizeof(Datum)); + isnull = palloc(natts * sizeof(bool)); + } + initStringInfo(&buf); + + for (i = 0; i < natts; ++i) + { + int32 bytes = pq_getmsgint(msg, 4); + + if (bytes < 0) + { + values[i] = ReceiveFunctionCall(&resultState->receive_functions[i], + NULL, + resultState->typioparams[i], + tupdesc->attrs[i]->atttypmod); + isnull[i] = true; + } + else + { + resetStringInfo(&buf); + appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes); + values[i] = ReceiveFunctionCall(&resultState->receive_functions[i], + &buf, + resultState->typioparams[i], + tupdesc->attrs[i]->atttypmod); + isnull[i] = false; + } + } + + pq_getmsgend(msg); + + tuple = heap_form_tuple(tupdesc, values, isnull); + + /* + * Release locally palloc'd space. XXX would probably be good to pfree + * values of pass-by-reference datums, as well. + */ + pfree(values); + pfree(isnull); + + pfree(buf.data); + + return tuple; +} diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 8a0be5d..bb581a8 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -713,6 +713,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -909,6 +910,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SeqScan: pname = sname = "Seq Scan"; break; + case T_ParallelSeqScan: + pname = sname = "Parallel Seq Scan"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1058,6 +1062,7 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_BitmapHeapScan: case T_TidScan: case T_SubqueryScan: @@ -1324,6 +1329,16 @@ ExplainNode(PlanState *planstate, List *ancestors, show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); break; + case T_ParallelSeqScan: + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Number of Workers", + ((ParallelSeqScan *) plan)->num_workers, es); + ExplainPropertyInteger("Number of Blocks Per Worker", + ((ParallelSeqScan *) plan)->num_blocks_per_worker, es); + break; case T_FunctionScan: if (es->verbose) { @@ -2141,6 +2156,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index af707b0..9a8ca75 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -21,7 +21,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ - nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ + nodeSeqscan.o nodeParallelSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 9892499..f77a77f 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodeParallelSeqscan.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSeqscan.h" @@ -190,6 +191,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_ParallelSeqScan: + result = (PlanState *) ExecInitParallelSeqScan((ParallelSeqScan *) node, + estate, eflags); + break; + case T_IndexScan: result = (PlanState *) ExecInitIndexScan((IndexScan *) node, estate, eflags); @@ -406,6 +412,10 @@ ExecProcNode(PlanState *node) result = ExecSeqScan((SeqScanState *) node); break; + case T_ParallelSeqScanState: + result = ExecParallelSeqScan((ParallelSeqScanState *) node); + break; + case T_IndexScanState: result = ExecIndexScan((IndexScanState *) node); break; @@ -644,6 +654,10 @@ ExecEndNode(PlanState *node) ExecEndSeqScan((SeqScanState *) node); break; + case T_ParallelSeqScanState: + ExecEndParallelSeqScan((ParallelSeqScanState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c index 3f0d809..39c624d 100644 --- a/src/backend/executor/execScan.c +++ b/src/backend/executor/execScan.c @@ -191,8 +191,17 @@ ExecScan(ScanState *node, * check for non-nil qual here to avoid a function call to ExecQual() * when the qual is nil ... saves only a few cycles, but they add up * ... + * + * check for non-heap tuples (can get such tuples from shared memory + * message queue's in case of parallel query), for such tuples no need + * to perform qualification as for them the same is done by backend + * worker. This case will happen only for parallel query where we push + * down the qualification. + * XXX - We can do this optimization for projection as well, but for + * now it is okay, as we don't allow parallel query if there are + * expressions involved in target list. */ - if (!qual || ExecQual(qual, econtext, false)) + if (!slot->tts_fromheap || !qual || ExecQual(qual, econtext, false)) { /* * Found a satisfactory scan tuple. diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 753754d..4c5bd88 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -123,6 +123,7 @@ MakeTupleTableSlot(void) slot->tts_values = NULL; slot->tts_isnull = NULL; slot->tts_mintuple = NULL; + slot->tts_fromheap = true; return slot; } @@ -473,6 +474,8 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ slot->tts_isempty = true; slot->tts_nvalid = 0; + slot->tts_fromheap = true; + return slot; } diff --git a/src/backend/executor/nodeParallelSeqscan.c b/src/backend/executor/nodeParallelSeqscan.c new file mode 100644 index 0000000..1855e52 --- /dev/null +++ b/src/backend/executor/nodeParallelSeqscan.c @@ -0,0 +1,318 @@ +/*------------------------------------------------------------------------- + * + * nodeParallelSeqscan.c + * Support routines for parallel sequential scans of relations. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeParallelSeqscan.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * ExecParallelSeqScan sequentially scans a relation. + * ExecSeqNext retrieve next tuple in sequential order. + * ExecInitParallelSeqScan creates and initializes a parallel seqscan node. + * ExecEndParallelSeqScan releases any storage allocated. + */ +#include "postgres.h" + +#include "access/relscan.h" +#include "access/shmmqam.h" +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "executor/execdebug.h" +#include "executor/nodeSeqscan.h" +#include "executor/nodeParallelSeqscan.h" +#include "postmaster/backendworker.h" +#include "utils/rel.h" + + + +/* ---------------------------------------------------------------- + * Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ParallelSeqNext + * + * This is a workhorse for ExecParallelSeqScan + * ---------------------------------------------------------------- + */ +static TupleTableSlot * +ParallelSeqNext(ParallelSeqScanState *node) +{ + HeapTuple tuple; + HeapScanDesc scandesc; + EState *estate; + ScanDirection direction; + TupleTableSlot *slot; + bool fromheap = true; + + /* + * get information from the estate and scan state + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + slot = node->ss.ss_ScanTupleSlot; + + /* + * get the next tuple from the table based on result tuple descriptor. + */ + tuple = shm_getnext(scandesc, node->pss_currentShmScanDesc, + node->pss_workerResult, + node->responseq, + node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor, + direction, &fromheap); + + slot->tts_fromheap = fromheap; + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot and return the slot. Note: we pass '!fromheap' + * because tuples returned by shm_getnext() are either pointers that are + * created with palloc() or are pointers onto disk pages and so it should + * be pfree()'d accordingly. Note also that ExecStoreTuple will increment + * the refcount of the buffer; the refcount will not be dropped until the + * tuple table slot is cleared. + */ + if (tuple) + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + fromheap ? scandesc->rs_cbuf : InvalidBuffer, /* buffer associated with this + * tuple */ + !fromheap); /* pfree this pointer if not from heap */ + else + ExecClearTuple(slot); + + return slot; +} + +/* + * ParallelSeqRecheck -- access method routine to recheck a tuple in EvalPlanQual + */ +static bool +ParallelSeqRecheck(SeqScanState *node, TupleTableSlot *slot) +{ + /* + * Note that unlike IndexScan, ParallelSeqScan never use keys in + * shm_beginscan/heap_beginscan (and this is very bad) - so, here + * we do not check are keys ok or not. + */ + return true; +} + +/* ---------------------------------------------------------------- + * InitParallelScanRelation + * + * Set up to access the scan relation. + * ---------------------------------------------------------------- + */ +static void +InitParallelScanRelation(SeqScanState *node, EState *estate, int eflags) +{ + Relation currentRelation; + HeapScanDesc currentScanDesc; + + /* + * get the relation object id from the relid'th entry in the range table, + * open that relation and acquire appropriate lock on it. + */ + currentRelation = ExecOpenScanRelation(estate, + ((SeqScan *) node->ps.plan)->scanrelid, + eflags); + + /* initialize a heapscan */ + currentScanDesc = heap_beginscan(currentRelation, + estate->es_snapshot, + 0, + NULL); + + node->ss_currentRelation = currentRelation; + node->ss_currentScanDesc = currentScanDesc; + + /* and report the scan tuple slot's rowtype */ + ExecAssignScanType(node, RelationGetDescr(currentRelation)); +} + + +/* ---------------------------------------------------------------- + * ExecInitParallelSeqScan + * ---------------------------------------------------------------- + */ +ParallelSeqScanState * +ExecInitParallelSeqScan(ParallelSeqScan *node, EState *estate, int eflags) +{ + ParallelSeqScanState *parallelscanstate; + ShmScanDesc currentShmScanDesc; + worker_result workerResult; + BlockNumber end_block; + + /* + * Once upon a time it was possible to have an outerPlan of a SeqScan, but + * not any more. + */ + Assert(outerPlan(node) == NULL); + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + parallelscanstate = makeNode(ParallelSeqScanState); + parallelscanstate->ss.ps.plan = (Plan *) node; + parallelscanstate->ss.ps.state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, ¶llelscanstate->ss.ps); + + /* + * initialize child expressions + */ + parallelscanstate->ss.ps.targetlist = (List *) + ExecInitExpr((Expr *) node->scan.plan.targetlist, + (PlanState *) parallelscanstate); + parallelscanstate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->scan.plan.qual, + (PlanState *) parallelscanstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, ¶llelscanstate->ss.ps); + ExecInitScanTupleSlot(estate, ¶llelscanstate->ss); + + /* + * initialize scan relation + */ + InitParallelScanRelation(¶llelscanstate->ss, estate, eflags); + + parallelscanstate->ss.ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(¶llelscanstate->ss.ps); + ExecAssignScanProjectionInfo(¶llelscanstate->ss); + + /* + * If we are just doing EXPLAIN (ie, aren't going to run the plan), stop + * here, no need to start workers. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return parallelscanstate; + + /* Initialize the workers required to perform parallel scan. */ + InitiateWorkers(parallelscanstate->ss.ss_currentRelation->rd_id, + node->scan.plan.targetlist, + node->scan.plan.qual, + ¶llelscanstate->responseq, + ¶llelscanstate->pcxt, + node->num_blocks_per_worker, + node->num_workers); + + /* Initialize the blocks to be scanned by master backend. */ + end_block = (parallelscanstate->pcxt->nworkers + 1) * + node->num_blocks_per_worker; + ((SeqScan*) parallelscanstate->ss.ps.plan)->startblock = + end_block - node->num_blocks_per_worker; + /* + * As master backend is the last backend to scan the blocks, it + * should scan all the blocks. + */ + ((SeqScan*) parallelscanstate->ss.ps.plan)->endblock = InvalidBlockNumber; + + /* Set the scan limits for master backend. */ + heap_setscanlimits(parallelscanstate->ss.ss_currentScanDesc, + ((SeqScan*) parallelscanstate->ss.ps.plan)->startblock, + (parallelscanstate->ss.ss_currentScanDesc->rs_nblocks - + ((SeqScan*) parallelscanstate->ss.ps.plan)->startblock)); + + /* + * Use result tuple descriptor to fetch data from shared memory queues + * as the worker backends would have put the data after projection. + * Number of queue's must be equal to number of worker backends. + */ + currentShmScanDesc = shm_beginscan(parallelscanstate->pcxt->nworkers); + workerResult = ExecInitWorkerResult(parallelscanstate->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor, + parallelscanstate->pcxt->nworkers); + + parallelscanstate->pss_currentShmScanDesc = currentShmScanDesc; + parallelscanstate->pss_workerResult = workerResult; + + return parallelscanstate; +} + +/* ---------------------------------------------------------------- + * ExecParallelSeqScan(node) + * + * Scans the relation sequentially from multiple workers and returns + * the next qualifying tuple. + * We call the ExecScan() routine and pass it the appropriate + * access method functions. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecParallelSeqScan(ParallelSeqScanState *node) +{ + return ExecScan((ScanState *) &node->ss, + (ExecScanAccessMtd) ParallelSeqNext, + (ExecScanRecheckMtd) ParallelSeqRecheck); +} + +/* ---------------------------------------------------------------- + * ExecEndParallelSeqScan + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndParallelSeqScan(ParallelSeqScanState *node) +{ + Relation relation; + HeapScanDesc scanDesc; + + /* + * get information from node + */ + relation = node->ss.ss_currentRelation; + scanDesc = node->ss.ss_currentScanDesc; + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ss.ps); + + /* + * clean out the tuple table + */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + /* + * close heap scan + */ + heap_endscan(scanDesc); + + /* + * close the heap relation. + */ + ExecCloseScanRelation(relation); + + if (node->pcxt) + { + /* destroy parallel context. */ + DestroyParallelContext(node->pcxt); + + ExitParallelMode(); + } +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 3cb81fc..5780df0 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -139,6 +139,22 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags) 0, NULL); + /* + * set the scan limits, if requested by plan. If the end block + * is not specified, then scan all the blocks till end. + */ + if (((SeqScan *) node->ps.plan)->startblock != InvalidBlockNumber && + ((SeqScan *) node->ps.plan)->endblock != InvalidBlockNumber) + heap_setscanlimits(currentScanDesc, + ((SeqScan *) node->ps.plan)->startblock, + (((SeqScan *) node->ps.plan)->endblock - + ((SeqScan *) node->ps.plan)->startblock)); + else if (((SeqScan *) node->ps.plan)->startblock != InvalidBlockNumber) + heap_setscanlimits(currentScanDesc, + ((SeqScan *) node->ps.plan)->startblock, + (currentScanDesc->rs_nblocks - + ((SeqScan *) node->ps.plan)->startblock)); + node->ss_currentRelation = currentRelation; node->ss_currentScanDesc = currentScanDesc; diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index f12f2d5..cfab8b5 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -26,6 +26,8 @@ static bool pq_mq_busy = false; static pid_t pq_mq_parallel_master_pid = 0; static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId; +static shm_mq_handle *pq_mq_tuple_handle = NULL; + static void mq_comm_reset(void); static int mq_flush(void); static int mq_flush_if_writable(void); @@ -61,6 +63,26 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh) } /* + * Arrange to send some frontend/backend protocol messages to a shared-memory + * tuple message queue. + */ +void +pq_redirect_to_tuple_shm_mq(shm_mq_handle *mqh) +{ + pq_mq_tuple_handle = mqh; +} + +/* + * Check if tuples can be sent through tuple shared-memory + * message queue. + */ +bool +is_tuple_shm_mq_enabled(void) +{ + return pq_mq_tuple_handle ? true : false; +} + +/* * Arrange to SendProcSignal() to the parallel master each time we transmit * message data via the shm_mq. */ @@ -161,6 +183,42 @@ mq_putmessage(char msgtype, const char *s, size_t len) return 0; } +/* + * Transmit a libpq protocol message to the shared memory message queue + * via pq_mq_tuple_handle. We don't include a length word, because the + * receiver will know the length of the message from shm_mq_receive(). + */ +int +mq_putmessage_direct(char msgtype, const char *s, size_t len) +{ + shm_mq_iovec iov[2]; + shm_mq_result result; + + iov[0].data = &msgtype; + iov[0].len = 1; + iov[1].data = s; + iov[1].len = len; + + Assert(pq_mq_tuple_handle != NULL); + + for (;;) + { + result = shm_mq_sendv(pq_mq_tuple_handle, iov, 2, true); + + if (result != SHM_MQ_WOULD_BLOCK) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } + + Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); + if (result != SHM_MQ_SUCCESS) + return EOF; + return 0; +} + static void mq_putmessage_noblock(char msgtype, const char *s, size_t len) { diff --git a/src/backend/optimizer/path/Makefile b/src/backend/optimizer/path/Makefile index 6864a62..6e462b1 100644 --- a/src/backend/optimizer/path/Makefile +++ b/src/backend/optimizer/path/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = allpaths.o clausesel.o costsize.o equivclass.o indxpath.o \ - joinpath.o joinrels.o pathkeys.o tidpath.o + joinpath.o joinrels.o pathkeys.o parallelpath.o tidpath.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 58d78e6..528727c 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -410,6 +410,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* Consider sequential scan */ add_path(rel, create_seqscan_path(root, rel, required_outer)); + /* Consider parallel scans */ + create_parallelscan_paths(root, rel); + /* Consider index scans */ create_index_paths(root, rel); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 020558b..4abfd25 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -11,6 +11,9 @@ * cpu_tuple_cost Cost of typical CPU time to process a tuple * cpu_index_tuple_cost Cost of typical CPU time to process an index tuple * cpu_operator_cost Cost of CPU time to execute an operator or function + * cpu_tuple_comm_cost Cost of CPU time to pass a tuple from worker to master backend + * parallel_setup_cost Cost of setting up shared memory for parallelism + * parallel_startup_cost Cost of starting up parallel workers * * We expect that the kernel will typically do some amount of read-ahead * optimization; this in conjunction with seek costs means that seq_page_cost @@ -101,11 +104,16 @@ double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; +double cpu_tuple_comm_cost = DEFAULT_CPU_TUPLE_COMM_COST; +double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST; +double parallel_startup_cost = DEFAULT_PARALLEL_STARTUP_COST; int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int parallel_seqscan_degree = 0; + bool enable_seqscan = true; bool enable_indexscan = true; bool enable_indexonlyscan = true; @@ -219,6 +227,73 @@ cost_seqscan(Path *path, PlannerInfo *root, } /* + * cost_parallelseqscan + * Determines and returns the cost of scanning a relation parallely. + * + * 'baserel' is the relation to be scanned + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + */ +void +cost_parallelseqscan(ParallelSeqPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, int nWorkers) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + double spc_seq_page_cost; + QualCost qpqual_cost; + Cost cpu_per_tuple; + + /* Should only be applied to base relations */ + Assert(baserel->relid > 0); + Assert(baserel->rtekind == RTE_RELATION); + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = baserel->rows; + + if (!enable_seqscan) + startup_cost += disable_cost; + + /* fetch estimated page cost for tablespace containing table */ + get_tablespace_page_costs(baserel->reltablespace, + NULL, + &spc_seq_page_cost); + + /* + * disk costs + */ + run_cost += spc_seq_page_cost * baserel->pages; + + /* CPU costs */ + get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost); + + startup_cost += qpqual_cost.startup; + cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple; + run_cost += cpu_per_tuple * baserel->tuples; + + /* + * Runtime cost will be equally shared by all workers. + * Here assumption is that disk access cost will also be + * equally shared between workers which is generally true + * unless there are too many workers working on a relatively + * lesser number of blocks. If we come across any such case, + * then we can think of changing the current cost model for + * parallel sequiantial scan. + */ + run_cost = run_cost / (nWorkers + 1); + + /* Parallel setup and communication cost. */ + startup_cost += parallel_setup_cost; + startup_cost += parallel_startup_cost * nWorkers; + run_cost += cpu_tuple_comm_cost * baserel->tuples; + + path->path.startup_cost = startup_cost; + path->path.total_cost = (startup_cost + run_cost); +} + +/* * cost_index * Determines and returns the cost of scanning a relation using an index. * diff --git a/src/backend/optimizer/path/parallelpath.c b/src/backend/optimizer/path/parallelpath.c new file mode 100644 index 0000000..5245652 --- /dev/null +++ b/src/backend/optimizer/path/parallelpath.c @@ -0,0 +1,126 @@ +/*------------------------------------------------------------------------- + * + * parallelpath.c + * Routines to determine which conditions are usable for scanning + * a given relation, and create ParallelPaths accordingly. + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/path/parallelpath.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/restrictinfo.h" +#include "optimizer/clauses.h" + + +/* + * IsTargetListContainNonVars - + * Check if target list contain non-var entries. + */ +static bool +IsTargetListContainNonVars(List *targetlist) +{ + ListCell *l; + + foreach(l, targetlist) + { + TargetEntry *te = (TargetEntry *) lfirst(l); + + if (!IsA(te, TargetEntry)) + continue; /* probably should never happen */ + if (!IsA(te->expr, Var)) + return true; + } + return false; +} + +/* + * check_simple_qual - + * Check if qual is made only of simple things we can + * hand out directly to backend worker for execution. + * + * XXX - Currently we don't allow to push an expression + * if it contains volatile function, however eventually we + * need a mechanism (proisparallel) with which we can distinquish + * the functions that can be pushed for execution by parallel + * worker. + */ +static bool +check_simple_qual(Node *node) +{ + if (node == NULL) + return TRUE; + + if (contain_volatile_functions(node)) + return FALSE; + + return TRUE; +} + +/* + * create_parallelscan_paths + * Create paths corresponding to parallel scans of the given rel. + * Currently we only support parallel sequential scan. + * + * Candidate paths are added to the rel's pathlist (using add_path). + */ +void +create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel) +{ + int num_parallel_workers = 0; + + /* + * parallel scan is possible only if user has set + * parallel_seqscan_degree to value greater than 0. + */ + if (parallel_seqscan_degree <= 0) + return; + + /* + * parallel scan is not supported for joins. + */ + if (root->simple_rel_array_size > 2) + return; + + /* parallel scan is supportted only for Select statements. */ + if (root->parse->commandType != CMD_SELECT) + return; + + /* + * parallel scan is not supported for non-var target list. + * + * XXX - This is to keep the implementation simple, we can do this + * in future. Here we are checking by passing root->parse->targetList + * instead of rel->reltargetlist because rel->targetlist always contains + * Vars (refer build_base_rel_tlists). + */ + if (IsTargetListContainNonVars(root->parse->targetList)) + return; + + /* + * parallel scan is not supported for mutable functions + */ + if (!check_simple_qual((Node*) extract_actual_clauses(rel->baserestrictinfo, false))) + return; + + /* + * There should be atleast one page to scan for each worker. + */ + if (parallel_seqscan_degree <= rel->pages) + num_parallel_workers = parallel_seqscan_degree; + else + num_parallel_workers = rel->pages; + + add_path(rel, (Path *) create_parallelseqscan_path(root, rel, + num_parallel_workers)); +} diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 655be81..1c7f640 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -58,6 +58,9 @@ static Material *create_material_plan(PlannerInfo *root, MaterialPath *best_path static Plan *create_unique_plan(PlannerInfo *root, UniquePath *best_path); static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Scan *create_parallelseqscan_plan(PlannerInfo *root, + ParallelSeqPath *best_path, + List *tlist, List *scan_clauses); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, List *tlist, List *scan_clauses, bool indexonly); static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root, @@ -100,6 +103,9 @@ static List *order_qual_clauses(PlannerInfo *root, List *clauses); static void copy_path_costsize(Plan *dest, Path *src); static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); +static ParallelSeqScan *make_parallelseqscan(List *qptlist, List *qpqual, + Index scanrelid, int nworkers, + BlockNumber nblocksperworker); static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid, Oid indexid, List *indexqual, List *indexqualorig, List *indexorderby, List *indexorderbyorig, @@ -228,6 +234,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) switch (best_path->pathtype) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -343,6 +350,13 @@ create_scan_plan(PlannerInfo *root, Path *best_path) scan_clauses); break; + case T_ParallelSeqScan: + plan = (Plan *) create_parallelseqscan_plan(root, + (ParallelSeqPath *) best_path, + tlist, + scan_clauses); + break; + case T_IndexScan: plan = (Plan *) create_indexscan_plan(root, (IndexPath *) best_path, @@ -1133,6 +1147,71 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path, } /* + * create_worker_seqscan_plan + * Returns a seqscan plan for the base relation scanned by worker + * with restriction clauses 'scan_clauses' and targetlist 'tlist'. + */ +SeqScan * +create_worker_seqscan_plan(List *targetList, List *scan_clauses, + BlockNumber startBlock, BlockNumber endBlock) +{ + SeqScan *scan_plan; + + /* + * Pass scan_relid as 1, this is okay for now as sequence scan worker + * is allowed to operate on just one relation. + * XXX - we should ideally get scanrelid from master backend. + */ + scan_plan = make_seqscan(targetList, + scan_clauses, + 1); + + scan_plan->startblock = startBlock; + scan_plan->endblock = endBlock; + return scan_plan; +} + +/* + * create_parallelseqscan_plan + * Returns a seqscan plan for the base relation scanned by 'best_path' + * with restriction clauses 'scan_clauses' and targetlist 'tlist'. + */ +static Scan * +create_parallelseqscan_plan(PlannerInfo *root, ParallelSeqPath *best_path, + List *tlist, List *scan_clauses) +{ + Scan *scan_plan; + Index scan_relid = best_path->path.parent->relid; + + /* it should be a base rel... */ + Assert(scan_relid > 0); + Assert(best_path->path.parent->rtekind == RTE_RELATION); + + /* Sort clauses into best execution order */ + scan_clauses = order_qual_clauses(root, scan_clauses); + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + scan_clauses = extract_actual_clauses(scan_clauses, false); + + /* Replace any outer-relation variables with nestloop params */ + if (best_path->path.param_info) + { + scan_clauses = (List *) + replace_nestloop_params(root, (Node *) scan_clauses); + } + + scan_plan = (Scan *) make_parallelseqscan(tlist, + scan_clauses, + scan_relid, + best_path->num_workers, + best_path->num_blocks_per_worker); + + copy_path_costsize(&scan_plan->plan, &best_path->path); + + return scan_plan; +} + +/* * create_indexscan_plan * Returns an indexscan plan for the base relation scanned by 'best_path' * with restriction clauses 'scan_clauses' and targetlist 'tlist'. @@ -3314,6 +3393,30 @@ make_seqscan(List *qptlist, plan->lefttree = NULL; plan->righttree = NULL; node->scanrelid = scanrelid; + node->startblock = InvalidBlockNumber; + node->endblock = InvalidBlockNumber; + + return node; +} + +static ParallelSeqScan * +make_parallelseqscan(List *qptlist, + List *qpqual, + Index scanrelid, + int nworkers, + BlockNumber nblocksperworker) +{ + ParallelSeqScan *node = makeNode(ParallelSeqScan); + Plan *plan = &node->scan.plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = NULL; + plan->righttree = NULL; + node->scan.scanrelid = scanrelid; + node->num_workers = nworkers; + node->num_blocks_per_worker = nblocksperworker; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 9cbbcfb..d2b1621 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -260,6 +260,71 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) return result; } +/* + * create_worker_seqscan_plannedstmt + * Returns a planned statement to be used by worker for execution. + * Ideally, master backend should form worker's planned statement + * and pass the same to worker, however for now master backend + * just passes the required information and PlannedStmt is then + * constructed by worker. + */ +PlannedStmt * +create_worker_seqscan_plannedstmt(worker_stmt *workerstmt) +{ + AclMode required_access = ACL_SELECT; + RangeTblEntry *rte; + SeqScan *scan_plan; + PlannedStmt *result; + ListCell *tlist; + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = workerstmt->relId; + rte->relkind = 'r'; + rte->requiredPerms = required_access; + + /* Fill in opfuncid values if missing */ + fix_opfuncids((Node*) workerstmt->qual); + + /* + * Avoid removing junk entries in worker as those are + * required by upper nodes in master backend. + */ + foreach(tlist, workerstmt->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(tlist); + + tle->resjunk = false; + } + + scan_plan = create_worker_seqscan_plan(workerstmt->targetList, + workerstmt->qual, + workerstmt->startBlock, + workerstmt->endBlock); + + /* build the PlannedStmt result */ + result = makeNode(PlannedStmt); + + result->commandType = CMD_SELECT; + result->queryId = 0; + result->hasReturning = 0; + result->hasModifyingCTE = 0; + result->canSetTag = 1; + result->transientPlan = 0; + result->planTree = (Plan*) scan_plan; + result->rtable = list_make1(rte); + result->resultRelations = NIL; + result->utilityStmt = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->relationOids = lappend_oid(result->relationOids, rte->relid);; + result->invalItems = NIL; + result->nParamExec = 0; + result->hasRowSecurity = false; + + return result; +} /*-------------------- * subquery_planner diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 7703946..3a44aef 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -436,6 +436,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: { SeqScan *splan = (SeqScan *) plan; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 1395a21..538e612 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -706,6 +706,41 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) } /* + * create_parallelseqscan_path + * Creates a path corresponding to a parallel sequential scan, returning the + * pathnode. + */ +ParallelSeqPath * +create_parallelseqscan_path(PlannerInfo *root, RelOptInfo *rel, int nWorkers) +{ + ParallelSeqPath *pathnode = makeNode(ParallelSeqPath); + + pathnode->path.pathtype = T_ParallelSeqScan; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + false); + pathnode->path.pathkeys = NIL; /* seqscan has unordered result */ + + pathnode->num_workers = nWorkers; + /* + * Divide the work equally among all the workers, for cases + * where division is not equal (example if there are total + * 10 blocks and 3 workers, then as per below calculation each + * worker will scan 3 blocks), last worker will be responsible for + * scanning remaining blocks. We always consider master backend + * as last worker because it will first try to get the tuples + * scanned by other workers. For calculation of number of blocks + * per worker, an additional worker needs to be consider for + * master backend. + */ + pathnode->num_blocks_per_worker = rel->pages / (nWorkers + 1); + + cost_parallelseqscan(pathnode, root, rel, pathnode->path.param_info, nWorkers); + + return pathnode; +} + +/* * create_index_path * Creates a path node for an index scan. * diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..f056bd5 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,8 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o backendworker.o bgworker.o bgwriter.o checkpointer.o \ + fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o \ + walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/backendworker.c b/src/backend/postmaster/backendworker.c new file mode 100644 index 0000000..d52d1b6 --- /dev/null +++ b/src/backend/postmaster/backendworker.c @@ -0,0 +1,224 @@ +/*------------------------------------------------------------------------- + * + * backendworker.c + * Support routines for setting up backend workers. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/postmaster/backendworker.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * InitiateWorkers Setup dynamic shared memory and parallel backend workers. + */ +#include "postgres.h" + +#include "access/xact.h" +#include "access/parallel.h" +#include "commands/dbcommands.h" +#include "commands/async.h" +#include "executor/nodeParallelSeqscan.h" +#include "miscadmin.h" +#include "nodes/parsenodes.h" +#include "postmaster/backendworker.h" +#include "storage/ipc.h" +#include "storage/procsignal.h" +#include "storage/procarray.h" +#include "storage/shm_toc.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + + +#define PARALLEL_TUPLE_QUEUE_SIZE 65536 + + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define PG_WORKER_KEY_RELID 0 +#define PG_WORKER_KEY_TARGETLIST 1 +#define PG_WORKER_KEY_QUAL 2 +#define PG_WORKER_KEY_BLOCKS 3 +#define PARALLEL_KEY_TUPLE_QUEUE 4 + +static void exec_worker_message(dsm_segment *seg, shm_toc *toc); + +/* + * InitiateWorkers + * It sets up the required infrastructure for backend workers to + * perform execution and return results to the main backend. + */ +void +InitiateWorkers(Oid relId, List *targetList, List *qual, + shm_mq_handle ***responseqp, ParallelContext **pcxtp, + BlockNumber numBlocksPerWorker, int nWorkers) +{ + bool already_in_parallel_mode = IsInParallelMode(); + int i; + Size targetlist_len, qual_len; + BlockNumber *num_blocks_per_worker; + Oid *reliddata; + char *targetlistdata; + char *targetlist_str; + char *qualdata; + char *qual_str; + char *tuple_queue_space; + ParallelContext *pcxt; + shm_mq *mq; + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContext(exec_worker_message, nWorkers); + + /* Estimate space for parallel seq. scan specific contents. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(relId)); + + targetlist_str = nodeToString(targetList); + targetlist_len = strlen(targetlist_str) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, targetlist_len); + + qual_str = nodeToString(qual); + qual_len = strlen(qual_str) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, qual_len); + + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(BlockNumber)); + + shm_toc_estimate_chunk(&pcxt->estimator, + (Size) PARALLEL_TUPLE_QUEUE_SIZE * nWorkers); + + /* 5 keys for parallel seq. scan specific data. */ + shm_toc_estimate_keys(&pcxt->estimator, 5); + + InitializeParallelDSM(pcxt); + + /* Store scan relation id in dynamic shared memory. */ + reliddata = shm_toc_allocate(pcxt->toc, sizeof(Oid)); + *reliddata = relId; + shm_toc_insert(pcxt->toc, PG_WORKER_KEY_RELID, reliddata); + + /* Store target list in dynamic shared memory. */ + targetlistdata = shm_toc_allocate(pcxt->toc, targetlist_len); + memcpy(targetlistdata, targetlist_str, targetlist_len); + shm_toc_insert(pcxt->toc, PG_WORKER_KEY_TARGETLIST, targetlistdata); + + /* Store qual list in dynamic shared memory. */ + qualdata = shm_toc_allocate(pcxt->toc, qual_len); + memcpy(qualdata, qual_str, qual_len); + shm_toc_insert(pcxt->toc, PG_WORKER_KEY_QUAL, qualdata); + + /* Store blocks to be scanned by each worker in dynamic shared memory. */ + num_blocks_per_worker = shm_toc_allocate(pcxt->toc, sizeof(BlockNumber)); + *num_blocks_per_worker = numBlocksPerWorker; + shm_toc_insert(pcxt->toc, PG_WORKER_KEY_BLOCKS, num_blocks_per_worker); + + /* Allocate memory for shared memory queue handles. */ + *responseqp = (shm_mq_handle**) palloc(nWorkers * sizeof(shm_mq_handle*)); + + /* + * Establish one message queue per worker in dynamic shared memory. + * These queues should be used to transmit tuple data. + */ + tuple_queue_space = + shm_toc_allocate(pcxt->toc, PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + for (i = 0; i < pcxt->nworkers; ++i) + { + mq = shm_mq_create(tuple_queue_space + i * PARALLEL_TUPLE_QUEUE_SIZE, + (Size) PARALLEL_TUPLE_QUEUE_SIZE); + + shm_mq_set_receiver(mq, MyProc); + + /* + * Attach the queue before launching a worker, so that we'll automatically + * detach the queue if we error out. (Otherwise, the worker might sit + * there trying to write the queue long after we've gone away.) + */ + (*responseqp)[i] = shm_mq_attach(mq, pcxt->seg, NULL); + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tuple_queue_space); + + /* Register backend workers. */ + LaunchParallelWorkers(pcxt); + + for (i = 0; i < pcxt->nworkers; ++i) + shm_mq_set_handle((*responseqp)[i], pcxt->worker[i].bgwhandle); + + /* Return results to caller. */ + *pcxtp = pcxt; +} + + +/* + * exec_worker_message + * + * Execute the work assigned to a worker by master backend. + */ +void +exec_worker_message(dsm_segment *seg, shm_toc *toc) +{ + char *targetlistdata; + char *qualdata; + char *tuple_queue_space; + BlockNumber *num_blocks_per_worker; + BlockNumber start_block; + BlockNumber end_block; + shm_mq *mq; + shm_mq_handle *responseq; + Oid *relId; + List *targetList = NIL; + List *qual = NIL; + worker_stmt *workerstmt; + + relId = shm_toc_lookup(toc, PG_WORKER_KEY_RELID); + targetlistdata = shm_toc_lookup(toc, PG_WORKER_KEY_TARGETLIST); + qualdata = shm_toc_lookup(toc, PG_WORKER_KEY_QUAL); + num_blocks_per_worker = shm_toc_lookup(toc, PG_WORKER_KEY_BLOCKS); + + tuple_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE); + mq = (shm_mq *) (tuple_queue_space + + ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE); + + shm_mq_set_sender(mq, MyProc); + responseq = shm_mq_attach(mq, seg, NULL); + + end_block = (ParallelWorkerNumber + 1) * (*num_blocks_per_worker); + start_block = end_block - (*num_blocks_per_worker); + + /* Redirect protocol messages to responseq. */ + pq_redirect_to_tuple_shm_mq(responseq); + + /* Restore targetList and qual passed by main backend. */ + targetList = (List *) stringToNode(targetlistdata); + qual = (List *) stringToNode(qualdata); + + workerstmt = palloc(sizeof(worker_stmt)); + + workerstmt->relId = *relId; + workerstmt->targetList = targetList; + workerstmt->qual = qual; + workerstmt->startBlock = start_block; + + /* + * Last worker should scan all the remaining blocks. + * + * XXX - It is possible that expected number of workers + * won't get started, so to handle such cases master + * backend should scan remaining blocks. + */ + workerstmt->endBlock = end_block; + + /* Execute the worker command. */ + exec_worker_stmt(workerstmt); + + /* + * Once we are done with sending tuples, detach from + * shared memory message queue used to send tuples. + */ + shm_mq_detach(mq); +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 47ed84c..994eeba 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -103,6 +103,7 @@ #include "miscadmin.h" #include "pg_getopt.h" #include "pgstat.h" +#include "optimizer/cost.h" #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/fork_process.h" @@ -835,6 +836,12 @@ PostmasterMain(int argc, char *argv[]) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\", or \"logical\""))); + if (parallel_seqscan_degree >= MaxConnections) + { + write_stderr("%s: parallel_scan_degree must be less than max_connections\n", progname); + ExitPostmaster(1); + } + /* * Other one-time internal sanity checks can go here, if they are fast. * (Put any slow processing further down, after postmaster.pid creation.) diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index bcf3895..da6e099 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -148,10 +148,19 @@ EndCommand(const char *commandTag, CommandDest dest) case DestRemoteExecute: /* - * We assume the commandTag is plain ASCII and therefore requires - * no encoding conversion. + * Send the message via shared-memory tuple queue, if the same + * is enabled. */ - pq_putmessage('C', commandTag, strlen(commandTag) + 1); + if (is_tuple_shm_mq_enabled()) + mq_putmessage_direct('C', commandTag, strlen(commandTag) + 1); + else + { + /* + * We assume the commandTag is plain ASCII and therefore requires + * no encoding conversion. + */ + pq_putmessage('C', commandTag, strlen(commandTag) + 1); + } break; case DestNone: diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index bbad0dc..411f150 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "postmaster/backendworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -1132,6 +1133,100 @@ exec_simple_query(const char *query_string) } /* + * execute_worker_stmt + * + * Execute the plan for backend worker. + */ +void +exec_worker_stmt(worker_stmt *workerstmt) +{ + Portal portal; + int16 format = 1; + DestReceiver *receiver; + bool isTopLevel = true; + PlannedStmt *planned_stmt; + MemoryContext oldcontext; + MemoryContext plancontext; + + set_ps_display("SELECT", false); + BeginCommand("SELECT", DestNone); + + /* + * Unlike exec_simple_query(), in backend worker we won't allow + * transaction control statements, so we can allow plancontext + * to be created in TopTransaction context. + */ + plancontext = AllocSetContextCreate(CurrentMemoryContext, + "worker plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plancontext); + + planned_stmt = create_worker_seqscan_plannedstmt(workerstmt); + /* + * Create unnamed portal to run the query or queries in. If there + * already is one, silently drop it. + */ + portal = CreatePortal("", true, true); + /* Don't display the portal in pg_cursors */ + portal->visible = false; + + /* + * We don't have to copy anything into the portal, because everything + * we are passing here is in MessageContext, which will outlive the + * portal anyway. + */ + PortalDefineQuery(portal, + NULL, + "", + "", + list_make1(planned_stmt), + NULL); + + /* + * Start the portal. No parameters here. + */ + PortalStart(portal, NULL, 0, InvalidSnapshot); + + /* We always use binary format, for efficiency. */ + PortalSetResultFormat(portal, 1, &format); + + receiver = CreateDestReceiver(DestRemote); + SetRemoteDestReceiverParams(receiver, portal); + + /* + * Only once the portal and destreceiver have been established can + * we return to the transaction context. All that stuff needs to + * survive an internal commit inside PortalRun! + */ + MemoryContextSwitchTo(oldcontext); + + /* + * Run the portal to completion, and then drop it (and the receiver). + */ + (void) PortalRun(portal, + FETCH_ALL, + isTopLevel, + receiver, + receiver, + NULL); + + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + + /* + * Send appropriate CommandComplete to client. There is no + * need to send completion tag from worker as that won't be + * of any use considering the completiong tag of master backend + * will be used for sending to client. + */ + EndCommand("", DestRemote); +} + +/* * exec_parse_message * * Execute a "Parse" protocol message. diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index d9bfa25..b8f90b7 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -630,6 +630,8 @@ const char *const config_group_names[] = gettext_noop("Statistics / Query and Index Statistics Collector"), /* AUTOVACUUM */ gettext_noop("Autovacuum"), + /* PARALLEL_QUERY */ + gettext_noop("parallel_seqscan_degree"), /* CLIENT_CONN */ gettext_noop("Client Connection Defaults"), /* CLIENT_CONN_STATEMENT */ @@ -2445,6 +2447,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"parallel_seqscan_degree", PGC_SUSET, PARALLEL_QUERY, + gettext_noop("Sets the maximum number of simultaneously running backend worker processes."), + NULL + }, + ¶llel_seqscan_degree, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), NULL, @@ -2632,6 +2644,36 @@ static struct config_real ConfigureNamesReal[] = DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX, NULL, NULL, NULL }, + { + {"cpu_tuple_comm_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "passing each tuple (row) from worker to master backend."), + NULL + }, + &cpu_tuple_comm_cost, + DEFAULT_CPU_TUPLE_COMM_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "setting up environment (shared memory) for parallelism."), + NULL + }, + ¶llel_setup_cost, + DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_startup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "starting parallel workers."), + NULL + }, + ¶llel_startup_cost, + DEFAULT_PARALLEL_STARTUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, { {"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index b053659..784cfe0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -287,6 +287,9 @@ #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above +#cpu_tuple_comm_cost = 0.1 # same scale as above +#parallel_setup_cost = 0.0 # same scale as above +#parallel_startup_cost = 0.0 # same scale as above #effective_cache_size = 4GB # - Genetic Query Optimizer - @@ -497,6 +500,11 @@ # autovacuum, -1 means use # vacuum_cost_limit +#------------------------------------------------------------------------------ +# PARALLEL_QUERY PARAMETERS +#------------------------------------------------------------------------------ + +#parallel_seqscan_degree = 0 # max number of worker backend subprocesses #------------------------------------------------------------------------------ # CLIENT CONNECTION DEFAULTS diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 761ba1f..00ad468 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -45,6 +45,8 @@ typedef struct ParallelContext extern bool ParallelMessagePending; +extern int ParallelWorkerNumber; + extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); extern ParallelContext *CreateParallelContextForExtension(char *library_name, char *function_name, int nworkers); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 9bb6362..3c56b49 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -105,4 +105,13 @@ typedef struct SysScanDescData Snapshot snapshot; /* snapshot to unregister at end of scan */ } SysScanDescData; +/* struct for scanning shared memory queues */ +typedef struct ShmScanDescData +{ + /* scan current state */ + int num_shm_queues; /* number of shared memory queues used in scan. */ + int ss_cqueue; /* current queue # in scan, if any */ + bool shmscan_inited; /* false = scan not init'd yet */ +} ShmScanDescData; + #endif /* RELSCAN_H */ diff --git a/src/include/access/shmmqam.h b/src/include/access/shmmqam.h new file mode 100644 index 0000000..df56cfe --- /dev/null +++ b/src/include/access/shmmqam.h @@ -0,0 +1,44 @@ +/*------------------------------------------------------------------------- + * + * shmmqam.h + * POSTGRES shared memory queue access method definitions. + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/shmmqam.h + * + *------------------------------------------------------------------------- + */ +#ifndef SHMMQAM_H +#define SHMMQAM_H + +#include "access/relscan.h" +#include "libpq/pqmq.h" + + +/* Private state maintained across calls to shm_getnext. */ +typedef struct worker_result_state +{ + FmgrInfo *receive_functions; + Oid *typioparams; + HeapTuple tuple; + int num_shm_queues; + bool *has_row_description; + bool *queue_detached; + bool all_queues_detached; + bool all_heap_fetched; +} worker_result_state; + +typedef struct worker_result_state *worker_result; + +typedef struct ShmScanDescData *ShmScanDesc; + +extern worker_result ExecInitWorkerResult(TupleDesc tupdesc, int nWorkers); +extern ShmScanDesc shm_beginscan(int num_queues); +extern HeapTuple shm_getnext(HeapScanDesc scanDesc, ShmScanDesc shmScan, + worker_result resultState, shm_mq_handle **responseq, + TupleDesc tupdesc, ScanDirection direction, bool *fromheap); + +#endif /* SHMMQAM_H */ diff --git a/src/include/executor/nodeParallelSeqscan.h b/src/include/executor/nodeParallelSeqscan.h new file mode 100644 index 0000000..b638a24 --- /dev/null +++ b/src/include/executor/nodeParallelSeqscan.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * nodeparallelSeqscan.h + * + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeParallelSeqscan.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEPARALLELSEQSCAN_H +#define NODEPARALLELSEQSCAN_H + +#include "nodes/execnodes.h" + +extern ParallelSeqScanState *ExecInitParallelSeqScan(ParallelSeqScan *node, EState *estate, int eflags); +extern TupleTableSlot *ExecParallelSeqScan(ParallelSeqScanState *node); +extern void ExecEndParallelSeqScan(ParallelSeqScanState *node); + +extern Size EstimateScanRelationIdSpace(Oid relId); +extern void SerializeScanRelationId(Oid relId, Size maxsize, + char *start_address); +extern void RestoreScanRelationId(Oid *relId, char *start_address); + +extern Size EstimateTargetListSpace(List *targetList); +extern void SerializeTargetList(List *targetList, Size maxsize, + char *start_address); +extern void RestoreTargetList(List **targetList, char *start_address); + +#endif /* NODEPARALLELSEQSCAN_H */ diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index 48f84bf..e5dec1e 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -127,6 +127,8 @@ typedef struct TupleTableSlot MinimalTuple tts_mintuple; /* minimal tuple, or NULL if none */ HeapTupleData tts_minhdr; /* workspace for minimal-tuple-only case */ long tts_off; /* saved state for slot_deform_tuple */ + bool tts_fromheap; /* indicates whether the tuple is fetched from + heap or shrared memory message queue */ } TupleTableSlot; #define TTS_HAS_PHYSICAL_TUPLE(slot) \ diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index ad7589d..067edbe 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -19,6 +19,13 @@ extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *); extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); +extern int +mq_putmessage_direct(char msgtype, const char *s, size_t len); +extern void +pq_redirect_to_tuple_shm_mq(shm_mq_handle *mqh); +extern bool +is_tuple_shm_mq_enabled(void); + extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); #endif /* PQMQ_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 41288ed..86f4731 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -16,9 +16,12 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/parallel.h" +#include "access/shmmqam.h" #include "executor/instrument.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/shm_mq.h" #include "utils/reltrigger.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" @@ -1212,6 +1215,23 @@ typedef struct ScanState typedef ScanState SeqScanState; /* + * ParallelScanState extends ScanState by storing additional information + * related to parallel workers. + * dsm_segment dynamic shared memory segment to setup worker queues + * responseq shared memory queues to receive data from workers + */ +typedef struct ParallelScanState +{ + ScanState ss; /* its first field is NodeTag */ + ParallelContext *pcxt; + shm_mq_handle **responseq; + ShmScanDesc pss_currentShmScanDesc; + worker_result pss_workerResult; +} ParallelScanState; + +typedef ParallelScanState ParallelSeqScanState; + +/* * These structs store information about index quals that don't have simple * constant right-hand sides. See comments for ExecIndexBuildScanKeys() * for discussion. diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 97ef0fc..b6f1493 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -51,6 +51,7 @@ typedef enum NodeTag T_BitmapOr, T_Scan, T_SeqScan, + T_ParallelSeqScan, T_IndexScan, T_IndexOnlyScan, T_BitmapIndexScan, @@ -97,6 +98,7 @@ typedef enum NodeTag T_BitmapOrState, T_ScanState, T_SeqScanState, + T_ParallelSeqScanState, T_IndexScanState, T_IndexOnlyScanState, T_BitmapIndexScanState, @@ -217,6 +219,7 @@ typedef enum NodeTag T_IndexOptInfo, T_ParamPathInfo, T_Path, + T_ParallelSeqPath, T_IndexPath, T_BitmapHeapPath, T_BitmapAndPath, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b1dfa85..5777271 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -23,6 +23,7 @@ #include "nodes/bitmapset.h" #include "nodes/primnodes.h" #include "nodes/value.h" +#include "storage/block.h" #include "utils/lockwaitpolicy.h" /* Possible sources of a Query */ @@ -156,6 +157,15 @@ typedef struct Query * depends on to be semantically valid */ } Query; +/* worker statement required for execution. */ +typedef struct worker_stmt +{ + Oid relId; + List *targetList; + List *qual; + BlockNumber startBlock; + BlockNumber endBlock; +} worker_stmt; /**************************************************************************** * Supporting data structures for Parse Trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 316c9ce..3354398 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -18,6 +18,7 @@ #include "lib/stringinfo.h" #include "nodes/bitmapset.h" #include "nodes/primnodes.h" +#include "storage/block.h" #include "utils/lockwaitpolicy.h" @@ -269,6 +270,8 @@ typedef struct Scan { Plan plan; Index scanrelid; /* relid is index into the range table */ + BlockNumber startblock; /* block to start seq scan */ + BlockNumber endblock; /* block upto which scan has to be done */ } Scan; /* ---------------- @@ -278,6 +281,17 @@ typedef struct Scan typedef Scan SeqScan; /* ---------------- + * parallel sequential scan node + * ---------------- + */ +typedef struct ParallelSeqScan +{ + Scan scan; + int num_workers; + BlockNumber num_blocks_per_worker; +} ParallelSeqScan; + +/* ---------------- * index scan node * * indexqualorig is an implicitly-ANDed list of index qual expressions, each diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 6845a40..576add5 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -737,6 +737,13 @@ typedef struct Path /* pathkeys is a List of PathKey nodes; see above */ } Path; +typedef struct ParallelSeqPath +{ + Path path; + int num_workers; + BlockNumber num_blocks_per_worker; +} ParallelSeqPath; + /* Macro for extracting a path's parameterization relids; beware double eval */ #define PATH_REQ_OUTER(path) \ ((path)->param_info ? (path)->param_info->ppi_req_outer : (Relids) NULL) diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 9c2000b..0b6a469 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -26,6 +26,14 @@ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 +#define DEFAULT_CPU_TUPLE_COMM_COST 0.1 +/* + * XXX - We need some experiments to know what could be + * appropriate default values for parallel setup and startup + * cost. + */ +#define DEFAULT_PARALLEL_SETUP_COST 0.0 +#define DEFAULT_PARALLEL_STARTUP_COST 0.0 #define DEFAULT_EFFECTIVE_CACHE_SIZE 524288 /* measured in pages */ @@ -48,8 +56,12 @@ extern PGDLLIMPORT double random_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; +extern PGDLLIMPORT double cpu_tuple_comm_cost; +extern PGDLLIMPORT double parallel_setup_cost; +extern PGDLLIMPORT double parallel_startup_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; +extern int parallel_seqscan_degree; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; @@ -68,6 +80,8 @@ extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); +extern void cost_parallelseqscan(ParallelSeqPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, int nWorkers); extern void cost_index(IndexPath *path, PlannerInfo *root, double loop_count); extern void cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 9923f0e..32c3e0d 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -32,6 +32,8 @@ extern bool add_path_precheck(RelOptInfo *parent_rel, extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); +extern ParallelSeqPath *create_parallelseqscan_path(PlannerInfo *root, + RelOptInfo *rel, int nWorkers); extern IndexPath *create_index_path(PlannerInfo *root, IndexOptInfo *index, List *indexclauses, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 6cad92e..391d519 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -46,6 +46,13 @@ extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel); #endif /* + * parallelpath.c + * routines to generate parallel scan paths + */ + +extern void create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel); + +/* * indxpath.c * routines to generate index paths */ diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 082f7d7..ef5a320 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -41,6 +41,9 @@ extern Plan *optimize_minmax_aggregates(PlannerInfo *root, List *tlist, * prototypes for plan/createplan.c */ extern Plan *create_plan(PlannerInfo *root, Path *best_path); +extern SeqScan * +create_worker_seqscan_plan(List *targetList, List *scan_clauses, + BlockNumber startBlock, BlockNumber endBlock); extern SubqueryScan *make_subqueryscan(List *qptlist, List *qpqual, Index scanrelid, Plan *subplan); extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index cd62aec..91ddffe 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -14,6 +14,7 @@ #ifndef PLANNER_H #define PLANNER_H +#include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "nodes/relation.h" @@ -29,6 +30,8 @@ extern PlannedStmt *planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern PlannedStmt *standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern PlannedStmt * +create_worker_seqscan_plannedstmt(worker_stmt *workerstmt); extern Plan *subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, diff --git a/src/include/postmaster/backendworker.h b/src/include/postmaster/backendworker.h new file mode 100644 index 0000000..8813b6d --- /dev/null +++ b/src/include/postmaster/backendworker.h @@ -0,0 +1,30 @@ +/*-------------------------------------------------------------------- + * backendworker.h + * POSTGRES backend workers interface + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/postmaster/backendworker.h + *-------------------------------------------------------------------- + */ +#ifndef BACKENDWORKER_H +#define BACKENDWORKER_H + +/*--------------------------------------------------------------------- + * External module API. + *--------------------------------------------------------------------- + */ + +#include "libpq/pqmq.h" + +extern int parallel_seqscan_degree; +extern void InitiateWorkers(Oid relId, List *targetList, + List *qual, + shm_mq_handle ***responseqp, + ParallelContext **pcxtp, + BlockNumber numBlocksPerWorker, + int nWorkers); + +#endif /* BACKENDWORKER_H */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 0a350fd..02cf518 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -83,5 +83,6 @@ extern void set_debug_options(int debug_flag, extern bool set_plan_disabling_options(const char *arg, GucContext context, GucSource source); extern const char *get_stats_option_name(const char *arg); +extern void exec_worker_stmt(worker_stmt *workerstmt); #endif /* TCOPPROT_H */ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index cf319af..38855e5 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -85,6 +85,7 @@ enum config_group STATS_MONITORING, STATS_COLLECTOR, AUTOVACUUM, + PARALLEL_QUERY, CLIENT_CONN, CLIENT_CONN_STATEMENT, CLIENT_CONN_LOCALE,