MySQL特定表全量、增量数据同步到消息队列-解决方案
作者:李雷 发布时间:2024-01-24 04:36:47
目录
1、原始需求
2、解决方案
3、canal介绍、安装
canal的工作原理
架构
安装
4、验证
1、原始需求
既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应。
数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。
应用场景:数据ETL同步、降低业务服务器压力。
2、解决方案
3、canal介绍、安装
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
工作原理:mysql主备复制实现
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
架构
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
安装
1、mysql、kafka环境准备
2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz
4、对目录conf里文件参数配置
对canal.properties配置:
进入conf/example里,对instance.properties配置:
5、启动:bin/startup.sh
6、日志查看:
4、验证
1、开发对应的kafka消费者
package org.kafka;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
*
* Title: KafkaConsumerTest
* Description:
* kafka消费者 demo
* Version:1.0.0
* @author pancm
* @date 2018年1月26日
*/
public class KafkaConsumerTest implements Runnable {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.7.193:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
int messageNo = 1;
System.out.println("---------开始消费---------");
try {
for (; ; ) {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
// String v = decodeUnicode(record.value());
// System.out.println(v);
//当消费了1000条就退出
if (messageNo % 1000 == 0) {
break;
}
messageNo++;
}
} else {
Thread.sleep(11);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
Thread thread1 = new Thread(test1);
thread1.start();
}
/*
* 中文转unicode编码
*/
public static String gbEncoding(final String gbString) {
char[] utfBytes = gbString.toCharArray();
String unicodeBytes = "";
for (int i = 0; i < utfBytes.length; i++) {
String hexB = Integer.toHexString(utfBytes[i]);
if (hexB.length() <= 2) {
hexB = "00" + hexB;
}
unicodeBytes = unicodeBytes + "\\u" + hexB;
}
return unicodeBytes;
}
/*
* unicode编码转中文
*/
public static String decodeUnicode(final String dataStr) {
int start = 0;
int end = 0;
final StringBuffer buffer = new StringBuffer();
while (start > -1) {
end = dataStr.indexOf("\\u", start + 2);
String charStr = "";
if (end == -1) {
charStr = dataStr.substring(start + 2, dataStr.length());
} else {
charStr = dataStr.substring(start + 2, end);
}
char letter = (char) Integer.parseInt(charStr, 16); // 16进制parse整形字符串。
buffer.append(new Character(letter).toString());
start = end;
}
return buffer.toString();
}
}
2、对表bak1进行增加数据
CREATE TABLE `bak1` (
`vin` varchar(20) NOT NULL,
`p1` double DEFAULT NULL,
`p2` double DEFAULT NULL,
`p3` double DEFAULT NULL,
`p4` double DEFAULT NULL,
`p5` double DEFAULT NULL,
`p6` double DEFAULT NULL,
`p7` double DEFAULT NULL,
`p8` double DEFAULT NULL,
`p9` double DEFAULT NULL,
`p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
show create table bak1;
insert into bak1 select '李雷abcv',
`p1` ,
`p2` ,
`p3` ,
`p4` ,
`p5` ,
`p6` ,
`p7` ,
`p8` ,
`p9` ,
`p0` from moci limit 10
3、查看输出结果:
来源:https://www.cnblogs.com/lilei2blog/p/15608206.html


猜你喜欢
- 目录1. 线程的概念1.1 Manager_进程通信1.2 线程的概念2. 线程的基本使用3. 自定义线程_守护线程3.1 自定义线程3.2
- 快速排序的基本思想:首先选定一个数组中的一个初始值,将数组中比该值小的放在左边,比该值大的放在右边,然后分别对左边的数组进行如上的操作,对右
- 我们知道,正则表达式是一个处理字符串中很实用的技巧。然而,即便是Javascript写的很厉害的程序猿,有时也会忘掉正则表达式的语法,从而使
- WordPress 页面模板是特定的模板文件,用于特定页面或页面组,这些用于单页数据的模板显示在前端。我们还可以在 WordPre
- 项目说明 该电商项目类似于京东商城,主要模块有验证、用户、第三方登录、首页广告、商品、购物车、订单、支付以及后台管理系统。项目开发模式采用前
- 查看系统原有Python注:可以将python指向python3,但必须修改一些命令如yum的配置,不然会报错。安装依赖yum instal
- 本文实例讲述了JS定义函数的几种常用方法。分享给大家供大家参考,具体如下:在 JavaScript 语言里,函数是一种对象,所以可以说函数是
- 背景说明:10 * time.Second //正常数字相乘没错但是package mainimport "time"f
- 因为主键可以唯一标识某一行记录,所以可以确保执行数据更新、删除的时候不会出现张冠李戴的错误。当然,其它字段可以辅助我们在执行这些操作时消除共
- 本文为大家分享了vue $emit 和 $on 组件通信,供大家参考,具体内容如下<!DOCTYPE html> <htm
- show profile是由Jeremy Cole捐献给MySQL社区版本的。默认的是关闭的,但是会话级别可以开启这个功能。开启它可以让My
- 前言简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探HTML5播放控件
- 一、Background当想将照片序列合成延时摄影视频时,可能会发现照片中缺少一张,或者照片序列是跨时间、并不连续的,如图1所示,但PR中只
- 本文实例讲述了Python字符串的全排列算法。分享给大家供大家参考,具体如下:题目描述输入一个字符串,按字典序打印出该字符串中字符的所有排列
- 在我们平时的开发过程中,为了方便调试程序,我们都是打开开发者模式,即Debug=True,当我们正式上线的时候肯定就需要把开发者模式关掉,用
- 如下所示:# -*- coding: utf-8 -*-import sysfrom PyQt5.QtWidgets import (QAp
- 为什么我也要说SQL Server的并行:这几天园子里写关于SQL Server并行的文章很多,不管怎么样,都让人对并行操作有了更深刻的认识
- 1. 哈希算法基础1.1 哈希算法的定义哈希算法(Hash Algorithm)是一种将任意长度的输入数据映射为固定长度哈希值的算法。它具有
- 利用oracle的dbms_random包结合rownum来实现,示例如下,随机取499户:select * from ( select *
- 本文将结合实例代码,介绍 OpenCV 如何查找轮廓、获取边界框。代码: contours.pyOpenCV 提供了 findContour