canal入门篇:1.介绍&配置&启动

本文将讲述canal的安装、配置、启动以及验证监听是否成功。

介绍[1]

中间件功能:基于数据库增量日志解析,提供增量数据的订阅&消费。(目前主要支持了mysql)

使用canal的优点:

  • 实时性:Canal 直接对接 MySQL 的二进制日志(Binlog),实现了近乎实时的数据同步,可以实现实时数据流处理和近实时的数据集成。
  • 轻量级:作为一个开源组件,Canal 采用 Java 编写,具有较好的跨平台性和扩展性,部署相对简单,资源占用较小,适合大规模分布式环境下的部署与使用。
  • 可靠性:基于 MySQL 的 Binlog 记录机制,Canal 可以确保数据的完整性,不会丢失任何有效的数据库更改操作。
  • 灵活性:Canal 提供了丰富的过滤和路由策略,允许用户根据需求精确配置需要同步的数据库、表甚至是字段级别的数据变更。
  • 低侵入性:无需在业务代码中插入额外逻辑,就能实现数据的实时抓取,业务代码与数据同步逻辑完全解耦。
  • 易用性:提供了简洁易懂的配置方式,支持多种输出方式,如消息队列、HTTP API 推送等,方便与其他系统进行集成。

工作原理

1. MySQL主从复制原理

  1. 主库 Binlog 日志:在主数据库(Master)中,所有对数据库的更改操作都被记录在二进制日志(Binary Log,即binlog)中。这个日志包含了所有数据修改语句(如INSERT、UPDATE、DELETE),或者更底层的事件,如行格式的二进制日志。
  2. 日志传输:从数据库(Slave)连接到主数据库,请求主数据库发送 binlog 中的更新事件。主数据库有一个名为 Binlog Dump Thread 的线程,它负责读取 binlog 并将这些事件传送给从数据库。
  3. 中继日志 Relay Log:从数据库接收到主数据库的更新事件后,将它们存储在自己的中继日志(Relay Log)中。
  4. 事件重放:从数据库有自己的** SQL 线程(也称 I/O 线程和 SQL 线程)**,I/O 线程负责接收并写入 relay log,而 SQL 线程则负责读取 relay log 中的事件并在从数据库上重新执行(即重放)这些事件。

这个过程保证了主数据库的每一次更新操作都能在从数据库上按照同样的顺序执行一次,从而让从数据库的数据与主数据库保持一致。

复制模式(MySQL 支持多种复制模式):

  • 异步复制是最常见的模式,主库不等待从库确认就继续处理新的事务,因此可能存在一定的数据延迟。
  • 半同步复制在主库提交事务之前至少需要一个从库确认已接收事件,提供了一定程度的数据一致性保障。
  • 全同步复制要求所有从库都确认事务后,主库才提交事务,提供了最强的数据一致性,但可能会影响性能。

2. canal工作原理

  1. 模拟MySQL Slave: Canal通过模拟MySQL的Slave节点行为,连接到MySQL Master节点,并遵循MySQL的复制协议。它会像真正的MySQL slave那样,向MySQL master发送dump binlog的请求。
  2. 获取Binary Log: 当MySQL Master接收到这个dump请求后,它会开始将自身的二进制日志(Binary Log)推送给Canal。Binary Log中记录了所有对数据库的更改操作,如INSERT、UPDATE、DELETE等。
  3. 解析Binary Log: Canal接收并解析Master推送过来的Binary Log数据流,将其转换成结构化的事件(event)。这些事件包含了数据库变更的具体内容,如变更前后的行数据、执行的SQL语句等。

架构

  1. server:一个canal运行实例,对应于一个jvm
  2. instance:对应于一个数据队列(1个server对应1..n个instance)
    1. eventParser:数据源接入,模拟slave协议和master进行交互,协议解析。
    2. eventSink:Parser和Store链接器,进行数据过滤,加工,分发的工作。
    3. eventStore:数据存储。
    4. metaManager:增量订阅&消费信息管理器。

具体各个模块的功能请参考canal仓库

配置(快速启动canal)

1. 更改MySQL配置

修改MySQL配置文件my.ini,开启 Binlog 写入功能,并配置 binlog-format 为 ROW 模式:

1
2
3
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

Windows修改MySQL配置文件的方式:

  1. Win+R打开运行,输入services.msc,进入服务页面。
  2. 找到对应的MySQL服务,双击查看可执行文件的路径:条目下的--default-file=后的my.ini文件位置。
  3. 修改该文件。
  4. 右键MySQL服务条目,点击重新启动,重启MySQL服务。

2. 安装canal

进入Ubuntu安装canal:

  1. 进入canal的release页面,选择对应版本的压缩包。(我下载的是canal.deployer-1.1.7.tar.gz
  2. 在ubuntu中拉取该压缩包:wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
  3. 解压该压缩包tar -zxf canal.deployer-1.1.7.tar.gz
  4. 修改配置文件vim conf/example/instance.properties

修改配置:

1
2
3
4
5
6
7
8
canal.instance.master.address=192.168.112.1:3306
canal.instance.master.journal.name=mysql-bin.000353
canal.instance.master.position=1
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
canal.instance.filter.regex=api_open_platform\\..*
  • address: 数据库地址
  • journal.name: 日志文件名,去MySQL运行文件目录旁边找一下就能找到了
  • position: 从日志的第几个字节开始读取
  • dbUsername: 用户名
  • dbPassword: 用户密码
  • filter.regex:使用正则表达式选择数据库表(我设置的是监听api_open_platform数据库下所有的表)

3. 启动canal

  1. 开启命令bash bin/startup.sh
  2. 查看日志命令vim logs/canal/canal.logvim logs/example/example.log
  3. 关闭命令bash bin/stop.sh

成功后的运行日志:

  • canal.log:

    2024-03-06 22:16:46.110 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2024-03-06 22:16:46.117 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2024-03-06 22:16:46.126 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2024-03-06 22:16:46.152 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.20.0.1(172.20.0.1):11111]
    2024-03-06 22:16:47.076 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ……

  • example.log:

    2024-03-06 22:16:46.589 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2024-03-06 22:16:47.044 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table filter : ^api_open_platform..$
    2024-03-06 22:16:47.044 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table black filter : ^mysql.slave_.
    $
    2024-03-06 22:16:47.048 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful….
    2024-03-06 22:16:47.116 [destination = example , address = /192.168.112.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> begin to find start position, it will be long time for reset or first position
    2024-03-06 22:16:47.132 [destination = example , address = /192.168.112.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000353:4:1709690365000
    2024-03-06 22:16:47.437 [destination = example , address = /192.168.112.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000353,position=4,serverId=1,gtid=,timestamp=1709690365000] cost : 292ms , the next step is binlog dump

java连接canal

添加maven依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

main函数

注:该函数仅用于测试能否连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public static void main(String[] args){
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.146.132",11111), "example", "", "");
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
connector.subscribe(".*\\..*");
//connector.subscribe("petstoredb.pets");
connector.rollback();
int totalEmptyCount = 600;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
rowData.getAfterColumnsList().forEach(column->System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()));
//redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
rowData.getAfterColumnsList().forEach(column->System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()));
//redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
rowData.getAfterColumnsList().forEach(column->System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()));
//redisUpdate(rowData.getAfterColumnsList());
}
}
}
}

输出为:

我是canal,每秒一次正在监听:0d7700c0-0117-4a28-8d82-0a6ccbafadfd
我是canal,每秒一次正在监听:a658edf6-e56c-458e-bf13-25a64bffa7ec
我是canal,每秒一次正在监听:a6896fce-8f48-4478-983e-cf4bb05c1393
我是canal,每秒一次正在监听:e180a475-8a4a-44ee-84fa-096c2f44db16
我是canal,每秒一次正在监听:0fd10344-7eb1-412d-90fc-185e3ec69c51
================binlog[mysql-bin.000353:54296] , name[api_open_platform,interface_access] , eventType : INSERT
accesskey : a7e4c3c9-90be-434f-a201-1fb0c9c6921f update=true
verify_type : 1 update=true
secretkey : 41ee07aa-bd35-45f0-aa6f-b39351d8db0c update=true
remaining_amount : update=true
remaining_times : update=true
interface_id : 10 update=true
userid : 28 update=true
call_times : 0 update=true
failed_call_times : 0 update=true
create_time : 2024-03-06 22:42:25 update=true
update_time : 2024-03-06 22:42:25 update=true
is_deleted : 0 update=true
我是canal,每秒一次正在监听:1b80e597-9976-430a-b6f7-4fcaac936776
我是canal,每秒一次正在监听:9e89f0f4-0dfe-4f53-88c7-5731dbf0ebdf
我是canal,每秒一次正在监听:19322e19-0924-400d-a85b-bba58beb1a53
我是canal,每秒一次正在监听:0febf37a-df93-40b8-8c23-d0b89a1a1576

至此,canal启动完成。

  1. 本文基于canalv1.1.7版本。


canal入门篇:1.介绍&配置&启动
http://shoumingchilun.github.io/2024/03/06/技能/开发/微服务中间件/canal/canal_config/
作者
寿命齿轮
发布于
2024年3月6日
许可协议