From 32c8529d50390906940dfef2f629b7ef238f4c45 Mon Sep 17 00:00:00 2001 From: tangwenkang <2720983602@qq.com> Date: Sat, 2 Dec 2023 09:20:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vehicle-history-server/pom.xml | 11 ++- .../server/controller/CarController.java | 2 +- .../history/server/service/CarService.java | 1 + .../server/service/impl/CarServiceImpl.java | 13 ++- .../server/websocket/WebSocketServer.java | 90 +++++++++++++++++++ .../src/main/resources/bootstrap.yml | 13 +++ 6 files changed, 123 insertions(+), 7 deletions(-) create mode 100644 vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/websocket/WebSocketServer.java diff --git a/vehicle-history-server/pom.xml b/vehicle-history-server/pom.xml index b34d31c..d096799 100644 --- a/vehicle-history-server/pom.xml +++ b/vehicle-history-server/pom.xml @@ -108,9 +108,14 @@ - org.apache.kafka - kafka-clients - 3.1.0 + org.springframework.boot + spring-boot-starter-amqp + 2.7.15 + + + + org.springframework.boot + spring-boot-starter-websocket diff --git a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/controller/CarController.java b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/controller/CarController.java index 832204c..b98aa9a 100644 --- a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/controller/CarController.java +++ b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/controller/CarController.java @@ -88,7 +88,7 @@ public class CarController { * @return */ @PostMapping("/realTime") - public Result realTime(String vin){ + public Result realTime(@RequestParam String vin){ return carService.realTime(vin); } } diff --git a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/CarService.java b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/CarService.java index 46cc4ea..4d72ab4 100644 --- a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/CarService.java +++ b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/CarService.java @@ -6,6 +6,7 @@ import com.dragon.common.core.domain.Result; import com.dragon.vehicle.history.common.domain.Car; import com.dragon.vehicle.history.common.domain.CarType; import com.dragon.vehicle.history.common.domain.Fence; +import com.dragon.vehicle.history.common.domain.VehicleData; import com.dragon.vehicle.history.common.domain.req.ReqCar; import com.dragon.vehicle.history.common.domain.res.ResCar; diff --git a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/impl/CarServiceImpl.java b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/impl/CarServiceImpl.java index 285a054..6ea7425 100644 --- a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/impl/CarServiceImpl.java +++ b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/service/impl/CarServiceImpl.java @@ -8,6 +8,7 @@ import com.dragon.common.redis.service.RedisService; import com.dragon.vehicle.history.common.domain.Car; import com.dragon.vehicle.history.common.domain.CarType; import com.dragon.vehicle.history.common.domain.Fence; +import com.dragon.vehicle.history.common.domain.VehicleData; import com.dragon.vehicle.history.common.domain.req.ReqCar; import com.dragon.vehicle.history.common.domain.res.ResCar; import com.dragon.vehicle.history.server.mapper.CarMapper; @@ -16,8 +17,12 @@ import com.dragon.vehicle.history.server.mapper.FenceMapper; import com.dragon.vehicle.history.server.service.CarService; import com.github.yulichang.wrapper.MPJLambdaWrapper; import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.rabbitmq.client.Channel; import java.util.List; @@ -137,10 +142,12 @@ public class CarServiceImpl implements CarService { */ @Override public Result realTime(String vin) { - List cacheList = redisService.getCacheList(vin);//根据vin查询出车辆的事件列表 + List cacheList = redisService.getCacheList("event_"+vin);//根据vin查询出车辆的事件列表 + redisService.deleteObject("event_"+vin); cacheList.add("runtimeTraceEvent");//添加实时轨迹事件 redisService.setCacheList("event_"+vin,cacheList);//将事件列表存入redis - - return Result.success("事件添加成功!"); + return Result.success("添加事件成功!"); } + + } diff --git a/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/websocket/WebSocketServer.java b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/websocket/WebSocketServer.java new file mode 100644 index 0000000..5bb1efc --- /dev/null +++ b/vehicle-history-server/src/main/java/com/dragon/vehicle/history/server/websocket/WebSocketServer.java @@ -0,0 +1,90 @@ +package com.dragon.vehicle.history.server.websocket; + +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; + +import javax.websocket.*; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Wenkang Tang + * @date 2023/12/1 20:51 + * @description + */ +@ServerEndpoint("/ws") +@Service +@Log4j2 +public class WebSocketServer { + //连接计数器 + private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0); + //存放每个客户端对应的Session对象 + private static final CopyOnWriteArraySet SOCKET_SERVERS =new CopyOnWriteArraySet(); + + /** + * 连接建立成功调用的方法 + * @param session + */ + @OnOpen + public void onOpen(Session session){ + SOCKET_SERVERS.add(this); + int countSum = ONLINE_COUNT.incrementAndGet(); + log.info("有链接加入,当前连接数为:{}",countSum); + } + + /** + * 连接关闭调用的方法 + */ + @OnClose + public void onClose(){ + SOCKET_SERVERS.remove(this); + int countSum = ONLINE_COUNT.decrementAndGet(); + log.info("有链接关闭,当前连接数为:{}",countSum); + } + + /** + * 收到客户端消息后调用的方法 + * @param message 客户端发送过来的消息 + * @param session + */ + @OnMessage + public void onMessage(String message,Session session){ + log.info("收到客户端消息:{}",message); + sendMessage(session,"服务端接收到消息,消息内容为:{}"+message); + } + + /** + * 发生错误调用的方法 + * @param session + * @param error + */ + @OnError + public void onError(Session session,Throwable error){ + log.error("发生错误:{},Session ID:{}",error.getMessage(),session.getId()); + error.printStackTrace(); + } + + /** + * 发送消息 + * @param session + * @param message + */ + public static void sendMessage(Session session, String message){ + while (session.isOpen()){ + Date date = new Date(); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try { + session.getBasicRemote().sendText(message); + log.info("客户端开始发送数据"); + log.info("发送时间为:"+simpleDateFormat.format(date)); + } catch (IOException e) { + log.error("发送消息出现错误:{}",e.getMessage()); + e.printStackTrace(); + } + } + } +} diff --git a/vehicle-history-server/src/main/resources/bootstrap.yml b/vehicle-history-server/src/main/resources/bootstrap.yml index 22f7695..7adc75d 100644 --- a/vehicle-history-server/src/main/resources/bootstrap.yml +++ b/vehicle-history-server/src/main/resources/bootstrap.yml @@ -23,6 +23,19 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + rabbitmq: + host: 182.254.222.21 + port: 5672 + template: + mandatory: true + listener: + simple: + prefetch: 1 # 每次取一条消息消费 消费完成取下一条 + acknowledge-mode: manual # 设置消费端手动ack确认 + retry: + enabled: true # 支持重试 + publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue) mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl