dubbo如何实现consumer从多个group中调用指定group的provider
作者:自东向西 发布时间:2022-06-09 01:00:13
背景
在工作中,遇到这样的场景:
有个es索引构建服务,需要从各个业务服务获取索引的信息,从而构建索引,业务服务都实现同一个接口——IndexInfoProvider,通过设置不同的group来达到区分的效果(group就是es索引名)。
索引构建服务在内存维护了一个Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服务的consumer。
为了图方便,索引构建服务还一次性初始化了所有consumer,假设总共有200个分组,那么Map就会缓存200个consumer,初始化的时候巨慢。如果不一次性初始化,而是按需的话,代码会变得复杂一些,可能需要像双重检验这样的措施。而且因为是缓存,必然也会遇到缓存一致性的问题,例如新增一个索引。
基于这样的问题,就想着,dubbo的一个consumer,能不能按需调用指定分组的provider呢~
过程
为什么必须缓存那么多consumer
为什么有200个分组,就要缓存200个consumer,而不是像我们平时那样,1个就可以呢?
业务服务在实现IndexInfoProvider接口的时候,都指定是分组,而且分组名,就是对应的索引名。
相应的,消费者就必须指定消费的分组,200个索引,自然就需要200个consumer。
consumer只能调用一个group的provider么
在我们日常的使用中,consumer基本都是只能调用一个group的provider。但实际上,dubbo是支持调用多个group的provider的。
这样写是指定a分组和b分组,多个分组之间用英文逗号分隔
这样写,是所有的分组
group="*"能代替Map<String, IndexInfoProvider>么
如果将es索引构建服务的IndexInfoProvider接口的consumer分组设置为group="*",然后在调用接口的时候,根据需要,从一堆provider中筛选出指定group的provider,只调用这些provider,是不是就可以代替Map缓存了?
初步方案:
利用ThreadLocal来指定要调用的分组,在调用方法前设置group到ThreadLocal中。
实现dubbo的负载均衡拓展,只选取指定分组的节点来调用。
调用结束,清理ThreadLocal中的分组信息。
似乎是可行的?
理想很美好,现实很骨感
通过源码和代码debug发现,consumer持有的Invoker,有点像一个责任链。
如果是单分组——!group.contains(",") && !group.equals("*"),那么会是这样:
MockClusterInvoker->FailoverClusterInvoker
如果是多分组,则会变成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
负载均衡的机制,是得到了FailoverClusterInvoker才会生效
MockClusterInvoker只是一种降级机制,不是导致问题的原因
问题是出在MergeableClusterInvoker,下面是导致问题的代码!!!
代码大概的意思是:消费者没有指定合并策略,那么就会调用第一个有效的服务提供者,如果都是无效,就直接调用第一个。
我是没有指定合并策略的,所以会变成调用第一个有效的服务提供者。根本走不到负载均衡那里。
不指定合并策略不行,那指定呢?指定合并策略会怎样呢?
指定了合并策略,会调用所有invoker,然后用Merger合并结果。
这种很适合这样种场景:
一部分数据在服务A,一部分数据在服务B,需要分别调用A和B,然后把两者的结果集合并
设想着这样的方案:
指定了合并策略,那么所有分组的invoker都会被调用到,那么请求就到FailoverClusterInvoker了,负载均衡spi就生效了
修改一下负载均衡spi,如果目前调用的invoker不是指定分组的,那么就直接返回null
实现自己的合并spi,返回第一个不会null的结果
虽然不是什么优雅的方式,但是,似乎是能做到的???
实际上,行不通,达不到想要的效果。原因是MergeableClusterInvoker内部是用线程池并发调用的,ThreadLocal里的分组信息会丢失。
还有机会么
MergeableClusterInvoker类是没有留拓展的余地啦,还有其他机会么?MergeableClusterInvoker的Invokers列表从哪里来的?
Invokers是从directory来的,这里有没有拓展的余地呢?
有的,在AbstractDirectory的list方法
看到这,发现用路由spi,还是有机会的,如果实现动态路由,每次只给MergeableClusterInvoker返回指定分组的Invoker,是不是就可以呢?怎么让routers包含我们自定义的路由spi呢?
需要url上携带了router参数,但是这里的url的参数是由谁决定的?@Reference 注解是没有没有的指定router参数的…
最后debuig,兜兜转转,发现只能靠重写ReferenceConfig类的loadRegistries方法,往url上加上动态路由的参数。
成果
AssignGroupRouterFactory
public class AssignGroupRouterFactory implements RouterFactory {
public static final String NAME = "assignGroup";
@Override
public Router getRouter(URL url) {
return new AssignGroupRouter(url);
}
}
AssignGroupRouter
public class AssignGroupRouter implements Router {
private final URL url;
public AssignGroupRouter(URL url) {
this.url = url;
}
@Override
public URL getUrl() {
return url;
}
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
if (Objects.isNull(assignGroup)) {
return invokers;
}
return invokers.stream()
.filter(invoker -> {
URL invokerUrl = invoker.getUrl();
return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
}).collect(Collectors.toList());
}
@Override
public int compareTo(Router o) {
return 1;
}
}
AssignGroupReferenceConfig
public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
@Override
protected List<URL> loadRegistries(boolean provider) {
List<URL> urls = super.loadRegistries(provider);
if (CollectionUtils.isEmpty(urls)) {
return urls;
}
// 指定动态路由,路由方式为assignGroup
return urls.stream()
.map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
"true"))
.collect(Collectors.toList());
}
}
IndexInfoProviderInvokerProxy
@Component
public class IndexInfoProviderInvokerProxy {
public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();
private final IndexInfoProvider indexInfoProvider;
public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
this.indexInfoProvider = indexInfoProvider;
}
/**
* 获取文档id 在[start, end) 之间的所有索引文档
*
* @param indexType 索引名
* @param start 起始id
* @param end 结束id
* @return 文档列表
*/
public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
}
private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
INDEX_TYPE_THREAD_LOCAL.set(indexType);
T result = supplier.get();
INDEX_TYPE_THREAD_LOCAL.remove();
return result;
}
}
来源:https://blog.csdn.net/zidongxiangxi/article/details/109312544


猜你喜欢
- 一,设计多图片打包下载逻辑:1,如果是要拉取腾讯云等资源服务器的图片,2,我们先把远程图片拉取到本地的临时文件夹,3,然后压缩临时文件夹,4
- 自动配置底层源码分析本次springboot源码来自2.6.6版本。@EnableAutoConfiguration源码解析在springb
- 一、Stream流引入Lambda表达式,基于Lambda所带来的函数式编程,又引入了一个全新的Stream概念,用于解决集合类库既有的鼻端
- 1、实例解析先从一个例子开始:public class LambdaTest { public static vo
- 读取文件流时,经常会遇到乱码的现象,造成乱码的原因当然不可能是一个,这里主要介绍因为文件编码格式而导致的乱码的问题。首先,明确一点,文本文件
- 一:什么是classpath?classpath指的就是 *.java文件,资源文件等编译后存放的位置,对于maven项目就是指 targe
- 问题描述在应用MyBatis时,使用对象关系映射,将对象和Aliase映射起来。在Mybatis的文档明确写出,如果你没有明确定义实体类的A
- 本文实例为大家分享了Spring MVC接口防数据篡改和重复提交的具体代码,供大家参考,具体内容如下一、自定义一个注解,此注解可以使用在方法
- 本文实例为大家分享了利用Swing绘制一个动态时钟的具体代码,供大家参考,具体内容如下效果代码在下面,可跳过解析。前言编程实现一个时钟利用S
- 得到选中项的value值并拼接成一个字符串返回public string GetChecked(CheckBoxList checkList
- 本文实例为大家分享了PhotoView实现图片双击放大单击退出的具体代码,供大家参考,具体内容如下实现思路1.复制PhotoView&nbs
- 1.关系运算符!= 与等号共同组成关系运算符,检查两个操作数的值是否相等,如:A!=B2.逻辑运算符! 称为逻辑非运算符。用来逆转操作数的逻
- 一、Stream流简单示例需求:按照要求集合创建和遍历创建一个结合,存储多个字符串元素把集合中所有以"张"开头的元素存储
- 在Web应用系统开发中,文件上传和下载功能是非常常用的
- 在ibatis的xml文件里,我们去写sql语句,对应mapper类的方法,这些sql语句与控制台上没什么两样,但在有些功能上需要注意,如w
- 引言Java反射机制是一个非常强大的功能,在很多大型项目比如Spring, Mybatis都可以看见反射的身影。通过反射机制我们可以在运行期
- Command 常用属性CommText 要下达至数据源的命令CommanTimeout 出错等待时间Command 三种方法Execute
- 本文实例为大家分享了Java实现人机猜拳游戏的具体代码,供大家参考,具体内容如下实现:User类public class User { pr
- 修改JSP,刷新一下JSP结果就报错,错误如下:严重: Servlet.service() for servlet jsp threw ex
- 本文实例讲述了Java TreeSet实现学生按年龄大小和姓名排序的方法。分享给大家供大家参考,具体如下:import java.util.