实时轨迹事件缓存代码更新优化
commit
4fb4842cfb
|
@ -5,6 +5,7 @@ import com.god.common.security.annotation.EnableMyFeignClients;
|
||||||
import com.god.common.swagger.annotation.EnableCustomSwagger2;
|
import com.god.common.swagger.annotation.EnableCustomSwagger2;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 系统模块
|
* 系统模块
|
||||||
|
@ -15,6 +16,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
@EnableCustomSwagger2
|
@EnableCustomSwagger2
|
||||||
@EnableMyFeignClients
|
@EnableMyFeignClients
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
|
@EnableScheduling
|
||||||
public class GodCarDataApplication {
|
public class GodCarDataApplication {
|
||||||
public static void main (String[] args) {
|
public static void main (String[] args) {
|
||||||
SpringApplication.run(GodCarDataApplication.class);
|
SpringApplication.run(GodCarDataApplication.class);
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
package com.god.data.common.domain;
|
package com.god.data.common.domain;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
@ -16,7 +19,14 @@ import java.math.BigDecimal;
|
||||||
@Builder
|
@Builder
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@TableName("t_message_model")
|
||||||
public class CarMessage {
|
public class CarMessage {
|
||||||
|
/**
|
||||||
|
* 主键
|
||||||
|
*/
|
||||||
|
@TableId(value = "message_id",type = IdType.ASSIGN_UUID)
|
||||||
|
private String messageId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 车辆VIN
|
* 车辆VIN
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -3,10 +3,8 @@ package com.god.data.config;
|
||||||
import com.god.common.redis.service.RedisService;
|
import com.god.common.redis.service.RedisService;
|
||||||
import com.god.data.common.domain.ElePoint;
|
import com.god.data.common.domain.ElePoint;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Configurable;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
@ -21,7 +19,10 @@ public class RedisInitTest {
|
||||||
private RedisService redisService;
|
private RedisService redisService;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试数据
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
public void redisInit()
|
public void redisInit()
|
||||||
{
|
{
|
||||||
redisService.deleteObject("Fence" + "VIN12345678912345");
|
redisService.deleteObject("Fence" + "VIN12345678912345");
|
||||||
|
@ -36,6 +37,10 @@ public class RedisInitTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试数据
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
public void parseInit()
|
public void parseInit()
|
||||||
{
|
{
|
||||||
redisService.deleteObject("event" + "VIN12345678912345");
|
redisService.deleteObject("event" + "VIN12345678912345");
|
||||||
|
|
|
@ -33,7 +33,7 @@ public class KafkaConsumerConfig {
|
||||||
//设置kafka服务器地址
|
//设置kafka服务器地址
|
||||||
props.put("bootstrap.servers", KAFKA_CON);
|
props.put("bootstrap.servers", KAFKA_CON);
|
||||||
//每个消费者分配独立的组号
|
//每个消费者分配独立的组号
|
||||||
props.put("group.id", "g2");
|
props.put("group.id", "fst1");
|
||||||
//如果value合法,则自动提交偏移量
|
//如果value合法,则自动提交偏移量
|
||||||
props.put("enable.auto.commit", "true");
|
props.put("enable.auto.commit", "true");
|
||||||
//设置多久一次更新被消费消息的偏移量
|
//设置多久一次更新被消费消息的偏移量
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.god.data.controller;
|
||||||
|
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 报文对象控制层
|
||||||
|
* @Author fst
|
||||||
|
* @date 2023/12/1 20:51
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
public class CarMsgController {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.god.data.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.god.data.common.domain.CarMessage;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 报文持久层
|
||||||
|
* @Author fst
|
||||||
|
* @date 2023/12/1 20:53
|
||||||
|
*/
|
||||||
|
@Mapper
|
||||||
|
public interface CarMsgMapper extends BaseMapper<CarMessage> {
|
||||||
|
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
package com.god.data.queue;
|
||||||
|
|
||||||
|
import com.god.common.core.utils.SpringUtils;
|
||||||
|
import com.god.common.redis.service.RedisService;
|
||||||
|
import com.god.data.common.domain.CarMessage;
|
||||||
|
import com.god.data.service.CarMsgService;
|
||||||
|
import com.god.data.service.EventService;
|
||||||
|
import com.god.data.utils.AnalyzeUtils;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 队列消息处理
|
||||||
|
* @author fst
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@EnableAsync
|
||||||
|
@Data
|
||||||
|
@Log4j2
|
||||||
|
public class MessageProcessor {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private CarMsgService carMsgService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisService redisService;
|
||||||
|
|
||||||
|
private ConcurrentLinkedQueue<String> queueA = new ConcurrentLinkedQueue<>();
|
||||||
|
private ConcurrentLinkedQueue<String> queueB = new ConcurrentLinkedQueue<>();
|
||||||
|
private AtomicBoolean isQueueABlocked = new AtomicBoolean(false);
|
||||||
|
private AtomicBoolean isQueueBBlocked = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// 定义一个类成员变量,用于执行定时任务
|
||||||
|
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理队列A中的消息
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
@Async
|
||||||
|
public void processFromQueueA(String message) {
|
||||||
|
log.info("处理队列A中的消息:" + message);
|
||||||
|
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
|
||||||
|
this.eventProducer(carMessage);
|
||||||
|
log.info("A队列消息持久化开始");
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
//数据缓存
|
||||||
|
carMsgService.insert(carMessage);
|
||||||
|
log.info("A队列消息持久化消耗时间:{}", System.currentTimeMillis() - startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理队列B中的消息
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
@Async
|
||||||
|
public void processFromQueueB(String message) {
|
||||||
|
log.info("处理队列B中的消息:" + message);
|
||||||
|
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
|
||||||
|
this.eventProducer(carMessage);
|
||||||
|
log.info("B队列消息持久化开始");
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
//数据缓存
|
||||||
|
carMsgService.insert(carMessage);
|
||||||
|
log.info("B队列消息持久化消耗时间:{}", System.currentTimeMillis() - startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行事件
|
||||||
|
* @param carMessage
|
||||||
|
*/
|
||||||
|
private void eventProducer(CarMessage carMessage){
|
||||||
|
// 根据对象车辆vin获取事件集合,从redis中获取
|
||||||
|
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
|
||||||
|
// 执行事件
|
||||||
|
if(!eventList.isEmpty()){
|
||||||
|
for (String event : eventList) {
|
||||||
|
EventService eventService = SpringUtils.getBean(event);
|
||||||
|
eventService.execute(carMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向队列A中添加消息 如果阻塞,则去B添加消息
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void addToQueueA(String message) {
|
||||||
|
if (!isQueueABlocked.get()) {
|
||||||
|
queueA.offer(message);
|
||||||
|
} else {
|
||||||
|
queueB.offer(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向队列B中添加消息 如果阻塞,则去A添加消息
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void addToQueueB(String message) {
|
||||||
|
if (!isQueueBBlocked.get()) {
|
||||||
|
queueB.offer(message);
|
||||||
|
} else {
|
||||||
|
queueA.offer(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消费队列中的消息
|
||||||
|
*/
|
||||||
|
public void consumeMessages() {
|
||||||
|
// 尝试从queueA中取出5条消息,然后分别异步处理这5条消息
|
||||||
|
while (true) {
|
||||||
|
String message = queueA.poll();
|
||||||
|
if (message != null) {
|
||||||
|
CompletableFuture.runAsync(() -> processFromQueueA(message));
|
||||||
|
}else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 尝试从queueB中取出5条消息,然后分别异步处理这5条消息
|
||||||
|
while (true) {
|
||||||
|
String message = queueB.poll();
|
||||||
|
if (message != null) {
|
||||||
|
CompletableFuture.runAsync(() -> processFromQueueB(message));
|
||||||
|
}else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void startScheduledTask() {
|
||||||
|
scheduler.scheduleAtFixedRate(this::consumeMessages, 0, 5, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void stopScheduledTask() {
|
||||||
|
scheduler.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkQueueStatus() {
|
||||||
|
if (queueA.isEmpty()) {
|
||||||
|
isQueueABlocked.set(true);
|
||||||
|
}
|
||||||
|
if (queueB.isEmpty()) {
|
||||||
|
isQueueBBlocked.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.god.data.service;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.god.data.common.domain.CarMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @description: 报文业务层
|
||||||
|
* @Author fst
|
||||||
|
* @date 2023/12/1 20:52
|
||||||
|
*/
|
||||||
|
public interface CarMsgService extends IService<CarMessage> {
|
||||||
|
|
||||||
|
public void insert(CarMessage carMessage);
|
||||||
|
}
|
|
@ -1,24 +1,16 @@
|
||||||
package com.god.data.service;
|
package com.god.data.service;
|
||||||
|
|
||||||
import com.god.common.core.utils.SpringUtils;
|
import com.god.data.queue.MessageProcessor;
|
||||||
import com.god.common.redis.service.RedisService;
|
|
||||||
import com.god.data.common.domain.CarMessage;
|
|
||||||
import com.god.data.service.impl.ElectronicFenceEvent;
|
|
||||||
import com.god.data.utils.AnalyzeUtils;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,12 +21,16 @@ import java.util.List;
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@Component
|
@Component
|
||||||
public class ParseDataService {
|
public class ParseDataService {
|
||||||
@Autowired
|
|
||||||
private RedisService redisService;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaConsumer<String, String> consumer;
|
private KafkaConsumer<String, String> consumer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageProcessor messageProcessor;
|
||||||
|
|
||||||
|
private int count=0;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
|
@ -49,21 +45,23 @@ public class ParseDataService {
|
||||||
//死循环拉取kafka数据
|
//死循环拉取kafka数据
|
||||||
records = consumer.poll(Duration.ofMillis(1000));
|
records = consumer.poll(Duration.ofMillis(1000));
|
||||||
for (ConsumerRecord<String, String> record : records) {
|
for (ConsumerRecord<String, String> record : records) {
|
||||||
System.out.println(record.value());
|
String value = record.value();
|
||||||
//把报文解析成对象
|
long startTime = System.currentTimeMillis();
|
||||||
CarMessage carMessage = AnalyzeUtils.parseVehicleData(record.value());
|
log.info("{}为从kafka拉取到的10进制报文",value);
|
||||||
//根据对象车辆vin获取事件集合,从redis中获取
|
//判断往那条队列发送消息
|
||||||
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
|
if (!messageProcessor.getIsQueueABlocked().get()) {
|
||||||
//执行事件
|
log.info("将消息放入队列A");
|
||||||
if(eventList!= null && !eventList.isEmpty()){
|
messageProcessor.addToQueueA(value);
|
||||||
for (String event : eventList) {
|
} else if (!messageProcessor.getIsQueueBBlocked().get()) {
|
||||||
EventService eventService = SpringUtils.getBean(event);
|
log.info("将消息放入队列B");
|
||||||
eventService.execute(carMessage);
|
messageProcessor.addToQueueB(value);
|
||||||
}
|
} else {
|
||||||
|
// 两个队列都被阻塞,根据自定义的策略进行处理,这里可以根据具体情况进行处理
|
||||||
|
log.info("队列A和队列B都被阻塞,根据自定义的策略进行处理");
|
||||||
}
|
}
|
||||||
// System.out.println("======="+carMessage+"====================");
|
log.info("消息消费时间:{}", System.currentTimeMillis() - startTime);
|
||||||
|
count++;
|
||||||
System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value());
|
log.info("消费了{}条消息",count);
|
||||||
}
|
}
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.info("records: {}", records);
|
log.info("records: {}", records);
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
package com.god.data.service.impl;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.god.common.core.exception.ServiceException;
|
||||||
|
import com.god.data.common.domain.CarMessage;
|
||||||
|
import com.god.data.mapper.CarMsgMapper;
|
||||||
|
import com.god.data.service.CarMsgService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 报文业务实现层
|
||||||
|
* @Author fst
|
||||||
|
* @date 2023/12/1 20:52
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class CarMsgServiceImpl extends ServiceImpl<CarMsgMapper, CarMessage> implements CarMsgService {
|
||||||
|
@Autowired
|
||||||
|
private CarMsgMapper carMsgMapper;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保存报文
|
||||||
|
* @param carMessage
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void insert(CarMessage carMessage) {
|
||||||
|
int insert = carMsgMapper.insert(carMessage);
|
||||||
|
if (insert < 1) {
|
||||||
|
throw new ServiceException("保存报文失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -82,6 +82,11 @@ public class ElectronicFenceEvent implements EventService {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.info("电子围栏事件结束测试");
|
||||||
|
rabbitTemplate.convertAndSend("FENCE_STATUS", "电子围栏事件结束测试", message -> {
|
||||||
|
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||||
|
return message;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package com.god.data.service.impl;
|
||||||
import com.god.common.redis.service.RedisService;
|
import com.god.common.redis.service.RedisService;
|
||||||
import com.god.data.common.domain.CarMessage;
|
import com.god.data.common.domain.CarMessage;
|
||||||
import com.god.data.service.EventService;
|
import com.god.data.service.EventService;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
@ -15,6 +16,7 @@ import java.util.ArrayList;
|
||||||
* @date 2023/11/27 21:50
|
* @date 2023/11/27 21:50
|
||||||
*/
|
*/
|
||||||
@Service(value = "FaultAlarm")
|
@Service(value = "FaultAlarm")
|
||||||
|
@Log4j2
|
||||||
public class FaultAlarmEvent implements EventService {
|
public class FaultAlarmEvent implements EventService {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisService redisService;
|
private RedisService redisService;
|
||||||
|
@ -30,7 +32,7 @@ public class FaultAlarmEvent implements EventService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(CarMessage carMessage) {
|
public void execute(CarMessage carMessage) {
|
||||||
System.out.println("孙继哲大傻逼");
|
log.info("故障报警事件执行");
|
||||||
// //创建集合存故障码
|
// //创建集合存故障码
|
||||||
// ArrayList<String> strings = new ArrayList<>();
|
// ArrayList<String> strings = new ArrayList<>();
|
||||||
// //判断车辆目前的报错故障,添加对应的故障码
|
// //判断车辆目前的报错故障,添加对应的故障码
|
||||||
|
|
|
@ -6,20 +6,23 @@ import com.god.data.service.EventService;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 实时轨迹事件
|
* 实时轨迹事件
|
||||||
* @Author fst
|
* @Author fst
|
||||||
* @date 2023/11/27 21:47
|
* @date 2023/11/27 21:47
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
|
||||||
@Service(value = "RealTimeTrajectory")
|
@Service(value = "RealTimeTrajectory")
|
||||||
|
@Log4j2
|
||||||
public class RealTimeTrajectoryEvent implements EventService {
|
public class RealTimeTrajectoryEvent implements EventService {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
|
@ -36,17 +39,15 @@ public class RealTimeTrajectoryEvent implements EventService {
|
||||||
/**
|
/**
|
||||||
* 添加时间
|
* 添加时间
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insert() {
|
public void insert() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行事件
|
|
||||||
* @param carMessage
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(CarMessage carMessage) {
|
public void execute(CarMessage carMessage) {
|
||||||
|
|
||||||
log.info("车辆:{} 触发实时轨迹,经度:{},纬度:{}",
|
log.info("车辆:{} 触发实时轨迹,经度:{},纬度:{}",
|
||||||
carMessage.getVin(),
|
carMessage.getVin(),
|
||||||
carMessage.getLongitude(),
|
carMessage.getLongitude(),
|
||||||
|
@ -91,12 +92,10 @@ public class RealTimeTrajectoryEvent implements EventService {
|
||||||
lists.add(list);
|
lists.add(list);
|
||||||
map.put(vin,lists);
|
map.put(vin,lists);
|
||||||
redisService.setCacheMap(LOCUS + vin,map);
|
redisService.setCacheMap(LOCUS + vin,map);
|
||||||
|
log.info("实时轨迹事件执行");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除事件
|
|
||||||
* @param event
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void remove(String event) {
|
public void remove(String event) {
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,4 @@ public class AnalyzeUtils {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue