dubbo源码实践-Exchange 信息交换层例子
创始人
2024-05-07 05:18:07
0

1 Exchange 层概述

官方定义:

exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。

其中Exchanger是SPI扩展点,是该层的入口。其中客户端通过ExchangeClient.request发送请求,服务端通过ExchangeHandler的reply方法处理请求并返回结果。

为了理解上面官方的定义,下面将使用该层的类创建一个客户端和服务器端的应用。

2 实践例子

2.1 项目结构

由于是TCP框架,所以有服务端和客户端,两端的代码。

服务端代码:ExchangeServerTest 启动类,AlfServerExchangeHandler服务端的业务逻辑处理类(类似Netty的Handler作用)。

客户端代码:ExchangeClientTest 启动类,AlfClientExchangeHandler 客户端的业务逻辑处理类。

2.2 服务端代码

ExchangeServerTest类,使用Exchanger接口绑定(bind)端口8888,启动服务器。

注意URL中添加的codec属性,如果不添加程序会走telnet的实现,程序会报错。可以参考:可以参考AbstractEndpoint的getChannelCodec函数。

package org.example.dubbo.exchange;import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchanger;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;import java.io.IOException;/** 服务端代码 */
public class ExchangeServerTest {public static void main(String[] args) throws RemotingException, IOException {//构建URL, dubbo中靠URL来传递参数URLBuilder urlBuilder = new URLBuilder();urlBuilder.setHost("localhost");urlBuilder.setPort(8888);//指定超时事件, 调试时防止超时urlBuilder.addParameter("codec", "exchange");URL url = urlBuilder.build();//Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchangerExchanger exchanger = new HeaderExchanger();//服务端调用bind方法ExchangeServer exchangeServer = exchanger.bind(url, new AlfServerExchangeHandler());System.out.println("服务器启动完成");//等待,防止程序提前结束System.in.read();}
}

AlfServerExchangeHandler类,主要关注reply方法,该方法处理客户端发来的请求,然后通过CompletableFuture异步返回给客户端。还记得官方说的“同步转异步”吗?这里是一个体现。

package org.example.dubbo.exchange;import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;import java.util.concurrent.CompletableFuture;/** 服务端业务处理 */
public class AlfServerExchangeHandler implements ExchangeHandler {@Overridepublic CompletableFuture reply(ExchangeChannel channel, Object request) throws RemotingException {System.out.println("reply AAA, request=" + request);CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(() -> "服务器8888为你服务");return stringCompletableFuture;}@Overridepublic void connected(Channel channel) throws RemotingException {System.out.println("connected AAA");}@Overridepublic void disconnected(Channel channel) throws RemotingException {System.out.println("disconnected AAA");}@Overridepublic void sent(Channel channel, Object message) throws RemotingException {System.out.println("sent AAA, message =" + message);}@Overridepublic void received(Channel channel, Object message) throws RemotingException {System.out.println("received AAA");}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {System.out.println("caught AAA");exception.printStackTrace();}@Overridepublic String telnet(Channel channel, String message) throws RemotingException {System.out.println("telnet AAA, message = " + message);return null;}
}

2.3 客户端代码

ExchangeClientTest客户端代码入口,使用Exchanger接口连接(connect)函数来连接本机的8888端口。

注意一下CompletableFuture completableFuture = exchangeClient.request("你是谁?", null); 这句代码,通过是哦也能够CompletableFuture,把获取相应结果异步了。

package org.example.dubbo.exchange;import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.*;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/** 客户端*/
public class ExchangeClientTest {public static void main(String[] args) throws RemotingException, IOException, ExecutionException, InterruptedException {//构建URL, dubbo中靠URL来传递参数URLBuilder urlBuilder = new URLBuilder();urlBuilder.setHost("localhost");urlBuilder.setPort(8888);//指定超时事件, 调试时防止超时urlBuilder.addParameter("timeout", 1000 * 200);//指定编码器,可以参考AbstractEndpoint的getChannelCodec函数urlBuilder.addParameter("codec", "exchange");URL url = urlBuilder.build();//Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchangerExchanger exchanger = new HeaderExchanger();//客户端调用connect方法连接服务端ExchangeClient exchangeClient = exchanger.connect(url, new AlfClientExchangeHandler());//发送消息CompletableFuture completableFuture = exchangeClient.request("你是谁?", null);System.out.println("客户端发送消息----------");Object o = completableFuture.get();System.out.println("客户端接收到消息----------" + o);//等待,防止程序提前结束System.in.read();}
}

AlfClientExchangeHandler类,业务处理器。主要是展示一下发送消息和获取响应时被掉用的方法。

package org.example.dubbo.exchange;import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;import java.util.concurrent.CompletableFuture;/** 客户端处理器*/
public class AlfClientExchangeHandler implements ExchangeHandler {@Overridepublic CompletableFuture reply(ExchangeChannel channel, Object request) throws RemotingException {//本例中是客户端发送的请求,所以该方法不会被调用的System.out.println("reply BBB, request=" + request);CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(() -> "客户端返回数据");return stringCompletableFuture;}@Overridepublic void connected(Channel channel) throws RemotingException {System.out.println("connected BBB");}@Overridepublic void disconnected(Channel channel) throws RemotingException {System.out.println("disconnected BBB");}@Overridepublic void sent(Channel channel, Object message) throws RemotingException {System.out.println("sent BBB, message =" + message);}@Overridepublic void received(Channel channel, Object message) throws RemotingException {System.out.println("received BBB");}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {System.out.println("caught BBB");}@Overridepublic String telnet(Channel channel, String message) throws RemotingException {System.out.println("telnet BBB, message = " + message);return null;}
}

2.4 运行结果

启动服务端,再启动客户端,然后查看日志。

2.4.1服务端日志

1处:服务器启动成功。

2处:AlfServerExchangeHandler中的connected方法别回调了,表明有客户端连接了服务器。

3 处:AlfServerExchangeHandler中的reply方法别回调了,表明服务器收到了客户端发送的Request请求,并在这个方法中进行逻辑处理。

4 处:AlfServerExchangeHandler中的sent方法别回调了,表明服务器发送了一个消息(这里的消息其实是我们收到request处理完毕后的返回结果Response对象)。

2.4.2 客户端日志

1 处:AlfClientExchangeHandler中的connected方法被回调了,表明客户端连接服务器成功了。

2 处:代码中打印的输出System.out.println("客户端发送消息----------")

3 处:AlfClientExchangeHandler中的sent方法被回调了,表明客户端发送了一个请求(Request对象)。

4 处:通过CompletableFuture对象获取服务器的返回内容(这里是字符串)

3 总结

看完上面的代码例子,应该能理解“exchange 信息交换层:封装请求响应模式,同步转异步。”的含义了吧。😁

顺便提一下,看到图中的request方法和reply方法了吗?Protocol层会调用,其实就是我们例子中调用的发送Request和响应Response的两个方法。

相关内容

热门资讯

安卓子系统windows11,... 你知道吗?最近科技圈可是炸开了锅,因为安卓子系统在Windows 11上的兼容性成了大家热议的话题。...
电脑里怎么下载安卓系统,电脑端... 你有没有想过,你的电脑里也能装上安卓系统呢?没错,就是那个让你手机不离手的安卓!今天,就让我来带你一...
索尼相机魔改安卓系统,魔改系统... 你知道吗?最近在摄影圈里掀起了一股热潮,那就是索尼相机魔改安卓系统。这可不是一般的改装,而是让这些专...
安卓系统哪家的最流畅,安卓系统... 你有没有想过,为什么你的手机有时候像蜗牛一样慢吞吞的,而别人的手机却能像风一样快?这背后,其实就是安...
安卓最新系统4.42,深度解析... 你有没有发现,你的安卓手机最近是不是有点儿不一样了?没错,就是那个一直在默默更新的安卓最新系统4.4...
android和安卓什么系统最... 你有没有想过,你的安卓手机到底是用的是什么系统呢?是不是有时候觉得手机卡顿,运行缓慢,其实跟这个系统...
平板装安卓xp系统好,探索复古... 你有没有想过,把安卓系统装到平板上,再配上XP系统,这会是怎样一番景象呢?想象一边享受着安卓的便捷,...
投影仪装安卓系统,开启智能投影... 你有没有想过,家里的老式投影仪也能焕发第二春呢?没错,就是那个曾经陪你熬夜看电影的“老伙计”,现在它...
安卓系统无线车载carplay... 你有没有想过,开车的时候也能享受到苹果设备的便利呢?没错,就是那个让你在日常生活中离不开的iOS系统...
谷歌安卓8系统包,系统包解析与... 你有没有发现,手机更新换代的速度简直就像坐上了火箭呢?这不,最近谷歌又发布了安卓8系统包,听说这个新...
微软平板下软件安卓系统,开启全... 你有没有想过,在微软平板上也能畅享安卓系统的乐趣呢?没错,这就是今天我要跟你分享的神奇故事。想象你手...
coloros是基于安卓系统吗... 你有没有想过,手机里的那个色彩斑斓的界面,背后其实有着一个有趣的故事呢?没错,我要说的就是Color...
安卓神盾系统应用市场,一站式智... 你有没有发现,手机里的安卓神盾系统应用市场最近可是火得一塌糊涂啊!这不,我就来给你好好扒一扒,看看这...
黑莓平板安卓系统升级,解锁无限... 亲爱的读者们,你是否还记得那个曾经风靡一时的黑莓手机?那个标志性的全键盘,那个独特的黑莓体验,如今它...
安卓文件系统采用华为,探索高效... 你知道吗?最近安卓系统在文件管理上可是有了大动作呢!华为这个科技巨头,竟然悄悄地给安卓文件系统来了个...
深度系统能用安卓app,探索智... 你知道吗?现在科技的发展真是让人惊叹不已!今天,我要给你揭秘一个超级酷炫的话题——深度系统能用安卓a...
安卓系统的分区类型,深度解析存... 你有没有发现,你的安卓手机里藏着不少秘密?没错,就是那些神秘的分区类型。今天,就让我带你一探究竟,揭...
安卓系统铠无法兑换,揭秘无法兑... 最近是不是有很多小伙伴在玩安卓系统的游戏,突然发现了一个让人头疼的问题——铠无法兑换!别急,今天就来...
汽车安卓系统崩溃怎么刷,一键刷... 亲爱的车主朋友们,你是否曾遇到过汽车安卓系统崩溃的尴尬时刻?手机系统崩溃还能重启,但汽车系统崩溃了,...
miui系统可以刷安卓p系统吗... 亲爱的手机控们,你是否对MIUI系统情有独钟,同时又对安卓P系统的新鲜功能垂涎欲滴?今天,就让我带你...