diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index 81d6907..60cf732 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -12,7 +12,7 @@ cloud-common-kafka - cloud-common-kafka + cloud-common-kafka模块 diff --git a/cloud-common/cloud-common-mqtt/pom.xml b/cloud-common/cloud-common-mqtt/pom.xml new file mode 100644 index 0000000..32a0a17 --- /dev/null +++ b/cloud-common/cloud-common-mqtt/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-mqtt + + + cloud-common-mqtt消息队列遥测传输协议 + + + + 17 + 17 + UTF-8 + + + + + + com.muyu + cloud-common-core + + + diff --git a/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java new file mode 100644 index 0000000..8534db5 --- /dev/null +++ b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java @@ -0,0 +1,31 @@ +package com.muyu.common.mqtt; + +/** + * mqtt连接配置 + * @Author:李庆帅 + * @Package:com.muyu.common.mqtt + * @Project:cloud-server + * @name:MQTTConnect + * @Date:2024/10/2 9:40 + */ +public class MQTTConnect +{ + /** + * String topic = "vehicle"; + * String broker = "tcp://106.15.136.7:1883"; + * String clientId = "JavaSample"; + */ + + /** + * 定义主题字符串,用于MQTT消息交换的频道 + */ + public static final String TOPIC="vehicle"; + /** + *定义代理服务器的连接字符串,格式通常为协议名称,IP地址和端口号 + */ + public static final String BROKER="tcp://106.15.136.7:1883"; + /** + *定义客户端ID,用于在MQTT代理服务器中标识客户端 + */ + public static final String CLIENT_ID ="JavaSample"; +} diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index 47bbaa5..8fbd43c 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -24,6 +24,7 @@ cloud-common-kafka cloud-common-cache cloud-common-swagger + cloud-common-mqtt cloud-common diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml index 0b48d0e..5798604 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml @@ -11,6 +11,10 @@ cloud-modules-enterprise-common + + cloud-modules-enterprise-common企业业务平台服务 + + 17 17 diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml index fa27aa5..dbc6859 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml +++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml @@ -111,6 +111,22 @@ com.muyu cloud-common-kafka + + + com.muyu + cloud-common-mqtt + + + + org.springframework.boot + spring-boot-starter-cache + + + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java index dc0347a..6c9f58e 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java @@ -1,9 +1,10 @@ -package com.muyu.analysis.parsing.MQTT; +package com.muyu.analysis.parsing.mqtt; import com.muyu.analysis.parsing.remote.RemoteClientService; import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.RedisConstants; import com.muyu.common.core.domain.Result; +import com.muyu.common.mqtt.MQTTConnect; import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; @@ -45,18 +46,18 @@ public class ParsingMQTT { */ @PostConstruct public void mqttClient() { - String topic = "vehicle"; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; +// String topic = "vehicle"; +//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883"; +//// String clientId = "JavaSample"; try { // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); + MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); - System.out.println("Connecting to broker: " + broker); + System.out.println("Connecting to MQTTConnect.BROKER: " + MQTTConnect.BROKER); sampleClient.connect(connOpts); - sampleClient.subscribe(topic, 0); + sampleClient.subscribe(MQTTConnect.TOPIC, 0); sampleClient.setCallback(new MqttCallback() { // 连接丢失 @Override @@ -154,6 +155,11 @@ public class ParsingMQTT { log.info("loc " + me.getLocalizedMessage()); log.info("cause " + me.getCause()); log.info("excep " + me); + System.out.println("reason " + me.getReasonCode()); + System.out.println("msg " + me.getMessage()); + System.out.println("loc " + me.getLocalizedMessage()); + System.out.println("cause " + me.getCause()); + System.out.println("excep " + me); me.printStackTrace(); } } diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java new file mode 100644 index 0000000..9ed2037 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java @@ -0,0 +1,86 @@ +package com.muyu.analysis.parsing.mqtt; + +import com.alibaba.fastjson2.JSONObject; +import lombok.extern.log4j.Log4j2; +import org.w3c.dom.stylesheets.LinkStyle; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @Author:李庆帅 + * @Package:com.muyu.analysis.parsing.mqtt + * @Project:cloud-server + * @name:Test2 + * @Date:2024/10/6 20:36 + */ +@Log4j2 +public class Test2 +{ + private static final int DURATION_SECONDS = 5; + private static List receivedStrings = new ArrayList<>(); + private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private static int elapsedSeconds = 0; + private static String file = "elapsed" ; + + public static void main(String[] args){ + //定义一个任务,每秒执行一次 + Runnable task = new Runnable() { + @Override + public void run() { + JSONObject stringFromSource = getStringFromSource(); + receivedStrings.add(stringFromSource); + System.out.println("Received:"+stringFromSource); + //清理超过的数据 + cleanUpOIdStrings(); + //检查超速条件 + checkForSpeeding(); + } + }; + //每个1秒执行一次任务 + scheduler.scheduleAtFixedRate(task,0,1, TimeUnit.SECONDS); + } + //模拟从某个源获取字符串的方法 + private static JSONObject getStringFromSource(){ + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message","Hello World"); + jsonObject.put("time",System.currentTimeMillis()); + jsonObject.put("elapsed",elapsedSeconds); + return jsonObject; + } + + //清理超过60秒的数据 + private static void cleanUpOIdStrings(){ + long currentTime = System.currentTimeMillis(); + receivedStrings.removeIf(jsonObject ->currentTime-jsonObject.getLong("time")>TimeUnit.SECONDS.toMicros(DURATION_SECONDS)); + } + + //检查是否有超速情况 + private static void checkForSpeeding() + { + if(receivedStrings.size() < 2)return;//如果数据不足,直接返回 + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message","你好"); + jsonObject.put("time",System.currentTimeMillis()); + jsonObject.put("elapsed",10); + + for (int i = 0; i < receivedStrings.size(); i++) { + JSONObject current = receivedStrings.get(i); + JSONObject next = receivedStrings.get(i + 1); + + Short currentElapsed = current.getShort(file); + Short nextElapsed = next.getShort(file); + receivedStrings.add(jsonObject); + //检查条件,如果相差大于12,则记录错误 + if (nextElapsed - currentElapsed > 12) { + System.out.println("出错啦!出错啦!车子超速啦!!!"); + } + } + } + + +} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java new file mode 100644 index 0000000..e69de29 diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml index e6feb7f..567d611 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev spring: application: diff --git a/pom.xml b/pom.xml index b6c48ad..714add3 100644 --- a/pom.xml +++ b/pom.xml @@ -395,6 +395,19 @@ cloud-modules-enterprise-common ${muyu.version} + + + + com.muyu + cloud-modules-protocol-analysis + ${muyu.version} + + + + com.muyu + cloud-common-mqtt + ${muyu.version} +