Re: Proposal: Generic WAL logical messages - Mailing list pgsql-hackers
From | Petr Jelinek |
---|---|
Subject | Re: Proposal: Generic WAL logical messages |
Date | |
Msg-id | 56F1428A.7080506@2ndquadrant.com |
In response to | Re: Proposal: Generic WAL logical messages (Andres Freund <andres@anarazel.de>) |
Responses | Re: Proposal: Generic WAL logical messages |
List | pgsql-hackers |
On 22/03/16 12:47, Andres Freund wrote:
> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
>
>> +
>> + <sect3 id="logicaldecoding-output-plugin-message">
>> + <title>Generic Message Callback</title>
>> +
>> + <para>
>> + The optional <function>message_cb</function> callback is called whenever
>> + a logical decoding message has been decoded.
>> +<programlisting>
>> +typedef void (*LogicalDecodeMessageCB) (
>> +    struct LogicalDecodingContext *,
>> +    ReorderBufferTXN *txn,
>> +    XLogRecPtr message_lsn,
>> +    const char *prefix,
>> +    Size message_size,
>> +    const char *message
>> +);
>
> I see you removed the transactional parameter. I'm doubtful that that's
> a good idea: It seems like it'd be rather helpful to pass the
> transaction for a nontransactional message that's emitted while an xid was
> assigned?
>

Hmm, but won't that give the output plugin even transactions that were
later aborted? That seems quite different behavior from how the txn
parameter works everywhere else.

>
>> +/*
>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
>> + */
>> +static void
>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
>> +    SnapBuild  *builder = ctx->snapshot_builder;
>> +    XLogReaderState *r = buf->record;
>> +    uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
>> +    xl_logical_message *message;
>> +
>> +    if (info != XLOG_LOGICAL_MESSAGE)
>> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
>> +
>> +    message = (xl_logical_message *) XLogRecGetData(r);
>> +
>> +    if (message->transactional)
>> +    {
>> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
>> +            return;
>> +
>> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
>> +                                  buf->endptr,
>> +                                  message->message, /* first part of message is prefix */
>> +                                  message->message_size,
>> +                                  message->message + message->prefix_size);
>> +    }
>> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
>> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
>> +    {
>> +        volatile Snapshot snapshot_now;
>> +        ReorderBuffer *rb = ctx->reorder;
>> +
>> +        /* setup snapshot to allow catalog access */
>> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
>> +        SetupHistoricSnapshot(snapshot_now, NULL);
>> +        rb->message(rb, NULL, buf->origptr, message->message,
>> +                    message->message_size,
>> +                    message->message + message->prefix_size);
>> +        TeardownHistoricSnapshot(false);
>> +    }
>> +}
>
> A number of things:
> 1) The SnapBuildProcessChange needs to be toplevel, not just for
>    transactional messages - we can't yet necessarily build a snapshot.

Nope, the snapshot state is checked in the else-if branch.

> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer.

Well, it's not doing anything with the reorderbuffer, but sure, it can be
done (I didn't do it in the attached patch, though).

> 3) This lacks error handling, we surely don't want to error out while
>    still having the historic snapshot setup
> 4) Without 3) the volatile is bogus.
> 5) Misses a ReorderBufferProcessXid() call.
Fixed (all 3 above).

>
>> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>>                  change->data.tp.oldtuple = NULL;
>>              }
>>              break;
>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            if (change->data.msg.prefix != NULL)
>> +                pfree(change->data.msg.prefix);
>> +            change->data.msg.prefix = NULL;
>> +            if (change->data.msg.message != NULL)
>> +                pfree(change->data.msg.message);
>> +            change->data.msg.message = NULL;
>> +            break;
>
> Hm, this will have some overhead, but I guess the messages won't be
> super frequent, and usually not very large.

Yeah, but since we don't know the size of future messages, it's hard to
have a preallocated buffer for this, so I don't know how else to do it.

>
>> +/*
>> + * Queue message into a transaction so it can be processed upon commit.
>> + */
>> +void
>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>> +                          const char *prefix, Size msg_sz, const char *msg)
>> +{
>> +    ReorderBufferChange *change;
>> +
>> +    Assert(xid != InvalidTransactionId);
>> +
>> +    change = ReorderBufferGetChange(rb);
>> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>> +    change->data.msg.prefix = pstrdup(prefix);
>> +    change->data.msg.message_size = msg_sz;
>> +    change->data.msg.message = palloc(msg_sz);
>> +    memcpy(change->data.msg.message, msg, msg_sz);
>> +
>> +    ReorderBufferQueueChange(rb, xid, lsn, change);
>> +}
>
> I'm not sure right now if there's any guarantee that the current memory
> context is meaningful here? IIRC other long-lived allocations explicitly
> use a context?
>

I didn't find any explicit guarantee, so I added one.
>
>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            {
>> +                char       *data;
>> +                size_t      prefix_size = strlen(change->data.msg.prefix) + 1;
>> +
>> +                sz += prefix_size + change->data.msg.message_size;
>> +                ReorderBufferSerializeReserve(rb, sz);
>> +
>> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
>> +                memcpy(data, change->data.msg.prefix,
>> +                       prefix_size);
>> +                memcpy(data + prefix_size, change->data.msg.message,
>> +                       change->data.msg.message_size);
>> +                break;
>> +            }
>
> Can you please include the sizes of the blocks explicitly, rather than
> relying on 0 termination?
>

Okay, I see I did that in the WAL record; no idea why I didn't do the same
here.

>
>> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>>  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>>  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>>  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
>> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
>
> Did you consider doing this via the standby rmgr instead?
>

Yes, I did that in one of the first versions, but Simon didn't like it in
his review, as this has nothing to do with standby.

--
 Petr Jelinek                  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
Attachment