diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 26d7b7c..2d298a3 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -25,6 +25,8 @@ #include "storage/spin.h" /* + * This structure represents the actual queue, stored in shared memory. + * * Some notes on synchronization: * * mq_receiver and mq_bytes_read can only be changed by the receiver; and @@ -77,6 +79,8 @@ struct shm_mq }; /* + * This structure is a backend-private handle for access to a queue. + * * mqh_queue is a pointer to the queue we've attached, and mqh_segment is * a pointer to the dynamic shared memory segment that contains it. * @@ -147,10 +151,10 @@ static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); /* Minimum queue size is enough for header and at least one chunk of data. */ -Size shm_mq_minimum_size = +const Size shm_mq_minimum_size = MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; -#define MQH_BUFSIZE 8192 +#define MQH_INITIAL_BUFSIZE 8192 /* * Initialize a new shared message queue. @@ -168,6 +172,7 @@ shm_mq_create(void *address, Size size) Assert(size > data_offset); /* Initialize queue header. */ + SpinLockInit(&mq->mq_mutex); mq->mq_receiver = NULL; mq->mq_sender = NULL; mq->mq_bytes_read = 0; @@ -255,6 +260,8 @@ shm_mq_get_sender(shm_mq *mq) * * The memory context in effect at the time this function is called should * be one which will last for at least as long as the message queue itself. + * We'll allocate the handle in that context, and future allocations that + * are needed to buffer incoming data will happen in that context as well. * * If seg != NULL, the queue will be automatically detached when that dynamic * shared memory segment is detached. @@ -302,7 +309,9 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) * When nowait = true, we do not manipulate the state of the process latch; * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In * this case, the caller should call this function again, with the same - * arguments, each time the process latch is set. + * arguments, each time the process latch is set. (Once begun, the sending + * of a message cannot be aborted except by detaching from the queue; changing + * the length or payload will corrupt the queue.) */ shm_mq_result shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) @@ -468,7 +477,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) */ if (mqh->mqh_buflen < nbytes) { - uint64 newbuflen = Max(mqh->mqh_buflen, MQH_BUFSIZE); + uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); while (newbuflen < nbytes) newbuflen *= 2; @@ -856,7 +865,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached) *detached = mq->mq_detached; SpinLockRelease(&mq->mq_mutex); - return mq->mq_bytes_read; + return v; } /* @@ -891,7 +900,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached) *detached = mq->mq_detached; SpinLockRelease(&mq->mq_mutex); - return mq->mq_bytes_written; + return v; } /* diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 749ab3b..1ce88a1 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -65,6 +65,6 @@ extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); /* Smallest possible queue. */ -extern Size shm_mq_minimum_size; +extern const Size shm_mq_minimum_size; #endif /* SHM_MQ_H */