H.3. wal2json — convert WAL changes into JSON via logical decoding

The wal2json module is a Postgres Pro extension for logical decoding that converts database changes from the write-ahead log (WAL) into JSON format. The plugin has access to the new tuples produced by INSERT and UPDATE operations and, depending on the configured replica identity, to the old row versions affected by UPDATE and DELETE. Changes can be consumed either through the streaming protocol (logical replication slots) or through a specialized SQL API.

Format version 1 generates a JSON object per transaction. This object contains all new and old tuples, with optional inclusion of properties such as transaction timestamps, schema-qualified names, data types, and transaction identifiers.

Format version 2 generates a JSON object per tuple, with optional JSON objects marking transaction start and end. Additional tuple properties can optionally be included.
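To make the difference in granularity concrete, here is a minimal Python sketch that reduces one unit of output in either format to (action, target) pairs. It assumes the field names that appear in the examples later in this section (change and kind for format version 1, action for format version 2) and assumes the kind value "update" for UPDATE records. The function summarize and the kind-to-action mapping are hypothetical helpers, not part of wal2json.

```python
import json

# Hypothetical mapping (not a wal2json API) from format-version 1 "kind"
# values to the single-letter format-version 2 "action" codes.
KIND_TO_ACTION = {"insert": "I", "update": "U", "delete": "D", "message": "M"}


def summarize(payload: str, format_version: int) -> list:
    """Reduce one unit of wal2json output to (action, target) pairs.

    format_version 1: payload is one JSON object per transaction whose
    "change" array holds every record. format_version 2: payload is a single
    JSON object describing one tuple, message, or transaction marker.
    """
    obj = json.loads(payload)
    records = obj["change"] if format_version == 1 else [obj]
    pairs = []
    for rec in records:
        action = KIND_TO_ACTION[rec["kind"]] if format_version == 1 else rec["action"]
        if "table" in rec:
            # "schema" is absent when include-schemas is turned off
            target = ".".join(p for p in (rec.get("schema"), rec["table"]) if p)
        else:
            # messages carry a prefix; B/C transaction markers carry neither
            target = rec.get("prefix", "")
        pairs.append((action, target))
    return pairs


if __name__ == "__main__":
    v1 = ('{"change":[{"kind":"insert","schema":"public","table":"t",'
          '"columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]},'
          '{"kind":"message","transactional":true,"prefix":"wal2json","content":"hi"}]}')
    print(summarize(v1, 1))                # [('I', 'public.t'), ('M', 'wal2json')]
    print(summarize('{"action":"B"}', 2))  # [('B', '')]
```

One format-version 1 object yields several pairs, while each format-version 2 object yields exactly one.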

H.3.1. Installation and Setup

The wal2json extension is provided with Postgres Pro Standard as a separate pre-built package wal2json-std-16 (for the detailed installation instructions, see Chapter 16). Once you have Postgres Pro Standard installed, complete the following steps to enable wal2json:

  1. Enable logical decoding by setting wal_level to logical in the postgresql.conf file.

    Optionally, adjust the max_replication_slots and max_wal_senders parameters to allow for the number of replication slots and streaming connections you plan to use.

  2. Restart the database server for the changes to take effect.
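Before restarting, you can sanity-check the edited file. The following Python sketch uses hypothetical helpers (parse_conf and logical_decoding_enabled are not part of Postgres Pro) to look for wal_level = logical in postgresql.conf text. It reads only the text it is given: values set with ALTER SYSTEM (postgresql.auto.conf) or on the server command line take precedence, so running SHOW wal_level on the restarted server remains the authoritative check.

```python
def parse_conf(text: str) -> dict:
    """Naively parse postgresql.conf text; later assignments win.

    Handles "name = value" and "name value" (the "=" is optional) and strips
    trailing # comments. It does NOT handle '#' inside quoted values or
    include directives.
    """
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        if "=" in line:
            name, value = line.split("=", 1)
        else:
            name, _, value = line.replace("\t", " ").partition(" ")
        # parameter names are case-insensitive; drop surrounding single quotes
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def logical_decoding_enabled(conf_text: str) -> bool:
    """True if the text sets wal_level to logical (step 1 above)."""
    return parse_conf(conf_text).get("wal_level", "").lower() == "logical"


if __name__ == "__main__":
    sample = """
wal_level = replica
#wal_level = logical
wal_level = 'logical'        # changed for wal2json
max_wal_senders 10
"""
    print(logical_decoding_enabled(sample))  # True
```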

H.3.2. Options

The wal2json module accepts the following options, which control what the JSON output contains and how it is formatted:

include-xids

Add transaction IDs to each set of changes in the JSON output. The default is off.

include-timestamp

Add a timestamp to each set of changes in the JSON output. The default is off.

include-schemas

Add the schema name to each change record in the JSON output. The default is on.

include-types

Add the data type of each column to each change record in the JSON output. The default is on.

include-typmod

Add type modifiers for columns that have them. The default is on.

include-type-oids

Add type OIDs. The default is off.

include-domain-data-type

Replace a domain name with the underlying data type. The default is off.

include-column-positions

Add the column position (pg_attribute.attnum). The default is off.

include-origin

Add the origin of each change. The default is off.

include-not-null

Add information about whether each column is marked NOT NULL to the columnoptionals field of the JSON output. The default is off.

include-default

Add default expressions to the JSON output. The default is off.

include-pk

Add primary key information, including column names and data types, to the pk field of the JSON output. The default is off.

numeric-data-types-as-string

Convert values of numeric data types to strings in the JSON output. This can be useful because the JSON specification does not support Infinity and NaN as numeric values, and double precision numbers represented as JSON numbers might cause interoperability problems. The default is off.

pretty-print

Add spaces and indentation to the JSON output structure. The default is off.

write-in-chunks

Enable writing the JSON output in smaller chunks instead of writing the entire changeset at once. This option is used only when format-version is set to 1. The default is off.

include-lsn

Add the nextlsn field to each changeset. The default is off.

include-transaction

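Add records denoting the start and end of each transaction to the JSON output. The default is on, which is why the format-version 2 example below contains "B" and "C" records without setting this option.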

filter-origins

Exclude changes from the specified origins. The default is empty, which means that no origin will be filtered. The value is a comma-separated list.

filter-tables

Exclude rows from the specified tables. The default is empty, which means that no table will be filtered. The value is a comma-separated list. The tables should be schema-qualified. *.foo means the foo table in all schemas, and bar.* means all tables in the bar schema. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table names are case-sensitive. For example, table "public"."Foo bar" should be specified as public.Foo\ bar.

add-tables

Include rows only from the specified tables. The default is empty, which means that all tables from all schemas are included. It follows the same rules as filter-tables.

filter-msg-prefixes

Exclude messages whose prefix is specified in the option value. The default is empty, which means that no messages will be filtered. The value is a comma-separated list.

add-msg-prefixes

Include only messages whose prefix is specified in the option value. The default is all prefixes. The value is a comma-separated list. wal2json applies filter-msg-prefixes before this option.

format-version

Define which output format to use. The default is 1.

actions

Define which operations are included in the JSON output. The default is all actions (INSERT, UPDATE, DELETE, and TRUNCATE). However, with format-version set to 1, TRUNCATE is not included, for backward compatibility.
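The escaping rules for filter-tables and add-tables are easy to get wrong by hand. The following Python sketch applies them as described above: it backslash-escapes space, single quote, comma, period, and asterisk inside each name, keeps a bare * as a wildcard, never changes case (names are case-sensitive), and joins entries with commas. The helper names are hypothetical, and the sketch assumes that no other characters (for example, a backslash) need escaping.

```python
# Characters that, per the filter-tables/add-tables rules, must be
# backslash-escaped inside a schema or table name.
SPECIAL = set(" ',.*")


def escape_name(name: str) -> str:
    """Prefix each special character with a backslash; case is preserved."""
    return "".join("\\" + ch if ch in SPECIAL else ch for ch in name)


def table_pattern(schema: str, table: str) -> str:
    """Build one schema-qualified entry; a bare "*" is kept as a wildcard."""
    s = schema if schema == "*" else escape_name(schema)
    t = table if table == "*" else escape_name(table)
    return f"{s}.{t}"


def table_list(pairs) -> str:
    """Join (schema, table) pairs into the comma-separated option value."""
    return ",".join(table_pattern(s, t) for s, t in pairs)


if __name__ == "__main__":
    print(table_list([("public", "Foo bar"), ("*", "foo"), ("bar", "*")]))
    # public.Foo\ bar,*.foo,bar.*
```

If you embed the result in an SQL string literal, single quotes still have to be doubled there as well (or the value passed as a query parameter).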

H.3.3. Examples

There are two ways to obtain the changes (JSON objects) from wal2json: by using pg_recvlogical or by calling SQL functions.
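Both interfaces take the same option names and values; only the syntax differs. The SQL functions receive options as alternating name and value text arguments, while pg_recvlogical receives each one as -o name=value. This Python sketch (sql_literal, sql_call, and recvlogical_argv are hypothetical helpers) renders one options dictionary both ways, producing the same invocations used in the examples of this section. It assumes standard_conforming_strings is on, so only single quotes need doubling in SQL literals.

```python
import shlex


def sql_literal(text: str) -> str:
    """Quote text as an SQL string literal (assumes standard_conforming_strings = on)."""
    return "'" + text.replace("'", "''") + "'"


def sql_call(slot: str, options: dict) -> str:
    """Render a pg_logical_slot_get_changes() call: NULL, NULL means no LSN
    or change-count limit, and the options follow as name/value pairs."""
    args = [sql_literal(slot), "NULL", "NULL"]
    for name, value in options.items():
        args += [sql_literal(name), sql_literal(value)]
    return "SELECT data FROM pg_logical_slot_get_changes(" + ", ".join(args) + ");"


def recvlogical_argv(dbname: str, slot: str, options: dict) -> list:
    """Build a pg_recvlogical --start command line that writes to stdout."""
    argv = ["pg_recvlogical", "-d", dbname, "--slot", slot, "--start"]
    for name, value in options.items():
        argv += ["-o", f"{name}={value}"]
    return argv + ["-f", "-"]


if __name__ == "__main__":
    opts = {"pretty-print": "1", "add-msg-prefixes": "wal2json"}
    print(sql_call("test_slot", opts))
    print(shlex.join(recvlogical_argv("postgres", "test_slot", opts)))
```

Keeping the options in one dictionary makes it easy to switch between the SQL API and streaming without the two invocations drifting apart.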

H.3.3.1. pg_recvlogical

This example shows how to obtain JSON objects from wal2json using pg_recvlogical. In addition to the configuration described above, pg_recvlogical requires a replication connection. Since Postgres Pro version 10, a logical replication connection matches a normal pg_hba.conf entry that specifies a database name or a keyword such as all.

To configure a replication connection and database parameters:

  1. Add a replication connection rule to pg_hba.conf:

              local    mydatabase      myuser                     trust
            
  2. Optionally, set max_wal_senders in postgresql.conf:

              max_wal_senders = 1
            
  3. Restart the database server if you changed max_wal_senders.

To obtain JSON objects from wal2json:

  1. Open a terminal, create a replication slot that uses the wal2json plugin, and start streaming changes from it:

             $ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
             $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
            
  2. In another terminal, run a script that modifies some data:

            $ cat /tmp/example1.sql
            CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
            CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
            BEGIN;
            INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
            INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
            INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
            SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
            SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
            DELETE FROM table1_with_pk WHERE a < 3;
            SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
    
            INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
            -- it is not added to stream because there isn't a pk or a replica identity
            UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
            COMMIT;
    
            DROP TABLE table1_with_pk;
            DROP TABLE table1_without_pk;
    
            $ psql -At -f /tmp/example1.sql postgres
            CREATE TABLE
            CREATE TABLE
            BEGIN
            INSERT 0 1
            INSERT 0 1
            INSERT 0 1
            3/78BFC828
            3/78BFC880
            DELETE 2
            3/78BFC990
            INSERT 0 1
            UPDATE 1
            COMMIT
            DROP TABLE
            DROP TABLE
           
  3. The output in the first terminal might look like this:

              {
                  "change": [
                      {
                          "kind": "message",
                          "transactional": false,
                          "prefix": "wal2json",
                          "content": "this non-transactional message will be delivered even if you rollback the transaction"
                      }
                  ]
              }
              WARNING:  table "table1_without_pk" without primary key or replica identity is nothing
              {
                "change": [
                  {
                    "kind": "insert",
                    "schema": "public",
                    "table": "table1_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table1_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table1_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
                  }
                      ,{
                          "kind": "message",
                          "transactional": true,
                          "prefix": "wal2json",
                          "content": "this message will be delivered"
                      }
                  ,{
                    "kind": "delete",
                    "schema": "public",
                    "table": "table1_with_pk",
                    "oldkeys": {
                      "keynames": ["a", "c"],
                      "keytypes": ["integer", "timestamp without time zone"],
                      "keyvalues": [1, "2018-03-27 12:05:29.914496"]
                    }
                  }
                  ,{
                    "kind": "delete",
                    "schema": "public",
                    "table": "table1_with_pk",
                    "oldkeys": {
                      "keynames": ["a", "c"],
                      "keytypes": ["integer", "timestamp without time zone"],
                      "keyvalues": [2, "2018-03-27 12:05:29.914496"]
                    }
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table1_without_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "numeric(5,2)", "text"],
                    "columnvalues": [1, 2.34, "Tapir"]
                  }
                ]
              }
            
  4. In the first terminal, press Ctrl+C to stop pg_recvlogical, and then drop the slot:

            $ pg_recvlogical -d postgres --slot test_slot --drop-slot
           
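With pretty-print=1 and format-version 1, each transaction is written as a multi-line JSON object, so a consumer of the pg_recvlogical output cannot simply parse it line by line. The following minimal Python sketch splits such a stream into objects with json.JSONDecoder.raw_decode. It assumes the input contains only the JSON written by -f -; notices such as the WARNING line above are assumed to arrive separately (libpq prints them to standard error), so they are not handled here. The function name iter_json_objects is hypothetical.

```python
import json

_decoder = json.JSONDecoder()


def iter_json_objects(text: str):
    """Yield each top-level JSON value in text, where values may span
    several lines and follow one another with only whitespace between.
    Raises json.JSONDecodeError on anything that is not valid JSON."""
    pos, end = 0, len(text)
    while True:
        # raw_decode() does not skip leading whitespace, so do it here
        while pos < end and text[pos].isspace():
            pos += 1
        if pos == end:
            return
        obj, pos = _decoder.raw_decode(text, pos)
        yield obj
```

For example, after saving the stream to a file, each yielded object is one changeset whose "change" array can be processed independently.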

H.3.3.2. Calling SQL Functions

The following examples show how to obtain changes from wal2json by calling SQL functions.

If format-version is set to 1, the script might look like this:

        $ cat /tmp/example2.sql
        CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
        CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

        SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

        BEGIN;
        INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
        INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
        INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
        SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
        SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
        DELETE FROM table2_with_pk WHERE a < 3;
        SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

        INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
        -- it is not added to stream because there isn't a pk or a replica identity
        UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
        COMMIT;

        SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
        SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

        DROP TABLE table2_with_pk;
        DROP TABLE table2_without_pk;
       

The expected output might look like this:

        $ psql -At -f /tmp/example2.sql postgres
        CREATE TABLE
        CREATE TABLE
        init
        BEGIN
        INSERT 0 1
        INSERT 0 1
        INSERT 0 1
        3/78C2CA50
        3/78C2CAA8
        DELETE 2
        3/78C2CBD8
        INSERT 0 1
        UPDATE 1
        COMMIT
        {
            "change": [
                {
                    "kind": "message",
                    "transactional": false,
                    "prefix": "wal2json",
                    "content": "this non-transactional message will be delivered even if you rollback the transaction"
                }
            ]
        }
        psql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
        {
          "change": [
            {
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
            }
                ,{
                    "kind": "message",
                    "transactional": true,
                    "prefix": "wal2json",
                    "content": "this message will be delivered"
                }
            ,{
              "kind": "delete",
              "schema": "public",
              "table": "table2_with_pk",
              "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [1, "2018-03-27 12:05:29.914496"]
              }
            }
            ,{
              "kind": "delete",
              "schema": "public",
              "table": "table2_with_pk",
              "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [2, "2018-03-27 12:05:29.914496"]
              }
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_without_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "numeric(5,2)", "text"],
              "columnvalues": [1, 2.34, "Tapir"]
            }
          ]
        }
        stop
        DROP TABLE
        DROP TABLE
       
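In format version 1, each inserted row is described by parallel columnnames and columnvalues arrays, and each deleted row by the oldkeys object. The following Python sketch (rows_from_changeset and rows_from_text are hypothetical helpers) turns one such changeset, for example one value of the data column above, into per-row dictionaries and skips message entries. It assumes the default include-schemas setting, so that every tuple record has a schema field, and assumes that UPDATE records, like INSERT records, carry the new tuple in columnnames and columnvalues.

```python
import json


def rows_from_changeset(changeset: dict):
    """Yield (kind, "schema.table", {column: value}) for each tuple change
    in a format-version 1 changeset; message entries are skipped."""
    for change in changeset["change"]:
        kind = change["kind"]
        if kind == "message":
            continue
        table = f'{change["schema"]}.{change["table"]}'
        if kind == "delete":
            # deletes carry only the replica identity columns in "oldkeys"
            keys = change["oldkeys"]
            values = dict(zip(keys["keynames"], keys["keyvalues"]))
        else:
            # inserts (and, by assumption, updates) carry the new tuple
            values = dict(zip(change["columnnames"], change["columnvalues"]))
        yield kind, table, values


def rows_from_text(text: str) -> list:
    """Parse one JSON changeset and return its row changes as a list."""
    return list(rows_from_changeset(json.loads(text)))
```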

If format-version is set to 2, the script might look like this:

        $ cat /tmp/example3.sql
        CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
        CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

        SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

        BEGIN;
        INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
        INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
        INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
        SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
        SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
        DELETE FROM table3_with_pk WHERE a < 3;
        SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

        INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
        -- it is not added to stream because there isn't a pk or a replica identity
        UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
        COMMIT;

        SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
        SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

        DROP TABLE table3_with_pk;
        DROP TABLE table3_without_pk;
       

The expected output might look like this:

        $ psql -At -f /tmp/example3.sql postgres
        CREATE TABLE
        CREATE TABLE
        init
        BEGIN
        INSERT 0 1
        INSERT 0 1
        INSERT 0 1
        3/78CB8F30
        3/78CB8F88
        DELETE 2
        3/78CB90B8
        INSERT 0 1
        UPDATE 1
        COMMIT
        psql:/tmp/example3.sql:20: WARNING:  no tuple identifier for UPDATE in table "public"."table3_without_pk"
        {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
        {"action":"B"}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
        {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
        {"action":"C"}
        stop
        DROP TABLE
        DROP TABLE
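In format version 2, every line is a complete JSON object, and the B and C records mark transaction boundaries. The Python sketch below (group_transactions is a hypothetical helper) groups tuple records by transaction and separately collects records that arrive outside a B...C pair, such as the non-transactional message above. It parses floating-point values as decimal.Decimal so that a numeric value like 2.34 is not rounded through a binary float; enabling numeric-data-types-as-string is the server-side alternative.

```python
import json
from decimal import Decimal


def group_transactions(lines):
    """Group format-version 2 records (one JSON object per line) into
    transactions delimited by {"action":"B"} and {"action":"C"}.

    Returns (transactions, loose): transactions is a list of record lists;
    loose holds records seen outside any B...C pair.
    """
    transactions, loose, current = [], [], None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # parse_float=Decimal keeps values such as 2.34 exact
        rec = json.loads(line, parse_float=Decimal)
        action = rec["action"]
        if action == "B":
            current = []
        elif action == "C":
            if current is not None:
                transactions.append(current)
            current = None
        elif current is None:
            loose.append(rec)
        else:
            current.append(rec)
    return transactions, loose
```

Feeding it the data lines from the output above would yield one transaction with six records (four inserts, the transactional message, and two deletes) plus one loose non-transactional message.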