diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java index ec9a53a..1b03483 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java @@ -63,6 +63,13 @@ public interface RemoteVehicleService { */ @PostMapping("/vehicleAndLogo/queryByLogoIds/{vehicleId}") public Result> queryByLogoIds(@PathVariable("vehicleId") Long vehicleId); + /** + * 根据车辆id查询绑定的标识 + * @param + * @return + */ +// @PostMapping("/vehicleAndLogo/queryByLogoIds/{vehicleId}") +// public Result> queryByLogoIds(@PathVariable("vehicleId") Long vehicleId); @PostMapping("/list") public Result list(@RequestBody VehicleListParams listParams); diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java index f920863..cd18fcf 100644 --- a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java @@ -1,27 +1,31 @@ package com.couplet.analyze.common.contents; +import org.springframework.stereotype.Component; + /** * @Author: LiJiaYao * @Date: 2024/4/7 * @Description: 事件内容 */ + +@Component public class AnalyzeEventContents { /** * 故障 */ - String BREAKDOWN = "breakdown"; + static final String BREAKDOWN = "breakdown"; /** * 电子围栏 */ - String ELECTRONIC_FENCE = "electronic-fence"; + static final String ELECTRONIC_FENCE = "electronic-fence"; /** * 实时数据 */ - String REAL_TIME_DATA = "real-time-data"; + static final String REAL_TIME_DATA = "real-time-data"; /** * 存储 */ - String STORED_EVENT = "stored-event"; + static final String STORED_EVENT = "stored-event"; } diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java index 32d043a..1602a67 100644 --- a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java @@ -43,8 +43,7 @@ public class AnalyzeEventCache { * * @return */ - public Set getEventList(String vin){ - + public Set getEventList(String vin){ return redisService.getCacheSet(encode(vin)); } diff --git a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java index 67ec7c2..7845610 100644 --- a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java +++ b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java @@ -182,7 +182,32 @@ public class RedisService { } return setOperation; } + /** + * 缓存Set + * + * @param key 缓存键值 + * @param setValue 缓存的数据 + * + * @return 缓存数据的对象 + */ + public BoundSetOperations setCacheSet (final String key, final T setValue) { + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + setOperation.add(setValue); + return setOperation; + } + /** + * 缓存Set + * + * @param key 缓存键值 + * @param setValue 缓存的数据 + * + * @return 缓存数据的对象 + */ + public void deleteSet(String key, String setValue) { + BoundSetOperations setOperations = redisTemplate.boundSetOps(key); + setOperations.remove(setValue); + } /** * 获得缓存的set * @@ -194,6 +219,8 @@ public class RedisService { return redisTemplate.opsForSet().members(key); } + + /** * 缓存Map * @@ -280,4 +307,6 @@ public class RedisService { String key = "vin:"+realTimeDataRequest.getVin()+"userId:"+realTimeDataRequest.getUserId(); redisTemplate.opsForValue().set(key, realTimeDataRequest); } + + } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index 006bb88..fd6a1d7 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -96,6 +96,10 @@ org.springframework.kafka spring-kafka + + com.couplet + couplet-common-business + diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java index 15e4140..c49f871 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java @@ -1,150 +1,150 @@ -package com.couplet.analyze.msg.consumer; - -import com.alibaba.fastjson.JSONObject; -import com.couplet.analyze.msg.domain.CoupletMsgData; -import com.couplet.analyze.msg.utils.MsgUtils; -import com.couplet.common.domain.CoupletTroubleCode; -import com.couplet.common.redis.service.RedisService; -import com.couplet.remote.RemoteTroubleService; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.amqp.core.Message; -import org.springframework.stereotype.Component; - -import java.util.Date; -import java.util.concurrent.CompletableFuture; - -/** - * @author DongXiaoDong - * @version 1.0 - * @date 2024/4/6 15:37 - * @description - */ -@Component -@Slf4j -public class CodeConsumer { +//package com.couplet.analyze.msg.consumer; +// +//import com.alibaba.fastjson.JSONObject; +//import com.couplet.analyze.msg.domain.CoupletMsgData; +//import com.couplet.analyze.msg.utils.MsgUtils; +//import com.couplet.common.domain.CoupletTroubleCode; +//import com.couplet.common.redis.service.RedisService; +//import com.couplet.remote.RemoteTroubleService; +//import com.rabbitmq.client.Channel; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.rabbit.annotation.Queue; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.amqp.core.Message; +//import org.springframework.stereotype.Component; +// +//import java.util.Date; +//import java.util.concurrent.CompletableFuture; +// +///** +// * @author DongXiaoDong +// * @version 1.0 +// * @date 2024/4/6 15:37 +// * @description +// */ +//@Component +//@Slf4j +//public class CodeConsumer { +//// @Autowired +//// private RedisTemplate redisTemplate; +// // @Autowired -// private RedisTemplate redisTemplate; - - @Autowired - private RedisService redisService; - @Autowired - private RemoteTroubleService remoteTroubleService; - - @RabbitListener(queuesToDeclare = {@Queue("couplet-code-queue")}) - public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) { - log.info("日志队列:{},接收到的消息:{},开始消费...","couplet-code-queue", JSONObject.toJSONString(msgData)); - long start = System.currentTimeMillis(); - - String messageId = message.getMessageProperties().getMessageId(); - - try { - boolean addToSetIfNotExists = redisService.addToSetIfNotExists("couplet-code-queue", messageId); - if (addToSetIfNotExists) { - //异步保存记录 - CompletableFuture.runAsync(() -> { - CoupletTroubleCode troubleCode = new CoupletTroubleCode(); - troubleCode.setTroubleStartTime(new Date()); - troubleCode.setTroubleVin(msgData.getVin()); - // 随机生成故障码 - String faultCode = MsgUtils.generateGTA(); - troubleCode.setTroubleCode(faultCode); - - // 检查车辆状态,若为0,则设置故障位置为"190" - if(msgData.getVehicleStatus() == 0) { - troubleCode.setTroublePosition("190"); - } - - // 检查充电状态,若为0,则设置故障位置为"191" - if (msgData.getChargingStatus() == 0) { - troubleCode.setTroublePosition("191"); - } - - // 检查运行状态,若为0,则设置故障位置为"192" - if (msgData.getOperatingStatus() == 0) { - troubleCode.setTroublePosition("192"); - } - - // 检查电池荷电状态(SOC), 若为0,则设置故障位置为"193" - if (msgData.getSocStatus() == 0) { - troubleCode.setTroublePosition("193"); - } - - // 检查充电能源存储状态,若为0,则设置故障位置为"194" - if (msgData.getChargingEnergyStorageStatus() == 0) { - troubleCode.setTroublePosition("194"); - } - - // 检查驱动电机状态,若为0,则设置故障位置为"195" - if (msgData.getDriveMotorStatus() == 0) { - troubleCode.setTroublePosition("195"); - } - - // 检查定位状态,若为0,则设置故障位置为"196" - if (msgData.getPositionStatus() == 0) { - troubleCode.setTroublePosition("196"); - } - - // 检查电子驻车系统(EAS)状态,若为0,则设置故障位置为"197" - if (msgData.getEasStatus() == 0) { - troubleCode.setTroublePosition("197"); - } - - // 检查PTC(正温度系数热敏电阻)状态,若为0,则设置故障位置为"198" - if (msgData.getPtcStatus() == 0) { - troubleCode.setTroublePosition("198"); - } - - // 检查电动助力转向系统(EPS)状态,若为0,则设置故障位置为"199" - if (msgData.getEpsStatus() == 0) { - troubleCode.setTroublePosition("199"); - } - - // 检查防抱死制动系统(ABS)状态,若为0,则设置故障位置为"200" - if (msgData.getAbsStatus() == 0) { - troubleCode.setTroublePosition("200"); - } - - // 检查主控制器(MCU)状态,若为0,则设置故障位置为"201" - if (msgData.getMcuStatus() == 0) { - troubleCode.setTroublePosition("201"); - } - - // 检查加热状态,若为0,则设置故障位置为"202" - if (msgData.getHeatingStatus() == 0) { - troubleCode.setTroublePosition("202"); - } - - // 检查电池状态,若为0,则设置故障位置为"203" - if (msgData.getBatteryStatus() == 0) { - troubleCode.setTroublePosition("203"); - } - - // 检查电池绝缘状态,若为0,则设置故障位置为"204" - if (msgData.getBatteryInsulationStatus() == 0) { - troubleCode.setTroublePosition("204"); - } - - // 检查直流-直流转换器(DC/DC)状态,若为0,则设置故障位置为"205" - if (msgData.getDcdcStatus() == 0) { - troubleCode.setTroublePosition("205"); - } - - // 检查充电机(CHG)状态,若为0,则设置故障位置为"206" - if (msgData.getChgStatus() == 0) { - troubleCode.setTroublePosition("206"); - } - remoteTroubleService.newFaultData(troubleCode); - }); - } - long end = System.currentTimeMillis(); - log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","couplet-code-queue", JSONObject.toJSONString(msgData), (end-start)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} +// private RedisService redisService; +// @Autowired +// private RemoteTroubleService remoteTroubleService; +// +// @RabbitListener(queuesToDeclare = {@Queue("couplet-code-queue")}) +// public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) { +// log.info("日志队列:{},接收到的消息:{},开始消费...","couplet-code-queue", JSONObject.toJSONString(msgData)); +// long start = System.currentTimeMillis(); +// +// String messageId = message.getMessageProperties().getMessageId(); +// +// try { +// boolean addToSetIfNotExists = redisService.addToSetIfNotExists("couplet-code-queue", messageId); +// if (addToSetIfNotExists) { +// //异步保存记录 +// CompletableFuture.runAsync(() -> { +// CoupletTroubleCode troubleCode = new CoupletTroubleCode(); +// troubleCode.setTroubleStartTime(new Date()); +// troubleCode.setTroubleVin(msgData.getVin()); +// // 随机生成故障码 +// String faultCode = MsgUtils.generateGTA(); +// troubleCode.setTroubleCode(faultCode); +// +// // 检查车辆状态,若为0,则设置故障位置为"190" +// if(msgData.getVehicleStatus() == 0) { +// troubleCode.setTroublePosition("190"); +// } +// +// // 检查充电状态,若为0,则设置故障位置为"191" +// if (msgData.getChargingStatus() == 0) { +// troubleCode.setTroublePosition("191"); +// } +// +// // 检查运行状态,若为0,则设置故障位置为"192" +// if (msgData.getOperatingStatus() == 0) { +// troubleCode.setTroublePosition("192"); +// } +// +// // 检查电池荷电状态(SOC), 若为0,则设置故障位置为"193" +// if (msgData.getSocStatus() == 0) { +// troubleCode.setTroublePosition("193"); +// } +// +// // 检查充电能源存储状态,若为0,则设置故障位置为"194" +// if (msgData.getChargingEnergyStorageStatus() == 0) { +// troubleCode.setTroublePosition("194"); +// } +// +// // 检查驱动电机状态,若为0,则设置故障位置为"195" +// if (msgData.getDriveMotorStatus() == 0) { +// troubleCode.setTroublePosition("195"); +// } +// +// // 检查定位状态,若为0,则设置故障位置为"196" +// if (msgData.getPositionStatus() == 0) { +// troubleCode.setTroublePosition("196"); +// } +// +// // 检查电子驻车系统(EAS)状态,若为0,则设置故障位置为"197" +// if (msgData.getEasStatus() == 0) { +// troubleCode.setTroublePosition("197"); +// } +// +// // 检查PTC(正温度系数热敏电阻)状态,若为0,则设置故障位置为"198" +// if (msgData.getPtcStatus() == 0) { +// troubleCode.setTroublePosition("198"); +// } +// +// // 检查电动助力转向系统(EPS)状态,若为0,则设置故障位置为"199" +// if (msgData.getEpsStatus() == 0) { +// troubleCode.setTroublePosition("199"); +// } +// +// // 检查防抱死制动系统(ABS)状态,若为0,则设置故障位置为"200" +// if (msgData.getAbsStatus() == 0) { +// troubleCode.setTroublePosition("200"); +// } +// +// // 检查主控制器(MCU)状态,若为0,则设置故障位置为"201" +// if (msgData.getMcuStatus() == 0) { +// troubleCode.setTroublePosition("201"); +// } +// +// // 检查加热状态,若为0,则设置故障位置为"202" +// if (msgData.getHeatingStatus() == 0) { +// troubleCode.setTroublePosition("202"); +// } +// +// // 检查电池状态,若为0,则设置故障位置为"203" +// if (msgData.getBatteryStatus() == 0) { +// troubleCode.setTroublePosition("203"); +// } +// +// // 检查电池绝缘状态,若为0,则设置故障位置为"204" +// if (msgData.getBatteryInsulationStatus() == 0) { +// troubleCode.setTroublePosition("204"); +// } +// +// // 检查直流-直流转换器(DC/DC)状态,若为0,则设置故障位置为"205" +// if (msgData.getDcdcStatus() == 0) { +// troubleCode.setTroublePosition("205"); +// } +// +// // 检查充电机(CHG)状态,若为0,则设置故障位置为"206" +// if (msgData.getChgStatus() == 0) { +// troubleCode.setTroublePosition("206"); +// } +// remoteTroubleService.newFaultData(troubleCode); +// }); +// } +// long end = System.currentTimeMillis(); +// log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","couplet-code-queue", JSONObject.toJSONString(msgData), (end-start)); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +//} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index c9bb4b5..da4c02d 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -1,6 +1,7 @@ package com.couplet.analyze.msg.model; import com.couplet.analyze.common.contents.AnalyzeEventContents; +import com.couplet.analyze.common.event.AnalyzeEventCache; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; import com.couplet.common.core.exception.vehicle.VehicleException; @@ -34,19 +35,16 @@ public class ModelsKafkaMessage { private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; - static ArrayList strings = new ArrayList<>() { - { - add("breakdown"); - add("electronic-fence"); - add("real-time-data"); - add("stored-event"); - } - }; + @Autowired + private AnalyzeEventContents analyzeEventContents; + @Autowired + private AnalyzeEventCache analyzeEventCache; // @Autowired // private RabbitTemplate rabbitTemplate; /** * 消费者配置 + * * @return */ @Scheduled(fixedDelay = 50) @@ -64,7 +62,6 @@ public class ModelsKafkaMessage { //订阅主题 consumer.subscribe(Collections.singletonList(TOPIC_NAME)); - //持续消费消息 while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); @@ -74,11 +71,12 @@ public class ModelsKafkaMessage { List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { log.info("解析到车辆数据:{}", msgData); - for (String string : strings) { + Set eventList = analyzeEventCache.getEventList(msgData.getVin()); + for (String string : eventList) { IncidentService incidentService = SpringUtils.getBean(string); incidentService.incident(msgData); } - //发送消息 + // 发送消息 // rabbitTemplate.convertAndSend("couplet-code-queue",msgData,message -> { // message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); // return message; @@ -87,7 +85,7 @@ public class ModelsKafkaMessage { try { sleep(100); } catch (Exception e) { - throw new VehicleException("睡眠失败"+e); + throw new VehicleException("睡眠失败" + e); } } }); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index 2d1f10d..7ff9a1b 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -4,6 +4,9 @@ import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.mapper.IncidentMapper; import com.couplet.analyze.msg.service.IncidentService; import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; +import com.couplet.common.core.utils.StringUtils; +import com.couplet.common.domain.request.RealTimeDataRequest; +import com.couplet.common.redis.service.RedisService; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; @@ -25,7 +28,7 @@ public class RealTimeDataServiceImpl implements IncidentService { private IncidentMapper incidentMapper; @Autowired - private StringRedisTemplate redisTemplate; + private RedisService redisService; /** * 实时数据事件 @@ -37,6 +40,11 @@ public class RealTimeDataServiceImpl implements IncidentService { log.info("实时数据事件开始....."); + RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); + if (StringUtils.isArray(cacheObject)){ + log.info("[{}]有缓存数据,值为:[{}]", coupletMsgData.getVin(), cacheObject); + + } if (RealTimeJudge.isJudge(coupletMsgData.getVin())){ log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index 83462d9..bed4d26 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -15,7 +15,6 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -24,7 +23,6 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true rabbitmq: diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index 16394e6..30780e5 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -15,7 +15,6 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -24,7 +23,6 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true logging: @@ -38,7 +36,7 @@ mqtt: # broker: mqtt://115.159.47.13:1883 username: password: - clientId: fluxMq + clientId: xiaoYao qos: 0 - topic: test + topic: xiaoYao