diff --git a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java index 3db9232..5e5dcd7 100644 --- a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java +++ b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java @@ -25,6 +25,19 @@ public class RedisService { return redisTemplate.opsForList().range("coupletMsgData", 0, -1); } + // ... 其他已有方法 ... + + /** + * 向指定集合中添加值,如果值不存在则添加并返回true,否则返回false。 + * + * @param setKey 集合键名 + * @param value 要添加的值 + * @return true表示值已成功添加(之前不存在),false表示值已存在 + */ + public boolean addToSetIfNotExists(String setKey, String value) { + return redisTemplate.opsForSet().add(setKey, value) == 1; + } + /** * 缓存基本的对象,Integer、String、实体类等 diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java new file mode 100644 index 0000000..15e4140 --- /dev/null +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/CodeConsumer.java @@ -0,0 +1,150 @@ +package com.couplet.analyze.msg.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.couplet.analyze.msg.domain.CoupletMsgData; +import com.couplet.analyze.msg.utils.MsgUtils; +import com.couplet.common.domain.CoupletTroubleCode; +import com.couplet.common.redis.service.RedisService; +import com.couplet.remote.RemoteTroubleService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.amqp.core.Message; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/6 15:37 + * @description + */ +@Component +@Slf4j +public class CodeConsumer { +// @Autowired +// private RedisTemplate redisTemplate; + + @Autowired + private RedisService redisService; + @Autowired + private RemoteTroubleService remoteTroubleService; + + @RabbitListener(queuesToDeclare = {@Queue("couplet-code-queue")}) + public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) { + log.info("日志队列:{},接收到的消息:{},开始消费...","couplet-code-queue", JSONObject.toJSONString(msgData)); + long start = System.currentTimeMillis(); + + String messageId = message.getMessageProperties().getMessageId(); + + try { + boolean addToSetIfNotExists = redisService.addToSetIfNotExists("couplet-code-queue", messageId); + if (addToSetIfNotExists) { + //异步保存记录 + CompletableFuture.runAsync(() -> { + CoupletTroubleCode troubleCode = new CoupletTroubleCode(); + troubleCode.setTroubleStartTime(new Date()); + troubleCode.setTroubleVin(msgData.getVin()); + // 随机生成故障码 + String faultCode = MsgUtils.generateGTA(); + troubleCode.setTroubleCode(faultCode); + + // 检查车辆状态,若为0,则设置故障位置为"190" + if(msgData.getVehicleStatus() == 0) { + troubleCode.setTroublePosition("190"); + } + + // 检查充电状态,若为0,则设置故障位置为"191" + if (msgData.getChargingStatus() == 0) { + troubleCode.setTroublePosition("191"); + } + + // 检查运行状态,若为0,则设置故障位置为"192" + if (msgData.getOperatingStatus() == 0) { + troubleCode.setTroublePosition("192"); + } + + // 检查电池荷电状态(SOC), 若为0,则设置故障位置为"193" + if (msgData.getSocStatus() == 0) { + troubleCode.setTroublePosition("193"); + } + + // 检查充电能源存储状态,若为0,则设置故障位置为"194" + if (msgData.getChargingEnergyStorageStatus() == 0) { + troubleCode.setTroublePosition("194"); + } + + // 检查驱动电机状态,若为0,则设置故障位置为"195" + if (msgData.getDriveMotorStatus() == 0) { + troubleCode.setTroublePosition("195"); + } + + // 检查定位状态,若为0,则设置故障位置为"196" + if (msgData.getPositionStatus() == 0) { + troubleCode.setTroublePosition("196"); + } + + // 检查电子驻车系统(EAS)状态,若为0,则设置故障位置为"197" + if (msgData.getEasStatus() == 0) { + troubleCode.setTroublePosition("197"); + } + + // 检查PTC(正温度系数热敏电阻)状态,若为0,则设置故障位置为"198" + if (msgData.getPtcStatus() == 0) { + troubleCode.setTroublePosition("198"); + } + + // 检查电动助力转向系统(EPS)状态,若为0,则设置故障位置为"199" + if (msgData.getEpsStatus() == 0) { + troubleCode.setTroublePosition("199"); + } + + // 检查防抱死制动系统(ABS)状态,若为0,则设置故障位置为"200" + if (msgData.getAbsStatus() == 0) { + troubleCode.setTroublePosition("200"); + } + + // 检查主控制器(MCU)状态,若为0,则设置故障位置为"201" + if (msgData.getMcuStatus() == 0) { + troubleCode.setTroublePosition("201"); + } + + // 检查加热状态,若为0,则设置故障位置为"202" + if (msgData.getHeatingStatus() == 0) { + troubleCode.setTroublePosition("202"); + } + + // 检查电池状态,若为0,则设置故障位置为"203" + if (msgData.getBatteryStatus() == 0) { + troubleCode.setTroublePosition("203"); + } + + // 检查电池绝缘状态,若为0,则设置故障位置为"204" + if (msgData.getBatteryInsulationStatus() == 0) { + troubleCode.setTroublePosition("204"); + } + + // 检查直流-直流转换器(DC/DC)状态,若为0,则设置故障位置为"205" + if (msgData.getDcdcStatus() == 0) { + troubleCode.setTroublePosition("205"); + } + + // 检查充电机(CHG)状态,若为0,则设置故障位置为"206" + if (msgData.getChgStatus() == 0) { + troubleCode.setTroublePosition("206"); + } + remoteTroubleService.newFaultData(troubleCode); + }); + } + long end = System.currentTimeMillis(); + log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","couplet-code-queue", JSONObject.toJSONString(msgData), (end-start)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/KafkaConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/KafkaConsumer.java deleted file mode 100644 index 6b91d24..0000000 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/KafkaConsumer.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.couplet.analyze.msg.consumer; - -import org.springframework.kafka.annotation.KafkaListener; - -/** - * @author DongXiaoDong - * @version 1.0 - * @date 2024/4/6 15:37 - * @description - */ -public class KafkaConsumer { - @KafkaListener(topics = "test", groupId = "group", properties = {"bootstrap.servers = 39.103.133.136:9092"}) - public void getMessage(String msg) { - System.out.println("接收到消息:" + msg); - } -} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 56c5d92..84fbd01 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -8,15 +8,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; +import java.util.*; import static com.couplet.analyze.msg.utils.MsgUtils.hexToString; import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg; @@ -43,12 +42,15 @@ public class ModelsKafkaMessage { add("stored-event"); } }; + @Autowired + private RabbitTemplate rabbitTemplate; + /** * 消费者配置 * @return */ @Scheduled(fixedDelay = 50) - private static void consumerMessages() { + public void consumerMessages() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); @@ -72,10 +74,15 @@ public class ModelsKafkaMessage { List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { log.info("解析到车辆数据:{}", msgData); - for (String string : strings) { - IncidentService incidentService = SpringUtils.getBean(string); - incidentService.incident(msgData); - } +// for (String string : strings) { +// IncidentService incidentService = SpringUtils.getBean(string); +// incidentService.incident(msgData); +// } + //发送消息 + rabbitTemplate.convertAndSend("couplet-code-queue",msgData,message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); try { sleep(100); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java index 6186eb7..4690279 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java @@ -44,11 +44,11 @@ public class ElectronicFenceServiceImpl implements IncidentService { ArrayList fences = new ArrayList<>(); if (redisService.hasKey(fenceKey)) { log.info("电子围栏事件redis存在......."); - redisService.getCacheObject(); - for (String s : fence) { - Fence parseObject = JSON.parseObject(s, Fence.class); - fences.add(parseObject); - } +// redisService.getCacheObject(); +// for (String s : fence) { +// Fence parseObject = JSON.parseObject(s, Fence.class); +// fences.add(parseObject); +// } // jingdu; // longitude; // weidu; @@ -83,7 +83,7 @@ public class ElectronicFenceServiceImpl implements IncidentService { } } - log.info("更改的电子围栏内容是:"+fence); +// log.info("更改的电子围栏内容是:"+fence); log.info("电子围栏事件结束......."); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index bed4d26..83462d9 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -15,6 +15,7 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -23,6 +24,7 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true rabbitmq: diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/ParsingMsg.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/ParsingMsg.java index 527a27f..697f79c 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/ParsingMsg.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/ParsingMsg.java @@ -25,21 +25,21 @@ public class ParsingMsg { String hexStringWithoutSpaces = substring.replaceAll("\\s+", ""); String asciiString = hexToString(hexStringWithoutSpaces); System.out.println("16进制解析后的数据:"+asciiString); -// //截取前17位 -// String substring1 = asciiString.substring(0, 17); -// System.out.println("VIN:"+substring1); -// String substring2 = asciiString.substring(17, 30); -// System.out.println("时间戳:"+substring2); -// String substring3 = asciiString.substring(30, 40); -// System.out.println("经度:" +substring3); -// String substring4 = asciiString.substring(41, 50); -// System.out.println("纬度:"+ substring4); -// String substring5 = asciiString.substring(51, 56); -// System.out.println("车速:"+ substring5); -// String substring6 = asciiString.substring(57, 67); -// System.out.println("总里程:"+ substring6); -// String substring7 = asciiString.substring(68, 73); -// System.out.println("总电压:"+ substring7); + //截取前17位 + String substring1 = asciiString.substring(0, 17); + System.out.println("VIN:"+substring1); + String substring2 = asciiString.substring(17, 30); + System.out.println("时间戳:"+substring2); + String substring3 = asciiString.substring(30, 40); + System.out.println("经度:" +substring3); + String substring4 = asciiString.substring(41, 50); + System.out.println("纬度:"+ substring4); + String substring5 = asciiString.substring(51, 56); + System.out.println("车速:"+ substring5); + String substring6 = asciiString.substring(57, 67); + System.out.println("总里程:"+ substring6); + String substring7 = asciiString.substring(68, 73); + System.out.println("总电压:"+ substring7); String pattern = "(.{17})(.{10})(.{9})(.{8})(.{2})"; Pattern compile = Pattern.compile(pattern); Matcher matcher = compile.matcher(asciiString); diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java index 5e789ec..d5fc751 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java @@ -130,15 +130,15 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem * @Return: 配置好的RabbitTemplate实例。 **/ @Primary - @Bean - public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { - RabbitTemplate rabbitTempalte = new RabbitTemplate(connectionFactory); - this.rabbitTemplate = rabbitTempalte; - rabbitTempalte.setMessageConverter(messageConverter()); - rabbitTempalte(); - - return rabbitTempalte; - } +// @Bean +// public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { +// RabbitTemplate rabbitTempalte = new RabbitTemplate(connectionFactory); +// this.rabbitTemplate = rabbitTempalte; +// rabbitTempalte.setMessageConverter(messageConverter()); +// rabbitTempalte(); +// +// return rabbitTempalte; +// } /* * @Author: LiuYunHu @@ -147,10 +147,10 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem * @Param: [] * @Return: void **/ - public void rabbitTempalte() { - rabbitTemplate.setConfirmCallback(this); - rabbitTemplate.setReturnsCallback(this); - } +// public void rabbitTempalte() { +// rabbitTemplate.setConfirmCallback(this); +// rabbitTemplate.setReturnsCallback(this); +// } /* * @Author: LiuYunHu @@ -188,14 +188,14 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem * @Param: ack 消息是否被成功处理 * @Param: s 附加信息 **/ - @Override - public void confirm(CorrelationData correlationData, boolean ack, String s) { - if (ack) { - log.info("{}消息到达交换机", correlationData.getId()); - } else { - log.error("{}消息丢失", correlationData.getId()); - } - } +// @Override +// public void confirm(CorrelationData correlationData, boolean ack, String s) { +// if (ack) { +// log.info("{}消息到达交换机", correlationData.getId()); +// } else { +// log.error("{}消息丢失", correlationData.getId()); +// } +// } /* * @Author: LiuYunHu @@ -209,4 +209,9 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem public void returnedMessage(ReturnedMessage returnedMessage) { log.error("{}消息未到达队列", returnedMessage.getMessage().getMessageProperties().getMessageId()); } + + @Override + public void confirm(CorrelationData correlationData, boolean b, String s) { + + } } diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index d5c530c..d5c0dc2 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -15,6 +15,7 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -23,6 +24,7 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true logging: @@ -36,7 +38,7 @@ mqtt: # broker: mqtt://115.159.47.13:1883 username: password: - clientId: fluxMq + clientId: Mqfghh qos: 0 - topic: test + topic: dxd