在项目开发中会有需要用websocket客户端的情况,下面就来说一下分享的这个解决方案
其中 workPoolScheduler.execute方法 可以用 ThreadUtil.execute(hutool工具包)代替 省略下方步骤三的线程池配置
package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/*** @Author huasheng* @Date 2022/11/1 9:49* @Description*/
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {//客户端标识private String clientName;//客户端连接状态private boolean isConnect = false;//spring包下的线程池类private ThreadPoolTaskExecutor workPoolScheduler;public BaseWebsocketClient(URI serverUri, Map httpHeaders,String clientName,ThreadPoolTaskExecutor workPoolScheduler) {super(serverUri, new Draft_6455(), httpHeaders, 0);this.clientName = clientName;this.workPoolScheduler = workPoolScheduler;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {}@Overridepublic void onMessage(String s) {}/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/@Overridepublic void onClose(int i, String s, boolean b) {log.info("------ {} onClose ------{}", clientName, b);setConnectState(false);recontact();}/***检测到错误,更新连接状态***/@Overridepublic void onError(Exception e) {log.info("------ {} onError ------{}", clientName, e);setConnectState(false);}public void setConnectState(boolean isConnect) {this.isConnect = isConnect;}public boolean getConnectState(){return this.isConnect;}public ThreadPoolTaskExecutor getWorkPoolScheduler() {return workPoolScheduler;}/*** 重连*/public void recontact() {workPoolScheduler.execute(() -> {Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );try {Thread.sleep(10000);log.info("重连开始");if (isConnect) {log.info("{} 重连停止", clientName);return;}this.reconnect();log.info("重连结束");} catch (Exception e) {log.info("{} 重连失败", clientName);}});}
}
package com.enrising.ctsc.park.security.service;import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.net.URI;
import java.util.Map;/****/
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";private static final String SUBSCRIBE = "subscribe";private WebSocketReportService deviceService;/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
// private String sendStr = "{\n" +
// " \"method\": \"subscribe\",\n" +
// " \"params\": \"device\"\n" +
// "}";private String sendStr = "hello";/*** 建立连接* @param serverUri serverUri* @param httpHeaders httpHeaders* @param workPoolScheduler workPoolScheduler* @param deviceService deviceService*/public DeviceWebsocketClient(URI serverUri, Map httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);this.deviceService = deviceService;}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("------ {} onOpen ------", ACS_CTRL_RESULT);this.send(sendStr);setConnectState(true);}public void sendMessage(String str) {log.info("------ {} send ------", str);this.send(str);setConnectState(true);}@Overridepublic void onMessage(String msg) {log.info("WebSocketReportService.onMessage()接收消息={}", msg);ThreadUtil.execAsync(() -> {MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);try {//业务代码deviceService.saveDeviceReportInfo(msg);} catch (Exception e) {log.info("WebSocketReportService.onMessage()上报异常={}", e);}//请求完成,从MDC中移除requestIdMDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);});log.info("WebSocketReportService.onMessage()推送结束={}", msg);}
}
配置文件
#线程池配置
settings: work-pool: core-pool-size: 10max-pool-size: 20queue-capacity: 200
实体类
package com.enrising.ctsc.park.security.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** @author huasheng* @date 2022/11/1*/
@Configuration
public class WorkPoolConfig {@Value("${settings.work-pool.core-pool-size}")private Integer workPoolCoreSize;@Value("${settings.work-pool.max-pool-size}")private Integer workPoolMaxSize;@Value("${settings.work-pool.queue-capacity}")private Integer queueCapacity;@Bean("workPoolScheduler")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(workPoolCoreSize);executor.setMaxPoolSize(workPoolMaxSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("-device-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);return executor;}
}
WebSocketReportService .java
package xxx.service;/*** 公共父类*/
public interface WebSocketReportService {/*** @throws Exception* @throws* @Title: saveDeviceReportInfo* @Description: 保存设备上报信息* @param: @param data 参数说明* @return: void 返回类型*/void saveDeviceReportInfo(Object report) throws Exception;}
配置文件
#开关
api:initListen:sideSlop:device:open: false
根据实际场景用到两个实例,监听并各自处理
package com.enrising.ctsc.park.security.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;/*** @Author huasheng* @Date 2022/12/3 17:04* @Description*/
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {@Resourceprivate 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;@Resourceprivate 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;@Resourceprivate ThreadPoolTaskExecutor workPoolScheduler;static DeviceWebsocketClient deviceWebsocketClient;static DeviceWebsocketClient warnWebsocketClient;@PostConstructpublic void start() {try {log.info("start to receive device data");URI uri = new URI("ws://127.0.0.1:8888");Map httpHeaders = new HashMap<>(4);
// httpHeaders.put("Origin", "http://" + uri.getHost());deviceWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeDeviceService);deviceWebsocketClient.connect();}catch (Exception e){log.error("start to receive device data failed", e);}}@PostConstructpublic void startStatus() {try {log.info("start to receive device status");URI uri = new URI("ws://127.0.0.1:8889");Map httpHeaders = new HashMap<>(4);
// httpHeaders.put("Origin", "http://" + uri.getHost());warnWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeWarnService);warnWebsocketClient.connect();}catch (Exception e){log.error("start to receive device status failed", e);}}public void sendMessage(String str) {deviceWebsocketClient.sendMessage(str);}}
上一篇:1343:【例4-2】牛的旅行