From 7c00681b05302dc117e227192f68f4bf2a26de84 Mon Sep 17 00:00:00 2001 From: Emre Hasegeli Date: Mon, 13 Mar 2023 16:54:18 +0100 Subject: [PATCH v02 1/2] pgoutput: Improve error messages for options Author: Emre Hasegeli Reviewed-by: Peter Smith Reviewed-by: Amit Kapila Discussion: https://www.postgresql.org/message-id/flat/CAE2gYzwdwtUbs-tPSV-QBwgTubiyGD2ZGsSnAVsDfAGGLDrGOA%40mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 65 ++++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f9ed1083df..0370db972a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -267,21 +267,21 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_abort_cb = pgoutput_stream_abort; cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void -parse_output_parameters(List *options, PGOutputData *data) +parse_output_options(List *options, PGOutputData *data) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; @@ -289,30 +289,32 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; foreach(lc, options) { DefElem *defel = (DefElem *) lfirst(lc); Assert(defel->arg == NULL || IsA(defel->arg, String)); - /* Check each param, whether or not we recognize it */ + /* Check each option, whether or not we recognize it */ if (strcmp(defel->defname, "proto_version") == 0) { unsigned long parsed; char *endptr; if (protocol_version_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "proto_version")); protocol_version_given = true; errno = 0; parsed = strtoul(strVal(defel->arg), &endptr, 10); if (errno != 0 || *endptr != '\0') ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid proto_version"))); if (parsed > PG_UINT32_MAX) @@ -320,93 +322,117 @@ parse_output_parameters(List *options, PGOutputData *data) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("proto_version \"%s\" out of range", strVal(defel->arg)))); data->protocol_version = (uint32) parsed; } else if (strcmp(defel->defname, "publication_names") == 0) { if (publication_names_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "publication_names")); publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', &data->publication_names)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } else if (strcmp(defel->defname, "binary") == 0) { if (binary_option_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "binary")); binary_option_given = true; data->binary = defGetBoolean(defel); } else if (strcmp(defel->defname, "messages") == 0) { if (messages_option_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "messages")); messages_option_given = true; data->messages = defGetBoolean(defel); } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "streaming")); streaming_given = true; data->streaming = defGetStreamingMode(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { if (two_phase_option_given) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "two_phase")); two_phase_option_given = true; data->two_phase = defGetBoolean(defel); } else if (strcmp(defel->defname, "origin") == 0) { char *origin; if (origin_option_given) ereport(ERROR, errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options")); + /* translator: %s is a pgoutput option */ + errmsg("conflicting or redundant option: %s", + "origin")); origin_option_given = true; origin = defGetString(defel); if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0) data->publish_no_origin = true; else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0) data->publish_no_origin = false; else ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } + + /* Check required options */ + if (!protocol_version_given) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("missing pgoutput option: %s", "proto_version")); + if (!publication_names_given) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + /* translator: %s is a pgoutput option */ + errmsg("missing pgoutput option: %s", "publication_names")); } /* * Initialize this plugin */ static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { PGOutputData *data = palloc0(sizeof(PGOutputData)); @@ -426,41 +452,36 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; /* * This is replication start and not slot initialization. * * Parse and validate options passed by the client. */ if (!is_init) { - /* Parse the params and ERROR if we see any we don't recognize */ - parse_output_parameters(ctx->output_plugin_options, data); + /* Parse the options and ERROR if we see any we don't recognize */ + parse_output_options(ctx->output_plugin_options, data); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("client sent proto_version=%d but server only supports protocol %d or lower", data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM))); if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("client sent proto_version=%d but server only supports protocol %d or higher", data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM))); - if (data->publication_names == NIL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("publication_names parameter missing"))); - /* * Decide whether to enable streaming. It is disabled by default, in * which case we just update the flag in decoding context. Otherwise * we only allow it with sufficient version of the protocol, and when * the output plugin supports it. */ if (data->streaming == LOGICALREP_STREAM_OFF) ctx->streaming = false; else if (data->streaming == LOGICALREP_STREAM_ON && data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) -- 2.39.3 (Apple Git-145)