Spring boot 整合websocket 客户端
创始人
2024-05-03 07:31:56
0

Spring boot 整合websocket 客户端

      • 前言
        • 一、BaseWebsocketClient.java 继承WebsocketClient.java 作为父类
        • 二、DeviceWebsocketClient.java 子类根据实际场景重写方法
        • 三、WorkPoolConfig.java 线程池
        • 三、实际场景封装
        • 四、启动客户端

前言

在项目开发中会有需要用websocket客户端的情况,下面就来说一下分享的这个解决方案

一、BaseWebsocketClient.java 继承WebsocketClient.java 作为父类

其中 workPoolScheduler.execute方法 可以用 ThreadUtil.execute(hutool工具包)代替 省略下方步骤三的线程池配置

package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/*** @Author huasheng* @Date 2022/11/1 9:49* @Description*/
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {//客户端标识private String clientName;//客户端连接状态private boolean isConnect = false;//spring包下的线程池类private ThreadPoolTaskExecutor workPoolScheduler;public BaseWebsocketClient(URI serverUri, Map httpHeaders,String clientName,ThreadPoolTaskExecutor workPoolScheduler) {super(serverUri, new Draft_6455(), httpHeaders, 0);this.clientName = clientName;this.workPoolScheduler = workPoolScheduler;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {}@Overridepublic void onMessage(String s) {}/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/@Overridepublic void onClose(int i, String s, boolean b) {log.info("------ {} onClose ------{}", clientName, b);setConnectState(false);recontact();}/***检测到错误,更新连接状态***/@Overridepublic void onError(Exception e) {log.info("------ {} onError ------{}", clientName, e);setConnectState(false);}public void setConnectState(boolean isConnect) {this.isConnect = isConnect;}public boolean getConnectState(){return this.isConnect;}public ThreadPoolTaskExecutor getWorkPoolScheduler() {return workPoolScheduler;}/*** 重连*/public void recontact() {workPoolScheduler.execute(() -> {Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );try {Thread.sleep(10000);log.info("重连开始");if (isConnect) {log.info("{} 重连停止", clientName);return;}this.reconnect();log.info("重连结束");} catch (Exception e) {log.info("{} 重连失败", clientName);}});}
}

二、DeviceWebsocketClient.java 子类根据实际场景重写方法

package com.enrising.ctsc.park.security.service;import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/****/
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";private static final String SUBSCRIBE = "subscribe";private WebSocketReportService deviceService;/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
//	private String sendStr = "{\n" +
//			"    \"method\": \"subscribe\",\n" +
//			"    \"params\": \"device\"\n" +
//			"}";private String sendStr = "hello";/*** 建立连接* @param serverUri serverUri* @param httpHeaders httpHeaders* @param workPoolScheduler workPoolScheduler* @param deviceService deviceService*/public DeviceWebsocketClient(URI serverUri, Map httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);this.deviceService = deviceService;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("------ {} onOpen ------", ACS_CTRL_RESULT);this.send(sendStr);setConnectState(true);}public void sendMessage(String str) {log.info("------ {} send ------", str);this.send(str);setConnectState(true);}@Overridepublic void onMessage(String msg) {log.info("WebSocketReportService.onMessage()接收消息={}", msg);ThreadUtil.execAsync(() -> {MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);try {//业务代码deviceService.saveDeviceReportInfo(msg);} catch (Exception e) {log.info("WebSocketReportService.onMessage()上报异常={}", e);}//请求完成,从MDC中移除requestIdMDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);});log.info("WebSocketReportService.onMessage()推送结束={}", msg);}
}

三、WorkPoolConfig.java 线程池

配置文件

#线程池配置
settings: work-pool: core-pool-size: 10max-pool-size: 20queue-capacity: 200

实体类

package com.enrising.ctsc.park.security.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** @author huasheng* @date 2022/11/1*/
@Configuration
public class WorkPoolConfig {@Value("${settings.work-pool.core-pool-size}")private Integer workPoolCoreSize;@Value("${settings.work-pool.max-pool-size}")private Integer workPoolMaxSize;@Value("${settings.work-pool.queue-capacity}")private Integer queueCapacity;@Bean("workPoolScheduler")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(workPoolCoreSize);executor.setMaxPoolSize(workPoolMaxSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("-device-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);return executor;}
}

三、实际场景封装

WebSocketReportService .java

package xxx.service;/*** 公共父类*/
public interface WebSocketReportService {/*** @throws Exception* @throws* @Title: saveDeviceReportInfo* @Description: 保存设备上报信息* @param: @param data 参数说明* @return: void 返回类型*/void saveDeviceReportInfo(Object report) throws Exception;}

四、启动客户端

配置文件

#开关
api:initListen:sideSlop:device:open: false

根据实际场景用到两个实例,监听并各自处理

package com.enrising.ctsc.park.security.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;/*** @Author huasheng* @Date 2022/12/3 17:04* @Description*/
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {@Resourceprivate 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;@Resourceprivate 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;@Resourceprivate ThreadPoolTaskExecutor workPoolScheduler;static DeviceWebsocketClient deviceWebsocketClient;static DeviceWebsocketClient warnWebsocketClient;@PostConstructpublic void start() {try {log.info("start to receive device data");URI uri = new URI("ws://127.0.0.1:8888");Map httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());deviceWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeDeviceService);deviceWebsocketClient.connect();}catch (Exception e){log.error("start to receive device data failed", e);}}@PostConstructpublic void startStatus() {try {log.info("start to receive device status");URI uri = new URI("ws://127.0.0.1:8889");Map httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());warnWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeWarnService);warnWebsocketClient.connect();}catch (Exception e){log.error("start to receive device status failed", e);}}public void sendMessage(String str) {deviceWebsocketClient.sendMessage(str);}}

相关内容

热门资讯

安卓系统自带的网页,功能与特色... 你有没有发现,每次打开安卓手机,那熟悉的系统界面里总有一个默默无闻的小家伙——安卓系统自带的网页浏览...
美咖云系统安卓版,开启智能生活... 你有没有发现,最近手机上多了一个叫“美咖云系统安卓版”的小家伙?它就像一个魔法师,轻轻一点,就能让你...
安卓系统推荐最好的手机,盘点性... 你有没有想过,拥有一部性能卓越的手机,就像是拥有了移动的宝藏库?在这个信息爆炸的时代,一部好手机不仅...
安卓11系统能精简吗,释放潜能 你有没有发现,随着手机越来越智能,系统也越来越庞大?安卓11系统,这个最新的操作系统,是不是也让你觉...
安卓自动重启系统软件,揭秘原因... 手机突然自动重启,是不是感觉整个人都不好了?别急,今天就来和你聊聊这个让人头疼的安卓自动重启系统软件...
苹果手机x刷安卓系统,探索安卓... 你有没有想过,你的苹果手机X竟然也能刷上安卓系统?是的,你没听错,就是那个一直以来都和我们苹果手机X...
安卓系统智商低吗,智商低下的真... 你有没有想过,为什么安卓系统的智商总被调侃得好像有点低呢?是不是觉得它总是慢吞吞的,有时候还犯点小错...
安卓系统手机联系人,揭秘你的社... 你有没有发现,手机里的联系人列表就像是一个小小的社交圈呢?里面藏着我们的亲朋好友、工作伙伴,甚至还有...
安卓系统免费铃声下载,打造个性... 手机里那首老掉牙的铃声是不是让你觉得有点out了呢?别急,今天就来给你支个招,让你轻松给安卓手机换上...
安卓系统用哪个桌面好,打造个性... 你有没有发现,手机桌面可是我们每天都要面对的“脸面”呢?换一个好看的桌面,心情都能跟着好起来。那么,...
虚拟大师是安卓10系统,功能与... 你知道吗?最近在手机圈里,有个新玩意儿引起了不小的轰动,那就是虚拟大师!而且,更让人惊喜的是,这个虚...
安卓系统与苹果优缺点,系统优缺... 说到手机操作系统,安卓和苹果绝对是两大巨头,它们各有各的特色,就像两道不同的美味佳肴,让人难以抉择。...
安卓win双系统主板,融合与创... 你有没有想过,一台电脑如果既能流畅运行安卓系统,又能轻松驾驭Windows系统,那该有多爽啊?没错,...
安卓系统可精简软件,轻松提升手... 你有没有发现,手机里的安卓系统越来越庞大,软件也越装越多,有时候感觉手机就像个“大肚子”,不仅运行速...
安卓系统基于linux的代码,... 你有没有想过,那个陪伴你每天刷抖音、玩游戏、办公的安卓系统,其实背后有着一套复杂的基于Linux的代...
苹果和安卓的拍照系统,谁更胜一... 你有没有发现,现在手机拍照已经成为我们生活中不可或缺的一部分呢?无论是记录生活的点滴,还是捕捉美丽的...
苹果和安卓系统不同吗,系统差异... 你有没有想过,为什么你的手机里装的是苹果的iOS系统,而朋友的手机却是安卓系统呢?这两种系统,看似都...
安卓系统有多少级,揭秘其多级架... 你有没有想过,那个陪伴我们日常生活的安卓系统,它其实有着丰富的层级结构呢?没错,就是那个让我们的手机...
华为鸿蒙系统与安卓的,技术融合... 你知道吗?最近科技圈可是炸开了锅,华为鸿蒙系统与安卓的较量成为了大家热议的话题。这不,今天我就来给你...
什么安卓手机是苹果系统,搭载苹... 你有没有想过,为什么有些人宁愿花大价钱买苹果手机,而有些人却对安卓手机情有独钟呢?其实,这个问题背后...