Rich Lee
2021/6
rich.lee@cathayholdings.com
@rich04230
RICH0423
Get an event stream with all data and schema changes in your DB.
Get an event stream with all data and schema changes in your DB.
Debezium (open source)
Attunity Replicate (Qlik)
Oracle GoldenGate for Big Data
Debezium is a distributed platform for change data capture, led by Red Hat, that captures changes to a database and streams those changes to Kafka.
Change Event Structure
Key: Primary key of table
Value: Describing the change event
Value fields
Old row state
New row state
Metadata
op: the type of operation
ts_ms: the time at which the connector processed the event
{
"before": null,
"after": {
"id": 1004,
"first_name": ”Rich",
"last_name": ”Lee",
"email": ”rich.lee@noanswer.org"
},
"source": {
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"file": "mysqlbin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"db": "inventory",
"table": "customers"
},
"op": "c",
"ts_ms": 1486500577691
}
Debezium’s Db2 connector can capture row-level changes in the tables of a Db2 database.
DB2 connector workflow:
Deploy DB2 connector
Setting Up DB2 - Manually
Setting Up DB2 - Docker
# Launch a DB2 container (image pre-built with the ASN CDC capture agent),
# exposing the default DB2 port 50000 on the host.
# --privileged is required by the DB2 image; the instance user is db2inst1.
docker run -itd --name mydb2 --privileged=true -p 50000:50000 \
-e LICENSE=accept \
-e DB2INST1_PASSWORD=db2passw0rd \
-e DBNAME=testdb \
rich0423/db2-asncdc:latest
Setting Up DB2 - Docker
-- Start the ASN Capture (CDC) service inside DB2 so row changes begin
-- flowing into the change-data tables the Debezium connector reads.
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');
Setting Up DB2 - Docker
# Open a shell in the DB2 container as the instance owner (db2inst1).
docker exec -ti mydb2 bash -c "su - db2inst1"
# Connect to the demo database created by the container (testdb).
db2 connect to testdb
# Create the TEST schema that will hold the CDC-monitored table.
db2 create schema TEST authorization db2inst1
-- Demo table whose row-level changes the DB2 CDC agent will capture.
-- Qualify the table with the TEST schema explicitly: the session runs as
-- db2inst1, so an unqualified CREATE TABLE would land in DB2INST1, while
-- the later ASNCDC.ADDTABLE('TEST','TXLOG') call and the connector's
-- table.include.list=TEST.TXLOG both expect it in schema TEST.
CREATE TABLE TEST.TXLOG(
    tx_id INT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
    content VARCHAR(150) NOT NULL,
    status INT NOT NULL,
    -- Named constraint so errors and migrations are greppable.
    CONSTRAINT txlog_pk PRIMARY KEY (tx_id)
);
Setting Up DB2 - Docker
-- Register TEST.TXLOG with the CDC capture agent.
CALL ASNCDC.ADDTABLE('TEST', 'TXLOG');
-- Reinitialize the capture service so it picks up the newly added table.
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
# config/connect-distributed.properties — point plugin.path at the directory
# holding the unpacked Debezium / JDBC connector plugin jars.
plugin.path=/Users/rich/work/projects/gitlab/kafka-cdc-poc/connectors
# Stop the running Kafka Connect worker (replace with its actual PID).
kill {process id}
# Start the Kafka Connect worker in distributed mode with the updated config.
./bin/connect-distributed.sh config/connect-distributed.properties
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.0.1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.0.1"
},
{
"class": "io.debezium.connector.db2.Db2Connector",
"type": "source",
"version": "1.4.0.Alpha2"
},
{
"class": "io.debezium.connector.postgresql.PostgresConnector",
"type": "source",
"version": "1.0.3.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.4.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.4.0"
}
]
# List the connector plugins installed on the Kafka Connect worker
# (should now include io.debezium.connector.db2.Db2Connector).
curl localhost:8083/connector-plugins | jq
# Validate the DB2 source connector configuration via the Connect REST API
# before actually creating the connector.
curl -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connector-plugins/Db2Connector/config/validate -d '
{
"name": "db2-source-connector",
"connector.class": "io.debezium.connector.db2.Db2Connector",
"database.hostname": "localhost",
"database.port": "50000",
"database.user": "db2inst1",
"database.password": "db2passw0rd",
"database.dbname": "testdb",
"database.server.name": "testdb",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "history.testdb",
"table.include.list": "TEST.TXLOG"
}' | jq
# Create the Debezium DB2 source connector.
# Fix: use "table.include.list" (the current property name, and the one the
# validate request above uses) instead of the deprecated "table.whitelist",
# which newer Debezium releases no longer accept.
# The RegexRouter transform strips the "testdb.TEST." prefix so events land
# on a topic named after the bare table (TXLOG), matching the sink's
# "topics" setting.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '
{
"name": "db2-source-connector",
"config": {
"connector.class": "io.debezium.connector.db2.Db2Connector",
"database.hostname": "localhost",
"database.port": "50000",
"database.user": "db2inst1",
"database.password": "db2passw0rd",
"database.dbname": "testdb",
"database.server.name": "testdb",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "history.testdb",
"table.include.list": "TEST.TXLOG",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "testdb.TEST.(.*)",
"transforms.route.replacement": "$1"
}
}'
-- Generate a few change events for the CDC pipeline to capture.
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('info log', 200);
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('error log', 500);
INSERT INTO TEST.TXLOG (CONTENT, STATUS) VALUES('debug log', 404);
Setting up PostgreSQL and JDBC Sink connector
Setting up PostgreSQL
# Start PostgreSQL (and Adminer) in the background using the compose file below.
docker-compose up -d
# docker-compose.yaml — PostgreSQL sink target plus the Adminer web UI.
# Indentation restored: the original was flattened (slide extraction),
# which is invalid YAML for docker-compose.
version: '3.1'
services:
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: passw0rd
      POSTGRES_USER: admin
      POSTGRES_DB: demo
    ports:
      - 5432:5432
    volumes:
      # Named volume so the database survives container recreation.
      - database-data:/var/lib/postgresql/data/
  adminer:
    image: adminer
    restart: always
    ports:
      # Adminer UI reachable at http://localhost:9090
      - 9090:8080
volumes:
  database-data:
# Create the JDBC sink connector that writes TXLOG change events from Kafka
# into PostgreSQL. The ExtractNewRecordState (unwrap) transform flattens the
# Debezium envelope to just the row state; drop.tombstones=false keeps
# tombstone records so delete.enabled can propagate deletes (which requires
# pk.mode=record_key, set below).
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '
{
"name": "postgre-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/demo?user=admin&password=passw0rd",
"topics":"TXLOG",
"dialect.name": "PostgreSqlDatabaseDialect",
"auto.create" : "true",
"insert.mode": "upsert",
"pk.fields": "TX_ID",
"pk.mode" : "record_key",
"delete.enabled": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}'