diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml
new file mode 100644
index 0000000..44dbe21
--- /dev/null
+++ b/cloud-common/cloud-common-kafka/pom.xml
@@ -0,0 +1,35 @@
+
+
+ 4.0.0
+
+ com.muyu
+ cloud-common
+ 3.6.3
+
+
+ cloud-common-kafka
+
+
+ 17
+ 17
+ UTF-8
+
+
+
+
+
+ com.muyu
+ cloud-common-redis
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.0.0
+
+
+
+
+
diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..8055e42
--- /dev/null
+++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java
@@ -0,0 +1,54 @@
+package com.muyu.common.kafka.config;
+
+import com.muyu.common.kafka.constants.KafkaConstants;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * kafka 消息的消费者 配置类
+ */
+@Configuration
+public class KafkaConsumerConfig {
+
+ @Bean
+ public KafkaConsumer kafkaConsumer() {
+ Map configs = new HashMap<>();
+ //kafka服务端的IP和端口,格式:(ip:port)
+ configs.put("bootstrap.servers", "60.204.221.52:9092");
+ //开启consumer的偏移量(offset)自动提交到Kafka
+ configs.put("enable.auto.commit", true);
+ //consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒
+ configs.put("auto.commit.interval", 5000);
+ //在Kafka中没有初始化偏移量或者当前偏移量不存在情况
+ //earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量
+ //latest, 在偏移量无效的情况下, 自动重置为最新的偏移量
+ //none, 在偏移量无效的情况下, 抛出异常.
+ configs.put("auto.offset.reset", "latest");
+ //请求阻塞的最大时间(毫秒)
+ configs.put("fetch.max.wait", 500);
+ //请求应答的最小字节数
+ configs.put("fetch.min.size", 1);
+ //心跳间隔时间(毫秒)
+ configs.put("heartbeat-interval", 3000);
+ //一次调用poll返回的最大记录条数
+ configs.put("max.poll.records", 500);
+ //指定消费组
+ configs.put("group.id", KafkaConstants.KafkaGrop);
+ //指定key使用的反序列化类
+ Deserializer keyDeserializer = new StringDeserializer();
+ //指定value使用的反序列化类
+ Deserializer valueDeserializer = new StringDeserializer();
+ //创建Kafka消费者
+ KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer);
+ return kafkaConsumer;
+ }
+
+}
diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java
new file mode 100644
index 0000000..07b56d3
--- /dev/null
+++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java
@@ -0,0 +1,45 @@
+package com.muyu.common.kafka.config;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * kafka 消息的生产者 配置类
+ */
+@Configuration
+public class KafkaProviderConfig {
+
+ @Bean
+ public KafkaProducer kafkaProducer() {
+ Map configs = new HashMap<>();
+ //#kafka服务端的IP和端口,格式:(ip:port)
+ configs.put("bootstrap.servers", "47.116.173.119:9092");
+ //客户端发送服务端失败的重试次数
+ configs.put("retries", 2);
+ //多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求.
+ //此设置有助于提高客户端和服务器的性能,配置控制默认批量大小(以字节为单位)
+ configs.put("batch.size", 16384);
+ //生产者可用于缓冲等待发送到服务器的记录的总内存字节数(以字节为单位)
+ configs.put("buffer-memory", 33554432);
+ //生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
+ //acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送.在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1.
+ //acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失.
+ //acks=all,acks=-1,leader节点将等待所有同步复制副本完成再确认记录,这保证了只要至少有一个同步复制副本存活,记录就不会丢失.
+ configs.put("acks", "-1");
+ //指定key使用的序列化类
+ Serializer keySerializer = new StringSerializer();
+ //指定value使用的序列化类
+ Serializer valueSerializer = new StringSerializer();
+ //创建Kafka生产者
+ KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer);
+ return kafkaProducer;
+ }
+
+}
diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java
new file mode 100644
index 0000000..d4c3d13
--- /dev/null
+++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java
@@ -0,0 +1,14 @@
+package com.muyu.common.kafka.constants;
+
+/**
+ * @Author: 胡杨
+ * @date: 2024/7/10
+ * @Description: kafka常量
+ * @Version 1.0.0
+ */
+public class KafkaConstants {
+
+ public final static String KafkaTopic = "kafka_topic";
+
+ public final static String KafkaGrop = "kafka_grop";
+}
diff --git a/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000..f4a1fdb
--- /dev/null
+++ b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1,2 @@
+com.muyu.common.kafka.config.KafkaConsumerConfig
+com.muyu.common.kafka.config.KafkaProviderConfig
diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml
index a7a40be..60e1c14 100644
--- a/cloud-common/pom.xml
+++ b/cloud-common/pom.xml
@@ -21,6 +21,7 @@
cloud-common-xxl
cloud-common-rabbit
cloud-common-saas
+ cloud-common-kafka
cloud-common
diff --git a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/service/Impl/CarMessageServiceImpl.java b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/service/Impl/CarMessageServiceImpl.java
index 0619325..827d3c3 100644
--- a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/service/Impl/CarMessageServiceImpl.java
+++ b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/service/Impl/CarMessageServiceImpl.java
@@ -77,7 +77,7 @@ public class CarMessageServiceImpl implements CarMessageService {
stringBuilder.append((char) inciseindex);
}
//切取车辆VIN
- String substring = stringBuilder.substring(0, 17);
+ String substring = stringBuilder.substring(1, 18);
log.info("车辆的VIN码:" + substring);
//根据给定的vehicleVin(车辆VIN号)获取对应的模板车辆分类carMessageCartype
Long selectcared = carMessageMapper.selectcarMessageCartype(substring);
diff --git a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/Kafka.java b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/Kafka.java
index fe39bf9..0d12b33 100644
--- a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/Kafka.java
+++ b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/Kafka.java
@@ -12,7 +12,7 @@ public class Kafka {
public static void main(String[] args) {
//配置kafka生产者
Properties properties = new Properties();
- properties.put("bootstrap.servers", "http://49.235.108.160:9092");
+ properties.put("bootstrap.servers", "http://60.204.221.52:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka生产者
diff --git a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/MQTT.java b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/MQTT.java
index ae3050e..0908b59 100644
--- a/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/MQTT.java
+++ b/cloud-modules/cloud-modules-car/src/main/java/com/muyu/car/test/MQTT.java
@@ -1,48 +1,166 @@
package com.muyu.car.test;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.muyu.car.domain.CarMessage;
+import com.muyu.car.mapper.CarMessageMapper;
+import com.muyu.car.service.CarMessageService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+@Component
+@Slf4j
public class MQTT {
+ @Resource
+ private CarMessageService carMessageService;
- public static void main(String[] args) {
+ @Resource
+ private CarMessageMapper carMessageMapper;
+
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @Resource
+ private KafkaProducer kafkaProducer;
+
+ @PostConstruct
+ public void test() {
+
+ // 设置消息主题
String topic = "vehicle";
+ // 设置消息内容
String content = "Message from MqttPublishSample";
+ // 设置消息质量等级
int qos = 2;
- String broker = "tcp://127.0.0.1:1883";
+ // 设置MQTT代理服务器地址
+ String broker = "tcp://106.15.136.7:1883";
+ // 设置客户端ID
String clientId = "JavaSample";
try {
- // 第三个参数为空,默认持久化策略
+ // 创建MQTT客户端实例,第三个参数为空,表示使用默认的持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
+ // 创建连接选项实例
MqttConnectOptions connOpts = new MqttConnectOptions();
+ // 设置连接选项,表示每次连接时都清除会话信息
connOpts.setCleanSession(true);
+ // 输出正在连接的代理服务器地址
System.out.println("Connecting to broker: "+broker);
+ // 连接MQTT代理服务器
sampleClient.connect(connOpts);
+ // 订阅指定主题
sampleClient.subscribe(topic,0);
+ // 设置回调函数,处理MQTT事件
sampleClient.setCallback(new MqttCallback() {
- // 连接丢失
+ // 处理连接丢失事件
@Override
public void connectionLost(Throwable throwable) {
}
- // 连接成功
+ // 处理消息到达事件
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
+ String string = new String(mqttMessage.getPayload());
+ JSONObject object = carMessageService.inciseCarMessage(string);
+ System.out.println(object);
}
- // 接收信息
+
+ public JSONObject inciseCarMessage(String testString) {
+ //根据空格拆分切割数据字符串
+ String[] split = testString.split(" ");
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String conversion : split) {
+ //将16进制字符串转换为对应的10进制
+ int inciseindex = Integer.parseInt(conversion, 16);
+ // 将10进制转换为对应的字符
+ stringBuilder.append((char) inciseindex);
+ }
+ //切取车辆VIN
+ String substring = stringBuilder.substring(1, 18);
+ log.info("车辆的VIN码:" + substring);
+ //根据给定的vehicleVin(车辆VIN号)获取对应的模板车辆分类carMessageCartype
+ Long selectcared = carMessageMapper.selectcarMessageCartype(substring);
+ //根据给定的vehicleVin(车辆VIN号)获取对应的模板信息
+ // List carInformations = carMessageMapper.selectcarInformationType(substring);
+ //创建接受数据的数组
+ List carMessagesList ;
+
+ try{
+ String redisKey = "carMessageList" + selectcared;
+ if (redisTemplate.hasKey(redisKey)){
+ List list = redisTemplate.opsForList().range(redisKey , 0, -1);
+ carMessagesList =
+ list.stream().map(objects ->
+ JSON.parseObject(objects.toString(), CarMessage.class))
+ .toList();
+ log.info("Redis缓存查询成功");
+ }else {
+ carMessagesList = carMessageMapper.selectCarMessageList(Math.toIntExact(selectcared));
+
+ carMessagesList.forEach(
+ listReq -> redisTemplate.opsForList().rightPushAll(redisKey, (Collection) listReq)
+ );
+ log.info("数据库查询成功");
+ }
+ log.info("获取失败,请重试");
+ }catch(Exception e){
+ throw new RuntimeException("获取报文模板失败");
+ }
+ //判断报文模板 列表 不为空
+ if(carMessagesList.isEmpty()){
+ throw new RuntimeException("报文模版为空");
+ }
+ //存储报文模板解析后的数据
+ JSONObject jsonObject = new JSONObject();
+ for (CarMessage carMessage : carMessagesList) {
+ //起始位下标
+ Integer startIndex = carMessage.getCarMessageStartIndex();
+ //结束位下标
+ Integer endIndex = carMessage.getCarMessageEndIndex();
+ //根据报文模板获取保温截取位置
+ String value = stringBuilder.substring(startIndex, endIndex);
+ //存入数据
+ jsonObject.put(carMessage.getMessageTypeName(), value);
+
+ }
+ return jsonObject;
+ }
+
+
+
+
+
+ // 处理消息发送完成事件
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ // 输出消息是否成功发送完成
+ System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
});
} catch(MqttException me) {
+ // 输出异常的原因代码
System.out.println("reason "+me.getReasonCode());
+ // 输出异常的消息
System.out.println("msg "+me.getMessage());
+ // 输出异常的本地化消息
System.out.println("loc "+me.getLocalizedMessage());
+ // 输出异常的根源
System.out.println("cause "+me.getCause());
+ // 输出异常的对象
System.out.println("excep "+me);
+ // 打印异常的堆栈跟踪信息
me.printStackTrace();
}
}
diff --git a/cloud-modules/pom.xml b/cloud-modules/pom.xml
index 516b222..faf257c 100644
--- a/cloud-modules/pom.xml
+++ b/cloud-modules/pom.xml
@@ -13,6 +13,7 @@
cloud-modules-gen
cloud-modules-file
cloud-modules-car
+
cloud-modules