canal入门篇:2.实现MySQL&Redis双写一致性

本文将介绍如何使用canal实现MySQl和Redis的双写一致性。前文为:canal入门篇:1.介绍&配置&启动

github仓库链接

canalAPI讲解

本文将使用SpringBoot框架搭建一个应用,通过canal获得数据库的更改消息,并将发生变更的实体同步更新到Redis中。

优点

使用canal的好处:

  1. 透明性与低侵入:使用 Canal 通常不需要对既有业务逻辑进行大规模改造,因为它作为独立的服务组件,可以透明地监听和处理数据库的变化,将更新事件推送给相应的缓存系统,实现数据同步,降低了对业务代码的入侵性。
  2. 扩展性与灵活性:Canal 支持多种数据源以及不同的消费方式,可以根据实际需求构建灵活的消息处理和分发机制。它可以与各种消息队列或数据处理系统集成,实现复杂的数据同步和多级缓存更新。
  3. 可靠性:由于 Canal 是基于 MySQL 的主从复制协议设计的,它继承了数据库层面的数据可靠性保障。同时,消息投递失败时(未进行ACK)可以通过重试机制确保最终一致性,而且可以支持幂等处理,避免重复消费造成的问题。

类说明

canal Client

先介绍一下canal Client相关的一些配置类:

  • ClientIdentity:canal client和server交互之间的身份标识。
  • CanalConnector:SimpleCanalConnector/ClusterCanalConnector是两种connector的实现。simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制。(确保 Canal 在发生故障时能够无缝地切换到备份节点或者恢复服务,暂时不关注)
  • CanalNodeAccessStrategy:SimpleNodeAccessStrategy/ClusterNodeAccessStrategy是两种failover的实现。simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server。(暂时不关注)

然后即可从connector中通过get/getWithoutAck获得的Message的这个类,该类主要的功能就是使用getEntries函数获得包装好的List<Entry>对象。

Entry

介绍一下Entry、RowChange、rowdata、column的概念。

  1. Entry:Entry 是 Canal 解析 MySQL Binlog 后产生的基本单元,它代表了Binlog中的一条记录。每个Entry对象对应了MySQL数据库的一个事务或一个GTID事件(如果启用了GTID)。Entry包含了事务的相关元数据如执行时间、事务ID等,以及实际的数据库更改内容。
  2. RowChange: RowChange 是更进一步封装的事件变更细节,它描述了一次数据库表级别的行级别数据变化。一个RowChange对象对应binlog中的一次行变更事件,它可以包含多行数据的插入、更新或删除操作。
  3. RowData:RowData 表示单行数据的变更内容。对于RowChange中的每一次行操作(insert/update/delete),都会有对应的RowData对象来具体描述变更前后的列值信息。
    1. RowData有两个重要的字段:
      • beforeColumnsList:在UPDATE或DELETE事件中,存储的是变更前的列数据集合。
      • afterColumnsList:在INSERT或UPDATE事件中,存储的是变更后的列数据集合。
    2. 对于INSERT、UPDATE和DELETE事件,其表现形式不同:
      • INSERT事件:RowData会包含这一行插入后的所有列值。
      • UPDATE事件:RowData包含两部分,即变更前的列值(beforeColumns)和变更后的列值(afterColumns)。
      • DELETE事件:RowData仅包含被删除前的那一行的所有列值。
  4. Column:Column 对象更加细致地表示了数据库表中某一列的数据变化情况,包括列名、列值、是否为空、是否是主键等属性信息。在RowData的beforeColumns和afterColumns列表中,每一项就是一个Column对象。

解析Entry

在获得一个List<Entry>后,我们需要对其中每个条目进行逐步解析:

  1. 判断类型:通过调用getEntryType()获得该EntryEntryType,常见的类型有**事务开始(BEGIN)、事务结束(COMMIT)、行数据(ROWDATA)**等。
  2. 对于 ROWDATA 类型的 Entry,可以从中提取 RowChange 对象(包含了数据库行级别的变更详情):通过调用CanalEntry.RowChange.parseFrom(entry.getStoreValue())获得 RowChange 对象。
  3. 获得 RowChange 对象后,可根据EventType获得具体操作的类型,如INSERTUPDATEALTER等,如: rowChage.getEventType()==EventType.DELETE
  4. 可从 RowChange 对象中获得 RowData 对象,并从后者中获得一行数据变更前的状态和变更后的状态,再从中获得单独的column进行解析:
    1
    2
    3
    for (RowData rowData : rowChage.getRowDatasList()) {
    rowData.getBeforeColumnsList().forEach(column -> System.out.println(column.getName() + " : " + column.getValue()));
    rowData.getAfterColumnsList().forEach(column -> System.out.println(column.getName() + " : " + column.getValue()));

实现双写一致性

(省略启动类)

添加依赖

需要添加Springboot和连接Redis、canal的依赖。
pom.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>

注册CanalConnector Bean

使用配置文件注入依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class CanalConfig {
@Value("${mycanal.hostname}")
private String hostname;
@Value("${mycanal.port}")
private Integer port;
@Value("${mycanal.destination}")
private String destination;
@Value("${mycanal.username}")
private String username = "";
@Value("${mycanal.password}")
private String password = "";


@Bean
public CanalConnector createCanalConnector() {
return CanalConnectors.newSingleConnector(new InetSocketAddress(hostname,
port), destination, username, password);
}
}

配置文件application.yml:

1
2
3
4
5
6
mycanal:
hostname: 192.168.146.132
port: 11111
destination: example
username:
password:

设置Redis转码方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json,使用GenericJackson2JsonRedisSerializer替换默认序列化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

补充:如果使用GenericJackson2JsonRedisSerializer充当序列化/反序列化工具的话redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());,请保证各组件内类名一致(包含包路径)。

创建实体类

Redis中value对应的实体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
public class InterfaceAccess implements Serializable {
private String accesskey;
private Integer verifyType;
private String secretkey;
private BigDecimal remainingAmount;
private Integer remainingTimes;
private Long interfaceId;
private Long userid;
private Integer callTimes;
private Integer failedCallTimes;
private Date createTime;
private Date updateTime;
private Integer isDeleted;
private static final long serialVersionUID = 1L;
}

设计Redis服务接口并实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface RedisService {
String ACCESS_PREFIX = "InterfaceAccess_";
void updateInterfaceAccess(InterfaceAccess interfaceAccess);
void deleteInterfaceAccess(InterfaceAccess interfaceAccess);
}

@Service
public class RedisServiceImpl implements RedisService {
@Resource
private RedisTemplate<String, InterfaceAccess> redisTemplate;

@Override
public void updateInterfaceAccess(InterfaceAccess interfaceAccess) {
redisTemplate.opsForValue().set(RedisService.ACCESS_PREFIX + interfaceAccess.getAccesskey(), interfaceAccess, 2, TimeUnit.MINUTES);
}

@Override
public void deleteInterfaceAccess(InterfaceAccess interfaceAccess) {
redisTemplate.delete(RedisService.ACCESS_PREFIX + interfaceAccess.getAccesskey());
}
}

创建CanalMessageHandler类

使用ApplicationRunner接口实现容器启动后开始监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Component
public class CanalMessageHandler implements ApplicationRunner {
@Resource
private CanalConnector connector;

@Resource
private RedisService redisService;

@Override
public void run(ApplicationArguments args) {
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
try {
connector.connect();
// 监听api_open_platform数据库下的interface_access表
connector.subscribe("api_open_platform.interface_access");
// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
// 如果3600s内没有监听到更改,则报错并停止运行
int totalEmptyCount = 3600;
while (emptyCount < totalEmptyCount) {
System.out.println("正在监听canal Server: " + System.currentTimeMillis());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//计数器重新置零
emptyCount = 0;
handlerMessage(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
log.error("已经监听了" + totalEmptyCount + "秒,无任何消息,请检查canal是否正常运行或连接是否成功......");
} finally {
connector.disconnect();
}
}

public void handlerMessage(List<Entry> entrys) {
//...
}
}

实现handlerMessage方法

对消息进行处理,并通过调用RedisService的方法同步到redis。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public void handlerMessage(List<Entry> entrys) {
ObjectMapper objectMapper = new ObjectMapper();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Entry entry : entrys) {
//如果不是ROWDATA,则忽略
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowchange = null;
try {
//获取变更的row数据
rowchange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
log.error("解析Entry获得RowChange失败:" + entry, e);
return;
}
//获取变动类型
EventType eventType = rowchange.getEventType();
log.info(String.format("================binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

for (RowData rowData : rowchange.getRowDatasList()) {
//由于使用逻辑删除,delete实现方式为将is_deleted属性设置为1,所以删除逻辑写在update中
if (eventType == EventType.UPDATE) {
//可使用该方式查看各个index对应的列名是什么
//rowData.getAfterColumnsList().stream().forEach(column -> {
// System.out.println(column.getName());
//});
InterfaceAccess interfaceAccess = parseRowDateIntoInterfaceAccess(rowData);
if (interfaceAccess.getIsDeleted() == 1) {
//说明已经删除,需要同步删除
redisService.deleteInterfaceAccess(interfaceAccess);
} else {
//说明未删除,需要同步更新
redisService.updateInterfaceAccess(interfaceAccess);
}
}
}
}
}

private InterfaceAccess parseRowDateIntoInterfaceAccess(RowData rowData) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
InterfaceAccess interfaceAccess = new InterfaceAccess();
String accesskey = rowData.getAfterColumns(0).getValue();
Integer verifyType = Integer.parseInt(rowData.getAfterColumns(1).getValue());
//允许为空的参数如果为空,则column.getValue().equals("")
String secretkey = rowData.getAfterColumns(2).getValue().equals("") ? null : rowData.getAfterColumns(2).getValue();
BigDecimal remainingAmount = null;
if (!rowData.getAfterColumns(3).getValue().equals("")) {
remainingAmount = new BigDecimal(rowData.getAfterColumns(3).getValue());
}
Integer remainingTimes = null;
if (!rowData.getAfterColumns(4).getValue().equals("")) {
remainingTimes = Integer.parseInt(rowData.getAfterColumns(4).getValue());
}
Long interfaceId = Long.parseLong(rowData.getAfterColumns(5).getValue());
Long userid = Long.parseLong(rowData.getAfterColumns(6).getValue());
Integer callTimes = Integer.parseInt(rowData.getAfterColumns(7).getValue());
Integer failedCallTimes = Integer.parseInt(rowData.getAfterColumns(8).getValue());
Date createTime = null;
Date updateTime = null;
try {
createTime = sdf.parse(rowData.getAfterColumns(9).getValue());
updateTime = sdf.parse(rowData.getAfterColumns(10).getValue());
} catch (ParseException e) {
log.error("解析日期失败:", e);
}
Integer isDeleted = Integer.parseInt(rowData.getAfterColumns(11).getValue());
interfaceAccess.setAccesskey(accesskey);
interfaceAccess.setVerifyType(verifyType);
interfaceAccess.setSecretkey(secretkey);
interfaceAccess.setRemainingAmount(remainingAmount);
interfaceAccess.setRemainingTimes(remainingTimes);
interfaceAccess.setInterfaceId(interfaceId);
interfaceAccess.setUserid(userid);
interfaceAccess.setCallTimes(callTimes);
interfaceAccess.setFailedCallTimes(failedCallTimes);
interfaceAccess.setCreateTime(createTime);
interfaceAccess.setUpdateTime(updateTime);
interfaceAccess.setIsDeleted(isDeleted);
return interfaceAccess;
}

效果

1. 数据库更新后:

标准输出:

正在监听canal Server: 1709812780346
2024-03-07 19:59:40.394 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : ================binlog[mysql-bin.000354:6832] , name[api_open_platform,interface_access] , eventType : UPDATE
2024-03-07 19:59:40.425 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : 在redis中同步f607abdc-c80e-4255-81d3-37ea6fe355b4
正在监听canal Server: 1709812780425
正在监听canal Server: 1709812781481

查询redis:

2. 数据库删除后

标准输出:

正在监听canal Server: 1709812979293
正在监听canal Server: 1709812980340
2024-03-07 20:03:00.383 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : ================binlog[mysql-bin.000354:7413] , name[api_open_platform,interface_access] , eventType : UPDATE
2024-03-07 20:03:00.384 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : 从redis中删除f607abdc-c80e-4255-81d3-37ea6fe355b4
正在监听canal Server: 1709812980384

查询redis:


canal入门篇:2.实现MySQL&Redis双写一致性
http://shoumingchilun.github.io/2024/03/06/技能/开发/微服务中间件/canal/canal_MySQL&Redis/
作者
寿命齿轮
发布于
2024年3月6日
许可协议