本文将介绍如何使用canal实现MySQl和Redis的双写一致性。前文为:canal入门篇:1.介绍&配置&启动。
github仓库链接
canalAPI讲解
本文将使用SpringBoot框架搭建一个应用,通过canal获得数据库的更改消息,并将发生变更的实体同步更新到Redis中。
优点
使用canal的好处:
- 透明性与低侵入:使用 Canal 通常不需要对既有业务逻辑进行大规模改造,因为它作为独立的服务组件,可以透明地监听和处理数据库的变化,将更新事件推送给相应的缓存系统,实现数据同步,降低了对业务代码的入侵性。
- 扩展性与灵活性:Canal 支持多种数据源以及不同的消费方式,可以根据实际需求构建灵活的消息处理和分发机制。它可以与各种消息队列或数据处理系统集成,实现复杂的数据同步和多级缓存更新。
- 可靠性:由于 Canal 是基于 MySQL 的主从复制协议设计的,它继承了数据库层面的数据可靠性保障。同时,消息投递失败时(未进行ACK)可以通过重试机制确保最终一致性,而且可以支持幂等处理,避免重复消费造成的问题。
类说明
canal Client
先介绍一下canal Client相关的一些配置类:
- ClientIdentity:canal client和server交互之间的身份标识。
- CanalConnector:SimpleCanalConnector/ClusterCanalConnector是两种connector的实现。simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制。(确保 Canal 在发生故障时能够无缝地切换到备份节点或者恢复服务,暂时不关注)
- CanalNodeAccessStrategy:SimpleNodeAccessStrategy/ClusterNodeAccessStrategy是两种failover的实现。simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server。(暂时不关注)
然后即可从connector
中通过get
/getWithoutAck
获得的Message
的这个类,该类主要的功能就是使用getEntries
函数获得包装好的List<Entry>
对象。
Entry
介绍一下Entry、RowChange、rowdata、column的概念。
- Entry:Entry 是 Canal 解析 MySQL Binlog 后产生的基本单元,它代表了Binlog中的一条记录。每个Entry对象对应了MySQL数据库的一个事务或一个GTID事件(如果启用了GTID)。Entry包含了事务的相关元数据如执行时间、事务ID等,以及实际的数据库更改内容。
- RowChange: RowChange 是更进一步封装的事件变更细节,它描述了一次数据库表级别的行级别数据变化。一个RowChange对象对应binlog中的一次行变更事件,它可以包含多行数据的插入、更新或删除操作。
- RowData:RowData 表示单行数据的变更内容。对于RowChange中的每一次行操作(insert/update/delete),都会有对应的RowData对象来具体描述变更前后的列值信息。
- RowData有两个重要的字段:
- beforeColumnsList:在UPDATE或DELETE事件中,存储的是变更前的列数据集合。
- afterColumnsList:在INSERT或UPDATE事件中,存储的是变更后的列数据集合。
- 对于INSERT、UPDATE和DELETE事件,其表现形式不同:
- INSERT事件:RowData会包含这一行插入后的所有列值。
- UPDATE事件:RowData包含两部分,即变更前的列值(beforeColumns)和变更后的列值(afterColumns)。
- DELETE事件:RowData仅包含被删除前的那一行的所有列值。
- Column:Column 对象更加细致地表示了数据库表中某一列的数据变化情况,包括列名、列值、是否为空、是否是主键等属性信息。在RowData的beforeColumns和afterColumns列表中,每一项就是一个Column对象。
解析Entry
在获得一个List<Entry>
后,我们需要对其中每个条目进行逐步解析:
- 判断类型:通过调用
getEntryType()
获得该Entry
的EntryType
,常见的类型有**事务开始(BEGIN)、事务结束(COMMIT)、行数据(ROWDATA)**等。
- 对于 ROWDATA 类型的 Entry,可以从中提取 RowChange 对象(包含了数据库行级别的变更详情):通过调用
CanalEntry.RowChange.parseFrom(entry.getStoreValue())
获得 RowChange 对象。
- 获得 RowChange 对象后,可根据
EventType
获得具体操作的类型,如INSERT
、UPDATE
、ALTER
等,如: rowChage.getEventType()==EventType.DELETE
。
- 可从
RowChange
对象中获得 RowData
对象,并从后者中获得一行数据变更前的状态和变更后的状态,再从中获得单独的column
进行解析: 1 2 3
| for (RowData rowData : rowChage.getRowDatasList()) { rowData.getBeforeColumnsList().forEach(column -> System.out.println(column.getName() + " : " + column.getValue())); rowData.getAfterColumnsList().forEach(column -> System.out.println(column.getName() + " : " + column.getValue()));
|
实现双写一致性
(省略启动类)
添加依赖
需要添加Springboot和连接Redis、canal的依赖。
pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.7.9</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> </dependencies>
|
注册CanalConnector Bean
使用配置文件注入依赖。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Configuration public class CanalConfig { @Value("${mycanal.hostname}") private String hostname; @Value("${mycanal.port}") private Integer port; @Value("${mycanal.destination}") private String destination; @Value("${mycanal.username}") private String username = ""; @Value("${mycanal.password}") private String password = "";
@Bean public CanalConnector createCanalConnector() { return CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password); } }
|
配置文件application.yml:
1 2 3 4 5 6
| mycanal: hostname: 192.168.146.132 port: 11111 destination: example username: password:
|
设置Redis转码方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet(); return redisTemplate; } }
|
补充:如果使用GenericJackson2JsonRedisSerializer充当序列化/反序列化工具的话redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
,请保证各组件内类名一致(包含包路径)。
创建实体类
Redis中value对应的实体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Data public class InterfaceAccess implements Serializable { private String accesskey; private Integer verifyType; private String secretkey; private BigDecimal remainingAmount; private Integer remainingTimes; private Long interfaceId; private Long userid; private Integer callTimes; private Integer failedCallTimes; private Date createTime; private Date updateTime; private Integer isDeleted; private static final long serialVersionUID = 1L; }
|
设计Redis服务接口并实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public interface RedisService { String ACCESS_PREFIX = "InterfaceAccess_"; void updateInterfaceAccess(InterfaceAccess interfaceAccess); void deleteInterfaceAccess(InterfaceAccess interfaceAccess); }
@Service public class RedisServiceImpl implements RedisService { @Resource private RedisTemplate<String, InterfaceAccess> redisTemplate;
@Override public void updateInterfaceAccess(InterfaceAccess interfaceAccess) { redisTemplate.opsForValue().set(RedisService.ACCESS_PREFIX + interfaceAccess.getAccesskey(), interfaceAccess, 2, TimeUnit.MINUTES); }
@Override public void deleteInterfaceAccess(InterfaceAccess interfaceAccess) { redisTemplate.delete(RedisService.ACCESS_PREFIX + interfaceAccess.getAccesskey()); } }
|
创建CanalMessageHandler类
使用ApplicationRunner接口实现容器启动后开始监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Slf4j @Component public class CanalMessageHandler implements ApplicationRunner { @Resource private CanalConnector connector;
@Resource private RedisService redisService;
@Override public void run(ApplicationArguments args) { int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe("api_open_platform.interface_access"); connector.rollback(); int totalEmptyCount = 3600; while (emptyCount < totalEmptyCount) { System.out.println("正在监听canal Server: " + System.currentTimeMillis()); Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; handlerMessage(message.getEntries()); } connector.ack(batchId); } log.error("已经监听了" + totalEmptyCount + "秒,无任何消息,请检查canal是否正常运行或连接是否成功......"); } finally { connector.disconnect(); } }
public void handlerMessage(List<Entry> entrys) { } }
|
实现handlerMessage方法
对消息进行处理,并通过调用RedisService
的方法同步到redis。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public void handlerMessage(List<Entry> entrys) { ObjectMapper objectMapper = new ObjectMapper(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); for (Entry entry : entrys) { if (entry.getEntryType() != EntryType.ROWDATA) { continue; } RowChange rowchange = null; try { rowchange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { log.error("解析Entry获得RowChange失败:" + entry, e); return; } EventType eventType = rowchange.getEventType(); log.info(String.format("================binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowchange.getRowDatasList()) { if (eventType == EventType.UPDATE) { InterfaceAccess interfaceAccess = parseRowDateIntoInterfaceAccess(rowData); if (interfaceAccess.getIsDeleted() == 1) { redisService.deleteInterfaceAccess(interfaceAccess); } else { redisService.updateInterfaceAccess(interfaceAccess); } } } } }
private InterfaceAccess parseRowDateIntoInterfaceAccess(RowData rowData) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); InterfaceAccess interfaceAccess = new InterfaceAccess(); String accesskey = rowData.getAfterColumns(0).getValue(); Integer verifyType = Integer.parseInt(rowData.getAfterColumns(1).getValue()); String secretkey = rowData.getAfterColumns(2).getValue().equals("") ? null : rowData.getAfterColumns(2).getValue(); BigDecimal remainingAmount = null; if (!rowData.getAfterColumns(3).getValue().equals("")) { remainingAmount = new BigDecimal(rowData.getAfterColumns(3).getValue()); } Integer remainingTimes = null; if (!rowData.getAfterColumns(4).getValue().equals("")) { remainingTimes = Integer.parseInt(rowData.getAfterColumns(4).getValue()); } Long interfaceId = Long.parseLong(rowData.getAfterColumns(5).getValue()); Long userid = Long.parseLong(rowData.getAfterColumns(6).getValue()); Integer callTimes = Integer.parseInt(rowData.getAfterColumns(7).getValue()); Integer failedCallTimes = Integer.parseInt(rowData.getAfterColumns(8).getValue()); Date createTime = null; Date updateTime = null; try { createTime = sdf.parse(rowData.getAfterColumns(9).getValue()); updateTime = sdf.parse(rowData.getAfterColumns(10).getValue()); } catch (ParseException e) { log.error("解析日期失败:", e); } Integer isDeleted = Integer.parseInt(rowData.getAfterColumns(11).getValue()); interfaceAccess.setAccesskey(accesskey); interfaceAccess.setVerifyType(verifyType); interfaceAccess.setSecretkey(secretkey); interfaceAccess.setRemainingAmount(remainingAmount); interfaceAccess.setRemainingTimes(remainingTimes); interfaceAccess.setInterfaceId(interfaceId); interfaceAccess.setUserid(userid); interfaceAccess.setCallTimes(callTimes); interfaceAccess.setFailedCallTimes(failedCallTimes); interfaceAccess.setCreateTime(createTime); interfaceAccess.setUpdateTime(updateTime); interfaceAccess.setIsDeleted(isDeleted); return interfaceAccess; }
|
效果
1. 数据库更新后:
标准输出:
正在监听canal Server: 1709812780346
2024-03-07 19:59:40.394 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : ================binlog[mysql-bin.000354:6832] , name[api_open_platform,interface_access] , eventType : UPDATE
2024-03-07 19:59:40.425 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : 在redis中同步f607abdc-c80e-4255-81d3-37ea6fe355b4
正在监听canal Server: 1709812780425
正在监听canal Server: 1709812781481
查询redis:
2. 数据库删除后
标准输出:
正在监听canal Server: 1709812979293
正在监听canal Server: 1709812980340
2024-03-07 20:03:00.383 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : ================binlog[mysql-bin.000354:7413] , name[api_open_platform,interface_access] , eventType : UPDATE
2024-03-07 20:03:00.384 INFO 26712 — [ main] c.c.a.canal.handler.CanalMessageHandler : 从redis中删除f607abdc-c80e-4255-81d3-37ea6fe355b4
正在监听canal Server: 1709812980384
查询redis: