From 6713624657fa349e6672540e814e4b7089390e14 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 17 Dec 2020 17:30:59 +0530 Subject: [PATCH v32 9/9] Support 2PC txn tests for concurrent aborts. Add tap tests to test_decoding for testing concurrent aborts during 2PC. --- contrib/test_decoding/Makefile | 2 + contrib/test_decoding/test_decoding.c | 58 +++++++++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 5 +++ 3 files changed, 65 insertions(+) diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 380b716..e314bad 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -9,6 +9,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream twophase_snapshot +TAP_TESTS = 1 + REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 94ba227..83ca592 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -11,11 +11,13 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" #include "catalog/pg_type.h" #include "replication/logical.h" #include "replication/origin.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -35,6 +37,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool only_local; + TransactionId check_xid_aborted; /* track abort of this txid */ } TestDecodingData; /* @@ -173,6 +176,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->check_xid_aborted = InvalidTransactionId; ctx->output_plugin_private = data; @@ -274,6 +278,24 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "check-xid-aborted") == 0) + { + if (elem->arg == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted needs an input value"))); + else + { + errno = 0; + data->check_xid_aborted = (TransactionId)strtoul(strVal(elem->arg), NULL, 0); + + if (errno || !TransactionIdIsValid(data->check_xid_aborted)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted is not a valid xid: \"%s\"", + strVal(elem->arg)))); + } + } else { ereport(ERROR, @@ -468,6 +490,30 @@ pg_decode_filter(LogicalDecodingContext *ctx, return false; } +static void +test_concurrent_aborts(TestDecodingData *data) +{ + /* + * If check_xid_aborted is a valid xid, then it was passed in as an option + * to check if the transaction having this xid would be aborted. This is + * to test concurrent aborts. + */ + if (TransactionIdIsValid(data->check_xid_aborted)) + { + elog(LOG, "waiting for %u to abort", data->check_xid_aborted); + while (TransactionIdIsInProgress(data->check_xid_aborted)) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); + } + if (!TransactionIdIsInProgress(data->check_xid_aborted) && + !TransactionIdDidCommit(data->check_xid_aborted)) + elog(LOG, "%u aborted", data->check_xid_aborted); + + Assert(TransactionIdDidAbort(data->check_xid_aborted)); + } +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. @@ -617,6 +663,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -703,6 +752,9 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -915,6 +967,9 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, } txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; + /* Test for concurrent aborts */ + test_concurrent_aborts(data); + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); @@ -968,6 +1023,9 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 36d5fb9..4bc9e1e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2488,6 +2488,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, curtxn->concurrent_abort = true; /* Reset the TXN so that it is allowed to stream remaining data. */ + if (rbtxn_prepared(txn)) + elog(LOG, "stop decoding of prepared txn %s (%u)", + txn->gid != NULL ? txn->gid : "", txn->xid); + else + elog(LOG, "stop decoding of txn %u", txn->xid); ReorderBufferResetTXN(rb, txn, snapshot_now, command_id, prev_lsn, specinsert); -- 1.8.3.1