From 70ce3ee31683edb1892d0218e2c484b820634d62 Mon Sep 17 00:00:00 2001
From: yaoxin <1752800946@qq.com>
Date: Wed, 26 Jun 2024 22:25:26 +0800
Subject: [PATCH] =?UTF-8?q?fix():=20=E4=BF=AE=E6=94=B9=E4=BA=86=E6=8A=A5?=
=?UTF-8?q?=E6=96=87=E8=A7=A3=E6=9E=90=E5=8A=9F=E8=83=BD,=E8=83=BD?=
=?UTF-8?q?=E6=A0=B9=E6=8D=AE=E8=BD=A6=E8=BE=86=E5=AF=B9=E5=BA=94=E7=9A=84?=
=?UTF-8?q?=E8=A7=A3=E6=9E=90=E8=A7=84=E8=8C=83,=E5=AF=B9=E6=8A=A5?=
=?UTF-8?q?=E6=96=87=E8=BF=9B=E8=A1=8C=E8=A7=A3=E6=9E=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 4 +-
pom.xml | 5 +
.../mqttmessage/common/AnalyzeConfigInfo.java | 40 ++++++
.../common/{Test.java => VehicleKafka.java} | 3 +-
.../muyu/mqttmessage/common/rest/Result.java | 112 +++++++++++++++
.../mqttmessage/config/MqttMessageConfig.java | 15 --
.../mqttmessage/config/MqttMessageRunner.java | 22 ---
.../muyu/mqttmessage/constants/Constants.java | 134 ++++++++++++++++++
.../mqttmessage/constants/HttpStatus.java | 93 ++++++++++++
.../mqttmessage/constants/RedisConstants.java | 11 ++
.../mqttmessage/consumer/KafkaConsumers.java | 10 +-
.../mqttmessage/consumer/RabbitConsumer.java | 7 +-
.../controller/TestController.java | 16 +--
.../muyu/mqttmessage/domain/VehicleData.java | 15 ++
.../mqttmessage/service/MqttKafkaService.java | 8 +-
.../service/impl/MqttCallBackServiceImpl.java | 40 ++++--
.../service/impl/MqttKafkaServiceImpl.java | 44 ++++--
src/main/resources/application.yml | 45 +-----
18 files changed, 502 insertions(+), 122 deletions(-)
create mode 100644 src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java
rename src/main/java/com/muyu/mqttmessage/common/{Test.java => VehicleKafka.java} (85%)
create mode 100644 src/main/java/com/muyu/mqttmessage/common/rest/Result.java
delete mode 100644 src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java
delete mode 100644 src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java
create mode 100644 src/main/java/com/muyu/mqttmessage/constants/Constants.java
create mode 100644 src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java
create mode 100644 src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java
diff --git a/.gitignore b/.gitignore
index 549e00a..5ed48f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,7 @@ HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
-!**/src/test/**/target/
+!**/src/vehicleKafka/**/target/
### STS ###
.apt_generated
@@ -27,7 +27,7 @@ target/
/.nb-gradle/
build/
!**/src/main/**/build/
-!**/src/test/**/build/
+!**/src/vehicleKafka/**/build/
### VS Code ###
.vscode/
diff --git a/pom.xml b/pom.xml
index 947826d..9fe410b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,11 @@
17
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
org.springframework.kafka
spring-kafka
diff --git a/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java b/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java
new file mode 100644
index 0000000..a717143
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java
@@ -0,0 +1,40 @@
+package com.muyu.mqttmessage.common;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * @ClassName AnalyzeConfigInfo
+ * @Description 解析配置实体类
+ * @Author Xin.Yao
+ * @Date 2024/6/26 下午2:08
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@SuperBuilder
+public class AnalyzeConfigInfo {
+ /**
+ * 开始位置
+ */
+ private Integer startPosition;
+ /**
+ * 结束为止
+ */
+ private Integer endPosition;
+ /**
+ * 属性key
+ */
+ private String attributeKey;
+ /**
+ * 标签
+ */
+ private String label;
+ /**
+ * 数据类型
+ */
+ private String type;
+
+}
diff --git a/src/main/java/com/muyu/mqttmessage/common/Test.java b/src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java
similarity index 85%
rename from src/main/java/com/muyu/mqttmessage/common/Test.java
rename to src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java
index 9bf1fa4..cd5b598 100644
--- a/src/main/java/com/muyu/mqttmessage/common/Test.java
+++ b/src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java
@@ -9,9 +9,8 @@ import lombok.Data;
* @Date 2024/6/9 上午10:56
*/
@Data
-public class Test {
+public class VehicleKafka {
private Integer partitions;
private String key;
- private String data;
private String consumerName;
}
diff --git a/src/main/java/com/muyu/mqttmessage/common/rest/Result.java b/src/main/java/com/muyu/mqttmessage/common/rest/Result.java
new file mode 100644
index 0000000..d6e3d97
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/common/rest/Result.java
@@ -0,0 +1,112 @@
+package com.muyu.mqttmessage.common.rest;
+
+import com.muyu.mqttmessage.constants.Constants;
+import com.muyu.mqttmessage.constants.HttpStatus;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * 响应信息主体
+ *
+ * @author muyu
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Result implements Serializable {
+ /**
+ * 成功
+ */
+ public static final int SUCCESS = Constants.SUCCESS;
+ /**
+ * 失败
+ */
+ public static final int FAIL = Constants.FAIL;
+ /**
+ * 警告
+ */
+ public static final int WARN = HttpStatus.WARN;
+
+ private static final long serialVersionUID = 1L;
+ private int code;
+
+ private String msg;
+
+ private T data;
+
+ public static Result success () {
+ return restResult(null, SUCCESS, null);
+ }
+
+ public static Result success (T data) {
+ return restResult(data, SUCCESS, null);
+ }
+
+ public static Result success (T data, String msg) {
+ return restResult(data, SUCCESS, msg);
+ }
+
+ public static Result error () {
+ return restResult(null, FAIL, null);
+ }
+
+ public static Result error (String msg) {
+ return restResult(null, FAIL, msg);
+ }
+
+ public static Result error (T data) {
+ return restResult(data, FAIL, null);
+ }
+
+ public static Result error (T data, String msg) {
+ return restResult(data, FAIL, msg);
+ }
+
+ public static Result error (int code, String msg) {
+ return restResult(null, code, msg);
+ }
+
+
+
+ public static Result warn () {
+ return restResult(null, WARN, null);
+ }
+
+ public static Result warn (String msg) {
+ return restResult(null, WARN, msg);
+ }
+
+ public static Result warn (T data) {
+ return restResult(data, WARN, null);
+ }
+
+ public static Result warn (T data, String msg) {
+ return restResult(data, WARN, msg);
+ }
+
+ public static Result warn (int code, String msg) {
+ return restResult(null, code, msg);
+ }
+
+ private static Result restResult (T data, int code, String msg) {
+ return Result.builder()
+ .code(code)
+ .data(data)
+ .msg(msg)
+ .build();
+ }
+
+ public static Boolean isError (Result ret) {
+ return !isSuccess(ret);
+ }
+
+ public static Boolean isSuccess (Result ret) {
+ return Result.SUCCESS == ret.getCode();
+ }
+
+}
diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java b/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java
deleted file mode 100644
index b1fd2b1..0000000
--- a/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.muyu.mqttmessage.config;
-
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Import;
-
-/**
- * @ClassName DataAccessClientConfig
- * @Description 描述
- * @Author Xin.Yao
- * @Date 2024/5/9 19:52
- */
-@ComponentScan
-@Import({MqttMessageRunner.class})
-public class MqttMessageConfig {
-}
diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java b/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java
deleted file mode 100644
index 5e69418..0000000
--- a/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.muyu.mqttmessage.config;
-
-import lombok.extern.log4j.Log4j2;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-/**
- * @ClassName DataAccessClientRunner
- * @Description 描述
- * @Author Xin.Yao
- * @Date 2024/5/9 19:53
- */
-@Log4j2
-@Component
-public class MqttMessageRunner implements ApplicationRunner {
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
-
- }
-}
diff --git a/src/main/java/com/muyu/mqttmessage/constants/Constants.java b/src/main/java/com/muyu/mqttmessage/constants/Constants.java
new file mode 100644
index 0000000..82b3db0
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/constants/Constants.java
@@ -0,0 +1,134 @@
+package com.muyu.mqttmessage.constants;
+
+/**
+ * 通用常量信息
+ *
+ * @author muyu
+ */
+public class Constants {
+ /**
+ * UTF-8 字符集
+ */
+ public static final String UTF8 = "UTF-8";
+
+ /**
+ * GBK 字符集
+ */
+ public static final String GBK = "GBK";
+
+ /**
+ * www主域
+ */
+ public static final String WWW = "www.";
+
+ /**
+ * RMI 远程方法调用
+ */
+ public static final String LOOKUP_RMI = "rmi:";
+
+ /**
+ * LDAP 远程方法调用
+ */
+ public static final String LOOKUP_LDAP = "ldap:";
+
+ /**
+ * LDAPS 远程方法调用
+ */
+ public static final String LOOKUP_LDAPS = "ldaps:";
+
+ /**
+ * http请求
+ */
+ public static final String HTTP = "http://";
+
+ /**
+ * https请求
+ */
+ public static final String HTTPS = "https://";
+
+ /**
+ * 成功标记
+ */
+ public static final Integer SUCCESS = 200;
+
+ /**
+ * 失败标记
+ */
+ public static final Integer FAIL = 500;
+
+ /**
+ * 登录成功状态
+ */
+ public static final String LOGIN_SUCCESS_STATUS = "0";
+
+ /**
+ * 登录失败状态
+ */
+ public static final String LOGIN_FAIL_STATUS = "1";
+
+ /**
+ * 登录成功
+ */
+ public static final String LOGIN_SUCCESS = "Success";
+
+ /**
+ * 注销
+ */
+ public static final String LOGOUT = "Logout";
+
+ /**
+ * 注册
+ */
+ public static final String REGISTER = "Register";
+
+ /**
+ * 登录失败
+ */
+ public static final String LOGIN_FAIL = "Error";
+
+ /**
+ * 当前记录起始索引
+ */
+ public static final String PAGE_NUM = "pageNum";
+
+ /**
+ * 每页显示记录数
+ */
+ public static final String PAGE_SIZE = "pageSize";
+
+ /**
+ * 排序列
+ */
+ public static final String ORDER_BY_COLUMN = "orderByColumn";
+
+ /**
+ * 排序的方向 "desc" 或者 "asc".
+ */
+ public static final String IS_ASC = "isAsc";
+
+ /**
+ * 验证码有效期(分钟)
+ */
+ public static final long CAPTCHA_EXPIRATION = 2;
+
+ /**
+ * 资源映射路径 前缀
+ */
+ public static final String RESOURCE_PREFIX = "/profile";
+
+ /**
+ * 自动识别json对象白名单配置(仅允许解析的包名,范围越小越安全)
+ */
+ public static final String[] JSON_WHITELIST_STR = {"org.springframework", "com.muyu"};
+
+ /**
+ * 定时任务白名单配置(仅允许访问的包名,如其他需要可以自行添加)
+ */
+ public static final String[] JOB_WHITELIST_STR = {"com.muyu"};
+
+ /**
+ * 定时任务违规的字符
+ */
+ public static final String[] JOB_ERROR_STR = {"java.net.URL", "javax.naming.InitialContext", "org.yaml.snakeyaml",
+ "org.springframework", "org.apache", "com.muyu.common.core.utils.file"};
+}
diff --git a/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java b/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java
new file mode 100644
index 0000000..99bf740
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java
@@ -0,0 +1,93 @@
+package com.muyu.mqttmessage.constants;
+
+/**
+ * 返回状态码
+ *
+ * @author muyu
+ */
+public class HttpStatus {
+ /**
+ * 操作成功
+ */
+ public static final int SUCCESS = 200;
+
+ /**
+ * 对象创建成功
+ */
+ public static final int CREATED = 201;
+
+ /**
+ * 请求已经被接受
+ */
+ public static final int ACCEPTED = 202;
+
+ /**
+ * 操作已经执行成功,但是没有返回数据
+ */
+ public static final int NO_CONTENT = 204;
+
+ /**
+ * 资源已被移除
+ */
+ public static final int MOVED_PERM = 301;
+
+ /**
+ * 重定向
+ */
+ public static final int SEE_OTHER = 303;
+
+ /**
+ * 资源没有被修改
+ */
+ public static final int NOT_MODIFIED = 304;
+
+ /**
+ * 参数列表错误(缺少,格式不匹配)
+ */
+ public static final int BAD_REQUEST = 400;
+
+ /**
+ * 未授权
+ */
+ public static final int UNAUTHORIZED = 401;
+
+ /**
+ * 访问受限,授权过期
+ */
+ public static final int FORBIDDEN = 403;
+
+ /**
+ * 资源,服务未找到
+ */
+ public static final int NOT_FOUND = 404;
+
+ /**
+ * 不允许的http方法
+ */
+ public static final int BAD_METHOD = 405;
+
+ /**
+ * 资源冲突,或者资源被锁
+ */
+ public static final int CONFLICT = 409;
+
+ /**
+ * 不支持的数据,媒体类型
+ */
+ public static final int UNSUPPORTED_TYPE = 415;
+
+ /**
+ * 系统内部错误
+ */
+ public static final int ERROR = 500;
+
+ /**
+ * 接口未实现
+ */
+ public static final int NOT_IMPLEMENTED = 501;
+
+ /**
+ * 系统警告消息
+ */
+ public static final int WARN = 601;
+}
diff --git a/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java b/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java
new file mode 100644
index 0000000..2ebbc39
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java
@@ -0,0 +1,11 @@
+package com.muyu.mqttmessage.constants;
+
+/**
+ * @ClassName RedisConstants
+ * @Description redis常量类
+ * @Author Xin.Yao
+ * @Date 2024/6/26 下午2:12
+ */
+public class RedisConstants {
+ public static final String ANALYZE_CONFIG = "analyze_config";
+}
diff --git a/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java b/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java
index a1318fc..0daec74 100644
--- a/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java
+++ b/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java
@@ -1,17 +1,13 @@
package com.muyu.mqttmessage.consumer;
-import com.muyu.mqttmessage.common.Test;
+import com.muyu.mqttmessage.common.VehicleKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -29,7 +25,7 @@ public class KafkaConsumers {
- public KafkaConsumer kafkaConsumer(Test test){
+ public KafkaConsumer kafkaConsumer(VehicleKafka vehicleKafka){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -41,7 +37,7 @@ public class KafkaConsumers {
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// 订阅主题分区
List topicPartitions = new ArrayList<>();
- topicPartitions.add(new TopicPartition("testKafka", test.getPartitions()));
+ topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions()));
consumer.assign(topicPartitions);
return consumer;
diff --git a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
index 2092011..281f739 100644
--- a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
+++ b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
@@ -5,10 +5,12 @@ import com.muyu.mqttmessage.common.MqttMessageModel;
import com.muyu.mqttmessage.config.MqttFactory;
import com.muyu.mqttmessage.constants.RabbitMqConstant;
import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.NewTopic;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@@ -25,11 +27,14 @@ public class RabbitConsumer {
private KafkaTemplate kafkaTemplate;
@Autowired
private MqttFactory mqttFactory;
+ @Autowired
+ private KafkaAdmin kafkaAdmin;
@RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)})
public void monitorServer(String msg){
log.info("监听到的消息:{}",msg);
MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
- MqttClient mqttClient = mqttFactory.createMqttClient(mqttMessageModel);
+ mqttFactory.createMqttClient(mqttMessageModel);
+ kafkaAdmin.createOrModifyTopics(new NewTopic(mqttMessageModel.getBroker(),8,(short) 1));
log.info("{}服务器监听连接成功",mqttMessageModel.getTopic());
}
diff --git a/src/main/java/com/muyu/mqttmessage/controller/TestController.java b/src/main/java/com/muyu/mqttmessage/controller/TestController.java
index e14e823..69b0845 100644
--- a/src/main/java/com/muyu/mqttmessage/controller/TestController.java
+++ b/src/main/java/com/muyu/mqttmessage/controller/TestController.java
@@ -1,8 +1,7 @@
package com.muyu.mqttmessage.controller;
-import com.muyu.mqttmessage.common.Test;
+import com.muyu.mqttmessage.common.VehicleKafka;
import com.muyu.mqttmessage.service.MqttKafkaService;
-import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -18,18 +17,19 @@ public class TestController {
@Autowired
private MqttKafkaService mqttKafkaService;
- @GetMapping("/Test")
- public void Test(@RequestBody Test test) {
- mqttKafkaService.test(test);
- }
@GetMapping("/Consumer")
- public void consumer(@RequestBody Test test) {
- mqttKafkaService.consumer(test);
+ public void consumer(@RequestBody VehicleKafka vehicleKafka) {
+ mqttKafkaService.consumer(vehicleKafka);
}
@GetMapping("/CloseConsumer")
public void closeConsumer(@RequestParam("consumerName") String consumerName) {
mqttKafkaService.closeConsumer(consumerName);
}
+
+ @GetMapping("/Test")
+ public void test() {
+ mqttKafkaService.test();
+ }
}
diff --git a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
index 284e52d..10b388b 100644
--- a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
+++ b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
@@ -23,58 +23,72 @@ public class VehicleData {
* VIN
*/
private String vin;
+
/**
* 行驶路线
*/
private String drivingRoute;
+
/**
* 经度
*/
private String longitude;
+
/**
* 维度
*/
private String latitude;
+
/**
* 速度
*/
private String speed;
+
/**
* 里程
*/
private BigDecimal mileage;
+
/**
* 总电压
*/
private String voltage;
+
/**
* 总电流
*/
private String current;
+
/**
* 绝缘电阻
*/
private String resistance;
+
/**
* 档位
*/
private String gear = "P";
+
/**
* 加速踏板行程值
*/
private String accelerationPedal;
+
/**
* 制动踏板行程值
*/
private String brakePedal;
+
/**
* 燃料消耗率
*/
private String fuelConsumptionRate;
+
/**
* 电机控制器温度
*/
private String motorControllerTemperature;
+
/**
* 电机转速
*/
@@ -244,4 +258,5 @@ public class VehicleData {
* CHG(充电机)状态
*/
private int chgStatus = 1;
+
}
diff --git a/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java b/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java
index 1c1f580..5ab36e4 100644
--- a/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java
+++ b/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java
@@ -1,7 +1,6 @@
package com.muyu.mqttmessage.service;
-import com.muyu.mqttmessage.common.Test;
-import org.springframework.stereotype.Component;
+import com.muyu.mqttmessage.common.VehicleKafka;
/**
* @ClassName MqttKafkaService
@@ -10,9 +9,10 @@ import org.springframework.stereotype.Component;
* @Date 2024/6/9 上午11:05
*/
public interface MqttKafkaService {
- void test(Test test);
- void consumer(Test test);
+ void consumer(VehicleKafka vehicleKafka);
void closeConsumer(String consumerName);
+
+ void test();
}
diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
index 686447d..deac026 100644
--- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
+++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
@@ -1,19 +1,24 @@
package com.muyu.mqttmessage.service.impl;
import com.alibaba.fastjson2.JSON;
-import com.muyu.mqttmessage.common.Test;
+import com.muyu.mqttmessage.common.AnalyzeConfigInfo;
+import com.muyu.mqttmessage.common.VehicleKafka;
+import com.muyu.mqttmessage.constants.RedisConstants;
import com.muyu.mqttmessage.domain.VehicleData;
import com.muyu.mqttmessage.utils.ConversionUtil;
+import kong.unirest.json.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
/**
* @ClassName MqttCallBackConfig
@@ -24,11 +29,14 @@ import java.math.BigDecimal;
@Component
@Log4j2
public class MqttCallBackServiceImpl implements MqttCallback {
+
private KafkaTemplate kafkaTemplate;
public MqttCallBackServiceImpl(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
+ @Autowired
+ private RedisTemplate redisTemplate;
// @Autowired
@@ -42,14 +50,10 @@ public class MqttCallBackServiceImpl implements MqttCallback {
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
- VehicleData vehicleData = getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())));
- String jsonString = JSON.toJSONString(vehicleData);
- log.info("转化为对象:{}",jsonString);
- Test test = new Test();
- test.setPartitions(1);
- test.setKey("123");
- test.setData(jsonString);
- kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData());
+ JSONObject jsonObject = getJsonObject(ConversionUtil.hexStringToString(new String(message.getPayload())));
+ Object o = redisTemplate.opsForHash().get("vehicleKafka", jsonObject.get("vin"));
+ VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
+ kafkaTemplate.send(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions(), vehicleKafka.getKey(),jsonObject.toString());
}catch (Exception e){
e.printStackTrace();
}
@@ -61,6 +65,21 @@ public class MqttCallBackServiceImpl implements MqttCallback {
System.out.println("deliveryComplete---------" + token.isComplete());
}
+ public JSONObject getJsonObject(String message) {
+ message = message.substring(1,message.length()-2);
+ StringBuffer stringBuffer = new StringBuffer();
+ Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG,message.substring(0,17));
+ List analyzeConfigInfos = new ArrayList<>();
+ analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList();
+ String finalMessage = message;
+ analyzeConfigInfos.forEach(analyzeConfigInfo -> {
+ stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ removeSuperfluousDigit(finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition()))+"\"");
+ });
+ String jsonString = "{"+stringBuffer.substring(1)+"}";
+ log.info("解析后的数据:{}",jsonString);
+ return new JSONObject(jsonString);
+ }
+
public VehicleData getVehicleData(String message) {
message = message.substring(1,message.length()-2);
return VehicleData.builder()
@@ -161,7 +180,6 @@ public class MqttCallBackServiceImpl implements MqttCallback {
.build();
}
-
public String removeSuperfluousDigit(String str){
if(str.length()>1){
if(str.charAt(0)=='0'){
diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java
index a3caf0c..8558ae1 100644
--- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java
+++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java
@@ -1,18 +1,25 @@
package com.muyu.mqttmessage.service.impl;
-import com.muyu.mqttmessage.common.Test;
+import com.alibaba.fastjson2.JSON;
+import com.muyu.mqttmessage.common.AnalyzeConfigInfo;
+import com.muyu.mqttmessage.common.VehicleKafka;
+import com.muyu.mqttmessage.constants.RedisConstants;
import com.muyu.mqttmessage.consumer.KafkaConsumers;
import com.muyu.mqttmessage.service.MqttKafkaService;
+import com.muyu.mqttmessage.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -30,20 +37,19 @@ public class MqttKafkaServiceImpl implements MqttKafkaService {
@Autowired
private KafkaConsumers kafkaConsumers;
- @Override
- public void test(Test test) {
- kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData());
- }
+ @Autowired
+ private RedisTemplate redisTemplate;
+
@Override
- public void consumer(Test test) {
- KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(test);
- consumerMap.put(test.getConsumerName(),true);
- while (consumerMap.containsKey(test.getConsumerName())){
+ public void consumer(VehicleKafka vehicleKafka) {
+ KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(vehicleKafka);
+ consumerMap.put(vehicleKafka.getConsumerName(),true);
+ while (consumerMap.containsKey(vehicleKafka.getConsumerName())){
// 拉取消息
ConsumerRecords msg = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord consumerRecord : msg) {
- log.info("{}监听到的消息内容: {}",test.getConsumerName(),consumerRecord.value());
+ log.info("{}监听到的消息内容: {}", vehicleKafka.getConsumerName(),consumerRecord.value());
}
}
consumer.close();
@@ -53,4 +59,22 @@ public class MqttKafkaServiceImpl implements MqttKafkaService {
public void closeConsumer(String consumerName) {
consumerMap.remove(consumerName);
}
+
+ @Override
+ public void test() {
+ String message = new String();
+ message = "7E 56 49 4e 31 32 33 34 35 36 37 38 39 44 49 4a 45 34 31 37 31 37 35 37 34 33 39 33 35 33 33 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 32 33 2e 36 39 36 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 50 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 34 35 30 30 30 2e 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 14 7E";
+ String str = ConversionUtil.hexStringToString(message);
+ message = str.substring(1,str.length()-2);
+ StringBuffer stringBuffer = new StringBuffer();
+ Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG, "vin1");
+ List analyzeConfigInfos = new ArrayList<>();
+ analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList();
+ String finalMessage = message;
+ analyzeConfigInfos.forEach(analyzeConfigInfo -> {
+ stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition())+"\"");
+ });
+ String jsonString = "{"+stringBuffer.substring(1)+"}";
+ log.info("解析后的数据:{}",jsonString);
+ }
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b249fa4..11444a8 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,4 +1,8 @@
spring:
+ redis:
+ host: 47.99.219.99
+ port: 6379
+ password: yx@123
application:
name: mqtt-message
jackson:
@@ -9,51 +13,12 @@ spring:
password: guest
virtualHost: /
port: 5672
- host: 43.142.44.217
+ host: 47.99.219.99
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
-# kafka:
-# bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
-# producer:
-# batch-size: 16384 #批量大小
-# acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
-# retries: 10 # 消息发送重试次数
-# #transaction-id-prefix: transaction
-# buffer-memory: 33554432
-# key-serializer: org.apache.kafka.common.serialization.StringSerializer
-# value-serializer: org.apache.kafka.common.serialization.StringSerializer
-# properties:
-# partitioner:
-# class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
-# linger:
-# ms: 2000 #提交延迟
-# #partitioner: #指定分区器
-# #class: pers.zhang.config.CustomerPartitionHandler
-# consumer:
-# group-id: testGroup #默认的消费组ID
-# enable-auto-commit: true #是否自动提交offset
-# auto-commit-interval: 2000 #提交offset延时
-# # 当kafka中没有初始offset或offset超出范围时将自动重置offset
-# # earliest:重置为分区中最小的offset;
-# # latest:重置为分区中最新的offset(消费分区中新产生的数据);
-# # none:只要有一个分区不存在已提交的offset,就抛出异常;
-# auto-offset-reset: latest
-# max-poll-records: 500 #单次拉取消息的最大条数
-# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-# properties:
-# session:
-# timeout:
-# ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
-# request:
-# timeout:
-# ms: 18000 # 消费请求的超时时间
-# listener:
-# missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
-# # type: batch
kafka:
#config/consumer.properties配置的bootstrap.servers