From 869f4391a2373fc9f9cb8957b1eeafb370a79e3c Mon Sep 17 00:00:00 2001 From: one Date: Mon, 4 Dec 2023 13:36:08 +0800 Subject: [PATCH] =?UTF-8?q?webSocket=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/websocket/WebSocketServer.java | 83 ++++++++++++------- 1 file changed, 52 insertions(+), 31 deletions(-) diff --git a/car-business-server/src/main/java/com/god/base/server/websocket/WebSocketServer.java b/car-business-server/src/main/java/com/god/base/server/websocket/WebSocketServer.java index 68a42f2..ab65f73 100644 --- a/car-business-server/src/main/java/com/god/base/server/websocket/WebSocketServer.java +++ b/car-business-server/src/main/java/com/god/base/server/websocket/WebSocketServer.java @@ -1,17 +1,24 @@ package com.god.base.server.websocket; import com.alibaba.fastjson.JSON; -import com.mysql.cj.xdevapi.JsonString; + +import com.god.common.core.utils.SpringUtils; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.messaging.simp.annotation.SendToUser; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import springfox.documentation.spring.web.json.Json; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.Arrays; -import java.util.Objects; +import java.math.BigDecimal; +import java.util.*; + +import com.god.common.redis.service.RedisService; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -27,6 +34,15 @@ import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/websocket/{vin}") public class WebSocketServer{ + /** + * 注入redis + */ + @Autowired + private RedisService redisService = SpringUtils.getBean(RedisService.class); +// @Autowired +// public WebSocketServer(RedisService redisService) { +// this.redisService = redisService; +// } /** *与某个客户端建立连接会话 通过session给客户端发送消息 */ @@ -37,6 +53,11 @@ public class WebSocketServer{ */ private String vin; + /** + * redis轨迹标识 + */ + public static final String ALL_LOCUS_INFO = "all_locus_info"; + /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 * 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 @@ -49,6 +70,8 @@ public class WebSocketServer{ */ private static final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + /** * 连接事件 * @param session @@ -62,32 +85,8 @@ public class WebSocketServer{ webSocketServers.add(this); map.put(vin,session); log.info("【webSocket消息】 有新的连接,总数为:"+webSocketServers.size()); - //发送实时轨迹数据 - double[][] arr = new double[][]{ - {116.478935,39.997761}, - {116.478939,39.997825}, - {116.478912,39.998549}, - {116.478998,39.998555}, - {116.479282,39.99856}, - {116.479658,39.998528}, - {116.480151,39.998453}, - {116.480784,39.998302}, - {116.481149,39.998184}, - {116.481573,39.997997}, - {116.481863,39.997846}, - {116.482072,39.997718}, - {116.482362,39.997718}, - {116.483633,39.998935}, - {116.48367,39.998968}, - {116.484648,39.999861} - }; - - for (double[] doubles : arr) { - Thread.sleep(500); - String string = Arrays.toString(doubles); - System.out.println(string); - Thread.sleep(1000); - sendOneMessage(vin , string); + if (webSocketServers.size() == 1) { + timedMsg(); } }catch (Exception e){ log.info("【webSocket消息】 客户端 {} 连接失败",vin); @@ -95,6 +94,28 @@ public class WebSocketServer{ } } + + public void timedMsg(){ + new Thread(() -> { + while (!map.isEmpty()) { + try { + map.forEach((key,session) -> { + if (session.isOpen()){ + Map cacheMap = redisService.getCacheMap(ALL_LOCUS_INFO); + ArrayList> orDefault = (ArrayList>) cacheMap.getOrDefault(key, new ArrayList>()); + ArrayList bigDecimals = orDefault.get(orDefault.size() - 1); + log.info("【webSocket消息】服务端发起了广播消息:{}",bigDecimals.toString()); + sendOneMessage(key, JSON.toJSONString(bigDecimals)); + } + }); + Thread.sleep(1800); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }).start(); + } + /** * 连接关闭调用的方法 */