提示:长连接,采用了WebSocket,Redis,Quartz,SpringBoot等技术
在web开发中,服务端和客户端需要一条长连接,用于消息的推送下发。
使用场景:
在pom.xml中引入依赖
4.0.0 org.ym pushserver 1.0-SNAPSHOT 8 8 2.7.2 org.springframework.boot spring-boot-starter-web org.projectlombok lombok org.springframework.boot spring-boot-starter-websocket org.springframework.boot spring-boot-starter-quartz org.springframework.boot spring-boot-devtools org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2 cn.hutool hutool-all 5.8.9 com.google.code.gson gson org.springframework.boot spring-boot-dependencies ${spring-boot.version} pom import src/main/java **/*.xml **/*.yml **/*.html true src/main/resources **/*.xml **/*.yml **/*.vm **/*.txt **/*.html /static/ true
application.yml中配置:
server:port: 8081spring:application:name: PushServerredis:host: localhostport: 6379lettuce:pool:max-idle: 16max-active: 32min-idle: 8devtools:restart:enabled: true
测试路径:http://localhost:8081/push_client.html
链接socket url: ws://127.0.0.1:8081/websocket/"+userId
WebSocket
WebSocket
Please input text in the fields.
代码如下:
package com.ym.pushserver;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;/*** @Author: Yangmiao* @Date: 2023/3/9 19:09* @Desc:*/
@SpringBootApplication
@EnableScheduling
public class PushServerApplication {public static void main(String[] args) {SpringApplication.run(PushServerApplication.class,args);}
}
package com.ym.pushserver.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @Author: Yangmiao* @Date: 2023/3/9 19:20* @Desc:*/
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}
使用websocket建立连接,并且监听接口,引入Redis
调用路径:/websocket/{userId}
package com.ym.pushserver.service;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.StrUtil;
import com.ym.pushserver.entity.PushCommonVo;
import com.ym.pushserver.entity.R;
import com.ym.pushserver.redis.RedisConstant;
import com.ym.pushserver.redis.RedisUtil;
import com.ym.pushserver.utils.JsonUtil;
import com.ym.pushserver.utils.StrHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author: Yangmiao* @Date: 2023/3/9 19:23* @Desc: 发送消息*/
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {@Autowiredprivate RedisUtil redisUtil;private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);/*** 统计连接数量*/private static AtomicInteger atomicInteger = new AtomicInteger();/*** 根据userid记录对应的socket,TODO 后期加入设备id,防止统一设备一直重试*/private static Map map= new ConcurrentHashMap<>();private Session session;private String userId = "";/*** 建立连接* @param session* @param userId 用户id*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId")String userId){try {this.session = session;this.userId = userId;map.put(userId,this);atomicInteger.getAndIncrement();log.info("onOpen, connect a new Socket {}",userId);}catch (Exception e){log.error("onOpen error {}",e.getMessage());e.printStackTrace();}}/*** 关闭连接*/@OnClosepublic void onClose(){map.remove(this.userId);atomicInteger.getAndDecrement();removeRedisKey();log.info("onClose, disconnect a socket {}",this.userId);}@OnErrorpublic void onError(Throwable throwable){map.remove(this.userId);atomicInteger.getAndDecrement();removeRedisKey();log.error("onError, error! {}",throwable.getMessage());}@OnMessagepublic void onMessage(Session session,String msg){log.info("receive msg from client: {}, session: {}",msg,session);}/*** 获取链接数量* @return*/public int getConNum(){return atomicInteger.intValue();}/*** 发送消息* @param msg*/public void sendMessage(String msg){this.session.getAsyncRemote().sendText(msg);}/*** 向指定用户发送消息* @param userId* @param msg*/public void sendMessageToUser(String userId,String msg){if (map.containsKey(userId)){// 生成key==pushIdString key = StrHelper.getFormatStr(RedisConstant.KEY_PUSH_ID,userId);redisUtil.set(key, StrHelper.getRandNum());// 将消息封装到消息体中PushCommonVo pushCommonVo = PushCommonVo.builder().msg(msg).pushId(key).updateTime(DateTime.now()).build();String jsonStr = JsonUtil.toStr(R.ok(pushCommonVo));map.get(userId).sendMessage(jsonStr);}}/*** 向指定用户群发消息* @param userIds* @param msg*/public void sendBatchMessage(List userIds,String msg){userIds.forEach(userId->{sendMessageToUser(userId,msg);});}/*** 向所有用户发送消息* @param msg*/public void sendMessageToAll(String msg){map.forEach((k,v)->{sendMessageToUser(k,msg);});}/*** 删除redis key* @return*/public boolean removeRedisKey(){if (StrUtil.isNotEmpty(this.userId)) {String key = StrHelper.getFormatStr(RedisConstant.KEY_PUSH_ID, this.userId);log.info("removeRedisKey: "+key);if (redisUtil.hasKey(key)) {redisUtil.deleteByKey(key);return true;}}return false;}/*** 服务端向客户端发送ping*/public static void pingMessage(String userId){byte[] pingByte = new byte[1];pingByte[0]='p';ByteBuffer byteBuffer = ByteBuffer.wrap(pingByte);if (map.containsKey(userId)) {try {map.get(userId).session.getBasicRemote().sendPing(byteBuffer);} catch (IOException e) {log.error("ping error: {}",e.getMessage());e.printStackTrace();}}}
}
引入Redis,用于记录下发消息结构体重的pushId。
package com.ym.pushserver.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @Author: Yangmiao* @Date: 2023/3/11 12:18* @Desc: Redis 配置*/
@Configuration
public class RedisConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Beanpublic RedisTemplate redisTemplate(){RedisTemplate redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new StringRedisSerializer());redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}}
package com.ym.pushserver.redis;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @Author: Yangmiao* @Date: 2023/3/11 12:22* @Desc:*/
@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 给一个指定的 key 值附加过期时间** @param key* @param time* @return*/public boolean expire(String key, long time) {return redisTemplate.expire(key, time, TimeUnit.SECONDS);}/*** 根据key 获取过期时间** @param key* @return*/public long getTime(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 根据key 获取过期时间** @param key* @return*/public boolean hasKey(String key) {return redisTemplate.hasKey(key);}/*** 移除指定key 的过期时间** @param key* @return*/public boolean persist(String key) {return redisTemplate.boundValueOps(key).persist();}//- - - - - - - - - - - - - - - - - - - - - String类型 - - - - - - - - - - - - - - - - - - - -/*** 根据key获取值** @param key 键* @return 值*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 将值放入缓存** @param key 键* @param value 值* @return true成功 false 失败*/public void set(String key, String value) {redisTemplate.opsForValue().set(key, value);}/*** 将值放入缓存并设置时间** @param key 键* @param value 值* @param time 时间(秒) -1为无期限* @return true成功 false 失败*/public void set(String key, String value, long time) {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {redisTemplate.opsForValue().set(key, value);}}/*** 批量添加 key (重复的键会覆盖)** @param keyAndValue*/public void batchSet(Map keyAndValue) {redisTemplate.opsForValue().multiSet(keyAndValue);}/*** 批量添加 key-value 只有在键不存在时,才添加* map 中只要有一个key存在,则全部不添加** @param keyAndValue*/public void batchSetIfAbsent(Map keyAndValue) {redisTemplate.opsForValue().multiSetIfAbsent(keyAndValue);}/*** 对一个 key-value 的值进行加减操作,* 如果该 key 不存在 将创建一个key 并赋值该 number* 如果 key 存在,但 value 不是长整型 ,将报错** @param key* @param number*/public Long increment(String key, long number) {return redisTemplate.opsForValue().increment(key, number);}/*** 对一个 key-value 的值进行加减操作,* 如果该 key 不存在 将创建一个key 并赋值该 number* 如果 key 存在,但 value 不是 纯数字 ,将报错** @param key* @param number*/public Double increment(String key, double number) {return redisTemplate.opsForValue().increment(key, number);}//- - - - - - - - - - - - - - - - - - - - - set类型 - - - - - - - - - - - - - - - - - - - -/*** 将数据放入set缓存** @param key 键* @return*/public void sSet(String key, String value) {redisTemplate.opsForSet().add(key, value);}/*** 获取变量中的值** @param key 键* @return*/public Set
package com.ym.pushserver.redis;/*** @Author: Yangmiao* @Date: 2023/3/11 12:24* @Desc:*/
public class RedisConstant {/*** push结构体中携带pushId*/public static final String KEY_PUSH_ID = "pushId_%s";/*** 5分钟*/public static final long COMMON_EXPIRE_TIME = 300;
}
引入Quartz定时任务,用于定时触发消息的下发
配置JobDetail,Trigger,CronTrigger
package com.ym.pushserver.quartz;import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: Yangmiao* @Date: 2023/3/10 20:49* @Desc:*/
@Configuration
public class QuartzConfig {@Beanpublic JobDetail getJobDetail(){return JobBuilder.newJob(PushJob.class).withIdentity(QuartzConstant.IDENTITY).storeDurably().build();}@Beanpublic Trigger trigger(){return TriggerBuilder.newTrigger().forJob(getJobDetail()).withIdentity(QuartzConstant.IDENTITY).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(5)).build();}@Beanpublic CronTrigger cronTrigger(){return TriggerBuilder.newTrigger().withIdentity(QuartzConstant.IDENTITY).startNow().withSchedule(CronScheduleBuilder.dailyAtHourAndMinute(10,0)).build();}
}
package com.ym.pushserver.quartz;import com.ym.pushserver.service.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;/*** @Author: Yangmiao* @Date: 2023/3/10 20:47* @Desc:*/
@Component
@Slf4j
public class PushJob extends QuartzJobBean {@Autowiredprivate WebSocketServer webSocketServer;@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// log.info("start task!");// TODO 向客户端定时推送消息webSocketServer.sendMessageToAll("我成功连接了!");}
}
定义job名称常量
package com.ym.pushserver.quartz;/*** @Author: Yangmiao* @Date: 2023/3/11 09:41* @Desc:*/
public class QuartzConstant {public static final String IDENTITY = "PushJob";
}
BaseVo
package com.ym.pushserver.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;import java.io.Serializable;
import java.util.Date;/*** @Author: Yangmiao* @Date: 2023/3/11 10:34* @Desc: 返回消息给客户端*/
@Data
@SuperBuilder
public class BaseVo implements Serializable {public static final long serialVersionUID = 1L;/*** 每条消息自带一个ID*/public String pushId;/*** 下发消息的时间*/public Date updateTime;
}
package com.ym.pushserver.entity;import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;import java.util.Date;
import java.util.Objects;/*** @Author: Yangmiao* @Date: 2023/3/11 11:31* @Desc:*/
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@SuperBuilder
public class PushCommonVo extends BaseVo{private String msg;// @Builder
// public PushCommonVo(String pushId, Date updateTime,T msg){
// super(pushId, updateTime);
// this.msg = msg;
// }}
package com.ym.pushserver.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** @Author: Yangmiao* @Date: 2023/3/11 12:57* @Desc: 基础信息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class R implements Serializable {public static final long serialVersionUID = 1L;private T data;private int code;private String msg;public static final String MSG_SUCCESS = "success";public static final String MSG_FAIL = "服务开了一会儿小差,请稍后重试";public static final int SUCCESS = 200;public static final int FAIL = 500;public static R ok(){return R(SUCCESS,null,MSG_SUCCESS);}public static R ok(T data){return R(SUCCESS,data,MSG_SUCCESS);}public static R fail(){return R(FAIL,null,MSG_FAIL);}public static R fail(String msg){return R(FAIL,null,msg);}public static R fail(int code,String msg){return R(code,null,msg);}public static R R(int code, T data, String msg){return R.builder().code(code).data(data).msg(msg).build();}
}
package com.ym.pushserver.controller;import cn.hutool.core.util.StrUtil;
import com.ym.pushserver.entity.R;
import com.ym.pushserver.redis.RedisUtil;
import com.ym.pushserver.service.WebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: Yangmiao* @Date: 2023/3/9 20:00* @Desc:*/
@RestController
@RequestMapping("/push")
public class PushController {@Autowiredprivate WebSocketServer webSocketServer;@Autowiredprivate RedisUtil redisUtil;@GetMapping("/test/{msg}")public R test(@PathVariable("msg")String msg){webSocketServer.sendMessageToUser("1","hahhhh"+msg);return R.ok("测试成功: "+msg);}@GetMapping("/pushAck")public R pushAck(String pushId){if (StrUtil.isEmpty(pushId)){return R.fail("请求参数为空!");}redisUtil.deleteByKey(pushId);return R.ok();}}
Gson工具类封装,用于序列化和反序列化数据
package com.ym.pushserver.utils;import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.ssh.JschUtil;
import cn.hutool.json.JSONUtil;
import com.google.gson.Gson;
import org.springframework.util.StringUtils;import java.util.Objects;/*** @Author: Yangmiao* @Date: 2023/3/11 13:20* @Desc:*/
public class JsonUtil {public static final String EMPTY = "";private static Gson gson = new Gson();/*** 将object转为string* @param object* @param * @return*/public static String toStr(T object){if (Objects.isNull(object)){return EMPTY;}return gson.toJson(object);}/*** string转为object对象* @param jsonStr* @param clazz* @param * @return*/public static T toJson(String jsonStr, Class clazz){return gson.fromJson(jsonStr,clazz);}
}
str工具类封装
package com.ym.pushserver.utils;import java.util.UUID;/*** @Author: Yangmiao* @Date: 2023/3/9 20:07* @Desc:*/
public class StrHelper {/*** 获取随机数* @return*/public static String getRandNum(){return UUID.randomUUID().toString().replace("-","");}public static String getFormatStr(String format,String...args){return String.format(format,args);}
}
引入了websocket,redis,quartz,springboot等技术,实现了一个长连接Demo
存在问题:
待完成