Kafka Connect CDC Training

Rich Lee

2021/6

Integration Architect

  • System Integration 
  • Microservices
  • Event-Driven Architecture
  • Cloud Native App Development

rich.lee@cathayholdings.com

@rich04230

RICH0423

Agenda

  • CDC(Change Data Capture)

  • Debezium CDC Platform

  • Debezium DB2 Connector

  • Kafka Connect JDBC Connector

What is CDC?

Get an event stream of all data and schema changes in your database.

Change Data Capture


Change Data Capture Tools

Change Data Capture Architecture

Debezium CDC Platform

Debezium is an open-source, distributed platform for change data capture, led by Red Hat. It captures row-level changes in a database and publishes them as change events to Kafka topics.

Debezium Connectors

  • MongoDB
  • MySQL
  • PostgreSQL
  • SQL Server
  • Oracle (Incubating)
  • Db2 (Incubating)
  • Cassandra (Incubating)

 

Debezium Event Structure

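  • Change event structure

    • Key: the primary key of the changed row

    • Value: a description of the change

  • Value fields

    • before: the row state before the change (null for inserts)

    • after: the row state after the change (null for deletes)

    • source: metadata about where the change came from

    • op: the type of operation (c = create, u = update, d = delete, r = read during a snapshot)

    • ts_ms: the time at which the connector processed the event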

 

{
  "before": null,
  "after": {
    "id": 1004,
    "first_name": "Rich",
    "last_name": "Lee",
    "email": "rich.lee@noanswer.org"
  },
  "source": {
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "file": "mysqlbin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": true,
    "db": "inventory",
    "table": "customers"
  },
  "op": "c",
  "ts_ms": 1486500577691
}
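To make the envelope fields concrete, here is a minimal Python sketch that reads a change-event value like the one above and summarizes it. It assumes the value has already been deserialized from JSON; `describe_change` is a hypothetical helper, and the sample is a trimmed copy of the event shown above.

```python
import json

# Trimmed copy of the example change event above.
EVENT_JSON = """
{
  "before": null,
  "after": {"id": 1004, "first_name": "Rich", "last_name": "Lee",
            "email": "rich.lee@noanswer.org"},
  "source": {"name": "dbserver1", "db": "inventory", "table": "customers"},
  "op": "c",
  "ts_ms": 1486500577691
}
"""

# Debezium operation codes carried in the "op" field.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(event: dict) -> str:
    """Summarize a Debezium change-event value in one line."""
    op = OPS.get(event["op"], "unknown")
    source = event["source"]
    table = f'{source["db"]}.{source["table"]}'
    # Deletes carry the old row in "before"; other operations carry the new row in "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    return f"{op} on {table}: {row}"


if __name__ == "__main__":
    print(describe_change(json.loads(EVENT_JSON)))
```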

Debezium DB2 Connector


Debezium’s Db2 connector can capture row-level changes in the tables of a Db2 database.

  • Its design is inspired by Debezium's SQL Server connector.
  • It uses a SQL-based polling model that puts tables into "capture mode".
  • When a table is in capture mode, the connector generates and streams a change event for each row-level update to that table.
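The polling model can be pictured with a toy Python sketch. This is not Debezium's implementation: the change-data table is modeled as an in-memory list, and `ChangeRow`, `poll` and the integer `lsn` ordering are hypothetical stand-ins for the change-data tables Db2 maintains and the position the connector remembers between polls.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ChangeRow:
    """One row of a hypothetical change-data table (toy model, not Db2's layout)."""
    lsn: int   # hypothetical commit position; larger means later
    op: str    # "c", "u" or "d"
    row: dict


def poll(change_table: list[ChangeRow], last_lsn: int) -> tuple[list[dict], int]:
    """Return events committed after last_lsn, in commit order, plus the new position."""
    new_rows = sorted((r for r in change_table if r.lsn > last_lsn), key=lambda r: r.lsn)
    events = [{"op": r.op, "row": r.row} for r in new_rows]
    # Remember the last position seen so the next poll skips rows already emitted.
    next_lsn = new_rows[-1].lsn if new_rows else last_lsn
    return events, next_lsn


if __name__ == "__main__":
    table = [ChangeRow(2, "u", {"TX_ID": 1, "STATUS": 201}),
             ChangeRow(1, "c", {"TX_ID": 1, "STATUS": 200})]
    events, position = poll(table, 0)
    print(events, position)
    print(poll(table, position))  # nothing new since the last poll
```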

 

Debezium DB2 Connector

DB2 connector workflow:

 

 

Debezium DB2 Connector

Deploy DB2 connector

 

 

Debezium DB2 Connector

Setting Up DB2 - Manually

  • Install and enable the Debezium management UDFs
    • Debezium provides a set of user-defined functions (UDFs) for convenience.
    • Alternatively, you can run Db2 control commands to put tables into capture mode.
    • Follow steps 1 to 9 of the Db2 setup procedure in the Debezium Db2 connector documentation.

Debezium DB2 Connector

Setting Up DB2 - Docker

  • Pull the custom Docker image that bundles Db2 and start the container:
docker run -itd --name mydb2 --privileged=true -p 50000:50000 \
  -e LICENSE=accept \
  -e DB2INST1_PASSWORD=db2passw0rd \
  -e DBNAME=testdb \
  rich0423/db2-asncdc:latest

  • Start the ASN agent:
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');

  • Put tables into capture mode
    • Create a schema: TEST
    • Create a table: TXLOG

# enter the Db2 container as the instance owner
docker exec -ti mydb2 bash -c "su - db2inst1"

# connect to Db2 and create the TEST schema
db2 connect to testdb
db2 "CREATE SCHEMA TEST AUTHORIZATION db2inst1"

# create the table in the TEST schema (unqualified, it would land in the DB2INST1 schema)
db2 "CREATE TABLE TEST.TXLOG (
    TX_ID INT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
    CONTENT VARCHAR(150) NOT NULL,
    STATUS INT NOT NULL,
    PRIMARY KEY (TX_ID)
)"

  • Put tables into capture mode
    • Call the ADDTABLE procedure for each table that you want to put into capture mode:
CALL ASNCDC.ADDTABLE('TEST', 'TXLOG');
  • Reinitialize the ASN service:
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
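When several tables need capture mode, the statements can be generated rather than typed by hand. A small Python sketch, assuming you run the generated SQL yourself in the db2 CLP inside the container; `capture_mode_statements` is a hypothetical helper that only emits the two statements shown on these slides, in the same order (one ADDTABLE per table, then a reinit).

```python
from __future__ import annotations

import re

# Plain, unquoted identifiers only, because the names are pasted into SQL text.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def capture_mode_statements(tables: list[tuple[str, str]]) -> list[str]:
    """Build one ADDTABLE call per (schema, table), followed by a single ASN reinit."""
    statements = []
    for schema, table in tables:
        if not (_IDENTIFIER.fullmatch(schema) and _IDENTIFIER.fullmatch(table)):
            raise ValueError(f"unexpected identifier: {schema}.{table}")
        statements.append(f"CALL ASNCDC.ADDTABLE('{schema}', '{table}');")
    statements.append("VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');")
    return statements


if __name__ == "__main__":
    for statement in capture_mode_statements([("TEST", "TXLOG")]):
        print(statement)
```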

Installing Debezium DB2 Connector

  • Download the connector’s plug-in archive.
  • Extract the JAR files into your Kafka Connect environment.
  • Add the directory with the JAR files to Kafka Connect’s plugin.path.
# config/connect-distributed.properties
plugin.path=/Users/rich/work/projects/gitlab/kafka-cdc-poc/connectors

 

  • Restart the Kafka Connect worker
# stop the Kafka Connect worker
kill {process id}

#start kafka connect worker
./bin/connect-distributed.sh config/connect-distributed.properties

Installing Debezium DB2 Connector

  • Get the installed plugins
curl localhost:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.0.1"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "10.0.1"
  },
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "1.4.0.Alpha2"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "1.0.3.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.4.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.4.0"
  }
]
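In a script, the same check can be done by parsing the response body instead of reading the `jq` output. A minimal Python sketch; it works on the JSON text returned by `curl localhost:8083/connector-plugins` (here a trimmed copy of the response above), and `installed_plugins` is a hypothetical helper name.

```python
from __future__ import annotations

import json

DB2_CONNECTOR = "io.debezium.connector.db2.Db2Connector"

# Trimmed copy of the GET /connector-plugins response shown above.
SAMPLE_RESPONSE = """[
  {"class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "10.0.1"},
  {"class": "io.debezium.connector.db2.Db2Connector", "type": "source", "version": "1.4.0.Alpha2"}
]"""


def installed_plugins(response_body: str, plugin_type: str | None = None) -> set[str]:
    """Return the connector classes in a /connector-plugins response, optionally filtered by type."""
    plugins = json.loads(response_body)
    return {p["class"] for p in plugins if plugin_type in (None, p["type"])}


if __name__ == "__main__":
    print("Db2 connector installed:", DB2_CONNECTOR in installed_plugins(SAMPLE_RESPONSE, "source"))
```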

Installing Debezium DB2 Connector

  • DB2 connector configuration properties

Installing Debezium DB2 Connector

  • Validate the provided configuration values against the configuration definition.
curl  -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connector-plugins/Db2Connector/config/validate -d '
{
    "name": "db2-source-connector",
    "connector.class": "io.debezium.connector.db2.Db2Connector",
    "database.hostname": "localhost",
    "database.port": "50000",
    "database.user": "db2inst1",
    "database.password": "db2passw0rd",
    "database.dbname": "testdb",
    "database.server.name": "testdb",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "history.testdb",
    "table.include.list": "TEST.TXLOG"
 }' | jq
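The validate endpoint reports problems per property rather than failing the request. The Python sketch below assumes the usual shape of that response: a top-level `error_count` and a `configs` list whose entries carry a `value` object with `name` and `errors`. `validation_errors` is a hypothetical helper, and the sample response, including its error text, is made up for illustration.

```python
from __future__ import annotations

import json

# Made-up response for illustration; only the fields used below are included.
SAMPLE_RESPONSE = json.dumps({
    "name": "io.debezium.connector.db2.Db2Connector",
    "error_count": 1,
    "configs": [
        {"value": {"name": "database.hostname", "value": "localhost", "errors": []}},
        {"value": {"name": "database.dbname", "value": None, "errors": ["example error"]}},
    ],
})


def validation_errors(response_body: str) -> dict[str, list[str]]:
    """Map each property that failed validation to its error messages."""
    result = json.loads(response_body)
    if result.get("error_count", 0) == 0:
        return {}
    return {
        config["value"]["name"]: config["value"]["errors"]
        for config in result.get("configs", [])
        if config["value"].get("errors")
    }


if __name__ == "__main__":
    print(validation_errors(SAMPLE_RESPONSE))
```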

Installing Debezium DB2 Connector

  • Deploy DB2 source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '
{
  "name": "db2-source-connector",  
  "config": {
    "connector.class": "io.debezium.connector.db2.Db2Connector", 
    "database.hostname": "localhost", 
    "database.port": "50000", 
    "database.user": "db2inst1", 
    "database.password": "db2passw0rd", 
    "database.dbname": "testdb", 
    "database.server.name": "testdb", 
    "database.history.kafka.bootstrap.servers": "localhost:9092", 
    "database.history.kafka.topic": "history.testdb",
    "table.include.list": "TEST.TXLOG",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "testdb.TEST.(.*)",
    "transforms.route.replacement": "$1"
  }
}'
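The `route` transform renames the topic. Debezium names Db2 topics `<database.server.name>.<schema>.<table>`, so changes to TEST.TXLOG would otherwise land on `testdb.TEST.TXLOG`; with this RegexRouter they go to `TXLOG`, the topic the JDBC sink subscribes to later. The Python sketch below approximates RegexRouter: the whole topic name must match, and the replacement may reference groups. RegexRouter itself uses Java regular expressions with `$1`-style group references, which the sketch translates into Python's `\1` syntax; `route_topic` is a hypothetical helper.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename the topic only when the entire name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics are left unchanged
    # Translate Java-style "$1" group references into Python's "\1" syntax.
    python_replacement = re.sub(r"\$(\d+)", r"\\\1", replacement)
    return match.expand(python_replacement)


if __name__ == "__main__":
    print(route_topic("testdb.TEST.TXLOG", "testdb.TEST.(.*)", "$1"))
```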

Installing Debezium DB2 Connector

  • Inserting test data
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('info log', 200);
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('error log', 500);
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('debug log', 404);

Kafka Connect JDBC Connector

Setting up PostgreSQL and JDBC Sink connector

 

 

Kafka Connect JDBC Connector

Setting up PostgreSQL

 

 

Kafka Connect JDBC Connector

#start PostgreSQL
docker-compose up -d
# docker-compose.yaml

version: '3.1'

services:
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: passw0rd
      POSTGRES_USER: admin
      POSTGRES_DB: demo
    ports:
      - 5432:5432
    volumes:
      - database-data:/var/lib/postgresql/data/

  adminer:
    image: adminer
    restart: always
    ports:
      - 9090:8080

volumes:
  database-data:

 

Kafka Connect JDBC Connector

  •  JDBC Sink Connector configuration properties

Kafka Connect JDBC Connector

  • Deploy JDBC sink connector for PostgreSQL
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '
{
  "name": "postgre-jdbc-sink",  
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", 
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/demo?user=admin&password=passw0rd",
    "topics":"TXLOG",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "auto.create" : "true",
    "insert.mode": "upsert",
    "pk.fields": "TX_ID",
    "pk.mode" : "record_key",
    "delete.enabled": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}'
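To see how the pieces fit, here is a simplified in-memory Python model of what reaches PostgreSQL. It is a sketch under stated assumptions, not the connectors' code. ExtractNewRecordState keeps only the `after` state and, with its default delete handling, drops the delete event itself. `drop.tombstones=false` lets the following tombstone (a record with a null value) through. The sink then upserts on `TX_ID` for records with a value and, because `delete.enabled` is true, deletes the row for a tombstone. `replay` and `apply_record` are hypothetical names.

```python
from __future__ import annotations


def apply_record(table: dict, key: dict, value: dict | None, pk_field: str = "TX_ID") -> None:
    """Apply one unwrapped record like an upsert sink with delete.enabled=true."""
    pk = key[pk_field]          # pk.mode=record_key, pk.fields=TX_ID
    if value is None:
        table.pop(pk, None)     # tombstone: delete the row
    else:
        table[pk] = value       # insert, or update if the key already exists


def replay(records: list[tuple[dict, dict | None]]) -> dict:
    """Replay (key, Debezium envelope or None) pairs into an in-memory 'table'."""
    table: dict = {}
    for key, envelope in records:
        if envelope is not None and envelope["op"] == "d":
            continue            # the delete event itself is dropped by the SMT (simplified)
        value = None if envelope is None else envelope["after"]
        apply_record(table, key, value)
    return table


if __name__ == "__main__":
    records = [
        ({"TX_ID": 1}, {"op": "c", "after": {"TX_ID": 1, "CONTENT": "info log", "STATUS": 200}}),
        ({"TX_ID": 2}, {"op": "c", "after": {"TX_ID": 2, "CONTENT": "error log", "STATUS": 500}}),
        ({"TX_ID": 1}, {"op": "u", "after": {"TX_ID": 1, "CONTENT": "info log", "STATUS": 201}}),
        ({"TX_ID": 2}, {"op": "d", "after": None}),
        ({"TX_ID": 2}, None),   # tombstone that follows the delete
    ]
    print(replay(records))
```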
