Java基础之《netty(31)—用netty实现RPC》
创始人
2024-05-16 00:01:40
0

一、需求说明

1、dubbo底层使用了netty作为网络通讯框架,要求使用netty实现一个简单的RPC框架。

2、模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用netty4.x。

二、设计说明

1、创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。

三、代码

1、接口
HelloService.java

package netty.dubborpc.publicinterface;/*** 接口,服务提供方和服务消费方都需要* @author user**/
public interface HelloService {public String hello(String msg);
}

2、netty服务端
NettyServer.java

package netty.dubborpc.netty;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class NettyServer {public static void startServer(String hostname, int port) {startServer0(hostname, port);}private static void startServer0(String hostname, int port) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(8);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler()); //业务处理类}}); //自定义一个初始化类ChannelFuture cf = bootstrap.bind(7000).sync();System.out.println("服务提供方启动,开始监听了......");cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler.java

package netty.dubborpc.netty;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import netty.dubborpc.provider.HelloServiceImpl;public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//获取客户端发送的消息,并调用服务System.out.println("msg=" + msg);//客户端在调用服务器的api时,我们需要定义一个协议//比如我们要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#"if (msg.toString().startsWith("HelloService#hello#")) {//去除协议头//这里可以用反射生成处理类//还要考虑粘包拆包问题String result = new HelloServiceImpl().hello(msg.toString().substring(19));ctx.writeAndFlush(result);}}
}

3、netty客户端
NettyClient.java

package netty.dubborpc.netty;import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class NettyClient {//创建线程池private static ExecutorService executor = Executors.newFixedThreadPool(10);private static NettyClientHandler client;//编写方法,使用代理模式获取代理对象public Object getBean(final Class serviceClass, final String protocolHeader) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {serviceClass}, //lambel表达式就是实现的接口InvocationHangler的invoke方法(proxy, method, args) -> {//这部分代码,客户端每调用一次hello,就会进入该代码块if (client == null) {initClient();initConnect();}//设置要发给服务器的信息//protocolHeader协议头,args[0]就是客户端调用api hello()里传的参数client.setParam(protocolHeader + args[0]);return executor.submit(client).get();});}//初始化客户端private static void initClient() {client = new NettyClientHandler();}//初始化连接private static void initConnect() {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new LoggingHandler(LogLevel.DEBUG)).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(client);}}); //自定义一个初始化对象ChannelFuture cf = bootstrap.connect("127.0.0.1", 7000).sync();//这里不能阻塞必须返回,因为后续代理还要调用call方法,所以不能closeFuture sync,但可以对closeFuture加一个listener回调//cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//这里也不能shutdown//group.shutdownGracefully();}}
}

NettyClientHandler.java

package netty.dubborpc.netty;import java.util.concurrent.Callable;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {//定义属性//感觉这三个变量都有并发问题???private ChannelHandlerContext context; //上下文private String result; //调用后返回的结果private String param; //客户端调用方法时,传入的参数/*** Callable接口有一个非常重要的方法call()* 被代理对象调用,发送数据给服务器 -> wait -> 等待被唤醒 -> 返回结果* 一句话,就是客户端要自己控制什么时候发消息,channelActive不行,而且还要等待结果返回所以要wait,等channelRead返回后call再返回*/@Overridepublic synchronized Object call() throws Exception {System.out.println("call 被调用 before");context.writeAndFlush(param); //把参数发过去//进行waitwait(); //等待channelRead方法获取到服务器的结果后,唤醒System.out.println("call 被调用 after");return result; //服务方返回的结果}/*** 与服务器的连接创建成功后,就会被调用*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive 被调用");if (context == null) {context = ctx; //因为在其他方法会使用到这个ctx}}/*** 收到服务器的数据后,就会被调用*/@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("channelRead 被调用");result = msg.toString();notify(); //唤醒等待的线程}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}public void setParam(String param) {System.out.println("setParam 被调用");this.param = param;}} 

4、服务提供者
ServerBootstrap.java

package netty.dubborpc.provider;import netty.dubborpc.netty.NettyServer;/*** ServerBootstrap会启动一个服务提供者,就是NettyServer* @author user**/
public class ServerBootstrap {public static void main(String[] args) {NettyServer.startServer("127.0.0.1", 7000);}
}

HelloServiceImpl.java

package netty.dubborpc.provider;import netty.dubborpc.publicinterface.HelloService;public class HelloServiceImpl implements HelloService {private int count = 0;@Overridepublic String hello(String msg) {System.out.println("收到客户端消息:" + msg);//根据msg返回不同的结果if (msg != null) {return "你好客户端,我已经收到你的消息 [" + msg + "] 第" + (++count) + "次";} else {return "你好客户端,消息为空";}}}

5、服务消费者
ClientBootstrap.java

package netty.dubborpc.consumer;import netty.dubborpc.netty.NettyClient;
import netty.dubborpc.publicinterface.HelloService;public class ClientBootstrap {//这里定义协议头public static final String providerName = "HelloService#hello#";public static void main(String[] args) {//创建一个消费者NettyClient consumer = new NettyClient();for (int i=0; i<10; i++) {//创建代理对象HelloService helloService = (HelloService)consumer.getBean(HelloService.class, providerName);//通过代理对象调用服务提供者的方法String result = helloService.hello("你好 RPC~");System.out.println("调用结果 result:" + result);System.out.println("-------------------------");}}
}

四、执行结果

1、服务端

服务提供方启动,开始监听了......
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~

2、客户端

channelActive 被调用
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------

五、说明

1、netty客户端和服务端之间是长连接,如果要短连接用socket连服务端就好了,netty客户端做短连接不合适,EventLoopGroup每次创建销毁开销太大了。
2、NettyClientHandler类内部的调用顺序:第一个调用的channelActive(首次时),第二个调用的setParam,第三个调用的call,然后wait,第四个调用channelRead,然后notify,唤醒线程继续在call里执行
3、NettyClientHandler类是所有请求共用的,它的成员变量有并发问题
4、客户端每调用一次,服务端都产生新的HelloServiceImpl对象(这个是自己new出来的)
5、客户端的channelActive只会在连接成功后被调用一次
6、NettyClient类里其实不需要线程池,因为客户端就只有一个,顶多newSingleThreadExecutor。如果真的起多个客户端notify唤醒谁?
7、用户线程想拿到结果需要等待react线程唤醒。context.write方法是在用户线程调用,netty会把这个读封装成task任务丢到react线程。最终发送到服务端,服务端返回结果,客户端在react线程读取
8、客户端NettyClient和NettyClientHandler只有一个实例,是自己new出来的

六、问题

这里demo的客户端有并发和性能问题

相关内容

热门资讯

怎么解除订阅安卓系统,安卓系统... 你是不是也和我一样,手机里订阅了好多服务,结果现在想解除订阅,却一头雾水?别急,今天就来手把手教你如...
安卓系统停用怎么开启,轻松恢复... 亲爱的手机控们,你是否曾经遇到过安卓系统突然停用的情况,让你手忙脚乱,不知所措?别担心,今天就来教你...
安卓系统电池健康度,电池健康度... 你有没有发现,你的安卓手机最近是不是有点儿不给力了?电池续航能力大不如前,充电速度也慢了不少?别急,...
安卓系统按键怎么截图,安卓系统... 你是不是也和我一样,有时候想截个图分享给朋友,却发现安卓手机的截图功能有点神秘呢?别急,今天就来手把...
购票系统安卓源代码,架构设计与... 你有没有想过,那些我们每天离不开的购票系统,它们背后的秘密是什么呢?今天,就让我带你一探究竟,揭开购...
安卓手机系统后台测试,深度解析... 你有没有发现,你的安卓手机后台总是悄悄地忙碌着?别小看了这些后台程序,它们可是手机系统稳定运行的关键...
安卓系统重启的图标,解锁设备新... 手机突然重启,是不是心里有点慌?别急,今天就来和你聊聊安卓系统重启的图标,让你一眼就能认出它,再也不...
车载智慧屏安卓系统,智能出行新... 你有没有发现,现在的车载智慧屏越来越智能了?尤其是那些搭载了安卓系统的,简直就像是个移动的小电脑,不...
安卓系统连上网权限,解锁设备无... 你有没有发现,你的安卓手机里有些应用总是偷偷连上网?别小看这个小小的网络权限,它可是能影响你隐私、消...
安卓谷歌操作系统,探索安卓谷歌... 你知道吗?在智能手机的世界里,有一个操作系统可是无人不知、无人不晓,那就是安卓谷歌操作系统。它就像一...
安卓系统手写%怎样调出,具体实... 你有没有遇到过这种情况:在使用安卓手机的时候,突然想用手写输入法来记录一些灵感或者重要信息,可是怎么...
安卓手机重置 系统设置,轻松恢... 手机用久了是不是感觉卡顿得厉害?别急,今天就来教你怎么给安卓手机来个大变身——重置系统设置!想象你的...
win如何安装安卓系统,Win... 哇,你有没有想过,让你的Win系统也能玩转安卓应用?没错,就是那种在手机上轻松自如的安卓系统,现在也...
苹果qq和安卓系统,跨平台体验... 你有没有发现,现在手机市场上,苹果和安卓的较量可是越来越激烈了呢!咱们就来聊聊这个话题,看看苹果QQ...
显示最好的安卓系统,探索最新旗... 你有没有想过,为什么安卓系统那么受欢迎呢?它就像一个魔法盒子,里面装满了各种神奇的魔法。今天,就让我...
安卓app怎么降级系统,系统版... 你有没有发现,有时候安卓手机的系统更新后,新功能虽然炫酷,但老系统用起来更顺手呢?别急,今天就来教你...
雷军脱离安卓系统,引领科技变革... 你知道吗?最近科技圈可是炸开了锅,因为我们的雷军大大竟然宣布要脱离安卓系统,这可真是让人大跌眼镜啊!...
安卓系统自动开网络,安卓系统自... 你有没有发现,手机里的安卓系统有时候会自动开启网络连接,这可真是让人又爱又恨啊!有时候,你正专心致志...
安卓系统怎样控制后台,因为服务... 手机里的安卓系统是不是感觉越来越卡了?后台程序太多,不仅耗电还影响性能。别急,今天就来教你怎么巧妙地...
安卓系统打游戏推荐,一触即达! 你有没有发现,现在手机游戏越来越好玩了?不管是休闲小游戏还是大型MMORPG,都能在手机上畅玩。但是...