· Tutorial  · 11 min read

Turn PostgreSQL into an event stream with Kafka Connect & Debezium

We will leverage the Change Data Capture (CDC) pattern to turn your existing databases into an event stream 🌊

Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases.

In this article, we will use the Debezium PostgreSQL connector to capture row-level changes (inserts, updates, and deletes) and stream them to Kafka topics.

Since PostgreSQL 10, a logical replication stream mode called pgoutput is natively supported. This means that a Debezium PostgreSQL connector can consume that replication stream without any additional plug-ins. This is particularly valuable for environments where installing plug-ins is not supported or not allowed.

docker-compose.yml

A good way to start is to reuse the Confluent Platform stack with docker-compose from my last blog post.

services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      PORT: 9021
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
  akhq:
    image: tchiotludo/akhq
    hostname: akhq
    container_name: akhq
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "broker:29092"
              schema-registry:
                url: "http://schema-registry:8081"

I just added two environment variables to the control-center service related to Kafka Connect :

CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'

Next, we add Kafka Connect with debezium-connector-postgresql :

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - postgres
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:29092
      # must match the published port above and the URL used by Control Center
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: my-kafka-connect-group-id
      CONNECT_CONFIG_STORAGE_TOPIC: connect-storage-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-storage-offset
      CONNECT_STATUS_STORAGE_TOPIC: connect-storage-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run

Finally, we add the PostgreSQL service :

  postgres:
    image: postgres
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: my_user
      POSTGRES_PASSWORD: my_password
      POSTGRES_DB: my_db

Starting the services

You can now start all the services with docker-compose up -d, then check that they are running :

docker ps --format 'table {{.ID}} \t {{.Names}} \t {{.Ports}} \t {{.Status}}'
CONTAINER ID   NAMES             PORTS                                            STATUS
656ce9e3fa50   akhq              0.0.0.0:8080->8080/tcp                           Up 5 minutes (healthy)
59aacf4cfbcf   control-center    0.0.0.0:9021->9021/tcp                           Up 5 minutes
ab74630c5b23   connect           0.0.0.0:8083->8083/tcp, 9092/tcp                 Up 5 minutes (health: starting)
025429f8bf41   schema-registry   0.0.0.0:8081->8081/tcp                           Up 5 minutes
544089c5bc49   broker            0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   Up 5 minutes
a746f524d7d4   zookeeper         2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       Up 5 minutes
e1222cadc74c   postgres          0.0.0.0:5432->5432/tcp                           Up 5 minutes

After some startup time, you can access Control Center at http://localhost:9021 and see the PostgresConnector available in the Connect section :

Control Center
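The same check can be scripted against the Kafka Connect REST API, which lists installed plugins at GET /connector-plugins. The snippet below is a minimal sketch: it assumes the worker is reachable at http://localhost:8083 (the port published in the compose file), and the helper names are illustrative only.

```python
import json
import urllib.request

# Assumption: the Connect worker REST API is published on localhost:8083.
CONNECT_URL = "http://localhost:8083"
DEBEZIUM_PG_CLASS = "io.debezium.connector.postgresql.PostgresConnector"


def plugin_classes(plugins):
    """Return the class names found in a /connector-plugins response body."""
    return [plugin["class"] for plugin in plugins]


def fetch_plugins(base_url=CONNECT_URL):
    """Call GET /connector-plugins and decode the JSON array it returns."""
    url = f"{base_url}/connector-plugins"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

Once the stack is up, DEBEZIUM_PG_CLASS in plugin_classes(fetch_plugins()) should evaluate to True. If the call is refused, the worker is probably still installing the connector.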

Configuring PostgreSQL wal_level

PostgreSQL supports 3 Write-Ahead Log (WAL) levels, configured via wal_level :

  • replica
  • minimal
  • logical

Only logical adds the information necessary to support logical decoding, which Debezium relies on. So let’s enable it !

# show current wal_level, default 'replica'
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SHOW wal_level"
wal_level
-----------
replica
(1 row)
# set wal_level to logical using SQL...
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "ALTER SYSTEM SET wal_level = logical;"
# ... or directly in postgresql.conf
docker exec postgres bash -c "echo 'wal_level = logical' >> /var/lib/postgresql/data/postgresql.conf"
# restart container
docker restart postgres
# wal_level is now 'logical'
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SHOW wal_level"
wal_level
-----------
logical
(1 row)

Configuring PostgreSQL client authentication

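PostgreSQL needs to accept connections from the Debezium PostgreSQL connector. Client authentication is controlled by a configuration file, traditionally named pg_hba.conf. With the official postgres image it should work out of the box 🥳 : the last rule of the file accepts password-authenticated connections from any host, which covers the connect container.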

docker exec postgres bash -c "tail -n 15 /var/lib/postgresql/data/pg_hba.conf"
# TYPE  DATABASE        USER            ADDRESS                 METHOD
# "local" is for Unix domain socket connections only
local   all             all                                     trust
# IPv4 local connections:
host    all             all             127.0.0.1/32            trust
# IPv6 local connections:
host    all             all             ::1/128                 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust
host all all all scram-sha-256

Register Debezium PostgreSQL connector

Now it’s time to register the Debezium PostgreSQL source connector on our Kafka Connect service.

Let’s use the Kafka Connect REST API instead of the UI 🤓

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors -d '
{
  "name": "cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "cdc",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "my_user",
    "database.password": "my_password",
    "database.dbname": "my_db",
    "plugin.name": "pgoutput"
  }
}'
HTTP/1.1 201 Created
Date: Mon, 14 Oct 2024 19:30:16 GMT
Location: http://localhost:8083/connectors/cdc-connector
Content-Type: application/json
Content-Length: 342
Server: Jetty(9.4.54.v20240208)

{"name":"cdc-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","topic.prefix":"cdc","database.hostname":"postgres","database.port":"5432","database.user":"my_user","database.password":"my_password","database.dbname":"my_db","plugin.name":"pgoutput","name":"cdc-connector"},"tasks":[],"type":"source"}

You should now be able to see the cdc-connector running in your Control Center 😎

Control Center
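The connector state can also be read from the REST API, at GET /connectors/cdc-connector/status. The following is a minimal sketch, again assuming the worker is published on localhost:8083; fetch_status and is_running are illustrative helper names, not part of any library.

```python
import json
import urllib.request

# Assumption: the Connect worker REST API is published on localhost:8083.
CONNECT_URL = "http://localhost:8083"


def fetch_status(name, base_url=CONNECT_URL):
    """Call GET /connectors/<name>/status and decode the JSON body."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def is_running(status):
    """True when the connector and at least one task exist and all report RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [task["state"] for task in tasks]
    # A freshly registered connector may have no task yet, so require one.
    return bool(tasks) and all(state == "RUNNING" for state in states)
```

Requiring at least one task is a deliberate choice here: the registration response above still shows an empty tasks list, so a connector without tasks is treated as not ready yet.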

You can also view the replication slot created by Debezium using SQL :

PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SELECT * FROM pg_replication_slots;"

With the formatted output :

 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
 debezium  | pgoutput | logical   |  16384 | my_db    | f         | t      |         55 |      |          748 | 0/1976F78   | 0/1976F78           | reserved   |               | f         | f
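PostgreSQL prints restart_lsn and confirmed_flush_lsn as two hexadecimal numbers separated by a slash, whereas the Debezium events shown later in this article carry the log position as a plain 64-bit integer in source.lsn. A small sketch to convert between the two notations (the function names are illustrative) :

```python
def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN the way PostgreSQL prints it: high/low 32 bits in hex."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def parse_lsn(text: str) -> int:
    """Convert PostgreSQL's 'X/Y' hex notation back into a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)
```

For instance, parse_lsn("0/1976F78") gives 26701688, a position that also shows up in the sequence field of the DELETE event further down.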

Make some SQL changes

The Debezium PostgreSQL source connector is now up and running, ready to perform Change Data Capture on our database and send the resulting events to Kafka.

Let’s run some SQL statements !

# launch psql
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user
-- create the users table
CREATE TABLE users (
  id char(10) NOT NULL,
  firstname char(50),
  lastname char(50),
  PRIMARY KEY (id)
);
-- insert a row
INSERT INTO users (id, firstname, lastname) VALUES ('id1', 'Vincent', 'Spiewak');
-- update a row
UPDATE users SET firstname = 'Vince' WHERE id = 'id1';
-- delete a row
DELETE FROM users WHERE id = 'id1';

Viewing events in Kafka

We just ran 4 statements against our database :

  • CREATE TABLE (not captured)
  • INSERT
  • UPDATE
  • DELETE

Using AKHQ, you should be able to see 4 messages in the topic cdc.public.users : one per row change, plus an empty message that follows the DELETE !

AKHQ
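The topic name is not arbitrary: by default the connector writes each table to a topic named after the topic.prefix setting, the schema, and the table. A tiny sketch of that convention, with illustrative helper names :

```python
def topic_name(prefix: str, schema: str, table: str) -> str:
    """Default topic name for a captured table: <topic.prefix>.<schema>.<table>."""
    return ".".join([prefix, schema, table])


def split_topic(topic: str):
    """Split a topic name back into (prefix, schema, table).

    Assumes none of the three parts contains a dot.
    """
    prefix, schema, table = topic.split(".")
    return prefix, schema, table
```

With the connector registered above, topic_name("cdc", "public", "users") yields cdc.public.users.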

First message on INSERT :

{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "cdc.public.users.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": "id1 ",
"firstname": "Vincent ",
"lastname": "Spiewak "
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1728938019116,
"snapshot": "false",
"db": "my_db",
"sequence": "[null,\"26701016\"]",
"schema": "public",
"table": "users",
"txId": 746,
"lsn": 26701016,
"xmin": null
},
"op": "c",
"ts_ms": 1728938019487,
"transaction": null
}
}

Second message on UPDATE :

{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "cdc.public.users.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": "id1 ",
"firstname": "Vince ",
"lastname": "Spiewak "
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1728938081045,
"snapshot": "false",
"db": "my_db",
"sequence": "[\"26701400\",\"26701456\"]",
"schema": "public",
"table": "users",
"txId": 747,
"lsn": 26701456,
"xmin": null
},
"op": "u",
"ts_ms": 1728938081058,
"transaction": null
}
}

Third message on DELETE :

{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": true,
"name": "cdc.public.users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "cdc.public.users.Envelope",
"version": 1
},
"payload": {
"before": {
"id": "id1 ",
"firstname": null,
"lastname": null
},
"after": null,
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1728938159344,
"snapshot": "false",
"db": "my_db",
"sequence": "[\"26701688\",\"26701744\"]",
"schema": "public",
"table": "users",
"txId": 748,
"lsn": 26701744,
"xmin": null
},
"op": "d",
"ts_ms": 1728938159420,
"transaction": null
}
}
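All three messages share the same envelope: payload.op holds the operation code (c for create, u for update, d for delete, and r for rows read during a snapshot), while payload.before and payload.after hold the row images. The following is a minimal sketch of how a consumer might summarize such a message. It assumes the JSON-with-schema format produced by the JsonConverter used here, and summarize is an illustrative name.

```python
import json

# Operation codes found in the "op" field of the change events.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize(message: str) -> str:
    """Return a one-line description of a Debezium change event value."""
    payload = json.loads(message)["payload"]
    source = payload["source"]
    operation = OPERATIONS.get(payload["op"], payload["op"])
    # Deletes only carry a "before" image; other operations carry "after".
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{operation} on {source['schema']}.{source['table']}: {row}"
```

Note that with the table as created above, the before image of the DELETE only contains the primary key, so the summary of a delete shows null for the other columns.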

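There is also a fourth message with an empty (null) value, known as a tombstone. Debezium emits it right after a delete event so that Kafka log compaction can eventually remove every record for that key. It has the following key :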

{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "cdc.public.users.Key"
},
"payload": {
"id": "id1 "
}
}
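A consumer that rebuilds the table from this topic therefore has to handle records whose value is null. The sketch below replays records into an in-memory dictionary. It assumes the key and value have already been decoded from JSON (with None standing for a tombstone), and apply_event is a hypothetical helper, not a Debezium API.

```python
def apply_event(state: dict, key: dict, value) -> dict:
    """Apply one decoded Kafka record to an in-memory copy of the users table."""
    row_id = key["payload"]["id"]
    if value is None:
        # Tombstone: the row was already removed by the preceding "d" event.
        state.pop(row_id, None)
        return state
    payload = value["payload"]
    if payload["op"] == "d":
        state.pop(row_id, None)
    else:
        # "c", "u" and "r" all carry the current row image in "after".
        state[row_id] = payload["after"]
    return state
```

Replaying the four messages above in order leaves the dictionary empty, which matches the table after the DELETE.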

Source Code

You can find the complete sources on my GitHub 🫡
