diff --git a/.gitignore b/.gitignore index 549e00a..5ed48f7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ HELP.md target/ !.mvn/wrapper/maven-wrapper.jar !**/src/main/**/target/ -!**/src/test/**/target/ +!**/src/vehicleKafka/**/target/ ### STS ### .apt_generated @@ -27,7 +27,7 @@ target/ /.nb-gradle/ build/ !**/src/main/**/build/ -!**/src/test/**/build/ +!**/src/vehicleKafka/**/build/ ### VS Code ### .vscode/ diff --git a/pom.xml b/pom.xml index 947826d..9fe410b 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,11 @@ 17 + + + org.springframework.boot + spring-boot-starter-data-redis + org.springframework.kafka spring-kafka diff --git a/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java b/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java new file mode 100644 index 0000000..a717143 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/common/AnalyzeConfigInfo.java @@ -0,0 +1,40 @@ +package com.muyu.mqttmessage.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ClassName AnalyzeConfigInfo + * @Description 解析配置实体类 + * @Author Xin.Yao + * @Date 2024/6/26 下午2:08 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class AnalyzeConfigInfo { + /** + * 开始位置 + */ + private Integer startPosition; + /** + * 结束为止 + */ + private Integer endPosition; + /** + * 属性key + */ + private String attributeKey; + /** + * 标签 + */ + private String label; + /** + * 数据类型 + */ + private String type; + +} diff --git a/src/main/java/com/muyu/mqttmessage/common/Test.java b/src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java similarity index 85% rename from src/main/java/com/muyu/mqttmessage/common/Test.java rename to src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java index 9bf1fa4..cd5b598 100644 --- a/src/main/java/com/muyu/mqttmessage/common/Test.java +++ b/src/main/java/com/muyu/mqttmessage/common/VehicleKafka.java @@ -9,9 +9,8 @@ import lombok.Data; * @Date 2024/6/9 上午10:56 */ @Data -public class Test { +public class VehicleKafka { private Integer partitions; private String key; - private String data; private String consumerName; } diff --git a/src/main/java/com/muyu/mqttmessage/common/rest/Result.java b/src/main/java/com/muyu/mqttmessage/common/rest/Result.java new file mode 100644 index 0000000..d6e3d97 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/common/rest/Result.java @@ -0,0 +1,112 @@ +package com.muyu.mqttmessage.common.rest; + +import com.muyu.mqttmessage.constants.Constants; +import com.muyu.mqttmessage.constants.HttpStatus; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 响应信息主体 + * + * @author muyu + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Result implements Serializable { + /** + * 成功 + */ + public static final int SUCCESS = Constants.SUCCESS; + /** + * 失败 + */ + public static final int FAIL = Constants.FAIL; + /** + * 警告 + */ + public static final int WARN = HttpStatus.WARN; + + private static final long serialVersionUID = 1L; + private int code; + + private String msg; + + private T data; + + public static Result success () { + return restResult(null, SUCCESS, null); + } + + public static Result success (T data) { + return restResult(data, SUCCESS, null); + } + + public static Result success (T data, String msg) { + return restResult(data, SUCCESS, msg); + } + + public static Result error () { + return restResult(null, FAIL, null); + } + + public static Result error (String msg) { + return restResult(null, FAIL, msg); + } + + public static Result error (T data) { + return restResult(data, FAIL, null); + } + + public static Result error (T data, String msg) { + return restResult(data, FAIL, msg); + } + + public static Result error (int code, String msg) { + return restResult(null, code, msg); + } + + + + public static Result warn () { + return restResult(null, WARN, null); + } + + public static Result warn (String msg) { + return restResult(null, WARN, msg); + } + + public static Result warn (T data) { + return restResult(data, WARN, null); + } + + public static Result warn (T data, String msg) { + return restResult(data, WARN, msg); + } + + public static Result warn (int code, String msg) { + return restResult(null, code, msg); + } + + private static Result restResult (T data, int code, String msg) { + return Result.builder() + .code(code) + .data(data) + .msg(msg) + .build(); + } + + public static Boolean isError (Result ret) { + return !isSuccess(ret); + } + + public static Boolean isSuccess (Result ret) { + return Result.SUCCESS == ret.getCode(); + } + +} diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java b/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java deleted file mode 100644 index b1fd2b1..0000000 --- a/src/main/java/com/muyu/mqttmessage/config/MqttMessageConfig.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.muyu.mqttmessage.config; - -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Import; - -/** - * @ClassName DataAccessClientConfig - * @Description 描述 - * @Author Xin.Yao - * @Date 2024/5/9 19:52 - */ -@ComponentScan -@Import({MqttMessageRunner.class}) -public class MqttMessageConfig { -} diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java b/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java deleted file mode 100644 index 5e69418..0000000 --- a/src/main/java/com/muyu/mqttmessage/config/MqttMessageRunner.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.muyu.mqttmessage.config; - -import lombok.extern.log4j.Log4j2; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -/** - * @ClassName DataAccessClientRunner - * @Description 描述 - * @Author Xin.Yao - * @Date 2024/5/9 19:53 - */ -@Log4j2 -@Component -public class MqttMessageRunner implements ApplicationRunner { - - @Override - public void run(ApplicationArguments args) throws Exception { - - } -} diff --git a/src/main/java/com/muyu/mqttmessage/constants/Constants.java b/src/main/java/com/muyu/mqttmessage/constants/Constants.java new file mode 100644 index 0000000..82b3db0 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/constants/Constants.java @@ -0,0 +1,134 @@ +package com.muyu.mqttmessage.constants; + +/** + * 通用常量信息 + * + * @author muyu + */ +public class Constants { + /** + * UTF-8 字符集 + */ + public static final String UTF8 = "UTF-8"; + + /** + * GBK 字符集 + */ + public static final String GBK = "GBK"; + + /** + * www主域 + */ + public static final String WWW = "www."; + + /** + * RMI 远程方法调用 + */ + public static final String LOOKUP_RMI = "rmi:"; + + /** + * LDAP 远程方法调用 + */ + public static final String LOOKUP_LDAP = "ldap:"; + + /** + * LDAPS 远程方法调用 + */ + public static final String LOOKUP_LDAPS = "ldaps:"; + + /** + * http请求 + */ + public static final String HTTP = "http://"; + + /** + * https请求 + */ + public static final String HTTPS = "https://"; + + /** + * 成功标记 + */ + public static final Integer SUCCESS = 200; + + /** + * 失败标记 + */ + public static final Integer FAIL = 500; + + /** + * 登录成功状态 + */ + public static final String LOGIN_SUCCESS_STATUS = "0"; + + /** + * 登录失败状态 + */ + public static final String LOGIN_FAIL_STATUS = "1"; + + /** + * 登录成功 + */ + public static final String LOGIN_SUCCESS = "Success"; + + /** + * 注销 + */ + public static final String LOGOUT = "Logout"; + + /** + * 注册 + */ + public static final String REGISTER = "Register"; + + /** + * 登录失败 + */ + public static final String LOGIN_FAIL = "Error"; + + /** + * 当前记录起始索引 + */ + public static final String PAGE_NUM = "pageNum"; + + /** + * 每页显示记录数 + */ + public static final String PAGE_SIZE = "pageSize"; + + /** + * 排序列 + */ + public static final String ORDER_BY_COLUMN = "orderByColumn"; + + /** + * 排序的方向 "desc" 或者 "asc". + */ + public static final String IS_ASC = "isAsc"; + + /** + * 验证码有效期(分钟) + */ + public static final long CAPTCHA_EXPIRATION = 2; + + /** + * 资源映射路径 前缀 + */ + public static final String RESOURCE_PREFIX = "/profile"; + + /** + * 自动识别json对象白名单配置(仅允许解析的包名,范围越小越安全) + */ + public static final String[] JSON_WHITELIST_STR = {"org.springframework", "com.muyu"}; + + /** + * 定时任务白名单配置(仅允许访问的包名,如其他需要可以自行添加) + */ + public static final String[] JOB_WHITELIST_STR = {"com.muyu"}; + + /** + * 定时任务违规的字符 + */ + public static final String[] JOB_ERROR_STR = {"java.net.URL", "javax.naming.InitialContext", "org.yaml.snakeyaml", + "org.springframework", "org.apache", "com.muyu.common.core.utils.file"}; +} diff --git a/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java b/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java new file mode 100644 index 0000000..99bf740 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/constants/HttpStatus.java @@ -0,0 +1,93 @@ +package com.muyu.mqttmessage.constants; + +/** + * 返回状态码 + * + * @author muyu + */ +public class HttpStatus { + /** + * 操作成功 + */ + public static final int SUCCESS = 200; + + /** + * 对象创建成功 + */ + public static final int CREATED = 201; + + /** + * 请求已经被接受 + */ + public static final int ACCEPTED = 202; + + /** + * 操作已经执行成功,但是没有返回数据 + */ + public static final int NO_CONTENT = 204; + + /** + * 资源已被移除 + */ + public static final int MOVED_PERM = 301; + + /** + * 重定向 + */ + public static final int SEE_OTHER = 303; + + /** + * 资源没有被修改 + */ + public static final int NOT_MODIFIED = 304; + + /** + * 参数列表错误(缺少,格式不匹配) + */ + public static final int BAD_REQUEST = 400; + + /** + * 未授权 + */ + public static final int UNAUTHORIZED = 401; + + /** + * 访问受限,授权过期 + */ + public static final int FORBIDDEN = 403; + + /** + * 资源,服务未找到 + */ + public static final int NOT_FOUND = 404; + + /** + * 不允许的http方法 + */ + public static final int BAD_METHOD = 405; + + /** + * 资源冲突,或者资源被锁 + */ + public static final int CONFLICT = 409; + + /** + * 不支持的数据,媒体类型 + */ + public static final int UNSUPPORTED_TYPE = 415; + + /** + * 系统内部错误 + */ + public static final int ERROR = 500; + + /** + * 接口未实现 + */ + public static final int NOT_IMPLEMENTED = 501; + + /** + * 系统警告消息 + */ + public static final int WARN = 601; +} diff --git a/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java b/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java new file mode 100644 index 0000000..2ebbc39 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/constants/RedisConstants.java @@ -0,0 +1,11 @@ +package com.muyu.mqttmessage.constants; + +/** + * @ClassName RedisConstants + * @Description redis常量类 + * @Author Xin.Yao + * @Date 2024/6/26 下午2:12 + */ +public class RedisConstants { + public static final String ANALYZE_CONFIG = "analyze_config"; +} diff --git a/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java b/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java index a1318fc..0daec74 100644 --- a/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java +++ b/src/main/java/com/muyu/mqttmessage/consumer/KafkaConsumers.java @@ -1,17 +1,13 @@ package com.muyu.mqttmessage.consumer; -import com.muyu.mqttmessage.common.Test; +import com.muyu.mqttmessage.common.VehicleKafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.ProducerFactory; import org.springframework.stereotype.Component; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -29,7 +25,7 @@ public class KafkaConsumers { - public KafkaConsumer kafkaConsumer(Test test){ + public KafkaConsumer kafkaConsumer(VehicleKafka vehicleKafka){ Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -41,7 +37,7 @@ public class KafkaConsumers { KafkaConsumer consumer = new KafkaConsumer<>(properties); // 订阅主题分区 List topicPartitions = new ArrayList<>(); - topicPartitions.add(new TopicPartition("testKafka", test.getPartitions())); + topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions())); consumer.assign(topicPartitions); return consumer; diff --git a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java index 2092011..281f739 100644 --- a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java +++ b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java @@ -5,10 +5,12 @@ import com.muyu.mqttmessage.common.MqttMessageModel; import com.muyu.mqttmessage.config.MqttFactory; import com.muyu.mqttmessage.constants.RabbitMqConstant; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.admin.NewTopic; import org.eclipse.paho.client.mqttv3.MqttClient; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -25,11 +27,14 @@ public class RabbitConsumer { private KafkaTemplate kafkaTemplate; @Autowired private MqttFactory mqttFactory; + @Autowired + private KafkaAdmin kafkaAdmin; @RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)}) public void monitorServer(String msg){ log.info("监听到的消息:{}",msg); MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class); - MqttClient mqttClient = mqttFactory.createMqttClient(mqttMessageModel); + mqttFactory.createMqttClient(mqttMessageModel); + kafkaAdmin.createOrModifyTopics(new NewTopic(mqttMessageModel.getBroker(),8,(short) 1)); log.info("{}服务器监听连接成功",mqttMessageModel.getTopic()); } diff --git a/src/main/java/com/muyu/mqttmessage/controller/TestController.java b/src/main/java/com/muyu/mqttmessage/controller/TestController.java index e14e823..69b0845 100644 --- a/src/main/java/com/muyu/mqttmessage/controller/TestController.java +++ b/src/main/java/com/muyu/mqttmessage/controller/TestController.java @@ -1,8 +1,7 @@ package com.muyu.mqttmessage.controller; -import com.muyu.mqttmessage.common.Test; +import com.muyu.mqttmessage.common.VehicleKafka; import com.muyu.mqttmessage.service.MqttKafkaService; -import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -18,18 +17,19 @@ public class TestController { @Autowired private MqttKafkaService mqttKafkaService; - @GetMapping("/Test") - public void Test(@RequestBody Test test) { - mqttKafkaService.test(test); - } @GetMapping("/Consumer") - public void consumer(@RequestBody Test test) { - mqttKafkaService.consumer(test); + public void consumer(@RequestBody VehicleKafka vehicleKafka) { + mqttKafkaService.consumer(vehicleKafka); } @GetMapping("/CloseConsumer") public void closeConsumer(@RequestParam("consumerName") String consumerName) { mqttKafkaService.closeConsumer(consumerName); } + + @GetMapping("/Test") + public void test() { + mqttKafkaService.test(); + } } diff --git a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java index 284e52d..10b388b 100644 --- a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java +++ b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java @@ -23,58 +23,72 @@ public class VehicleData { * VIN */ private String vin; + /** * 行驶路线 */ private String drivingRoute; + /** * 经度 */ private String longitude; + /** * 维度 */ private String latitude; + /** * 速度 */ private String speed; + /** * 里程 */ private BigDecimal mileage; + /** * 总电压 */ private String voltage; + /** * 总电流 */ private String current; + /** * 绝缘电阻 */ private String resistance; + /** * 档位 */ private String gear = "P"; + /** * 加速踏板行程值 */ private String accelerationPedal; + /** * 制动踏板行程值 */ private String brakePedal; + /** * 燃料消耗率 */ private String fuelConsumptionRate; + /** * 电机控制器温度 */ private String motorControllerTemperature; + /** * 电机转速 */ @@ -244,4 +258,5 @@ public class VehicleData { * CHG(充电机)状态 */ private int chgStatus = 1; + } diff --git a/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java b/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java index 1c1f580..5ab36e4 100644 --- a/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java +++ b/src/main/java/com/muyu/mqttmessage/service/MqttKafkaService.java @@ -1,7 +1,6 @@ package com.muyu.mqttmessage.service; -import com.muyu.mqttmessage.common.Test; -import org.springframework.stereotype.Component; +import com.muyu.mqttmessage.common.VehicleKafka; /** * @ClassName MqttKafkaService @@ -10,9 +9,10 @@ import org.springframework.stereotype.Component; * @Date 2024/6/9 上午11:05 */ public interface MqttKafkaService { - void test(Test test); - void consumer(Test test); + void consumer(VehicleKafka vehicleKafka); void closeConsumer(String consumerName); + + void test(); } diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java index 686447d..deac026 100644 --- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java @@ -1,19 +1,24 @@ package com.muyu.mqttmessage.service.impl; import com.alibaba.fastjson2.JSON; -import com.muyu.mqttmessage.common.Test; +import com.muyu.mqttmessage.common.AnalyzeConfigInfo; +import com.muyu.mqttmessage.common.VehicleKafka; +import com.muyu.mqttmessage.constants.RedisConstants; import com.muyu.mqttmessage.domain.VehicleData; import com.muyu.mqttmessage.utils.ConversionUtil; +import kong.unirest.json.JSONObject; import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; /** * @ClassName MqttCallBackConfig @@ -24,11 +29,14 @@ import java.math.BigDecimal; @Component @Log4j2 public class MqttCallBackServiceImpl implements MqttCallback { + private KafkaTemplate kafkaTemplate; public MqttCallBackServiceImpl(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } + @Autowired + private RedisTemplate redisTemplate; // @Autowired @@ -42,14 +50,10 @@ public class MqttCallBackServiceImpl implements MqttCallback { @Override public void messageArrived(String topic, MqttMessage message) { try { - VehicleData vehicleData = getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))); - String jsonString = JSON.toJSONString(vehicleData); - log.info("转化为对象:{}",jsonString); - Test test = new Test(); - test.setPartitions(1); - test.setKey("123"); - test.setData(jsonString); - kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData()); + JSONObject jsonObject = getJsonObject(ConversionUtil.hexStringToString(new String(message.getPayload()))); + Object o = redisTemplate.opsForHash().get("vehicleKafka", jsonObject.get("vin")); + VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class); + kafkaTemplate.send(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions(), vehicleKafka.getKey(),jsonObject.toString()); }catch (Exception e){ e.printStackTrace(); } @@ -61,6 +65,21 @@ public class MqttCallBackServiceImpl implements MqttCallback { System.out.println("deliveryComplete---------" + token.isComplete()); } + public JSONObject getJsonObject(String message) { + message = message.substring(1,message.length()-2); + StringBuffer stringBuffer = new StringBuffer(); + Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG,message.substring(0,17)); + List analyzeConfigInfos = new ArrayList<>(); + analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList(); + String finalMessage = message; + analyzeConfigInfos.forEach(analyzeConfigInfo -> { + stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ removeSuperfluousDigit(finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition()))+"\""); + }); + String jsonString = "{"+stringBuffer.substring(1)+"}"; + log.info("解析后的数据:{}",jsonString); + return new JSONObject(jsonString); + } + public VehicleData getVehicleData(String message) { message = message.substring(1,message.length()-2); return VehicleData.builder() @@ -161,7 +180,6 @@ public class MqttCallBackServiceImpl implements MqttCallback { .build(); } - public String removeSuperfluousDigit(String str){ if(str.length()>1){ if(str.charAt(0)=='0'){ diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java index a3caf0c..8558ae1 100644 --- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java +++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttKafkaServiceImpl.java @@ -1,18 +1,25 @@ package com.muyu.mqttmessage.service.impl; -import com.muyu.mqttmessage.common.Test; +import com.alibaba.fastjson2.JSON; +import com.muyu.mqttmessage.common.AnalyzeConfigInfo; +import com.muyu.mqttmessage.common.VehicleKafka; +import com.muyu.mqttmessage.constants.RedisConstants; import com.muyu.mqttmessage.consumer.KafkaConsumers; import com.muyu.mqttmessage.service.MqttKafkaService; +import com.muyu.mqttmessage.utils.ConversionUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -30,20 +37,19 @@ public class MqttKafkaServiceImpl implements MqttKafkaService { @Autowired private KafkaConsumers kafkaConsumers; - @Override - public void test(Test test) { - kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData()); - } + @Autowired + private RedisTemplate redisTemplate; + @Override - public void consumer(Test test) { - KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(test); - consumerMap.put(test.getConsumerName(),true); - while (consumerMap.containsKey(test.getConsumerName())){ + public void consumer(VehicleKafka vehicleKafka) { + KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(vehicleKafka); + consumerMap.put(vehicleKafka.getConsumerName(),true); + while (consumerMap.containsKey(vehicleKafka.getConsumerName())){ // 拉取消息 ConsumerRecords msg = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : msg) { - log.info("{}监听到的消息内容: {}",test.getConsumerName(),consumerRecord.value()); + log.info("{}监听到的消息内容: {}", vehicleKafka.getConsumerName(),consumerRecord.value()); } } consumer.close(); @@ -53,4 +59,22 @@ public class MqttKafkaServiceImpl implements MqttKafkaService { public void closeConsumer(String consumerName) { consumerMap.remove(consumerName); } + + @Override + public void test() { + String message = new String(); + message = "7E 56 49 4e 31 32 33 34 35 36 37 38 39 44 49 4a 45 34 31 37 31 37 35 37 34 33 39 33 35 33 33 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 32 33 2e 36 39 36 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 50 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 34 35 30 30 30 2e 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 14 7E"; + String str = ConversionUtil.hexStringToString(message); + message = str.substring(1,str.length()-2); + StringBuffer stringBuffer = new StringBuffer(); + Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG, "vin1"); + List analyzeConfigInfos = new ArrayList<>(); + analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList(); + String finalMessage = message; + analyzeConfigInfos.forEach(analyzeConfigInfo -> { + stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition())+"\""); + }); + String jsonString = "{"+stringBuffer.substring(1)+"}"; + log.info("解析后的数据:{}",jsonString); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b249fa4..11444a8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,4 +1,8 @@ spring: + redis: + host: 47.99.219.99 + port: 6379 + password: yx@123 application: name: mqtt-message jackson: @@ -9,51 +13,12 @@ spring: password: guest virtualHost: / port: 5672 - host: 43.142.44.217 + host: 47.99.219.99 listener: simple: prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条 publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue) -# kafka: -# bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的 -# producer: -# batch-size: 16384 #批量大小 -# acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) -# retries: 10 # 消息发送重试次数 -# #transaction-id-prefix: transaction -# buffer-memory: 33554432 -# key-serializer: org.apache.kafka.common.serialization.StringSerializer -# value-serializer: org.apache.kafka.common.serialization.StringSerializer -# properties: -# partitioner: -# class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner -# linger: -# ms: 2000 #提交延迟 -# #partitioner: #指定分区器 -# #class: pers.zhang.config.CustomerPartitionHandler -# consumer: -# group-id: testGroup #默认的消费组ID -# enable-auto-commit: true #是否自动提交offset -# auto-commit-interval: 2000 #提交offset延时 -# # 当kafka中没有初始offset或offset超出范围时将自动重置offset -# # earliest:重置为分区中最小的offset; -# # latest:重置为分区中最新的offset(消费分区中新产生的数据); -# # none:只要有一个分区不存在已提交的offset,就抛出异常; -# auto-offset-reset: latest -# max-poll-records: 500 #单次拉取消息的最大条数 -# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer -# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer -# properties: -# session: -# timeout: -# ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作) -# request: -# timeout: -# ms: 18000 # 消费请求的超时时间 -# listener: -# missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错 -# # type: batch kafka: #config/consumer.properties配置的bootstrap.servers