小改动

master
fst1996 2023-12-02 09:06:37 +08:00
parent e942dc4a65
commit 9dfd9befcd
5 changed files with 18 additions and 9 deletions

View File

@ -24,8 +24,8 @@ public class CarMessage {
/**
*
*/
@TableId(value = "message_id",type = IdType.AUTO)
private Long messageId;
@TableId(value = "message_id",type = IdType.ASSIGN_UUID)
private String messageId;
/**
* VIN

View File

@ -49,8 +49,11 @@ public class MessageProcessor {
log.info("处理队列A中的消息" + message);
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
this.eventProducer(carMessage);
log.info("A队列消息持久化开始");
long startTime = System.currentTimeMillis();
//数据缓存
carMsgService.insert(carMessage);
log.info("A队列消息持久化消耗时间{}", System.currentTimeMillis() - startTime);
}
/**
@ -62,14 +65,18 @@ public class MessageProcessor {
log.info("处理队列B中的消息" + message);
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
this.eventProducer(carMessage);
log.info("B队列消息持久化开始");
long startTime = System.currentTimeMillis();
//数据缓存
carMsgService.insert(carMessage);
log.info("B队列消息持久化消耗时间{}", System.currentTimeMillis() - startTime);
}
/**
*
* @param carMessage
*/
public void eventProducer(CarMessage carMessage){
private void eventProducer(CarMessage carMessage){
// 根据对象车辆vin获取事件集合从redis中获取
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
// 执行事件
@ -115,7 +122,6 @@ public class MessageProcessor {
if (message != null) {
CompletableFuture.runAsync(() -> processFromQueueA(message));
}else {
checkQueueStatus();
break;
}
}
@ -125,7 +131,6 @@ public class MessageProcessor {
if (message != null) {
CompletableFuture.runAsync(() -> processFromQueueB(message));
}else {
checkQueueStatus();
break;
}
}

View File

@ -49,6 +49,7 @@ public class ParseDataService {
records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
long startTime = System.currentTimeMillis();
log.info("{}为从kafak拉取到的10进制报文",value);
//判断往那条队列发送消息
if (!messageProcessor.getIsQueueABlocked().get()) {
@ -61,8 +62,7 @@ public class ParseDataService {
// 两个队列都被阻塞,根据自定义的策略进行处理,这里可以根据具体情况进行处理
log.info("队列A和队列B都被阻塞根据自定义的策略进行处理");
}
System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value());
log.info("消息消费时间:{}", System.currentTimeMillis() - startTime);
}
}catch (Exception e){
log.info("records {}", records);

View File

@ -3,6 +3,7 @@ package com.god.data.service.impl;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.CarMessage;
import com.god.data.service.EventService;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -15,6 +16,7 @@ import java.util.ArrayList;
* @date 2023/11/27 21:50
*/
@Service(value = "FaultAlarm")
@Log4j2
public class FaultAlarmEvent implements EventService {
@Autowired
private RedisService redisService;
@ -30,7 +32,7 @@ public class FaultAlarmEvent implements EventService {
@Override
public void execute(CarMessage carMessage) {
System.out.println("孙继哲大傻逼");
log.info("故障报警事件执行");
// //创建集合存故障码
// ArrayList<String> strings = new ArrayList<>();
// //判断车辆目前的报错故障,添加对应的故障码

View File

@ -3,6 +3,7 @@ package com.god.data.service.impl;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.CarMessage;
import com.god.data.service.EventService;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
@ -14,6 +15,7 @@ import org.springframework.stereotype.Service;
* @date 2023/11/27 21:47
*/
@Service(value = "RealTimeTrajectory")
@Log4j2
public class RealTimeTrajectoryEvent implements EventService {
@Autowired
@ -32,7 +34,7 @@ public class RealTimeTrajectoryEvent implements EventService {
@Override
public void execute(CarMessage carMessage) {
System.out.println("冯凯牛魔王");
log.info("实时轨迹事件执行");
}
@Override