commit 940ed7132fe8610333c47daf8e68bc161449cb69 Author: ZhangXushuo <3508242435.com> Date: Sat Dec 2 08:42:35 2023 +0800 初始化mqtt和fluxMq diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a32aab2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store +/.idea diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..3ebf741 --- /dev/null +++ b/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + com.fivegroup + mqtt_fluxMq + 1.0-SNAPSHOT + + org.springframework.boot + spring-boot-starter-parent + 2.5.6 + + + + 17 + 17 + UTF-8 + + + + + org.springframework.boot + spring-boot-starter-web + + + + + org.apache.kafka + kafka-clients + 2.8.0 + + + + org.springframework.kafka + spring-kafka + + + + org.springframework.integration + spring-integration-mqtt + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + + org.projectlombok + lombok + + + com.alibaba.fastjson2 + fastjson2 + 2.0.41 + + + + diff --git a/src/main/java/com/fivegroup/MqttApplication.java b/src/main/java/com/fivegroup/MqttApplication.java new file mode 100644 index 0000000..1741c24 --- /dev/null +++ b/src/main/java/com/fivegroup/MqttApplication.java @@ -0,0 +1,12 @@ +package com.fivegroup; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MqttApplication { + public static void main(String[] args) { + SpringApplication.run(MqttApplication.class,args); + } + +} diff --git a/src/main/java/com/fivegroup/common/SystemConstant.java b/src/main/java/com/fivegroup/common/SystemConstant.java new file mode 100644 index 0000000..af5e3d7 --- /dev/null +++ b/src/main/java/com/fivegroup/common/SystemConstant.java @@ -0,0 +1,18 @@ +package com.fivegroup.common; + +import java.math.BigDecimal; + +/** + * 系统常量 + * @author ZhangXushuo + * @version 2023/12/1 - 15:45 + */ + +public class SystemConstant { + + public static final BigDecimal powerConsumption = new BigDecimal(1000); + public static final BigDecimal hundredKilometers = new BigDecimal(100); + public static final String MSG_START="7E"; + public static final String MSG_END ="7E"; + +} diff --git a/src/main/java/com/fivegroup/common/ThreadPool.java b/src/main/java/com/fivegroup/common/ThreadPool.java new file mode 100644 index 0000000..9bac67f --- /dev/null +++ b/src/main/java/com/fivegroup/common/ThreadPool.java @@ -0,0 +1,25 @@ +package com.fivegroup.common; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * 线程池 + * + * @author ZhangXushuo + * @version 2023/12/1 - 15:50 + */ + +public class ThreadPool { + private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(16); + public static ScheduledFuture submit (Runnable thread){ + // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 + return scheduledThreadPool.scheduleAtFixedRate(thread, 0, 1, TimeUnit.SECONDS); + } + public static void shutdown(){ + scheduledThreadPool.shutdown(); + } + +} diff --git a/src/main/java/com/fivegroup/config/KafkaChannel.java b/src/main/java/com/fivegroup/config/KafkaChannel.java new file mode 100644 index 0000000..7c7ba05 --- /dev/null +++ b/src/main/java/com/fivegroup/config/KafkaChannel.java @@ -0,0 +1,52 @@ +package com.fivegroup.config; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * kafka配置连接通道 + * + * @author ZhangXushuo + * @version 2023/12/1 - 18:42 + */ +@Configuration +@EnableKafka +public class KafkaChannel { + //这个类是kafka 的配置类,在这里配置了kafka的连接信息,包括服务器地址,端口号,topic等等。 + public static final String TOPIC_NAME = "topic_test"; + public static final String BOOTSTRAP_SERVERS = "123.207.204.152:9092"; + public static final String GROUP_ID = "group_test"; + + /** + * 生产工厂 + */ + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // 可以根据需要添加其他配置 + + return new DefaultKafkaProducerFactory<>(configProps); + } + + /** + * Kafka 模板 + */ + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + +} diff --git a/src/main/java/com/fivegroup/config/MqttChannel.java b/src/main/java/com/fivegroup/config/MqttChannel.java new file mode 100644 index 0000000..672ef04 --- /dev/null +++ b/src/main/java/com/fivegroup/config/MqttChannel.java @@ -0,0 +1,55 @@ +package com.fivegroup.config; + + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; + +/** + * mqtt配置 + * + * @author ZhangXushuo + * @version 2023/12/1 - 18:44 + */ +@Configuration + +public class MqttChannel { + + + public static final String MQTT_SERVER_URL = "tcp://fluxmq.muyu.icu:1883"; + public static final String CLIENT_ID = "grouping_connection"; + public static final String USERNAME = "root"; + public static final String PASSWORD = "root"; + + /** + * mqtt Connect 选项 + */ + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(new String[]{MQTT_SERVER_URL}); + options.setUserName(USERNAME); + options.setPassword(PASSWORD.toCharArray()); + // 添加其他连接选项,例如SSL证书等 + return options; + } + + /** + * mqtt 客户端工厂 + */ + @Bean + public IMqttClient mqttClient() { + IMqttClient clientInstance=null; + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(mqttConnectOptions()); + try { + clientInstance = factory.getClientInstance(MQTT_SERVER_URL, CLIENT_ID); + } catch (MqttException e) { + throw new RuntimeException(e); + } + return clientInstance; + } +} diff --git a/src/main/java/com/fivegroup/domain/VehicleData.java b/src/main/java/com/fivegroup/domain/VehicleData.java new file mode 100644 index 0000000..158caf2 --- /dev/null +++ b/src/main/java/com/fivegroup/domain/VehicleData.java @@ -0,0 +1,309 @@ +package com.fivegroup.domain; + +import lombok.*; + +import java.math.BigDecimal; +import java.util.Date; + +/** + * 车辆数据 + * + * @author ZhangXushuo + * @version 2023/12/1 - 20:06 + */ +@Data +@Builder +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class VehicleData { + + /** + * 主键id + */ + + private Integer id; + + /** + * VIN + */ + private String vin; + + /** + * 时间戳 + */ + private Date createTime; + + /** + * 经度 + */ + private String longitude; + + /** + * 维度 + */ + private String latitude; + + /** + * 速度 + */ + private String speed; + + /** + * 里程 + */ + private BigDecimal mileage; + + /** + * 总电压 + */ + private String voltage; + + /** + * 总电流 + */ + private String current; + + /** + * 绝缘电阻 + */ + private String resistance; + + /** + * 档位 + */ + private String gear; + + /** + * 加速踏板行程值 + */ + private String accelerationPedal; + + /** + * 制动踏板行程值 + */ + private String brakePedal; + + /** + * 燃料消耗率 + */ + private String fuelConsumptionRate; + + /** + * 电机控制器温度 + */ + private String motorControllerTemperature; + + /** + * 电机转速 + */ + private String motorSpeed; + + /** + * 电机转矩 + */ + private String motorTorque; + + /** + * 电机温度 + */ + private String motorTemperature; + + /** + * 电机电压 + */ + private String motorVoltage; + + /** + * 电机电流 + */ + private String motorCurrent; + + /** + * 动力电池剩余电量SOC + */ + private BigDecimal remainingBattery; + + + /** + * 当前状态允许的最大反馈功率 + */ + private String maximumFeedbackPower; + + /** + * 当前状态允许最大放电功率 + */ + private String maximumDischargePower; + + /** + * BMS自检计数器 + */ + private String selfCheckCounter; + + /** + * 动力电池充放电电流 + */ + private String totalBatteryCurrent; + + /** + * 动力电池负载端总电压V3 + */ + private String totalBatteryVoltage; + + /** + * 单次最大电压 + */ + private String singleBatteryMaxVoltage; + + /** + * 单体电池最低电压 + */ + private String singleBatteryMinVoltage; + + /** + * 单体电池最高温度 + */ + private String singleBatteryMaxTemperature; + + /** + * 单体电池最低温度 + */ + private String singleBatteryMinTemperature; + + /** + * 动力电池可用容量 + */ + private String availableBatteryCapacity; + + /** + * 车辆状态 + */ + private int vehicleStatus; + + /** + * 充电状态 + */ + private int chargingStatus; + + /** + * 运行状态 + */ + private int operatingStatus; + + /** + * SOC + */ + private int socStatus; + + /** + * 可充电储能装置工作状态 + */ + private int chargingEnergyStorageStatus; + + /** + * 驱动电机状态 + */ + private int driveMotorStatus; + + /** + * 定位是否有效 + */ + private int positionStatus; + + /** + * EAS(汽车防盗系统)状态 + */ + private int easStatus; + + /** + * PTC(电动加热器)状态 + */ + private int ptcStatus; + + /** + * EPS(电动助力系统)状态 + */ + private int epsStatus; + + /** + * ABS(防抱死)状态 + */ + private int absStatus; + + /** + * MCU(电机/逆变器)状态 + */ + private int mcuStatus; + + /** + * 动力电池加热状态 + */ + private int heatingStatus; + + /** + * 动力电池当前状态 + */ + private int batteryStatus; + + /** + * 动力电池保温状态 + */ + private int batteryInsulationStatus; + + + public static VehicleData getBuild(String messages) { + char start = messages.charAt(0); + char end = messages.charAt(messages.length() - 1); + System.out.println(start); + System.out.println(end); + return VehicleData.builder() + .vin(messages.substring(1, 18)) + //messages.substring(18, 31) + .createTime(new Date()) + .longitude(messages.substring(31, 42)) + .latitude(messages.substring(42, 52)) + .speed(messages.substring(52, 58)) + .mileage(new BigDecimal(messages.substring(58, 69))) + .voltage(messages.substring(69, 75)) + .current(messages.substring(75, 80)) + .resistance(messages.substring(80, 89)) + .gear(messages.substring(89, 90)) + .accelerationPedal(messages.substring(90, 92)) + .brakePedal(messages.substring(92, 94)) + .fuelConsumptionRate(messages.substring(94, 99)) + .motorControllerTemperature(messages.substring(99, 105)) + .motorSpeed(messages.substring(105, 110)) + .motorTorque(messages.substring(110, 114)) + .motorTemperature(messages.substring(114, 120)) + .motorVoltage(messages.substring(120, 125)) + .motorCurrent(messages.substring(125, 133)) + .remainingBattery(new BigDecimal(messages.substring(133, 139))) + .maximumFeedbackPower(messages.substring(139, 145)) + .maximumDischargePower(messages.substring(145, 151)) + .selfCheckCounter(messages.substring(151, 153)) + .totalBatteryCurrent(messages.substring(153, 158)) + .totalBatteryVoltage(messages.substring(158, 164)) + .singleBatteryMaxVoltage(messages.substring(164, 168)) + .singleBatteryMinVoltage(messages.substring(168, 172)) + .singleBatteryMaxTemperature(messages.substring(172, 178)) + .singleBatteryMinTemperature(messages.substring(178, 184)) + .availableBatteryCapacity(messages.substring(184, 190)) + .vehicleStatus(Integer.valueOf(messages.substring(190, 191))) + .chargingStatus(Integer.valueOf(messages.substring(191, 192))) + .operatingStatus(Integer.valueOf(messages.substring(192, 193))) + .socStatus(Integer.valueOf(messages.substring(193, 194))) + .chargingEnergyStorageStatus(Integer.valueOf(messages.substring(194, 195))) + .driveMotorStatus(Integer.valueOf(messages.substring(195, 196))) + .positionStatus(Integer.valueOf(messages.substring(196, 197))) + .easStatus(Integer.valueOf(messages.substring(197, 198))) + .ptcStatus(Integer.valueOf(messages.substring(198, 199))) + .epsStatus(Integer.valueOf(messages.substring(199, 200))) + .absStatus(Integer.valueOf(messages.substring(200, 201))) + .mcuStatus(Integer.valueOf(messages.substring(201, 202))) + .heatingStatus(Integer.valueOf(messages.substring(202, 203))) + .batteryStatus(Integer.valueOf(messages.substring(203, 204))) + .batteryInsulationStatus(Integer.valueOf(messages.substring(204, 205))) + .build(); + + } +} diff --git a/src/main/java/com/fivegroup/service/ShuntService.java b/src/main/java/com/fivegroup/service/ShuntService.java new file mode 100644 index 0000000..ab9b260 --- /dev/null +++ b/src/main/java/com/fivegroup/service/ShuntService.java @@ -0,0 +1,46 @@ +package com.fivegroup.service; + +import com.fivegroup.config.MqttChannel; +import com.fivegroup.utils.MqttCallBacks; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +/** + * 收到mqtt的消息分流给kafka + * + * @author ZhangXushuo + * @version 2023/12/1 - 19:00 + */ +@Service +public class ShuntService { + @Autowired + MqttCallBacks mqttCallBack; + @Autowired + private MqttChannel mqttChannel; + + /** + * 在这个示例中,我们使用@ServiceActivator注解将handleMessage方法标记为消息处理器。 + * 该方法会在接收到MQTT消息时被调用。通过mqtt_receivedTopic头部可以获取到消息的主题, + * 通过message.getPayload()可以获取到消息的内容。 + */ + @PostConstruct + @ServiceActivator(inputChannel = "mqttInputChannel") + public void mqttAc() { + // 在这里处理接收到的MQTT消息 + try { + IMqttClient client = mqttChannel.mqttClient(); + client.connect(); + client.subscribe("five"); + client.setCallback(mqttCallBack); + } catch (MqttException e) { + throw new RuntimeException(e); + } + + } + + +} diff --git a/src/main/java/com/fivegroup/utils/MqttCallBacks.java b/src/main/java/com/fivegroup/utils/MqttCallBacks.java new file mode 100644 index 0000000..1baeabf --- /dev/null +++ b/src/main/java/com/fivegroup/utils/MqttCallBacks.java @@ -0,0 +1,63 @@ +package com.fivegroup.utils; + +import com.alibaba.fastjson2.JSONObject; +import com.fivegroup.config.KafkaChannel; +import com.fivegroup.domain.VehicleData; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +/** + * mqtt回调函数重写 + * + * @author ZhangXushuo + * @version 2023/12/1 - 19:42 + */ +@Log4j2 +@Component +public class MqttCallBacks implements MqttCallback { + @Autowired + KafkaChannel kafkaChannel; + + /** + * 连接丢失 + */ + @Override + public void connectionLost(Throwable throwable) { + + } + + /** + * 消息已到达 + */ + @Override + public void messageArrived(String topci, MqttMessage mqttMessage) throws Exception { + String s1 = new String(mqttMessage.getPayload()); + System.out.println(s1 + "数据" + s1); + StringBuilder sb = new StringBuilder(); + String[] arr = s1.split(" "); + int length = arr.length; + for (int i = 0; i < length; i++) { + int ch = Integer.parseInt(arr[i], 16); + sb.append((char) ch); + } + + VehicleData build = VehicleData.getBuild(sb.toString()); + // String vin = build.getVin(); + String vin ="five"; + KafkaTemplate kafkaed = kafkaChannel.kafkaTemplate(); + kafkaed.send(vin, JSONObject.toJSONString(build)); + } + + /** + * 交货完成 + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } +}