初始化 车辆解析系统
commit
0d79359080
|
@ -0,0 +1,46 @@
|
|||
######################################################################
|
||||
# Build Tools
|
||||
|
||||
.gradle
|
||||
/build/
|
||||
!gradle/wrapper/gradle-wrapper.jar
|
||||
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
|
||||
######################################################################
|
||||
# IDE
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### JRebel ###
|
||||
rebel.xml
|
||||
### NetBeans ###
|
||||
nbproject/private/
|
||||
build/*
|
||||
nbbuild/
|
||||
dist/
|
||||
nbdist/
|
||||
.nb-gradle/
|
||||
|
||||
######################################################################
|
||||
# Others
|
||||
*.log
|
||||
*.xml.versionsBackup
|
||||
*.swp
|
||||
|
||||
!*/build/*.java
|
||||
!*/build/*.html
|
||||
!*/build/*.xml
|
|
@ -0,0 +1,94 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.parseSystem</groupId>
|
||||
<artifactId>parseSystem</artifactId>
|
||||
<version>3.6.3</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
<version>2.7.15</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>2.7.15</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>8.0.29</version>
|
||||
</dependency>
|
||||
<!-- mybatis - plus 依赖 -->
|
||||
<dependency>
|
||||
<groupId>com.baomidou</groupId>
|
||||
<artifactId>mybatis-plus-boot-starter</artifactId>
|
||||
<version>3.5.3.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>3.3.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.28</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
<version>2.7.15</version>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringBoot Boot Redis -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
<version>2.7.15</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>2.0.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.dragon</groupId>
|
||||
<artifactId>dragon-common-redis</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>dragon-release</id>
|
||||
<name>dragon-releases</name>
|
||||
<url>http://10.100.1.7:8081/repository/maven-releases/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>dragon-public</id>
|
||||
<name>dragon-maven</name>
|
||||
<url>http://10.100.1.7:8081/repository/maven-public/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>public</id>
|
||||
<name>aliyun nexus</name>
|
||||
<url>http://10.100.1.7:8081/repository/maven-releases/</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
</repository>
|
||||
</repositories>
|
||||
</project>
|
|
@ -0,0 +1,24 @@
|
|||
package com.parseSystem;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/25 15:34
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class ParseSystemApplication {
|
||||
|
||||
/**
|
||||
* 主方法
|
||||
*
|
||||
* @param args 命令行参数
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ParseSystemApplication.class,args);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package com.parseSystem.event;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.dragon.common.redis.service.RedisService;
|
||||
import com.parseSystem.utils.SpringUtils;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
*
|
||||
*事件处理接口
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/27 20:54
|
||||
*/
|
||||
@Component
|
||||
@Log4j2
|
||||
public class EventHandlerService {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
/**
|
||||
* 事件处理map
|
||||
*/
|
||||
public static final Map<String, List<String>> eventMap=new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 注册事件
|
||||
* @param vin
|
||||
* @param eventServiceName
|
||||
*/
|
||||
public void registerEvent(String vin,String eventServiceName){
|
||||
getEventList(vin).add(eventServiceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除事件
|
||||
* @param vin
|
||||
* @param eventServiceName
|
||||
*/
|
||||
public void removeEvent(String vin,String eventServiceName){
|
||||
getEventList(vin).remove(eventServiceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行事件
|
||||
* @param vehicleData
|
||||
*/
|
||||
public void executeEvent(VehicleData vehicleData){
|
||||
List<String> eventList = getEventList(vehicleData.getVin());
|
||||
eventList.forEach(eventServiceName -> {
|
||||
VehicleEventService vehicleEventService = SpringUtils.getBean(eventServiceName);
|
||||
vehicleEventService.executeEvent(vehicleData);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取事件列表
|
||||
* @param vin
|
||||
* @return
|
||||
*/
|
||||
public List<String> getEventList(String vin){
|
||||
List<String> cacheList = redisService.getCacheList("event_VIN123456789");
|
||||
eventMap.put(vin,cacheList);
|
||||
List<String> eventList = eventMap.get(vin);
|
||||
if (eventList==null){
|
||||
ArrayList<String> list = new ArrayList<>();
|
||||
eventMap.put(vin,list);
|
||||
}
|
||||
return eventList;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package com.parseSystem.event;
|
||||
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
|
||||
/**
|
||||
*
|
||||
*事件执行接口
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @date 2023/11/27 22:45
|
||||
*/
|
||||
public interface VehicleEventService {
|
||||
|
||||
/**
|
||||
* 事件执行抽象逻辑
|
||||
* @param vehicleData
|
||||
*/
|
||||
void executeEvent(VehicleData vehicleData);
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.parseSystem.event.impl;
|
||||
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.event.VehicleEventService;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
*
|
||||
* 故障事件
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
*
|
||||
* @date 2023/11/27 20:59
|
||||
*/
|
||||
@Service("faultEvent")
|
||||
public class FaultEvent extends EventHandlerService implements VehicleEventService {
|
||||
|
||||
/**
|
||||
* 故障事件执行逻辑
|
||||
* @param vehicleData
|
||||
*/
|
||||
@Override
|
||||
public void executeEvent(VehicleData vehicleData) {
|
||||
System.out.println("你好故障");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package com.parseSystem.event.impl;
|
||||
|
||||
import com.dragon.common.redis.service.RedisService;
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.event.VehicleEventService;
|
||||
import com.parseSystem.utils.eventRuleJudge.PolygonUtil;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.awt.geom.Point2D;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*
|
||||
* 电子围栏事件
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
*
|
||||
* @date 2023/11/27 20:57
|
||||
*/
|
||||
@Service("fenceEvent")
|
||||
public class FenceEvent extends EventHandlerService implements VehicleEventService {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
/**
|
||||
* 电子围栏事件执行逻辑
|
||||
* @param vehicleData
|
||||
*/
|
||||
@Override
|
||||
public void executeEvent(VehicleData vehicleData) {
|
||||
String vin = vehicleData.getVin();
|
||||
List<String> fenceList = redisService.getCacheList("fence_VIN123456789");
|
||||
Double latitude = Double.valueOf(vehicleData.getLatitude()); //实时纬度
|
||||
Double longitude = Double.valueOf(vehicleData.getLongitude());
|
||||
Point2D.Double point = new Point2D.Double();
|
||||
point.setLocation(longitude,latitude);
|
||||
List<Point2D.Double> pts = new ArrayList<>();
|
||||
fenceList.stream().forEach(item -> {
|
||||
//循环得到每个电子围栏的坐标拼接的一个以;为分割点表示多边形的一个字符串
|
||||
List<Point2D.Double> locationPts = Arrays.stream(item.split(";"))
|
||||
.map(s -> s.split(","))
|
||||
.map(arr -> new Point2D.Double(Double.valueOf(arr[0]), Double.valueOf(arr[1])))
|
||||
.collect(Collectors.toList());
|
||||
boolean inPolygon = PolygonUtil.isInPolygon(point, locationPts);
|
||||
if (inPolygon){
|
||||
System.out.println("在电子围栏内");
|
||||
}else{
|
||||
System.out.println("在电子围栏外");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.parseSystem.event.impl;
|
||||
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.event.VehicleEventService;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 历史轨迹事假
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
*
|
||||
* @date 2023/11/27 20:59
|
||||
*/
|
||||
@Service("historyTrackEvent")
|
||||
public class HistoryTrackEvent extends EventHandlerService implements VehicleEventService {
|
||||
|
||||
/**
|
||||
* 历史轨迹执行逻辑
|
||||
* @param vehicleData
|
||||
*/
|
||||
@Override
|
||||
public void executeEvent(VehicleData vehicleData) {
|
||||
System.out.println("你好历史轨迹");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.parseSystem.event.impl;
|
||||
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.event.VehicleEventService;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
*
|
||||
* 实时轨迹
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description: 实时轨迹事假
|
||||
* @date 2023/11/27 20:58
|
||||
*/
|
||||
@Service("runtimeTraceEvent")
|
||||
public class RuntimeTraceEvent extends EventHandlerService implements VehicleEventService {
|
||||
|
||||
/**
|
||||
* 实时轨迹事件执行逻辑
|
||||
* @param vehicleData
|
||||
*/
|
||||
@Override
|
||||
public void executeEvent(VehicleData vehicleData) {
|
||||
System.out.println("你好实时轨迹");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.parseSystem.kafka.ConsumerConfig;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/25 15:37
|
||||
*/
|
||||
@Component
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Bean
|
||||
public KafkaConsumer<String, String> consumerInit() {
|
||||
KafkaConsumer<String, String> consumer;
|
||||
Properties properties = new Properties();
|
||||
properties.put("bootstrap.servers", "117.72.43.22:9092");
|
||||
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
properties.put("partitioner.class", "com.parseSystem.kafka.ConsumerConfig.MyPartitioner");
|
||||
consumer = new KafkaConsumer<>(properties);
|
||||
return consumer;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package com.parseSystem.kafka.ConsumerConfig;
|
||||
|
||||
import org.apache.kafka.clients.producer.Partitioner;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author:fst
|
||||
* @date:2023/11/25
|
||||
* @aim:自定义分区器
|
||||
*/
|
||||
@Component
|
||||
public class MyPartitioner implements Partitioner {
|
||||
/**
|
||||
* 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区)
|
||||
* @param topic 消息队列名
|
||||
* @param key 用户传入key
|
||||
* @param keyBytes key字节数组
|
||||
* @param value 用户传入value
|
||||
* @param valueBytes value字节数组
|
||||
* @param cluster 当前kafka节点数
|
||||
* @return 如果3个分区,返回 0 1 2
|
||||
*/
|
||||
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
||||
//获取topic的partitions信息
|
||||
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
|
||||
int partitionsNum = partitionInfos.size();
|
||||
// 这里以 key 的哈希值作为分区选择依据
|
||||
System.out.println("--------------------------------------");
|
||||
System.out.println(Math.abs(key.hashCode()) % partitionsNum);
|
||||
return Math.abs(key.hashCode()) % partitionsNum;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public void configure(Map<String, ?> map) {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.parseSystem.kafka.constant;
|
||||
|
||||
import java.security.Provider;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/27 10:27
|
||||
*/
|
||||
public class A {
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println("=============启动类加载器================");
|
||||
Class<A> aClass = A.class;
|
||||
System.out.println(aClass.getClassLoader().getParent());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package com.parseSystem.kafka.constant;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
*
|
||||
* kafka常量
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @date 2023/11/25 15:36
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class kafkaConstants {
|
||||
|
||||
public String topic;
|
||||
|
||||
private String partition;
|
||||
public static final String BOOSTRAP_SERVERS="117.72.43.22:9092";
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package com.parseSystem.kafka.service;
|
||||
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.storage.service.StorageDateService;
|
||||
import com.parseSystem.utils.ParseUtil;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/25 15:52
|
||||
*/
|
||||
@Service
|
||||
@Log4j2
|
||||
public class ConsumerService {
|
||||
|
||||
// /**
|
||||
// * 注入kafka消费者
|
||||
// */
|
||||
// @Autowired
|
||||
// private KafkaConsumer<String, String> consumer;
|
||||
//
|
||||
// /**
|
||||
// * 注入事件处理服务
|
||||
// */
|
||||
// @Autowired
|
||||
// private EventHandlerService eventHandlerService;
|
||||
//
|
||||
// /**
|
||||
// * 注入存储服务
|
||||
// */
|
||||
// @Autowired
|
||||
// private StorageDateService storageDateService;
|
||||
//
|
||||
// /**
|
||||
// * 在容器启动后自动执行的初始化方法
|
||||
// */
|
||||
// @PostConstruct
|
||||
// public void consumerInit() {
|
||||
//
|
||||
//
|
||||
//
|
||||
// TopicPartition topicPartition = new TopicPartition("test", 0);
|
||||
//
|
||||
// // 订阅特定分区
|
||||
// consumer.assign(Collections.singleton(topicPartition));
|
||||
// //开启线程持续拉取数据
|
||||
// new Thread(() -> {
|
||||
// while (true) {
|
||||
// ConsumerRecords<String, String> records = null;
|
||||
// try {
|
||||
// //每个一秒钟拉取一次
|
||||
// records = consumer.poll(Duration.ofMillis(1000));
|
||||
// for (ConsumerRecord<String, String> record : records) {
|
||||
// System.out.println(record.value());
|
||||
// //解析数据
|
||||
// String data = ParseUtil.sixteenToStr(record.value());
|
||||
// //构建数据对象
|
||||
// VehicleData vehicleData = VehicleData.getBuild(data);
|
||||
// //存储数据到tiDB
|
||||
// storageDateService.save(vehicleData);
|
||||
// System.out.println(vehicleData);
|
||||
// //调用执行事件
|
||||
// eventHandlerService.executeEvent(vehicleData);
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// log.info("records: {}", records);
|
||||
// log.error(e);
|
||||
// }
|
||||
// }
|
||||
// }).start();
|
||||
// }
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
package com.parseSystem.rabbitmq;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.parseSystem.event.EventHandlerService;
|
||||
import com.parseSystem.kafka.constant.kafkaConstants;
|
||||
import com.parseSystem.storage.service.StorageDateService;
|
||||
import com.parseSystem.utils.ParseUtil;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
*
|
||||
* 监听车辆事件变更消费队列
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
*
|
||||
* @date 2023/11/27 22:19
|
||||
*/
|
||||
@Component
|
||||
@Log4j2
|
||||
public class ListenEventChangeRabbitMq {
|
||||
@Autowired
|
||||
private KafkaConsumer<String, String> consumer;
|
||||
@Autowired
|
||||
private StorageDateService storageDateService;
|
||||
|
||||
@Autowired
|
||||
private EventHandlerService eventHandlerService;
|
||||
|
||||
/**
|
||||
* RabbitMQ消费者,订阅"kafka_top"队列中的消息并处理
|
||||
*
|
||||
* @param mesg 接收到的消息内容
|
||||
* @param message RabbitMQ中的Message对象
|
||||
* @param channel RabbitMQ中的Channel对象
|
||||
*/
|
||||
@RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")})
|
||||
public void consumerSubscribe(String mesg, Message message, Channel channel) {
|
||||
log.info("收到准备订阅主题:" + mesg);
|
||||
|
||||
// 将接收到的消息解析为kafkaConstants对象
|
||||
kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class);
|
||||
|
||||
// 创建TopicPartition对象
|
||||
TopicPartition topicPartition = new TopicPartition(kafkaConstants.getTopic(), Integer.valueOf(kafkaConstants.getPartition()));
|
||||
|
||||
try {
|
||||
// 手动确认接收到的消息
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
|
||||
// 将消费者订阅的主题设置为"kafka_top"队列的第一个分区
|
||||
consumer.assign(Collections.singleton(topicPartition));
|
||||
|
||||
// 开启线程持续拉取数据
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = null;
|
||||
try {
|
||||
// 每个一秒钟拉取一次数据
|
||||
records = consumer.poll(Duration.ofMillis(1000));
|
||||
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
System.out.println(record.value());
|
||||
|
||||
// 解析数据
|
||||
String data = ParseUtil.sixteenToStr(record.value());
|
||||
|
||||
// 构建数据对象
|
||||
VehicleData vehicleData = VehicleData.getBuild(data);
|
||||
|
||||
// 存储数据到tiDB
|
||||
storageDateService.save(vehicleData);
|
||||
System.out.println(vehicleData);
|
||||
|
||||
// 调用执行事件
|
||||
eventHandlerService.executeEvent(vehicleData);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("records: {}", records);
|
||||
log.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 订阅特定分区
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.parseSystem.storage.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/28 15:14
|
||||
*/
|
||||
@Mapper
|
||||
public interface StorageDateMapper extends BaseMapper<VehicleData> {
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.parseSystem.storage.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description: 数据存储逻辑层
|
||||
* @date 2023/11/28 15:13
|
||||
*/
|
||||
public interface StorageDateService extends IService<VehicleData> {
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package com.parseSystem.storage.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.parseSystem.storage.mapper.StorageDateMapper;
|
||||
import com.parseSystem.storage.service.StorageDateService;
|
||||
import com.parseSystem.vehicle.VehicleData;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
* @description:
|
||||
* @date 2023/11/28 15:14
|
||||
*/
|
||||
@Service
|
||||
public class StorageDateServiceImpl extends ServiceImpl<StorageDateMapper, VehicleData> implements StorageDateService {
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package com.parseSystem.utils;
|
||||
|
||||
/**
|
||||
* 解析工具类
|
||||
* @author fengkai
|
||||
* @version 1.0
|
||||
* @date 2023/11/26 22:11
|
||||
*/
|
||||
public class ParseUtil {
|
||||
|
||||
public static String sixteenToStr(String s) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String[] arr = s.split(" ");
|
||||
int length = arr.length;
|
||||
for (int i = 0; i < length; i++) {
|
||||
int ch = Integer.parseInt(arr[i], 16);
|
||||
sb.append((char) ch);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package com.parseSystem.utils;
|
||||
|
||||
import org.springframework.aop.framework.AopContext;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
|
||||
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* spring工具类 方便在非spring管理环境中获取bean
|
||||
*
|
||||
* @author dragon
|
||||
*
|
||||
*/
|
||||
@Component
|
||||
public final class SpringUtils implements BeanFactoryPostProcessor {
|
||||
/**
|
||||
* Spring应用上下文环境
|
||||
*/
|
||||
private static ConfigurableListableBeanFactory beanFactory;
|
||||
|
||||
/**
|
||||
* 获取对象
|
||||
*
|
||||
* @param name
|
||||
* @return Object 一个以所给名字注册的bean的实例
|
||||
* @throws BeansException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T getBean(String name) throws BeansException {
|
||||
return (T) beanFactory.getBean(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取类型为requiredType的对象
|
||||
*
|
||||
* @param clz
|
||||
* @return
|
||||
* @throws BeansException
|
||||
*/
|
||||
public static <T> T getBean(Class<T> clz) throws BeansException {
|
||||
T result = (T) beanFactory.getBean(clz);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
|
||||
*
|
||||
* @param name
|
||||
* @return boolean
|
||||
*/
|
||||
public static boolean containsBean(String name) {
|
||||
return beanFactory.containsBean(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
|
||||
*
|
||||
* @param name
|
||||
* @return boolean
|
||||
* @throws NoSuchBeanDefinitionException
|
||||
*/
|
||||
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
|
||||
return beanFactory.isSingleton(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param name
|
||||
* @return Class 注册对象的类型
|
||||
* @throws NoSuchBeanDefinitionException
|
||||
*/
|
||||
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
|
||||
return beanFactory.getType(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果给定的bean名字在bean定义中有别名,则返回这些别名
|
||||
*
|
||||
* @param name
|
||||
* @return
|
||||
* @throws NoSuchBeanDefinitionException
|
||||
*/
|
||||
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
|
||||
return beanFactory.getAliases(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取aop代理对象
|
||||
*
|
||||
* @param invoker
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T getAopProxy(T invoker) {
|
||||
return (T) AopContext.currentProxy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
|
||||
SpringUtils.beanFactory = beanFactory;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package com.parseSystem.utils.eventRuleJudge;
|
||||
|
||||
import java.awt.geom.Point2D;
|
||||
import java.util.List;
|
||||
/**
|
||||
* 电子围栏判定规则类
|
||||
*/
|
||||
public class PolygonUtil {
|
||||
/**
|
||||
* 发现给定点是否在给定多边形内
|
||||
* @param point 给定点
|
||||
* @param pts 给定的多边形顶点列表
|
||||
* @return 如果给定点在多边形内,则返回true;否则返回false
|
||||
*/
|
||||
public static boolean isInPolygon(Point2D.Double point, List<Point2D.Double> pts){
|
||||
int N = pts.size();
|
||||
boolean boundOrVertex = true;
|
||||
int intersectCount = 0;//交叉点数量
|
||||
double precision = 2e-10;//浮点类型计算时候与0比较时候的容差
|
||||
Point2D.Double p1, p2;//临近顶点
|
||||
Point2D.Double p = point; //当前点
|
||||
|
||||
p1 = pts.get(0);
|
||||
for (int i = 1; i <= N; i++) {
|
||||
if(p.equals(p1)){
|
||||
return boundOrVertex;
|
||||
}
|
||||
|
||||
p2 = pts.get(i % N);
|
||||
if(p.x < Math.min(p1.x,p2.x) || p.x > Math.max(p1.x, p2.x)){
|
||||
p1=p2;
|
||||
continue;
|
||||
}
|
||||
|
||||
//射线穿过算法
|
||||
if(p.x > Math.min(p1.x, p2.x) && p.x < Math.max(p1.x, p2.x)){
|
||||
if(p.y <= Math.max(p1.y, p2.y)){
|
||||
if(p1.x == p2.x && p.y >= Math.min(p1.y, p2.y)){
|
||||
return boundOrVertex;
|
||||
}
|
||||
|
||||
if(p1.y == p2.y){
|
||||
if(p1.y == p.y){
|
||||
return boundOrVertex;
|
||||
}else{
|
||||
++intersectCount;
|
||||
}
|
||||
}else{
|
||||
double xinters = (p.x - p1.x) * (p2.y - p1.y) / (p2.x - p1.x) + p1.y;
|
||||
if(Math.abs(p.y - xinters) < precision){
|
||||
return boundOrVertex;
|
||||
}
|
||||
|
||||
if(p.y < xinters){
|
||||
++intersectCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
}else{
|
||||
if(p.x == p2.x && p.y <= p2.y){
|
||||
Point2D.Double p3 = pts.get((i+1) % N);
|
||||
|
||||
if(p.x >= Math.min(p1.x, p3.x) && p.x <= Math.max(p1.x, p3.x)){
|
||||
++intersectCount;
|
||||
intersectCount += 2 ;
|
||||
}
|
||||
}
|
||||
}
|
||||
p1 = p2;
|
||||
}
|
||||
if (intersectCount % 2 ==0){
|
||||
return false;
|
||||
}else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,308 @@
|
|||
package com.parseSystem.vehicle;
|
||||
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Date;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@TableName("vehicle_data")
|
||||
public class VehicleData {
|
||||
|
||||
/**
|
||||
* 主键id
|
||||
*/
|
||||
@TableId(type = IdType.AUTO)
|
||||
private Integer id;
|
||||
|
||||
/**
|
||||
* VIN
|
||||
*/
|
||||
private String vin;
|
||||
|
||||
/**
|
||||
* 时间戳
|
||||
*/
|
||||
private Date createTime;
|
||||
|
||||
/**
|
||||
* 经度
|
||||
*/
|
||||
private String longitude;
|
||||
|
||||
/**
|
||||
* 维度
|
||||
*/
|
||||
private String latitude;
|
||||
|
||||
/**
|
||||
* 速度
|
||||
*/
|
||||
private String speed;
|
||||
|
||||
/**
|
||||
* 里程
|
||||
*/
|
||||
private BigDecimal mileage;
|
||||
|
||||
/**
|
||||
* 总电压
|
||||
*/
|
||||
private String voltage;
|
||||
|
||||
/**
|
||||
* 总电流
|
||||
*/
|
||||
private String current;
|
||||
|
||||
/**
|
||||
* 绝缘电阻
|
||||
*/
|
||||
private String resistance;
|
||||
|
||||
/**
|
||||
* 档位
|
||||
*/
|
||||
private String gear;
|
||||
|
||||
/**
|
||||
* 加速踏板行程值
|
||||
*/
|
||||
private String accelerationPedal;
|
||||
|
||||
/**
|
||||
* 制动踏板行程值
|
||||
*/
|
||||
private String brakePedal;
|
||||
|
||||
/**
|
||||
* 燃料消耗率
|
||||
*/
|
||||
private String fuelConsumptionRate;
|
||||
|
||||
/**
|
||||
* 电机控制器温度
|
||||
*/
|
||||
private String motorControllerTemperature;
|
||||
|
||||
/**
|
||||
* 电机转速
|
||||
*/
|
||||
private String motorSpeed;
|
||||
|
||||
/**
|
||||
* 电机转矩
|
||||
*/
|
||||
private String motorTorque;
|
||||
|
||||
/**
|
||||
* 电机温度
|
||||
*/
|
||||
private String motorTemperature;
|
||||
|
||||
/**
|
||||
* 电机电压
|
||||
*/
|
||||
private String motorVoltage;
|
||||
|
||||
/**
|
||||
* 电机电流
|
||||
*/
|
||||
private String motorCurrent;
|
||||
|
||||
/**
|
||||
* 动力电池剩余电量SOC
|
||||
*/
|
||||
private BigDecimal remainingBattery;
|
||||
|
||||
|
||||
/**
|
||||
* 当前状态允许的最大反馈功率
|
||||
*/
|
||||
private String maximumFeedbackPower;
|
||||
|
||||
/**
|
||||
* 当前状态允许最大放电功率
|
||||
*/
|
||||
private String maximumDischargePower;
|
||||
|
||||
/**
|
||||
* BMS自检计数器
|
||||
*/
|
||||
private String selfCheckCounter;
|
||||
|
||||
/**
|
||||
* 动力电池充放电电流
|
||||
*/
|
||||
private String totalBatteryCurrent;
|
||||
|
||||
/**
|
||||
* 动力电池负载端总电压V3
|
||||
*/
|
||||
private String totalBatteryVoltage;
|
||||
|
||||
/**
|
||||
* 单次最大电压
|
||||
*/
|
||||
private String singleBatteryMaxVoltage;
|
||||
|
||||
/**
|
||||
* 单体电池最低电压
|
||||
*/
|
||||
private String singleBatteryMinVoltage;
|
||||
|
||||
/**
|
||||
* 单体电池最高温度
|
||||
*/
|
||||
private String singleBatteryMaxTemperature;
|
||||
|
||||
/**
|
||||
* 单体电池最低温度
|
||||
*/
|
||||
private String singleBatteryMinTemperature;
|
||||
|
||||
/**
|
||||
* 动力电池可用容量
|
||||
*/
|
||||
private String availableBatteryCapacity;
|
||||
|
||||
/**
|
||||
* 车辆状态
|
||||
*/
|
||||
private int vehicleStatus;
|
||||
|
||||
/**
|
||||
* 充电状态
|
||||
*/
|
||||
private int chargingStatus;
|
||||
|
||||
/**
|
||||
* 运行状态
|
||||
*/
|
||||
private int operatingStatus;
|
||||
|
||||
/**
|
||||
* SOC
|
||||
*/
|
||||
private int socStatus;
|
||||
|
||||
/**
|
||||
* 可充电储能装置工作状态
|
||||
*/
|
||||
private int chargingEnergyStorageStatus;
|
||||
|
||||
/**
|
||||
* 驱动电机状态
|
||||
*/
|
||||
private int driveMotorStatus;
|
||||
|
||||
/**
|
||||
* 定位是否有效
|
||||
*/
|
||||
private int positionStatus;
|
||||
|
||||
/**
|
||||
* EAS(汽车防盗系统)状态
|
||||
*/
|
||||
private int easStatus;
|
||||
|
||||
/**
|
||||
* PTC(电动加热器)状态
|
||||
*/
|
||||
private int ptcStatus;
|
||||
|
||||
/**
|
||||
* EPS(电动助力系统)状态
|
||||
*/
|
||||
private int epsStatus;
|
||||
|
||||
/**
|
||||
* ABS(防抱死)状态
|
||||
*/
|
||||
private int absStatus;
|
||||
|
||||
/**
|
||||
* MCU(电机/逆变器)状态
|
||||
*/
|
||||
private int mcuStatus;
|
||||
|
||||
/**
|
||||
* 动力电池加热状态
|
||||
*/
|
||||
private int heatingStatus;
|
||||
|
||||
/**
|
||||
* 动力电池当前状态
|
||||
*/
|
||||
private int batteryStatus;
|
||||
|
||||
/**
|
||||
* 动力电池保温状态
|
||||
*/
|
||||
private int batteryInsulationStatus;
|
||||
|
||||
|
||||
public static VehicleData getBuild(String messages) {
|
||||
char start = messages.charAt(0);
|
||||
char end = messages.charAt(messages.length() - 1);
|
||||
System.out.println(start);
|
||||
System.out.println(end);
|
||||
return VehicleData.builder()
|
||||
.vin(messages.substring(1, 18))
|
||||
//messages.substring(18, 31)
|
||||
.createTime(new Date())
|
||||
.longitude(messages.substring(31, 42))
|
||||
.latitude(messages.substring(42, 52))
|
||||
.speed(messages.substring(52, 58))
|
||||
.mileage(new BigDecimal(messages.substring(58, 69)))
|
||||
.voltage(messages.substring(69, 75))
|
||||
.current(messages.substring(75, 80))
|
||||
.resistance(messages.substring(80, 89))
|
||||
.gear(messages.substring(89, 90))
|
||||
.accelerationPedal(messages.substring(90, 92))
|
||||
.brakePedal(messages.substring(92, 94))
|
||||
.fuelConsumptionRate(messages.substring(94, 99))
|
||||
.motorControllerTemperature(messages.substring(99, 105))
|
||||
.motorSpeed(messages.substring(105, 110))
|
||||
.motorTorque(messages.substring(110, 114))
|
||||
.motorTemperature(messages.substring(114, 120))
|
||||
.motorVoltage(messages.substring(120, 125))
|
||||
.motorCurrent(messages.substring(125, 133))
|
||||
.remainingBattery(new BigDecimal(messages.substring(133, 139)))
|
||||
.maximumFeedbackPower(messages.substring(139, 145))
|
||||
.maximumDischargePower(messages.substring(145, 151))
|
||||
.selfCheckCounter(messages.substring(151, 153))
|
||||
.totalBatteryCurrent(messages.substring(153, 158))
|
||||
.totalBatteryVoltage(messages.substring(158, 164))
|
||||
.singleBatteryMaxVoltage(messages.substring(164, 168))
|
||||
.singleBatteryMinVoltage(messages.substring(168, 172))
|
||||
.singleBatteryMaxTemperature(messages.substring(172, 178))
|
||||
.singleBatteryMinTemperature(messages.substring(178, 184))
|
||||
.availableBatteryCapacity(messages.substring(184, 190))
|
||||
.vehicleStatus(Integer.valueOf(messages.substring(190, 191)))
|
||||
.chargingStatus(Integer.valueOf(messages.substring(191, 192)))
|
||||
.operatingStatus(Integer.valueOf(messages.substring(192, 193)))
|
||||
.socStatus(Integer.valueOf(messages.substring(193, 194)))
|
||||
.chargingEnergyStorageStatus(Integer.valueOf(messages.substring(194, 195)))
|
||||
.driveMotorStatus(Integer.valueOf(messages.substring(195, 196)))
|
||||
.positionStatus(Integer.valueOf(messages.substring(196, 197)))
|
||||
.easStatus(Integer.valueOf(messages.substring(197, 198)))
|
||||
.ptcStatus(Integer.valueOf(messages.substring(198, 199)))
|
||||
.epsStatus(Integer.valueOf(messages.substring(199, 200)))
|
||||
.absStatus(Integer.valueOf(messages.substring(200, 201)))
|
||||
.mcuStatus(Integer.valueOf(messages.substring(201, 202)))
|
||||
.heatingStatus(Integer.valueOf(messages.substring(202, 203)))
|
||||
.batteryStatus(Integer.valueOf(messages.substring(203, 204)))
|
||||
.batteryInsulationStatus(Integer.valueOf(messages.substring(204, 205)))
|
||||
.build();
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
spring:
|
||||
redis:
|
||||
host: 10.100.1.2
|
||||
port: 6379
|
||||
password:
|
||||
application:
|
||||
name: parseSystem
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
||||
username: root
|
||||
password: 123456
|
||||
rabbitmq:
|
||||
host: 47.120.38.248
|
||||
port: 5672
|
||||
template:
|
||||
mandatory: true
|
||||
listener:
|
||||
simple:
|
||||
prefetch: 1 # 每次取一条消息消费 消费完成取下一条
|
||||
acknowledge-mode: manual # 设置消费端手动ack确认
|
||||
retry:
|
||||
enabled: true # 支持重试
|
||||
publisher-confirms: true #确认消息已发送到交换机(Exchange)
|
||||
publisher-returns: true #确认消息已发送到队列(Queue)
|
||||
server:
|
||||
port: 8066
|
Loading…
Reference in New Issue