Spring boot 整合websocket 客户端
创始人
2024-05-03 07:31:56
0

Spring boot 整合websocket 客户端

      • 前言
        • 一、BaseWebsocketClient.java 继承WebsocketClient.java 作为父类
        • 二、DeviceWebsocketClient.java 子类根据实际场景重写方法
        • 三、WorkPoolConfig.java 线程池
        • 三、实际场景封装
        • 四、启动客户端

前言

在项目开发中会有需要用websocket客户端的情况,下面就来说一下分享的这个解决方案

一、BaseWebsocketClient.java 继承WebsocketClient.java 作为父类

其中 workPoolScheduler.execute方法 可以用 ThreadUtil.execute(hutool工具包)代替 省略下方步骤三的线程池配置

package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/*** @Author huasheng* @Date 2022/11/1 9:49* @Description*/
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {//客户端标识private String clientName;//客户端连接状态private boolean isConnect = false;//spring包下的线程池类private ThreadPoolTaskExecutor workPoolScheduler;public BaseWebsocketClient(URI serverUri, Map httpHeaders,String clientName,ThreadPoolTaskExecutor workPoolScheduler) {super(serverUri, new Draft_6455(), httpHeaders, 0);this.clientName = clientName;this.workPoolScheduler = workPoolScheduler;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {}@Overridepublic void onMessage(String s) {}/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/@Overridepublic void onClose(int i, String s, boolean b) {log.info("------ {} onClose ------{}", clientName, b);setConnectState(false);recontact();}/***检测到错误,更新连接状态***/@Overridepublic void onError(Exception e) {log.info("------ {} onError ------{}", clientName, e);setConnectState(false);}public void setConnectState(boolean isConnect) {this.isConnect = isConnect;}public boolean getConnectState(){return this.isConnect;}public ThreadPoolTaskExecutor getWorkPoolScheduler() {return workPoolScheduler;}/*** 重连*/public void recontact() {workPoolScheduler.execute(() -> {Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );try {Thread.sleep(10000);log.info("重连开始");if (isConnect) {log.info("{} 重连停止", clientName);return;}this.reconnect();log.info("重连结束");} catch (Exception e) {log.info("{} 重连失败", clientName);}});}
}

二、DeviceWebsocketClient.java 子类根据实际场景重写方法

package com.enrising.ctsc.park.security.service;import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/****/
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";private static final String SUBSCRIBE = "subscribe";private WebSocketReportService deviceService;/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
//	private String sendStr = "{\n" +
//			"    \"method\": \"subscribe\",\n" +
//			"    \"params\": \"device\"\n" +
//			"}";private String sendStr = "hello";/*** 建立连接* @param serverUri serverUri* @param httpHeaders httpHeaders* @param workPoolScheduler workPoolScheduler* @param deviceService deviceService*/public DeviceWebsocketClient(URI serverUri, Map httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);this.deviceService = deviceService;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("------ {} onOpen ------", ACS_CTRL_RESULT);this.send(sendStr);setConnectState(true);}public void sendMessage(String str) {log.info("------ {} send ------", str);this.send(str);setConnectState(true);}@Overridepublic void onMessage(String msg) {log.info("WebSocketReportService.onMessage()接收消息={}", msg);ThreadUtil.execAsync(() -> {MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);try {//业务代码deviceService.saveDeviceReportInfo(msg);} catch (Exception e) {log.info("WebSocketReportService.onMessage()上报异常={}", e);}//请求完成,从MDC中移除requestIdMDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);});log.info("WebSocketReportService.onMessage()推送结束={}", msg);}
}

三、WorkPoolConfig.java 线程池

配置文件

#线程池配置
settings: work-pool: core-pool-size: 10max-pool-size: 20queue-capacity: 200

实体类

package com.enrising.ctsc.park.security.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** @author huasheng* @date 2022/11/1*/
@Configuration
public class WorkPoolConfig {@Value("${settings.work-pool.core-pool-size}")private Integer workPoolCoreSize;@Value("${settings.work-pool.max-pool-size}")private Integer workPoolMaxSize;@Value("${settings.work-pool.queue-capacity}")private Integer queueCapacity;@Bean("workPoolScheduler")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(workPoolCoreSize);executor.setMaxPoolSize(workPoolMaxSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("-device-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);return executor;}
}

三、实际场景封装

WebSocketReportService .java

package xxx.service;/*** 公共父类*/
public interface WebSocketReportService {/*** @throws Exception* @throws* @Title: saveDeviceReportInfo* @Description: 保存设备上报信息* @param: @param data 参数说明* @return: void 返回类型*/void saveDeviceReportInfo(Object report) throws Exception;}

四、启动客户端

配置文件

#开关
api:initListen:sideSlop:device:open: false

根据实际场景用到两个实例,监听并各自处理

package com.enrising.ctsc.park.security.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;/*** @Author huasheng* @Date 2022/12/3 17:04* @Description*/
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {@Resourceprivate 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;@Resourceprivate 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;@Resourceprivate ThreadPoolTaskExecutor workPoolScheduler;static DeviceWebsocketClient deviceWebsocketClient;static DeviceWebsocketClient warnWebsocketClient;@PostConstructpublic void start() {try {log.info("start to receive device data");URI uri = new URI("ws://127.0.0.1:8888");Map httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());deviceWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeDeviceService);deviceWebsocketClient.connect();}catch (Exception e){log.error("start to receive device data failed", e);}}@PostConstructpublic void startStatus() {try {log.info("start to receive device status");URI uri = new URI("ws://127.0.0.1:8889");Map httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());warnWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeWarnService);warnWebsocketClient.connect();}catch (Exception e){log.error("start to receive device status failed", e);}}public void sendMessage(String str) {deviceWebsocketClient.sendMessage(str);}}

相关内容

热门资讯

开源电脑安卓系统排行,探索自由... 亲爱的电脑爱好者们,你是否曾想过,在电脑的世界里,也能体验到安卓系统的便捷与乐趣?没错,这就是今天我...
如何清空相册安卓系统,轻松恢复... 手机里的相册是不是越来越满,看着那些堆积如山的照片,是不是有点头疼呢?别急,今天就来教你怎么在安卓系...
安卓系统要停止更新,拥抱新变革 你知道吗?最近有个大消息在安卓圈里炸开了锅!安卓系统,这个陪伴我们多年的老朋友,竟然要停止更新了!这...
安卓系统怎样强行关机,安卓系统... 手机突然卡壳了,是不是又想强行关机了?别急,今天就来教你安卓系统怎样强行关机,让你轻松应对各种突发状...
安卓系统如何删除桌面,轻松删除... 手机桌面乱糟糟的,是不是感觉像你的房间一样,东西堆得有点多?别急,今天就来教你怎么给安卓系统的桌面来...
安卓系统怎么发英语,Andro... 你有没有想过,在安卓系统上发送英语信息竟然也能变得如此简单有趣?没错,就是那种轻松自如,仿佛英语是你...
最早期的安卓系统,揭秘最早期安... 亲爱的读者,你是否曾好奇过,那个陪伴我们手机成长的安卓系统,它的起源究竟是怎样的呢?今天,就让我们一...
安卓双系统添加应用,轻松实现多... 你有没有想过,你的安卓手机里可以同时运行两个系统呢?听起来是不是很酷?想象一边是熟悉的安卓系统,一边...
pipo安卓进系统慢,探究pi... 最近是不是发现你的Pipo安卓系统更新或者运行起来特别慢?别急,今天就来给你好好分析分析这个问题,让...
怎样使用安卓手机系统,安卓手机... 你有没有发现,安卓手机已经成为我们生活中不可或缺的一部分呢?从早晨闹钟响起,到晚上睡前刷剧,安卓手机...
双系统安卓安装caj,轻松实现... 你有没有想过,你的安卓手机里装上双系统,是不是就能同时享受安卓和Windows系统的乐趣呢?没错,这...
安卓使用ios系统教程,安卓用... 你是不是也和我一样,对安卓手机上的iOS系统充满了好奇?想要体验一下苹果的优雅和流畅?别急,今天我就...
安卓系统gps快速定位,畅享便... 你有没有遇到过这样的情况:手机里装了各种地图导航软件,但每次出门前都要等上好几分钟才能定位成功,急得...
安卓手机系统更新原理,原理与流... 你有没有发现,你的安卓手机最近是不是总在提醒你更新系统呢?别急,别急,让我来给你揭秘一下安卓手机系统...
安卓系统通知管理,全面解析与优... 你有没有发现,手机里的通知就像是一群调皮的小精灵,时不时地跳出来和你互动?没错,说的就是安卓系统的通...
安卓系统手机哪买,揭秘哪里购买... 你有没有想过,拥有一部安卓系统手机是多么酷的事情呢?想象你可以自由安装各种应用,不受限制地探索各种功...
安卓系统 ipv4,基于安卓系... 你知道吗?在智能手机的世界里,有一个系统可是无人不知、无人不晓,那就是安卓系统。而在这个庞大的安卓家...
目前安卓是什么系统,探索安卓系... 亲爱的读者,你是否曾好奇过,如今安卓系统究竟是什么模样?在这个科技飞速发展的时代,操作系统如同人体的...
安卓6.0系统比5.0,从5.... 你有没有发现,自从手机更新了安卓6.0系统,感觉整个人都清爽了不少呢?没错,今天咱们就来聊聊这个话题...
安卓2.36系统升级,功能革新... 你知道吗?最近安卓系统又来了一次大变身,那就是安卓2.36系统升级!这可不是一个小打小闹的更新,而是...