From 8578720819ea56aed4993bc926402b67179868de Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:21:58 +0900 Subject: [PATCH v2 1/3] Add errcontext to errors of the applying logical replication changes. --- src/backend/commands/tablecmds.c | 7 + src/backend/replication/logical/proto.c | 49 +++++ src/backend/replication/logical/worker.c | 220 ++++++++++++++++++++--- src/include/replication/logicalproto.h | 1 + src/include/replication/logicalworker.h | 2 + 5 files changed, 257 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 46b108caa6..4662ec4787 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -78,6 +78,7 @@ #include "partitioning/partbounds.h" #include "partitioning/partdesc.h" #include "pgstat.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" @@ -1897,6 +1898,9 @@ ExecuteTruncateGuts(List *explicit_rels, continue; } + /* Set logical replication error callback info if necessary */ + set_logicalrep_error_context_rel(rel); + /* * Build the lists of foreign tables belonging to each foreign server * and pass each list to the foreign data wrapper's callback function, @@ -2004,6 +2008,9 @@ ExecuteTruncateGuts(List *explicit_rels, pgstat_count_truncate(rel); } + /* Reset logical replication error callback info */ + reset_logicalrep_error_context_rel(); + /* Now go through the hash table, and truncate foreign tables */ if (ft_htab) { diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 13c8c3bd5b..833a97aec9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1109,3 +1109,52 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * get string representing LogicalRepMsgType. + */ +char * +logicalrep_message_type(LogicalRepMsgType action) +{ + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_END: + return "STREAM END"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + case LOGICAL_REP_MSG_BEGIN_PREPARE: + return "BEGIN PREPARE"; + case LOGICAL_REP_MSG_PREPARE: + return "PREPARE"; + case LOGICAL_REP_MSG_COMMIT_PREPARED: + return "COMMIT PREPARED"; + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + return "ROLLBACK PREPARED"; + } + + elog(ERROR, "invalid logical replication message type \"%c\"", action); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b9a7a7ffbb..c23713468c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,6 +221,27 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +/* Struct for saving and restoring apply information */ +typedef struct ApplyErrCallbackArg +{ + LogicalRepMsgType command; /* 0 if invalid */ + + /* Local relation information */ + char *nspname; /* used for error context */ + char *relname; /* used for error context */ + + TransactionId remote_xid; + TimestampTz committs; +} ApplyErrCallbackArg; +static ApplyErrCallbackArg apply_error_callback_arg = +{ + .command = 0, + .relname = NULL, + .nspname = NULL, + .remote_xid = InvalidTransactionId, + .committs = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -333,6 +354,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +static void apply_error_callback(void *arg); +static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); +static void reset_apply_error_context_rel(void); +static void reset_apply_error_context_info(void); /* * Should this worker apply changes for given relation. @@ -826,6 +851,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; + apply_error_callback_arg.committs = begin_data.committime; remote_final_lsn = begin_data.final_lsn; @@ -859,6 +886,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -876,6 +904,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; remote_final_lsn = begin_data.prepare_lsn; @@ -894,6 +923,8 @@ apply_handle_prepare(StringInfo s) char gid[GIDSIZE]; logicalrep_read_prepare(s, &prepare_data); + apply_error_callback_arg.remote_xid = prepare_data.xid; + apply_error_callback_arg.committs = prepare_data.prepare_time; if (prepare_data.prepare_lsn != remote_final_lsn) ereport(ERROR, @@ -950,6 +981,7 @@ apply_handle_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -962,6 +994,8 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); + apply_error_callback_arg.remote_xid = prepare_data.xid; + apply_error_callback_arg.committs = prepare_data.commit_time; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -989,6 +1023,7 @@ apply_handle_commit_prepared(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1001,6 +1036,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + apply_error_callback_arg.remote_xid = rollback_data.xid; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1038,6 +1074,7 @@ apply_handle_rollback_prepared(StringInfo s) process_syncing_tables(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1094,6 +1131,8 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + apply_error_callback_arg.remote_xid = stream_xid; + /* * Initialize the xidhash table if we haven't yet. This will be used for * the entire duration of the apply worker so create it in permanent @@ -1150,6 +1189,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1173,7 +1213,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. */ if (xid == subxid) + { + apply_error_callback_arg.remote_xid = xid; stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -1198,6 +1241,7 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + apply_error_callback_arg.remote_xid = subxid; subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1222,6 +1266,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1253,6 +1298,8 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + reset_apply_error_context_info(); } /* @@ -1277,6 +1324,8 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.committs = commit_data.committime; elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1399,6 +1448,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1518,6 +1569,9 @@ apply_handle_insert(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1541,6 +1595,9 @@ apply_handle_insert(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1639,6 +1696,9 @@ apply_handle_update(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the update. */ check_relation_updatable(rel); @@ -1692,6 +1752,9 @@ apply_handle_update(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1795,6 +1858,9 @@ apply_handle_delete(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -1820,6 +1886,9 @@ apply_handle_delete(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -2224,6 +2293,9 @@ apply_handle_truncate(StringInfo s) * Even if we used CASCADE on the upstream primary we explicitly default * to replaying changes without further cascading. This might be later * changeable with a user specified option. + * + * Both namespace and relation name for error callback will be set in + * ExecuteTruncateGuts(). */ ExecuteTruncateGuts(rels, relids, @@ -2254,44 +2326,54 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + ErrorContextCallback errcallback; + + /* + * Push apply error context callback. Other fields will be filled + * during applying the change. + */ + apply_error_callback_arg.command = action; + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2300,45 +2382,48 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. */ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); - return; + break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); - return; + break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); - return; + break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* @@ -3571,3 +3656,94 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + + if (apply_error_callback_arg.command == 0) + return; + + initStringInfo(&buf); + appendStringInfo(&buf, _("during apply of \"%s\""), + logicalrep_message_type(apply_error_callback_arg.command)); + + if (apply_error_callback_arg.relname) + appendStringInfo(&buf, _(" for relation \"%s.%s\""), + apply_error_callback_arg.nspname, + apply_error_callback_arg.relname); + + if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid)) + appendStringInfo(&buf, _(" in transaction with xid %u committs %s"), + apply_error_callback_arg.remote_xid, + apply_error_callback_arg.committs == 0 + ? "(unset)" + : timestamptz_to_str(apply_error_callback_arg.committs)); + + errcontext("%s", buf.data); +} + +/* Set relation information for apply error callback */ +static void +set_apply_error_context_rel(LogicalRepRelMapEntry *rel) +{ + apply_error_callback_arg.nspname = rel->remoterel.nspname; + apply_error_callback_arg.relname = rel->remoterel.relname; +} + +/* Reset relation information for apply error callback */ +static void +reset_apply_error_context_rel(void) +{ + apply_error_callback_arg.nspname = NULL; + apply_error_callback_arg.relname = NULL; +} + +/* Reset all information for apply error callback */ +static void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.command = 0; + apply_error_callback_arg.remote_xid = InvalidTransactionId; + apply_error_callback_arg.committs = 0; + reset_apply_error_context_rel(); +} + +/* + * Set relation information for error callback by the given relation. + * Both set_logicalrep_error_context_rel() and + * reset_logicalrep_error_context_rel() functions are intended to be + * used by functions outside of logical replication module where don't + * use LogicalRepRelMapEntry. + * + * The caller must call reset_logicalrep_error_context_rel() after use + * so we free the memory used for names. + */ +void +set_logicalrep_error_context_rel(Relation rel) +{ + if (IsLogicalWorker()) + { + apply_error_callback_arg.nspname = + get_namespace_name(RelationGetNamespace(rel)); + apply_error_callback_arg.relname = + pstrdup(RelationGetRelationName(rel)); + } +} + +/* Reset relation information for error callback set */ +void +reset_logicalrep_error_context_rel(void) +{ + if (IsLogicalWorker()) + { + if (apply_error_callback_arg.nspname) + pfree(apply_error_callback_arg.nspname); + apply_error_callback_arg.nspname = NULL; + + if (apply_error_callback_arg.relname) + pfree(apply_error_callback_arg.relname); + apply_error_callback_arg.relname = NULL; + } +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 63de90d94a..c78a4409bc 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -242,5 +242,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern char *logicalrep_message_type(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 2ad61a001a..d3e8514ffd 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -15,5 +15,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void set_logicalrep_error_context_rel(Relation rel); +extern void reset_logicalrep_error_context_rel(void); #endif /* LOGICALWORKER_H */ -- 2.24.3 (Apple Git-128)