diff --git a/.gitignore b/.gitignore index 09bdfea..ea9089e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ target/ ###################################################################### # IDE - +IDEA/ ### STS ### .apt_generated .classpath @@ -43,4 +43,4 @@ nbdist/ !*/build/*.java !*/build/*.html -!*/build/*.xml \ No newline at end of file +!*/build/*.xml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7365543 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +#起始镜像 +FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6 +#暴露端口号 +EXPOSE 8068 +#挂载目录的位置 +VOLUME /home/logs/parseSystem +#构建复制外部文件到docker +COPY /target/parsesystem.jar /home/app.jar +#工作目录 exec -it 进入容器内部后的默认的起始目录 +WORKDIR /home +ENV TIME_ZONE Asia/Shanghai +#指定东八区 +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +#启动java 程序 +ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"] + + diff --git a/pom.xml b/pom.xml index c949e43..275147a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.parseSystem - parseSystem + parsesystem 3.6.3 @@ -19,6 +19,11 @@ spring-boot-starter 2.7.15 + + org.springframework.boot + spring-boot-starter-test + 2.7.15 + org.springframework.boot spring-boot-starter-web @@ -68,6 +73,7 @@ dragon-common-redis 3.6.3 + @@ -91,4 +97,28 @@ + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + org.apache.maven.pluginsmaven-compiler-plugin1616 + + diff --git a/src/main/java/com/parseSystem/ParseSystemApplication.java b/src/main/java/com/parseSystem/ParseSystemApplication.java index 864408c..9a7e942 100644 --- a/src/main/java/com/parseSystem/ParseSystemApplication.java +++ b/src/main/java/com/parseSystem/ParseSystemApplication.java @@ -2,6 +2,7 @@ package com.parseSystem; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; /** * @author 冯凯 @@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @date 2023/11/25 15:34 */ @SpringBootApplication +@EnableScheduling public class ParseSystemApplication { /** diff --git a/src/main/java/com/parseSystem/event/EventHandlerService.java b/src/main/java/com/parseSystem/event/EventHandlerService.java index b70e3d2..4c81b59 100644 --- a/src/main/java/com/parseSystem/event/EventHandlerService.java +++ b/src/main/java/com/parseSystem/event/EventHandlerService.java @@ -1,25 +1,20 @@ package com.parseSystem.event; -import com.alibaba.fastjson.JSONObject; import com.dragon.common.redis.service.RedisService; import com.parseSystem.utils.SpringUtils; import com.parseSystem.vehicle.VehicleData; import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** + * 事件处理接口 * - *事件处理接口 * @author 冯凯 * @version 1.0 * @description: @@ -29,41 +24,39 @@ import java.util.concurrent.ConcurrentHashMap; @Log4j2 public class EventHandlerService { + /** + * 事件处理map + */ + public static final Map> eventMap = new ConcurrentHashMap<>(); @Autowired private RedisService redisService; - /** - * 事件处理map - */ - public static final Map> eventMap=new ConcurrentHashMap<>(); - - - - - /** * 注册事件 + * * @param vin * @param eventServiceName */ - public void registerEvent(String vin,String eventServiceName){ + public void registerEvent(String vin, String eventServiceName) { getEventList(vin).add(eventServiceName); } /** * 移除事件 + * * @param vin * @param eventServiceName */ - public void removeEvent(String vin,String eventServiceName){ + public void removeEvent(String vin, String eventServiceName) { getEventList(vin).remove(eventServiceName); } /** * 执行事件 + * * @param vehicleData */ - public void executeEvent(VehicleData vehicleData){ + public void executeEvent(VehicleData vehicleData) { List eventList = getEventList(vehicleData.getVin()); eventList.forEach(eventServiceName -> { VehicleEventService vehicleEventService = SpringUtils.getBean(eventServiceName); @@ -73,16 +66,17 @@ public class EventHandlerService { /** * 获取事件列表 + * * @param vin * @return */ - public List getEventList(String vin){ + public List getEventList(String vin) { List cacheList = redisService.getCacheList("event_VIN123456789"); - eventMap.put(vin,cacheList); + eventMap.put(vin, cacheList); List eventList = eventMap.get(vin); - if (eventList==null){ + if (eventList == null) { ArrayList list = new ArrayList<>(); - eventMap.put(vin,list); + eventMap.put(vin, list); } return eventList; } diff --git a/src/main/java/com/parseSystem/event/impl/FenceEvent.java b/src/main/java/com/parseSystem/event/impl/FenceEvent.java index 377a34e..c820b71 100644 --- a/src/main/java/com/parseSystem/event/impl/FenceEvent.java +++ b/src/main/java/com/parseSystem/event/impl/FenceEvent.java @@ -1,10 +1,13 @@ package com.parseSystem.event.impl; +import com.alibaba.fastjson.JSONObject; import com.dragon.common.redis.service.RedisService; import com.parseSystem.event.EventHandlerService; import com.parseSystem.event.VehicleEventService; import com.parseSystem.utils.eventRuleJudge.PolygonUtil; +import com.parseSystem.vehicle.FenceData; import com.parseSystem.vehicle.VehicleData; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -23,6 +26,7 @@ import java.util.stream.Collectors; * @date 2023/11/27 20:57 */ @Service("fenceEvent") +@Log4j2 public class FenceEvent extends EventHandlerService implements VehicleEventService { @Autowired @@ -33,25 +37,40 @@ public class FenceEvent extends EventHandlerService implements VehicleEventServi */ @Override public void executeEvent(VehicleData vehicleData) { + //获取报文解析的车辆vin String vin = vehicleData.getVin(); - List fenceList = redisService.getCacheList("fence_VIN123456789"); + List arr1 = null; + arr1.add(vehicleData); + //redis为(vin,fenceList)即vin为键,电子围栏为值存入redis + List fenceList = redisService.getCacheList("fence_"+vin); + redisService.setCacheList("fence_VIN123456789",arr1); + //将报文的经纬度赋值给变量 Double latitude = Double.valueOf(vehicleData.getLatitude()); //实时纬度 Double longitude = Double.valueOf(vehicleData.getLongitude()); + //创建工具类使用所需要的对象变量类赋值 Point2D.Double point = new Point2D.Double(); + //赋值 point.setLocation(longitude,latitude); - List pts = new ArrayList<>(); + //循环切割redis中拿到的电子围栏数据 fenceList.stream().forEach(item -> { + //将围栏存放的信息转型拿到每个对象的属性 + FenceData fenceData = JSONObject.parseObject(item, FenceData.class); //循环得到每个电子围栏的坐标拼接的一个以;为分割点表示多边形的一个字符串 List locationPts = Arrays.stream(item.split(";")) .map(s -> s.split(",")) .map(arr -> new Point2D.Double(Double.valueOf(arr[0]), Double.valueOf(arr[1]))) .collect(Collectors.toList()); boolean inPolygon = PolygonUtil.isInPolygon(point, locationPts); - if (inPolygon){ - System.out.println("在电子围栏内"); - }else{ - System.out.println("在电子围栏外"); + //判断如果在围栏平面内,则判断围栏的报警状态 0-驶入警告 1-驶出警告 + if (inPolygon == true && fenceData.getAlarmType()==0){ + log.info("----当前车辆已驶入围栏 警告!!!"); + }else if(inPolygon == false && fenceData.getAlarmType()==1){ + log.info("----当前车辆已驶出围栏 警告!!!"); + }else { + log.info("----该车辆为绑定围栏"); } }); } + + } diff --git a/src/main/java/com/parseSystem/event/impl/RuntimeTraceEvent.java b/src/main/java/com/parseSystem/event/impl/RuntimeTraceEvent.java index 67aaa9b..dec0be0 100644 --- a/src/main/java/com/parseSystem/event/impl/RuntimeTraceEvent.java +++ b/src/main/java/com/parseSystem/event/impl/RuntimeTraceEvent.java @@ -1,10 +1,20 @@ package com.parseSystem.event.impl; +import com.alibaba.fastjson2.JSONObject; +import com.dragon.common.redis.service.RedisService; import com.parseSystem.event.EventHandlerService; import com.parseSystem.event.VehicleEventService; import com.parseSystem.vehicle.VehicleData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import javax.xml.crypto.Data; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.TimeUnit; + /** * * 实时轨迹 @@ -15,6 +25,8 @@ import org.springframework.stereotype.Service; */ @Service("runtimeTraceEvent") public class RuntimeTraceEvent extends EventHandlerService implements VehicleEventService { + @Autowired + private RedisService redisService; /** * 实时轨迹事件执行逻辑 @@ -22,6 +34,6 @@ public class RuntimeTraceEvent extends EventHandlerService implements VehicleEve */ @Override public void executeEvent(VehicleData vehicleData) { - System.out.println("你好实时轨迹"); + redisService.setCacheObject("runtimeTraceEvent_"+vehicleData.getVin(),vehicleData); } } diff --git a/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java b/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java index 615f5e7..d9368a2 100644 --- a/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java +++ b/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java @@ -1,6 +1,8 @@ package com.parseSystem.kafka.ConsumerConfig; +import com.parseSystem.storage.service.StorageDateService; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @@ -15,8 +17,10 @@ import java.util.Properties; @Component public class KafkaConsumerConfig { + @Bean public KafkaConsumer consumerInit() { + KafkaConsumer consumer; Properties properties = new Properties(); properties.put("bootstrap.servers", "117.72.43.22:9092"); diff --git a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java index f2e438f..45eee9b 100644 --- a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java +++ b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java @@ -3,7 +3,7 @@ package com.parseSystem.rabbitmq; import com.alibaba.fastjson.JSONObject; import com.parseSystem.event.EventHandlerService; import com.parseSystem.kafka.constant.kafkaConstants; -import com.parseSystem.storage.service.StorageDateService; +import com.parseSystem.utils.DataQueueManager; import com.parseSystem.utils.ParseUtil; import com.parseSystem.vehicle.VehicleData; import com.rabbitmq.client.Channel; @@ -23,25 +23,24 @@ import java.time.Duration; import java.util.Collections; /** + * 监听车辆事件变更消费队列 * - * 监听事件更改 Rabbit Mq * @author 冯凯 * @version 1.0 - * * @date 2023/11/27 22:19 */ @Component @Log4j2 public class ListenEventChangeRabbitMq { - @Autowired - private KafkaConsumer consumer; // 自动装配一个String类型键和String类型值的Kafka消费者 - - @Autowired - private StorageDateService storageDateService; // 自动装配StorageDateService对象 - - @Autowired - private EventHandlerService eventHandlerService; // 自动装配EventHandlerService对象 + @Autowired + private DataQueueManager dataQueueManager; + + @Autowired + private KafkaConsumer consumer; + + @Autowired + private EventHandlerService eventHandlerService; /** * RabbitMQ消费者,订阅"kafka_top"队列中的消息并处理 @@ -52,7 +51,7 @@ public class ListenEventChangeRabbitMq { */ @RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")}) public void consumerSubscribe(String mesg, Message message, Channel channel) { - log.info("收到准备订阅主题:" + mesg); + log.info("收到准备订阅主题:{}", mesg); // 将接收到的消息解析为kafkaConstants对象 kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class); @@ -66,35 +65,31 @@ public class ListenEventChangeRabbitMq { // 将消费者订阅的主题设置为"kafka_top"队列的第一个分区 consumer.assign(Collections.singleton(topicPartition)); - // 开启线程持续拉取数据 - while (true) { - ConsumerRecords records = null; - try { - // 每个一秒钟拉取一次数据 - records = consumer.poll(Duration.ofMillis(1000)); - - for (ConsumerRecord record : records) { - System.out.println(record.value()); - - // 解析数据 - String data = ParseUtil.sixteenToStr(record.value()); - - // 构建数据对象 - VehicleData vehicleData = VehicleData.getBuild(data); - - // 存储数据到tiDB - storageDateService.save(vehicleData); - System.out.println(vehicleData); - - // 调用执行事件 - eventHandlerService.executeEvent(vehicleData); - } - } catch (Exception e) { - log.info("records: {}", records); - log.error(e); + while (true) { + ConsumerRecords records = null; + try { + // 每个一秒钟拉取一次数据 + records = consumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : records) { + System.out.println(record.value()); + long startTime = System.currentTimeMillis(); + log.info("开始消费时间{}:", startTime); + // 解析数据 + String data = ParseUtil.sixteenToStr(record.value()); + // 构建数据对象 + VehicleData vehicleData = VehicleData.getBuild(data); + log.info(vehicleData); + dataQueueManager.enqueueData(vehicleData); + // 调用执行事件 + eventHandlerService.executeEvent(vehicleData); + log.info("耗费时间{}", System.currentTimeMillis() - startTime); } + } catch (Exception e) { + log.info("records: {}", records); + log.error(e); } + } // 订阅特定分区 diff --git a/src/main/java/com/parseSystem/utils/DataQueueManager.java b/src/main/java/com/parseSystem/utils/DataQueueManager.java new file mode 100644 index 0000000..b1988d7 --- /dev/null +++ b/src/main/java/com/parseSystem/utils/DataQueueManager.java @@ -0,0 +1,68 @@ +package com.parseSystem.utils; + +import com.parseSystem.storage.service.StorageDateService; +import com.parseSystem.vehicle.VehicleData; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/12/1 20:47 + */ +@Component +@Log4j2 +public class DataQueueManager { + + private int count; + @Autowired + private StorageDateService storageDateService; + + + @Autowired + private BlockingQueue dataQueue1; + + @Autowired + private BlockingQueue dataQueue2; + + + + public void enqueueData(VehicleData vehicleData) { + // 将数据放入第一个队列中,如果已满,则放入第二个队列中 + if (!dataQueue1.offer(vehicleData)) { + log.warn("第一个队列已满,将数据放入第二个队列"); + dataQueue2.offer(vehicleData); + } + + } + + + @Scheduled(fixedRate = 5000) + public void processQueues() { + // 如果第一个队列中的数据数量达到阈值,就从第一个队列中取出数据进行批量持久化操作 + if (!dataQueue1.isEmpty()) { + + List batchData = new ArrayList<>(); + count+=batchData.size(); + dataQueue1.drainTo(batchData); + storageDateService.saveBatch(batchData); + } + // 如果第一个队列为空,但是第二个队列不为空,就从第二个队列中取出数据进行持久化操作 + else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) { + log.warn("第一个队列为空,从第二个队列中取出数据进行持久化操作"); + List batchData = new ArrayList<>(); + count+=batchData.size(); + dataQueue2.drainTo(batchData); + storageDateService.saveBatch(batchData); + } + + } + +} diff --git a/src/main/java/com/parseSystem/utils/QueueConfig.java b/src/main/java/com/parseSystem/utils/QueueConfig.java new file mode 100644 index 0000000..156c5d4 --- /dev/null +++ b/src/main/java/com/parseSystem/utils/QueueConfig.java @@ -0,0 +1,27 @@ +package com.parseSystem.utils; + +import com.parseSystem.vehicle.VehicleData; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/12/1 21:15 + */ +@Configuration +public class QueueConfig { + @Bean + public BlockingQueue dataQueue1() { + return new LinkedBlockingQueue<>(); + } + + @Bean + public BlockingQueue dataQueue2() { + return new LinkedBlockingQueue<>(); + } +} diff --git a/src/main/java/com/parseSystem/vehicle/FenceData.java b/src/main/java/com/parseSystem/vehicle/FenceData.java new file mode 100644 index 0000000..805de8a --- /dev/null +++ b/src/main/java/com/parseSystem/vehicle/FenceData.java @@ -0,0 +1,61 @@ +package com.parseSystem.vehicle; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * 电子围栏实体类 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class FenceData { + /** + * 字段属性:围栏主键id + */ + @TableId(type = IdType.AUTO) + private Integer fenceId; + /** + * 字段属性:围栏名称 + */ + private String fenceName; + /** + * 字段属性:围栏数据 + */ + private String fenceData; + /**s + * 字段属性:围栏状态 + */ + private Integer status; + /** + * 字段属性:告警类型 + */ + private Integer alarmType; + /** + * 字段属性:围栏标签id + */ + private Integer fenceTagId; + /** + * 字段属性:创建人 + */ + private String createdBy; + /** + * 字段属性:创建时间 + */ + private Date createdTime; + /** + * 字段属性:更新人 + */ + private String updatedBy; + /** + * 字段属性:更新时间 + */ + private Date updatedTime; +} diff --git a/src/main/java/com/parseSystem/vehicle/VehicleData.java b/src/main/java/com/parseSystem/vehicle/VehicleData.java index 674d31a..9a793c7 100644 --- a/src/main/java/com/parseSystem/vehicle/VehicleData.java +++ b/src/main/java/com/parseSystem/vehicle/VehicleData.java @@ -14,7 +14,7 @@ import java.util.Date; @ToString @NoArgsConstructor @AllArgsConstructor -@TableName("vehicle_data") +@TableName("vehicle_data_copy1") public class VehicleData { /** diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml new file mode 100644 index 0000000..2c65c29 --- /dev/null +++ b/src/main/resources/application-test.yml @@ -0,0 +1,14 @@ +spring: + profiles: + active: dev + redis: + host: 10.100.1.2 + port: 6379 + password: + application: + name: parseSystem + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 +server: + port: 8097 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 84ff9fe..206f0e7 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,4 +1,6 @@ spring: + profiles: + active: dev redis: host: 10.100.1.2 port: 6379 @@ -7,11 +9,11 @@ spring: name: parseSystem datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 + url: jdbc:mysql://117.72.43.22:4000/test1?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 username: root password: 123456 rabbitmq: - host: 47.120.38.248 + host: 182.254.222.21 port: 5672 template: mandatory: true @@ -24,4 +26,4 @@ spring: publisher-confirms: true #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue) server: - port: 8066 + port: 8068 diff --git a/src/test/java/Test.java b/src/test/java/Test.java new file mode 100644 index 0000000..0698051 --- /dev/null +++ b/src/test/java/Test.java @@ -0,0 +1,45 @@ +import com.parseSystem.ParseSystemApplication; +import com.parseSystem.event.impl.FenceEvent; +import com.parseSystem.vehicle.VehicleData; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; + +@SpringBootTest(classes = ParseSystemApplication.class) +public class Test { + + @Autowired + private FenceEvent fenceEvent; + @org.junit.jupiter.api.Test + public void aa(){ + ArrayList vehicleData = new ArrayList<>(); + VehicleData vehicleData1 = new VehicleData(); + //车辆当前地位 + vehicleData1.setVin("1234567989000"); + vehicleData1.setLongitude("116.48965348217772"); + vehicleData1.setLatitude("39.90816602515441"); + + //车辆一分钟后的位置 + VehicleData vehicleData2 = new VehicleData(); + vehicleData2.setVin("1234567989000"); + vehicleData2.setLongitude("117.48965348217772"); + vehicleData2.setLatitude("40.90816602515441"); + + //车辆二分钟后的位置 + VehicleData vehicleData3 = new VehicleData(); + vehicleData3.setVin("1234567989000"); + vehicleData3.setLongitude("118.48965348217772"); + vehicleData3.setLatitude("41.90816602515441"); + + vehicleData.add(vehicleData1); + vehicleData.add(vehicleData2); + vehicleData.add(vehicleData3); + + for (VehicleData vehicleDatum : vehicleData) { + fenceEvent.executeEvent(vehicleDatum); + } + + } +}