软件研究之Canal

标签: 无 分类: 未分类 创建时间:2024-09-13 01:08:40 更新时间:2024-11-24 10:04:02

1.前言

数据同步工具,其实我用过很多,比如TIS,比如pci这个工具,还有DataX工具,最近在用 MySQL 数据库的时候,又找到了一个阿里开源的 Canal 数据同步工具。开始的时候我始终无法理解这个canal,还有 canal-adapter 和 canal-admin 到底是什么东西。经过我自己部署了一遍,查看了很多的文章,终于有了一定的理解。canal分为两部分,一部分是 server,一部分是 client 端。

  • Server端
    Server端,其实就是Deployer服务,Deployer服务负责从上游拉取binlog数据、记录位点等。可以对外提供接口,供外部程序订阅,比如自己写的java程序。server代表一个canal运行实例,对应于一个jvm。一个server,可以包含多个instance实例。instance对应于一个数据队列 (1个server对应1..n个instance)。一个instance就是需要监听的数据库或者数据库表。

  • Client端
    Client端,就是Adapter服务,Client-Adapter服务负责对接Deployer解析过的数据,并将数据传输到目标库中。通过配置Adapter,可以监听Server中的某个实例,然后通过配置,同步到其他的数据库或者系统中,比如MySQL,Orcal、ES等。一个Adapter,可以配置监听多个instance,也可以将一个instance同时分发到多处,比如同时用日志打印和输出到数据库中。

参考文章:
【1】.超详细的canal入门,看这篇就够了 这篇文章其实用了java,自己写了一个监听器
【2】.常见数据同步工具之实时同步 1.Flume、Flink CDC。2.DataX CDC:复杂性:DataX CDC 的配置和使用相对复杂,需要熟悉 CDC 技术和相关的配置知识。4.Kettle:学习曲线较陡峭:尽管Kettle提供了图形化界面,但对于没有经验的用户来说,学习和掌握Kettle的各种功能和操作仍然需要一定的时间和精力。
【3】.canal配置mysql到mysql的数据同步
【4】.canal deployer 包 & canal adapter 包 参数详解
【5】.使用Canal同步MySQL数据 Deployer服务负责从上游拉取binlog数据、记录位点等。Client-Adapter服务负责对接Deployer解析过的数据,并将数据传输到目标库中。这里还有各个参数的具体说明,进入canal.adapter安装目录的conf路径下,编辑application.yml文件。
【6】.使用canal同步MySQL数据 安装了 canal-server,canal-apapter 和 canal-admin

2.canal-deployer

canal server 端,在仓库中是 canal.deployer 安装包。默认启动端口 :11111 端口。canal deployer 为 canal 标准包,可将其看做 canal server。它负责伪装成 mysql 从库,接收、解析 binlog 并投递到指定的目标端(RDS、MQ 或 canal adapter)

参考文章:
【1】.Canal-adapter的简单配置 HBase适配器、关系型数据库适配器、ElasticSearch适配器

2.1修改mysql配置

1
2
3
4
5
6
7
8
9
10
11
# 编辑配置
vi /etc/my.cnf

# 开启 Binlog 写入
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=10001

2.2.创建用户

1
2
3
4
5
6
7
8
9
10
11
mysql -u root -p
# 创建用户并授权
CREATE USER canal IDENTIFIED BY 'can8@Hjkj';
# 授予部分权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

## 授予全部权限
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

2.3.安装

1
2
3
4
5
6
# 访问 release 页面 , 选择需要的包下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

# 解压缩
mkdir /usr/local/canal/canal-deployer
tar zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/canal-deployer

2.4.修改 canal 配置文件

可以通过

1
2
3
4
5
6
7
8
9
10
11
12
13
cd /usr/local/canal/canal-deployer


# canal-deployer的ip地址,不配置默认是本机
canal.ip =
# canal-deployer的端口号
canal.port = 11111
# 模式
canal.serverMode = tcp
# 加载多个配置文件,需在canal.deployer-1.1.5\conf下创建对应的文件夹,并加上instance.properties配置文件
canal.destinations = example
# 配置多个使用英文逗号隔开
# canal.destinations = example,example2

2.4.修改 instance 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 修改配置文件
cd /usr/local/canal/canal-deployer
vi conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 20001
# Canal监听的源数据库地址,格式为host:port。
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# 源数据库账号用户名。
canal.instance.dbUsername = canal
# 源数据库账号密码。
canal.instance.dbPassword = can8@Hjkj
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# Canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。
canal.instance.filter.regex = .\*\\\\..\*
# 配置只监听test库的user表,如果需要读取多个表可以使用正则表达式或者用逗号隔开
canal.instance.filter.regex=test.user

2.5.启动

有些文章说要:检查lib目录下的jar包,一定要有对应的数据库驱动包,且版本要对得上,但是这部我没有做,也是可以正常启动和同步的。

1
2
3
4
5
6
cd /usr/local/canal/canal-deployer/bin
# 启动
sh bin/startup.sh

# 关闭
sh bin/stop.sh
参考文章:
【1】.QuickStart 官方文档
【2】.Canal详细入门实战(使用总结) 1.Canal-deployer使用.2.Canal-adapter使用,我大部分的操作都是在这里做的。踩坑记录:数据库名不要使用下划线,一开始用canal_test_01的方式命名,发现扫描不到数据库。检查lib目录下的jar包,一定要有对应的数据库驱动包,且版本要对得上。
【3】.canal 同时监听两个数据库实例 1.复制一个example 并修改为example2。2.复制一个example 并修改为example2。3.在example目录中修改实例配置文件 instance.properties。4.在example2目录中修改实例配置文件 instance.properties。
【4】.阿里同步利器之canal使用案例

3.client-adapter

canal adapter 为 canal 的客户端,可将其看作 canal client。其中 RDB 包括 Oracle、MySQL、PostgreSQL、SQLServer 等数据库,目前Adapter具备以下基本能力:

  • 对接上游消息,包括kafka、rocketmq、canal-server
  • 实现mysql数据的增量同步
  • 实现mysql数据的全量同步
  • 下游写入支持mysql、es、hbase

3.1.下载

1
2
3
4
5
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
# 解压
mkdir /usr/local/canal/canal-adapter
tar zxvf canal.adapter-1.1.7.tar.gz -C /usr/local/canal/canal-adapter

3.2.修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 修改配置
cd /usr/local/canal/canal-adapter
vi conf/application.yml

# 修改如下内容
server:
port: 8081 # canal-adapter运行端口号

canal.conf:
mode: tcp # canal client的模式: tcp kafka rocketMQ rabbitMQ
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:

srcDataSources: # 源数据库
canal01: # 自定义名称
# 这里使用的是mysql8.0,需要加上serverTimezone=UTC
url: jdbc:mysql://127.0.0.1:3306/canal01?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
username: root
password: root
# 配置多个源数据库
# canal02:
# url: jdbc:mysql://127.0.0.1:3306/canal02?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
# username: root
# password: root

canalAdapters: # 适配器列表
- instance: example # canal 实例名或者 MQ topic 名
groups: # 分组列表
- groupId: # 分组id, 如果是MQ模式将用到该值
outerAdapters: # 分组内适配器列表
# - name: logger # 日志打印适配器
- name: rdb # 指定为rdb类型同步
key: mysql1 # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
# 目标数据库配置
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://127.0.0.1:3306/canal02?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
jdbc.username: root
jdbc.password: root
threads: 5 # 并行执行的线程数,可以不修改,默认为1
# 配置多个目标数据库
# - instance: example2 # canal instance Name or mq topic name
# groups:
# - groupId: g2
# outerAdapters:
# - name: rdb
# key: mysql2
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/canal01?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
# jdbc.username: root
# jdbc.password: root

3.3.配置表映射

在 conf/rdb 目录下,新建 .yml 结尾的配置,然后编写内容如下,如果有多个数据表需要同步,可以配置多个 .yml文件,如 example_1.yml,example_2.yml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 配置rdb表映射
vi conf/rdb/example.yml

# 内容如下
dataSourceKey: canal01 # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
outerAdapterKey: mysql1 # adapter key, 对应上面配置outAdapters中的key
concurrent: true # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
dbMapping:
database: canal01 # 源数据源的database/shcema
table: user01 # 源数据源表名
targetTable: user02 # 直接写表名
targetPk: # 主键映射
id: id # 如果是复合主键可以换行映射多个
mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
# targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
# id: id
# name: name
commitBatch: 3000 # 批量提交的大小

3.4.启动

有些文章说要:检查lib目录下的jar包,一定要有对应的数据库驱动包,且版本要对得上,但是这部我没有做,也是可以正常启动和同步的。

1
2
3
4
5
6
# 在bin目录中启动canal-adapter
cd /usr/local/canal/canal-adapter/bin
# 启动
sh startup.sh
# 关闭
sh stop.sh
参考文章:
【1】.基于canal的client-adapter数据同步必读指南 从1.1.1版本开始,canal实现了一个配套的落地模块,实现对canal订阅的消息进行消费,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。目前的最新稳定版1.1.4版本中,client-adapter已经实现了同步数据到RDS、ES、HBase的能力。

4.canal-admin

参考文章:
【1】.canal web管理界面配置
【2】.Canal Admin 高可用集群使用教程
【3】.Canal 整合 canal-admin ,canal-adapter 创建canal-admin对应数据库,在代码里面有canal_manager.sql脚本。通过 ui 界面启动canal-server,增加canal -instance,以后通过canal-admin 修改配置文件,启动关闭服务,查看操作日志等等一系列操作。这里还有 targetColumns 部分表字段映射。

5.自启动

本来我想着能开机自动启动,后来发现好像没有人这么干过,网上的资料很少,我就放弃了。

参考文章:
【1】.canal开机自启动 这里用了 chkconfig –add canal 管理

5.docker

看到successful之后,就代表canal-server启动成功,可以启动canal-client链接上来进行binlog订阅了

6.配置多个

6.1.配置多个instance

(1) 在canal-server/conf/canal.properties 加入如下信息

1
2
# 多个使用 逗号 分割
canal.destinations = example,example2

(2) 在 canal-server/conf 创建文件夹 example 和 example2

1
2
3
4
5
6
conf
│ ├── example2
│ │ └── instance.properties
│ ├── canal.properties
│ └── example
│ │ └── instance.properties

(3) 修改不同的配置项,instance.properties

1
2
3
4
5
6
canal.instance.master.address=192.168.1.23:3306
canal.instance.dbUsername=slave_canal
canal.instance.dbPassword=password
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=database.table_name3,database.table_name4
canal.mq.topic=example2

(4) 修改 canal-adapter/conf/application.yml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
srcDataSources:
defaultDS:
url: jdbc:mysql://xxx/drone-cloud?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
username: hjkj
password: xxx
innerDS:
url: jdbc:mysql://127.0.0.1:3306/drone-manage?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
username: hjkj
password: xxx
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
#- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://127.0.0.1:3306/drone-manage?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
jdbc.username: hjkj
jdbc.password: xxx
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
- instance: example2 # canal instance Name or mq topic name
groups:
- groupId: g2
outerAdapters:
#- name: logger
- name: rdb
key: mysql2
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://xxx/drone-cloud?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
jdbc.username: hjkj
jdbc.password: xxx
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000

(4)修改数据库映射配置
在rdb配置canal-adapter/conf/rdb下分别增加,相关的数据库配置.分别指向不同的 数据库和实例,example1.yml,example2.yml等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 源数据库
dataSourceKey: innerDS
# 对应实例名称
destination: example2
# 对应分组名称
groupId: g2
# 对应 outerAdapterKey
outerAdapterKey: mysql2
concurrent: true
dbMapping:
database: drone-manage
table: hzscreen_pin
targetTable: manage_pin
targetPk:
id: id
# mapAll: true
targetColumns:
id: id
time: time
lon: lon
tjzt:
#etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小
参考文章:
【1】.canal第三篇:配置多个instance 在canal-server/conf/canal.properties 加入如下信息,多配置了多个instance,并且使用了 docker compose 进行配置。

7.增加配置

当上面的配置弄好了之后,双向同步也做好了,于是下一次需要增加同步的时候
(1)修改 canal-deployer/conf 中 instance 中监听表,canal.instance.filter.regex
(2)增加 canal-adapter/conf/rdb 中关于表的映射

不需要重启 canal-deployer 和 canal-adapter 这样就可以实现同步了。

问题

1.Unrecognized VM option ‘AggressiveOpts’

修改启动脚本,删除 AggressiveOpts 参数,

1
JAVA_OPTS="$JAVA_OPTS -Xss1m -XX:+AggressiveOpts -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"

2.无法同步mysql

我搭建了 canal-deployer,搭建了 canal-adapter,配置了好几次,都无法完成映射,但是使用 logger,却可以打印日志。

【尝试方法】
(1)修改 targetTable 字段,结果无效。
(2)修改 rdp中conf/rdb/example.yml的配置destination,和conf/application.yml里面的配置 canalAdapters.instance 配置一致,文件名字也改成了一样。
(3)修改了rdb的配置 使用了 targetColumns。原本我是空的映射,后来改为了固定映射.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 原始
targetColumns:
id:
time:
lon:
lat:
photo:
wtms:
sqyc:
# 修改后
targetColumns:
id: id
time: time
lon: lon
lat: lat
photo: photo
wtms: wtms
sqyc: sqyc

【解决方案】
最后我发现其实数据库更新了,不能同步,但是如果新增了一条记录,那就是可以同步的。也就是说,源数据库某条记录修改了,但是目标数据库相同主键记录不存在,虽然能打印日志,但是数据无法同步,只能修改相同主键信息的记录,才能实现同步。

参考文章:
【1】.canal-adapter同步mysql问题记录 这里有一个地方写错误了,就是 targetTable: mytest2.tb1 # 目标数据源的库名.表名,这里的 mytest2 是多余配置了。
【2】.canal adapter同步问题:部分字段数据没有同步 而在esMapping的配置中的sql我使用大写,虽然在mysql中对大小写不敏感,显示的也是期望结果。但是在adpater中无法识别这种情况,必须使用字段别名的方式。
【3】.canal-adapter字段过滤 如果表结构不一致的话,可以用targetColumns设置 : 从表字段名字: 主表字段名字
【4】.阮胜昌的技术记录站-LinuxMySQL数据库运维 如果表结构不一致的话,可以用targetColumns设置 : 从表字段名字: 主表字段名字
【5】.canal adapter没有同步成功无异常 适配器匹配原理分别对应:destination:config/xx/xxx.yml文件中的destination;groupId:config/xx/xxx.yml文件中的groupId;database: application.yml中srcDataSources.{xdb}.url中的database,如jdbc:mysql://10.0.0.10:3306/{database}?useUnicode=true;table:config/xx/xxx.yml文件中的sql中的表名

3.重新记录binlog值

重启之后,还是出现错误记录。在同步的时候,出现了一条记录某一个字段为空的问题:java.sql.SQLException: Field ‘create_time’ doesn’t have a default value,我想着把数据库里面的这条记录删除,但是我始终无法删除,因为我删除之后,就出出现,我重启了 canal,重启了canal-adapter,还是不行,问题到底出现在了哪里。

【解决方案】
先停止canal,把conf->example->meta.dat文件删除,在重启canal。重启会重新生成meta.dat文件,所记录的最新 binlog 文件和位置。

参考文章:
【1】.canal meta.dat引起数据同步问题 这里提到了删除 meta.dat 数据的方法。
小额赞助
本人提供免费与付费咨询服务,感谢您的支持!赞助请发邮件通知,方便公布您的善意!
**光 3.01 元
Sun 3.00 元
bibichuan 3.00 元
微信公众号
广告位
诚心邀请广大金主爸爸洽谈合作
每日一省
isNaN 和 Number.isNaN 函数的区别?

1.函数 isNaN 接收参数后,会尝试将这个参数转换为数值,任何不能被转换为数值的的值都会返回 true,因此非数字值传入也会返回 true ,会影响 NaN 的判断。

2.函数 Number.isNaN 会首先判断传入参数是否为数字,如果是数字再继续判断是否为 NaN ,不会进行数据类型的转换,这种方法对于 NaN 的判断更为准确。

每日二省
为什么0.1+0.2 ! == 0.3,如何让其相等?

一个直接的解决方法就是设置一个误差范围,通常称为“机器精度”。对JavaScript来说,这个值通常为2-52,在ES6中,提供了Number.EPSILON属性,而它的值就是2-52,只要判断0.1+0.2-0.3是否小于Number.EPSILON,如果小于,就可以判断为0.1+0.2 ===0.3。

每日三省
== 操作符的强制类型转换规则?

1.首先会判断两者类型是否**相同,**相同的话就比较两者的大小。

2.类型不相同的话,就会进行类型转换。

3.会先判断是否在对比 null 和 undefined,是的话就会返回 true。

4.判断两者类型是否为 string 和 number,是的话就会将字符串转换为 number。

5.判断其中一方是否为 boolean,是的话就会把 boolean 转为 number 再进行判断。

6.判断其中一方是否为 object 且另一方为 string、number 或者 symbol,是的话就会把 object 转为原始类型再进行判断。

每日英语
Happiness is time precipitation, smile is the lonely sad.
幸福是年华的沉淀,微笑是寂寞的悲伤。