webSocket完善

master
one 2023-12-04 13:36:08 +08:00
parent a56cf16c23
commit 869f4391a2
1 changed files with 52 additions and 31 deletions

View File

@ -1,17 +1,24 @@
package com.god.base.server.websocket;
import com.alibaba.fastjson.JSON;
import com.mysql.cj.xdevapi.JsonString;
import com.god.common.core.utils.SpringUtils;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import springfox.documentation.spring.web.json.Json;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Arrays;
import java.util.Objects;
import java.math.BigDecimal;
import java.util.*;
import com.god.common.redis.service.RedisService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@ -27,6 +34,15 @@ import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint("/websocket/{vin}")
public class WebSocketServer{
/**
* redis
*/
@Autowired
private RedisService redisService = SpringUtils.getBean(RedisService.class);
// @Autowired
// public WebSocketServer(RedisService redisService) {
// this.redisService = redisService;
// }
/**
* session
*/
@ -37,6 +53,11 @@ public class WebSocketServer{
*/
private String vin;
/**
* redis
*/
public static final String ALL_LOCUS_INFO = "all_locus_info";
/**
* concurrent线SetMyWebSocket
* @Componentspringbootwebsocketbeanset
@ -49,6 +70,8 @@ public class WebSocketServer{
*/
private static final ConcurrentHashMap<String,Session> map = new ConcurrentHashMap<>();
/**
*
* @param session
@ -62,32 +85,8 @@ public class WebSocketServer{
webSocketServers.add(this);
map.put(vin,session);
log.info("【webSocket消息】 有新的连接,总数为:"+webSocketServers.size());
//发送实时轨迹数据
double[][] arr = new double[][]{
{116.478935,39.997761},
{116.478939,39.997825},
{116.478912,39.998549},
{116.478998,39.998555},
{116.479282,39.99856},
{116.479658,39.998528},
{116.480151,39.998453},
{116.480784,39.998302},
{116.481149,39.998184},
{116.481573,39.997997},
{116.481863,39.997846},
{116.482072,39.997718},
{116.482362,39.997718},
{116.483633,39.998935},
{116.48367,39.998968},
{116.484648,39.999861}
};
for (double[] doubles : arr) {
Thread.sleep(500);
String string = Arrays.toString(doubles);
System.out.println(string);
Thread.sleep(1000);
sendOneMessage(vin , string);
if (webSocketServers.size() == 1) {
timedMsg();
}
}catch (Exception e){
log.info("【webSocket消息】 客户端 {} 连接失败",vin);
@ -95,6 +94,28 @@ public class WebSocketServer{
}
}
public void timedMsg(){
new Thread(() -> {
while (!map.isEmpty()) {
try {
map.forEach((key,session) -> {
if (session.isOpen()){
Map<String, Object> cacheMap = redisService.getCacheMap(ALL_LOCUS_INFO);
ArrayList<ArrayList<BigDecimal>> orDefault = (ArrayList<ArrayList<BigDecimal>>) cacheMap.getOrDefault(key, new ArrayList<ArrayList<BigDecimal>>());
ArrayList<BigDecimal> bigDecimals = orDefault.get(orDefault.size() - 1);
log.info("【webSocket消息】服务端发起了广播消息{}",bigDecimals.toString());
sendOneMessage(key, JSON.toJSONString(bigDecimals));
}
});
Thread.sleep(1800);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
/**
*
*/