Canal监听Mysql回写Redis
目录
2.4.3 biz.RedisCanalClientExample
一、canal服务端
1.1 下载
1.2 解压
tar -zxvf canal.deployer-1.1.6.tar.gz 到mycanal文件夹
1.3 配置
修改/mycan/conf/example/instance.properties
文件
-
换成自己的mysql主机master的IP地址
-
换成自己的在mysql新建的canal账户
1.4 启动
注意这个地方需要JDK环境支持才能正常启动,那就补充一下安装JDK
-
Centos7安装JDK8
-
在下载linux64版本的jdk
-
解压后放到自己指定的文件夹
-
配置环境变量:
vim /ect/profile
新增内容后在source /etc/profile
最后java -version
看是否安装成功
-
export JAVA_HOME=/myjava/jdk1.8.0_371
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JRE_HOME/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
-
启动canal--->在canal的bin目录下执行
./startup.sh
1.5 查看
- 查看server日志 在目录mycanal/logs/canal/下执行cat canal.log
- 查看样例example的日志 在目录
mycanal/logs/example/
下执行cat example.log
二、canal客户端(Java编写业务程序)
2.1 SQL脚本
CREATE TABLE `t_user`(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`userName` VARCHAR(100) NOT NULL,
PRIMARY KEY(`id`)
)ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
2.2 写POM
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.16</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
2.3 写Yaml
server:
port: 5555
spring:
datasource:
url: jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: 980918
driver-class-name: com.mysql.jdbc.Driver
druid:
test-while-idle: false
2.4 写业务类
2.4.1.项目结构
2.4.2 Utils.RedisUtil
package com.atguigu.canal.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @author zhumq
* @date 2024/7/27 9:24
*/
public class RedisUtils
{
public static final String REDIS_IP_ADDR = "192.168.111.185";
public static final String REDIS_pwd = "111111";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
2.4.3 biz.RedisCanalClientExample
package com.atguigu.canal.biz;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.utils.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author zhumq
* @date 2024/7/27 9:26
*/
public class RedisCanalClientExample
{
/**
* 表示60秒的常量。
* 用于定义某些操作的时间间隔。
*/
public static final Integer _60SECONDS = 60;
/**
* Redis服务器的IP地址。
* 用于数据存储和检索操作中定位Redis服务器。
*/
public static final String REDIS_IP_ADDR = "192.168.111.185";
/**
* 将数据插入Redis。
*
* @param columns 列数据列表,每项包含列名、值和更新标志。
* 本方法首先将列数据列表转换为JSON对象,
* 然后使用第一列的值作为键,在Redis中存储JSON字符串。
*
* 设计目的可能是以结构化方式在Redis中存储实体的相关属性信息,
* 以便于快速检索和使用。
*/
private static void redisInsert(List<Column> columns)
{
// 创建一个JSON对象来存储列数据
JSONObject jsonObject = new JSONObject();
// 遍历列数据列表,填充JSON对象
for (Column column : columns)
{
// 打印列信息,用于调试或日志记录
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
// 将列名和值添加到JSON对象中
jsonObject.put(column.getName(), column.getValue());
}
// 如果列数据列表不为空,则执行Redis插入操作
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
// 使用第一列的值作为键,序列化的JSON对象作为值,存储到Redis中
jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
} catch (Exception e) {
// 打印异常堆栈跟踪,用于错误处理或日志记录
e.printStackTrace();
}
}
}
/**
* 删除Redis中的键值对。
*
* 此方法通过接收一列列名和对应的值,构建一个JSON对象。然后,它从这个JSON对象中提取第一个列的值,
* 并使用这个值作为Redis键来删除对应的条目。这个方法假设Redis已经连接,并且通过RedisUtils.getJedis()
* 提供了Jedis实例。
*
* @param columns 列表,包含要删除的Redis键对应的列名和值。
*/
private static void redisDelete(List<Column> columns)
{
// 构建一个JSON对象,用于存储列名和对应的值
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
// 将每一列的名称和值添加到JSON对象中
jsonObject.put(column.getName(),column.getValue());
}
// 当列表不为空时,尝试删除Redis中的条目
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
// 使用列表中第一个列的值作为键,删除Redis中的对应条目
jedis.del(columns.get(0).getValue());
}catch (Exception e){
// 打印堆栈跟踪,以记录任何异常
e.printStackTrace();
}
}
}
/**
* 更新Redis中的数据。
* 该方法接收一个列的列表,将这些列的名称和值存储到JSON对象中,然后将这个JSON对象存储到Redis中。
* 此方法主要用于在数据更新后,将更新的列及其值同步到Redis缓存中,以保持数据的一致性。
*
* @param columns 列的列表,每个列包含一个名称、一个值和一个标志位表示该列是否被更新。
*/
private static void redisUpdate(List<Column> columns)
{
// 创建一个JSON对象,用于存储列的名称和值。
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
// 打印列的名称、值和更新状态,用于调试和日志记录。
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
// 将列的名称和值添加到JSON对象中。
jsonObject.put(column.getName(),column.getValue());
}
// 检查列表是否为空,如果不为空,则更新Redis。
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
// 使用列列表中第一个列的值作为键,将JSON对象序列化为字符串后存储到Redis中。
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
// 打印更新后的Redis数据,用于调试和确认更新是否成功。
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
// 捕获并打印任何异常,确保方法在异常情况下不会中断执行。
e.printStackTrace();
}
}
}
/**
* 打印日志条目中的变更信息。
* 此方法忽略事务开始和结束的日志条目,因为它只对实际的数据变更感兴趣。
* 它解析每条日志条目中的RowChange数据,并根据变更类型(插入、删除、更新)执行相应的操作。
*
* @param entrys 日志条目的列表,这些条目包含数据库变更的信息。
*/
public static void printEntry(List<Entry> entrys) {
// 遍历每个日志条目
for (Entry entry : entrys) {
// 跳过事务开始和结束的日志条目
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
// 从日志条目的存储值中解析RowChange对象
// 获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
// 如果解析失败,抛出运行时异常,并包含错误详情
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
}
// 获取事件类型 获取变动类型
EventType eventType = rowChage.getEventType();
// 打印日志条目的基本信息,包括日志文件名、偏移量、模式名、表名和事件类型
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
// 遍历RowData列表,根据事件类型执行相应的操作(插入、删除、更新)
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
// 对于插入事件,调用redisInsert方法处理后的列数据
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
// 对于删除事件,调用redisDelete方法处理前的列数据
redisDelete(rowData.getBeforeColumnsList());
} else {// EventType.UPDATE
// 对于更新事件,调用redisUpdate方法处理后的列数据
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
/**
* 程序入口主方法,用于初始化并连接Canal服务器,以监听MySQL数据库的变化。
* @param args 命令行参数
*/
public static void main(String[] args)
{
// 初始化时的提示信息
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
// 创建Canal客户端连接器,用于连接和通信
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
"example",
"",
"");
// 定义每次获取的记录数量
int batchSize = 1000;
// 定义空闲循环计数器,用于判断是否需要重新连接
// 空闲空转计数器
int emptyCount = 0;
// 连接初始化完成的提示信息
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
// 连接Canal服务器
connector.connect();
// 订阅指定的数据库表变更事件
//connector.subscribe(".*\\..*");
connector.subscribe("bigdata.t_user");
// 回滚事务,确保数据一致性
connector.rollback();
// 定义空闲循环的总次数
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
// 每秒打印一次监控信息
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
// 获取一批变更记录
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
// 获取批次ID
long batchId = message.getId();
// 获取记录数量
int size = message.getEntries().size();
// 如果批次ID为-1或记录数量为0,表示没有数据变更
if (batchId == -1 || size == 0) {
// 空闲计数器加1
emptyCount++;
// 每秒检查一次
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
// 有数据变更,重置空闲计数器
//计数器重新置零
emptyCount = 0;
// 打印变更记录
printEntry(message.getEntries());
}
// 确认处理完成,提交批次
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
// 监听结束的提示信息
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
// 断开与Canal服务器的连接
connector.disconnect();
}
}
}
题外话:
-
Java程序下
connector.subscribe
配置的过滤正则
关闭资源简写
-
try-with-resources
释放资源
原文地址:https://blog.csdn.net/m0_52388979/article/details/140730307
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!