Merge branch 'server_five_dongxiaodong' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five

server_five_liuyunhu
lijiayao 2024-04-08 10:01:23 +08:00
commit 4ad3cfbe8e
8 changed files with 227 additions and 64 deletions

View File

@ -27,6 +27,19 @@ public class RedisService {
return redisTemplate.opsForList().range("coupletMsgData", 0, -1); return redisTemplate.opsForList().range("coupletMsgData", 0, -1);
} }
// ... 其他已有方法 ...
/**
* truefalse
*
* @param setKey
* @param value
* @return truefalse
*/
public boolean addToSetIfNotExists(String setKey, String value) {
return redisTemplate.opsForSet().add(setKey, value) == 1;
}
/** /**
* IntegerString * IntegerString

View File

@ -0,0 +1,150 @@
package com.couplet.analyze.msg.consumer;
import com.alibaba.fastjson.JSONObject;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.utils.MsgUtils;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.redis.service.RedisService;
import com.couplet.remote.RemoteTroubleService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 15:37
* @description
*/
@Component
@Slf4j
public class CodeConsumer {
// @Autowired
// private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedisService redisService;
@Autowired
private RemoteTroubleService remoteTroubleService;
@RabbitListener(queuesToDeclare = {@Queue("couplet-code-queue")})
public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) {
log.info("日志队列:{},接收到的消息:{},开始消费...","couplet-code-queue", JSONObject.toJSONString(msgData));
long start = System.currentTimeMillis();
String messageId = message.getMessageProperties().getMessageId();
try {
boolean addToSetIfNotExists = redisService.addToSetIfNotExists("couplet-code-queue", messageId);
if (addToSetIfNotExists) {
//异步保存记录
CompletableFuture.runAsync(() -> {
CoupletTroubleCode troubleCode = new CoupletTroubleCode();
troubleCode.setTroubleStartTime(new Date());
troubleCode.setTroubleVin(msgData.getVin());
// 随机生成故障码
String faultCode = MsgUtils.generateGTA();
troubleCode.setTroubleCode(faultCode);
// 检查车辆状态若为0则设置故障位置为"190"
if(msgData.getVehicleStatus() == 0) {
troubleCode.setTroublePosition("190");
}
// 检查充电状态若为0则设置故障位置为"191"
if (msgData.getChargingStatus() == 0) {
troubleCode.setTroublePosition("191");
}
// 检查运行状态若为0则设置故障位置为"192"
if (msgData.getOperatingStatus() == 0) {
troubleCode.setTroublePosition("192");
}
// 检查电池荷电状态SOC, 若为0则设置故障位置为"193"
if (msgData.getSocStatus() == 0) {
troubleCode.setTroublePosition("193");
}
// 检查充电能源存储状态若为0则设置故障位置为"194"
if (msgData.getChargingEnergyStorageStatus() == 0) {
troubleCode.setTroublePosition("194");
}
// 检查驱动电机状态若为0则设置故障位置为"195"
if (msgData.getDriveMotorStatus() == 0) {
troubleCode.setTroublePosition("195");
}
// 检查定位状态若为0则设置故障位置为"196"
if (msgData.getPositionStatus() == 0) {
troubleCode.setTroublePosition("196");
}
// 检查电子驻车系统EAS状态若为0则设置故障位置为"197"
if (msgData.getEasStatus() == 0) {
troubleCode.setTroublePosition("197");
}
// 检查PTC正温度系数热敏电阻状态若为0则设置故障位置为"198"
if (msgData.getPtcStatus() == 0) {
troubleCode.setTroublePosition("198");
}
// 检查电动助力转向系统EPS状态若为0则设置故障位置为"199"
if (msgData.getEpsStatus() == 0) {
troubleCode.setTroublePosition("199");
}
// 检查防抱死制动系统ABS状态若为0则设置故障位置为"200"
if (msgData.getAbsStatus() == 0) {
troubleCode.setTroublePosition("200");
}
// 检查主控制器MCU状态若为0则设置故障位置为"201"
if (msgData.getMcuStatus() == 0) {
troubleCode.setTroublePosition("201");
}
// 检查加热状态若为0则设置故障位置为"202"
if (msgData.getHeatingStatus() == 0) {
troubleCode.setTroublePosition("202");
}
// 检查电池状态若为0则设置故障位置为"203"
if (msgData.getBatteryStatus() == 0) {
troubleCode.setTroublePosition("203");
}
// 检查电池绝缘状态若为0则设置故障位置为"204"
if (msgData.getBatteryInsulationStatus() == 0) {
troubleCode.setTroublePosition("204");
}
// 检查直流-直流转换器DC/DC状态若为0则设置故障位置为"205"
if (msgData.getDcdcStatus() == 0) {
troubleCode.setTroublePosition("205");
}
// 检查充电机CHG状态若为0则设置故障位置为"206"
if (msgData.getChgStatus() == 0) {
troubleCode.setTroublePosition("206");
}
remoteTroubleService.newFaultData(troubleCode);
});
}
long end = System.currentTimeMillis();
log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","couplet-code-queue", JSONObject.toJSONString(msgData), (end-start));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,16 +0,0 @@
package com.couplet.analyze.msg.consumer;
import org.springframework.kafka.annotation.KafkaListener;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 15:37
* @description
*/
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "group", properties = {"bootstrap.servers = 39.103.133.136:9092"})
public void getMessage(String msg) {
System.out.println("接收到消息:" + msg);
}
}

View File

@ -9,15 +9,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.couplet.analyze.msg.utils.MsgUtils.hexToString; import static com.couplet.analyze.msg.utils.MsgUtils.hexToString;
import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg; import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg;
@ -44,12 +43,15 @@ public class ModelsKafkaMessage {
add("stored-event"); add("stored-event");
} }
}; };
@Autowired
private RabbitTemplate rabbitTemplate;
/** /**
* *
* @return * @return
*/ */
@Scheduled(fixedDelay = 50) @Scheduled(fixedDelay = 50)
private static void consumerMessages() { public void consumerMessages() {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
@ -73,10 +75,15 @@ public class ModelsKafkaMessage {
List<CoupletMsgData> coupletMsgDataList = sendMsg(str); List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
for (CoupletMsgData msgData : coupletMsgDataList) { for (CoupletMsgData msgData : coupletMsgDataList) {
log.info("解析到车辆数据:{}", msgData); log.info("解析到车辆数据:{}", msgData);
for (String string : strings) { // for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string); // IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData); // incidentService.incident(msgData);
} // }
//发送消息
rabbitTemplate.convertAndSend("couplet-code-queue",msgData,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
try { try {
sleep(100); sleep(100);

View File

@ -15,6 +15,7 @@ spring:
discovery: discovery:
# 服务注册地址 # 服务注册地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
config: config:
# 配置中心地址 # 配置中心地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
@ -23,6 +24,7 @@ spring:
# 共享配置 # 共享配置
shared-configs: shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
main: main:
allow-bean-definition-overriding: true allow-bean-definition-overriding: true
rabbitmq: rabbitmq:

View File

@ -25,21 +25,21 @@ public class ParsingMsg {
String hexStringWithoutSpaces = substring.replaceAll("\\s+", ""); String hexStringWithoutSpaces = substring.replaceAll("\\s+", "");
String asciiString = hexToString(hexStringWithoutSpaces); String asciiString = hexToString(hexStringWithoutSpaces);
System.out.println("16进制解析后的数据"+asciiString); System.out.println("16进制解析后的数据"+asciiString);
// //截取前17位 //截取前17位
// String substring1 = asciiString.substring(0, 17); String substring1 = asciiString.substring(0, 17);
// System.out.println("VIN"+substring1); System.out.println("VIN"+substring1);
// String substring2 = asciiString.substring(17, 30); String substring2 = asciiString.substring(17, 30);
// System.out.println("时间戳:"+substring2); System.out.println("时间戳:"+substring2);
// String substring3 = asciiString.substring(30, 40); String substring3 = asciiString.substring(30, 40);
// System.out.println("经度:" +substring3); System.out.println("经度:" +substring3);
// String substring4 = asciiString.substring(41, 50); String substring4 = asciiString.substring(41, 50);
// System.out.println("纬度:"+ substring4); System.out.println("纬度:"+ substring4);
// String substring5 = asciiString.substring(51, 56); String substring5 = asciiString.substring(51, 56);
// System.out.println("车速:"+ substring5); System.out.println("车速:"+ substring5);
// String substring6 = asciiString.substring(57, 67); String substring6 = asciiString.substring(57, 67);
// System.out.println("总里程:"+ substring6); System.out.println("总里程:"+ substring6);
// String substring7 = asciiString.substring(68, 73); String substring7 = asciiString.substring(68, 73);
// System.out.println("总电压:"+ substring7); System.out.println("总电压:"+ substring7);
String pattern = "(.{17})(.{10})(.{9})(.{8})(.{2})"; String pattern = "(.{17})(.{10})(.{9})(.{8})(.{2})";
Pattern compile = Pattern.compile(pattern); Pattern compile = Pattern.compile(pattern);
Matcher matcher = compile.matcher(asciiString); Matcher matcher = compile.matcher(asciiString);

View File

@ -130,15 +130,15 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
* @Return: RabbitTemplate * @Return: RabbitTemplate
**/ **/
@Primary @Primary
@Bean // @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { // public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTempalte = new RabbitTemplate(connectionFactory); // RabbitTemplate rabbitTempalte = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTempalte; // this.rabbitTemplate = rabbitTempalte;
rabbitTempalte.setMessageConverter(messageConverter()); // rabbitTempalte.setMessageConverter(messageConverter());
rabbitTempalte(); // rabbitTempalte();
//
return rabbitTempalte; // return rabbitTempalte;
} // }
/* /*
* @Author: LiuYunHu * @Author: LiuYunHu
@ -147,10 +147,10 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
* @Param: [] * @Param: []
* @Return: void * @Return: void
**/ **/
public void rabbitTempalte() { // public void rabbitTempalte() {
rabbitTemplate.setConfirmCallback(this); // rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this); // rabbitTemplate.setReturnsCallback(this);
} // }
/* /*
* @Author: LiuYunHu * @Author: LiuYunHu
@ -188,14 +188,14 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
* @Param: ack * @Param: ack
* @Param: s * @Param: s
**/ **/
@Override // @Override
public void confirm(CorrelationData correlationData, boolean ack, String s) { // public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack) { // if (ack) {
log.info("{}消息到达交换机", correlationData.getId()); // log.info("{}消息到达交换机", correlationData.getId());
} else { // } else {
log.error("{}消息丢失", correlationData.getId()); // log.error("{}消息丢失", correlationData.getId());
} // }
} // }
/* /*
* @Author: LiuYunHu * @Author: LiuYunHu
@ -209,4 +209,9 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
public void returnedMessage(ReturnedMessage returnedMessage) { public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("{}消息未到达队列", returnedMessage.getMessage().getMessageProperties().getMessageId()); log.error("{}消息未到达队列", returnedMessage.getMessage().getMessageProperties().getMessageId());
} }
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
} }

View File

@ -15,6 +15,7 @@ spring:
discovery: discovery:
# 服务注册地址 # 服务注册地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
config: config:
# 配置中心地址 # 配置中心地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
@ -23,6 +24,7 @@ spring:
# 共享配置 # 共享配置
shared-configs: shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
main: main:
allow-bean-definition-overriding: true allow-bean-definition-overriding: true
logging: logging:
@ -36,7 +38,7 @@ mqtt:
# broker: mqtt://115.159.47.13:1883 # broker: mqtt://115.159.47.13:1883
username: username:
password: password:
clientId: xiaoyao clientId: fluxMq
qos: 0 qos: 0
topic: xiaoyao topic: test