自学内容网 自学内容网

go-zero(十六)结合Canal :实现高效数据同步与处理

go-zero 结合Canal

一、Canal介绍

1.Canal 简介

Canal 是阿里巴巴开源的一款数据库 binlog 增量订阅&消费的中间件,最初用于 MySQL 数据库主从复制协议解析。它能够模拟 MySQL slave 获取 binlog 并解析为结构化的数据,广泛应用于数据同步、实时数据处理等场景。

github地址:

https://github.com/alibaba/canal

2.Canal 的原理

Canal 的工作原理主要依赖MySQL主备复制原理,可以总结为以下几点:

  • MySQL Binlog:Canal 通过模拟 MySQL Slave 的通信协议连接到主库,拉取 binlog 数据。
  • Binlog 解析:将获取到的 binlog 数据进行解析,转化为像 JSON 这样易于处理的结构化格式,方便后续使用。
  • 增量数据订阅:把解析后的结构化数据推送给消费端,消费端可以是 Kafka、MQ 等消息队列,或者是直接接收数据的应用程序。
    在这里插入图片描述

3.Canal 的作用

  • 数据同步:能实现 MySQL 数据库到其他存储系统(如 Elasticsearch、MongoDB)的数据实时同步,用于全文搜索、数据分析等场景,比如电商商品信息同步到 Elasticsearch 以支持快速搜索。
  • 事件驱动:订阅 binlog 数据,捕获数据库的增删改操作,实时触发下游业务逻辑,像异步处理(如订单生成后发通知)、业务解耦(通过消息队列实现)。
  • 实时缓存更新:在分布式架构中,捕获数据库变更,实时更新 Redis 等缓存,保证缓存和数据库数据一致,避免脏读。
  • 数据双写一致性:在需写入多个存储系统的场景中,基于 binlog 数据同步,解决双写不一致问题,如 MySQL 与 Elasticsearch 数据同步。
  • 数据库审计和监控:可记录数据库操作,用于安全审计(监控敏感数据变更)和系统监控(分析操作频率优化性能)。
  • 微服务解耦与扩展:在微服务架构中,实现数据库变更事件的发布订阅,减少微服务对数据库的直接访问压力,提供变更事件流供下游服务按需消费。

二、环境部署

1. 部署mysql 和kafka

canal是基于 mysql实现的,所以必须要有 mysql ,消息队列可以使用 kafka, rocketMQ, rabbitMQ, pulsarMQ这些,这里我们选择kafka ,在前面的文章中,我已经详细介绍了如何部署这两个组件,这里就不再赘述。如果有疑问,可以参考之前的文章进行操作。

2. 拉取canal

我们可以选择本机部署或者使用 docker 来部署 canal。考虑到之前的环境都是基于 docker 搭建的,为了保持环境的一致性和便于管理,我这次仍然使用 docker。

使用命令:

docker pull canal/canal-server:latest

3.设置docker网络

由于我们的 canalmysqlkafka 都将部署在 docker 中,为了确保它们之间能够相互通信,我们需要将它们部署到同一个网络中。

先查看docker中已有网络:

docker network ls

执行上述命令后,会输出以下类似的结果:

NETWORK ID     NAME                   DRIVER    SCOPE
bbc5f382f29d   bridge                 bridge    local
3dd7cd1fa576   devdocker_gozero_net   bridge    local
141f19eb472a   host                   host      local
01a4992a132d   none                   null      local

因为我之前的环境都是基于devdocker_gozero_net 网络,所以我们这边也使用这个网络启动canal,使用命令:

docker run --name canal-server --network devdocker_gozero_net  canal/canal-server:latest

上面的命令会创建一个名为 canal-server 的容器,并将其加入到devdocker_gozero_net 网络中。容器启动后,我们可以通过进入容器并使用 ping 命令来测试 canal 与 mysql 和 kafka 是否能够连通:

# ping mysql
docker exec -it canal-server ping mysql
#ping kafka
docker exec -it canal-server ping kafka

如果 ping 命令能够成功执行,说明 canal 与 mysql 和 kafka 之间的网络连接正常,可以进行下一步操作。如果 ping 不通,需要检查网络配置、容器的启动参数等是否正确。

4.创建topic

进入kafka容器
可以通过以下两种方式进入 Kafka 容器:

docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash

进入kafka执行命令目录
进入 Kafka 容器后,我们需要进入 Kafka 执行命令的目录,这个目录包含了 Kafka 相关的命令行工具,如创建 topic、启动消费者等。:

cd /opt/bitnami/kafka/bin

创建topic

创建名为topic-test的topic

./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092

上面的命令会在 Kafka 中创建一个名为 topic-test 的 topic,并指定其 bootstrap 服务器为 localhost:9092

5. 修改配置

到目前为止canal还不能使用,我们需要修改配置文件。
修改mysql配置

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,到/etc/my.cnf 添加下面的配置:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

添加上述配置后,保存并退出 /etc/my.cnf 文件。为了使配置生效,需要重启 MySQL 服务。

为了方便操作,我们可以使用 docker desktop 直接修改文件。在 docker desktop 中找到对应的 MySQL 容器,进入容器的文件系统,找到 /etc/my.cnf 文件进行修改。修改完成后,确保 MySQL 容器已经重启,以使配置生效
在这里插入图片描述
我们还需要授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限。如果已有账户可直接使用 grant 命令进行授权:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

上面的SQL 语句创建了一个名为 canal 的用户,并赋予其相应的权限。
注意:这里的账号和密码都为canal ,在实际开发环境中,为了安全起见,需要修改为更复杂的密码。

修改instance配置文件

这个文件主要用于配置 MySQL 连接消息队列的topic

/home/admin/canal-server/conf/example中修改instance.properties,主要修改以下字段:

# position info
canal.instance.master.address=mysql:3306 #因为都在docker中,我们直接使用容器名代替IP地址

# table meta tsdb info
canal.instance.tsdb.enable=false

# username/password
canal.instance.dbUsername=canal   # 配置canal的的账号密码
canal.instance.dbPassword=canal   # 如果密码使用自定义的,请使用你自己的密码

# mq config
canal.mq.topic=topic-test  # 配置需要使用的kafka的topic ,我们使用了刚刚创建的topic-test
canal.mq.partitionsNum=1
#库名.表名: 唯一主键,多个表之间用逗号分隔
canal.mq.partitionHash=zero_test.user:id # 使用zero_test库中的user表

修改canal配置文件
这个配置文件,是用来设置使用消息队列的组件和,消息队列的地址

进入/home/admin/canal-server/conf/目录,修改canal.properties

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = kafka:9092 #因为都在docker中,我们直接使用容器名代替IP地址

dokcer 默认帮我们启动canal ,刚刚我们修改了配置,可以把服务重启下。

检查连接是否成功
我们可以通过canal日志,来看下mysql和kafka 是否连接成功

检查mysql连接

进入/home/admin/canal-server/logs/example/目录,打开example.log

如果没有保持,像下面图片这样就代表连接成功
在这里插入图片描述

三、 使用Canal

1. Canal脚本

docker中的canal脚本在/home/admin/canal-server/bin/ 目录中
在这里插入图片描述

启动canal
运行以下命令:

cd /home/admin/canal-server/bin/
sh startup.sh

停止canal

cd /home/admin/canal-server/bin/
sh stop.sh

重启canal

cd /home/admin/canal-server/bin/
sh restart.sh

2.消费Canal

进入kafka 启动 消费者

./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092

接着在mysql 中修改user表,可以看到 kafka自动消费了数据:
在这里插入图片描述

传递的是json 字符串,

{
    "data": [
        {
            "id": "5",
            "username": "999",
            "avatar": "222222",
            "mobile": "9999",
            "create_time": "2024-12-11 05:12:22",
            "update_time": "2024-12-12 13:05:59",
            "password": "999"
        }
    ],
    "database": "beyond_user",
    "es": 1734008759000,
    "gtid": "",
    "id": 5,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint unsigned",
        "username": "varchar(32)",
        "avatar": "varchar(256)",
        "mobile": "varchar(128)",
        "create_time": "timestamp",
        "update_time": "timestamp",
        "password": "varchar(50)"
    },
    "old": [
        {
            "avatar": "1111",
            "update_time": "2024-12-11 09:54:06"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "username": 12,
        "avatar": 12,
        "mobile": 12,
        "create_time": 93,
        "update_time": 93,
        "password": 12
    },
    "table": "user",
    "ts": 1734008812652,
    "type": "UPDATE"
}

四、项目中使用canal

在go-zero中使用canal也非常简单,只需要创建mq来消费kafka的消息即可, 例如文章的评论、文章的点赞数,这些需要高频访问数据的内容,都可以使用canal获取数据,然后使用消息队列来处理数据。

可以参考消息队列这篇文章
https://blog.csdn.net/yang731227/article/details/144379133


原文地址:https://blog.csdn.net/yang731227/article/details/145250761

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!