kafka代码

server_five_liuyunhu
dongxiaodong 2024-04-06 16:29:50 +08:00
parent 812b8d47f8
commit 6798b04fc4
12 changed files with 816 additions and 487 deletions

View File

@ -1,22 +0,0 @@
package com.couplet.common.system.remote;
import com.couplet.common.core.constant.ServiceNameConstants;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:00
* @description
*/
@FeignClient(contextId = "remoteCodeService",value = ServiceNameConstants.BUSINESS_SERVICE, fallbackFactory = RemoteCodeFallbackFactory.class)
public interface RemoteCodeService {
@PostMapping("trouble/insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode);
}

View File

@ -1,30 +0,0 @@
package com.couplet.common.system.remote.factory;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:03
* @description
*/
@Slf4j
@Component
public class RemoteCodeFallbackFactory implements FallbackFactory<RemoteCodeService> {
@Override
public RemoteCodeService create(Throwable cause) {
log.error("调用日志服务异常:{}", cause.getMessage());
return new RemoteCodeService()
{
@Override
public Result<Integer> insertCode(CoupletTroubleCode coupletTroubleCode) {
return null;
}
};
}
}

View File

@ -3,4 +3,3 @@ com.couplet.common.system.remote.factory.RemoteLogFallbackFactory
com.couplet.common.system.remote.factory.RemoteFileFallbackFactory com.couplet.common.system.remote.factory.RemoteFileFallbackFactory
com.couplet.common.system.remote.factory.RemoteDeptFallbackFactory com.couplet.common.system.remote.factory.RemoteDeptFallbackFactory
com.couplet.common.system.remote.factory.RemoteEmployeeFallbackFactory com.couplet.common.system.remote.factory.RemoteEmployeeFallbackFactory
com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory

View File

@ -0,0 +1,16 @@
package com.couplet.analyze.msg.consumer;
import org.springframework.kafka.annotation.KafkaListener;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 15:37
* @description
*/
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "group", properties = {"bootstrap.servers = 39.103.133.136:9092"})
public void getMessage(String msg) {
System.out.println("接收到消息:" + msg);
}
}

View File

@ -3,7 +3,6 @@ package com.couplet.analyze.msg.model;
import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService; import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.utils.SpringUtils; import com.couplet.common.core.utils.SpringUtils;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -29,372 +28,369 @@ import static com.couplet.analyze.msg.contents.MsgContent.CLIENT_ID;
@Component @Component
public class ModelMessage { public class ModelMessage {
@Autowired // @Autowired
private RabbitTemplate rabbitTemplate; // private RabbitTemplate rabbitTemplate;
static ArrayList<String> strings = new ArrayList<>() { // static ArrayList<String> strings = new ArrayList<>() {
{ // {
add("breakdown"); // add("breakdown");
add("electronic-fence"); // add("electronic-fence");
add("real-time-data"); // add("real-time-data");
add("stored-event"); // add("stored-event");
} // }
}; // };
@Autowired
private RemoteCodeService remoteCodeService;
// @Value("${mq.queueName}")
// public String queueName;
// //
// //交换机 //// @Value("${mq.queueName}")
// @Value("${mq.exchangeName}") //// public String queueName;
// public String exchangeName; ////
//// //交换机
//// @Value("${mq.exchangeName}")
//// public String exchangeName;
////
//// //路由键
//// @Value("${mq.routingKey}")
//// public String routingKey;
// //
// //路由键 // @Scheduled(cron = "0/5 * * * * ?")
// @Value("${mq.routingKey}") // public void startMsg() {
// public String routingKey; // try {
// MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID);
@Scheduled(cron = "0/5 * * * * ?") //
public void startMsg() { // MqttConnectOptions options = new MqttConnectOptions();
try { //
MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID); // options.setCleanSession(true);
// mqttClient.connect(options);
MqttConnectOptions options = new MqttConnectOptions(); //
// mqttClient.setCallback(new MqttCallback() {
options.setCleanSession(true); // @Override
mqttClient.connect(options); // public void connectionLost(Throwable throwable) {
// log.error("Mqtt[{}-{}]连接断开:[{}]", CLIENT_ID, BROKER_URL, throwable.getMessage(), throwable);
mqttClient.setCallback(new MqttCallback() { // }
@Override //
public void connectionLost(Throwable throwable) { // @Override
log.error("Mqtt[{}-{}]连接断开:[{}]", CLIENT_ID, BROKER_URL, throwable.getMessage(), throwable); // public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
} // //打印接收到的消息和主题
// log.info("主题='{}':消息内容={}", topic, new String(mqttMessage.getPayload()));
@Override //// String str = hexToString(new String(mqttMessage.getPayload()));
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { //// List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
//打印接收到的消息和主题 //
log.info("主题='{}':消息内容={}", topic, new String(mqttMessage.getPayload())); // for (CoupletMsgData msgData : coupletMsgDataList) {
String str = hexToString(new String(mqttMessage.getPayload())); // log.info("解析到车辆数据:{}", msgData);
List<CoupletMsgData> coupletMsgDataList = sendMsg(str); // for (String string : strings) {
// IncidentService incidentService = SpringUtils.getBean(string);
for (CoupletMsgData msgData : coupletMsgDataList) { // incidentService.incident(msgData);
log.info("解析到车辆数据:{}", msgData);
for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData);
}
// 不睡眠,他们也会主动拉取数据,不会影响其他服务
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } // }
} //// 不睡眠,他们也会主动拉取数据,不会影响其他服务
//// try {
} //// Thread.sleep(1000);
//// } catch (InterruptedException e) {
@Override //// throw new RuntimeException(e);
public void deliveryComplete(IMqttDeliveryToken token) { //// }
log.info("消息已投递成功:{}",token); // }
} //
}); // }
mqttClient.subscribe("test",0); //
// @Override
Thread.sleep(1000*60*10); // public void deliveryComplete(IMqttDeliveryToken token) {
} catch (Exception e) { // log.info("消息已投递成功:{}",token);
throw new RuntimeException(e); // }
} // });
} // mqttClient.subscribe("test",0);
//
/** // Thread.sleep(1000*60*10);
* 16ASCII // } catch (Exception e) {
* @param s 16 // throw new RuntimeException(e);
* @return ASCII // }
*/ // }
public static String hexToString(String s) { //
if (s == null || s.equals("")) { // /**
return null; // * 将16进制字符串转换为ASCII字符串
} // * @param s 16进制字符串
s = s.replace(" ", ""); // * @return ASCII字符串
byte[] baKeyword = new byte[s.length() / 2]; // */
for (int i = 0; i < baKeyword.length; i++) { //// public static String hexToString(String s) {
try { //// if (s == null || s.equals("")) {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16)); //// return null;
} catch (Exception e) { //// }
e.printStackTrace(); //// s = s.replace(" ", "");
} //// byte[] baKeyword = new byte[s.length() / 2];
} //// for (int i = 0; i < baKeyword.length; i++) {
try { //// try {
s = new String(baKeyword, StandardCharsets.UTF_8); //// baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e1) { //// } catch (Exception e) {
e1.printStackTrace(); //// e.printStackTrace();
return s; //// }
} //// }
return s; //// try {
} //// s = new String(baKeyword, StandardCharsets.UTF_8);
//// } catch (Exception e1) {
public static List<CoupletMsgData> sendMsg(String str) { //// e1.printStackTrace();
List<CoupletMsgData> coupletMsgDataList = new ArrayList<>(); //// return s;
CoupletMsgData coupletMsgData = new CoupletMsgData(); //// }
//// return s;
coupletMsgData.setVin(str.substring(1,18)); //// }
////
log.info("vin=="+coupletMsgData.getVin()); //// public static List<CoupletMsgData> sendMsg(String str) {
//// List<CoupletMsgData> coupletMsgDataList = new ArrayList<>();
//时间 //// CoupletMsgData coupletMsgData = new CoupletMsgData();
String tim =str.substring(18,31); ////
long timestamp = Long.parseLong(tim); //// coupletMsgData.setVin(str.substring(1,18));
////
Date date = new Date(timestamp); //// log.info("vin=="+coupletMsgData.getVin());
coupletMsgData.setCreateTime(date); ////
//// //时间
//经度 //// String tim =str.substring(18,31);
String lt = str.substring(31,42); //// long timestamp = Long.parseLong(tim);
// 如果末尾是零,则舍去 ////
int endIndex = lt.length() - 1; //// Date date = new Date(timestamp);
while (lt.charAt(endIndex) == '0'){ //// coupletMsgData.setCreateTime(date);
endIndex--; ////
} //// //经度
//// String lt = str.substring(31,42);
String longitude = lt.substring(0, endIndex + 1); //// // 如果末尾是零,则舍去
coupletMsgData.setLongitude(longitude); //// int endIndex = lt.length() - 1;
//// while (lt.charAt(endIndex) == '0'){
//维度 //// endIndex--;
String latitudeIndex =str.substring(42,52); //// }
int endIndexT = latitudeIndex.length() - 1; ////
while (latitudeIndex.charAt(endIndexT) == '0'){ //// String longitude = lt.substring(0, endIndex + 1);
endIndexT--; //// coupletMsgData.setLongitude(longitude);
} ////
//// //维度
String latitude = latitudeIndex.substring(0, endIndexT + 1); //// String latitudeIndex =str.substring(42,52);
coupletMsgData.setLatitude(latitude); //// int endIndexT = latitudeIndex.length() - 1;
//// while (latitudeIndex.charAt(endIndexT) == '0'){
//速度speed //// endIndexT--;
String speed =str.substring(52,58); //// }
coupletMsgData.setSpeed(speed); ////
//// String latitude = latitudeIndex.substring(0, endIndexT + 1);
//里程 //// coupletMsgData.setLatitude(latitude);
BigDecimal mileage= new BigDecimal(str.substring(58,69)); ////
mileage=mileage.stripTrailingZeros(); //// //速度speed
coupletMsgData.setMileage(mileage); //// String speed =str.substring(52,58);
//// coupletMsgData.setSpeed(speed);
//总电压 ////
String voltage =str.substring(69,75); //// //里程
while (voltage.endsWith("0")) { // BigDecimal mileage= new BigDecimal(str.substring(58,69));
voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零 // mileage=mileage.stripTrailingZeros();
} // coupletMsgData.setMileage(mileage);
coupletMsgData.setVoltage(voltage); //
// //总电压
//总电流 // String voltage =str.substring(69,75);
String current =str.substring(75,80); // while (voltage.endsWith("0")) {
while (current.endsWith("0")){ // voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零
current=current.substring(0,current.length()-1); // }
} // coupletMsgData.setVoltage(voltage);
coupletMsgData.setCurrent(current); //
// //总电流
//绝缘电阻 resistance // String current =str.substring(75,80);
String res =str.substring(80,89); // while (current.endsWith("0")){
String resistance = res.substring(0, 5); // current=current.substring(0,current.length()-1);
coupletMsgData.setResistance(resistance); // }
// coupletMsgData.setCurrent(current);
//档位 //
String gear =str.substring(89,90); // //绝缘电阻 resistance
coupletMsgData.setGear(gear); // String res =str.substring(80,89);
// String resistance = res.substring(0, 5);
//accelerationPedal 加速踏板行程值 // coupletMsgData.setResistance(resistance);
String accelerationPedal =str.substring(90,91); //
coupletMsgData.setAccelerationPedal(accelerationPedal); // //档位
// String gear =str.substring(89,90);
//brakePedal 制动踏板行程值 // coupletMsgData.setGear(gear);
String brakePedal =str.substring(92,93); //
coupletMsgData.setBrakePedal(brakePedal); // //accelerationPedal 加速踏板行程值
// String accelerationPedal =str.substring(90,91);
//fuelConsumptionRate 燃料消耗率 // coupletMsgData.setAccelerationPedal(accelerationPedal);
String fuelConsumptionRate =str.substring(94,99); //
coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate); // //brakePedal 制动踏板行程值
// String brakePedal =str.substring(92,93);
//motorControllerTemperature 电机控制器温度 // coupletMsgData.setBrakePedal(brakePedal);
String motorControllerTemperature =str.substring(99,105); //
while (motorControllerTemperature.endsWith("0")){ // //fuelConsumptionRate 燃料消耗率
motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1); // String fuelConsumptionRate =str.substring(94,99);
} // coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate);
coupletMsgData.setMotorControllerTemperature(motorControllerTemperature); //
// //motorControllerTemperature 电机控制器温度
//motorSpeed 电机转速 // String motorControllerTemperature =str.substring(99,105);
String motorSpeed =str.substring(105,110); // while (motorControllerTemperature.endsWith("0")){
coupletMsgData.setMotorSpeed(motorSpeed); // motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1);
// }
//motorTorque 电机转矩 // coupletMsgData.setMotorControllerTemperature(motorControllerTemperature);
String motorTorque =str.substring(110,114); //
while (motorTorque.endsWith("0")){ // //motorSpeed 电机转速
motorTorque=motorTorque.substring(0,motorTorque.length()-1); // String motorSpeed =str.substring(105,110);
} // coupletMsgData.setMotorSpeed(motorSpeed);
coupletMsgData.setMotorTorque(motorTorque); //
// //motorTorque 电机转矩
//motorTemperature 电机温度 // String motorTorque =str.substring(110,114);
String motorTemperature =str.substring(114,120); // while (motorTorque.endsWith("0")){
while (motorTemperature.endsWith("0")){ // motorTorque=motorTorque.substring(0,motorTorque.length()-1);
motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1); // }
} // coupletMsgData.setMotorTorque(motorTorque);
coupletMsgData.setMotorTemperature(motorTemperature); //
// //motorTemperature 电机温度
//motorVoltage 电机电压 // String motorTemperature =str.substring(114,120);
String motorVoltage =str.substring(120,125); // while (motorTemperature.endsWith("0")){
while (motorVoltage.endsWith("0")){ // motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1);
motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1); // }
} // coupletMsgData.setMotorTemperature(motorTemperature);
coupletMsgData.setMotorVoltage(motorVoltage); //
// //motorVoltage 电机电压
//motorCurrent 电机电流 // String motorVoltage =str.substring(120,125);
String motorCurrent =str.substring(125,133); // while (motorVoltage.endsWith("0")){
while (motorCurrent.endsWith("0")){ // motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1);
motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1); // }
} // coupletMsgData.setMotorVoltage(motorVoltage);
coupletMsgData.setMotorCurrent(motorCurrent); //
// //motorCurrent 电机电流
//remainingBattery 动力电池剩余电量SOC // String motorCurrent =str.substring(125,133);
BigDecimal remainingBattery = new BigDecimal(str.substring(133,138)); // while (motorCurrent.endsWith("0")){
coupletMsgData.setRemainingBattery(remainingBattery); // motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1);
// }
//maximumFeedbackPower 当前状态允许的最大反馈功率 // coupletMsgData.setMotorCurrent(motorCurrent);
String maximumFeedbackPower =str.substring(139,144); //
while (maximumFeedbackPower.endsWith("0")){ // //remainingBattery 动力电池剩余电量SOC
maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1); // BigDecimal remainingBattery = new BigDecimal(str.substring(133,138));
} // coupletMsgData.setRemainingBattery(remainingBattery);
coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower); //
// //maximumFeedbackPower 当前状态允许的最大反馈功率
//maximumDischargePower 当前状态允许最大放电功率 // String maximumFeedbackPower =str.substring(139,144);
String maximumDischargePower =str.substring(145,151); // while (maximumFeedbackPower.endsWith("0")){
while (maximumDischargePower.endsWith("0")){ // maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1);
maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1); // }
} // coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower);
coupletMsgData.setMaximumDischargePower(maximumDischargePower); //
// //maximumDischargePower 当前状态允许最大放电功率
//selfCheckCounter BMS自检计数器 // String maximumDischargePower =str.substring(145,151);
String selfCheckCounter =str.substring(151,153); // while (maximumDischargePower.endsWith("0")){
String selfCheckCounterReplace = selfCheckCounter.replace("0", ""); // maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1);
coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace); // }
// coupletMsgData.setMaximumDischargePower(maximumDischargePower);
//totalBatteryCurrent 动力电池充放电电流 //
String totalBatteryCurrent =str.substring(153,158); // //selfCheckCounter BMS自检计数器
while (totalBatteryCurrent.endsWith("0")){ // String selfCheckCounter =str.substring(151,153);
totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1); // String selfCheckCounterReplace = selfCheckCounter.replace("0", "");
} // coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace);
coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent); //
// //totalBatteryCurrent 动力电池充放电电流
//totalBatteryVoltage 动力电池负载端总电压V3 // String totalBatteryCurrent =str.substring(153,158);
String totalBatteryVoltage =str.substring(158,164); // while (totalBatteryCurrent.endsWith("0")){
while (totalBatteryVoltage.endsWith("0")){ // totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1);
totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1); // }
} // coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent);
coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage); //
// //totalBatteryVoltage 动力电池负载端总电压V3
//singleBatteryMaxVoltage 单次最大电压 // String totalBatteryVoltage =str.substring(158,164);
String singleBatteryMaxVoltage =str.substring(164,168); // while (totalBatteryVoltage.endsWith("0")){
while (singleBatteryMaxVoltage.endsWith("0")){ // totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1);
singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1); // }
} // coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage);
coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage); //
// //singleBatteryMaxVoltage 单次最大电压
//singleBatteryMinVoltage 单体电池最低电压 // String singleBatteryMaxVoltage =str.substring(164,168);
String singleBatteryMinVoltage =str.substring(168,172); // while (singleBatteryMaxVoltage.endsWith("0")){
while (singleBatteryMinVoltage.endsWith("0")){ // singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1);
singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1); // }
} // coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
//
coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage); // //singleBatteryMinVoltage 单体电池最低电压
// String singleBatteryMinVoltage =str.substring(168,172);
//singleBatteryMaxTemperature 单体电池最高温度 // while (singleBatteryMinVoltage.endsWith("0")){
String singleBatteryMaxTemperature =str.substring(172,178); // singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1);
while (singleBatteryMaxTemperature.endsWith("0")){ // }
singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1); //
} // coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature); //
// //singleBatteryMaxTemperature 单体电池最高温度
//singleBatteryMinTemperature 单体电池最低温度 // String singleBatteryMaxTemperature =str.substring(172,178);
String singleBatteryMinTemperature =str.substring(178,184); // while (singleBatteryMaxTemperature.endsWith("0")){
while (singleBatteryMinTemperature.endsWith("0")){ // singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1);
singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1); // }
} // coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature); //
// //singleBatteryMinTemperature 单体电池最低温度
//availableBatteryCapacity 可用电池容量 // String singleBatteryMinTemperature =str.substring(178,184);
String availableBatteryCapacity =str.substring(184,190); // while (singleBatteryMinTemperature.endsWith("0")){
while (availableBatteryCapacity.endsWith("0")){ // singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1);
availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1); // }
} // coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity); //
// //availableBatteryCapacity 可用电池容量
//vehicleStatus 车辆状态 // String availableBatteryCapacity =str.substring(184,190);
int vehicleStatus = Integer.parseInt(str.substring(190,191)); // while (availableBatteryCapacity.endsWith("0")){
coupletMsgData.setVehicleStatus(vehicleStatus); // availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1);
// }
//chargingStatus 充电状态 // coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity);
int chargingStatus = Integer.parseInt(str.substring(191,192)); //
coupletMsgData.setChargingStatus(chargingStatus); // //vehicleStatus 车辆状态
// int vehicleStatus = Integer.parseInt(str.substring(190,191));
//operatingStatus 运行状态 // coupletMsgData.setVehicleStatus(vehicleStatus);
int operatingStatus = Integer.parseInt(str.substring(192,193)); //
coupletMsgData.setOperatingStatus(operatingStatus); // //chargingStatus 充电状态
// int chargingStatus = Integer.parseInt(str.substring(191,192));
//socStatus SOC // coupletMsgData.setChargingStatus(chargingStatus);
int socStatus = Integer.parseInt(str.substring(193,194)); //
coupletMsgData.setSocStatus(socStatus); // //operatingStatus 运行状态
// int operatingStatus = Integer.parseInt(str.substring(192,193));
//chargingEnergyStorageStatus 可充电储能装置工作状态 // coupletMsgData.setOperatingStatus(operatingStatus);
int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195)); //
coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus); // //socStatus SOC
// int socStatus = Integer.parseInt(str.substring(193,194));
//driveMotorStatus 驱动电机状态 // coupletMsgData.setSocStatus(socStatus);
int driveMotorStatus = Integer.parseInt(str.substring(195,196)); //
coupletMsgData.setDriveMotorStatus(driveMotorStatus); // //chargingEnergyStorageStatus 可充电储能装置工作状态
// int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195));
//positionStatus 定位是否有效 // coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus);
int positionStatus = Integer.parseInt(str.substring(196,197)); //
coupletMsgData.setPositionStatus(positionStatus); // //driveMotorStatus 驱动电机状态
// int driveMotorStatus = Integer.parseInt(str.substring(195,196));
//easStatus EAS(汽车防盗系统)状态 // coupletMsgData.setDriveMotorStatus(driveMotorStatus);
int easStatus = Integer.parseInt(str.substring(197,198)); //
coupletMsgData.setEasStatus(easStatus); // //positionStatus 定位是否有效
// int positionStatus = Integer.parseInt(str.substring(196,197));
//ptcStatus PTC(电动加热器)状态 // coupletMsgData.setPositionStatus(positionStatus);
int ptcStatus = Integer.parseInt(str.substring(198,199)); //
coupletMsgData.setPtcStatus(ptcStatus); // //easStatus EAS(汽车防盗系统)状态
// int easStatus = Integer.parseInt(str.substring(197,198));
//epsStatus // coupletMsgData.setEasStatus(easStatus);
int epsStatus = Integer.parseInt(str.substring(199,200)); //
coupletMsgData.setEpsStatus(epsStatus); // //ptcStatus PTC(电动加热器)状态
// int ptcStatus = Integer.parseInt(str.substring(198,199));
//absStatus EPS(电动助力系统)状态 // coupletMsgData.setPtcStatus(ptcStatus);
int absStatus = Integer.parseInt(str.substring(200,201)); //
coupletMsgData.setAbsStatus(absStatus); // //epsStatus
// int epsStatus = Integer.parseInt(str.substring(199,200));
//mcuStatus MCU(电机/逆变器)状态 // coupletMsgData.setEpsStatus(epsStatus);
int mcuStatus = Integer.parseInt(str.substring(201,202)); //
coupletMsgData.setMcuStatus(mcuStatus); // //absStatus EPS(电动助力系统)状态
// int absStatus = Integer.parseInt(str.substring(200,201));
//heatingStatus 动力电池加热状态 // coupletMsgData.setAbsStatus(absStatus);
int heatingStatus = Integer.parseInt(str.substring(202,203)); //
coupletMsgData.setHeatingStatus(heatingStatus); // //mcuStatus MCU(电机/逆变器)状态
// int mcuStatus = Integer.parseInt(str.substring(201,202));
//batteryStatus 动力电池当前状态 // coupletMsgData.setMcuStatus(mcuStatus);
int batteryStatus = Integer.parseInt(str.substring(203,204)); //
coupletMsgData.setBatteryStatus(batteryStatus); // //heatingStatus 动力电池加热状态
// int heatingStatus = Integer.parseInt(str.substring(202,203));
//batteryInsulationStatus 动力电池保温状态 // coupletMsgData.setHeatingStatus(heatingStatus);
int batteryInsulationStatus = Integer.parseInt(str.substring(204,205)); //
coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus); // //batteryStatus 动力电池当前状态
// int batteryStatus = Integer.parseInt(str.substring(203,204));
//dcdcStatus DCDC(电力交换系统)状态 // coupletMsgData.setBatteryStatus(batteryStatus);
int dcdcStatus = Integer.parseInt(str.substring(205,206)); //
coupletMsgData.setDcdcStatus(dcdcStatus); // //batteryInsulationStatus 动力电池保温状态
// int batteryInsulationStatus = Integer.parseInt(str.substring(204,205));
//chgStatus CHG(充电机)状态 // coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus);
int chgStatus = Integer.parseInt(str.substring(206,207)); //
coupletMsgData.setChgStatus(chgStatus); // //dcdcStatus DCDC(电力交换系统)状态
// int dcdcStatus = Integer.parseInt(str.substring(205,206));
coupletMsgDataList.add(coupletMsgData); // coupletMsgData.setDcdcStatus(dcdcStatus);
//
return coupletMsgDataList; // //chgStatus CHG(充电机)状态
} // int chgStatus = Integer.parseInt(str.substring(206,207));
// coupletMsgData.setChgStatus(chgStatus);
//
// coupletMsgDataList.add(coupletMsgData);
//
// return coupletMsgDataList;
// }
} }

View File

@ -0,0 +1,86 @@
package com.couplet.analyze.msg.model;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.couplet.analyze.msg.utils.MsgUtils.hexToString;
import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 14:04
* @description
*/
@Component
@Slf4j
public class ModelsKafkaMessage {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
static ArrayList<String> strings = new ArrayList<>() {
{
add("breakdown");
add("electronic-fence");
add("real-time-data");
add("stored-event");
}
};
/**
*
* @return
*/
@Scheduled(fixedDelay = 50)
private static void consumerMessages() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("接收到的数据:" + record.value());
String str = hexToString(record.value());
List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
for (CoupletMsgData msgData : coupletMsgDataList) {
log.info("解析到车辆数据:{}", msgData);
for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData);
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

View File

@ -0,0 +1,317 @@
package com.couplet.analyze.msg.utils;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/6 14:12
* @description
*/
@Slf4j
public class MsgUtils {
/**
* 16ASCII
* @param s 16
* @return ASCII
*/
public static String hexToString(String s) {
if (s == null || s.equals("")) {
return null;
}
s = s.replace(" ", "");
byte[] baKeyword = new byte[s.length() / 2];
for (int i = 0; i < baKeyword.length; i++) {
try {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
s = new String(baKeyword, StandardCharsets.UTF_8);
} catch (Exception e1) {
e1.printStackTrace();
return s;
}
return s;
}
/**
*
* @param str
* @return
*/
public static List<CoupletMsgData> sendMsg(String str) {
List<CoupletMsgData> coupletMsgDataList = new ArrayList<>();
CoupletMsgData coupletMsgData = new CoupletMsgData();
coupletMsgData.setVin(str.substring(1,18));
log.info("vin=="+coupletMsgData.getVin());
//时间
String tim =str.substring(18,31);
long timestamp = Long.parseLong(tim);
Date date = new Date(timestamp);
coupletMsgData.setCreateTime(date);
//经度
String lt = str.substring(31,42);
// 如果末尾是零,则舍去
int endIndex = lt.length() - 1;
while (lt.charAt(endIndex) == '0'){
endIndex--;
}
String longitude = lt.substring(0, endIndex + 1);
coupletMsgData.setLongitude(longitude);
//维度
String latitudeIndex =str.substring(42,52);
int endIndexT = latitudeIndex.length() - 1;
while (latitudeIndex.charAt(endIndexT) == '0'){
endIndexT--;
}
String latitude = latitudeIndex.substring(0, endIndexT + 1);
coupletMsgData.setLatitude(latitude);
//速度speed
String speed =str.substring(52,58);
coupletMsgData.setSpeed(speed);
//里程
BigDecimal mileage= new BigDecimal(str.substring(58,69));
mileage=mileage.stripTrailingZeros();
coupletMsgData.setMileage(mileage);
//总电压
String voltage =str.substring(69,75);
while (voltage.endsWith("0")) {
voltage = voltage.substring(0, voltage.length() - 1); // 去除末尾的零
}
coupletMsgData.setVoltage(voltage);
//总电流
String current =str.substring(75,80);
while (current.endsWith("0")){
current=current.substring(0,current.length()-1);
}
coupletMsgData.setCurrent(current);
//绝缘电阻 resistance
String res =str.substring(80,89);
String resistance = res.substring(0, 5);
coupletMsgData.setResistance(resistance);
//档位
String gear =str.substring(89,90);
coupletMsgData.setGear(gear);
//accelerationPedal 加速踏板行程值
String accelerationPedal =str.substring(90,91);
coupletMsgData.setAccelerationPedal(accelerationPedal);
//brakePedal 制动踏板行程值
String brakePedal =str.substring(92,93);
coupletMsgData.setBrakePedal(brakePedal);
//fuelConsumptionRate 燃料消耗率
String fuelConsumptionRate =str.substring(94,99);
coupletMsgData.setFuelConsumptionRate(fuelConsumptionRate);
//motorControllerTemperature 电机控制器温度
String motorControllerTemperature =str.substring(99,105);
while (motorControllerTemperature.endsWith("0")){
motorControllerTemperature=motorControllerTemperature.substring(0,motorControllerTemperature.length()-1);
}
coupletMsgData.setMotorControllerTemperature(motorControllerTemperature);
//motorSpeed 电机转速
String motorSpeed =str.substring(105,110);
coupletMsgData.setMotorSpeed(motorSpeed);
//motorTorque 电机转矩
String motorTorque =str.substring(110,114);
while (motorTorque.endsWith("0")){
motorTorque=motorTorque.substring(0,motorTorque.length()-1);
}
coupletMsgData.setMotorTorque(motorTorque);
//motorTemperature 电机温度
String motorTemperature =str.substring(114,120);
while (motorTemperature.endsWith("0")){
motorTemperature=motorTemperature.substring(0,motorTemperature.length()-1);
}
coupletMsgData.setMotorTemperature(motorTemperature);
//motorVoltage 电机电压
String motorVoltage =str.substring(120,125);
while (motorVoltage.endsWith("0")){
motorVoltage=motorVoltage.substring(0,motorVoltage.length()-1);
}
coupletMsgData.setMotorVoltage(motorVoltage);
//motorCurrent 电机电流
String motorCurrent =str.substring(125,133);
while (motorCurrent.endsWith("0")){
motorCurrent=motorCurrent.substring(0,motorCurrent.length()-1);
}
coupletMsgData.setMotorCurrent(motorCurrent);
//remainingBattery 动力电池剩余电量SOC
BigDecimal remainingBattery = new BigDecimal(str.substring(133,138));
coupletMsgData.setRemainingBattery(remainingBattery);
//maximumFeedbackPower 当前状态允许的最大反馈功率
String maximumFeedbackPower =str.substring(139,144);
while (maximumFeedbackPower.endsWith("0")){
maximumFeedbackPower=maximumFeedbackPower.substring(0,maximumFeedbackPower.length()-1);
}
coupletMsgData.setMaximumFeedbackPower(maximumFeedbackPower);
//maximumDischargePower 当前状态允许最大放电功率
String maximumDischargePower =str.substring(145,151);
while (maximumDischargePower.endsWith("0")){
maximumDischargePower=maximumDischargePower.substring(0,maximumDischargePower.length()-1);
}
coupletMsgData.setMaximumDischargePower(maximumDischargePower);
//selfCheckCounter BMS自检计数器
String selfCheckCounter =str.substring(151,153);
String selfCheckCounterReplace = selfCheckCounter.replace("0", "");
coupletMsgData.setSelfCheckCounter(selfCheckCounterReplace);
//totalBatteryCurrent 动力电池充放电电流
String totalBatteryCurrent =str.substring(153,158);
while (totalBatteryCurrent.endsWith("0")){
totalBatteryCurrent=totalBatteryCurrent.substring(0,totalBatteryCurrent.length()-1);
}
coupletMsgData.setTotalBatteryCurrent(totalBatteryCurrent);
//totalBatteryVoltage 动力电池负载端总电压V3
String totalBatteryVoltage =str.substring(158,164);
while (totalBatteryVoltage.endsWith("0")){
totalBatteryVoltage=totalBatteryVoltage.substring(0,totalBatteryVoltage.length()-1);
}
coupletMsgData.setTotalBatteryVoltage(totalBatteryVoltage);
//singleBatteryMaxVoltage 单次最大电压
String singleBatteryMaxVoltage =str.substring(164,168);
while (singleBatteryMaxVoltage.endsWith("0")){
singleBatteryMaxVoltage=singleBatteryMaxVoltage.substring(0,singleBatteryMaxVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
//singleBatteryMinVoltage 单体电池最低电压
String singleBatteryMinVoltage =str.substring(168,172);
while (singleBatteryMinVoltage.endsWith("0")){
singleBatteryMinVoltage=singleBatteryMinVoltage.substring(0,singleBatteryMinVoltage.length()-1);
}
coupletMsgData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
//singleBatteryMaxTemperature 单体电池最高温度
String singleBatteryMaxTemperature =str.substring(172,178);
while (singleBatteryMaxTemperature.endsWith("0")){
singleBatteryMaxTemperature=singleBatteryMaxTemperature.substring(0,singleBatteryMaxTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
//singleBatteryMinTemperature 单体电池最低温度
String singleBatteryMinTemperature =str.substring(178,184);
while (singleBatteryMinTemperature.endsWith("0")){
singleBatteryMinTemperature=singleBatteryMinTemperature.substring(0,singleBatteryMinTemperature.length()-1);
}
coupletMsgData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
//availableBatteryCapacity 可用电池容量
String availableBatteryCapacity =str.substring(184,190);
while (availableBatteryCapacity.endsWith("0")){
availableBatteryCapacity=availableBatteryCapacity.substring(0,availableBatteryCapacity.length()-1);
}
coupletMsgData.setAvailableBatteryCapacity(availableBatteryCapacity);
//vehicleStatus 车辆状态
int vehicleStatus = Integer.parseInt(str.substring(190,191));
coupletMsgData.setVehicleStatus(vehicleStatus);
//chargingStatus 充电状态
int chargingStatus = Integer.parseInt(str.substring(191,192));
coupletMsgData.setChargingStatus(chargingStatus);
//operatingStatus 运行状态
int operatingStatus = Integer.parseInt(str.substring(192,193));
coupletMsgData.setOperatingStatus(operatingStatus);
//socStatus SOC
int socStatus = Integer.parseInt(str.substring(193,194));
coupletMsgData.setSocStatus(socStatus);
//chargingEnergyStorageStatus 可充电储能装置工作状态
int chargingEnergyStorageStatus = Integer.parseInt(str.substring(194,195));
coupletMsgData.setChargingEnergyStorageStatus(chargingEnergyStorageStatus);
//driveMotorStatus 驱动电机状态
int driveMotorStatus = Integer.parseInt(str.substring(195,196));
coupletMsgData.setDriveMotorStatus(driveMotorStatus);
//positionStatus 定位是否有效
int positionStatus = Integer.parseInt(str.substring(196,197));
coupletMsgData.setPositionStatus(positionStatus);
//easStatus EAS(汽车防盗系统)状态
int easStatus = Integer.parseInt(str.substring(197,198));
coupletMsgData.setEasStatus(easStatus);
//ptcStatus PTC(电动加热器)状态
int ptcStatus = Integer.parseInt(str.substring(198,199));
coupletMsgData.setPtcStatus(ptcStatus);
//epsStatus
int epsStatus = Integer.parseInt(str.substring(199,200));
coupletMsgData.setEpsStatus(epsStatus);
//absStatus EPS(电动助力系统)状态
int absStatus = Integer.parseInt(str.substring(200,201));
coupletMsgData.setAbsStatus(absStatus);
//mcuStatus MCU(电机/逆变器)状态
int mcuStatus = Integer.parseInt(str.substring(201,202));
coupletMsgData.setMcuStatus(mcuStatus);
//heatingStatus 动力电池加热状态
int heatingStatus = Integer.parseInt(str.substring(202,203));
coupletMsgData.setHeatingStatus(heatingStatus);
//batteryStatus 动力电池当前状态
int batteryStatus = Integer.parseInt(str.substring(203,204));
coupletMsgData.setBatteryStatus(batteryStatus);
//batteryInsulationStatus 动力电池保温状态
int batteryInsulationStatus = Integer.parseInt(str.substring(204,205));
coupletMsgData.setBatteryInsulationStatus(batteryInsulationStatus);
//dcdcStatus DCDC(电力交换系统)状态
int dcdcStatus = Integer.parseInt(str.substring(205,206));
coupletMsgData.setDcdcStatus(dcdcStatus);
//chgStatus CHG(充电机)状态
int chgStatus = Integer.parseInt(str.substring(206,207));
coupletMsgData.setChgStatus(chgStatus);
coupletMsgDataList.add(coupletMsgData);
return coupletMsgDataList;
}
}

View File

@ -75,22 +75,6 @@ public class SysTroubleController extends BaseController {
return success(); return success();
} }
/**
*
* @param coupletTroubleCode
* @return
*/
@PostMapping("insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode){
long start = System.currentTimeMillis();
int i = troubleService.insertMsgResq(coupletTroubleCode);
long end = System.currentTimeMillis();
log.info("记录异常信息成功,耗时:{}",(end-start));
return Result.success(i);
}
/** /**
* *
*/ */

View File

@ -25,7 +25,5 @@ public interface SysTroubleService extends IService<CoupletTroubleCode> {
*/ */
void newFaultData(CoupletTroubleCode code); void newFaultData(CoupletTroubleCode code);
int insertMsgResq(CoupletTroubleCode coupletTroubleCode);
void cleanTroubleCode(); void cleanTroubleCode();
} }

View File

@ -50,16 +50,6 @@ public class SysTroubleServiceImpl extends ServiceImpl<SysTroubleMapper, Couplet
sysTroubleMapper.newFaultData(code); sysTroubleMapper.newFaultData(code);
} }
/**
*
* @param coupletTroubleCode
* @return
*/
@Override
public int insertMsgResq(CoupletTroubleCode coupletTroubleCode) {
return sysTroubleMapper.insertMsgResq(coupletTroubleCode);
}
/** /**
* *
*/ */

View File

@ -7,18 +7,14 @@
<resultMap type="com.couplet.common.domain.CoupletTroubleCode" id="SysTroubleResult"> <resultMap type="com.couplet.common.domain.CoupletTroubleCode" id="SysTroubleResult">
<id property="troubleId" column="trouble_id"/> <id property="troubleId" column="trouble_id"/>
<result property="troubleCode" column="trouble_code"/> <result property="troubleCode" column="trouble_code"/>
<result property="troubleValue" column="trouble_value"/> <result property="troubleVin" column="trouble_vin"/>
<result property="troublePosition" column="trouble_position"/>
<result property="troubleTag" column="trouble_tag"/> <result property="troubleTag" column="trouble_tag"/>
<result property="typeId" column="type_id"/> <result property="troubleStartTime" column="trouble_start_time"/>
<result property="gradeId" column="grade_id"/> <result property="troubleEndTime" column="trouble_end_time"/>
<result property="troubleType" column="trouble_type"/>
</resultMap> </resultMap>
<sql id="selectTroubleVo"> <sql id="selectTroubleVo">
select t.*,g.grade_name,y.type_name from couplet_trouble_code t select * from couplet_trouble_code
LEFT JOIN couplet_trouble_grade g on t.grade_id = g.grade_id
LEFT JOIN couplet_trouble_type y on t.type_id= y.type_id
</sql> </sql>
<insert id="newFaultData"> <insert id="newFaultData">
INSERT INTO `couplet-cloud`.`couplet_trouble_code` INSERT INTO `couplet-cloud`.`couplet_trouble_code`
@ -26,9 +22,6 @@
`trouble_start_time`) `trouble_start_time`)
VALUES VALUES
(#{troubleVin}, #{troubleTag}, #{troubleStartTime}) (#{troubleVin}, #{troubleTag}, #{troubleStartTime})
</insert>
<insert id="insertMsgResq">
</insert> </insert>
<update id="cleanTroubleCode"> <update id="cleanTroubleCode">

View File

@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.PostConstruct;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.Properties; import java.util.Properties;
@ -29,11 +30,12 @@ public class KafkaTest {
produceMessage(); produceMessage();
//消费者示例 //消费者示例
consumerMessages(); // consumerMessages();
} }
//生产者 //生产者
@PostConstruct
private static void produceMessage() { private static void produceMessage() {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
@ -59,33 +61,33 @@ public class KafkaTest {
} }
//消费者 //消费者
private static void consumerMessages() { // private static void consumerMessages() {
Properties props = new Properties(); // Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
//创建消费者 // //创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
try { // try {
//
//订阅主题 // //订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME)); // consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
//持续消费消息 // //持续消费消息
while (true) { // while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> { // records.forEach(record -> {
System.out.println("消费者接受到的消息值:" + record.value()); // System.out.println("消费者接受到的消息值:" + record.value());
}); // });
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
consumer.close(); // consumer.close();
} // }
} // }
} }