Preamble
Working with databases has always been difficult and tedious work. Recently I had the opportunity to try a solution that detects changes in one database and copies that data to another database of the same or a different type. To handle this problem, I used a technique called, as the title says, Change Data Capture, or CDC for short.
What is Change Data Capture?
True to its name, capturing data changes, this is the technique we use to catch changes to the data stored in a database. Capturing data changes helps us handle quite a few data-processing problems, which we will explore in the next section.
There are many ways to capture these data changes. The most primitive is to use the TRIGGER mechanism that databases already support to catch update, insert, and delete actions. More conveniently, we can use a tool to do this for us; the most prominent of these is Debezium.
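For illustration, a hypothetical audit trigger in MySQL might look like the sketch below (the table and column names are made up for this example, not part of the demo later in the article):

```sql
-- Hypothetical example: record every change to customers' emails in an audit table
CREATE TABLE customers_audit (
  id          INT AUTO_INCREMENT PRIMARY KEY,
  customer_id INT,
  changed_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  old_email   VARCHAR(255),
  new_email   VARCHAR(255)
);

-- Fires after every UPDATE on customers and captures the old and new values
CREATE TRIGGER customers_after_update
AFTER UPDATE ON customers
FOR EACH ROW
  INSERT INTO customers_audit (customer_id, old_email, new_email)
  VALUES (OLD.id, OLD.email, NEW.email);
```

This works, but you would have to write and maintain a trigger per table and per action, which is exactly the busywork a tool like Debezium takes off your hands.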
Benefits of CDC
The first benefit everyone will see is copying data to other systems. Some of you will object: database systems all support replication mechanisms, so why use an external, less stable tool and give yourself a headache? Fair enough! If you only need to copy data between databases of the same type (MySQL => MySQL, Mongo => Mongo), then the built-in database feature is the best choice. However, if you want to copy data from MySQL to MongoDB, or from MySQL to PostgreSQL, there is no such mechanism. Here CDC stands in the middle: it detects changes in the database that needs to be monitored, and your code can then process those changes and push the data to the system that needs the copy.
Another equally important benefit is the ability to back up data. Data change events are stored, so if your database is unfortunately dropped at 9 a.m., you can take the 3 a.m. backup and reapply the changes saved between 3 a.m. and 9 a.m. In theory, if you don't miss any events, your data will be fully recovered to its state before the drop. Sounds almost too good, right?
Continuing from the first benefit: after copying the data to another system, we can use that system for testing instead of interacting directly with the production database. It's not uncommon for developers to run test queries that take minutes to process, or worse, lock the system. In mild cases this degrades performance; in severe cases it causes a crash. CDC is also a way to reduce the chance of situations like this.
In addition, CDC also supports some problems specific to each system, as well as Big Data processing. If you have ever applied CDC to these problems, please share in the comments below.
Debezium – CDC Tool
Theory without examples is of little use, so I will introduce a tool I have worked with for a while and found quite good: Debezium. At its core, Debezium uses Kafka to generate messages corresponding to data change events. Debezium uses Connectors to connect to database systems and catch changes; as of Debezium 1.9 it supports MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Cassandra, and Vitess. You can find instructions for each connector in the official documentation: https://debezium.io/documentation/reference/2.0/connectors/index.html
In this section, I will walk through the installation steps for MySQL, the database most people work with.
For MySQL, Debezium relies on the binlog to detect data changes, so on the system you want to monitor you need to enable the binlog feature and make sure the user Debezium connects with has permission to read it.
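As a quick sketch (the exact statements depend on your MySQL version and setup), you can verify that the binlog is enabled and grant the Debezium user the replication privileges its MySQL connector requires; `debezium` here is the example user from the Secret we create below:

```sql
-- Check that the binlog is enabled: log_bin should report ON
SHOW VARIABLES LIKE 'log_bin';

-- Grant the privileges Debezium's MySQL connector needs to read the binlog
-- and take initial snapshots ('debezium'@'%' is this article's demo user)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```

Note that the demo MySQL image used later already ships with a suitably privileged user, so this step matters mainly when you point Debezium at your own database.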
I work with Kubernetes a lot, so I will guide you through setting this tool up on K8s; for other environments such as VMs or Docker, the components are basically the same.
When running on K8s, Debezium uses the Strimzi Operator (an Operator for Kafka). First we create a separate namespace for this application:
```shell
kubectl create ns debezium-example
```
Then we need to install the Strimzi Operator. The commands below first install the Operator Lifecycle Manager (OLM), and then the Strimzi Operator itself through OLM:

```shell
# Install the Operator Lifecycle Manager (OLM)
curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

# Install the Strimzi Operator via OLM
kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml
```
Create a Secret holding the demo database credentials:
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF
```
Create a Role and RoleBinding granting Debezium access to the Secret:
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF
```
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF
```
Now for the important part: we will create a Kafka cluster for storing change events. The configuration below creates one Kafka pod (corresponding to one broker) and one ZooKeeper pod.
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
```
Next we deploy a MySQL database for testing; the user and password of this DB are mysqluser / mysqlpw.
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:1.9
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF
```
Now we will deploy the components responsible for connecting to MySQL and detecting changes. First we create a KafkaConnect cluster to do the change-detection work:
```shell
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.110.154.103/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF
```
Then we deploy a KafkaConnector that connects to MySQL, attached to the KafkaConnect cluster created above.
```shell
# Note: the quoted 'EOF' stops the shell from expanding the ${secrets:...}
# placeholders, which are resolved by the Strimzi secret config provider instead
cat << 'EOF' | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
EOF
```
That's it, the setup is done, and now we can monitor the changes in the database. Run this command to listen for messages in Kafka:
```shell
kubectl run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- \
  kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers
```
Open another terminal; now we will access the DB and change a record to test:
```shell
kubectl run -n debezium-example -it --rm --image=mysql:8.0 --restart=Never \
  --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium
```
Update a record:
```sql
-- select the sample database first
use inventory;
update customers set first_name = "Sally Marie" where id = 1001;
```
If you see the following JSON message, the setup was successful:
```json
{
  ...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "transaction": null
  }
}
```
The output message has three main parts: source (information about the data source, e.g. the binlog file and position), before (the data before the change), and after (the data after the change).
Conclusion
During the installation I ran into quite a few problems, partly because I don't have much experience with Kafka. If you hit errors while installing, you are not alone: leave a comment with the error and I will try to help. I hope this article has given you some knowledge about Change Data Capture.
Some sources I refer to:
https://luminousmen.com/post/change-data-capture
https://www.striim.com/tutorial/streaming-data-integration-using-cdc-to-stream-database-changes/