H.3. wal2json — convert WAL changes into JSON via logical decoding
The wal2json module is a Postgres Pro extension for logical decoding that converts database changes from the write-ahead log (WAL) into JSON format. The plugin has access to the tuples produced by INSERT and UPDATE operations. Depending on the configured replica identity, it can also access the old row versions of UPDATE and DELETE. Changes can be consumed either through the streaming protocol (logical replication slots) or through a specialized SQL API.
Format version 1 generates a JSON object per transaction. This object contains all new and old tuples, with optional inclusion of properties such as transaction timestamps, schema-qualified names, data types, and transaction identifiers.
Format version 2 generates a JSON object per tuple, with optional JSON objects marking transaction start and end. Different tuple properties can also be included.
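The two formats can be told apart by their top-level keys. A format-version 1 object wraps a transaction in a "change" array of entries with a "kind" field. A format-version 2 object carries an "action" field such as "I", "U", "D", "M", "B", or "C". The following minimal Python sketch relies on that difference to count row changes in either format. The helper names format_version and count_row_changes are hypothetical and are not part of wal2json.

```python
from typing import Any


def format_version(record: dict[str, Any]) -> int:
    """Guess which wal2json format produced one decoded JSON object."""
    if "change" in record:  # version 1: one object per transaction
        return 1
    if "action" in record:  # version 2: one object per tuple or marker
        return 2
    raise ValueError("not a recognizable wal2json record")


def count_row_changes(records: list[dict[str, Any]]) -> int:
    """Count INSERT, UPDATE, and DELETE entries in either format."""
    total = 0
    for record in records:
        if format_version(record) == 1:
            total += sum(1 for change in record["change"]
                         if change["kind"] in ("insert", "update", "delete"))
        elif record["action"] in ("I", "U", "D"):
            total += 1  # "B"/"C" markers and "M" messages are skipped
    return total


v1 = {"change": [{"kind": "insert"}, {"kind": "message"}, {"kind": "delete"}]}
v2 = [{"action": "B"}, {"action": "I"}, {"action": "M"}, {"action": "D"}, {"action": "C"}]
print(count_row_changes([v1]), count_row_changes(v2))  # 2 2
```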
H.3.1. Installation and Setup
The wal2json extension is provided with Postgres Pro Standard as a separate pre-built package wal2json-std-16 (for detailed installation instructions, see Chapter 16). Once you have Postgres Pro Standard installed, complete the following steps to enable wal2json:
1. Enable logical decoding by setting wal_level to logical in the postgresql.conf file.
2. Optionally, adjust the values of the max_replication_slots and max_wal_senders parameters.
3. Restart the database server for the changes to take effect.
H.3.2. Options
The wal2json module provides a variety of options for managing logical decoding:
include-xids
    Add transaction IDs to each set of changes in the JSON output. The default is off.
include-timestamp
    Add the timestamp to each set of changes in the JSON output. The default is off.
include-schemas
    Add the schema name to each change record in the JSON output. The default is on.
include-types
    Add data types to each change record in the JSON output. The default is on.
include-typmod
    Add type modifiers for columns that have them. The default is on.
include-type-oids
    Add type OIDs. The default is off.
include-domain-data-type
    Replace a domain name with the underlying data type. The default is off.
include-column-positions
    Add the column position (pg_attribute.attnum). The default is off.
include-origin
    Add the origin of each change. The default is off.
include-not-null
    Add information on whether a column is marked as NOT NULL to the columnoptionals field of the JSON output. The default is off.
include-default
    Add default expressions to the JSON output. The default is off.
include-pk
    Add primary key information, including column names and data types, to the pk field of the JSON output. The default is off.
numeric-data-types-as-string
    Convert values of numeric data types to strings in the JSON output. The JSON specification does not recognize Infinity and NaN as valid numeric values, and double precision numbers can cause interoperability problems. The default is off.
pretty-print
    Add spaces and indentation to the JSON output structure. The default is off.
write-in-chunks
    Write the JSON output in smaller chunks instead of writing the entire changeset at once. This option is used only when format-version is set to 1. The default is off.
include-lsn
    Add the nextlsn field to each changeset. The default is off.
include-transaction
    Add records denoting the start and end of each transaction to the JSON output. The default is on.
filter-origins
    Exclude changes from the specified origins. The default is empty, which means that no origin is filtered out. The value is a comma-separated list.
filter-tables
    Exclude rows from the specified tables. The default is empty, which means that no table is filtered out. The value is a comma-separated list of schema-qualified tables. *.foo means the foo table in all schemas, and bar.* means all tables in the bar schema. Special characters (space, single quote, comma, period, asterisk) must be escaped with a backslash. Schema and table names are case-sensitive. For example, the table "public"."Foo bar" should be specified as public.Foo\ bar.
add-tables
    Include rows only from the specified tables. The default is empty, which means that all tables from all schemas are included. The value follows the same rules as filter-tables.
filter-msg-prefixes
    Exclude messages whose prefix is specified in the option value. The default is empty, which means that no messages are filtered out. The value is a comma-separated list.
add-msg-prefixes
    Include only messages whose prefix is specified in the option value. The default is all prefixes. The value is a comma-separated list. wal2json applies filter-msg-prefixes before this option.
format-version
    Define which output format to use. The default is 1.
actions
    Define which operations are included in the JSON output. The default is all actions (INSERT, UPDATE, DELETE, and TRUNCATE). However, if you are using format-version = 1, TRUNCATE is not enabled (for backward compatibility).
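When numeric values must survive exactly, a consumer can convert them to Python's Decimal instead of float. The minimal sketch below assumes format-version 2 records, where each column carries a "type" name such as integer or numeric(5,2), as in the examples of this section. It accepts values that arrive either as JSON strings (numeric-data-types-as-string on) or as JSON numbers (option off). The helper names and the set of type names checked are illustrative assumptions, not part of wal2json.

```python
import json
from decimal import Decimal
from typing import Any

# Assumed type names as printed in the "type" field of format-version 2 output.
INTEGER_TYPES = {"smallint", "integer", "bigint", "oid"}
FLOAT_TYPES = {"real", "double precision"}


def convert_numeric(type_name: str, value: Any) -> Any:
    """Convert a numeric column value to an exact Python number.

    str() normalizes both quoted and unquoted values before conversion.
    """
    if value is None:
        return None
    if type_name in INTEGER_TYPES:
        return int(str(value))
    if type_name.startswith("numeric") or type_name in FLOAT_TYPES:
        # Decimal also accepts "NaN", "Infinity", and "-Infinity".
        return Decimal(str(value))
    return value


def decode_v2_columns(line: str) -> dict[str, Any]:
    """Map column names to converted values for one format-version 2 record."""
    # parse_float=Decimal keeps unquoted numbers exact when the option is off.
    record = json.loads(line, parse_float=Decimal)
    return {col["name"]: convert_numeric(col["type"], col["value"])
            for col in record.get("columns", [])}
```

Converting by type name rather than by JSON value type means the same consumer works whether or not numeric-data-types-as-string is enabled on the slot.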
H.3.3. Examples
There are two ways to obtain the changes (JSON objects) from wal2json: using pg_recvlogical or calling SQL functions.
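Both interfaces accept the same name/value option pairs. pg_logical_slot_get_changes takes them as trailing text arguments, and pg_recvlogical takes them as repeated -o name=value flags. The minimal Python sketch below renders one option set for both. It also builds filter-tables/add-tables entries using the escaping rules described under Options. The helper names are hypothetical. The SQL quoting assumes standard_conforming_strings is on (the default), so backslashes are passed through literally.

```python
# Characters that must be backslash-escaped inside filter-tables/add-tables names.
SPECIAL_CHARS = set(" ',.*")


def escape_name(name: str) -> str:
    """Backslash-escape special characters in one schema or table name."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in name)


def table_pattern(schema: str, table: str) -> str:
    """Build one filter-tables/add-tables entry; a bare "*" stays a wildcard."""
    schema_part = "*" if schema == "*" else escape_name(schema)
    table_part = "*" if table == "*" else escape_name(table)
    return f"{schema_part}.{table_part}"


def sql_option_args(options: dict[str, str]) -> str:
    """Render options as trailing arguments of pg_logical_slot_get_changes."""
    def quote(text: str) -> str:
        return "'" + text.replace("'", "''") + "'"  # standard SQL string literal
    return ", ".join(f"{quote(name)}, {quote(value)}" for name, value in options.items())


def recvlogical_args(options: dict[str, str]) -> list[str]:
    """Render options as repeated -o name=value arguments for pg_recvlogical."""
    args: list[str] = []
    for name, value in options.items():
        args += ["-o", f"{name}={value}"]
    return args


opts = {
    "format-version": "2",
    "filter-tables": ",".join([table_pattern("public", "Foo bar"), table_pattern("*", "foo")]),
}
print(sql_option_args(opts))
# 'format-version', '2', 'filter-tables', 'public.Foo\ bar,*.foo'
print(recvlogical_args(opts))
```

The list returned by recvlogical_args is meant to be passed as separate arguments, for example to subprocess.run. If you paste the flags into a shell instead, quote the values so that the shell does not consume the backslashes.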
H.3.3.1. pg_recvlogical
This is an example of how to obtain JSON objects from wal2json using pg_recvlogical. In addition to the configuration described above, pg_recvlogical requires a replication connection to be configured. Since Postgres Pro version 10, logical replication connections match normal pg_hba.conf entries that specify a database name or a keyword such as all.
To configure a replication connection and database parameters:
1. Add a replication connection rule to pg_hba.conf:
   local    mydatabase    myuser    trust
2. Optionally, set max_wal_senders in postgresql.conf:
   max_wal_senders = 1
3. Restart the database server if you changed max_wal_senders.
To obtain JSON objects from wal2json, open a terminal, create a logical replication slot that uses wal2json, and start streaming changes from it:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
In another terminal, run the following script:
$ cat /tmp/example1.sql
CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table1_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;

$ psql -At -f /tmp/example1.sql postgres
CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE
The first terminal prints the decoded changes as pretty-printed JSON objects. Because the same output options are used (pretty-print and add-msg-prefixes), these objects have the same structure as the format-version 1 output of the SQL example below, except that they refer to table1_with_pk and table1_without_pk.
To drop the slot, stop pg_recvlogical in the first terminal with Ctrl+C and run:
$ pg_recvlogical -d postgres --slot test_slot --drop-slot
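With pretty-print=1, each change set written by pg_recvlogical spans several lines, so a consumer cannot simply call json.loads on each line. The minimal Python sketch below splits concatenated output into separate objects with json.JSONDecoder.raw_decode. The helper name iter_json_objects and the hand-made sample are illustrative only. The sample is not real server output.

```python
import json
from typing import Iterator


def iter_json_objects(text: str) -> Iterator[dict]:
    """Yield each top-level JSON object from concatenated, possibly pretty-printed output."""
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        # raw_decode() does not skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            return
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Hand-made sample in the pretty-printed format-version 1 layout.
sample = """{
    "change": [
        {"kind": "message", "transactional": false, "prefix": "wal2json", "content": "hello"}
    ]
}
{
    "change": [
        {"kind": "insert", "schema": "public", "table": "table1_with_pk"}
    ]
}
"""
kinds = [change["kind"] for changeset in iter_json_objects(sample) for change in changeset["change"]]
print(kinds)  # ['message', 'insert']
```

This sketch parses a complete text, for example a file written with -f. To consume a live stream, one option is to append incoming data to a buffer and call raw_decode again whenever more data arrives, keeping any incomplete tail for the next attempt.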
H.3.3.2. Calling SQL Functions
These are examples of how to obtain changes from wal2json via SQL.
If format-version is set to 1, the script might look like this:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
    "change": [
        {
            "kind": "message",
            "transactional": false,
            "prefix": "wal2json",
            "content": "this non-transactional message will be delivered even if you rollback the transaction"
        }
    ]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "message",
            "transactional": true,
            "prefix": "wal2json",
            "content": "this message will be delivered"
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table2_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [1, "2018-03-27 12:05:29.914496"]
            }
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table2_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [2, "2018-03-27 12:05:29.914496"]
            }
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_without_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "numeric(5,2)", "text"],
            "columnvalues": [1, 2.34, "Tapir"]
        }
    ]
}
stop
DROP TABLE
DROP TABLE
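In format version 1, each change pairs parallel arrays: columnnames with columnvalues for new tuples, and oldkeys.keynames with oldkeys.keyvalues for the old key. The minimal Python sketch below zips those arrays into dictionaries so that each change can be handled by column name. The helper name flatten_v1 and the output layout (new, old_keys) are hypothetical choices for illustration. The sample input reuses two entries from the output above.

```python
from typing import Any


def flatten_v1(changeset: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn one format-version 1 change set into simple per-change dicts."""
    rows: list[dict[str, Any]] = []
    for change in changeset["change"]:
        if change["kind"] == "message":
            rows.append({"kind": "message", "prefix": change["prefix"],
                         "content": change["content"]})
            continue
        # "schema" is present only while include-schemas is on (the default).
        table = change.get("table")
        if table is not None and "schema" in change:
            table = f'{change["schema"]}.{table}'
        row: dict[str, Any] = {"kind": change["kind"], "table": table}
        if "columnnames" in change:  # new tuple
            row["new"] = dict(zip(change["columnnames"], change["columnvalues"]))
        if "oldkeys" in change:      # old key values
            keys = change["oldkeys"]
            row["old_keys"] = dict(zip(keys["keynames"], keys["keyvalues"]))
        rows.append(row)
    return rows


changeset = {"change": [
    {"kind": "insert", "schema": "public", "table": "table2_with_pk",
     "columnnames": ["a", "b", "c"],
     "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
     "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]},
    {"kind": "delete", "schema": "public", "table": "table2_with_pk",
     "oldkeys": {"keynames": ["a", "c"],
                 "keytypes": ["integer", "timestamp without time zone"],
                 "keyvalues": [1, "2018-03-27 12:05:29.914496"]}},
]}
for row in flatten_v1(changeset):
    print(row)
```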
If format-version is set to 2, the script might look like this:
$ cat /tmp/example3.sql
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example3.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE
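Format version 2 writes one JSON object per line. When the {"action":"B"} and {"action":"C"} markers are present, as in the output above, a consumer can rebuild transaction boundaries from them. The minimal Python sketch below groups records between B and C. Records that arrive outside any transaction, such as the non-transactional "M" message, are kept in a separate list. The helper name group_transactions is hypothetical, and the input lines are assumed to contain only the JSON data, without psql command tags or warnings.

```python
import json
from typing import Any, Iterable, Optional


def group_transactions(lines: Iterable[str]) -> tuple[list[list[dict[str, Any]]], list[dict[str, Any]]]:
    """Group format-version 2 records into transactions using B/C markers."""
    transactions: list[list[dict[str, Any]]] = []
    standalone: list[dict[str, Any]] = []
    current: Optional[list[dict[str, Any]]] = None
    for line in lines:
        if not line.strip():
            continue
        record = json.loads(line)
        action = record["action"]
        if action == "B":        # transaction start
            current = []
        elif action == "C":      # transaction end
            if current is not None:
                transactions.append(current)
            current = None
        elif current is None:    # e.g. a non-transactional message
            standalone.append(record)
        else:
            current.append(record)
    return transactions, standalone


lines = [
    '{"action":"M","transactional":false,"prefix":"wal2json","content":"x"}',
    '{"action":"B"}',
    '{"action":"I","schema":"public","table":"table3_with_pk","columns":[]}',
    '{"action":"D","schema":"public","table":"table3_with_pk","identity":[]}',
    '{"action":"C"}',
]
txns, other = group_transactions(lines)
print(len(txns), [r["action"] for r in txns[0]], [r["action"] for r in other])  # 1 ['I', 'D'] ['M']
```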