Change Data Capture via Debezium in Integrations
Change data capture (CDC) is useful to set up on a database when other parts of a system need a stream of granular change events as the data changes.
Debezium is a fairly popular open source project that handles the intricacies of interfacing with a variety of databases to capture their changes.
This example highlights a minimal setup to leverage the new standalone Debezium Server for performing change data capture (CDC) from a Postgres database to a NATS stream.
For Postgres, Debezium leverages the logical replication API and
registers itself as a replication target. It then takes the raw representation
of each change and converts it to a standard event data model. Finally, it
publishes the event to the configured NATS stream, which is called
DebeziumStream by default.
Each event is published to a subject corresponding to the Postgres schema and
table, for example, postgres.public.test. The first token is an optional prefix
specified in the Debezium configuration.
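The subject layout described above can be sketched with a few shell helpers. This is only an illustration: the function names `cdc_subject`, `cdc_schema`, and `cdc_table` are hypothetical and not part of Debezium or the NATS CLI, and the sketch assumes the three-token `prefix.schema.table` form shown in the example.

```shell
#!/bin/sh
# Hypothetical helpers illustrating the "prefix.schema.table" subject layout.
# They are not part of Debezium or the NATS CLI.

# Build the subject for a given prefix, schema, and table.
cdc_subject() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Extract the schema (second token) from a three-token subject.
cdc_schema() {
  rest="${1#*.}"              # drop the prefix token
  printf '%s\n' "${rest%%.*}" # keep everything before the next dot
}

# Extract the table (last token) from a subject.
cdc_table() {
  printf '%s\n' "${1##*.}"
}

cdc_subject postgres public profile
cdc_schema postgres.public.profile
cdc_table postgres.public.profile
```

Because each table gets its own subject, standard NATS wildcards can select groups of tables: `postgres.public.*` matches every table in the `public` schema, and `postgres.>` matches everything under the prefix.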
See the source repo for the Docker Compose file and Debezium configuration.
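Because NATS, Postgres, and Debezium start as separate services, the script in the Code section first polls each one with `nc -z` in an unbounded loop. A bounded variant might look like the sketch below. The `wait_for` helper is hypothetical and not part of the original script; the 30-attempt limit in the usage comment is an arbitrary assumption.

```shell
#!/bin/sh
# wait_for is a hypothetical helper, not part of the original script.
# It runs the given command once per second until it succeeds,
# giving up after the given number of attempts.
wait_for() {
  attempts="$1"
  shift
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Only pause if another attempt will follow.
    if [ "$i" -lt "$attempts" ]; then
      sleep 1
    fi
    i=$((i + 1))
  done
  printf 'timed out waiting for: %s\n' "$*" >&2
  return 1
}

# Usage with the same hosts and ports as the script (assumed limit of 30 tries):
#   wait_for 30 nc -z nats 4222
#   wait_for 30 nc -z postgres 5432
#   wait_for 30 nc -z debezium 8080
```

With `set -e` in effect, a failed `wait_for` stops the script with a message instead of hanging when a service never comes up.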
Code
#!/bin/bash
set -euo pipefail
# Ensure all the services are up and running.
until nc -z nats 4222; do sleep 1; done
until nc -z postgres 5432; do sleep 1; done
until nc -z debezium 8080; do sleep 1; done
# Allow Debezium to set up its connection to Postgres.
sleep 1
# Create two tables, then insert, update, and delete some data.
printf 'Create and populate the database.\n'
psql -h postgres -c "create table profile (id serial primary key, name text, color text);"
psql -h postgres -c "insert into profile (name, color) values ('Joe', 'blue');"
psql -h postgres -c "insert into profile (name, color) values ('Pam', 'green');"
psql -h postgres -c "update profile set color = 'red' where name = 'Joe';"
psql -h postgres -c "update profile set color = 'yellow' where name = 'Pam';"
psql -h postgres -c "delete from profile where name = 'Joe';"
psql -h postgres -c "create table books (id serial primary key, title text, author text);"
psql -h postgres -c "insert into books (title, author) values ('NATS Diaries', 'Pam');"
# Give the change events time to be published to the stream.
sleep 1
# Print the stream subjects; each schema/table pair appears as its own subject.
printf '\nStream subjects.\n'
nats stream subjects DebeziumStream
# Print the first five change events in the stream, extracting only the payload of each for ease of reading.
printf '\nChange events.\n'
nats consumer add DebeziumStream viewer --ephemeral --pull --defaults > /dev/null
nats consumer next --raw --count 5 DebeziumStream viewer | jq -r '.payload'
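Each payload printed by the last command carries an `op` field alongside `before` and `after`. As a reading aid, the sketch below maps the codes to labels. The `describe_op` helper is hypothetical. The `c`, `u`, and `d` codes appear in this example's output; `r` (snapshot read) and `t` (truncate) are included on the assumption that they follow Debezium's documented event model.

```shell
#!/bin/sh
# describe_op is a hypothetical helper for reading Debezium change events.
# It maps the single-letter "op" code to a human-readable label.
describe_op() {
  case "$1" in
    c) echo "create" ;;          # row inserted: "before" is null
    u) echo "update" ;;          # row updated: "after" holds the new values
    d) echo "delete" ;;          # row deleted: "after" is null
    r) echo "read (snapshot)" ;; # assumed: row read during an initial snapshot
    t) echo "truncate" ;;        # assumed: table truncated
    *) echo "unknown" ;;
  esac
}

for op in c u d; do
  printf '%s = %s\n' "$op" "$(describe_op "$op")"
done
```

In this example the update events have a null `before` and the delete event's `before` carries only the primary key. That is consistent with Postgres's default replica identity, which records only key columns for the old row.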
Output
Create and populate the database.
CREATE TABLE
INSERT 0 1
INSERT 0 1
UPDATE 1
UPDATE 1
DELETE 1
CREATE TABLE
INSERT 0 1

Stream subjects.
╭─────────────────────────────────────────────────────────────────╮
│               2 Subjects in stream DebeziumStream               │
├───────────────────────┬───────┬─────────────────────────┬───────┤
│ Subject               │ Count │ Subject                 │ Count │
├───────────────────────┼───────┼─────────────────────────┼───────┤
│ postgres.public.books │ 1     │ postgres.public.profile │ 5     │
╰───────────────────────┴───────┴─────────────────────────┴───────╯

Change events.
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Joe",
    "color": "blue"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639230,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920144\",\"22920248\"]",
    "schema": "public",
    "table": "profile",
    "txId": 733,
    "lsn": 22920248,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1699905639524,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 2,
    "name": "Pam",
    "color": "green"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639240,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920528\",\"22920528\"]",
    "schema": "public",
    "table": "profile",
    "txId": 734,
    "lsn": 22920528,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Joe",
    "color": "red"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639249,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920712\",\"22920712\"]",
    "schema": "public",
    "table": "profile",
    "txId": 735,
    "lsn": 22920712,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 2,
    "name": "Pam",
    "color": "yellow"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639257,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920840\",\"22920840\"]",
    "schema": "public",
    "table": "profile",
    "txId": 736,
    "lsn": 22920840,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": {
    "id": 1,
    "name": null,
    "color": null
  },
  "after": null,
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639266,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920976\",\"22920976\"]",
    "schema": "public",
    "table": "profile",
    "txId": 737,
    "lsn": 22920976,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1699905639527,
  "transaction": null
}