Merge remote-tracking branch 'origin/dev.analysis' into dev

# Conflicts:
#	cloud-common/pom.xml
dev.eventProcess
LQS 2024-10-07 20:33:09 +08:00
commit 580172aa76
11 changed files with 197 additions and 9 deletions

View File

@ -12,7 +12,7 @@
<artifactId>cloud-common-kafka</artifactId> <artifactId>cloud-common-kafka</artifactId>
<description> <description>
cloud-common-kafka cloud-common-kafka模块
</description> </description>
<properties> <properties>

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>cloud-common</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>cloud-common-mqtt</artifactId>
<description>
cloud-common-mqtt消息队列遥测传输协议
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- 项目公共核心模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,31 @@
package com.muyu.common.mqtt;
/**
* mqtt
* @Author
* @Packagecom.muyu.common.mqtt
* @Projectcloud-server
* @nameMQTTConnect
* @Date2024/10/2 9:40
*/
public class MQTTConnect
{
/**
* String topic = "vehicle";
* String broker = "tcp://106.15.136.7:1883";
* String clientId = "JavaSample";
*/
/**
* MQTT
*/
public static final String TOPIC="vehicle";
/**
*IP
*/
public static final String BROKER="tcp://106.15.136.7:1883";
/**
*ID,MQTT
*/
public static final String CLIENT_ID ="JavaSample";
}

View File

@ -24,6 +24,7 @@
<module>cloud-common-kafka</module> <module>cloud-common-kafka</module>
<module>cloud-common-cache</module> <module>cloud-common-cache</module>
<module>cloud-common-swagger</module> <module>cloud-common-swagger</module>
<module>cloud-common-mqtt</module>
</modules> </modules>
<artifactId>cloud-common</artifactId> <artifactId>cloud-common</artifactId>

View File

@ -11,6 +11,10 @@
<artifactId>cloud-modules-enterprise-common</artifactId> <artifactId>cloud-modules-enterprise-common</artifactId>
<description>
cloud-modules-enterprise-common企业业务平台服务
</description>
<properties> <properties>
<maven.compiler.source>17</maven.compiler.source> <maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>

View File

@ -111,6 +111,22 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId> <artifactId>cloud-common-kafka</artifactId>
</dependency> </dependency>
<!-- mqtt消息队列遥测传输协议服务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-mqtt</artifactId>
</dependency>
<!-- Spring Boot的缓存启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 高性能的Java缓存库缓存解决方案 -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -1,9 +1,10 @@
package com.muyu.analysis.parsing.MQTT; package com.muyu.analysis.parsing.mqtt;
import com.muyu.analysis.parsing.remote.RemoteClientService; import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants; import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.mqtt.MQTTConnect;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -45,18 +46,18 @@ public class ParsingMQTT {
*/ */
@PostConstruct @PostConstruct
public void mqttClient() { public void mqttClient() {
String topic = "vehicle"; // String topic = "vehicle";
String broker = "tcp://106.15.136.7:1883"; //// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
String clientId = "JavaSample"; //// String clientId = "JavaSample";
try { try {
// 第三个参数为空,默认持久化策略 // 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId); MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions(); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker); System.out.println("Connecting to MQTTConnect.BROKER: " + MQTTConnect.BROKER);
sampleClient.connect(connOpts); sampleClient.connect(connOpts);
sampleClient.subscribe(topic, 0); sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() { sampleClient.setCallback(new MqttCallback() {
// 连接丢失 // 连接丢失
@Override @Override
@ -154,6 +155,11 @@ public class ParsingMQTT {
log.info("loc " + me.getLocalizedMessage()); log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause()); log.info("cause " + me.getCause());
log.info("excep " + me); log.info("excep " + me);
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace(); me.printStackTrace();
} }
} }

View File

@ -0,0 +1,86 @@
package com.muyu.analysis.parsing.mqtt;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.w3c.dom.stylesheets.LinkStyle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.mqtt
* @Projectcloud-server
* @nameTest2
* @Date2024/10/6 20:36
*/
@Log4j2
public class Test2
{
private static final int DURATION_SECONDS = 5;
private static List<JSONObject> receivedStrings = new ArrayList<>();
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int elapsedSeconds = 0;
private static String file = "elapsed" ;
public static void main(String[] args){
//定义一个任务,每秒执行一次
Runnable task = new Runnable() {
@Override
public void run() {
JSONObject stringFromSource = getStringFromSource();
receivedStrings.add(stringFromSource);
System.out.println("Received"+stringFromSource);
//清理超过的数据
cleanUpOIdStrings();
//检查超速条件
checkForSpeeding();
}
};
//每个1秒执行一次任务
scheduler.scheduleAtFixedRate(task,0,1, TimeUnit.SECONDS);
}
//模拟从某个源获取字符串的方法
private static JSONObject getStringFromSource(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("message","Hello World");
jsonObject.put("time",System.currentTimeMillis());
jsonObject.put("elapsed",elapsedSeconds);
return jsonObject;
}
//清理超过60秒的数据
private static void cleanUpOIdStrings(){
long currentTime = System.currentTimeMillis();
receivedStrings.removeIf(jsonObject ->currentTime-jsonObject.getLong("time")>TimeUnit.SECONDS.toMicros(DURATION_SECONDS));
}
//检查是否有超速情况
private static void checkForSpeeding()
{
if(receivedStrings.size() < 2)return;//如果数据不足,直接返回
JSONObject jsonObject = new JSONObject();
jsonObject.put("message","你好");
jsonObject.put("time",System.currentTimeMillis());
jsonObject.put("elapsed",10);
for (int i = 0; i < receivedStrings.size(); i++) {
JSONObject current = receivedStrings.get(i);
JSONObject next = receivedStrings.get(i + 1);
Short currentElapsed = current.getShort(file);
Short nextElapsed = next.getShort(file);
receivedStrings.add(jsonObject);
//检查条件如果相差大于12则记录错误
if (nextElapsed - currentElapsed > 12) {
System.out.println("出错啦!出错啦!车子超速啦!!!");
}
}
}
}

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: lqs namespace: dev
spring: spring:
application: application:

13
pom.xml
View File

@ -395,6 +395,19 @@
<artifactId>cloud-modules-enterprise-common</artifactId> <artifactId>cloud-modules-enterprise-common</artifactId>
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<!-- 协议解析模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-protocol-analysis</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- mqtt消息队列遥测传输协议服务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-mqtt</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>