PostgreSQL 9.4.11 Documentation, Chapter 46. Logical Decoding
The following example demonstrates controlling logical decoding using the SQL interface.
Before you can use logical decoding, you must set wal_level to logical and max_replication_slots to at least 1. Then, you should connect to the target database (in the example below, postgres) as a superuser.
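One way to apply and verify these prerequisites from a superuser session is sketched below. It assumes you prefer ALTER SYSTEM (available as of 9.4) over editing postgresql.conf by hand; either approach should work, and both parameters take effect only after a server restart.

```sql
-- Sketch only: persist the prerequisite settings via ALTER SYSTEM.
-- These statements cannot be run inside a transaction block.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 1;  -- use a higher value if you need more slots

-- Restart the server, then reconnect and confirm the running values:
SHOW wal_level;
SHOW max_replication_slots;
```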
    postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
    postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
        slot_name    | xlog_position
    -----------------+---------------
     regression_slot | 0/16B1970
    (1 row)

    postgres=# SELECT * FROM pg_replication_slots;
        slot_name    |    plugin     | slot_type | datoid | database | active |  xmin  | catalog_xmin | restart_lsn
    -----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
     regression_slot | test_decoding | logical   |  12052 | postgres | f      |        |          684 | 0/16A4408
    (1 row)

    postgres=# -- There are no changes to see yet
    postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location | xid | data
    ----------+-----+------
    (0 rows)

    postgres=# CREATE TABLE data(id serial primary key, data text);
    CREATE TABLE

    postgres=# -- DDL isn't replicated, so all you'll see is the transaction
    postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location  | xid |    data
    -----------+-----+------------
     0/16D5D48 | 688 | BEGIN 688
     0/16E0380 | 688 | COMMIT 688
    (2 rows)

    postgres=# -- Once changes are read, they're consumed and not emitted
    postgres=# -- in a subsequent call:
    postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location | xid | data
    ----------+-----+------
    (0 rows)

    postgres=# BEGIN;
    postgres=# INSERT INTO data(data) VALUES('1');
    postgres=# INSERT INTO data(data) VALUES('2');
    postgres=# COMMIT;

    postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location  | xid |                     data
    -----------+-----+-----------------------------------------------
     0/16E0478 | 689 | BEGIN 689
     0/16E0478 | 689 | table public.data: INSERT: id[integer]:1 data[text]:'1'
     0/16E0580 | 689 | table public.data: INSERT: id[integer]:2 data[text]:'2'
     0/16E0650 | 689 | COMMIT 689
    (4 rows)

    postgres=# INSERT INTO data(data) VALUES('3');

    postgres=# -- You can also peek ahead in the change stream without consuming changes
    postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
     location  | xid |                     data
    -----------+-----+-----------------------------------------------
     0/16E09C0 | 690 | BEGIN 690
     0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
     0/16E0B90 | 690 | COMMIT 690
    (3 rows)

    postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
    postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
     location  | xid |                     data
    -----------+-----+-----------------------------------------------
     0/16E09C0 | 690 | BEGIN 690
     0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
     0/16E0B90 | 690 | COMMIT 690
    (3 rows)

    postgres=# -- options can be passed to output plugin, to influence the formatting
    postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
     location  | xid |                     data
    -----------+-----+-----------------------------------------------
     0/16E09C0 | 690 | BEGIN 690
     0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
     0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01)
    (3 rows)

    postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
    postgres=# -- server resources:
    postgres=# SELECT pg_drop_replication_slot('regression_slot');
     pg_drop_replication_slot
    -----------------------

    (1 row)
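The reminder about dropping unused slots matters because a slot holds back WAL removal until its changes are consumed. The following query is a rough sketch of how you might watch for that on a 9.4 primary; the column alias retained_bytes is illustrative, and the result is only an approximation of how much WAL the slot is keeping.

```sql
-- Sketch only: estimate how much WAL each slot is holding back.
-- Run on a primary; pg_current_xlog_location() is not usable during recovery.
SELECT slot_name,
       active,
       restart_lsn,
       pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS retained_bytes
FROM pg_replication_slots;
```

An inactive slot whose retained_bytes keeps growing is a likely candidate for pg_drop_replication_slot().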
The following example shows how logical decoding is controlled over the streaming replication protocol, using the program pg_recvlogical included in the PostgreSQL distribution. This requires that client authentication is set up to allow replication connections (see Section 25.2.5.1) and that max_wal_senders is set sufficiently high to allow an additional connection.
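Before starting pg_recvlogical, you may want to check that a WAL sender is free. The queries below are a minimal sketch of such a check, assuming a superuser session on the same server; the alias senders_in_use is illustrative.

```sql
-- Sketch only: compare the configured limit with the connections in use.
SHOW max_wal_senders;

-- Each row here is a currently connected WAL sender.
SELECT count(*) AS senders_in_use
FROM pg_stat_replication;
```

If senders_in_use already equals max_wal_senders, the additional connection will be refused until the limit is raised and the server restarted.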
    $ pg_recvlogical -d postgres --slot test --create-slot
    $ pg_recvlogical -d postgres --slot test --start -f -
    Control+Z
    $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
    $ fg
    BEGIN 693
    table public.data: INSERT: id[integer]:4 data[text]:'4'
    COMMIT 693
    Control+C
    $ pg_recvlogical -d postgres --slot test --drop-slot