SpringBoot整合canal实现数据同步的示例代码
作者:郑清 发布时间:2022-05-07 19:51:24
一、前言
canal:阿里巴巴 MySQL binlog 增量订阅&消费组件
https://github.com/alibaba/canal
tips: 环境要求和配置参考 https://github.com/alibaba/canal/wiki/AdminGuide
这里额外提下Redis缓存和MySQL数据一致性解决方案
延时双删策略
异步更新缓存(基于订阅binlog的同步机制)
…
我们的canal即可作为MySQL binlog增量订阅消费组件+MQ消息队列将增量数据更新到redis
二、docker-compose部署canal
tips: 详情可查看 https://gitee.com/zhengqingya/docker-compose
# 准备
git clone https://gitee.com/zhengqingya/docker-compose.git
cd docker-compose/Liunx
# 导入初始化SQL
Liunx/canal/canal_admin/canal_manager.sql
# 运行 (tips:先修改配置文件信息)
docker-compose -f docker-compose-canal.yml -p canal up -d
相关配置文件
docker-compose-canal.yml
# 可参考 https://github.com/alibaba/canal/wiki/QuickStart
version: '3'
# 网桥canal -> 方便相互通讯
networks:
canal:
services:
canal_admin:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-admin:v1.1.5 # 原镜像`canal/canal-admin:v1.1.5`
container_name: canal_admin # 容器名为'canal_admin'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "./canal/canal-admin/bin/startup.sh:/home/admin/canal-admin/bin/startup.sh"
- "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
environment: # 设置环境变量,相当于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.adminUser: admin
canal.adminPasswd: 123456
spring.datasource.address: www.zhengqingya.com:3306
spring.datasource.database: canal_manager
spring.datasource.username: root
spring.datasource.password: root
ports:
- "8089:8089"
networks:
- canal
canal_server:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-server:v1.1.5 # 原镜像`canal/canal-server:v1.1.5`
container_name: canal_server # 容器名为'canal_server'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "./canal/canal-server/logs:/home/admin/canal-server/logs"
environment: # 设置环境变量,相当于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.register.ip: www.zhengqingya.com
canal.admin.manager: canal_admin:8089
canal.admin.port: 11110
canal.admin.user: admin
canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
ports:
- "11110:11110"
- "11111:11111"
- "11112:11112"
depends_on:
- canal_admin
links:
- canal_admin
networks:
- canal
访问地址:http://ip地址:8089
默认登录账号密码:admin/123456
三、canal-admin可视化管理
tips: 页面使用自行多点点就会了
^_^
将数据投递到RabbitMQ配置为例,附带些其中重要的配置信息
mq的交换机和队列可通过后面java代码自动生成,无需自己手动建立
1、canal.properties
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
rabbitmq.host = www.zhengqingya.com:5672
rabbitmq.virtual.host = my_vhost
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
2、example/instance.propertios
canal.instance.master.address=www.zhengqingya.com:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# table regex 只同步test数据库下的t_user表
canal.instance.filter.regex=test\\.t_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# mq config
canal.mq.topic=canal_routing_key
四、springboot整合canal实现数据同步
1、pom.xml
引入mq依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml
配置
# RabbitMQ配置
spring:
rabbitmq:
addresses: www.zhengqingya.com:5672,www.zhengqingya.com:5673 # 指定client连接到的server的地址,多个以逗号分隔
# 填写自己安装rabbitmq时设置的账号密码,默认账号密码为`guest`
username: admin
password: admin
virtual-host: my_vhost # 填写自己的虚拟机名,对应可查看 `127.0.0.1:15672/#/users` 下Admin中的`Can access virtual hosts`信息
3、mq监听canal消息数据
@Slf4j
@Component
public class CanalRabbitMqListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
key = MqConstant.CANAL_ROUTING_KEY
)
})
public void handleCanalDataChange(String message) {
log.info("[canal] 接收消息: {}", JSON.toJSONString(message));
}
}
public interface MqConstant {
String CANAL_EXCHANGE = "canal.exchange";
String CANAL_QUEUE = "canal_queue";
String CANAL_ROUTING_KEY = "canal_routing_key";
}
4、测试
先启动项目让程序自动建立所需mq中的交换机和队列
再修改canal监听的表数据
查看程序监听的mq消息数据如下,拿到数据就可以进行数据解析处理了…
五、canal-spring-boot-starter
tips: 可参考 https://github.com/NormanGyllenhaal/canal-client
此方式需将canal.properties
配置文件中的canal.serverMode
属性值修改为tcp
1、pom.xml
中引入依赖
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2、application.yml
配置
canal:
server: www.zhengqingya.com:11111
destination: example # tips:canal-admin中Instance管理下需存在example实例配置
3、监听canal数据 – 订阅数据库增删改操作
@Slf4j
@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
log.info("insert message {}", user);
}
@Override
public void update(User before, User after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(User user) {
log.info("delete {}", user);
}
}
@Data
@Table(name = "t_user")
public class User implements Serializable {
/**
* 主键
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "user_id")
private Integer userId;
/**
* 用户名
*/
@Column(name = "username")
private String username;
/**
* 密码
*/
@Column(name = "password")
private String password;
/**
* 性别
*/
@Column(name = "sex")
private Integer sex;
/**
* 备注
*/
private String remark;
/**
* 时间
*/
private Date date;
}
经测试发现这个jar存在一些bug,ex:针对表字段,数据原本为空,修改为有值的时候,如果java这边用非String字段类型去接收会报错!
本文案例demo源码
https://gitee.com/zhengqingya/java-workspace
来源:https://blog.csdn.net/qq_38225558/article/details/122587532


猜你喜欢
- 应用的配置源通常都是远端的Config Server服务器,默认情况下,本地的配置优先级低于远端配置仓库。如果想实现本地应用的系统变量和co
- 今天碰到一个非常奇怪的问题: 在Android中ImageView无法显示加载的本地SDCard图片。 具体过程是:先调用本地照相机程序摄像
- private void button2_Click(object sender, EventArgs e) &nbs
- 本文实例为大家分享了Java实现五子棋游戏的具体代码,供大家参考,具体内容如下学习目的:熟悉java中swing类与java基础知识的巩固.
- 引言对使用 lombok 还是有很多争议的,有些公司不建议使用,有些公司又大量使用。我们的想法是:可以使用,但是不要滥用。什么是 lombo
- 本篇介绍我们如何利用selenium 来操作各种页面元素阅读目录链接(link)输入框 textbox按钮(Button)下拉选择框(Sel
- 初学spring,我在dao层初始化c3p0的时候,使用@Resource注解新建对象是发现注入为null,告诉我 java.la
- Java8被称作Java史上变化最大的一个版本。其中包含很多重要的新特性,最核心的就是增加了Lambda表达式和StreamAPI。这两者也
- 单例模式是23种设计模式之一,是比较简单的一种设计模式,它的目的是无论调用多少次,都返回同一个对象,它的特点是构造器私有化。它分为两种结构,
- 圆形识别方案识别流程判断是否为封闭图形;根据圆的方程,取输入点集中的1/6、3/6、5/6处的三个点,求得圆的方程,获取圆心及半径;取点集中
- 用C#想写一个直接将数据库查询得到的datatable,直接导出为csv格式的文件,拷贝到导出的操作类后,一直catch到的错误提示是对路径
- 本文实例讲述了C#使用HttpDownLoadHelper下载文件的方法。分享给大家供大家参考。具体实现方法如下:using System;
- 大家好,今天我和大家分享一下用Java编写日历表,我用了3种方式实现这一功能的。首先我们来看看一个日历表有什么特征。在这里我就把我电脑上的日
- 1 引言伴随着信息技术的快速发展,近年来,人们的生活已经离不开计算机。生活娱乐几乎都是在计算机上进行的。其中的扫雷游戏就是之一。扫雷游戏是微
- 外观模式: 又称门面模式: 外观Facade为子系统的一组接口提供一个一致界面,使得这组子系统易于使用(通过引入一个新的外观角色降低原系统复
- ArrayList与Array的区别概述 ArrayList 是数
- 一 前言springboot 额外的特色是提供了后台应用监控,可以通过 HTTP 或者 JMX的方式管理监控应用,本文主讲HTTP方式;其主
- 1.pom.xml文件引入druid和数据库连接jar包<properties><druid.version>1.0
- 个人详情页滑动到顶部最近产品提了个新需求,需要实现点击App内的某个按钮跳转到个人详情页并且滑动到顶部,个人详情页的页面交互稍微复杂,技术角
- 我们在j2ee当中,连接数据库的时候经常会用到properties配置文件,我们原来在eclipse或者myeclipse当中会在src文件