Debezium: PostgreSQL Change Data Capture

Figure 1: Debezium Postgres Connector

1. Goal

Set up Debezium to capture row-level changes in the schemas of a PostgreSQL database and publish them to Kafka topic(s).

The high-level architecture is explained in the diagram above 😎. Pikachu, aka the Debezium PostgreSQL Connector, detects row-level changes in the configured Postgres tables and publishes them as change events to Kafka topic(s).


2. Definitions

2.1. Change Data Capture (CDC)

In databases, change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed (the "deltas") so that action can be taken using the changed data [1].


2.2. Debezium

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred [2].


2.3. Debezium Connectors

A library of connectors that capture changes from a variety of database management systems and produce events with very similar structures, making it far easier for your applications to consume and respond to the events regardless of where the changes originated [3].


2.4. Debezium connector for PostgreSQL

The Debezium PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database [4].


2.5. Kafka

Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real-time. Streaming data is data that is continuously generated by thousands of data sources, which typically send in the data records simultaneously [5].


2.6. Kafka Connect

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka [6].


3. Define Services (Docker-Compose)

As a general note, if you use a Mac with M1/M2, ensure the Docker images support linux/arm64 OS/ARCH.

Section 3.x breaks down each service/Docker image used in the docker-compose.yaml file; if you have worked with Docker before, skip the section and pick up the complete file from section 4 instead.

Breakdown of services in docker-compose.yaml:

  • Postgres: The database containing the table(s) for which CDC is tracked.

  • Kafka and Zookeeper: The event broker where CDC events are stored.

  • Schema Registry: To serialize/deserialize CDC message(s) using Avro schema.

  • Debezium: Responsible for capturing the row-level changes made to Postgres table(s) and streaming them to a Kafka topic.

3.1. PostgreSQL

debezium/postgres: PostgreSQL for use with Debezium change data capture. This image is based on postgres and adds the logical decoding plugin(s) used by Debezium.

dpage/pgadmin4 (Optional): Web-browser version of pgAdmin 4, for ease of running DML and DDL operations on PostgreSQL.


postgres:
  image: debezium/postgres:13-alpine
  ports:
    - 5432:5432
  environment:
    - POSTGRES_USER=admin
    - POSTGRES_PASSWORD=root
    - POSTGRES_DB=pyblog

pgadmin:
  image: dpage/pgadmin4
  environment:
    - PGADMIN_DEFAULT_EMAIL=admin@admin.com
    - PGADMIN_DEFAULT_PASSWORD=root
  ports:
    - '5050:80'
  restart: always
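
Once the stack is up (section 4), a quick sanity check is to confirm that logical decoding is enabled, since the debezium/postgres image ships with wal_level preset to logical. A minimal sketch, assuming the service name (postgres) and credentials from the snippet above:

docker-compose exec postgres psql -U admin -d pyblog -c "SHOW wal_level;"
# Expected output: logical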

3.2. Kafka and Zookeeper

Confluent Platform Docker images for Kafka (confluentinc/cp-enterprise-kafka) and Zookeeper (confluentinc/cp-zookeeper). The example below uses version 7.3; in more recent versions (7.5 onwards), Confluent recommends KRaft mode for new deployments, and Zookeeper is deprecated.


zookeeper:
  image: confluentinc/cp-zookeeper:7.3.5
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181

kafka:
  image: confluentinc/cp-enterprise-kafka:7.3.5
  depends_on: [zookeeper]
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9991
  ports:
    - 9092:9092
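
Once the broker is up (section 4), you can sanity-check it by listing topics from inside the container; a sketch assuming the service name (kafka) from the snippet above:

docker-compose exec kafka kafka-topics --bootstrap-server kafka:9092 --list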

3.3. Debezium and Schema Registry

The debezium/connect image defines a runnable Kafka Connect service preconfigured with all Debezium connectors; it monitors database management system(s) for data changes and forwards those changes into Kafka topics organized by server, database, and table.

confluentinc/cp-schema-registry enables client applications to read and write Avro data; in this case, it is used to serialize and deserialize CDC messages.


debezium:
  image: debezium/connect:2.4
  environment:
    BOOTSTRAP_SERVERS: kafka:9092
    GROUP_ID: 1
    CONFIG_STORAGE_TOPIC: connect_configs
    OFFSET_STORAGE_TOPIC: connect_offsets
    STATUS_STORAGE_TOPIC: my_status_topic
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
  depends_on: [kafka]
  ports:
    - 8083:8083
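
After start-up (section 4), the Kafka Connect REST API on port 8083 can confirm that the Postgres connector class is bundled; io.debezium.connector.postgresql.PostgresConnector should appear in the response:

curl -H "Accept:application/json" localhost:8083/connector-plugins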

schema-registry:
  image: confluentinc/cp-schema-registry:7.3.5
  environment:
    - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
    - SCHEMA_REGISTRY_HOST_NAME=schema-registry
    - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8085,http://localhost:8085
  ports:
    - 8085:8085
  depends_on: [zookeeper, kafka]
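
A quick health check for the registry is to list its subjects over the REST API; the response is an empty list ([]) until Avro schemas are registered:

curl -H "Accept:application/json" localhost:8085/subjects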

4. Start Services (Docker-Compose)

The complete docker-compose.yaml to set up Postgres with Debezium and publish data change events to Kafka:

Note: At the time of writing this post, the services use the current stable version(s); visit the Docker Hub page for the latest stable version(s).

version: "3.7"
services:
  postgres:
    image: debezium/postgres:13-alpine
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=pyblog

  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - '5050:80'
    restart: always

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.5
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:7.3.5
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092

  debezium:
    image: debezium/connect:2.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: my_status_topic
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.5
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8085,http://localhost:8085
    ports:
      - 8085:8085
    depends_on: [zookeeper, kafka]

4.1. Run all containers

Clean-up (Optional) and Run (Create and Start) containers:

docker rm -f $(docker ps -a -q)
docker-compose up -d
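
To confirm all containers are up and to find the network name Compose assigned (needed later for kafkacat), run the sketch below; the exact network name depends on your project directory:

docker-compose ps
docker network ls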

Make a note of the assigned network name; from the above output, the network name is enricher_default. To create a custom network, refer to Networking in Compose.


5. Configure Services

End-to-end configuration for all the services to create the CDC pipeline:

5.1. Create Postgres Tables

a. Log in to pgAdmin at localhost:5050 with the email/password (admin@admin.com/root) configured in the pgadmin container (refer: docker-compose.yaml)


b. Register the database server with the username/password (admin/root) and hostname (postgres) configured in the postgres container (refer: docker-compose.yaml)


c. Create and Alter table queries:

Example: create a table user_profile from the query tool to track data change events in this table. Skip this step if you already have your own schema for the Postgres tables for which CDC has to be configured.

CREATE TABLE user_profile (
  user_id INT NOT NULL,
  full_name VARCHAR(64) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY (user_id),
  UNIQUE (email)
);

ALTER TABLE user_profile REPLICA IDENTITY FULL;

Setting the table's replication identity to FULL means that the entire row is used as the identifier for change tracking, so update and delete events carry the previous values of all columns.
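
To verify the setting, query pg_class; relreplident returns f when the replica identity is FULL. A sketch assuming the service name and credentials from the compose file:

docker-compose exec postgres psql -U admin -d pyblog \
  -c "SELECT relreplident FROM pg_class WHERE relname = 'user_profile';"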


5.2. Set up Debezium Postgres Connector (Kafka Connect)

a. Check the status of the Kafka Connect service:

curl -H "Accept:application/json" localhost:8083/


b. Register the Debezium Postgres connector:

Create a file debezium.json with the Debezium Postgres connector configuration, where user_profile is the table being tracked:

{
    "name": "postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "admin",
        "database.password": "root",
        "database.dbname": "pyblog",
        "table.include.list": "public.user_profile",
        "database.tcpKeepAlive": true,
        "topic.prefix": "topic_user_profile"
    }
}

Note: Debezium 2.x replaced the older database.server.name and table.whitelist properties with topic.prefix and table.include.list, so only the newer names are used here.

The following command uses the Kafka Connect service's REST API to submit a POST request to the /connectors resource, with a JSON document that describes the new connector (named postgresql-connector).

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data "@debezium.json"


c. Check the list of connectors registered with Kafka Connect:

curl -H "Accept:application/json" localhost:8083/connectors/


5.3. View Kafka Messages

a. Pull the kafkacat Docker image:

docker pull confluentinc/cp-kafkacat:7.1.9

kafkacat is a command-line tool for interacting with Kafka brokers. It can be used to produce and consume messages, as well as query metadata.

b. Listing topics on a broker:

Since the Kafka broker is accessible as kafka:9092 on the Docker network enricher_default, list the topics by running:

docker run --tty \
--network enricher_default \
confluentinc/cp-kafkacat:7.1.9 \
kafkacat -b kafka:9092 \
-L

c. Consuming messages from a topic:

Since the Kafka broker is accessible as kafka:9092 on the Docker network enricher_default, print messages and their associated metadata from the topic topic_user_profile.public.user_profile:

docker run --tty \
--network enricher_default \
confluentinc/cp-kafkacat:7.1.9 \
kafkacat -b kafka:9092 -C \
-t topic_user_profile.public.user_profile

If you get the error % ERROR: Topic topic_user_profile.public.user_profile error: Broker: Leader not available, run the same command again! The error is usually transient while the newly created topic elects a leader.
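
Note: with the worker configuration above, Kafka Connect's default converters apply unless Avro converters are explicitly enabled (e.g., via the KEY_CONVERTER/VALUE_CONVERTER environment variables of the debezium/connect image). If Avro is enabled, kafkacat can deserialize against the Schema Registry, assuming a build with Avro support (kafkacat 1.6 onwards):

docker run --tty \
--network enricher_default \
confluentinc/cp-kafkacat:7.1.9 \
kafkacat -b kafka:9092 -C \
-t topic_user_profile.public.user_profile \
-s avro -r http://schema-registry:8085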


6. Moment of Truth 🚀

a. Insert/Update a row in the Postgres table:

Insert a row into the table for which Debezium CDC is configured; continuing the example, create a row in user_profile:

INSERT INTO user_profile
 (user_id, full_name, email) 
VALUES
 (1,'John Ross', 'john.ross@pyblog.xyz');
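
An update to the same row similarly produces a change event with op "u"; with REPLICA IDENTITY FULL, the event's before field carries the previous values. For example (the new name is illustrative):

UPDATE user_profile
 SET full_name = 'John B. Ross'
WHERE user_id = 1;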

b. Validate messages in Kafka topic:

Consume the Kafka messages as described in section 5.3, step c, to see the change event for the newly inserted row.
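
For reference, the value of the insert event rendered as JSON follows Debezium's change-event envelope; the sketch below is illustrative (source metadata trimmed), not verbatim output:

{
  "before": null,
  "after": {
    "user_id": 1,
    "full_name": "John Ross",
    "email": "john.ross@pyblog.xyz"
  },
  "source": { "connector": "postgresql", "db": "pyblog", "table": "user_profile" },
  "op": "c",
  "ts_ms": 1697900000000
}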


c. Stop services and delete Docker Containers:

To stop all the services and remove the Docker containers, run:

docker-compose down
docker rm -f $(docker ps -a -q)


7. Conclusion

The post demonstrated how to capture data change events with Debezium by streaming data from a PostgreSQL database to Kafka.

Change Data Capture (CDC) has a lot of use cases; some of the top ones: updating/invalidating caches, enriching data/logs from entity identifiers, real-time data loading into data warehouse(s) and search engine(s), synchronizing data (on-premises to cloud), microservices data exchange with the outbox pattern, and many more.

What's next: in the next post, we will see how to process the CDC events with stream-processing engines such as Apache Flink, cache the transformed data (RocksDB), and enrich/cleanse other events with more meaningful information than their raw versions, without having to query the source database.


8. References

[1] Wikipedia Contributors, “Change data capture,” Wikipedia, Feb. 04, 2019. https://en.wikipedia.org/wiki/Change_data_capture

[2] “Debezium Documentation :: Debezium Documentation,” debezium.io. https://debezium.io/documentation/reference/stable/index.html

[3] “Connectors :: Debezium Documentation,” debezium.io. https://debezium.io/documentation/reference/stable/connectors/index.html

[4] “Debezium connector for PostgreSQL :: Debezium Documentation,” debezium.io. https://debezium.io/documentation/reference/stable/connectors/postgresql.html (accessed Oct. 21, 2023).

[5] “What is Apache Kafka? | AWS,” Amazon Web Services, Inc. https://aws.amazon.com/msk/what-is-kafka/

[6] “Kafka Connect | Confluent Documentation,” docs.confluent.io. https://docs.confluent.io/platform/current/connect/index.html
[7] J. P. Alvim, “Streaming data from PostgreSQL to s3 using Debezium, Kafka and Python,” Medium, Feb. 11, 2023. https://medium.com/@joaopaulonobregaalvim/streaming-data-from-postgresql-to-s3-using-debezium-kafka-and-python-16c6cdd6dc1e (accessed Oct. 21, 2023).

[8] D. Danushka, “Configuring Debezium to Capture PostgreSQL Changes with Docker Compose,” Tributary Data, Aug. 16, 2021. https://medium.com/event-driven-utopia/configuring-debezium-to-capture-postgresql-changes-with-docker-compose-224742ca5372 (accessed Oct. 21, 2023).

Cite this article as: Adesh Nalpet Adimurthy. (Oct 16, 2023). Debezium: PostgreSQL Change Data Capture. PyBlog. https://www.pyblog.xyz/debezium-postgres-cdc
