feat(): 完善mqtt的投递

dev.eventProcess
LQS 2024-10-07 20:30:44 +08:00
parent 514c94b398
commit 3202af6ebb
24 changed files with 205 additions and 512 deletions

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
# Spring
spring:
application:

View File

@ -12,7 +12,7 @@
<artifactId>cloud-common-kafka</artifactId>
<description>
cloud-common-kafka
cloud-common-kafka模块
</description>
<properties>

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>cloud-common</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>cloud-common-mqtt</artifactId>
<description>
cloud-common-mqtt消息队列遥测传输协议
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- 项目公共核心模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,31 @@
package com.muyu.common.mqtt;
/**
* mqtt
* @Author
* @Packagecom.muyu.common.mqtt
* @Projectcloud-server
* @nameMQTTConnect
* @Date2024/10/2 9:40
*/
public class MQTTConnect
{
/**
* String topic = "vehicle";
* String broker = "tcp://106.15.136.7:1883";
* String clientId = "JavaSample";
*/
/**
* MQTT
*/
public static final String TOPIC="vehicle";
/**
*IP
*/
public static final String BROKER="tcp://106.15.136.7:1883";
/**
*ID,MQTT
*/
public static final String CLIENT_ID ="JavaSample";
}

View File

@ -22,6 +22,7 @@
<module>cloud-common-xxl</module>
<module>cloud-common-rabbit</module>
<module>cloud-common-kafka</module>
<module>cloud-common-mqtt</module>
</modules>
<artifactId>cloud-common</artifactId>

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
# Spring
spring:

View File

@ -11,6 +11,10 @@
<artifactId>cloud-modules-enterprise-common</artifactId>
<description>
cloud-modules-enterprise-common企业业务平台服务
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>

View File

@ -89,7 +89,7 @@
<artifactId>cloud-modules-enterprise-common</artifactId>
</dependency>
<!-- MyBatisPlus - 依赖包 -->
<!-- MyBatisPlusJoin 依赖包 -->
<dependency>
<groupId>com.github.yulichang</groupId>
<artifactId>mybatis-plus-join</artifactId>

View File

@ -1,74 +0,0 @@
package com.muyu.enterprise.MQTT;
import com.alibaba.nacos.api.remote.PushCallBack;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameClientMQTT
* @Date2024/9/28 12:11
*/
public class ClientMQTT {
//String topic = "vehicle";
// String content = "Message from MqttPublishSample";
// int qos = 2;
// String broker = "tcp://106.15.136.7:1883";
// String clientId = "JavaSample";
//MQTT代理服务器地址
public static final String HOST="tcp://106.15.136.7:1883";
public static final String TOPIC1="pos_message_all";
private static final String clientId="12345678";
private MqttClient client;
private MqttConnectOptions options;
private String userName="mqtt"; //非必须
private String passWord="mqtt"; //非必须
private void start(){
try{
// host为主机名clientid即连接MQTT的客户端ID一般以唯一标识符表示MemoryPersistence设置clientid的保存形式默认为以内存保存
client= new MqttClient(HOST,clientId,new MemoryPersistence());
// MQTT的连接设置
options=new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置断开后重新连接
options.setAutomaticReconnect(true);
// 设置回调
client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC1);
//setWill方法如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//遗嘱
options.setWill(topic,"close".getBytes(),1,true);
client.connect(options);
//订阅消息
int[] Qos = {1} ; //0最多一次 、1最少一次 、2只有一次
String[] topics1 = {TOPIC1};
client.subscribe(topics1,Qos);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args){
ClientMQTT clientMQTT = new ClientMQTT();
clientMQTT.start();
}
}

View File

@ -1,69 +0,0 @@
package com.muyu.enterprise.MQTT;
/**
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameMQTTReceiveCallback
* @Date2024/9/27 22:21
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
*
*
* MqttCallbackCallBack MqttCallBack
*
*
*
*
* (1):public void messageArrived(MqttTopic topic, MqttMessage message)
*
* (2):public void connectionLost(Throwable cause)
*
* (3):public void deliveryComplete(MqttDeliveryToken token))
* QoS 1 QoS 2
* MqttClient.connect
*
*/
public class MQTTReceiveCallback implements MqttCallback {
// @Override
// public void connectionLost(Throwable throwable) {
// //连接丢失后,一般在这里面进行重连
// System.out.println("连接断开,可以做重连");
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// //subscribe后得到的消息会执行到这里面
// System.out.println("接收消息主题:"+topic);
// System.out.println("接收消息Qos"+message.getQos());
// System.out.println("接收消息内容:"+new String(message.getPayload()));
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("deliveryComplete----------"+token.isComplete());
// }
@Override
public void connectionLost(Throwable throwable){
//连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//subscribe后得到的消息会执行到这面
System.out.println("接收消息主题:"+topic);
System.out.println("接收消息Qos"+mqttMessage.getQos());
System.out.println("接收消息内容:"+new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete());
}
}

View File

@ -1,187 +0,0 @@
package com.muyu.enterprise.MQTT;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
* @Author
* @Packagecom.muyu.enterprise.mqtt
* @Projectcloud-server
* @nameMyMqttClient
* @Date2024/9/27 22:20
*/
public class MyMqttClient {
public static MqttClient mqttClient =null;
private static MemoryPersistence memoryPersistence=null;
private static MqttConnectOptions mqttConectOptions=null;
private static String ClinentName=""; //待填 将在服务端出现的名字
private static String IP=""; //待填 服务器IP
public static void main(String[] args) {
start(ClinentName);
}
public static void start(String clientId){
//初始化连接设置对象
mqttConectOptions=new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
//这里设置为true表示每次连接到服务器都以新的身份连接
mqttConectOptions.setCleanSession(true);
//设置连接超时时间,单位是秒
mqttConectOptions.setConnectionTimeout(10);
//设置持久化方式
memoryPersistence=new MemoryPersistence();
if(null!=clientId){
try{
mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
System.out.println("连接状态:"+mqttClient.isConnected());
//设置连接和回调
if(null!=mqttClient){
if(!mqttClient.isConnected()){
//创建回调函数对象
MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
//客户端添加回调函数
mqttClient.setCallback(MQTTReceiveCallback);
//创建连接
try{
System.out.println("创建连接");
mqttClient.connect(mqttConectOptions);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
}else {
System.out.println("mqttClient为空");
}
System.out.println("连接状态"+mqttClient.isConnected());
}
// 关闭连接
public void closeConnect(){
//关闭储存方式
if(null!=memoryPersistence){
try{
memoryPersistence.close();
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("memoryPersistence为空");
}
if(null!=mqttClient){
if(mqttClient.isConnected()){
try{
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient未连接");
}
}else {
System.out.println("mqttClient为空");
}
}
//发布消息
public static void publishMessage(String pubTopic,String message,int qos){
if(null!=mqttClient && mqttClient.isConnected()){
System.out.println("发布消息"+mqttClient.isConnected());
System.out.println("id"+mqttClient.isConnected());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
MqttTopic topic = mqttClient.getTopic(pubTopic);
if(null!=topic){
try{
MqttDeliveryToken publish = topic.publish(mqttMessage);
if(!publish.isComplete()){
System.out.println("消息发布成功");
}
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
}
}
//重新连接
public static void reConnect(){
if(null!=mqttClient&&mqttClient.isConnected()){
if(!mqttClient.isConnected()){
if(null!=mqttConectOptions){
try{
mqttClient.connect(mqttConectOptions);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttConnectOptions为空");
}
}else {
System.out.println("mqttClient为空或已连接");
}
}else {
start(ClinentName);
}
}
//订阅主题
public static void suvTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try{
mqttClient.subscribe(topic,1);
} catch (MqttException e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient出错");
}
}
//清空主题
public void cleanTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try{
mqttClient.subscribe(topic);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient出错");
}
}
}

View File

@ -1,52 +0,0 @@
package com.muyu.enterprise.MQTT;
/**
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @namePushCallback
* @Date2024/9/28 14:35
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
*
*
* MqttCallbackCallBack MqttCallBack
*
*
*
*
* public void messageArrived(MqttTopic topic, MqttMessage message)
*
* public void connectionLost(Throwable cause)
*
* public void deliveryComplete(MqttDeliveryToken token))
* QoS 1 QoS 2
* MqttClient.connect
*
*/
public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
}

View File

@ -1,113 +0,0 @@
package com.muyu.enterprise.MQTT;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameServerMQTT
* @Date2024/9/28 14:32
*/
@Log4j2
public class ServerMQTT {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
public static final String HOST = "tcp://127.0.0.1:1883";
//定义一个主题
public static final String TOPIC = "pos_message_all";
//定义MQTT的ID可以在MQTT服务配置中指定
private static final String clientId = "server11";
private MqttClient client;
private static MqttTopic topic11;
// private String userName = "mqtt"; //非必须
// private String passWord = "mqtt"; //非必须
private static MqttMessage message;
/**
*
* @throws MqttException
*/
public ServerMQTT() throws MqttException {
// MemoryPersistence设置clientid的保存形式默认为以内存保存
client=new MqttClient(HOST, clientId,new MemoryPersistence());
connect();
}
/**
*
*/
private void connect()
{
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
// options.setUserName(userName);
// options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try{
client.setCallback(new PushCallback());
client.connect(options);
topic11 = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttException
*/
public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{
MqttDeliveryToken token= topic.publish(message);
token.waitForCompletion();
System.out.println("消息已完全发布!"+token.isComplete());
log.info("消息已完全发布!"+token.isComplete());
}
/**
*
* @param clieId
* @param msg
* @throws Exception
*/
public static void sendMessage(String clieId,String msg)throws Exception{
ServerMQTT server = new ServerMQTT();
server.message = new MqttMessage();
server.message.setQos(1); //保证消息能到达一次
server.message.setRetained(true);
String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}";
try{
publish(server.topic11 , server.message);
//断开连接
// server.client.disconnect();
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
sendMessage("123444","哈哈哈");
}
}

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
spring:
application:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
# Spring
spring:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
# Spring
spring:

View File

@ -105,6 +105,22 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
<!-- mqtt消息队列遥测传输协议服务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-mqtt</artifactId>
</dependency>
<!-- Spring Boot的缓存启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 高性能的Java缓存库缓存解决方案 -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
</dependencies>
<build>

View File

@ -1,9 +1,10 @@
package com.muyu.analysis.parsing.MQTT;
package com.muyu.analysis.parsing.mqtt;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
import com.muyu.common.mqtt.MQTTConnect;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@ -45,18 +46,18 @@ public class ParsingMQTT {
*/
@PostConstruct
public void mqttClient() {
String topic = "vehicle";
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
// String topic = "vehicle";
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
//// String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
System.out.println("Connecting to MQTTConnect.BROKER: " + MQTTConnect.BROKER);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic, 0);
sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
@ -154,6 +155,11 @@ public class ParsingMQTT {
log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause());
log.info("excep " + me);
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}

View File

@ -0,0 +1,86 @@
package com.muyu.analysis.parsing.mqtt;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.w3c.dom.stylesheets.LinkStyle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.mqtt
* @Projectcloud-server
* @nameTest2
* @Date2024/10/6 20:36
*/
@Log4j2
public class Test2
{
private static final int DURATION_SECONDS = 5;
private static List<JSONObject> receivedStrings = new ArrayList<>();
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int elapsedSeconds = 0;
private static String file = "elapsed" ;
public static void main(String[] args){
//定义一个任务,每秒执行一次
Runnable task = new Runnable() {
@Override
public void run() {
JSONObject stringFromSource = getStringFromSource();
receivedStrings.add(stringFromSource);
System.out.println("Received"+stringFromSource);
//清理超过的数据
cleanUpOIdStrings();
//检查超速条件
checkForSpeeding();
}
};
//每个1秒执行一次任务
scheduler.scheduleAtFixedRate(task,0,1, TimeUnit.SECONDS);
}
//模拟从某个源获取字符串的方法
private static JSONObject getStringFromSource(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("message","Hello World");
jsonObject.put("time",System.currentTimeMillis());
jsonObject.put("elapsed",elapsedSeconds);
return jsonObject;
}
//清理超过60秒的数据
private static void cleanUpOIdStrings(){
long currentTime = System.currentTimeMillis();
receivedStrings.removeIf(jsonObject ->currentTime-jsonObject.getLong("time")>TimeUnit.SECONDS.toMicros(DURATION_SECONDS));
}
//检查是否有超速情况
private static void checkForSpeeding()
{
if(receivedStrings.size() < 2)return;//如果数据不足,直接返回
JSONObject jsonObject = new JSONObject();
jsonObject.put("message","你好");
jsonObject.put("time",System.currentTimeMillis());
jsonObject.put("elapsed",10);
for (int i = 0; i < receivedStrings.size(); i++) {
JSONObject current = receivedStrings.get(i);
JSONObject next = receivedStrings.get(i + 1);
Short currentElapsed = current.getShort(file);
Short nextElapsed = next.getShort(file);
receivedStrings.add(jsonObject);
//检查条件如果相差大于12则记录错误
if (nextElapsed - currentElapsed > 12) {
System.out.println("出错啦!出错啦!车子超速啦!!!");
}
}
}
}

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
spring:
application:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
spring:
application:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: lqs
namespace: dev
# Spring
spring:

13
pom.xml
View File

@ -303,6 +303,19 @@
<artifactId>cloud-modules-enterprise-common</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- 协议解析模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-protocol-analysis</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- mqtt消息队列遥测传输协议服务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-mqtt</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies>
</dependencyManagement>