From eec1033f734689bfb4027680e5a768ba4dd5ad61 Mon Sep 17 00:00:00 2001 From: one Date: Tue, 28 Nov 2023 16:45:16 +0800 Subject: [PATCH] commit --- car-base-server/pom.xml | 6 + .../webSocket/config/WebSocketConfig.java | 36 ++-- .../controller/WebSocketBatchSever.java | 58 +++---- .../webSocket/controller/WebSocketServer.java | 163 ++++++++++++++++-- .../webSocket/handler/SocketBatchHandler.java | 114 ++++++------ .../webSocket/handler/SocketHandler.java | 84 ++++----- 6 files changed, 296 insertions(+), 165 deletions(-) diff --git a/car-base-server/pom.xml b/car-base-server/pom.xml index 5b45067..88cafb4 100644 --- a/car-base-server/pom.xml +++ b/car-base-server/pom.xml @@ -29,6 +29,12 @@ spring-cloud-starter-alibaba-nacos-discovery + + + org.springframework.boot + spring-boot-starter-websocket + + com.alibaba.cloud diff --git a/car-base-server/src/main/java/com/god/base/server/webSocket/config/WebSocketConfig.java b/car-base-server/src/main/java/com/god/base/server/webSocket/config/WebSocketConfig.java index bf3589a..2aa1eb2 100644 --- a/car-base-server/src/main/java/com/god/base/server/webSocket/config/WebSocketConfig.java +++ b/car-base-server/src/main/java/com/god/base/server/webSocket/config/WebSocketConfig.java @@ -1,18 +1,18 @@ -//package com.god.base.server.webSocket.config; -//import org.springframework.context.annotation.Bean; -//import org.springframework.context.annotation.Configuration; -//import org.springframework.web.server.ServerWebInputException; -// -///** -// * WebSocketConfig -// * 开始webSocket -// * WenHao.Sao -// */ -//@Configuration -//public class WebSocketConfig { -// -// @Bean -// public ServerEndpointExporter serverEndpointExporter(){ -// return new ServerEndpointExporter(); -// } -//} +package com.god.base.server.webSocket.config; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * WebSocketConfig + * 开始webSocket + * WenHao.Sao + */ +@Configuration +public class WebSocketConfig { + + @Bean + public ServerEndpointExporter serverEndpointExporter(){ + return new ServerEndpointExporter(); + } +} diff --git a/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketBatchSever.java b/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketBatchSever.java index 03738fd..2e21c10 100644 --- a/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketBatchSever.java +++ b/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketBatchSever.java @@ -1,29 +1,29 @@ -package com.god.base.server.webSocket.controller; - -import com.god.base.server.webSocket.handler.SocketBatchHandler; -import lombok.extern.log4j.Log4j2; -import org.springframework.stereotype.Component; - -import javax.websocket.OnOpen; -import javax.websocket.Session; -import javax.websocket.server.ServerEndpoint; - -/** - * @ClassName WebSocketBatchSever - * @Author WenHao.Sao - */ -@ServerEndpoint("/car/batch") -@Component -@Log4j2 -public class WebSocketBatchSever { - - /** - * 连接建立成功方法 - */ - @OnOpen - public void onOpen(Session session){ - log.info("车辆大屏新连接" + session.getId()); - SocketBatchHandler.sessions.add(session); - } - -} +//package com.god.base.server.webSocket.controller; +// +//import com.god.base.server.webSocket.handler.SocketBatchHandler; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.stereotype.Component; +// +//import javax.websocket.OnOpen; +//import javax.websocket.Session; +//import javax.websocket.server.ServerEndpoint; +// +///** +// * @ClassName WebSocketBatchSever +// * @Author WenHao.Sao +// */ +//@ServerEndpoint("/car/batch") +//@Component +//@Log4j2 +//public class WebSocketBatchSever { +// +// /** +// * 连接建立成功方法 +// */ +// @OnOpen +// public void onOpen(Session session){ +// log.info("车辆大屏新连接" + session.getId()); +// SocketBatchHandler.sessions.add(session); +// } +// +//} diff --git a/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketServer.java b/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketServer.java index e488eeb..9abc457 100644 --- a/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketServer.java +++ b/car-base-server/src/main/java/com/god/base/server/webSocket/controller/WebSocketServer.java @@ -1,31 +1,156 @@ package com.god.base.server.webSocket.controller; -import com.god.base.server.webSocket.entity.SocketData; -import com.god.base.server.webSocket.handler.SocketHandler; -import lombok.extern.log4j.Log4j2; -import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.PathVariable; -import javax.websocket.OnOpen; -import javax.websocket.Session; + +import java.net.http.WebSocket; +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.websocket.*; +import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; +import com.alibaba.fastjson.JSON; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; -/** - * WebSocketServer服务类 - */ -@ServerEndpoint("/vehicle/realTime/{vin}") @Component -@Log4j2 +@Slf4j +@ServerEndpoint("/websocket/{vin}") // 接口路径 ws://localhost:8087/webSocket/userId; public class WebSocketServer { + + //与某个客户端的连接会话,需要通过它来给客户端发送数据 + private Session session; /** - * 连接建立成功调用的方法 + * 用户ID + */ + private String userId; + + //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 + //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。 + // 注:底下WebSocket是当前类名 + private static CopyOnWriteArraySet webSockets =new CopyOnWriteArraySet<>(); + // 用来存在线连接用户信息 + private static ConcurrentHashMap sessionPool = new ConcurrentHashMap(); + + /** + * 链接成功调用的方法 */ @OnOpen - public void onOpen(Session session , @PathVariable("vin") String vin){ - log.info("车辆实时轨迹,新连接:{}" , session.getId()); - SocketData socketData = new SocketData(); - socketData.setVin(vin); - socketData.setSession(session); - SocketHandler.socketList.add(socketData); + public void onOpen(Session session, @PathParam(value="vin")String userId) { + try { + this.session = session; + this.userId = userId; + webSockets.add(this); + sessionPool.put(userId, session); + log.info("【websocket消息】有新的连接,总数为:"+webSockets.size()); + //发送实时轨迹数据 + double[][] arr = new double[][]{ + {116.478935,39.997761}, + {116.478939,39.997825}, + {116.478912,39.998549}, + {116.478998,39.998555}, + {116.479282,39.99856}, + {116.479658,39.998528}, + {116.480151,39.998453}, + {116.480784,39.998302}, + {116.481149,39.998184}, + {116.481573,39.997997}, + {116.481863,39.997846}, + {116.482072,39.997718}, + {116.482362,39.997718}, + {116.483633,39.998935}, + {116.48367,39.998968}, + {116.484648,39.999861} + }; + + for (double[] doubles : arr) { + Thread.sleep(500); + String string = Arrays.toString(doubles); + System.out.println(string); + Thread.sleep(1000); + sendOneMessage(userId , string); + } + } catch (Exception e) { + } + } + + /** + * 链接关闭调用的方法 + */ + @OnClose + public void onClose() { + try { + webSockets.remove(this); + sessionPool.remove(this.userId); + log.info("【websocket消息】连接断开,总数为:"+webSockets.size()); + } catch (Exception e) { + } + } + /** + * 收到客户端消息后调用的方法 + * + * @param message + * @param session + */ + @OnMessage + public void onMessage(String message,Session session) { + log.info("【websocket消息】收到客户端消息:"+message); + } + + /** 发送错误时的处理 + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + + log.error("用户错误,原因:"+error.getMessage()); + error.printStackTrace(); + } + + + // 此为广播消息 + public void sendAllMessage(String message) { + log.info("【websocket消息】广播消息:"+message); + for(WebSocketServer webSocket : webSockets) { + try { + if(webSocket.session.isOpen()) { + webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + Session session = sessionPool.get(userId); + if (session != null&&session.isOpen()) { + try { + log.info("【websocket消息】 单点消息:"+message); + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 此为单点消息(多人) + public void sendMoreMessage(String[] userIds, String message) { + for(String userId:userIds) { + Session session = sessionPool.get(userId); + if (session != null&&session.isOpen()) { + try { + log.info("【websocket消息】 单点消息:"+message); + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } } diff --git a/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketBatchHandler.java b/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketBatchHandler.java index bfa2332..849aca3 100644 --- a/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketBatchHandler.java +++ b/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketBatchHandler.java @@ -1,57 +1,57 @@ -package com.god.base.server.webSocket.handler; - -import com.alibaba.fastjson.JSON; -import com.god.base.domain.VehicleMessage; -import com.god.base.common.constant.RedisConstant; -import com.god.base.server.mapper.CarMapper; -import com.god.base.server.webSocket.entity.VehicleAllData; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.websocket.Session; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.stream.Collectors; - -/** - * @author swh的mac - * @Date 2023/11/20 14:37 - */ -@Log4j2 -@Component -public class SocketBatchHandler { - public static CopyOnWriteArrayList sessions = new CopyOnWriteArrayList<>(); - - @Autowired - private CarMapper carMapper; - - @Autowired - private RedisTemplate redisTemplate; - - @Scheduled(cron = "0/1 * * * * ?") - public void sendMessageFlush(){ - for (Session session : sessions) { - List ids = carMapper.getOnlineCarIds(); - List messages = ids.stream().map(id -> { - VehicleMessage message = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + id); - return message; - }).collect(Collectors.toList()); - Integer day = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_DAY_FAULT); - Integer month = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_MONTH_FAULT); - Integer onlineCount = carMapper.getOnlineCarCount(); - Integer unOnlineCount = carMapper.getUnOnlineCount(); - VehicleAllData vehicleAllData = new VehicleAllData(); - vehicleAllData.setMessages(messages); - vehicleAllData.setInfo(day+":"+month+":"+onlineCount+":"+unOnlineCount); - try { - session.getBasicRemote().sendText(JSON.toJSONString(vehicleAllData)); - } catch (Exception e) { - log.info("{}:连接中断",session.getId()); - sessions.remove(session); - } - } - } -} +//package com.god.base.server.webSocket.handler; +// +//import com.alibaba.fastjson.JSON; +//import com.god.base.domain.VehicleMessage; +//import com.god.base.common.constant.RedisConstant; +//import com.god.base.server.mapper.CarMapper; +//import com.god.base.server.webSocket.entity.VehicleAllData; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import javax.websocket.Session; +//import java.util.List; +//import java.util.concurrent.CopyOnWriteArrayList; +//import java.util.stream.Collectors; +// +///** +// * @author swh的mac +// * @Date 2023/11/20 14:37 +// */ +//@Log4j2 +//@Component +//public class SocketBatchHandler { +// public static CopyOnWriteArrayList sessions = new CopyOnWriteArrayList<>(); +// +// @Autowired +// private CarMapper carMapper; +// +// @Autowired +// private RedisTemplate redisTemplate; +// +// @Scheduled(cron = "0/1 * * * * ?") +// public void sendMessageFlush(){ +// for (Session session : sessions) { +// List ids = carMapper.getOnlineCarIds(); +// List messages = ids.stream().map(id -> { +// VehicleMessage message = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + id); +// return message; +// }).collect(Collectors.toList()); +// Integer day = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_DAY_FAULT); +// Integer month = (Integer) redisTemplate.opsForValue().get(RedisConstant.VEHICLE_INFO_TOKEN+RedisConstant.CURRENT_MONTH_FAULT); +// Integer onlineCount = carMapper.getOnlineCarCount(); +// Integer unOnlineCount = carMapper.getUnOnlineCount(); +// VehicleAllData vehicleAllData = new VehicleAllData(); +// vehicleAllData.setMessages(messages); +// vehicleAllData.setInfo(day+":"+month+":"+onlineCount+":"+unOnlineCount); +// try { +// session.getBasicRemote().sendText(JSON.toJSONString(vehicleAllData)); +// } catch (Exception e) { +// log.info("{}:连接中断",session.getId()); +// sessions.remove(session); +// } +// } +// } +//} diff --git a/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketHandler.java b/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketHandler.java index fa39651..8880814 100644 --- a/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketHandler.java +++ b/car-base-server/src/main/java/com/god/base/server/webSocket/handler/SocketHandler.java @@ -1,42 +1,42 @@ -package com.god.base.server.webSocket.handler; - -import com.alibaba.fastjson.JSON; -import com.god.base.domain.VehicleMessage; -import com.god.base.common.constant.RedisConstant; -import com.god.base.server.webSocket.entity.SocketData; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.websocket.Session; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * @author swh的mac - * @Date 2023/11/20 15:51 - */ -@Log4j2 -@Component -public class SocketHandler { - //存放每个客户端对应的MyWebSocket - public static CopyOnWriteArrayList socketList = new CopyOnWriteArrayList<>(); - - @Autowired - private RedisTemplate redisTemplate; - - @Scheduled(cron = "0/1 * * * * ?") - public void sendMessageFlush(){ - for (SocketData socketData : socketList) { - Session session = socketData.getSession(); - VehicleMessage msg = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + socketData.getVin()); - try { - session.getBasicRemote().sendText(JSON.toJSONString(msg)); - } catch (Exception e) { - log.info("{}:连接中断",socketData.getSession().getId()); - socketList.remove(socketData); - } - } - } -} +//package com.god.base.server.webSocket.handler; +// +//import com.alibaba.fastjson.JSON; +//import com.god.base.domain.VehicleMessage; +//import com.god.base.common.constant.RedisConstant; +//import com.god.base.server.webSocket.entity.SocketData; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import javax.websocket.Session; +//import java.util.concurrent.CopyOnWriteArrayList; +// +///** +// * @author swh的mac +// * @Date 2023/11/20 15:51 +// */ +//@Log4j2 +//@Component +//public class SocketHandler { +// //存放每个客户端对应的MyWebSocket +// public static CopyOnWriteArrayList socketList = new CopyOnWriteArrayList<>(); +// +// @Autowired +// private RedisTemplate redisTemplate; +// +// @Scheduled(cron = "0/1 * * * * ?") +// public void sendMessageFlush(){ +// for (SocketData socketData : socketList) { +// Session session = socketData.getSession(); +// VehicleMessage msg = (VehicleMessage) redisTemplate.opsForValue().get(RedisConstant.CURRENT_INFO + ":" + socketData.getVin()); +// try { +// session.getBasicRemote().sendText(JSON.toJSONString(msg)); +// } catch (Exception e) { +// log.info("{}:连接中断",socketData.getSession().getId()); +// socketList.remove(socketData); +// } +// } +// } +//}