Merge branch 'master' of https://gitea.qinmian.online/dragon/vechile-parse-system
# Conflicts: # src/main/java/com/parseSystem/event/impl/FaultEvent.java # src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.javamaster
commit
006991864e
|
@ -10,7 +10,7 @@ target/
|
||||||
|
|
||||||
######################################################################
|
######################################################################
|
||||||
# IDE
|
# IDE
|
||||||
|
IDEA/
|
||||||
### STS ###
|
### STS ###
|
||||||
.apt_generated
|
.apt_generated
|
||||||
.classpath
|
.classpath
|
||||||
|
@ -43,4 +43,4 @@ nbdist/
|
||||||
|
|
||||||
!*/build/*.java
|
!*/build/*.java
|
||||||
!*/build/*.html
|
!*/build/*.html
|
||||||
!*/build/*.xml
|
!*/build/*.xml
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
#起始镜像
|
||||||
|
FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6
|
||||||
|
#暴露端口号
|
||||||
|
EXPOSE 8068
|
||||||
|
#挂载目录的位置
|
||||||
|
VOLUME /home/logs/parseSystem
|
||||||
|
#构建复制外部文件到docker
|
||||||
|
COPY /target/parsesystem.jar /home/app.jar
|
||||||
|
#工作目录 exec -it 进入容器内部后的默认的起始目录
|
||||||
|
WORKDIR /home
|
||||||
|
ENV TIME_ZONE Asia/Shanghai
|
||||||
|
#指定东八区
|
||||||
|
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||||
|
|
||||||
|
#启动java 程序
|
||||||
|
ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"]
|
||||||
|
|
||||||
|
|
32
pom.xml
32
pom.xml
|
@ -5,7 +5,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<groupId>com.parseSystem</groupId>
|
<groupId>com.parseSystem</groupId>
|
||||||
<artifactId>parseSystem</artifactId>
|
<artifactId>parsesystem</artifactId>
|
||||||
<version>3.6.3</version>
|
<version>3.6.3</version>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -19,6 +19,11 @@
|
||||||
<artifactId>spring-boot-starter</artifactId>
|
<artifactId>spring-boot-starter</artifactId>
|
||||||
<version>2.7.15</version>
|
<version>2.7.15</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
<version>2.7.15</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-web</artifactId>
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
@ -68,6 +73,7 @@
|
||||||
<artifactId>dragon-common-redis</artifactId>
|
<artifactId>dragon-common-redis</artifactId>
|
||||||
<version>3.6.3</version>
|
<version>3.6.3</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<distributionManagement>
|
<distributionManagement>
|
||||||
<repository>
|
<repository>
|
||||||
|
@ -91,4 +97,28 @@
|
||||||
</releases>
|
</releases>
|
||||||
</repository>
|
</repository>
|
||||||
</repositories>
|
</repositories>
|
||||||
|
<build>
|
||||||
|
<finalName>${project.artifactId}</finalName>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<goals>
|
||||||
|
<goal>repackage</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-deploy-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<skip>true</skip>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>16</source><target>16</target></configuration></plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -2,6 +2,7 @@ package com.parseSystem;
|
||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author 冯凯
|
* @author 冯凯
|
||||||
|
@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
* @date 2023/11/25 15:34
|
* @date 2023/11/25 15:34
|
||||||
*/
|
*/
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
|
@EnableScheduling
|
||||||
public class ParseSystemApplication {
|
public class ParseSystemApplication {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,25 +1,20 @@
|
||||||
package com.parseSystem.event;
|
package com.parseSystem.event;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import com.dragon.common.redis.service.RedisService;
|
import com.dragon.common.redis.service.RedisService;
|
||||||
import com.parseSystem.utils.SpringUtils;
|
import com.parseSystem.utils.SpringUtils;
|
||||||
import com.parseSystem.vehicle.VehicleData;
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* 事件处理接口
|
||||||
*
|
*
|
||||||
*事件处理接口
|
|
||||||
* @author 冯凯
|
* @author 冯凯
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
* @description:
|
* @description:
|
||||||
|
@ -29,41 +24,39 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
@Log4j2
|
@Log4j2
|
||||||
public class EventHandlerService {
|
public class EventHandlerService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件处理map
|
||||||
|
*/
|
||||||
|
public static final Map<String, List<String>> eventMap = new ConcurrentHashMap<>();
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisService redisService;
|
private RedisService redisService;
|
||||||
|
|
||||||
/**
|
|
||||||
* 事件处理map
|
|
||||||
*/
|
|
||||||
public static final Map<String, List<String>> eventMap=new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 注册事件
|
* 注册事件
|
||||||
|
*
|
||||||
* @param vin
|
* @param vin
|
||||||
* @param eventServiceName
|
* @param eventServiceName
|
||||||
*/
|
*/
|
||||||
public void registerEvent(String vin,String eventServiceName){
|
public void registerEvent(String vin, String eventServiceName) {
|
||||||
getEventList(vin).add(eventServiceName);
|
getEventList(vin).add(eventServiceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 移除事件
|
* 移除事件
|
||||||
|
*
|
||||||
* @param vin
|
* @param vin
|
||||||
* @param eventServiceName
|
* @param eventServiceName
|
||||||
*/
|
*/
|
||||||
public void removeEvent(String vin,String eventServiceName){
|
public void removeEvent(String vin, String eventServiceName) {
|
||||||
getEventList(vin).remove(eventServiceName);
|
getEventList(vin).remove(eventServiceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 执行事件
|
* 执行事件
|
||||||
|
*
|
||||||
* @param vehicleData
|
* @param vehicleData
|
||||||
*/
|
*/
|
||||||
public void executeEvent(VehicleData vehicleData){
|
public void executeEvent(VehicleData vehicleData) {
|
||||||
List<String> eventList = getEventList(vehicleData.getVin());
|
List<String> eventList = getEventList(vehicleData.getVin());
|
||||||
eventList.forEach(eventServiceName -> {
|
eventList.forEach(eventServiceName -> {
|
||||||
VehicleEventService vehicleEventService = SpringUtils.getBean(eventServiceName);
|
VehicleEventService vehicleEventService = SpringUtils.getBean(eventServiceName);
|
||||||
|
@ -73,16 +66,17 @@ public class EventHandlerService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取事件列表
|
* 获取事件列表
|
||||||
|
*
|
||||||
* @param vin
|
* @param vin
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public List<String> getEventList(String vin){
|
public List<String> getEventList(String vin) {
|
||||||
List<String> cacheList = redisService.getCacheList("event_VIN123456789");
|
List<String> cacheList = redisService.getCacheList("event_VIN123456789");
|
||||||
eventMap.put(vin,cacheList);
|
eventMap.put(vin, cacheList);
|
||||||
List<String> eventList = eventMap.get(vin);
|
List<String> eventList = eventMap.get(vin);
|
||||||
if (eventList==null){
|
if (eventList == null) {
|
||||||
ArrayList<String> list = new ArrayList<>();
|
ArrayList<String> list = new ArrayList<>();
|
||||||
eventMap.put(vin,list);
|
eventMap.put(vin, list);
|
||||||
}
|
}
|
||||||
return eventList;
|
return eventList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
package com.parseSystem.event.impl;
|
package com.parseSystem.event.impl;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.dragon.common.redis.service.RedisService;
|
import com.dragon.common.redis.service.RedisService;
|
||||||
import com.parseSystem.event.EventHandlerService;
|
import com.parseSystem.event.EventHandlerService;
|
||||||
import com.parseSystem.event.VehicleEventService;
|
import com.parseSystem.event.VehicleEventService;
|
||||||
import com.parseSystem.utils.eventRuleJudge.PolygonUtil;
|
import com.parseSystem.utils.eventRuleJudge.PolygonUtil;
|
||||||
|
import com.parseSystem.vehicle.FenceData;
|
||||||
import com.parseSystem.vehicle.VehicleData;
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@ -23,6 +26,7 @@ import java.util.stream.Collectors;
|
||||||
* @date 2023/11/27 20:57
|
* @date 2023/11/27 20:57
|
||||||
*/
|
*/
|
||||||
@Service("fenceEvent")
|
@Service("fenceEvent")
|
||||||
|
@Log4j2
|
||||||
public class FenceEvent extends EventHandlerService implements VehicleEventService {
|
public class FenceEvent extends EventHandlerService implements VehicleEventService {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
|
@ -33,25 +37,40 @@ public class FenceEvent extends EventHandlerService implements VehicleEventServi
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void executeEvent(VehicleData vehicleData) {
|
public void executeEvent(VehicleData vehicleData) {
|
||||||
|
//获取报文解析的车辆vin
|
||||||
String vin = vehicleData.getVin();
|
String vin = vehicleData.getVin();
|
||||||
List<String> fenceList = redisService.getCacheList("fence_VIN123456789");
|
List<VehicleData> arr1 = null;
|
||||||
|
arr1.add(vehicleData);
|
||||||
|
//redis为(vin,fenceList)即vin为键,电子围栏为值存入redis
|
||||||
|
List<String> fenceList = redisService.getCacheList("fence_"+vin);
|
||||||
|
redisService.setCacheList("fence_VIN123456789",arr1);
|
||||||
|
//将报文的经纬度赋值给变量
|
||||||
Double latitude = Double.valueOf(vehicleData.getLatitude()); //实时纬度
|
Double latitude = Double.valueOf(vehicleData.getLatitude()); //实时纬度
|
||||||
Double longitude = Double.valueOf(vehicleData.getLongitude());
|
Double longitude = Double.valueOf(vehicleData.getLongitude());
|
||||||
|
//创建工具类使用所需要的对象变量类赋值
|
||||||
Point2D.Double point = new Point2D.Double();
|
Point2D.Double point = new Point2D.Double();
|
||||||
|
//赋值
|
||||||
point.setLocation(longitude,latitude);
|
point.setLocation(longitude,latitude);
|
||||||
List<Point2D.Double> pts = new ArrayList<>();
|
//循环切割redis中拿到的电子围栏数据
|
||||||
fenceList.stream().forEach(item -> {
|
fenceList.stream().forEach(item -> {
|
||||||
|
//将围栏存放的信息转型拿到每个对象的属性
|
||||||
|
FenceData fenceData = JSONObject.parseObject(item, FenceData.class);
|
||||||
//循环得到每个电子围栏的坐标拼接的一个以;为分割点表示多边形的一个字符串
|
//循环得到每个电子围栏的坐标拼接的一个以;为分割点表示多边形的一个字符串
|
||||||
List<Point2D.Double> locationPts = Arrays.stream(item.split(";"))
|
List<Point2D.Double> locationPts = Arrays.stream(item.split(";"))
|
||||||
.map(s -> s.split(","))
|
.map(s -> s.split(","))
|
||||||
.map(arr -> new Point2D.Double(Double.valueOf(arr[0]), Double.valueOf(arr[1])))
|
.map(arr -> new Point2D.Double(Double.valueOf(arr[0]), Double.valueOf(arr[1])))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
boolean inPolygon = PolygonUtil.isInPolygon(point, locationPts);
|
boolean inPolygon = PolygonUtil.isInPolygon(point, locationPts);
|
||||||
if (inPolygon){
|
//判断如果在围栏平面内,则判断围栏的报警状态 0-驶入警告 1-驶出警告
|
||||||
System.out.println("在电子围栏内");
|
if (inPolygon == true && fenceData.getAlarmType()==0){
|
||||||
}else{
|
log.info("----当前车辆已驶入围栏 警告!!!");
|
||||||
System.out.println("在电子围栏外");
|
}else if(inPolygon == false && fenceData.getAlarmType()==1){
|
||||||
|
log.info("----当前车辆已驶出围栏 警告!!!");
|
||||||
|
}else {
|
||||||
|
log.info("----该车辆为绑定围栏");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,20 @@
|
||||||
package com.parseSystem.event.impl;
|
package com.parseSystem.event.impl;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.dragon.common.redis.service.RedisService;
|
||||||
import com.parseSystem.event.EventHandlerService;
|
import com.parseSystem.event.EventHandlerService;
|
||||||
import com.parseSystem.event.VehicleEventService;
|
import com.parseSystem.event.VehicleEventService;
|
||||||
import com.parseSystem.vehicle.VehicleData;
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.xml.crypto.Data;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* 实时轨迹
|
* 实时轨迹
|
||||||
|
@ -15,6 +25,8 @@ import org.springframework.stereotype.Service;
|
||||||
*/
|
*/
|
||||||
@Service("runtimeTraceEvent")
|
@Service("runtimeTraceEvent")
|
||||||
public class RuntimeTraceEvent extends EventHandlerService implements VehicleEventService {
|
public class RuntimeTraceEvent extends EventHandlerService implements VehicleEventService {
|
||||||
|
@Autowired
|
||||||
|
private RedisService redisService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 实时轨迹事件执行逻辑
|
* 实时轨迹事件执行逻辑
|
||||||
|
@ -22,6 +34,6 @@ public class RuntimeTraceEvent extends EventHandlerService implements VehicleEve
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void executeEvent(VehicleData vehicleData) {
|
public void executeEvent(VehicleData vehicleData) {
|
||||||
System.out.println("你好实时轨迹");
|
redisService.setCacheObject("runtimeTraceEvent_"+vehicleData.getVin(),vehicleData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package com.parseSystem.kafka.ConsumerConfig;
|
package com.parseSystem.kafka.ConsumerConfig;
|
||||||
|
|
||||||
|
import com.parseSystem.storage.service.StorageDateService;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -15,8 +17,10 @@ import java.util.Properties;
|
||||||
@Component
|
@Component
|
||||||
public class KafkaConsumerConfig {
|
public class KafkaConsumerConfig {
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public KafkaConsumer<String, String> consumerInit() {
|
public KafkaConsumer<String, String> consumerInit() {
|
||||||
|
|
||||||
KafkaConsumer<String, String> consumer;
|
KafkaConsumer<String, String> consumer;
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.put("bootstrap.servers", "117.72.43.22:9092");
|
properties.put("bootstrap.servers", "117.72.43.22:9092");
|
||||||
|
|
|
@ -3,7 +3,7 @@ package com.parseSystem.rabbitmq;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.parseSystem.event.EventHandlerService;
|
import com.parseSystem.event.EventHandlerService;
|
||||||
import com.parseSystem.kafka.constant.kafkaConstants;
|
import com.parseSystem.kafka.constant.kafkaConstants;
|
||||||
import com.parseSystem.storage.service.StorageDateService;
|
import com.parseSystem.utils.DataQueueManager;
|
||||||
import com.parseSystem.utils.ParseUtil;
|
import com.parseSystem.utils.ParseUtil;
|
||||||
import com.parseSystem.vehicle.VehicleData;
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
|
@ -23,25 +23,24 @@ import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* 监听车辆事件变更消费队列
|
||||||
*
|
*
|
||||||
* 监听事件更改 Rabbit Mq
|
|
||||||
* @author 冯凯
|
* @author 冯凯
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
*
|
|
||||||
* @date 2023/11/27 22:19
|
* @date 2023/11/27 22:19
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Log4j2
|
@Log4j2
|
||||||
public class ListenEventChangeRabbitMq {
|
public class ListenEventChangeRabbitMq {
|
||||||
@Autowired
|
|
||||||
private KafkaConsumer<String, String> consumer; // 自动装配一个String类型键和String类型值的Kafka消费者
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private StorageDateService storageDateService; // 自动装配StorageDateService对象
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private EventHandlerService eventHandlerService; // 自动装配EventHandlerService对象
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DataQueueManager dataQueueManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaConsumer<String, String> consumer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private EventHandlerService eventHandlerService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RabbitMQ消费者,订阅"kafka_top"队列中的消息并处理
|
* RabbitMQ消费者,订阅"kafka_top"队列中的消息并处理
|
||||||
|
@ -52,7 +51,7 @@ public class ListenEventChangeRabbitMq {
|
||||||
*/
|
*/
|
||||||
@RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")})
|
@RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")})
|
||||||
public void consumerSubscribe(String mesg, Message message, Channel channel) {
|
public void consumerSubscribe(String mesg, Message message, Channel channel) {
|
||||||
log.info("收到准备订阅主题:" + mesg);
|
log.info("收到准备订阅主题:{}", mesg);
|
||||||
|
|
||||||
// 将接收到的消息解析为kafkaConstants对象
|
// 将接收到的消息解析为kafkaConstants对象
|
||||||
kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class);
|
kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class);
|
||||||
|
@ -66,35 +65,31 @@ public class ListenEventChangeRabbitMq {
|
||||||
|
|
||||||
// 将消费者订阅的主题设置为"kafka_top"队列的第一个分区
|
// 将消费者订阅的主题设置为"kafka_top"队列的第一个分区
|
||||||
consumer.assign(Collections.singleton(topicPartition));
|
consumer.assign(Collections.singleton(topicPartition));
|
||||||
|
|
||||||
// 开启线程持续拉取数据
|
// 开启线程持续拉取数据
|
||||||
while (true) {
|
while (true) {
|
||||||
ConsumerRecords<String, String> records = null;
|
ConsumerRecords<String, String> records = null;
|
||||||
try {
|
try {
|
||||||
// 每个一秒钟拉取一次数据
|
// 每个一秒钟拉取一次数据
|
||||||
records = consumer.poll(Duration.ofMillis(1000));
|
records = consumer.poll(Duration.ofMillis(1000));
|
||||||
|
for (ConsumerRecord<String, String> record : records) {
|
||||||
for (ConsumerRecord<String, String> record : records) {
|
System.out.println(record.value());
|
||||||
System.out.println(record.value());
|
long startTime = System.currentTimeMillis();
|
||||||
|
log.info("开始消费时间{}:", startTime);
|
||||||
// 解析数据
|
// 解析数据
|
||||||
String data = ParseUtil.sixteenToStr(record.value());
|
String data = ParseUtil.sixteenToStr(record.value());
|
||||||
|
// 构建数据对象
|
||||||
// 构建数据对象
|
VehicleData vehicleData = VehicleData.getBuild(data);
|
||||||
VehicleData vehicleData = VehicleData.getBuild(data);
|
log.info(vehicleData);
|
||||||
|
dataQueueManager.enqueueData(vehicleData);
|
||||||
// 存储数据到tiDB
|
// 调用执行事件
|
||||||
storageDateService.save(vehicleData);
|
eventHandlerService.executeEvent(vehicleData);
|
||||||
System.out.println(vehicleData);
|
log.info("耗费时间{}", System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
// 调用执行事件
|
|
||||||
eventHandlerService.executeEvent(vehicleData);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.info("records: {}", records);
|
|
||||||
log.error(e);
|
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("records: {}", records);
|
||||||
|
log.error(e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// 订阅特定分区
|
// 订阅特定分区
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
package com.parseSystem.utils;
|
||||||
|
|
||||||
|
import com.parseSystem.storage.service.StorageDateService;
|
||||||
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author 冯凯
|
||||||
|
* @version 1.0
|
||||||
|
* @description:
|
||||||
|
* @date 2023/12/1 20:47
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Log4j2
|
||||||
|
public class DataQueueManager {
|
||||||
|
|
||||||
|
private int count;
|
||||||
|
@Autowired
|
||||||
|
private StorageDateService storageDateService;
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private BlockingQueue<VehicleData> dataQueue1;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private BlockingQueue<VehicleData> dataQueue2;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public void enqueueData(VehicleData vehicleData) {
|
||||||
|
// 将数据放入第一个队列中,如果已满,则放入第二个队列中
|
||||||
|
if (!dataQueue1.offer(vehicleData)) {
|
||||||
|
log.warn("第一个队列已满,将数据放入第二个队列");
|
||||||
|
dataQueue2.offer(vehicleData);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Scheduled(fixedRate = 5000)
|
||||||
|
public void processQueues() {
|
||||||
|
// 如果第一个队列中的数据数量达到阈值,就从第一个队列中取出数据进行批量持久化操作
|
||||||
|
if (!dataQueue1.isEmpty()) {
|
||||||
|
|
||||||
|
List<VehicleData> batchData = new ArrayList<>();
|
||||||
|
count+=batchData.size();
|
||||||
|
dataQueue1.drainTo(batchData);
|
||||||
|
storageDateService.saveBatch(batchData);
|
||||||
|
}
|
||||||
|
// 如果第一个队列为空,但是第二个队列不为空,就从第二个队列中取出数据进行持久化操作
|
||||||
|
else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) {
|
||||||
|
log.warn("第一个队列为空,从第二个队列中取出数据进行持久化操作");
|
||||||
|
List<VehicleData> batchData = new ArrayList<>();
|
||||||
|
count+=batchData.size();
|
||||||
|
dataQueue2.drainTo(batchData);
|
||||||
|
storageDateService.saveBatch(batchData);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package com.parseSystem.utils;
|
||||||
|
|
||||||
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author 冯凯
|
||||||
|
* @version 1.0
|
||||||
|
* @description:
|
||||||
|
* @date 2023/12/1 21:15
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class QueueConfig {
|
||||||
|
@Bean
|
||||||
|
public BlockingQueue<VehicleData> dataQueue1() {
|
||||||
|
return new LinkedBlockingQueue<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public BlockingQueue<VehicleData> dataQueue2() {
|
||||||
|
return new LinkedBlockingQueue<>();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
package com.parseSystem.vehicle;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 电子围栏实体类
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class FenceData {
|
||||||
|
/**
|
||||||
|
* 字段属性:围栏主键id
|
||||||
|
*/
|
||||||
|
@TableId(type = IdType.AUTO)
|
||||||
|
private Integer fenceId;
|
||||||
|
/**
|
||||||
|
* 字段属性:围栏名称
|
||||||
|
*/
|
||||||
|
private String fenceName;
|
||||||
|
/**
|
||||||
|
* 字段属性:围栏数据
|
||||||
|
*/
|
||||||
|
private String fenceData;
|
||||||
|
/**s
|
||||||
|
* 字段属性:围栏状态
|
||||||
|
*/
|
||||||
|
private Integer status;
|
||||||
|
/**
|
||||||
|
* 字段属性:告警类型
|
||||||
|
*/
|
||||||
|
private Integer alarmType;
|
||||||
|
/**
|
||||||
|
* 字段属性:围栏标签id
|
||||||
|
*/
|
||||||
|
private Integer fenceTagId;
|
||||||
|
/**
|
||||||
|
* 字段属性:创建人
|
||||||
|
*/
|
||||||
|
private String createdBy;
|
||||||
|
/**
|
||||||
|
* 字段属性:创建时间
|
||||||
|
*/
|
||||||
|
private Date createdTime;
|
||||||
|
/**
|
||||||
|
* 字段属性:更新人
|
||||||
|
*/
|
||||||
|
private String updatedBy;
|
||||||
|
/**
|
||||||
|
* 字段属性:更新时间
|
||||||
|
*/
|
||||||
|
private Date updatedTime;
|
||||||
|
}
|
|
@ -14,7 +14,7 @@ import java.util.Date;
|
||||||
@ToString
|
@ToString
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@TableName("vehicle_data")
|
@TableName("vehicle_data_copy1")
|
||||||
public class VehicleData {
|
public class VehicleData {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
spring:
|
||||||
|
profiles:
|
||||||
|
active: dev
|
||||||
|
redis:
|
||||||
|
host: 10.100.1.2
|
||||||
|
port: 6379
|
||||||
|
password:
|
||||||
|
application:
|
||||||
|
name: parseSystem
|
||||||
|
datasource:
|
||||||
|
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||||
|
url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
||||||
|
server:
|
||||||
|
port: 8097
|
|
@ -1,4 +1,6 @@
|
||||||
spring:
|
spring:
|
||||||
|
profiles:
|
||||||
|
active: dev
|
||||||
redis:
|
redis:
|
||||||
host: 10.100.1.2
|
host: 10.100.1.2
|
||||||
port: 6379
|
port: 6379
|
||||||
|
@ -7,11 +9,11 @@ spring:
|
||||||
name: parseSystem
|
name: parseSystem
|
||||||
datasource:
|
datasource:
|
||||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||||
url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
url: jdbc:mysql://117.72.43.22:4000/test1?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
||||||
username: root
|
username: root
|
||||||
password: 123456
|
password: 123456
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
host: 47.120.38.248
|
host: 182.254.222.21
|
||||||
port: 5672
|
port: 5672
|
||||||
template:
|
template:
|
||||||
mandatory: true
|
mandatory: true
|
||||||
|
@ -24,4 +26,4 @@ spring:
|
||||||
publisher-confirms: true #确认消息已发送到交换机(Exchange)
|
publisher-confirms: true #确认消息已发送到交换机(Exchange)
|
||||||
publisher-returns: true #确认消息已发送到队列(Queue)
|
publisher-returns: true #确认消息已发送到队列(Queue)
|
||||||
server:
|
server:
|
||||||
port: 8066
|
port: 8068
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
import com.parseSystem.ParseSystemApplication;
|
||||||
|
import com.parseSystem.event.impl.FenceEvent;
|
||||||
|
import com.parseSystem.vehicle.VehicleData;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@SpringBootTest(classes = ParseSystemApplication.class)
|
||||||
|
public class Test {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private FenceEvent fenceEvent;
|
||||||
|
@org.junit.jupiter.api.Test
|
||||||
|
public void aa(){
|
||||||
|
ArrayList<VehicleData> vehicleData = new ArrayList<>();
|
||||||
|
VehicleData vehicleData1 = new VehicleData();
|
||||||
|
//车辆当前地位
|
||||||
|
vehicleData1.setVin("1234567989000");
|
||||||
|
vehicleData1.setLongitude("116.48965348217772");
|
||||||
|
vehicleData1.setLatitude("39.90816602515441");
|
||||||
|
|
||||||
|
//车辆一分钟后的位置
|
||||||
|
VehicleData vehicleData2 = new VehicleData();
|
||||||
|
vehicleData2.setVin("1234567989000");
|
||||||
|
vehicleData2.setLongitude("117.48965348217772");
|
||||||
|
vehicleData2.setLatitude("40.90816602515441");
|
||||||
|
|
||||||
|
//车辆二分钟后的位置
|
||||||
|
VehicleData vehicleData3 = new VehicleData();
|
||||||
|
vehicleData3.setVin("1234567989000");
|
||||||
|
vehicleData3.setLongitude("118.48965348217772");
|
||||||
|
vehicleData3.setLatitude("41.90816602515441");
|
||||||
|
|
||||||
|
vehicleData.add(vehicleData1);
|
||||||
|
vehicleData.add(vehicleData2);
|
||||||
|
vehicleData.add(vehicleData3);
|
||||||
|
|
||||||
|
for (VehicleData vehicleDatum : vehicleData) {
|
||||||
|
fenceEvent.executeEvent(vehicleDatum);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue