go-zero(十六)结合Canal :实现高效数据同步与处理
go-zero 结合Canal
一、Canal介绍
1.Canal 简介
Canal 是阿里巴巴开源的一款数据库 binlog 增量订阅&消费的中间件,最初用于 MySQL 数据库主从复制协议解析。它能够模拟 MySQL slave 获取 binlog 并解析为结构化的数据,广泛应用于数据同步、实时数据处理等场景。
github地址:
https://github.com/alibaba/canal
2.Canal 的原理
Canal 的工作原理主要依赖MySQL主备复制原理
,可以总结为以下几点:
- MySQL Binlog:Canal 通过模拟 MySQL Slave 的通信协议连接到主库,拉取 binlog 数据。
- Binlog 解析:将获取到的 binlog 数据进行解析,转化为像 JSON 这样易于处理的结构化格式,方便后续使用。
- 增量数据订阅:把解析后的结构化数据推送给消费端,消费端可以是 Kafka、MQ 等消息队列,或者是直接接收数据的应用程序。
3.Canal 的作用
- 数据同步:能实现 MySQL 数据库到其他存储系统(如 Elasticsearch、MongoDB)的数据实时同步,用于全文搜索、数据分析等场景,比如电商商品信息同步到 Elasticsearch 以支持快速搜索。
- 事件驱动:订阅 binlog 数据,捕获数据库的增删改操作,实时触发下游业务逻辑,像异步处理(如订单生成后发通知)、业务解耦(通过消息队列实现)。
- 实时缓存更新:在分布式架构中,捕获数据库变更,实时更新 Redis 等缓存,保证缓存和数据库数据一致,避免脏读。
- 数据双写一致性:在需写入多个存储系统的场景中,基于 binlog 数据同步,解决双写不一致问题,如 MySQL 与 Elasticsearch 数据同步。
- 数据库审计和监控:可记录数据库操作,用于安全审计(监控敏感数据变更)和系统监控(分析操作频率优化性能)。
- 微服务解耦与扩展:在微服务架构中,实现数据库变更事件的发布订阅,减少微服务对数据库的直接访问压力,提供变更事件流供下游服务按需消费。
二、环境部署
1. 部署mysql 和kafka
canal是基于 mysql
实现的,所以必须要有 mysql
,消息队列可以使用 kafka, rocketMQ, rabbitMQ, pulsarMQ
这些,这里我们选择kafka
,在前面的文章中,我已经详细介绍了如何部署这两个组件,这里就不再赘述。如果有疑问,可以参考之前的文章进行操作。
2. 拉取canal
我们可以选择本机部署或者使用 docker 来部署 canal。考虑到之前的环境都是基于 docker 搭建的,为了保持环境的一致性和便于管理,我这次仍然使用 docker。
使用命令:
docker pull canal/canal-server:latest
3.设置docker网络
由于我们的 canal
、mysql
和kafka
都将部署在 docker 中,为了确保它们之间能够相互通信,我们需要将它们部署到同一个网络中。
先查看docker中已有网络:
docker network ls
执行上述命令后,会输出以下类似的结果:
NETWORK ID NAME DRIVER SCOPE
bbc5f382f29d bridge bridge local
3dd7cd1fa576 devdocker_gozero_net bridge local
141f19eb472a host host local
01a4992a132d none null local
因为我之前的环境都是基于devdocker_gozero_net
网络,所以我们这边也使用这个网络启动canal
,使用命令:
docker run --name canal-server --network devdocker_gozero_net canal/canal-server:latest
上面的命令会创建一个名为 canal-server
的容器,并将其加入到devdocker_gozero_net
网络中。容器启动后,我们可以通过进入容器并使用 ping 命令来测试 canal 与 mysql 和 kafka 是否能够连通:
# ping mysql
docker exec -it canal-server ping mysql
#ping kafka
docker exec -it canal-server ping kafka
如果 ping 命令能够成功执行,说明 canal 与 mysql 和 kafka 之间的网络连接正常,可以进行下一步操作。如果 ping 不通,需要检查网络配置、容器的启动参数等是否正确。
4.创建topic
进入kafka容器
可以通过以下两种方式进入 Kafka 容器:
docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash
进入kafka执行命令目录
进入 Kafka 容器后,我们需要进入 Kafka 执行命令的目录,这个目录包含了 Kafka 相关的命令行工具,如创建 topic、启动消费者等。:
cd /opt/bitnami/kafka/bin
创建topic
创建名为topic-test
的topic
./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092
上面的命令会在 Kafka 中创建一个名为 topic-test
的 topic,并指定其 bootstrap 服务器为 localhost:9092
。
5. 修改配置
到目前为止canal还不能使用,我们需要修改配置文件。
修改mysql配置
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,到/etc/my.cnf
添加下面的配置:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
添加上述配置后,保存并退出 /etc/my.cnf
文件。为了使配置生效,需要重启 MySQL 服务。
为了方便操作,我们可以使用 docker desktop 直接修改文件。在 docker desktop 中找到对应的 MySQL 容器,进入容器的文件系统,找到 /etc/my.cnf
文件进行修改。修改完成后,确保 MySQL 容器已经重启,以使配置生效
我们还需要授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限。如果已有账户可直接使用 grant 命令进行授权:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
上面的SQL 语句创建了一个名为 canal
的用户,并赋予其相应的权限。
注意:这里的账号和密码都为canal
,在实际开发环境中,为了安全起见,需要修改为更复杂的密码。
修改instance配置文件
这个文件主要用于配置 MySQL 连接
和 消息队列的topic
在/home/admin/canal-server/conf/example
中修改instance.properties
,主要修改以下字段:
# position info
canal.instance.master.address=mysql:3306 #因为都在docker中,我们直接使用容器名代替IP地址
# table meta tsdb info
canal.instance.tsdb.enable=false
# username/password
canal.instance.dbUsername=canal # 配置canal的的账号密码
canal.instance.dbPassword=canal # 如果密码使用自定义的,请使用你自己的密码
# mq config
canal.mq.topic=topic-test # 配置需要使用的kafka的topic ,我们使用了刚刚创建的topic-test
canal.mq.partitionsNum=1
#库名.表名: 唯一主键,多个表之间用逗号分隔
canal.mq.partitionHash=zero_test.user:id # 使用zero_test库中的user表
修改canal配置文件
这个配置文件,是用来设置使用消息队列的组件和,消息队列的地址
进入/home/admin/canal-server/conf/
目录,修改canal.properties
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = kafka:9092 #因为都在docker中,我们直接使用容器名代替IP地址
dokcer 默认帮我们启动canal
,刚刚我们修改了配置,可以把服务重启下。
检查连接是否成功
我们可以通过canal日志,来看下mysql和kafka 是否连接成功
检查mysql连接
进入/home/admin/canal-server/logs/example/
目录,打开example.log
如果没有保持,像下面图片这样就代表连接成功
三、 使用Canal
1. Canal脚本
docker中的canal脚本在/home/admin/canal-server/bin/ 目录中
启动canal
运行以下命令:
cd /home/admin/canal-server/bin/
sh startup.sh
停止canal
cd /home/admin/canal-server/bin/
sh stop.sh
重启canal
cd /home/admin/canal-server/bin/
sh restart.sh
2.消费Canal
进入kafka 启动 消费者
./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092
接着在mysql 中修改user表,可以看到 kafka自动消费了数据:
传递的是json 字符串,
{
"data": [
{
"id": "5",
"username": "999",
"avatar": "222222",
"mobile": "9999",
"create_time": "2024-12-11 05:12:22",
"update_time": "2024-12-12 13:05:59",
"password": "999"
}
],
"database": "beyond_user",
"es": 1734008759000,
"gtid": "",
"id": 5,
"isDdl": false,
"mysqlType": {
"id": "bigint unsigned",
"username": "varchar(32)",
"avatar": "varchar(256)",
"mobile": "varchar(128)",
"create_time": "timestamp",
"update_time": "timestamp",
"password": "varchar(50)"
},
"old": [
{
"avatar": "1111",
"update_time": "2024-12-11 09:54:06"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"username": 12,
"avatar": 12,
"mobile": 12,
"create_time": 93,
"update_time": 93,
"password": 12
},
"table": "user",
"ts": 1734008812652,
"type": "UPDATE"
}
四、项目中使用canal
在go-zero中使用canal也非常简单,只需要创建mq来消费kafka的消息即可, 例如文章的评论、文章的点赞数,这些需要高频访问数据的内容,都可以使用canal获取数据,然后使用消息队列来处理数据。
可以参考消息队列这篇文章
https://blog.csdn.net/yang731227/article/details/144379133
原文地址:https://blog.csdn.net/yang731227/article/details/145250761
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!