Compare commits

...

11 Commits

Author SHA1 Message Date
ffr 94a0e48edc ffffff 2024-04-06 16:34:44 +08:00
lijiayao 7aa223c8e4 Merge branch 'server_five_liuyunhu' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five 2024-04-06 16:30:45 +08:00
liuyunhu 78f59d7392 优化 2024-04-06 16:30:00 +08:00
dongxiaodong 6798b04fc4 kafka代码 2024-04-06 16:29:50 +08:00
liuyunhu b655ca9c30 打印SQL执行日志 2024-04-06 14:53:29 +08:00
liuyunhu 73342572cc 将远程调用的内容加 自动配置 2024-04-06 14:51:05 +08:00
liuyunhu 812b8d47f8 Kafka demo 2024-04-06 14:25:44 +08:00
lijiayao 957d3976b9 fix: 代码丢失找回 2024-04-06 14:09:42 +08:00
lijiayao a042d30eec Merge branch 'server_five_liuyunhu' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five 2024-04-06 14:09:35 +08:00
liuyunhu 858acc75ee Merge branch 'server_five' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five_liuyunhu
# Conflicts:
#	couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/SysTroubleMapper.java
#	couplet-modules/couplet-business/src/main/resources/mapper/business/SysTroubleMapper.xml
#	couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.java
2024-04-06 11:38:24 +08:00
liuyunhu e885895016 VS 2024-04-06 11:37:22 +08:00
23 changed files with 958 additions and 569 deletions

View File

@ -54,4 +54,5 @@ public interface RemoteVehicleService {
@GetMapping("onOrOutLineByVIN")
public Integer onOrOutLineByVIN(@RequestParam("params") String params);
}

View File

@ -54,7 +54,7 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory<RemoteVehic
@Override
public Integer onOrOutLineByVIN(String params) {
log.error("车辆服务调用失败:" + cause.getMessage());
return null;
return 0;
}
};
}

View File

@ -0,0 +1,2 @@
com.couplet.remote.factory.RemoteVehicleFallbackFactory
com.couplet.remote.factory.RemoteVehicleFallbackFactory

View File

@ -1,22 +0,0 @@
package com.couplet.common.system.remote;
import com.couplet.common.core.constant.ServiceNameConstants;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:00
* @description
*/
@FeignClient(contextId = "remoteCodeService",value = ServiceNameConstants.BUSINESS_SERVICE, fallbackFactory = RemoteCodeFallbackFactory.class)
public interface RemoteCodeService {
@PostMapping("trouble/insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode);
}

View File

@ -1,30 +0,0 @@
package com.couplet.common.system.remote.factory;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:03
* @description
*/
@Slf4j
@Component
public class RemoteCodeFallbackFactory implements FallbackFactory<RemoteCodeService> {
@Override
public RemoteCodeService create(Throwable cause) {
log.error("调用日志服务异常:{}", cause.getMessage());
return new RemoteCodeService()
{
@Override
public Result<Integer> insertCode(CoupletTroubleCode coupletTroubleCode) {
return null;
}
};
}
}

View File

@ -3,4 +3,3 @@ com.couplet.common.system.remote.factory.RemoteLogFallbackFactory
com.couplet.common.system.remote.factory.RemoteFileFallbackFactory
com.couplet.common.system.remote.factory.RemoteDeptFallbackFactory
com.couplet.common.system.remote.factory.RemoteEmployeeFallbackFactory
com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory

View File

@ -0,0 +1,16 @@
package com.couplet.analyze.msg.consumer;
import org.springframework.kafka.annotation.KafkaListener;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 15:37
* @description
*/
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "group", properties = {"bootstrap.servers = 39.103.133.136:9092"})
public void getMessage(String msg) {
System.out.println("接收到消息:" + msg);
}
}

View File

@ -3,7 +3,6 @@ package com.couplet.analyze.msg.model;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.utils.SpringUtils;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -29,372 +28,369 @@ import static com.couplet.analyze.msg.contents.MsgContent.CLIENT_ID;
@Component
public class ModelMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
static ArrayList<String> strings = new ArrayList<>() {
{
add("breakdown");
add("electronic-fence");
add("real-time-data");
add("stored-event");
}
};
@Autowired
private RemoteCodeService remoteCodeService;
// @Value("${mq.queueName}")
// public String queueName;
// @Autowired
// private RabbitTemplate rabbitTemplate;
// static ArrayList<String> strings = new ArrayList<>() {
// {
// add("breakdown");
// add("electronic-fence");
// add("real-time-data");
// add("stored-event");
// }
// };
//
// //交换机
// @Value("${mq.exchangeName}")
// public String exchangeName;
//// @Value("${mq.queueName}")
//// public String queueName;
////
//// //交换机
//// @Value("${mq.exchangeName}")
//// public String exchangeName;
////
//// //路由键
//// @Value("${mq.routingKey}")
//// public String routingKey;
//
// //路由键
// @Value("${mq.routingKey}")
// public String routingKey;
@Scheduled(cron = "0/5 * * * * ?")
public void startMsg() {
try {
MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
mqttClient.connect(options);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
log.error("Mqtt[{}-{}]连接断开:[{}]", CLIENT_ID, BROKER_URL, throwable.getMessage(), throwable);
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//打印接收到的消息和主题
log.info("主题='{}':消息内容={}", topic, new String(mqttMessage.getPayload()));
String str = hexToString(new String(mqttMessage.getPayload()));
List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
for (CoupletMsgData msgData : coupletMsgDataList) {
log.info("解析到车辆数据:{}", msgData);
for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData);
}
// 不睡眠,他们也会主动拉取数据,不会影响其他服务
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// @Scheduled(cron = "0/5 * * * * ?")
// public void startMsg() {
// try {
// MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID);
//
// MqttConnectOptions options = new MqttConnectOptions();
//
// options.setCleanSession(true);
// mqttClient.connect(options);
//
// mqttClient.setCallback(new MqttCallback() {
// @Override
// public void connectionLost(Throwable throwable) {
// log.error("Mqtt[{}-{}]连接断开:[{}]", CLIENT_ID, BROKER_URL, throwable.getMessage(), throwable);
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// //打印接收到的消息和主题
// log.info("主题='{}':消息内容={}", topic, new String(mqttMessage.getPayload()));
//// String str = hexToString(new String(mqttMessage.getPayload()));
//// List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
//
// for (CoupletMsgData msgData : coupletMsgDataList) {
// log.info("解析到车辆数据:{}", msgData);
// for (String string : strings) {
// IncidentService incidentService = SpringUtils.getBean(string);
// incidentService.incident(msgData);
// }
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("消息已投递成功:{}",token);
}
});
mqttClient.subscribe("test",0);
Thread.sleep(1000*60*10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 16ASCII
* @param s 16
* @return ASCII
*/
public static String hexToString(String s) {
if (s == null || s.equals("")) {
return null;
}
s = s.replace(" ", "");
byte[] baKeyword = new byte[s.length() / 2];
for (int i = 0; i < baKeyword.length; i++) {
try {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
s = new String(baKeyword, StandardCharsets.UTF_8);
} catch (Exception e1) {
e1.printStackTrace();
return s;
}
return s;
}
public static List<CoupletMsgData> sendMsg(String str) {
List<CoupletMsgData> coupletMsgDataList = new ArrayList<>();
CoupletMsgData coupletMsgData = new CoupletMsgData();
coupletMsgData.setVin(str.substring(1,18));
log.info("vin=="+coupletMsgData.getVin());
//时间
String tim =str.substring(18,31);
long timestamp = Long.parseLong(tim);
Date date = new Date(timestamp);
coupletMsgData.setCreateTime(date);
//经度
String lt = str.substring(31,42);
// 如果末尾是零,则舍去
int endIndex = lt.length() - 1;
while (lt.charAt(endIndex) == '0'){
endIndex--;
}
String longitude = lt.substring(0, endIndex + 1);
coupletMsgData.setLongitude(longitude);
//维度
String latitudeIndex =str.substring(42,52);
int endIndexT = latitudeIndex.length() - 1;
while (latitudeIndex.charAt(endIndexT) == '0'){
endIndexT--;
}
String latitude = latitudeIndex.substring(0, endIndexT + 1);
coupletMsgData.setLatitude(latitude);
//速度speed
String speed =str.substring(52,58);
coupletMsgData.setSpeed(speed);
//里程
BigDecimal mileage= new BigDecimal(str.substring(58,69));
mileage=mileage.stripTrailingZeros();
coupletMsgData.setMileage(mileage);
//总电压
String voltage =str.substring(69,75);
while (voltage.endsWith("0")) {
voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零
}
coupletMsgData.setVoltage(voltage);
//总电流
String current =str.substring(75,80);
while (current.endsWith("0")){
current=current.substring(0,current.length()-1);
}
coupletMsgData.setCurrent(current);
//绝缘电阻 resistance
String res =str.substring(80,89);
String resistance = res.substring(0, 5);
coupletMsgData.setResistance(resistance);
//档位
String gear =str.substring(89,90);
coupletMsgData.setGear(gear);
//accelerationPedal 加速踏板行程值
String accelerationPedal =str.substring(90,91);
coupletMsgData.setAccelerationPedal(accelerationPedal);
//brakePedal 制动踏板行程值
String brakePedal =str.substring(92,93);
coupletMsgData.setBrakePedal(brakePedal);
//fuelConsumptionRate 燃料消耗率
String fuelConsumptionRate =str.substring(94,99);
coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate);
//motorControllerTemperature 电机控制器温度
String motorControllerTemperature =str.substring(99,105);
while (motorControllerTemperature.endsWith("0")){
motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1);
}
coupletMsgData.setMotorControllerTemperature(motorControllerTemperature);
//motorSpeed 电机转速
String motorSpeed =str.substring(105,110);
coupletMsgData.setMotorSpeed(motorSpeed);
//motorTorque 电机转矩
String motorTorque =str.substring(110,114);
while (motorTorque.endsWith("0")){
motorTorque=motorTorque.substring(0,motorTorque.length()-1);
}
coupletMsgData.setMotorTorque(motorTorque);
//motorTemperature 电机温度
String motorTemperature =str.substring(114,120);
while (motorTemperature.endsWith("0")){
motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1);
}
coupletMsgData.setMotorTemperature(motorTemperature);
//motorVoltage 电机电压
String motorVoltage =str.substring(120,125);
while (motorVoltage.endsWith("0")){
motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1);
}
coupletMsgData.setMotorVoltage(motorVoltage);
//motorCurrent 电机电流
String motorCurrent =str.substring(125,133);
while (motorCurrent.endsWith("0")){
motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1);
}
coupletMsgData.setMotorCurrent(motorCurrent);
//remainingBattery 动力电池剩余电量SOC
BigDecimal remainingBattery = new BigDecimal(str.substring(133,138));
coupletMsgData.setRemainingBattery(remainingBattery);
//maximumFeedbackPower 当前状态允许的最大反馈功率
String maximumFeedbackPower =str.substring(139,144);
while (maximumFeedbackPower.endsWith("0")){
maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1);
}
coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower);
//maximumDischargePower 当前状态允许最大放电功率
String maximumDischargePower =str.substring(145,151);
while (maximumDischargePower.endsWith("0")){
maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1);
}
coupletMsgData.setMaximumDischargePower(maximumDischargePower);
//selfCheckCounter BMS自检计数器
String selfCheckCounter =str.substring(151,153);
String selfCheckCounterReplace = selfCheckCounter.replace("0", "");
coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace);
//totalBatteryCurrent 动力电池充放电电流
String totalBatteryCurrent =str.substring(153,158);
while (totalBatteryCurrent.endsWith("0")){
totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1);
}
coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent);
//totalBatteryVoltage 动力电池负载端总电压V3
String totalBatteryVoltage =str.substring(158,164);
while (totalBatteryVoltage.endsWith("0")){
totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1);
}
coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage);
//singleBatteryMaxVoltage 单次最大电压
String singleBatteryMaxVoltage =str.substring(164,168);
while (singleBatteryMaxVoltage.endsWith("0")){
singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
//singleBatteryMinVoltage 单体电池最低电压
String singleBatteryMinVoltage =str.substring(168,172);
while (singleBatteryMinVoltage.endsWith("0")){
singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
//singleBatteryMaxTemperature 单体电池最高温度
String singleBatteryMaxTemperature =str.substring(172,178);
while (singleBatteryMaxTemperature.endsWith("0")){
singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
//singleBatteryMinTemperature 单体电池最低温度
String singleBatteryMinTemperature =str.substring(178,184);
while (singleBatteryMinTemperature.endsWith("0")){
singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
//availableBatteryCapacity 可用电池容量
String availableBatteryCapacity =str.substring(184,190);
while (availableBatteryCapacity.endsWith("0")){
availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1);
}
coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity);
//vehicleStatus 车辆状态
int vehicleStatus = Integer.parseInt(str.substring(190,191));
coupletMsgData.setVehicleStatus(vehicleStatus);
//chargingStatus 充电状态
int chargingStatus = Integer.parseInt(str.substring(191,192));
coupletMsgData.setChargingStatus(chargingStatus);
//operatingStatus 运行状态
int operatingStatus = Integer.parseInt(str.substring(192,193));
coupletMsgData.setOperatingStatus(operatingStatus);
//socStatus SOC
int socStatus = Integer.parseInt(str.substring(193,194));
coupletMsgData.setSocStatus(socStatus);
//chargingEnergyStorageStatus 可充电储能装置工作状态
int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195));
coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus);
//driveMotorStatus 驱动电机状态
int driveMotorStatus = Integer.parseInt(str.substring(195,196));
coupletMsgData.setDriveMotorStatus(driveMotorStatus);
//positionStatus 定位是否有效
int positionStatus = Integer.parseInt(str.substring(196,197));
coupletMsgData.setPositionStatus(positionStatus);
//easStatus EAS(汽车防盗系统)状态
int easStatus = Integer.parseInt(str.substring(197,198));
coupletMsgData.setEasStatus(easStatus);
//ptcStatus PTC(电动加热器)状态
int ptcStatus = Integer.parseInt(str.substring(198,199));
coupletMsgData.setPtcStatus(ptcStatus);
//epsStatus
int epsStatus = Integer.parseInt(str.substring(199,200));
coupletMsgData.setEpsStatus(epsStatus);
//absStatus EPS(电动助力系统)状态
int absStatus = Integer.parseInt(str.substring(200,201));
coupletMsgData.setAbsStatus(absStatus);
//mcuStatus MCU(电机/逆变器)状态
int mcuStatus = Integer.parseInt(str.substring(201,202));
coupletMsgData.setMcuStatus(mcuStatus);
//heatingStatus 动力电池加热状态
int heatingStatus = Integer.parseInt(str.substring(202,203));
coupletMsgData.setHeatingStatus(heatingStatus);
//batteryStatus 动力电池当前状态
int batteryStatus = Integer.parseInt(str.substring(203,204));
coupletMsgData.setBatteryStatus(batteryStatus);
//batteryInsulationStatus 动力电池保温状态
int batteryInsulationStatus = Integer.parseInt(str.substring(204,205));
coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus);
//dcdcStatus DCDC(电力交换系统)状态
int dcdcStatus = Integer.parseInt(str.substring(205,206));
coupletMsgData.setDcdcStatus(dcdcStatus);
//chgStatus CHG(充电机)状态
int chgStatus = Integer.parseInt(str.substring(206,207));
coupletMsgData.setChgStatus(chgStatus);
coupletMsgDataList.add(coupletMsgData);
return coupletMsgDataList;
}
//// 不睡眠,他们也会主动拉取数据,不会影响其他服务
//// try {
//// Thread.sleep(1000);
//// } catch (InterruptedException e) {
//// throw new RuntimeException(e);
//// }
// }
//
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) {
// log.info("消息已投递成功:{}",token);
// }
// });
// mqttClient.subscribe("test",0);
//
// Thread.sleep(1000*60*10);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
//
// /**
// * 将16进制字符串转换为ASCII字符串
// * @param s 16进制字符串
// * @return ASCII字符串
// */
//// public static String hexToString(String s) {
//// if (s == null || s.equals("")) {
//// return null;
//// }
//// s = s.replace(" ", "");
//// byte[] baKeyword = new byte[s.length() / 2];
//// for (int i = 0; i < baKeyword.length; i++) {
//// try {
//// baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
//// } catch (Exception e) {
//// e.printStackTrace();
//// }
//// }
//// try {
//// s = new String(baKeyword, StandardCharsets.UTF_8);
//// } catch (Exception e1) {
//// e1.printStackTrace();
//// return s;
//// }
//// return s;
//// }
////
//// public static List<CoupletMsgData> sendMsg(String str) {
//// List<CoupletMsgData> coupletMsgDataList = new ArrayList<>();
//// CoupletMsgData coupletMsgData = new CoupletMsgData();
////
//// coupletMsgData.setVin(str.substring(1,18));
////
//// log.info("vin=="+coupletMsgData.getVin());
////
//// //时间
//// String tim =str.substring(18,31);
//// long timestamp = Long.parseLong(tim);
////
//// Date date = new Date(timestamp);
//// coupletMsgData.setCreateTime(date);
////
//// //经度
//// String lt = str.substring(31,42);
//// // 如果末尾是零,则舍去
//// int endIndex = lt.length() - 1;
//// while (lt.charAt(endIndex) == '0'){
//// endIndex--;
//// }
////
//// String longitude = lt.substring(0, endIndex + 1);
//// coupletMsgData.setLongitude(longitude);
////
//// //维度
//// String latitudeIndex =str.substring(42,52);
//// int endIndexT = latitudeIndex.length() - 1;
//// while (latitudeIndex.charAt(endIndexT) == '0'){
//// endIndexT--;
//// }
////
//// String latitude = latitudeIndex.substring(0, endIndexT + 1);
//// coupletMsgData.setLatitude(latitude);
////
//// //速度speed
//// String speed =str.substring(52,58);
//// coupletMsgData.setSpeed(speed);
////
//// //里程
// BigDecimal mileage= new BigDecimal(str.substring(58,69));
// mileage=mileage.stripTrailingZeros();
// coupletMsgData.setMileage(mileage);
//
// //总电压
// String voltage =str.substring(69,75);
// while (voltage.endsWith("0")) {
// voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零
// }
// coupletMsgData.setVoltage(voltage);
//
// //总电流
// String current =str.substring(75,80);
// while (current.endsWith("0")){
// current=current.substring(0,current.length()-1);
// }
// coupletMsgData.setCurrent(current);
//
// //绝缘电阻 resistance
// String res =str.substring(80,89);
// String resistance = res.substring(0, 5);
// coupletMsgData.setResistance(resistance);
//
// //档位
// String gear =str.substring(89,90);
// coupletMsgData.setGear(gear);
//
// //accelerationPedal 加速踏板行程值
// String accelerationPedal =str.substring(90,91);
// coupletMsgData.setAccelerationPedal(accelerationPedal);
//
// //brakePedal 制动踏板行程值
// String brakePedal =str.substring(92,93);
// coupletMsgData.setBrakePedal(brakePedal);
//
// //fuelConsumptionRate 燃料消耗率
// String fuelConsumptionRate =str.substring(94,99);
// coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate);
//
// //motorControllerTemperature 电机控制器温度
// String motorControllerTemperature =str.substring(99,105);
// while (motorControllerTemperature.endsWith("0")){
// motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1);
// }
// coupletMsgData.setMotorControllerTemperature(motorControllerTemperature);
//
// //motorSpeed 电机转速
// String motorSpeed =str.substring(105,110);
// coupletMsgData.setMotorSpeed(motorSpeed);
//
// //motorTorque 电机转矩
// String motorTorque =str.substring(110,114);
// while (motorTorque.endsWith("0")){
// motorTorque=motorTorque.substring(0,motorTorque.length()-1);
// }
// coupletMsgData.setMotorTorque(motorTorque);
//
// //motorTemperature 电机温度
// String motorTemperature =str.substring(114,120);
// while (motorTemperature.endsWith("0")){
// motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1);
// }
// coupletMsgData.setMotorTemperature(motorTemperature);
//
// //motorVoltage 电机电压
// String motorVoltage =str.substring(120,125);
// while (motorVoltage.endsWith("0")){
// motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1);
// }
// coupletMsgData.setMotorVoltage(motorVoltage);
//
// //motorCurrent 电机电流
// String motorCurrent =str.substring(125,133);
// while (motorCurrent.endsWith("0")){
// motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1);
// }
// coupletMsgData.setMotorCurrent(motorCurrent);
//
// //remainingBattery 动力电池剩余电量SOC
// BigDecimal remainingBattery = new BigDecimal(str.substring(133,138));
// coupletMsgData.setRemainingBattery(remainingBattery);
//
// //maximumFeedbackPower 当前状态允许的最大反馈功率
// String maximumFeedbackPower =str.substring(139,144);
// while (maximumFeedbackPower.endsWith("0")){
// maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1);
// }
// coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower);
//
// //maximumDischargePower 当前状态允许最大放电功率
// String maximumDischargePower =str.substring(145,151);
// while (maximumDischargePower.endsWith("0")){
// maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1);
// }
// coupletMsgData.setMaximumDischargePower(maximumDischargePower);
//
// //selfCheckCounter BMS自检计数器
// String selfCheckCounter =str.substring(151,153);
// String selfCheckCounterReplace = selfCheckCounter.replace("0", "");
// coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace);
//
// //totalBatteryCurrent 动力电池充放电电流
// String totalBatteryCurrent =str.substring(153,158);
// while (totalBatteryCurrent.endsWith("0")){
// totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1);
// }
// coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent);
//
// //totalBatteryVoltage 动力电池负载端总电压V3
// String totalBatteryVoltage =str.substring(158,164);
// while (totalBatteryVoltage.endsWith("0")){
// totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1);
// }
// coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage);
//
// //singleBatteryMaxVoltage 单次最大电压
// String singleBatteryMaxVoltage =str.substring(164,168);
// while (singleBatteryMaxVoltage.endsWith("0")){
// singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1);
// }
// coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
//
// //singleBatteryMinVoltage 单体电池最低电压
// String singleBatteryMinVoltage =str.substring(168,172);
// while (singleBatteryMinVoltage.endsWith("0")){
// singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1);
// }
//
// coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
//
// //singleBatteryMaxTemperature 单体电池最高温度
// String singleBatteryMaxTemperature =str.substring(172,178);
// while (singleBatteryMaxTemperature.endsWith("0")){
// singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1);
// }
// coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
//
// //singleBatteryMinTemperature 单体电池最低温度
// String singleBatteryMinTemperature =str.substring(178,184);
// while (singleBatteryMinTemperature.endsWith("0")){
// singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1);
// }
// coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
//
// //availableBatteryCapacity 可用电池容量
// String availableBatteryCapacity =str.substring(184,190);
// while (availableBatteryCapacity.endsWith("0")){
// availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1);
// }
// coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity);
//
// //vehicleStatus 车辆状态
// int vehicleStatus = Integer.parseInt(str.substring(190,191));
// coupletMsgData.setVehicleStatus(vehicleStatus);
//
// //chargingStatus 充电状态
// int chargingStatus = Integer.parseInt(str.substring(191,192));
// coupletMsgData.setChargingStatus(chargingStatus);
//
// //operatingStatus 运行状态
// int operatingStatus = Integer.parseInt(str.substring(192,193));
// coupletMsgData.setOperatingStatus(operatingStatus);
//
// //socStatus SOC
// int socStatus = Integer.parseInt(str.substring(193,194));
// coupletMsgData.setSocStatus(socStatus);
//
// //chargingEnergyStorageStatus 可充电储能装置工作状态
// int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195));
// coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus);
//
// //driveMotorStatus 驱动电机状态
// int driveMotorStatus = Integer.parseInt(str.substring(195,196));
// coupletMsgData.setDriveMotorStatus(driveMotorStatus);
//
// //positionStatus 定位是否有效
// int positionStatus = Integer.parseInt(str.substring(196,197));
// coupletMsgData.setPositionStatus(positionStatus);
//
// //easStatus EAS(汽车防盗系统)状态
// int easStatus = Integer.parseInt(str.substring(197,198));
// coupletMsgData.setEasStatus(easStatus);
//
// //ptcStatus PTC(电动加热器)状态
// int ptcStatus = Integer.parseInt(str.substring(198,199));
// coupletMsgData.setPtcStatus(ptcStatus);
//
// //epsStatus
// int epsStatus = Integer.parseInt(str.substring(199,200));
// coupletMsgData.setEpsStatus(epsStatus);
//
// //absStatus EPS(电动助力系统)状态
// int absStatus = Integer.parseInt(str.substring(200,201));
// coupletMsgData.setAbsStatus(absStatus);
//
// //mcuStatus MCU(电机/逆变器)状态
// int mcuStatus = Integer.parseInt(str.substring(201,202));
// coupletMsgData.setMcuStatus(mcuStatus);
//
// //heatingStatus 动力电池加热状态
// int heatingStatus = Integer.parseInt(str.substring(202,203));
// coupletMsgData.setHeatingStatus(heatingStatus);
//
// //batteryStatus 动力电池当前状态
// int batteryStatus = Integer.parseInt(str.substring(203,204));
// coupletMsgData.setBatteryStatus(batteryStatus);
//
// //batteryInsulationStatus 动力电池保温状态
// int batteryInsulationStatus = Integer.parseInt(str.substring(204,205));
// coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus);
//
// //dcdcStatus DCDC(电力交换系统)状态
// int dcdcStatus = Integer.parseInt(str.substring(205,206));
// coupletMsgData.setDcdcStatus(dcdcStatus);
//
// //chgStatus CHG(充电机)状态
// int chgStatus = Integer.parseInt(str.substring(206,207));
// coupletMsgData.setChgStatus(chgStatus);
//
// coupletMsgDataList.add(coupletMsgData);
//
// return coupletMsgDataList;
// }
}

View File

@ -0,0 +1,86 @@
package com.couplet.analyze.msg.model;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.couplet.analyze.msg.utils.MsgUtils.hexToString;
import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 14:04
* @description
*/
@Component
@Slf4j
public class ModelsKafkaMessage {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
static ArrayList<String> strings = new ArrayList<>() {
{
add("breakdown");
add("electronic-fence");
add("real-time-data");
add("stored-event");
}
};
/**
*
* @return
*/
@Scheduled(fixedDelay = 50)
private static void consumerMessages() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("接收到的数据:" + record.value());
String str = hexToString(record.value());
List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
for (CoupletMsgData msgData : coupletMsgDataList) {
log.info("解析到车辆数据:{}", msgData);
for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData);
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

View File

@ -0,0 +1,317 @@
package com.couplet.analyze.msg.utils;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 14:12
* @description
*/
@Slf4j
public class MsgUtils {
/**
* 16ASCII
* @param s 16
* @return ASCII
*/
public static String hexToString(String s) {
if (s == null || s.equals("")) {
return null;
}
s = s.replace(" ", "");
byte[] baKeyword = new byte[s.length() / 2];
for (int i = 0; i < baKeyword.length; i++) {
try {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
s = new String(baKeyword, StandardCharsets.UTF_8);
} catch (Exception e1) {
e1.printStackTrace();
return s;
}
return s;
}
/**
*
* @param str
* @return
*/
public static List<CoupletMsgData> sendMsg(String str) {
List<CoupletMsgData> coupletMsgDataList = new ArrayList<>();
CoupletMsgData coupletMsgData = new CoupletMsgData();
coupletMsgData.setVin(str.substring(1,18));
log.info("vin=="+coupletMsgData.getVin());
//时间
String tim =str.substring(18,31);
long timestamp = Long.parseLong(tim);
Date date = new Date(timestamp);
coupletMsgData.setCreateTime(date);
//经度
String lt = str.substring(31,42);
// 如果末尾是零,则舍去
int endIndex = lt.length() - 1;
while (lt.charAt(endIndex) == '0'){
endIndex--;
}
String longitude = lt.substring(0, endIndex + 1);
coupletMsgData.setLongitude(longitude);
//维度
String latitudeIndex =str.substring(42,52);
int endIndexT = latitudeIndex.length() - 1;
while (latitudeIndex.charAt(endIndexT) == '0'){
endIndexT--;
}
String latitude = latitudeIndex.substring(0, endIndexT + 1);
coupletMsgData.setLatitude(latitude);
//速度speed
String speed =str.substring(52,58);
coupletMsgData.setSpeed(speed);
//里程
BigDecimal mileage= new BigDecimal(str.substring(58,69));
mileage=mileage.stripTrailingZeros();
coupletMsgData.setMileage(mileage);
//总电压
String voltage =str.substring(69,75);
while (voltage.endsWith("0")) {
voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零
}
coupletMsgData.setVoltage(voltage);
//总电流
String current =str.substring(75,80);
while (current.endsWith("0")){
current=current.substring(0,current.length()-1);
}
coupletMsgData.setCurrent(current);
//绝缘电阻 resistance
String res =str.substring(80,89);
String resistance = res.substring(0, 5);
coupletMsgData.setResistance(resistance);
//档位
String gear =str.substring(89,90);
coupletMsgData.setGear(gear);
//accelerationPedal 加速踏板行程值
String accelerationPedal =str.substring(90,91);
coupletMsgData.setAccelerationPedal(accelerationPedal);
//brakePedal 制动踏板行程值
String brakePedal =str.substring(92,93);
coupletMsgData.setBrakePedal(brakePedal);
//fuelConsumptionRate 燃料消耗率
String fuelConsumptionRate =str.substring(94,99);
coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate);
//motorControllerTemperature 电机控制器温度
String motorControllerTemperature =str.substring(99,105);
while (motorControllerTemperature.endsWith("0")){
motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1);
}
coupletMsgData.setMotorControllerTemperature(motorControllerTemperature);
//motorSpeed 电机转速
String motorSpeed =str.substring(105,110);
coupletMsgData.setMotorSpeed(motorSpeed);
//motorTorque 电机转矩
String motorTorque =str.substring(110,114);
while (motorTorque.endsWith("0")){
motorTorque=motorTorque.substring(0,motorTorque.length()-1);
}
coupletMsgData.setMotorTorque(motorTorque);
//motorTemperature 电机温度
String motorTemperature =str.substring(114,120);
while (motorTemperature.endsWith("0")){
motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1);
}
coupletMsgData.setMotorTemperature(motorTemperature);
//motorVoltage 电机电压
String motorVoltage =str.substring(120,125);
while (motorVoltage.endsWith("0")){
motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1);
}
coupletMsgData.setMotorVoltage(motorVoltage);
//motorCurrent 电机电流
String motorCurrent =str.substring(125,133);
while (motorCurrent.endsWith("0")){
motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1);
}
coupletMsgData.setMotorCurrent(motorCurrent);
//remainingBattery 动力电池剩余电量SOC
BigDecimal remainingBattery = new BigDecimal(str.substring(133,138));
coupletMsgData.setRemainingBattery(remainingBattery);
//maximumFeedbackPower 当前状态允许的最大反馈功率
String maximumFeedbackPower =str.substring(139,144);
while (maximumFeedbackPower.endsWith("0")){
maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1);
}
coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower);
//maximumDischargePower 当前状态允许最大放电功率
String maximumDischargePower =str.substring(145,151);
while (maximumDischargePower.endsWith("0")){
maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1);
}
coupletMsgData.setMaximumDischargePower(maximumDischargePower);
//selfCheckCounter BMS自检计数器
String selfCheckCounter =str.substring(151,153);
String selfCheckCounterReplace = selfCheckCounter.replace("0", "");
coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace);
//totalBatteryCurrent 动力电池充放电电流
String totalBatteryCurrent =str.substring(153,158);
while (totalBatteryCurrent.endsWith("0")){
totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1);
}
coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent);
//totalBatteryVoltage 动力电池负载端总电压V3
String totalBatteryVoltage =str.substring(158,164);
while (totalBatteryVoltage.endsWith("0")){
totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1);
}
coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage);
//singleBatteryMaxVoltage 单次最大电压
String singleBatteryMaxVoltage =str.substring(164,168);
while (singleBatteryMaxVoltage.endsWith("0")){
singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
//singleBatteryMinVoltage 单体电池最低电压
String singleBatteryMinVoltage =str.substring(168,172);
while (singleBatteryMinVoltage.endsWith("0")){
singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
//singleBatteryMaxTemperature 单体电池最高温度
String singleBatteryMaxTemperature =str.substring(172,178);
while (singleBatteryMaxTemperature.endsWith("0")){
singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
//singleBatteryMinTemperature 单体电池最低温度
String singleBatteryMinTemperature =str.substring(178,184);
while (singleBatteryMinTemperature.endsWith("0")){
singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
//availableBatteryCapacity 可用电池容量
String availableBatteryCapacity =str.substring(184,190);
while (availableBatteryCapacity.endsWith("0")){
availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1);
}
coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity);
//vehicleStatus 车辆状态
int vehicleStatus = Integer.parseInt(str.substring(190,191));
coupletMsgData.setVehicleStatus(vehicleStatus);
//chargingStatus 充电状态
int chargingStatus = Integer.parseInt(str.substring(191,192));
coupletMsgData.setChargingStatus(chargingStatus);
//operatingStatus 运行状态
int operatingStatus = Integer.parseInt(str.substring(192,193));
coupletMsgData.setOperatingStatus(operatingStatus);
//socStatus SOC
int socStatus = Integer.parseInt(str.substring(193,194));
coupletMsgData.setSocStatus(socStatus);
//chargingEnergyStorageStatus 可充电储能装置工作状态
int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195));
coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus);
//driveMotorStatus 驱动电机状态
int driveMotorStatus = Integer.parseInt(str.substring(195,196));
coupletMsgData.setDriveMotorStatus(driveMotorStatus);
//positionStatus 定位是否有效
int positionStatus = Integer.parseInt(str.substring(196,197));
coupletMsgData.setPositionStatus(positionStatus);
//easStatus EAS(汽车防盗系统)状态
int easStatus = Integer.parseInt(str.substring(197,198));
coupletMsgData.setEasStatus(easStatus);
//ptcStatus PTC(电动加热器)状态
int ptcStatus = Integer.parseInt(str.substring(198,199));
coupletMsgData.setPtcStatus(ptcStatus);
//epsStatus
int epsStatus = Integer.parseInt(str.substring(199,200));
coupletMsgData.setEpsStatus(epsStatus);
//absStatus EPS(电动助力系统)状态
int absStatus = Integer.parseInt(str.substring(200,201));
coupletMsgData.setAbsStatus(absStatus);
//mcuStatus MCU(电机/逆变器)状态
int mcuStatus = Integer.parseInt(str.substring(201,202));
coupletMsgData.setMcuStatus(mcuStatus);
//heatingStatus 动力电池加热状态
int heatingStatus = Integer.parseInt(str.substring(202,203));
coupletMsgData.setHeatingStatus(heatingStatus);
//batteryStatus 动力电池当前状态
int batteryStatus = Integer.parseInt(str.substring(203,204));
coupletMsgData.setBatteryStatus(batteryStatus);
//batteryInsulationStatus 动力电池保温状态
int batteryInsulationStatus = Integer.parseInt(str.substring(204,205));
coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus);
//dcdcStatus DCDC(电力交换系统)状态
int dcdcStatus = Integer.parseInt(str.substring(205,206));
coupletMsgData.setDcdcStatus(dcdcStatus);
//chgStatus CHG(充电机)状态
int chgStatus = Integer.parseInt(str.substring(206,207));
coupletMsgData.setChgStatus(chgStatus);
coupletMsgDataList.add(coupletMsgData);
return coupletMsgDataList;
}
}

View File

@ -75,22 +75,6 @@ public class SysTroubleController extends BaseController {
return success();
}
/**
*
* @param coupletTroubleCode
* @return
*/
@PostMapping("insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode){
long start = System.currentTimeMillis();
int i = troubleService.insertMsgResq(coupletTroubleCode);
long end = System.currentTimeMillis();
log.info("记录异常信息成功,耗时:{}",(end-start));
return Result.success(i);
}
/**
*
*/

View File

@ -3,6 +3,7 @@ package com.couplet.business.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.request.TroubleResp;
import org.apache.ibatis.annotations.Mapper;

View File

@ -25,7 +25,5 @@ public interface SysTroubleService extends IService<CoupletTroubleCode> {
*/
void newFaultData(CoupletTroubleCode code);
int insertMsgResq(CoupletTroubleCode coupletTroubleCode);
void cleanTroubleCode();
}

View File

@ -50,16 +50,6 @@ public class SysTroubleServiceImpl extends ServiceImpl<SysTroubleMapper, Couplet
sysTroubleMapper.newFaultData(code);
}
/**
*
* @param coupletTroubleCode
* @return
*/
@Override
public int insertMsgResq(CoupletTroubleCode coupletTroubleCode) {
return sysTroubleMapper.insertMsgResq(coupletTroubleCode);
}
/**
*
*/

View File

@ -6,6 +6,7 @@ import com.couplet.common.domain.request.VehicleListParams;
import com.couplet.common.redis.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -22,7 +23,7 @@ import java.util.List;
public class Timer {
//redis
@Autowired
private RedisService redis;
private StringRedisTemplate redis;
//查询车辆列表
@Autowired
private VehicleService vehicleService;
@ -35,9 +36,7 @@ public class Timer {
//先查询车辆列表
List<Vehicle> list = vehicleService.list(new VehicleListParams(null, null, null, null));
list.forEach(vehicle -> {
for (Vehicle vehicle : list) {
//只针对已经上线的车辆
if (redis.hasKey(vehicle.getVin())) {
@ -48,15 +47,15 @@ public class Timer {
//执行修改下线状态的方法
Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0);
if (0 == 1) {
log.error("下线状态修改失败");
}
// if (0 == 1) {
// log.error("下线状态修改失败");
// }
log.info("下线状态修改成功");
}
}
}
});
}
}

View File

@ -16,9 +16,11 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 172469
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式
file-extension: yml
# 共享配置
@ -28,4 +30,4 @@ spring:
allow-bean-definition-overriding: true
logging:
level:
com.couplet.trouble.mapper: DEBUG
com.couplet.business.server.mapper: DEBUG

View File

@ -7,18 +7,14 @@
<resultMap type="com.couplet.common.domain.CoupletTroubleCode" id="SysTroubleResult">
<id property="troubleId" column="trouble_id"/>
<result property="troubleCode" column="trouble_code"/>
<result property="troubleValue" column="trouble_value"/>
<result property="troublePosition" column="trouble_position"/>
<result property="troubleVin" column="trouble_vin"/>
<result property="troubleTag" column="trouble_tag"/>
<result property="typeId" column="type_id"/>
<result property="gradeId" column="grade_id"/>
<result property="troubleType" column="trouble_type"/>
<result property="troubleStartTime" column="trouble_start_time"/>
<result property="troubleEndTime" column="trouble_end_time"/>
</resultMap>
<sql id="selectTroubleVo">
select t.*,g.grade_name,y.type_name from couplet_trouble_code t
LEFT JOIN couplet_trouble_grade g on t.grade_id = g.grade_id
LEFT JOIN couplet_trouble_type y on t.type_id= y.type_id
select * from couplet_trouble_code
</sql>
<insert id="newFaultData">
INSERT INTO `couplet-cloud`.`couplet_trouble_code`
@ -26,9 +22,6 @@
`trouble_start_time`)
VALUES
(#{troubleVin}, #{troubleTag}, #{troubleStartTime})
</insert>
<insert id="insertMsgResq">
</insert>
<update id="cleanTroubleCode">

View File

@ -1,54 +1,93 @@
package com.couplet.mq.controller;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/5 21:38
* @description
* @ProjectName: five-groups-couplet
* @Author: LiuYunHu
* @CreateTime: 2024/4/4
* @Description: kafka
*/
@Slf4j
public class KafkaTest {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
public static void main(String[] args) {
aaa();
//生产者示例
// produceMessage();
//消费者示例
// consumerMessages();
}
private static void aaa() {
//生产者
@PostConstruct
private static void produceMessage() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//创建生产者
try {
//订阅主题
consumer.subscribe(Collections.singletonList("online"));
//发送消息
for (int i = 0; i < 10000; i++) {
String message = "佳佳来喽" + (i + 1);
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("消费者接受到的消息值:" + record.value());
});
System.out.println("发送消息:" + message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
producer.close();
}
}
//消费者
// private static void consumerMessages() {
// Properties props = new Properties();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
// //创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
// try {
//
// //订阅主题
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
// //持续消费消息
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// records.forEach(record -> {
// System.out.println("消费者接受到的消息值:" + record.value());
// });
// }
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// consumer.close();
// }
// }
}

View File

@ -1 +1,2 @@
com.couplet.mq.config.RabbitMQConfig
com.couplet.mq.remote.factory.RemoteFenceFallbackFactory

View File

@ -2,7 +2,6 @@ package com.couplet.online.utils;
import com.couplet.common.domain.Vehicle;
import com.couplet.remote.RemoteVehicleService;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -17,6 +16,8 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -78,6 +79,11 @@ public class MqttMonitor {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
//线程池,用于异步处理消息到来时的业务逻辑
private ExecutorService executorService = Executors.newSingleThreadExecutor();
//Kafka生产者实例化为类成员变量
private KafkaProducer<String, String> kafkaProducer;
//随项目启动而执行这个方法
@PostConstruct
@ -116,7 +122,7 @@ public class MqttMonitor {
client.reconnect();
log.info("重连中...");
} catch (InterruptedException | MqttException e) {
throw new RuntimeException(e);
log.error("重连失败:" + e.getMessage());
}
}
@ -124,72 +130,7 @@ public class MqttMonitor {
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
// log.info("消息已送达");
// log.info("接收消息主题:{}",topic);
// log.info("接收消息qos{}", mqttMessage.getQos());
//接收到的原始报文
String message = new String(mqttMessage.getPayload());
// log.info("接收消息原始内容:{}", message);
//解析后的字符串
String parseMsg = ParseMessageUtil.parseMsg(message);
//拿到前17位车辆vin码
String start17 = parseMsg.substring(0, 17);
log.info("当前车辆的vin码为" + start17);
// //判断缓存中是否有这个vin
// if (redis.hasKey("不存在的车辆VIN" + start17)) {
//
// //可使用RabbitMQ发送消息
// log.error("vin码为" + start17 + "的车辆不属于本系统!");
//
// } else {//如果缓存中没有存这个vin
//
//
// }
//调取接口通过vin查询车辆
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
System.out.println("**************" + vehicles);
//如果不存在这个车
if (0 == vehicles.size()) {
//将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示
// redis.setCacheObject("不存在的车辆VIN" + start17, start17);
log.error("未找到vin码为" + start17 + "的车辆信息");
} else {
//如果存在这个车
Vehicle vehicle = vehicles.get(0);
log.info("远程调用查询到的车辆数据:" + vehicle);
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS);
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
//调用上线接口,修改上线状态
Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1);
//上线成功
if (0 != i) {
log.info("上线成功!");
try {
produceMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
executorService.execute(() -> processMessageArrival(topic, mqttMessage));
}
@Override
@ -203,31 +144,104 @@ public class MqttMonitor {
} catch (MqttException e) {
log.error("mqtt监听者启动失败{}", e.getMessage());
throw new RuntimeException(e);
// throw new RuntimeException(e);
}
}
//Kafka生产者
private static void produceMessage(String message) {
//异步处理mqtt消息到达后的逻辑
private void processMessageArrival(String topic, MqttMessage mqttMessage) {
// log.info("消息已送达");
// log.info("接收消息主题:{}",topic);
// log.info("接收消息qos{}", mqttMessage.getQos());
//接收到的原始报文
String message = new String(mqttMessage.getPayload());
//log.info("接收消息原始内容:{}", message);
//解析后的字符串
String parseMsg = ParseMessageUtil.parseMsg(message);
//拿到前17位车辆vin码
String start17 = parseMsg.substring(0, 17);
log.info("当前车辆的vin码为" + start17);
// //判断缓存中是否有这个vin
// if (redis.hasKey("不存在的车辆VIN" + start17)) {
//
// //可使用RabbitMQ发送消息
// log.error("vin码为" + start17 + "的车辆不属于本系统!");
//
// } else {//如果缓存中没有存这个vin
//
//
// }
//调取接口通过vin查询车辆
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
System.out.println("**************" + vehicles);
//如果不存在这个车
if (0 == vehicles.size()) {
//将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示
// redis.setCacheObject("不存在的车辆VIN" + start17, start17);
log.error("未找到vin码为" + start17 + "的车辆信息");
} else {
//如果存在这个车
Vehicle vehicle = vehicles.get(0);
log.info("远程调用查询到的车辆数据:" + vehicle);
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS);
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
//调用上线接口,修改上线状态
Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1);
//上线成功
if (0 != i) {
log.info("上线成功!");
try {
produceMessage(message);
} catch (Exception e) {
log.error("发送信息异常:" + e.getMessage());
}
}
}
}
//Kafka生产者实例化为类成员变量
@PostConstruct
public void initKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//创建生产者
try {
kafkaProducer = new KafkaProducer<>(props);
}
//发送消息至Kafka
private void produceMessage(String message) {
try {
//发送消息
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, message));
System.out.println("发送消息:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
log.error("消息发送失败:" + e.getMessage());
}
}
}

View File

@ -15,9 +15,11 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 172469
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式
file-extension: yml
# 共享配置