master
tangwenkang 2023-12-02 09:20:45 +08:00
parent 66b7f13258
commit 32c8529d50
6 changed files with 123 additions and 7 deletions

View File

@ -108,9 +108,14 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.15</version>
</dependency>
<!-- SpringBoot Websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

View File

@ -88,7 +88,7 @@ public class CarController {
* @return
*/
@PostMapping("/realTime")
public Result realTime(String vin){
public Result realTime(@RequestParam String vin){
return carService.realTime(vin);
}
}

View File

@ -6,6 +6,7 @@ import com.dragon.common.core.domain.Result;
import com.dragon.vehicle.history.common.domain.Car;
import com.dragon.vehicle.history.common.domain.CarType;
import com.dragon.vehicle.history.common.domain.Fence;
import com.dragon.vehicle.history.common.domain.VehicleData;
import com.dragon.vehicle.history.common.domain.req.ReqCar;
import com.dragon.vehicle.history.common.domain.res.ResCar;

View File

@ -8,6 +8,7 @@ import com.dragon.common.redis.service.RedisService;
import com.dragon.vehicle.history.common.domain.Car;
import com.dragon.vehicle.history.common.domain.CarType;
import com.dragon.vehicle.history.common.domain.Fence;
import com.dragon.vehicle.history.common.domain.VehicleData;
import com.dragon.vehicle.history.common.domain.req.ReqCar;
import com.dragon.vehicle.history.common.domain.res.ResCar;
import com.dragon.vehicle.history.server.mapper.CarMapper;
@ -16,8 +17,12 @@ import com.dragon.vehicle.history.server.mapper.FenceMapper;
import com.dragon.vehicle.history.server.service.CarService;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import java.util.List;
@ -137,10 +142,12 @@ public class CarServiceImpl implements CarService {
*/
@Override
public Result realTime(String vin) {
List<String> cacheList = redisService.getCacheList(vin);//根据vin查询出车辆的事件列表
List<String> cacheList = redisService.getCacheList("event_"+vin);//根据vin查询出车辆的事件列表
redisService.deleteObject("event_"+vin);
cacheList.add("runtimeTraceEvent");//添加实时轨迹事件
redisService.setCacheList("event_"+vin,cacheList);//将事件列表存入redis
return Result.success("事件添加成功!");
return Result.success("添加事件成功!");
}
}

View File

@ -0,0 +1,90 @@
package com.dragon.vehicle.history.server.websocket;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Wenkang Tang
* @date 2023/12/1 20:51
* @description
*/
@ServerEndpoint("/ws")
@Service
@Log4j2
public class WebSocketServer {
//连接计数器
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
//存放每个客户端对应的Session对象
private static final CopyOnWriteArraySet<WebSocketServer> SOCKET_SERVERS =new CopyOnWriteArraySet<WebSocketServer>();
/**
*
* @param session
*/
@OnOpen
public void onOpen(Session session){
SOCKET_SERVERS.add(this);
int countSum = ONLINE_COUNT.incrementAndGet();
log.info("有链接加入,当前连接数为:{}",countSum);
}
/**
*
*/
@OnClose
public void onClose(){
SOCKET_SERVERS.remove(this);
int countSum = ONLINE_COUNT.decrementAndGet();
log.info("有链接关闭,当前连接数为:{}",countSum);
}
/**
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message,Session session){
log.info("收到客户端消息:{}",message);
sendMessage(session,"服务端接收到消息,消息内容为:{}"+message);
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session,Throwable error){
log.error("发生错误:{},Session ID:{}",error.getMessage(),session.getId());
error.printStackTrace();
}
/**
*
* @param session
* @param message
*/
public static void sendMessage(Session session, String message){
while (session.isOpen()){
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
session.getBasicRemote().sendText(message);
log.info("客户端开始发送数据");
log.info("发送时间为:"+simpleDateFormat.format(date));
} catch (IOException e) {
log.error("发送消息出现错误:{}",e.getMessage());
e.printStackTrace();
}
}
}
}

View File

@ -23,6 +23,19 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
rabbitmq:
host: 182.254.222.21
port: 5672
template:
mandatory: true
listener:
simple:
prefetch: 1 # 每次取一条消息消费 消费完成取下一条
acknowledge-mode: manual # 设置消费端手动ack确认
retry:
enabled: true # 支持重试
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl