Mysql到Elasticsearch高效实时同步Debezium实现
作者:铭毅天下 发布时间:2024-01-25 15:22:46
题记
来自Elasticsearch中文社区的问题——
MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中?
logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。
回到问题本身:如果库表里没有相关字段,该如何处理呢?
本文给出相关探讨和解决方案。
1、 binlog认知
1.1 啥是 binlog?
binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;
其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中;
作用主要有:
1)复制:达到master-slave数据一致的目的。2)数据恢复:通过mysqlbinlog工具恢复数据。3)增量备份。
1.2 阿里的Canal实现了增量Mysql同步
一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。
目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。
综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。
2、基于binlog的同步方式
1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/
2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/
由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。
3、Debezium介绍
Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。
特点:
1)简单。无需修改应用程序。可对外提供服务。
2)稳定。持续跟踪每一行的每一处变动。
3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。
4、同步架构
如图,Mysql到ES的同步策略,采取“曲线救国”机制。
步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。
步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。
5、Debezium实现Mysql到ES增删改实时同步
软件版本:
confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1
5.1 Debezium安装
confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。
Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。
MySQL Connector plugin 压缩包的下载地址:https://debezium.io/docs/install/。
注意重启一下confluent,以使得Debezium生效。
5.2 Mysql binlog等相关配置。
Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。
核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。
[mysqld]
server-id = 223344
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
然后,重启一下Mysql以使得binlog生效。
systemctl start mysqld.service
5.3 配置connector连接器。
配置confluent路径目录 : /etc
创建文件夹命令 :
mkdir kafka-connect-debezium
在mysql2kafka_debezium.json存放connector的配置信息 :
[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
{
"name" : "debezium-mysql-source-0223",
"config":
{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.hostname" : "192.168.1.22",
"database.port" : "3306",
"database.user" : "root",
"database.password" : "XXXXXX",
"database.whitelist" : "kafka_base_db",
"table.whitlelist" : "accounts",
"database.server.id" : "223344",
"database.server.name" : "full",
"database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
"database.history.kafka.topic" : "account_topic",
"include.schema.changes" : "true" ,
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true",
"transforms": "unwrap,changetopic",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changetopic.regex":"(.*)",
"transforms.changetopic.replacement":"$1-smt"
}
}
注意如下配置:
“database.server.id”:对应Mysql中的server-id的配置。
“database.whitelist”: 待同步的Mysql数据库名。
“table.whitlelist”:待同步的Mysq表名。
“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic
“database.server.name”:逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。
坑1:transforms相关5行配置作用是写入数据格式转换。
如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。
这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。
格式转换相关原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
5.4 启动connector
curl -X POST -H "Content-Type:application/json"
--data @mysql2kafka_debezium.json.json
http://192.168.1.22:18083/connectors | jq
5.5 验证写入是否成功。
查看kafka-topic
kafka-topics --list --zookeeper localhost:2181
此处会看到写入数据topic的信息。
注意新写入数据topic的格式:database.schema.table-smt 三部分组成。
本示例topic名称:full.kafka_base_db.account-smt。
5.6 验证消费数据验证写入是否正常
./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
至此,Debezium实现mysql同步kafka完成。
6、kafka-connector实现kafka同步Elasticsearch
6.1、Kafka-connector介绍
见官网:https://docs.confluent.io/current/connect.html
Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。
连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。
6.2、kafka到ES connector同步配置
配置路径:
/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
配置内容:
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "full.kafka_base_db.account-smt",
"key.ignore": "true",
"connection.url": "http://192.168.1.22:9200",
"type.name": "_doc",
"name": "elasticsearch-sink-test"
6.3 kafka到ES启动connector
启动命令
confluent load elasticsearch-sink-test
-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
6.4 Kafka-connctor RESTFul API查看
Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。
curl -X GET http://localhost:8083/connectors
7、坑复盘。
坑2: 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。
排解思路如下:
1)确认消费的topic是否是写入数据的topic;2)确认同步的过程中没有出错。可以借助connector如下命令查看。
curl -X GET http://localhost:8083/connectors-xxx/status
坑3: Mysql2ES出现日期格式不能识别。
是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。
坑4: kafka2ES,ES没有写入数据。
排解思路:
1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。
2)通过connetor/status排查出错原因,一步步分析。
8、小结
binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。
对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。
参考:
[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn
[3] https://juejin.im/post/5b7c036bf265da43506e8cfd
[4] https://debezium.io/docs/connectors/mysql/#configuration
[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
来源:https://blog.csdn.net/laoyang360/article/details/87897886
猜你喜欢
- python中的paramiko模块是用来实现ssh连接到远程服务器上的库,在进行连接的时候,可以用来执行命令,也可以用来上传文件。1、得到
- 1.选择File -> Settings2.选择 File and Code Templates -> Files ->
- 一、正则与LIKE的区别 Mysql的正则表达式仅仅使SQL语言的一个子集,可以匹配基本的字符、字符串。例如:select * f
- 前言django提供了commands类,允许我们编写命令行脚本,并且可以通过python manage.py拉起。了解commands具体
- MySQL的主键可以是自增的,那么如果在断电重启后新增的值还会延续断电前的自增值吗?自增值默认为1,那么可不可以改变呢?下面就说一下 MyS
- power(x, y) 函数,计算 x 的 y 次方。示例:x 和 y 为单个数字:import numpy as npprint(np.p
- 先来描述一下我遇到的问题,在进行matplotlib学习时, plot.show() 总是无法成功运行,总是会报一个错:RuntimeErr
- 第一次用Python写这种比较实用且好玩的东西,权当练手吧游戏说明:* P键控制“暂停/开始”* 方向键控制贪吃蛇的方向源代码如下:from
- 1、要点 (1) 在C语言中没有字符串,只有字符, 在python中的字符串hello,在C
- 如下所示:def sub(arr): finish=[] size = len(arr) end = 1 << size #en
- Python有自己内置的标准GUI库--Tkinter,只要安装好Python就可以调用。今天学习到了图形界面设计的问题,刚开始就卡住了。为
- 前言SPA项目中,首屏加载速度都是老生常谈的问题了,首屏时间直接反应了用户多久能看到页面的主要内容,这决定了用户体验,本文聊一聊如何采集首屏
- 因为编写了一个Python程序,密集的操作了一个Mysql库,之前数据量不大时,没发现很慢,后来越来越慢,以为只是数据量大了的原因,但是后来
- 在使用出colab进行模型训练时,发现colab的python版本更新为了3.7.11,而我的代码要在python3.6下才行配置好环境,于
- 一、torch.rand():构造均匀分布张量的方法torch.rand是用于生成均匀随机分布张量的函数,从区间[0,1)的均匀分布中随机抽
- 前言:列表框控件显示多行文本,用户可以选中一行或者多行。所有的文本只能使用一种字体,不能混合使用多种字体。1 属性常用的参数列表如下:1.1
- 简介枚举是与多个唯一常量绑定的一组符号因为枚举表示的是常量,建议枚举成员名用大写IntEnum 便于进行系统交互初试from enum im
- 在跨业务、跨网站发送数据或者业务升级的时候,我们有的时候需要指定发送数据的编码方式,比如页面是utf-8编码的,而发送出去的数据却是GB23
- 目录 一、前言1.1 什么是 import 机制?1.2 import 是如何执行的?二、import 机制概览三、import
- 基本属性定义当前地牢的等级,地图长宽,房间数量,房间的最小最大长度,如下class Map: def __init