Netty实现简易版的RPC框架过程详解
作者:我是小趴菜 发布时间:2023-05-23 23:19:58
项目地址:gitee.com/baojh123/rp…
netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的
1:如何运行项目
1:本地起一个zookeeper服务
2: 只需要运行 rpc-server 和 springboot-zk-study二个项目即可
3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息
4:启动好之后,在浏览器访问
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回结果
2:从客户端调用开始(springboot-zk-study项目)
@RestController
@RequestMapping("/zk")
public class ZkController {
@Resource
@MyResource
private UserService userService;
@Resource
@MyResource
private PeopleService peopleService;
@GetMapping("/test")
public String test() {
return userService.test("bjh-",1);
}
@GetMapping("/people")
public Object people() {
return peopleService.query(1L);
}
@GetMapping("/list")
public Object list() {
return peopleService.list();
}
}
只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法
@Slf4j
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
......省略一些代码
// 客户端执行方法之后,就会执行到这里的代理方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//从注册中心拿到服务列表
ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
for(ZkProperties zkProperties : zkPropertiesList) {
String interfaceName = zkProperties.getInterfaceName();
Class<?> declaringClass = method.getDeclaringClass();
if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
List<InterfaceInfo> info = zkProperties.getInfo();
InterfaceInfo interfaceInfo = info.get(0);
String ipAddress = interfaceInfo.getIpAddress();
List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
InterfaceImplInfo implInfo = interfaceImplInfo.get(0);
String[] strings = ipAddress.split(":");
//与远程Netty服务端发起连接
RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort());
/**
* 封装请求参数
*/
//获取方法参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
List<String> types = getTypes(parameterTypes);
//同步调用
result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
log.info("返回结果是:{}",result);
}
}
Class<?> returnType = method.getReturnType();
Object value = objectMapper.readValue(result.toString(), returnType);
return value;
}
private RpcClient connNettyServer(String ipAddress,Integer port) {
return new RpcClient(ipAddress,port);
}
private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
RpcMessage rpcMessage = new RpcMessage();
......
//发送请求
Response result = rpcClient.sendRequest(rpcMessage);
log.info("请求结果是:{}", JSONUtil.toJsonPrettyStr(result));
return result.getData();
}
......省略一些代码
我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码
@Slf4j
public class RpcClient {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap;
private String ip;
private Integer port;
RpcClientHandler rpcClientHandler;
private ChannelFuture channelFuture;
public RpcClient(String ip,Integer port) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
rpcClientHandler = new RpcClientHandler();
ch.pipeline().addLast(new RpcDecoder());
ch.pipeline().addLast(new RpcEncoder());
ch.pipeline().addLast(rpcClientHandler);
}
});
try {
// 和远程Nett服务端建立连接
channelFuture = bootstrap.connect(ip, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Response sendRequest(RpcMessage rpcMessage) throws Exception{
//发送请求
channelFuture.channel().writeAndFlush(rpcMessage).sync();
channelFuture.channel().closeFuture().sync();
log.info("获取返回结果=====================");
Response response = rpcClientHandler.getResponse();
return response;
}
}
客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的
3:服务端处理请求
服务端处理请求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
Object obj = rpcMessage.getObj();
RpcMessage rpcMessageResponse = new RpcMessage();
Response response = new Response();
try{
Request request = objectMapper.readValue(obj.toString(), Request.class);
String interfaceImplName = request.getInterfaceImplName();
Class<?> aClass = Class.forName(interfaceImplName);
List<String> paramsTypes = request.getParamsTypes();
try {
Object result = null;
//判读方法是有参数的还是没有参数的
if(paramsTypes.isEmpty()) {
Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
result = declaredMethod.invoke(aClass.newInstance());
}else {
Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray());
Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
Object[] args = (Object[]) paramsObjectMap.get("args");
result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
}
log.info("返回结果是:{}",result);
response.setData(objectMapper.writeValueAsString(result));
response.setIsOk(1);
response.setErrInfo("error");
rpcMessageResponse.setObj(response);
} catch (Throwable throwable) {
throwable.printStackTrace();
response.setData("error");
response.setIsOk(0);
response.setErrInfo(throwable.getMessage());
rpcMessageResponse.setObj(response);
}
}catch (Exception e) {
response.setData("error");
response.setIsOk(0);
response.setErrInfo(e.getMessage());
rpcMessageResponse.setObj(response);
}
String valueAsString = objectMapper.writeValueAsString(response);
rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length);
rpcMessageResponse.setObj(valueAsString);
channelHandlerContext.writeAndFlush(rpcMessageResponse);
}
}
服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可
4:接下来要做什么
上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如
1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢
2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的
@Resource
@MyResource
private PeopleService peopleService;
3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下
{
"zkPropertiesList": [{
"interfaceName": "com.bjh.service.PeopleService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.PeopleServiceImpl",
"value": "com.bjh.service.PeopleServiceImpl"
}]
}]
}, {
"interfaceName": "com.bjh.service.UserService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.UserServiceImpl",
"value": "com.bjh.service.UserServiceImpl"
}]
}]
}]
}
4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的
@Service
public class PeopleServiceImpl implements PeopleService{
@Override
public People query(long id) {
People people = new People();
people.setId(id);
people.setName("coco");
return people;
}
@Override
public List<People> list() {
List<People> list = new ArrayList<>();
People people = new People();
people.setId(123L);
people.setName("coco");
People people2 = new People();
people2.setId(124L);
people2.setName("baojh");
list.add(people);
list.add(people2);
return list;
}
}
5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新
来源:https://juejin.cn/post/7198041700563877945


猜你喜欢
- 概述从今天开始, 小白我将带大家开启 Java 数据结构 & 算法的新篇章.获取哈希值hashCode()方法可以返回一个对象的哈希
- 面试题一:判断下列程序运行结果package String_test;public class test_1 { public static
- 前言现在市面上很多应用都会有当用户按返回键的时候提示用户:再按一次将退出应用的提示,也就是双击双击返回键退出应用,接下来我们就用几种办法来实
- 树形结构很多地方都有应用,比如我们在构造网站后台的授权限树的时候,再比如我们在设计多级留言的时候、还有分类等等。有些时候我们的树形结构并不需
- 举个例子:我有如下的一个需求,当我想要取得用户信息的时候,会先从本地缓存中查找,找不到然后从分布式缓存中查找,最后找不到再从数据库中查询。但
- @Value注解内使用SPEL自定义函数@Value("#{T(com.cheetah.provider.utils.String
- 协议做如下规定:规定数据协议:序列号 长度 状态字 数据长度 数据1 &n
- 配置两个parent的方法在向pom.xml 文件中添加依赖之前需要先添加spring-boot-starter-parent。spring
- Logback TurboFilter实现日志级别等内容的动态修改可能看到这个标题,读者会问:要修改日志的级别,不是直接修改log.xxx就
- 作者: juky_huang 事件的简单解释: 事件是对象发送的消息,以发信号通知操作的发生。操作可能是由用户交互(例如
- js和python是两种语言,js处理网页数据,python可作为服务端开发,两者通过websocket进行通信。websocket是soc
- 在android6.0之后谷歌对指纹识别进行了官方支持,今天还在放假,所以就随意尝试了一下这个api,但是遇到了各种各样的问题 ①
- dart 是一个面向对象的语言;面向对象有继承封装多态dart的所有东西都是对象,所有的对象都是继承与object类一个类通常是由属性和方法
- 本文实例讲述了Android string.xml中的替换方法。分享给大家供大家参考,具体如下:在android的开发中,经常会遇见一句话,
- 一.字符串函数1. 求字符串长度的strlensize_t strlen ( const char * str );字符串以 ‘\0'
- 首先说明:如果没有进入调试模式的话,默认的调试窗口如下: 开始前的准备: 新建控制台程序DebugWindowDemo:修改Program.
- 传统“长轮询”实现Web端即时通讯的问题WebSocket出现之前,Web端为了实现即时通讯,所用的技术都是Ajax轮询(polling)。
- Android 显示刷新频率android11-release 开发者选项->显示刷新频率packages/apps/Settings
- @property可以将python定义的函数“当做”属性访问,从而提供更加友好访问方式,但是有时候setter/deleter也是需要的。
- 本文实例所述为C#生成随机数的类文件,按要求产生一些随机数,最大值、最小值可以自己进行设定。代码简单,可放在你的公共库内供调用使用。类文件具