feat():修复MQTT连接,接收rabbitMq数据消费,拿到数据解析投递kafka

dev.analysis
LQS 2024-10-11 21:56:22 +08:00
parent 0d48eb2e34
commit 15bcd34b07
8 changed files with 403 additions and 87 deletions

View File

View File

@ -0,0 +1,47 @@
package com.muyu.enterprise.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @Author
* @Packagecom.muyu.enterprise.domain
* @Projectcloud-server
* @nameMqttProperties
* @Date2024/10/10 20:01
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties
{
/**
*
*/
private String broker;
/**
*
*/
private String topic;
/**
*
*/
private String userName;
/**
*
*/
private String password;
/**
* ID
*/
private String clientId;
/**
*
*/
private int qos=0;
}

View File

@ -136,6 +136,17 @@
<artifactId>cloud-modules-enterprise-cache</artifactId> <artifactId>cloud-modules-enterprise-cache</artifactId>
</dependency> </dependency>
<!-- rabbit消息队列模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<!-- &lt;!&ndash; gateway系统模块 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.muyu</groupId>-->
<!-- <artifactId>cloud-modules-vehicle-gateway</artifactId>-->
<!-- </dependency>-->
</dependencies> </dependencies>

View File

@ -1,5 +1,6 @@
package com.muyu.analysis.parsing.mqtt; package com.muyu.analysis.parsing.mqtt;
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants; import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
@ -7,6 +8,7 @@ import com.muyu.common.mqtt.MQTTConnect;
import com.muyu.enterprise.cache.CarMessageValueCacheService; import com.muyu.enterprise.cache.CarMessageValueCacheService;
import com.muyu.enterprise.cache.CarVehicleCacheService; import com.muyu.enterprise.cache.CarVehicleCacheService;
import com.muyu.enterprise.cache.CarVehicleTypeCacheService; import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
import com.muyu.enterprise.domain.MqttProperties;
import com.muyu.enterprise.domain.car.Vehicle; import com.muyu.enterprise.domain.car.Vehicle;
import com.muyu.enterprise.domain.car.VehicleType; import com.muyu.enterprise.domain.car.VehicleType;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
@ -20,6 +22,7 @@ import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -64,49 +67,79 @@ public class ParsingMQTT {
@Resource @Resource
private CarMessageValueCacheService allMessageValueCacheService; private CarMessageValueCacheService allMessageValueCacheService;
@Autowired
private MqttClientService mqttClientService;
/** /**
* *
*/ */
@PostConstruct @PostConstruct
public void mqttClient() { public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
// String topic = "vehicle"; // String topic = "vehicle";
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883"; //// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
//// String clientId = "JavaSample"; //// String clientId = "JavaSample";
try { String payload = new String(mqttMessage.getPayload());
// 第三个参数为空,默认持久化策略 log.info("====:{}", payload);
MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
sampleClient.connect(connOpts);
sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
} // try {
// // 第三个参数为空,默认持久化策略
// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
// MqttConnectOptions connOpts = new MqttConnectOptions();
// connOpts.setCleanSession(true);
// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
// sampleClient.connect(connOpts);
// //MQTTConnect.TOPIC
// sampleClient.subscribe(MQTTConnect.TOPIC, 0);
// sampleClient.setCallback(new MqttCallback() {
// // 连接丢失
// @Override
// public void connectionLost(Throwable throwable) {
//
// }
//
// // 连接成功
// @Override
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// System.out.println(new String(mqttMessage.getPayload()));
//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload()));
////
//// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
//// entries.toString() );
//// kafkaProducer.send(producerRecord);
//// log.info("解析之后的数据:"+entries);
//
// }
//
//
//
// // 接收信息
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//
// }
// });
// } catch (MqttException me) {
// log.info("reason " + me.getReasonCode());
// log.info("msg " + me.getMessage());
// log.info("loc " + me.getLocalizedMessage());
// log.info("cause " + me.getCause());
// log.info("excep " + me);
// System.out.println("reason " + me.getReasonCode());
// System.out.println("msg " + me.getMessage());
// System.out.println("loc " + me.getLocalizedMessage());
// System.out.println("cause " + me.getCause());
// System.out.println("excep " + me);
// me.printStackTrace();
// }
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
log.info("解析之后的数据:"+entries);
}
/** /**
* *
* @param messageStr * @param messageStr
* @return * @return
*/ */
public JSONObject protocolParsing(String messageStr) { // public JSONObject protocolParsing() {
//根据空格切割数据 //根据空格切割数据
String[] hexArray = messageStr.split(" "); String[] hexArray = messageStr.split(" ");
StringBuilder result = new StringBuilder(); StringBuilder result = new StringBuilder();
@ -142,28 +175,10 @@ public class ParsingMQTT {
jsonObject.put(messageValue.getMessageLabel(), value); jsonObject.put(messageValue.getMessageLabel(), value);
} }
return jsonObject; return jsonObject;
// }
} }
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch (MqttException me) {
log.info("reason " + me.getReasonCode());
log.info("msg " + me.getMessage());
log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause());
log.info("excep " + me);
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}

View File

@ -0,0 +1,110 @@
package com.muyu.analysis.parsing.mqtt.service;
import cn.hutool.json.JSONObject;
import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.enterprise.domain.MqttProperties;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.mqtt.service
* @Projectcloud-server
* @nameMqttClientService
* @Date2024/10/10 20:24
*/
@Service
@Slf4j
public class MqttClientService {
@Autowired
private ParsingMQTT messageProcessor;
@Resource
private KafkaProducer<String, String> kafkaProducer;
private final ExecutorService executorService = Executors.newCachedThreadPool();
private MqttClient sampleClient;
public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
executorService.submit(() -> {
try {
connectAndSubscribe(mqttProperties);
} catch (MqttException | IOException e) {
log.error("MQTT连接或订阅失败", e);
}
});
}
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
// if (sampleClient != null && sampleClient.isConnected()) {
// log.info("MQTT客户端已经连接跳过重新连接。");
// return;
// }
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(mqttProperties.getUserName());
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
sampleClient.subscribe(mqttProperties.getTopic(), 0);
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
log.error("连接丢失:{}", throwable.getMessage());
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
log.info("解析之后的数据:"+entries);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送完成:{}", iMqttDeliveryToken);
}
});
}
@PreDestroy
public void shutdown() {
executorService.shutdown();
if (sampleClient != null && sampleClient.isConnected()) {
try {
sampleClient.disconnect();
} catch (MqttException e) {
log.error("MQTT客户端断开连接失败", e);
}
}
}
}

View File

@ -0,0 +1,43 @@
package com.muyu.analysis.parsing.consumer;
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
import com.muyu.enterprise.domain.MqttProperties;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.consumer
* @Projectcloud-server
* @nameRabbitListenerComponent
* @Date2024/10/10 20:24
*/
@Component
@Slf4j
public class RabbitListenerComponent {
@Autowired
private MqttClientService mqttClientService;
private static final String FORM_QUEUE = "GO_LINE";
@RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true"))
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
try {
mqttClientService.connectAndSubscribeAsync(mqttProperties);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
log.error("处理RabbitMQ消息时发生错误", e);
}
}
}

View File

@ -0,0 +1,83 @@
//package com.muyu.analysis.parsing.manager;
//
//
//import com.alibaba.fastjson2.JSONObject;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.producer.KafkaProducer;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.eclipse.paho.client.mqttv3.MqttMessage;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//
//
//
///**
// *
// *
// * @Author李庆帅
// * @Packagecom.muyu.analysis.parsing.manager
// * @Projectcloud-server
// * @nameMessageProcessor
// * @Date2024/10/10 20:24
// */
//
//@Slf4j
//@Service
//public class MessageProcessor {
// private static final int ID = 1;
// @Resource
// private SysCarMessageServiceImpl sysCarMessageService;
// @Resource
// private KafkaProducer<String, String> kafkaProducer;
//
//
// public void processMessage(MqttMessage mqttMessage) {
// String payload = new String(mqttMessage.getPayload());
// log.info("====:{}", payload);
//
// List<SysCarMessage> carMessages = sysCarMessageService.selectSysCarMessageLists(ID);
// List<KafKaData> kafKaDataList = new ArrayList<>();
//
// String[] test = payload.split(" ");
// for (SysCarMessage carMessage : carMessages) {
// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
// int end = Integer.parseInt(carMessage.getMessageEndIndex());
//
// StringBuilder hexBuilder = new StringBuilder();
// for (int i = start; i < end; i++) {
// hexBuilder.append(test[i]);
// }
//
// String hex = hexBuilder.toString();
// char[] result = new char[hex.length() / 2];
// for (int x = 0; x < hex.length(); x += 2) {
// int high = Character.digit(hex.charAt(x), 16);
// int low = Character.digit(hex.charAt(x + 1), 16);
// result[x / 2] = (char) ((high << 4) + low);
// }
//
// String value = new String(result);
// kafKaDataList.add(KafKaData.builder()
// .key(carMessage.getMessageTypeCode())
// .label(carMessage.getMessageTypeCode())
// .value(value)
// .type(carMessage.getMessageType())
// .build());
// }
//
// kafKaDataList.add(KafKaData.builder()
// .key("firmCode")
// .label("企业编码")
// .value("firm01")
// .type("String")
// .build());
// String jsonString = JSONObject.toJSONString(kafKaDataList);
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
// kafkaProducer.send(producerRecord);
// log.info("kafka投产{}", jsonString);
// }
//
//}

View File

@ -410,6 +410,13 @@
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<!-- gateway系统模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-vehicle-gateway</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>