diff --git a/JavaSample-tcp1061513671883/.lck b/JavaSample-tcp1061513671883/.lck new file mode 100644 index 0000000..e69de29 diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java new file mode 100644 index 0000000..c7980b4 --- /dev/null +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java @@ -0,0 +1,19 @@ +package com.muyu.common.core.constant; + +/** + * kafka常量信息 + * @Author:李庆帅 + * @Package:com.muyu.common.core.constant + * @Project:cloud-server + * @name:KafkaConstants + * @Date:2024/9/29 16:41 + */ +public class KafkaConstants { + + /** + * 协议解析报文传递数据(队列名称) + */ + public final static String MESSAGE_PARSING = "MessageParsing"; + + +} diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java new file mode 100644 index 0000000..84e6fa6 --- /dev/null +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java @@ -0,0 +1,16 @@ +package com.muyu.common.core.constant; + +/** + * redis常量信息 + * @Author:李庆帅 + * @Package:com.muyu.common.core.constant + * @Project:cloud-server + * @name:RedisConstants + * @Date:2024/9/29 17:28 + */ +public class RedisConstants { + /** + * redisKey(协议解析报文传递) + */ + public final static String MESSAGE_TEMPLATE = "messageTemplate"; +} diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index 16ec61d..81d6907 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -11,6 +11,10 @@ cloud-common-kafka + + cloud-common-kafka + + 17 17 diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java index 9883e2e..e739722 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java @@ -107,7 +107,7 @@ public class MessageValueController extends BaseController * @param templateId 请求对象 * @return 返回结果 */ - @GetMapping("/messageValue/findByTemplateId/{templateId}") + @GetMapping("/findByTemplateId/{templateId}") @Operation(summary = "根据报文模版id查询报文数据", description = "根据报文模版id查询报文数据") public Result> findByTemplateId(@PathVariable("templateId") Long templateId){ List list = messageValueService.findByTemplateId(templateId); diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml index e6f5c4c..c6ad48f 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml +++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml @@ -100,6 +100,11 @@ com.muyu cloud-common-core + + + com.muyu + cloud-common-kafka + diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java deleted file mode 100644 index 2ba02a7..0000000 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.muyu.analysis.parsing.MQTT; - -import com.muyu.analysis.parsing.controller.ParsingController; -import org.eclipse.paho.client.mqttv3.*; - -/** - * 测试MQTT - * @ClassName demo - * @Description 描述 - * @Author 李庆帅 - * @Date 2024/9/28 - */ -public class DemoMQTT { - - public void main(String[] args) { - - String topic = "vehicle"; - String content = "Message from MqttPublishSample"; - int qos = 2; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; - - - - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: "+broker); - sampleClient.connect(connOpts); - sampleClient.subscribe(topic,0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - - } - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - System.out.println(new String(mqttMessage.getPayload())); - } - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - }); - } catch(MqttException me) { - System.out.println("reason "+me.getReasonCode()); - System.out.println("msg "+me.getMessage()); - System.out.println("loc "+me.getLocalizedMessage()); - System.out.println("cause "+me.getCause()); - System.out.println("excep "+me); - me.printStackTrace(); - } - } - - -} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java similarity index 70% rename from cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java rename to cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java index 909319a..09cffec 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java @@ -1,78 +1,86 @@ -package com.muyu.analysis.parsing.service.impl; +package com.muyu.analysis.parsing.MQTT; +import com.muyu.analysis.parsing.remote.RemoteClientService; +import com.muyu.common.core.constant.KafkaConstants; +import com.muyu.common.core.constant.RedisConstants; +import com.muyu.common.core.domain.Result; +import com.muyu.enterprise.domain.resp.car.MessageValueListResp; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; import cn.hutool.json.JSONObject; import com.alibaba.fastjson.JSON; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.muyu.analysis.parsing.controller.ParsingController; -import com.muyu.analysis.parsing.remote.RemoteClientService; -import com.muyu.analysis.parsing.mapper.ParsingMapper; -import com.muyu.analysis.parsing.service.ParsingService; -import com.muyu.common.core.domain.Result; -import com.muyu.enterprise.domain.car.MessageValue; -import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; -import javax.annotation.Resource; import java.util.List; /** - * 协议解析实现层 + * 协议解析处理数据发送传送到队列 * @Author:李庆帅 - * @Package:com.muyu.analysis.parsing.service.impl + * @Package:com.muyu.analysis.parsing.MQTT * @Project:cloud-server - * @name:ParsingServiceImpl - * @Date:2024/9/28 20:53 + * @name:ParsingMQTT + * @Date:2024/9/29 16:08 */ @Log4j2 -@Service -public class ParsingServiceImpl extends ServiceImpl - implements ParsingService -{ +@Component +public class ParsingMQTT { + @Resource private RedisTemplate redisTemplate; @Autowired - private RemoteClientService remoteServiceClientService; - - - - @Override - public void mqtt() { - String topic = "vehicle"; - String content = "Message from MqttPublishSample"; - int qos = 2; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; + private RemoteClientService remoteServiceClient; + @Resource + private KafkaProducer kafkaProducer; + /** + * 协议解析 + */ + @PostConstruct + public void mqttClient() { + String topic = "vehicle"; + String broker = "tcp://106.15.136.7:1883"; + String clientId = "JavaSample"; try { // 第三个参数为空,默认持久化策略 MqttClient sampleClient = new MqttClient(broker, clientId); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); - System.out.println("Connecting to broker: "+broker); + System.out.println("Connecting to broker: " + broker); sampleClient.connect(connOpts); - sampleClient.subscribe(topic,0); + sampleClient.subscribe(topic, 0); sampleClient.setCallback(new MqttCallback() { // 连接丢失 @Override public void connectionLost(Throwable throwable) { } + // 连接成功 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println(new String(mqttMessage.getPayload())); - String mqtt= new String(mqttMessage.getPayload()); - JSONObject jsonObject = this.protocolParsing(mqtt); - System.out.println("转换后:"+jsonObject); + JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload())); + + ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, + entries.toString() ); + kafkaProducer.send(producerRecord); + System.out.println(entries); } + /** + * 协议解析 + * @param messageStr + * @return + */ public JSONObject protocolParsing(String messageStr) { //根据空格切割数据 String[] hexArray = messageStr.split(" "); @@ -86,12 +94,12 @@ public class ParsingServiceImpl extends ServiceImpl String vehicleVin = result.substring(1, 18); log.info("车辆VIN码: " + vehicleVin); //根据车辆VIN码查询报文模板ID - Result byVehicleVin = remoteServiceClientService.findByVehicleVin(vehicleVin); + Result byVehicleVin = remoteServiceClient.findByVehicleVin(vehicleVin); Long templateId = byVehicleVin.getData(); List templateList; //从redis缓存中获取报文模板数据 try { - String redisKey = "messageTemplate" + templateId; + String redisKey = RedisConstants.MESSAGE_TEMPLATE + templateId; if (redisTemplate.hasKey(redisKey)) { List list = redisTemplate.opsForList().range(redisKey, 0, -1); templateList = list.stream() @@ -99,7 +107,7 @@ public class ParsingServiceImpl extends ServiceImpl .toList(); log.info("Redis缓存查询成功"); } else { - Result> byTemplateId = remoteServiceClientService.findByTemplateId(templateId); + Result> byTemplateId = remoteServiceClient.findByTemplateId(templateId); templateList = byTemplateId.getData(); templateList.forEach( listResp -> @@ -128,24 +136,27 @@ public class ParsingServiceImpl extends ServiceImpl //存入数据 jsonObject.put(messageValue.getMessageLabel(), value); } - System.out.println("发发呆沙发斯蒂芬萨达:"+jsonObject.toString()); return jsonObject; } + // 接收信息 @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); - } catch(MqttException me) { - System.out.println("reason "+me.getReasonCode()); - System.out.println("msg "+me.getMessage()); - System.out.println("loc "+me.getLocalizedMessage()); - System.out.println("cause "+me.getCause()); - System.out.println("excep "+me); + } catch (MqttException me) { + System.out.println("reason " + me.getReasonCode()); + System.out.println("msg " + me.getMessage()); + System.out.println("loc " + me.getLocalizedMessage()); + System.out.println("cause " + me.getCause()); + System.out.println("excep " + me); me.printStackTrace(); } } + + + } diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/controller/ParsingController.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/controller/ParsingController.java deleted file mode 100644 index 6d89015..0000000 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/controller/ParsingController.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.muyu.analysis.parsing.controller; - -import cn.hutool.json.JSONObject; -import com.muyu.analysis.parsing.MQTT.DemoMQTT; -import com.muyu.analysis.parsing.service.ParsingService; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -/** - * 协议解析控制层 - * @Author:李庆帅 - * @Package:com.muyu.analysis.parsing.controller - * @Project:cloud-server - * @name:ParsingController - * @Date:2024/9/28 20:36 - */ -@RestController -@RequestMapping("/parsing") -public class ParsingController -{ - private static final String topic = "vehicle"; - private static final String content = "Message from MqttPublishSample"; - private static final int qos = 2; - private static final String broker = "tcp://106.15.136.7:1883"; - private static final String clientId = "JavaSample"; - - @Autowired - private ParsingService parsingService; - -// /** -// * 协议解析 -// * @param messageStr -// * @return -// */ -// @PostMapping("/protocolParsing") -// public JSONObject protocolParsing(@RequestParam("messageStr") String messageStr) { -// try { -// // 第三个参数为空,默认持久化策略 -// MqttClient sampleClient = new MqttClient(broker, clientId); -// MqttConnectOptions connOpts = new MqttConnectOptions(); -// connOpts.setCleanSession(true); -// System.out.println("Connecting to broker: "+broker); -// sampleClient.connect(connOpts); -// sampleClient.subscribe(topic,0); -// sampleClient.setCallback(new MqttCallback() { -// // 连接丢失 -// @Override -// public void connectionLost(Throwable throwable) { -// -// } -// // 连接成功 -// @Override -// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { -// System.out.println(new String(mqttMessage.getPayload())); -// } -// // 接收信息 -// @Override -// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { -// -// } -// }); -// } catch(MqttException me) { -// System.out.println("reason "+me.getReasonCode()); -// System.out.println("msg "+me.getMessage()); -// System.out.println("loc "+me.getLocalizedMessage()); -// System.out.println("cause "+me.getCause()); -// System.out.println("excep "+me); -// me.printStackTrace(); -// } -// JSONObject messageValue = parsingService.protocolParsing(messageStr); -// return messageValue; -// } - - /** - * 协议解析 - */ - @PostMapping("/mqttClient") - public void mqttClient() { - parsingService.mqtt(); - } - - -} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/mapper/ParsingMapper.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/mapper/ParsingMapper.java deleted file mode 100644 index 913cd90..0000000 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/mapper/ParsingMapper.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.muyu.analysis.parsing.mapper; - -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.muyu.enterprise.domain.car.MessageValue; -import org.apache.ibatis.annotations.Mapper; - -/** - * 协议解析持久层 - * @Author:李庆帅 - * @Package:com.muyu.analysis.parsing.mapper - * @Project:cloud-server - * @name:ParsingMapper - * @Date:2024/9/28 20:54 - */ -@Mapper -public interface ParsingMapper extends BaseMapper { -} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/remote/RemoteClientService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/remote/RemoteClientService.java index 43eae53..bfaebb8 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/remote/RemoteClientService.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/remote/RemoteClientService.java @@ -4,6 +4,7 @@ import com.muyu.analysis.parsing.remote.factory.RemoteClientServiceFactory; import com.muyu.common.core.constant.ServiceNameConstants; import com.muyu.common.core.domain.Result; import com.muyu.enterprise.domain.resp.car.MessageValueListResp; +import io.swagger.v3.oas.annotations.Operation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/ParsingService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/ParsingService.java deleted file mode 100644 index 9885754..0000000 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/ParsingService.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.muyu.analysis.parsing.service; - -import cn.hutool.json.JSONObject; -import com.baomidou.mybatisplus.extension.service.IService; -import com.muyu.enterprise.domain.car.MessageValue; - -/** - * 协议解析业务层 - * @Author:李庆帅 - * @Package:com.muyu.analysis.parsing.service - * @Project:cloud-server - * @name:ParsingService - * @Date:2024/9/28 20:50 - */ -public interface ParsingService extends IService -{ - - - /** - * 协议解析 - * @return - */ - void mqtt(); -} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml index 02ebf66..e6feb7f 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml @@ -44,3 +44,5 @@ spring: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} # 系统环境Config共享配置 - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # kafka共享配置 + - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}