TC 集群Seata1.6高可用架构源码解析
作者:Applehope 发布时间:2022-04-18 05:02:34
一、背景
TC 集群具有高可用架构,应用到集群是这样一个间接的关系:应用 -》事务分组 -》TC 集群,应用启动后所指定的事务分组不能变,可通过配置中心变更事务分组所属的 TC 集群,Seata 客户端监听到这个变更后,会切换到新的 TC 集群。
本篇从源码梳理这个高可用能力是如何实现的。
二、环境配置
客户端配置使用nacos配置中心和nacos注册中心,
seata:
enabled: true
# Seata 应用编号
application-id: seataclistock
# Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
tx-service-group: tx_group_stock
# 关闭自动代理
enable-auto-data-source-proxy: false
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
serverAddr:
namespace: seata # 需要与服务端添加的配置文件相同
group: SEATA_GROUP_ROCKTEST
username: seata
password: seata
data-id: seataClient.tx_group_busin.properties
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
application: seata-server
serverAddr:
namespace: seata # 需要与服务端添加的配置文件相同
group: SEATA_GROUP_ROCKTEST # 需要与服务端添加的配置文件相同
username: seata
password: seata
三、从配置中心获取TC集群
服务注册的能力要依赖配置中心,从nacos的配置中心获取配置NacosConfiguration#initSeataConfig
Data Id:seataClient.tx_group_stock.properties
Group:SEATA_GROUP_LWKTEST
其中的service.vgroupMapping.tx_group_stock
的值是dev_cluster_1
,接下来注册能力就要使用这个集群来工作。
private static void initSeataConfig() {
try {
String nacosDataId = getNacosDataId();
String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(config)) {
seataConfig = ConfigProcessor.processConfig(config, getNacosDataType());
NacosListener nacosListener = new NacosListener(nacosDataId, null);
configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
}
} catch (NacosException | IOException e) {
LOGGER.error("init config properties error", e);
}
}
RegistryFactory#getInstance()
这是个单例机制,所以源码梳理起来很简单,下边获取TC服务的时候会调用此单例方法做初始化。
读取配置文件中
registry.type
,配置的值是
nacos
,所以读出的值是nacos
通过SPI加载并实例化
NacosRegistryProvider
public class RegistryFactory {
/**
* Gets instance.
*
* @return the instance
*/
public static RegistryService getInstance() {
return RegistryFactoryHolder.INSTANCE;
}
private static RegistryService buildRegistryService() {
RegistryType registryType;
//registryTypeName = "registry.type"
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
// nacos
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
// 通过SPI 加载并实例化 NacosRegistryProvider
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
private static class RegistryFactoryHolder {
private static final RegistryService INSTANCE = buildRegistryService();
}
}
TM、RM 客户端需要与TC通信,所以在其初始化时必然会有获取TC集群的逻辑,对应在源码TmNettyRemotingClient#init
中的reconnect
方法。
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
//父类中会开启定时任务来执行 getClientChannelManager().reconnect(transactionServiceGroup)
super.init();
if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
reconnect
中的.NettyClientChannelManager#getAvailServerList
是根据seata.tx-service-group
的值来检索TC集群信息。直接提供出来调用堆栈,方便大家快速熟悉调用链路:
getServiceGroup:111, RegistryService (io.seata.discovery.registry)
lookup:145, NacosRegistryServiceImpl (io.seata.discovery.registry.nacos)
getAvailServerList:257, NettyClientChannelManager (io.seata.core.rpc.netty)
reconnect:171, NettyClientChannelManager (io.seata.core.rpc.netty)
init:198, TmNettyRemotingClient (io.seata.core.rpc.netty)
init:47, TMClient (io.seata.tm)
initClient:220, GlobalTransactionScanner (io.seata.spring.annotation)
afterPropertiesSet:512, GlobalTransactionScanner (io.seata.spring.annotation)
这里便是通过Seata客户端 seata.tx-service-group
的值,找到最终TC集群的关键之处。在getServiceGroup
中从nacos中获取service.vgroupMapping.tx_group_stock
的值,即dev_cluster_1
default String getServiceGroup(String key) {
//key = service.vgroupMapping.tx_group_stock
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
return ConfigurationFactory.getInstance().getConfig(key);
}
然后NettyClientChannelManager#reconnect
中获取 TC 集群中的所有 TC 服务节点,对每个TC 服务节点建连。
for (String serverAddress : availList) {
try {
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
serverAddress, e.getMessage(), e);
}
}
再梳理一下,梳理这么多的关键就是通过tx_group_stock
找到 TC 集群 dev_cluster_1
客户端:
seata:
# 默认关闭,如需启用spring.datasource.dynami.seata需要同时开启
enabled: true
# Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
tx-service-group: tx_group_stock
nacos:
Data ID: seataClient.tx_group_stock.properties
Group: SEATA_GROUP_ROCKTEST
配置内容:
...
service.vgroupMapping.tx_group_stock=dev_cluster_1
...
TC服务端:
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 30.240.*
nacos:
application: seata-server
cluster: dev_cluster_1
RegistryService#getServiceGroup
中从nacos获取值的时候,有个细节需要注意:通过namespace + Group + Key(Data Id) 三维来唯一标示一个Key。
四、刷新TC集群
AbstractNettyRemotingClient#init
中默认会每隔10s进行一次 TC 服务清单刷新与重连
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
来源:https://juejin.cn/post/7180243089527668797


猜你喜欢
- 目录1、在运行时,由java解释器自动引入,而不用import语句引入的包是()。2、以下关于集合类ArrayList、LinkedList
- IDEA打成jar包并在windows后台运行一、IDEA打成jar包1、File=>Project Structure=>Pr
- 测试1: 先看一组String类型比较,废话不多说,直接上代码:public class Test {public static void
- 一、继承1、继承的概念继承机制:是面向对象程序设计是代码可以复用的最重要手段,允许程序员在保持原有类特性的基础上进行扩展,增加新的功能,产生
- Logger来自log4j自己的包。如果用Logger.getLogger,需要一个log4j的jar包,用此方式你只能依log4j:Log
- 一、为什么要有泛型?我们在写一些方法时可能会方法名相同,参数类型不同的方法,这种叫做重载。如果只是因为参数类型不同里面做的业务逻辑都是相同的
- 在代码中进行命令行交互是一个很常见的场景, 特别是在一些CI CD 自动化流程中, 在这之前我们会使用 System.Diagnostics
- 本文实例讲述了Java设计模式之 * 模式。分享给大家供大家参考,具体如下: * 模式有三个要素——事件源、事件对象、 * 。事件源:顾名思
- Java 数据库连接池详解数据库连接池的原理是:连接池基本的思想是在系统初始化的时候,将数据库连接作为对象存储在内存中,当用户需要访问数据库
- 其实是可以通过@Constraint来限定自定义注解的方法。@Constraint(validatedBy = xxxx.class)下面是
- 之前百度,google了很多,发现并没有介绍mongodb生产环境如何配置的文章, 当时想参考下都不行, 所以写篇文章,大家可以一块讨论下.
- 概念二叉搜索树又称二叉排序树,它或者是一棵空树,或者是具有以下性质的二叉树:1、若它的左子树不为空,则左子树上所有节点的值都小于根结点的值。
- 本文实例为大家分享了android自定义波浪加载动画的具体代码,供大家参考,具体内容如下效果图1.自定义控件 WaveViewpackage
- using System;using System.Data;using System.Configuration;using System
- OAuth 2.0 是一种工业级的授权协议。OAuth 2.0是从创建于2006年的OAuth 1.0继承而来的。OAuth 2.0致力于帮
- Java8中的lambda表达式、::符号和Optional类 0. 函数式编程 函
- 1.瞎叨叨也不知道写点什么,本来想写写Flutter的集成测试。因为前一阵子给flutter_deer写了一套,不过感觉也没啥内容,写不了几
- 1.概述数据库开发一直是JAVA开发的核心之一,作为现在JAVA EE的基石框架,Spring Boot自身携带了一个JDBCTemplat
- 实现“摇一摇”功能,其实很简单,就是检测手机的重力感应,具体实现代码如下:1、在 AndroidManifest.xml 中添加操作权限2、
- 说明:基于atguigu学习笔记。简介Webflux是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似