一、Canal简介
canal是阿里巴巴旗下的一款开源项目,使用java语言进行开发,基于数据库增量日志解析,提供增量数据订阅与消费的功能。是一款很好用的数据库同步工具。目前只支持mysql。
Github:https://github.com/alibaba/canal
1、功能简介
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
2、MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
3、canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
二、部署
1、二进制
canal_version=1.1.6
wget https://github.com/alibaba/canal/releases/download/canal-$canal_version/canal.adapter-$canal_version.tar.gz -P /tmp
mkdir /opt/canal.adapter-$canal_version && tar -zxvf /tmp/canal.adapter-$canal_version.tar.gz -C /opt/canal.adapter-$canal_version
ln -s /opt/canal.adapter-$canal_version /opt/canal.adapter
canal_version=1.1.6
wget https://github.com/alibaba/canal/releases/download/canal-$canal_version/canal.admin-$canal_version.tar.gz -P /tmp
mkdir /opt/canal.admin-$canal_version && tar -zxvf /tmp/canal.admin-$canal_version.tar.gz -C /opt/canal.admin-$canal_version
ln -s /opt/canal.admin-$canal_version /opt/canal.admin
canal_version=1.1.6
wget https://github.com/alibaba/canal/releases/download/canal-$canal_version/canal.deployer-$canal_version.tar.gz -P /tmp
mkdir /opt/canal.deployer-$canal_version && tar -zxvf /tmp/canal.deployer-$canal_version.tar.gz -C /opt/canal.deployer-$canal_version
ln -s /opt/canal.deployer-$canal_version /opt/canal.deployer
2、Docker
文档:https://github.com/alibaba/canal/wiki/Docker-QuickStart
docker镜像:https://hub.docker.com/r/canal/canal-server/tags/
- docker模式下,单docker实例只能运行一个instance
三、启动
1、debug方式启动
默认使用suspend=y,阻塞等待你remote debug链接成功
sh startup.sh debug 9099
2、设置输出日志级别
修改conf/logback.xml
四、配置
1、数据库设置
开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
show variables like 'binlog_format';
show variables like 'log_bin';
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
2、配置文件
canal.properties (系统根配置文件)
#################################################
######### Canal配置 #############
#################################################
# canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.ip =
# canal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip)
canal.register.ip =
# canal server提供socket服务的端口
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal链接canal-admin的地址
# canal.admin.manager = 127.0.0.1:8089
# admin管理链接端口
canal.admin.port = 11110
# admin管理指令链接的ACL配置
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
# canal server链接zookeeper集群的链接信息
canal.zkServers =
# canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
# canal内存store中可缓存buffer记录数,需要为2的指数
canal.instance.memory.buffer.size = 16384
# 内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小
canal.instance.memory.buffer.memunit = 1024
# canal内存store中数据缓存模式
# 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
# 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
#################################################
### Canal 数据库连接心跳检查、切换配置 ###
#################################################
## 是否开启心跳检查
canal.instance.detecting.enable = false
# 心跳检查sql
canal.instance.detecting.sql = select 1
# 心跳检查频率,单位秒
canal.instance.detecting.interval.time = 3
# 心跳检查失败重试次数
canal.instance.detecting.retry.threshold = 3
# 心跳检查失败后,是否开启自动mysql自动切换。
# 说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
# 发生master/standby的切换的条件:(heartbeatHaEnable = true) && (失败次数>=retry.threshold).
canal.instance.detecting.heartbeatHaEnable = false
# 最大事务完整解析的长度支持超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
canal.instance.transaction.size = 1024
# canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒。
# 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
canal.instance.fallbackIntervalInSeconds = 60
#################################################
######### Canal 网络连接配置 #############
#################################################
# 网络链接参数,SocketOptions.SO_RCVBUF
canal.instance.network.receiveBufferSize = 16384
# 网络链接参数,SocketOptions.SO_SNDBUF
canal.instance.network.sendBufferSize = 16384
# 网络链接参数,SocketOptions.SO_TIMEOUT
canal.instance.network.soTimeout = 30
#################################################
######### Canal Binlog配置 #############
#################################################
# 是否使用druid处理所有的ddl解析来获取库和表名
canal.instance.filter.druid.ddl = true
# 是否忽略dcl语句
canal.instance.filter.query.dcl = false
# 是否忽略dml语句
# (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)
canal.instance.filter.query.dml = false
# 是否忽略ddl语句
canal.instance.filter.query.ddl = false
# 是否忽略binlog表结构获取失败的异常
# 主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况
canal.instance.filter.table.error = false
# 是否dml的数据变更事件.(主要针对用户只订阅ddl/dcl的操作)
canal.instance.filter.rows = false
# 是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# 支持的binlog format格式列表(otter会有支持format格式限制)
canal.instance.binlog.format = ROW,STATEMENT,MIXED
# 支持的binlog image格式列表(otter会有支持format格式限制)
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# ddl语句是否单独一个batch返回 (比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)
canal.instance.get.ddl.isolation = false
# 是否开启binlog并行解析模式(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)
# 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
canal.instance.parser.parallel = true
# binlog并行解析的异步ringbuffer队列(必须为2的指数)
canal.instance.parser.parallelBufferSize = 256
# 是否开启tablemeta的tsdb能力
canal.instance.tsdb.enable = true
# 主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
# jdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### Canal配置 #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### 消息队列通用配置 #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
# 是否为json格式,如果设置为false,对应MQ收到的消息为protobuf格式.需要通过CanalMessageDeserializer进行解码
canal.mq.flatMessage = true
# Canal的批处理数据大小, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为阿里云模式,可选值local/cloud
canal.mq.accessChannel = local
# 是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算
canal.mq.database.hash = true
# MQ消息发送并行度
canal.mq.send.thread.size = 30
# MQ消息构建并行度
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
# kafka为bootstrap.servers,rocketMQ中为nameserver列表
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
# 压缩类型
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
# 发送失败重试次数
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =
instance.properties (instance级别的配置文件,每个instance一份)
# 是否启用mysql gtid的订阅模式
canal.instance.gtidon=false
# mysql主库链接地址
canal.instance.master.address=127.0.0.1:3306
# mysql主库链接时起始的binlog文件
canal.instance.master.journal.name=
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
# mysql主库链接时对应的gtid位点
canal.instance.master.gtid=
# 目前配置只支持一个standby配置
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# 库名.表名: 唯一主键,多个表之间用逗号分隔
canal.mq.partitionHash=test.table:id^name,.*\\..*
3、源库DB-表正则匹配设置
格式:schema.table
canal.instance.filter.regex=A.*,B.*,C.*
4、消息队列Topic正则匹配设置
使用正则路由schema和table到不同的Topic
格式:topic:schema.table,topic:schema.table,topic:schema.table
canal.mq.dynamicTopic=topic_A:db_test,topic_B:db_test\.*,topic_C:db_test.test_table
# topic_A:db_test :db_test库的所有表都会发送到topic_A下
# topic_B:db_test\.* :db_test库正则*匹配的所有表都会发送到topic_B下
# topic_C:db_test.test_table :db_test库的test_table表都会发送到topic_C下
默认的topic
canal.mq.topic=canal_ddl
一些mysql schema的binlog也会读进来(建表语句,grant语句等),如果没有这个默认的topic,会报找不到分区的错误,从而导致canal停止写入
参考:https://www.cnblogs.com/xiexiandong/p/12881011.html
五、MySQL To Kafka
conf/canal.properties
...
canal.serverMode = kafka
...
kafka.bootstrap.servers = localhost:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
conf/example/instance.properties
...
# mq config
canal.mq.topic=canal_ddl
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=db.table:db\\.table
canal.mq.partition=0
# hash partition config
canal.mq.enableDynamicQueuePartition=false
canal.mq.partitionsNum=3
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
canal.mq.partitionHash=test.table:id^name,.*\\..*
...
六、MySQL To Pulsar
文档:https://pulsar.apache.org/docs/io-canal-source/#configuration
conf/canal.properties
...
canal.serverMode = pulsarMQ
...
pulsarmq.serverUrl = pulsar://127.0.0.1:30065
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix = public/default
conf/example/instance.properties
...
# mq config
canal.mq.topic=canal_ddl
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=db.table:db\\.table
canal.mq.partition=0
# hash partition config
canal.mq.enableDynamicQueuePartition=false
canal.mq.partitionsNum=3
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
canal.mq.partitionHash=test.table:id^name,.*\\..*
...
七、Aliyun RDS MySQL To Pulsar
同第六章的相同配置。
八、表与消费队列Topic对应实操
适应于投递MySQL Binlog到kafka和pulsar
1、同库多表对应一个topic
canal.instance.filter.regex=test.jobs1,test.jobs2
canal.mq.dynamicTopic=test.jobs:test.*
九、监控
1、配置canal
conf/canal.properties
canal.metrics.pull.port = 11112
2、配置prometheus刮取canal端点数据
- job_name: "test-stg-canal"
scrape_interval: 15s
scrape_timeout: 10s
metrics_path: /
scheme: http
static_configs:
- targets: ['192.168.1.20:11112']
3、Grafana导入Dashboard
4、配置告警
去除默认无用的两条数据显示,只保存上图中的两条,同时查询条件进行精确匹配不要用变量引用进行可变搜索。不然无法创建告警Alert,会显示“Template variables are not supported in alert queries”
5、效果
6、监控指标释义
指标 | 说明 | 单位 | 精度 |
---|---|---|---|
canal_instance_transactions | instance接收transactions计数 | - | - |
canal_instance | instance基本信息 | - | - |
canal_instance_subscriptions | instance订阅数量 | - | - |
canal_instance_publish_blocking_time | instance dump线程提交到异步解析队列过程中的阻塞时间(仅parallel解析模式) | ms | ns |
canal_instance_received_binlog_bytes | instance接收binlog字节数 | byte | - |
canal_instance_parser_mode | instance解析模式(是否开启parallel解析) | - | - |
canal_instance_client_packets | instance client请求次数的计数 | - | - |
canal_instance_client_bytes | 向instance client发送数据包字节计数 | byte | - |
canal_instance_client_empty_batches | 向instance client发送get接口的空结果计数 | - | - |
canal_instance_client_request_error | instance client请求失败计数 | - | - |
canal_instance_client_request_latency | instance client请求的响应时间概况 | - | - |
canal_instance_sink_blocking_time | instance sink线程put数据至store的阻塞时间 | ms | ns |
canal_instance_store_produce_seq | instance store接收到的events sequence number | - | - |
canal_instance_store_consume_seq | instance store成功消费的events sequence number | - | - |
canal_instance_store | instance store基本信息 | - | - |
canal_instance_store_produce_mem | instance store接收到的所有events占用内存总量 | byte | - |
canal_instance_store_consume_mem | instance store成功消费的所有events占用内存总量 | byte | - |
canal_instance_put_rows | store put操作完成的table rows | - | - |
canal_instance_get_rows | client get请求返回的table rows | - | - |
canal_instance_ack_rows | client ack操作释放的table rows | - | - |
canal_instance_traffic_delay | server与MySQL master的延时 | ms | ms |
canal_instance_put_delay | store put操作events的延时 | ms | ms |
canal_instance_get_delay | client get请求返回events的延时 | ms | ms |
canal_instance_ack_delay | client ack操作释放events的延时 | ms | ms |
7、Dashboard
指标 | 简述 | 多指标 |
---|---|---|
Basic | Canal instance 基本信息。 | 是 |
Network bandwith | 网络带宽。包含inbound(canal server读取binlog的网络带宽)和outbound(canal server返回给canal client的网络带宽) | 是 |
Delay | Canal server与master延时;store 的put, get, ack操作对应的延时。 | 是 |
Blocking | sink线程blocking占比;dump线程blocking占比(仅parallel mode)。 | 是 |
TPS(transaction) | Canal instance 处理binlog的TPS,以MySQL transaction为单位计算。 | 否 |
TPS(tableRows) | 分别对应store的put, get, ack操作针对数据表变更行的TPS | 是 |
Client requests | Canal client请求server的请求数统计,结果按请求类型分类(比如get/ack/sub/rollback等)。 | 否 |
Response time | Canal client请求server的响应时间统计。 | 否 |
Empty packets | Canal client请求server返回空结果的统计。 | 是 |
Store remain events | Canal instance ringbuffer中堆积的events数量。 | 否 |
Store remain mem | Canal instance ringbuffer中堆积的events内存使用量。 | 否 |
Client QPS | client发送请求的QPS,按GET与CLIENTACK分类统计 | 是 |
参考:https://github.com/alibaba/canal/wiki/Prometheus-QuickStart
十、错误处理
1、如果canal启动时候从日志看到报这个错误:can't find start position for example
。
解决方法:
- 单机
删除meta.dat文件,重启canal,问题解决。 - 集群
进入canal对应的zookeeper集群下,删除节点/otter/canal/destinations/实例/1001/cursor
,重启canal即可恢复
评论区