Turn PostgreSQL into an event stream with Kafka Connect & Debezium
We will leverage the Change Data Capture (CDC) pattern to turn your existing databases into an event stream 🌊

Debezium is a distributed platform that converts changes in your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in those databases.
In this article, we will use the Debezium PostgreSQL connector to capture row-level changes (inserts, updates, and deletes) and stream them to Kafka topics.
Since PostgreSQL 10, a logical replication output plug-in called pgoutput is natively supported. This means that the Debezium PostgreSQL connector can consume the replication stream without any additional plug-ins, which is particularly valuable in environments where installing plug-ins is not supported or not allowed.
docker-compose.yml
A good way to start is to use the Confluent Platform stack with docker-compose, as seen in my last blog post.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      PORT: 9021
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1

  akhq:
    image: tchiotludo/akhq
    hostname: akhq
    container_name: akhq
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "broker:29092"
              schema-registry:
                url: "http://schema-registry:8081"
I just added two environment variables related to Kafka Connect to the control-center service:
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
Next, we add Kafka Connect with debezium-connector-postgresql:
  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - postgres
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: my-kafka-connect-group-id
      CONNECT_CONFIG_STORAGE_TOPIC: connect-storage-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-storage-offset
      CONNECT_STATUS_STORAGE_TOPIC: connect-storage-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest

        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run
Finally, we add the PostgreSQL service:
  postgres:
    image: postgres
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: my_user
      POSTGRES_PASSWORD: my_password
      POSTGRES_DB: my_db
Starting the services
You can now start all the services with docker-compose up -d, then check that they are up:
docker ps --format 'table {{.ID}} \t {{.Names}} \t {{.Ports}} \t {{.Status}}'
CONTAINER ID   NAMES             PORTS                                            STATUS
656ce9e3fa50   akhq              0.0.0.0:8080->8080/tcp                           Up 5 minutes (healthy)
59aacf4cfbcf   control-center    0.0.0.0:9021->9021/tcp                           Up 5 minutes
ab74630c5b23   connect           0.0.0.0:8083->8083/tcp, 9092/tcp                 Up 5 minutes (health: starting)
025429f8bf41   schema-registry   0.0.0.0:8081->8081/tcp                           Up 5 minutes
544089c5bc49   broker            0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   Up 5 minutes
a746f524d7d4   zookeeper         2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       Up 5 minutes
e1222cadc74c   postgres          0.0.0.0:5432->5432/tcp                           Up 5 minutes
After some startup time, you can access Control Center at http://localhost:9021 and see the PostgresConnector available in the Connect section.
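If you would rather check this from a script than from the UI, the Kafka Connect REST API lists the installed plugins at GET /connector-plugins. The snippet below is only a minimal sketch: it assumes the worker is reachable at http://localhost:8083 (the port published in docker-compose.yml), and the helper names fetch_plugins and has_plugin are made up for this example.

```python
import json
from urllib.request import urlopen

# Assumption: the Connect worker REST port published in docker-compose.yml.
CONNECT_URL = "http://localhost:8083"
DEBEZIUM_PG = "io.debezium.connector.postgresql.PostgresConnector"


def fetch_plugins(base_url=CONNECT_URL):
    """Return the parsed JSON list served by GET /connector-plugins."""
    with urlopen(f"{base_url}/connector-plugins", timeout=5) as response:
        return json.load(response)


def has_plugin(plugins, class_name=DEBEZIUM_PG):
    """Return True if a plugin with the given class name is in the list."""
    return any(plugin.get("class") == class_name for plugin in plugins)
```

Once the worker has finished installing the connector, has_plugin(fetch_plugins()) should return True. Nothing is called at import time, so the helpers can be loaded even while the stack is still starting.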
Configuring PostgreSQL wal_level
PostgreSQL supports three Write-Ahead Log (WAL) levels, configured via wal_level:
replica
minimal
logical
Only logical adds the information necessary to support logical decoding. So let’s enable it!
# show current wal_level, default 'replica'
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SHOW wal_level"

 wal_level
-----------
 replica
(1 row)

# set wal_level to logical using SQL...
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "ALTER SYSTEM SET wal_level = logical;"

# ... or directly in postgresql.conf
docker exec postgres bash -c "echo 'wal_level = logical' >> /var/lib/postgresql/data/postgresql.conf"

# restart container
docker restart postgres

# wal_level is now 'logical'
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SHOW wal_level"

 wal_level
-----------
 logical
(1 row)
Configuring PostgreSQL client authentication
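PostgreSQL needs to accept connections from the Debezium PostgreSQL connector. Client authentication is controlled by a configuration file, traditionally named pg_hba.conf. With the default file shipped in the postgres image, it should work out of the box 🥳 because its last rule accepts password-authenticated connections from any host: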
docker exec postgres bash -c "tail -n 15 /var/lib/postgresql/data/pg_hba.conf"
# TYPE  DATABASE        USER            ADDRESS                 METHOD

# "local" is for Unix domain socket connections only
local   all             all                                     trust
# IPv4 local connections:
host    all             all             127.0.0.1/32            trust
# IPv6 local connections:
host    all             all             ::1/128                 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

host all all all scram-sha-256
Register Debezium PostgreSQL connector
Now it’s time to register the Debezium PostgreSQL source connector on our Kafka Connect service.
Let’s use the Kafka Connect REST API instead of the UI 🤓
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors -d '{
    "name": "cdc-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "topic.prefix": "cdc",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "my_user",
      "database.password": "my_password",
      "database.dbname": "my_db",
      "plugin.name": "pgoutput"
    }
  }'

HTTP/1.1 201 Created
Date: Mon, 14 Oct 2024 19:30:16 GMT
Location: http://localhost:8083/connectors/cdc-connector
Content-Type: application/json
Content-Length: 342
Server: Jetty(9.4.54.v20240208)

{"name":"cdc-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","topic.prefix":"cdc","database.hostname":"postgres","database.port":"5432","database.user":"my_user","database.password":"my_password","database.dbname":"my_db","plugin.name":"pgoutput","name":"cdc-connector"},"tasks":[],"type":"source"}
You should now be able to see the cdc-connector running in your Control Center 😎
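The 201 response above still shows "tasks":[], because tasks are only started a moment after the connector is created. To confirm from a script that everything is really running, you can poll GET /connectors/cdc-connector/status. Here is a hedged sketch of that check; the base URL assumes the port published in docker-compose.yml, and fetch_status and is_healthy are illustrative names, not part of any library.

```python
import json
from urllib.request import urlopen

# Assumption: the Connect worker REST port published in docker-compose.yml.
CONNECT_URL = "http://localhost:8083"


def fetch_status(name, base_url=CONNECT_URL):
    """Return the parsed JSON served by GET /connectors/<name>/status."""
    with urlopen(f"{base_url}/connectors/{name}/status", timeout=5) as response:
        return json.load(response)


def is_healthy(status):
    """True when the connector is RUNNING and has at least one task, all RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok
```

Calling is_healthy(fetch_status("cdc-connector")) should return True once the task has started. A task in the FAILED state also carries a trace field in the status document, which is usually the quickest way to find a configuration mistake.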
You can also view the replication slot created by Debezium using SQL:
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user -c "SELECT * FROM pg_replication_slots;"
Here is the formatted output:
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
 debezium  | pgoutput | logical   |  16384 | my_db    | f         | t      |         55 |      |          748 | 0/1976F78   | 0/1976F78           | reserved   |               | f         | f
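A small aside on the LSN columns: PostgreSQL prints a pg_lsn as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position), while Debezium reports LSNs as plain integers in the source block of its events. The sketch below converts one form to the other; lsn_to_int is just an illustrative helper name.

```python
def lsn_to_int(lsn):
    """Convert a pg_lsn string such as '0/1976F78' to a 64-bit integer."""
    high, low = lsn.split("/")
    # The part before the slash holds the high 32 bits, the part after the low 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


print(lsn_to_int("0/1976F78"))  # 26701688
```

For the slot above, 0/1976F78 becomes 26701688, which is the value that shows up in the sequence field of the DELETE event later in this article.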
Make some SQL changes
The Debezium PostgreSQL source connector is now up and running, ready to capture changes on our database and send them as events into Kafka.
Let’s run some SQL statements!
# launch psql
PGPASSWORD=my_password docker exec -it postgres psql my_db -U my_user

-- create the users table
CREATE TABLE users (
  id char(10) NOT NULL,
  firstname char(50),
  lastname char(50),
  PRIMARY KEY (id)
);

-- insert a row
INSERT INTO users (id, firstname, lastname) VALUES ('id1', 'Vincent', 'Spiewak');

-- update a row
UPDATE users SET firstname = 'Vince' WHERE id = 'id1';

-- delete a row
DELETE FROM users WHERE id = 'id1';
Viewing events in Kafka
We just ran 4 statements against our database:
CREATE TABLE (not captured)
INSERT
UPDATE
DELETE
Using AKHQ, you should be able to see 4 messages in the topic cdc.public.users: three change events, plus one empty message that follows the DELETE!
First message, on INSERT:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "before" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "after" },
      { "type": "struct", "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ], "optional": false, "name": "io.debezium.connector.postgresql.Source", "field": "source" },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" }
    ],
    "optional": false,
    "name": "cdc.public.users.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": "id1 ",
      "firstname": "Vincent ",
      "lastname": "Spiewak "
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "cdc",
      "ts_ms": 1728938019116,
      "snapshot": "false",
      "db": "my_db",
      "sequence": "[null,\"26701016\"]",
      "schema": "public",
      "table": "users",
      "txId": 746,
      "lsn": 26701016,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1728938019487,
    "transaction": null
  }
}
Second message, on UPDATE:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "before" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "after" },
      { "type": "struct", "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ], "optional": false, "name": "io.debezium.connector.postgresql.Source", "field": "source" },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" }
    ],
    "optional": false,
    "name": "cdc.public.users.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": "id1 ",
      "firstname": "Vince ",
      "lastname": "Spiewak "
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "cdc",
      "ts_ms": 1728938081045,
      "snapshot": "false",
      "db": "my_db",
      "sequence": "[\"26701400\",\"26701456\"]",
      "schema": "public",
      "table": "users",
      "txId": 747,
      "lsn": 26701456,
      "xmin": null
    },
    "op": "u",
    "ts_ms": 1728938081058,
    "transaction": null
  }
}
Third message, on DELETE:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "before" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "firstname" }, { "type": "string", "optional": true, "field": "lastname" } ], "optional": true, "name": "cdc.public.users.Value", "field": "after" },
      { "type": "struct", "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ], "optional": false, "name": "io.debezium.connector.postgresql.Source", "field": "source" },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" }
    ],
    "optional": false,
    "name": "cdc.public.users.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "id": "id1 ",
      "firstname": null,
      "lastname": null
    },
    "after": null,
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "cdc",
      "ts_ms": 1728938159344,
      "snapshot": "false",
      "db": "my_db",
      "sequence": "[\"26701688\",\"26701744\"]",
      "schema": "public",
      "table": "users",
      "txId": 748,
      "lsn": 26701744,
      "xmin": null
    },
    "op": "d",
    "ts_ms": 1728938159420,
    "transaction": null
  }
}
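There is also a fourth message with an empty (null) value, known as a tombstone, which lets Kafka log compaction eventually drop every message for that key. It carries the following key: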
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": false, "field": "id" }
    ],
    "optional": false,
    "name": "cdc.public.users.Key"
  },
  "payload": {
    "id": "id1 "
  }
}
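To make the four messages easier to read side by side, here is a minimal sketch of how a consumer might interpret them once the JSON value has been decoded. It assumes the JsonConverter envelope shown above (a payload with op, before, after, and source) and a table with an id column, as in this article; the describe function and the OPERATIONS mapping are names invented for the example.

```python
# Debezium operation codes: create, update, delete, and "r" for snapshot reads.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe(value):
    """Summarise a decoded Debezium message value (None for a tombstone)."""
    if value is None:
        return "tombstone"
    payload = value["payload"]
    operation = OPERATIONS.get(payload["op"], payload["op"])
    # Inserts and updates carry the row in `after`; deletes only have `before`.
    row = payload.get("after") or payload.get("before") or {}
    # char(n) columns are space-padded, so strip the id before printing it.
    row_id = str(row.get("id", "?")).strip()
    source = payload["source"]
    return f'{operation} on {source["schema"]}.{source["table"]} id={row_id}'
```

Note that before is null in the UPDATE event and only contains the primary key in the DELETE event. This is expected with the table's default replica identity; if you need the full previous row, ALTER TABLE users REPLICA IDENTITY FULL is the usual PostgreSQL setting to look at, at the cost of more WAL volume.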
Source Code
You can find the complete sources on my GitHub 🫡