master
one 2023-11-28 16:45:16 +08:00
parent b5b21c6583
commit eec1033f73
6 changed files with 296 additions and 165 deletions

View File

@ -29,6 +29,12 @@
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency> </dependency>
<!-- webSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config --> <!-- SpringCloud Alibaba Nacos Config -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>

View File

@ -1,18 +1,18 @@
//package com.god.base.server.webSocket.config; package com.god.base.server.webSocket.config;
//import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
//import org.springframework.web.server.ServerWebInputException; import org.springframework.web.socket.server.standard.ServerEndpointExporter;
//
///** /**
// * WebSocketConfig * WebSocketConfig
// * 开始webSocket * webSocket
// * WenHao.Sao * WenHao.Sao
// */ */
//@Configuration @Configuration
//public class WebSocketConfig { public class WebSocketConfig {
//
// @Bean @Bean
// public ServerEndpointExporter serverEndpointExporter(){ public ServerEndpointExporter serverEndpointExporter(){
// return new ServerEndpointExporter(); return new ServerEndpointExporter();
// } }
//} }

View File

@ -1,29 +1,29 @@
package com.god.base.server.webSocket.controller; //package com.god.base.server.webSocket.controller;
//
import com.god.base.server.webSocket.handler.SocketBatchHandler; //import com.god.base.server.webSocket.handler.SocketBatchHandler;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import javax.websocket.OnOpen; //import javax.websocket.OnOpen;
import javax.websocket.Session; //import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint; //import javax.websocket.server.ServerEndpoint;
//
/** ///**
* @ClassName WebSocketBatchSever // * @ClassName WebSocketBatchSever
* @Author WenHao.Sao // * @Author WenHao.Sao
*/ // */
@ServerEndpoint("/car/batch") //@ServerEndpoint("/car/batch")
@Component //@Component
@Log4j2 //@Log4j2
public class WebSocketBatchSever { //public class WebSocketBatchSever {
//
/** // /**
* // * 连接建立成功方法
*/ // */
@OnOpen // @OnOpen
public void onOpen(Session session){ // public void onOpen(Session session){
log.info("车辆大屏新连接" + session.getId()); // log.info("车辆大屏新连接" + session.getId());
SocketBatchHandler.sessions.add(session); // SocketBatchHandler.sessions.add(session);
} // }
//
} //}

View File

@ -1,31 +1,156 @@
package com.god.base.server.webSocket.controller; package com.god.base.server.webSocket.controller;
import com.god.base.server.webSocket.entity.SocketData;
import com.god.base.server.webSocket.handler.SocketHandler; import java.net.http.WebSocket;
import lombok.extern.log4j.Log4j2; import java.util.Arrays;
import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.bind.annotation.PathVariable; import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocketServer
*/
@ServerEndpoint("/vehicle/realTime/{vin}")
@Component @Component
@Log4j2 @Slf4j
@ServerEndpoint("/websocket/{vin}") // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocketServer { public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/** /**
* * ID
*/
private String userId;
//concurrent包的线程安全Set用来存放每个客户端对应的MyWebSocket对象。
//虽然@Component默认是单例模式的但springboot还是会为每个websocket连接初始化一个bean所以可以用一个静态set保存起来。
// 注底下WebSocket是当前类名
private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
/**
*
*/ */
@OnOpen @OnOpen
public void onOpen(Session session , @PathVariable("vin") String vin){ public void onOpen(Session session, @PathParam(value="vin")String userId) {
log.info("车辆实时轨迹,新连接:{}" , session.getId()); try {
SocketData socketData = new SocketData(); this.session = session;
socketData.setVin(vin); this.userId = userId;
socketData.setSession(session); webSockets.add(this);
SocketHandler.socketList.add(socketData); sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接总数为:"+webSockets.size());
//发送实时轨迹数据
double[][] arr = new double[][]{
{116.478935,39.997761},
{116.478939,39.997825},
{116.478912,39.998549},
{116.478998,39.998555},
{116.479282,39.99856},
{116.479658,39.998528},
{116.480151,39.998453},
{116.480784,39.998302},
{116.481149,39.998184},
{116.481573,39.997997},
{116.481863,39.997846},
{116.482072,39.997718},
{116.482362,39.997718},
{116.483633,39.998935},
{116.48367,39.998968},
{116.484648,39.999861}
};
for (double[] doubles : arr) {
Thread.sleep(500);
String string = Arrays.toString(doubles);
System.out.println(string);
Thread.sleep(1000);
sendOneMessage(userId , string);
}
} catch (Exception e) {
}
}
/**
*
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开总数为:"+webSockets.size());
} catch (Exception e) {
}
}
/**
*
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message,Session session) {
log.info("【websocket消息】收到客户端消息:"+message);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:"+error.getMessage());
error.printStackTrace();
}
// 此为广播消息
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:"+message);
for(WebSocketServer webSocket : webSockets) {
try {
if(webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null&&session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息(多人)
public void sendMoreMessage(String[] userIds, String message) {
for(String userId:userIds) {
Session session = sessionPool.get(userId);
if (session != null&&session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
} }
} }

View File

@ -1,57 +1,57 @@
package com.god.base.server.webSocket.handler; //package com.god.base.server.webSocket.handler;
//
import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSON;
import com.god.base.domain.VehicleMessage; //import com.god.base.domain.VehicleMessage;
import com.god.base.common.constant.RedisConstant; //import com.god.base.common.constant.RedisConstant;
import com.god.base.server.mapper.CarMapper; //import com.god.base.server.mapper.CarMapper;
import com.god.base.server.webSocket.entity.VehicleAllData; //import com.god.base.server.webSocket.entity.VehicleAllData;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import javax.websocket.Session; //import javax.websocket.Session;
import java.util.List; //import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; //import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; //import java.util.stream.Collectors;
//
/** ///**
* @author swhmac // * @author swh的mac
* @Date 2023/11/20 14:37 // * @Date 2023/11/20 14:37
*/ // */
@Log4j2 //@Log4j2
@Component //@Component
public class SocketBatchHandler { //public class SocketBatchHandler {
public static CopyOnWriteArrayList<Session> sessions = new CopyOnWriteArrayList<>(); // public static CopyOnWriteArrayList<Session> sessions = new CopyOnWriteArrayList<>();
//
@Autowired // @Autowired
private CarMapper carMapper; // private CarMapper carMapper;
//
@Autowired // @Autowired
private RedisTemplate redisTemplate; // private RedisTemplate redisTemplate;
//
@Scheduled(cron = "0/1 * * * * ?") // @Scheduled(cron = "0/1 * * * * ?")
public void sendMessageFlush(){ // public void sendMessageFlush(){
for (Session session : sessions) { // for (Session session : sessions) {
List<String> ids = carMapper.getOnlineCarIds(); // List<String> ids = carMapper.getOnlineCarIds();
List<VehicleMessage> messages = ids.stream().map(id -> { // List<VehicleMessage> messages = ids.stream().map(id -> {
VehicleMessage message = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + id); // VehicleMessage message = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + id);
return message; // return message;
}).collect(Collectors.toList()); // }).collect(Collectors.toList());
Integer day = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_DAY_FAULT); // Integer day = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_DAY_FAULT);
Integer month = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_MONTH_FAULT); // Integer month = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_MONTH_FAULT);
Integer onlineCount = carMapper.getOnlineCarCount(); // Integer onlineCount = carMapper.getOnlineCarCount();
Integer unOnlineCount = carMapper.getUnOnlineCount(); // Integer unOnlineCount = carMapper.getUnOnlineCount();
VehicleAllData vehicleAllData = new VehicleAllData(); // VehicleAllData vehicleAllData = new VehicleAllData();
vehicleAllData.setMessages(messages); // vehicleAllData.setMessages(messages);
vehicleAllData.setInfo(day+":"+month+":"+onlineCount+":"+unOnlineCount); // vehicleAllData.setInfo(day+":"+month+":"+onlineCount+":"+unOnlineCount);
try { // try {
session.getBasicRemote().sendText(JSON.toJSONString(vehicleAllData)); // session.getBasicRemote().sendText(JSON.toJSONString(vehicleAllData));
} catch (Exception e) { // } catch (Exception e) {
log.info("{}:连接中断",session.getId()); // log.info("{}:连接中断",session.getId());
sessions.remove(session); // sessions.remove(session);
} // }
} // }
} // }
} //}

View File

@ -1,42 +1,42 @@
package com.god.base.server.webSocket.handler; //package com.god.base.server.webSocket.handler;
//
import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSON;
import com.god.base.domain.VehicleMessage; //import com.god.base.domain.VehicleMessage;
import com.god.base.common.constant.RedisConstant; //import com.god.base.common.constant.RedisConstant;
import com.god.base.server.webSocket.entity.SocketData; //import com.god.base.server.webSocket.entity.SocketData;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import javax.websocket.Session; //import javax.websocket.Session;
import java.util.concurrent.CopyOnWriteArrayList; //import java.util.concurrent.CopyOnWriteArrayList;
//
/** ///**
* @author swhmac // * @author swh的mac
* @Date 2023/11/20 15:51 // * @Date 2023/11/20 15:51
*/ // */
@Log4j2 //@Log4j2
@Component //@Component
public class SocketHandler { //public class SocketHandler {
//存放每个客户端对应的MyWebSocket // //存放每个客户端对应的MyWebSocket
public static CopyOnWriteArrayList<SocketData> socketList = new CopyOnWriteArrayList<>(); // public static CopyOnWriteArrayList<SocketData> socketList = new CopyOnWriteArrayList<>();
//
@Autowired // @Autowired
private RedisTemplate redisTemplate; // private RedisTemplate redisTemplate;
//
@Scheduled(cron = "0/1 * * * * ?") // @Scheduled(cron = "0/1 * * * * ?")
public void sendMessageFlush(){ // public void sendMessageFlush(){
for (SocketData socketData : socketList) { // for (SocketData socketData : socketList) {
Session session = socketData.getSession(); // Session session = socketData.getSession();
VehicleMessage msg = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + socketData.getVin()); // VehicleMessage msg = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + socketData.getVin());
try { // try {
session.getBasicRemote().sendText(JSON.toJSONString(msg)); // session.getBasicRemote().sendText(JSON.toJSONString(msg));
} catch (Exception e) { // } catch (Exception e) {
log.info("{}:连接中断",socketData.getSession().getId()); // log.info("{}:连接中断",socketData.getSession().getId());
socketList.remove(socketData); // socketList.remove(socketData);
} // }
} // }
} // }
} //}