# canal-adapter: Syncing MySQL to ES [TCP mode]
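## Setting up the canal environment

- Download the releases from the canal GitHub page https://github.com/alibaba/canal/tags and extract the three tar.gz packages into their own directories (adapter | admin | deployer).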
### canal-server

- canal-server disguises itself as a MySQL slave in order to subscribe to binlog changes. Configure MySQL to enable binlog (in `my.cnf`):
```ini
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not equal canal's slaveId
```
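To double-check the option file before starting canal-server, a small script can read the `[mysqld]` section and flag the usual mistakes. This is a minimal sketch using Python's standard `configparser`. `check_binlog_config` is a hypothetical helper, not part of canal or MySQL. It only inspects the file, so confirm the running server as well (for example with `SHOW VARIABLES LIKE 'binlog_format';`).

```python
import configparser
from typing import List, Optional


def check_binlog_config(cnf_text: str, canal_slave_id: int) -> List[str]:
    """Return problems found in the [mysqld] section of a my.cnf text (empty list = OK).

    Hypothetical helper: it checks the three settings shown above and nothing else.
    """
    parser = configparser.ConfigParser(
        allow_no_value=True,             # my.cnf allows bare flags such as "log-bin"
        inline_comment_prefixes=("#",),  # strip "# ..." comments that follow a value
        interpolation=None,              # treat '%' literally
        strict=False,                    # tolerate repeated options
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    section = parser["mysqld"]

    def value(*names: str) -> Optional[str]:
        # MySQL accepts '-' and '_' interchangeably in option names
        for name in names:
            if name in section:
                return section[name] or ""  # a bare flag has the value None
        return None

    problems = []
    if value("log-bin", "log_bin") is None:
        problems.append("binlog is not enabled (log-bin)")
    binlog_format = value("binlog-format", "binlog_format")
    if binlog_format is None or binlog_format.upper() != "ROW":
        problems.append("binlog format must be ROW")
    server_id = value("server-id", "server_id")
    if server_id is None or not server_id.isdigit():
        problems.append("server_id is not set")
    elif int(server_id) == canal_slave_id:
        problems.append("server_id must differ from canal.instance.mysql.slaveId")
    return problems
```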
- Configure `conf/canal.properties`:
```properties
canal.id = 101
canal.register.ip = 127.0.0.1
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# Instances deployed on this server. Each one needs a matching folder under conf/;
# copy the files from conf/example and edit them.
canal.destinations = example,testusers
```
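Each name in `canal.destinations` needs its own directory under `conf/`, normally a copy of `conf/example`. The sketch below automates that copy. `read_destinations` and `create_instance_dirs` are hypothetical helpers. The `conf_dir` argument is assumed to be the deployer's `conf` directory. The copied `instance.properties` still has to be edited by hand.

```python
import shutil
from pathlib import Path
from typing import List


def read_destinations(properties_text: str) -> List[str]:
    """Extract the comma-separated canal.destinations value (simple key = value parsing)."""
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith(("#", "!")) or "=" not in line:
            continue  # skip comments and lines without a key/value pair
        key, value = line.split("=", 1)
        if key.strip() == "canal.destinations":
            return [d.strip() for d in value.split(",") if d.strip()]
    return []


def create_instance_dirs(conf_dir: Path, destinations: List[str]) -> List[Path]:
    """Copy conf/example to conf/<destination> for every destination that is still missing."""
    template = conf_dir / "example"
    created = []
    for name in destinations:
        target = conf_dir / name
        if not target.exists():
            shutil.copytree(template, target)
            created.append(target)
    return created
```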
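- Configure the instance file `conf/example/instance.properties` (the `testusers` instance gets its own copy under `conf/testusers/`):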
```properties
# must not equal MySQL's server_id
canal.instance.mysql.slaveId=103
canal.instance.master.address=<mysql-host>:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=xxxxxx
canal.instance.connectionCharset = UTF-8
# databases/tables to include
canal.instance.filter.regex=canal_manager\\..*
# databases/tables to exclude
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=xxx
```
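Two details of the `.properties` format explain how the lines above are written. This assumes the file is loaded with the standard `java.util.Properties` rules.

- `#` starts a comment only at the beginning of a line. A `#` after a value becomes part of the value.
- A single backslash in front of an ordinary character is silently dropped. This is why regex escapes are doubled.

The simplified loader below reproduces just those two rules. It is hypothetical and for illustration only; it is not canal code.

```python
from typing import Dict

_ESCAPES = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}


def _unescape(value: str) -> str:
    out = []
    chars = iter(value)
    for ch in chars:
        if ch == "\\":
            nxt = next(chars, "")
            out.append(_ESCAPES.get(nxt, nxt))  # unknown escape: the backslash is dropped
        else:
            out.append(ch)
    return "".join(out)


def load_properties(text: str) -> Dict[str, str]:
    r"""Simplified java.util.Properties rules (illustration only).

    - '#' or '!' starts a comment only at the beginning of a line;
      a '#' after a value is part of the value.
    - '\\' becomes '\', and a backslash before any other character is dropped.
    Line continuations, unicode escapes and whitespace separators are not handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        seps = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not seps:
            props[_unescape(line)] = ""  # key without a value
            continue
        idx = min(seps)  # the first '=' or ':' separates key and value
        props[_unescape(line[:idx].strip())] = _unescape(line[idx + 1:].lstrip())
    return props
```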
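Notes on the filter regular expressions (per canal's instance configuration docs):

- Rules are Perl regular expressions matched against `schema.table`; separate multiple rules with commas (`,`).
- The escape character must be written as a double backslash (`\\`) in the properties file.
- Examples:
  - all tables: `.*` or `.*\\..*`
  - all tables in schema `canal`: `canal\\..*`
  - tables in `canal` whose names start with `canal`: `canal\\.canal.*`
  - one table in `canal`: `canal.test1`
  - combined rules: `canal\\..*,mysql.test1,mysql.test2`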
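The filter semantics can be approximated in a few lines so you can test a rule before deploying it. This sketch makes two assumptions:

- The rules have already been unescaped, so `canal_manager\\..*` in the file is `canal_manager\..*` here.
- Each rule must match the whole `schema.table` name.

It uses Python's `re` instead of canal's own matcher and ignores details such as case handling, so treat it as an approximation. `is_table_subscribed` is a hypothetical name.

```python
import re


def _rules(spec: str) -> list:
    # canal filters are a comma-separated list of regular expressions
    return [p.strip() for p in spec.split(",") if p.strip()]


def is_table_subscribed(schema: str, table: str, white: str, black: str = "") -> bool:
    r"""Approximate canal's instance filter for one table (illustration, not canal code).

    A table is synced when "schema.table" fully matches some white-list rule
    and no black-list rule. Rules are given after properties unescaping,
    e.g. canal_manager\\..* in the file is canal_manager\..* here.
    """
    name = f"{schema}.{table}"
    if not any(re.fullmatch(p, name) for p in _rules(white)):
        return False
    return not any(re.fullmatch(p, name) for p in _rules(black))
```

For example, `is_table_subscribed("canal_manager", "users", r"canal_manager\..*", r"mysql\.slave_.*")` returns `True`.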
- Start canal (from the deployer directory):

```shell
bin/startup.sh
# check the server log
cat logs/canal/canal.log
# check the instance log
cat logs/example/example.log
```

### canal-admin
- Provides a web UI for configuring clusters, nodes, and instances.
- Initialize the database with `conf/canal_manager.sql`.
- Configure `conf/application.yml`:
```yaml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456
```
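`canal.admin.passwd` in `canal.properties` holds a hash of the canal-admin password, not the password itself. The value used in this article, `6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9`, is MySQL 5's `PASSWORD('123456')` without the leading `*`, matching `adminPasswd: 123456` above. That hash is SHA-1 applied twice (the second pass over the raw 20-byte digest) and upper-cased, so it can be computed without a MySQL server. `mysql5_password_hash` is a hypothetical helper name.

```python
import hashlib


def mysql5_password_hash(plain: str) -> str:
    """MySQL 5 PASSWORD() hash without the leading '*': upper(hex(sha1(sha1(pw))))."""
    inner = hashlib.sha1(plain.encode("utf-8")).digest()  # raw 20-byte digest, not hex
    return hashlib.sha1(inner).hexdigest().upper()


if __name__ == "__main__":
    # canal-admin application.yml above uses canal.adminPasswd: 123456
    print("canal.admin.passwd =", mysql5_password_hash("123456"))
```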
- Start canal-admin:

```shell
bin/startup.sh
# check the log
cat logs/admin.log
```
- Web UI: open http://localhost:8089/ in a browser and log in with username/password admin/123456.

### canal-adapter
- canal-adapter can deliver database changes to MQ, ES, a database, or a logger; adapt `conf/application.yml` to your business needs.
- Create the table:
```sql
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
  `id` varchar(64) NOT NULL,
  `username` varchar(20) NOT NULL COMMENT 'username',
  `phone` varchar(64) NOT NULL COMMENT 'phone number',
  `nickname` varchar(20) NOT NULL COMMENT 'nickname',
  `last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'last modified time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

BEGIN;
INSERT INTO `users` VALUES ('1', '郑强', '13181838112', '爸爸', '2022-01-24 17:13:00');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
```
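- Create the index. Elasticsearch 7 expects a typeless mapping; a mapping nested under `_doc` is rejected unless `include_type_name=true` is passed:

```
PUT /mytest_user
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "username": {
        "type": "text"
      },
      "phone": {
        "type": "text"
      },
      "nickname": {
        "type": "text"
      },
      "last_modified": {
        "type": "date"
      }
    }
  }
}
```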
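Index creation can also be scripted with the standard library. This sketch sends a typeless mapping with the same five fields, since Elasticsearch 7 rejects a mapping nested under `_doc` unless `include_type_name=true` is passed. The REST address `http://127.0.0.1:9200` and the function names are assumptions. `build_index_body` is separated out so the body can be checked without a running cluster.

```python
import json
import urllib.request


def build_index_body() -> dict:
    """Typeless ES 7 mapping for the users table (same fields as the mapping above)."""
    fields = {
        "id": "keyword",
        "username": "text",
        "phone": "text",
        "nickname": "text",
        "last_modified": "date",
    }
    return {"mappings": {"properties": {name: {"type": t} for name, t in fields.items()}}}


def create_index(es_url: str, index: str) -> dict:
    """PUT /<index> with the mapping body; returns the parsed JSON response.

    urllib raises HTTPError for 4xx answers, e.g. when the index already exists.
    """
    request = urllib.request.Request(
        f"{es_url.rstrip('/')}/{index}",
        data=json.dumps(build_index_body()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Needs a running cluster: create_index("http://127.0.0.1:9200", "mytest_user")
```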
- Edit the canal-adapter configuration `conf/application.yml`:
```yaml
server:
  port: 8088
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.22.77.76:3306/canal_manager?useUnicode=true
      username: root
      password: rc2021!
  canalAdapters:
  - instance: testusers # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: es-01
```
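Several names must line up across the three configuration files:

- The adapter's `instance` has to appear in the server's `canal.destinations`.
- The mapping file's `destination` has to name an instance that has an `es7` outer adapter.
- The mapping file's `dataSourceKey` has to be a key under `srcDataSources`.

The hypothetical checker below compares those values once the files are loaded (for example with PyYAML). It takes plain dicts so it runs without any YAML library. It deliberately ignores `groupId`, which the mapping file's own comment ties to MQ mode.

```python
from typing import Dict, List


def check_adapter_wiring(destinations: List[str], canal_conf: Dict, es_mapping: Dict) -> List[str]:
    """Cross-check canal.destinations, the 'canal.conf' section and an es7/*.yml mapping.

    Returns human-readable problems (empty list = consistent). Hypothetical helper.
    """
    problems = []
    es_instances = set()  # instances that have an es7 outer adapter
    for adapter in canal_conf.get("canalAdapters", []):
        instance = adapter.get("instance")
        if instance not in destinations:
            problems.append(f"instance '{instance}' is not in canal.destinations")
        for group in adapter.get("groups", []):
            if any(o.get("name") == "es7" for o in group.get("outerAdapters", [])):
                es_instances.add(instance)
    key = es_mapping.get("dataSourceKey")
    if key not in canal_conf.get("srcDataSources", {}):
        problems.append(f"dataSourceKey '{key}' is not defined under srcDataSources")
    destination = es_mapping.get("destination")
    if destination not in es_instances:
        problems.append(f"destination '{destination}' has no es7 outer adapter")
    return problems
```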
- Edit `conf/es7/mytest_user.yml`:
```yaml
dataSourceKey: defaultDS  # key of the source data source; matches an entry under srcDataSources above
destination: testusers    # canal instance name or MQ topic
groupId: g1               # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user     # index name
  _type: _doc
  # ES _id. If this is not configured, pk below must be configured and ES assigns _id automatically.
  # The value must match the "as _id" alias in the SQL below.
  _id: _id
  upsert: true
#  pk: id
  sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
#  commitBatch: 3000  # commit batch size
```
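The `_id` alias is the key to this mapping: the column selected `as _id` supplies the document ID, and the other aliases become document fields. The sketch below imitates only that renaming step for one row. It is a simplification for illustration, not the adapter's implementation:

- It does not parse SQL; the alias pairs are passed in by hand.
- `row_to_es_action` is a hypothetical name.
- It leaves `_id` out of the document body, because Elasticsearch does not accept `_id` as a field inside a document.

```python
from typing import Dict, List, Optional, Tuple


def row_to_es_action(row: Dict[str, object],
                     aliases: List[Tuple[str, str]]) -> Tuple[str, Dict[str, object]]:
    """Apply (column, alias) pairs to one row; return (document_id, source)."""
    doc_id: Optional[str] = None
    source: Dict[str, object] = {}
    for column, alias in aliases:
        if alias == "_id":
            doc_id = str(row[column])  # becomes the ES document ID, not a field
        else:
            source[alias] = row[column]
    if doc_id is None:
        raise ValueError("no column is aliased as _id; configure pk instead")
    return doc_id, source
```

Under this simplified model, `id` appears in the SQL above only as `_id`. The `id` field declared in the index mapping would therefore stay empty unless `id` is selected a second time.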
- Start canal-adapter:

```shell
bin/startup.sh
```
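- Full sync: from the adapter root directory, create a `sh` directory and a script `users_all.sh` inside it:

```shell
mkdir sh
cd sh
vim users_all.sh
```

Put this content in `users_all.sh`:

```shell
#!/bin/sh
curl http://localhost:8088/etl/es7/mytest_user.yml -X POST
```

Then make the script executable and run it:

```shell
chmod +x users_all.sh
./users_all.sh
```

The response (`导入ES 数据:4 条` means "imported 4 rows into ES"):

```json
{"succeeded":true,"resultMessage":"导入ES 数据:4 条"}
```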
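The same request can be sent from Python when the import is part of a larger script and the `succeeded` flag should be checked. The URL comes from the `curl` command above. `run_full_sync` and `parse_etl_response` are hypothetical names. Only the response parsing can be exercised without a running adapter.

```python
import json
import urllib.request


def parse_etl_response(body: str) -> str:
    """Return resultMessage, or raise RuntimeError if the adapter reports failure."""
    result = json.loads(body)
    if not result.get("succeeded"):
        raise RuntimeError(f"ETL failed: {result.get('resultMessage')}")
    return result["resultMessage"]


def run_full_sync(adapter_url: str = "http://localhost:8088",
                  adapter_type: str = "es7", task: str = "mytest_user.yml") -> str:
    """POST /etl/<type>/<task>, the same request as the curl command above."""
    request = urllib.request.Request(
        f"{adapter_url}/etl/{adapter_type}/{task}", data=b"", method="POST"
    )
    with urllib.request.urlopen(request) as response:
        return parse_etl_response(response.read().decode("utf-8"))
```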
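- Incremental sync: it is already running once the adapter has started. Follow the log to watch changes being applied:

```shell
tail -f logs/adapter/adapter.log
```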
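To check incremental sync end to end, change a row in MySQL and read the document back from Elasticsearch by ID. This sketch uses the ES 7 `GET /<index>/_doc/<id>` endpoint over HTTP. The REST port 9200 is an assumption; the adapter above talks to the transport port 9300. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request


def doc_url(es_url: str, index: str, doc_id: str) -> str:
    """Build <es>/<index>/_doc/<id>, percent-encoding the ID."""
    return f"{es_url.rstrip('/')}/{index}/_doc/{urllib.parse.quote(doc_id, safe='')}"


def fetch_source(es_url: str, index: str, doc_id: str) -> dict:
    """Return the document's _source; urllib raises HTTPError (404) if it does not exist."""
    with urllib.request.urlopen(doc_url(es_url, index, doc_id)) as response:
        return json.load(response)["_source"]


# After e.g. UPDATE users SET nickname = '...' WHERE id = '1'; in MySQL:
# print(fetch_source("http://127.0.0.1:9200", "mytest_user", "1")["nickname"])
```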
## Troubleshooting
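(1) Druid class cast error

1. Download the source package: [https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz](https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz)
2. In the `pom.xml` of the `client-adapter.escore` module, change the druid dependency to:

   ```xml
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>druid</artifactId>
       <scope>provided</scope>
   </dependency>
   ```

3. Run `mvn clean package`.
4. In `canal-canal-1.1.5/client-adapter/es7x/target`, take the built `client-adapter.es7x-1.1.5-jar-with-dependencies.jar` and use it to replace the original jar under `canal-adapter/plugin`.
5. Restart the adapter.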
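Step 4 can be scripted so that the original plugin jar is kept as a backup. This sketch assumes the paths named in the steps and identifies the old jar by the `client-adapter.es7x` prefix. It renames the old file to `*.jar.bak`, on the assumption that the adapter only loads files ending in `.jar`. `replace_plugin_jar` is a hypothetical helper, and the adapter still needs the restart in step 5.

```python
import shutil
from pathlib import Path


def replace_plugin_jar(built_jar: Path, plugin_dir: Path,
                       prefix: str = "client-adapter.es7x") -> Path:
    """Back up old <prefix>*.jar files in plugin_dir as *.jar.bak, then copy built_jar in."""
    if not built_jar.is_file():
        raise FileNotFoundError(built_jar)
    # list() first so renaming does not disturb the directory scan
    for old in list(plugin_dir.glob(f"{prefix}*.jar")):
        old.rename(old.with_name(old.name + ".bak"))
    target = plugin_dir / built_jar.name
    shutil.copy2(built_jar, target)
    return target


# Example with the paths from the steps above (relative to where you run it):
# replace_plugin_jar(
#     Path("canal-canal-1.1.5/client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar"),
#     Path("canal-adapter/plugin"),
# )
```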
Feel free to share; please credit the source when reposting: 内存溢出