初始化mqtt和fluxMq

master
ZhangXushuo 2023-12-02 08:42:35 +08:00
commit 940ed7132f
10 changed files with 683 additions and 0 deletions

39
.gitignore vendored 100644
View File

@ -0,0 +1,39 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
/.idea

64
pom.xml 100644
View File

@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fivegroup</groupId>
<artifactId>mqtt_fluxMq</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
</parent>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Integration MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- Lombok (如果您需要) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,12 @@
package com.fivegroup;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class,args);
}
}

View File

@ -0,0 +1,18 @@
package com.fivegroup.common;
import java.math.BigDecimal;
/**
*
* @author ZhangXushuo
* @version 2023/12/1 - 15:45
*/
public class SystemConstant {
public static final BigDecimal powerConsumption = new BigDecimal(1000);
public static final BigDecimal hundredKilometers = new BigDecimal(100);
public static final String MSG_START="7E";
public static final String MSG_END ="7E";
}

View File

@ -0,0 +1,25 @@
package com.fivegroup.common;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author ZhangXushuo
* @version 2023/12/1 - 15:50
*/
public class ThreadPool {
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(16);
public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
return scheduledThreadPool.scheduleAtFixedRate(thread, 0, 1, TimeUnit.SECONDS);
}
public static void shutdown(){
scheduledThreadPool.shutdown();
}
}

View File

@ -0,0 +1,52 @@
package com.fivegroup.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* kafka
*
* @author ZhangXushuo
* @version 2023/12/1 - 18:42
*/
@Configuration
@EnableKafka
public class KafkaChannel {
//这个类是kafka 的配置类在这里配置了kafka的连接信息包括服务器地址端口号topic等等。
public static final String TOPIC_NAME = "topic_test";
public static final String BOOTSTRAP_SERVERS = "123.207.204.152:9092";
public static final String GROUP_ID = "group_test";
/**
*
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 可以根据需要添加其他配置
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* Kafka
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

View File

@ -0,0 +1,55 @@
package com.fivegroup.config;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
/**
* mqtt
*
* @author ZhangXushuo
* @version 2023/12/1 - 18:44
*/
@Configuration
public class MqttChannel {
public static final String MQTT_SERVER_URL = "tcp://fluxmq.muyu.icu:1883";
public static final String CLIENT_ID = "grouping_connection";
public static final String USERNAME = "root";
public static final String PASSWORD = "root";
/**
* mqtt Connect
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{MQTT_SERVER_URL});
options.setUserName(USERNAME);
options.setPassword(PASSWORD.toCharArray());
// 添加其他连接选项例如SSL证书等
return options;
}
/**
* mqtt
*/
@Bean
public IMqttClient mqttClient() {
IMqttClient clientInstance=null;
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
try {
clientInstance = factory.getClientInstance(MQTT_SERVER_URL, CLIENT_ID);
} catch (MqttException e) {
throw new RuntimeException(e);
}
return clientInstance;
}
}

View File

@ -0,0 +1,309 @@
package com.fivegroup.domain;
import lombok.*;
import java.math.BigDecimal;
import java.util.Date;
/**
*
*
* @author ZhangXushuo
* @version 2023/12/1 - 20:06
*/
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class VehicleData {
/**
* id
*/
private Integer id;
/**
* VIN
*/
private String vin;
/**
*
*/
private Date createTime;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
/**
*
*/
private String current;
/**
*
*/
private String resistance;
/**
*
*/
private String gear;
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
private String motorSpeed;
/**
*
*/
private String motorTorque;
/**
*
*/
private String motorTemperature;
/**
*
*/
private String motorVoltage;
/**
*
*/
private String motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private String maximumFeedbackPower;
/**
*
*/
private String maximumDischargePower;
/**
* BMS
*/
private String selfCheckCounter;
/**
*
*/
private String totalBatteryCurrent;
/**
* V3
*/
private String totalBatteryVoltage;
/**
*
*/
private String singleBatteryMaxVoltage;
/**
*
*/
private String singleBatteryMinVoltage;
/**
*
*/
private String singleBatteryMaxTemperature;
/**
*
*/
private String singleBatteryMinTemperature;
/**
*
*/
private String availableBatteryCapacity;
/**
*
*/
private int vehicleStatus;
/**
*
*/
private int chargingStatus;
/**
*
*/
private int operatingStatus;
/**
* SOC
*/
private int socStatus;
/**
*
*/
private int chargingEnergyStorageStatus;
/**
*
*/
private int driveMotorStatus;
/**
*
*/
private int positionStatus;
/**
* EAS()
*/
private int easStatus;
/**
* PTC()
*/
private int ptcStatus;
/**
* EPS()
*/
private int epsStatus;
/**
* ABS()
*/
private int absStatus;
/**
* MCU(/)
*/
private int mcuStatus;
/**
*
*/
private int heatingStatus;
/**
*
*/
private int batteryStatus;
/**
*
*/
private int batteryInsulationStatus;
public static VehicleData getBuild(String messages) {
char start = messages.charAt(0);
char end = messages.charAt(messages.length() - 1);
System.out.println(start);
System.out.println(end);
return VehicleData.builder()
.vin(messages.substring(1, 18))
//messages.substring(18, 31)
.createTime(new Date())
.longitude(messages.substring(31, 42))
.latitude(messages.substring(42, 52))
.speed(messages.substring(52, 58))
.mileage(new BigDecimal(messages.substring(58, 69)))
.voltage(messages.substring(69, 75))
.current(messages.substring(75, 80))
.resistance(messages.substring(80, 89))
.gear(messages.substring(89, 90))
.accelerationPedal(messages.substring(90, 92))
.brakePedal(messages.substring(92, 94))
.fuelConsumptionRate(messages.substring(94, 99))
.motorControllerTemperature(messages.substring(99, 105))
.motorSpeed(messages.substring(105, 110))
.motorTorque(messages.substring(110, 114))
.motorTemperature(messages.substring(114, 120))
.motorVoltage(messages.substring(120, 125))
.motorCurrent(messages.substring(125, 133))
.remainingBattery(new BigDecimal(messages.substring(133, 139)))
.maximumFeedbackPower(messages.substring(139, 145))
.maximumDischargePower(messages.substring(145, 151))
.selfCheckCounter(messages.substring(151, 153))
.totalBatteryCurrent(messages.substring(153, 158))
.totalBatteryVoltage(messages.substring(158, 164))
.singleBatteryMaxVoltage(messages.substring(164, 168))
.singleBatteryMinVoltage(messages.substring(168, 172))
.singleBatteryMaxTemperature(messages.substring(172, 178))
.singleBatteryMinTemperature(messages.substring(178, 184))
.availableBatteryCapacity(messages.substring(184, 190))
.vehicleStatus(Integer.valueOf(messages.substring(190, 191)))
.chargingStatus(Integer.valueOf(messages.substring(191, 192)))
.operatingStatus(Integer.valueOf(messages.substring(192, 193)))
.socStatus(Integer.valueOf(messages.substring(193, 194)))
.chargingEnergyStorageStatus(Integer.valueOf(messages.substring(194, 195)))
.driveMotorStatus(Integer.valueOf(messages.substring(195, 196)))
.positionStatus(Integer.valueOf(messages.substring(196, 197)))
.easStatus(Integer.valueOf(messages.substring(197, 198)))
.ptcStatus(Integer.valueOf(messages.substring(198, 199)))
.epsStatus(Integer.valueOf(messages.substring(199, 200)))
.absStatus(Integer.valueOf(messages.substring(200, 201)))
.mcuStatus(Integer.valueOf(messages.substring(201, 202)))
.heatingStatus(Integer.valueOf(messages.substring(202, 203)))
.batteryStatus(Integer.valueOf(messages.substring(203, 204)))
.batteryInsulationStatus(Integer.valueOf(messages.substring(204, 205)))
.build();
}
}

View File

@ -0,0 +1,46 @@
package com.fivegroup.service;
import com.fivegroup.config.MqttChannel;
import com.fivegroup.utils.MqttCallBacks;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* mqttkafka
*
* @author ZhangXushuo
* @version 2023/12/1 - 19:00
*/
@Service
public class ShuntService {
@Autowired
MqttCallBacks mqttCallBack;
@Autowired
private MqttChannel mqttChannel;
/**
* 使@ServiceActivatorhandleMessage
* MQTTmqtt_receivedTopic
* message.getPayload()
*/
@PostConstruct
@ServiceActivator(inputChannel = "mqttInputChannel")
public void mqttAc() {
// 在这里处理接收到的MQTT消息
try {
IMqttClient client = mqttChannel.mqttClient();
client.connect();
client.subscribe("five");
client.setCallback(mqttCallBack);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,63 @@
package com.fivegroup.utils;
import com.alibaba.fastjson2.JSONObject;
import com.fivegroup.config.KafkaChannel;
import com.fivegroup.domain.VehicleData;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* mqtt
*
* @author ZhangXushuo
* @version 2023/12/1 - 19:42
*/
@Log4j2
@Component
public class MqttCallBacks implements MqttCallback {
@Autowired
KafkaChannel kafkaChannel;
/**
*
*/
@Override
public void connectionLost(Throwable throwable) {
}
/**
*
*/
@Override
public void messageArrived(String topci, MqttMessage mqttMessage) throws Exception {
String s1 = new String(mqttMessage.getPayload());
System.out.println(s1 + "数据" + s1);
StringBuilder sb = new StringBuilder();
String[] arr = s1.split(" ");
int length = arr.length;
for (int i = 0; i < length; i++) {
int ch = Integer.parseInt(arr[i], 16);
sb.append((char) ch);
}
VehicleData build = VehicleData.getBuild(sb.toString());
// String vin = build.getVin();
String vin ="five";
KafkaTemplate<String, String> kafkaed = kafkaChannel.kafkaTemplate();
kafkaed.send(vin, JSONObject.toJSONString(build));
}
/**
*
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}