feat(新增kafka、redis常量类):完善MQTT接收之后解析报文传递kafka

dev.eventProcess
LQS 2024-09-29 17:32:46 +08:00
parent e2d710fbb6
commit 7e02675028
13 changed files with 104 additions and 235 deletions

View File

View File

@ -0,0 +1,19 @@
package com.muyu.common.core.constant;
/**
* kafka
* @Author
* @Packagecom.muyu.common.core.constant
* @Projectcloud-server
* @nameKafkaConstants
* @Date2024/9/29 16:41
*/
public class KafkaConstants {
/**
* ()
*/
public final static String MESSAGE_PARSING = "MessageParsing";
}

View File

@ -0,0 +1,16 @@
package com.muyu.common.core.constant;
/**
* redis
* @Author
* @Packagecom.muyu.common.core.constant
* @Projectcloud-server
* @nameRedisConstants
* @Date2024/9/29 17:28
*/
public class RedisConstants {
/**
* redisKey()
*/
public final static String MESSAGE_TEMPLATE = "messageTemplate";
}

View File

@ -11,6 +11,10 @@
<artifactId>cloud-common-kafka</artifactId> <artifactId>cloud-common-kafka</artifactId>
<description>
cloud-common-kafka
</description>
<properties> <properties>
<maven.compiler.source>17</maven.compiler.source> <maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>

View File

@ -107,7 +107,7 @@ public class MessageValueController extends BaseController
* @param templateId * @param templateId
* @return * @return
*/ */
@GetMapping("/messageValue/findByTemplateId/{templateId}") @GetMapping("/findByTemplateId/{templateId}")
@Operation(summary = "根据报文模版id查询报文数据", description = "根据报文模版id查询报文数据") @Operation(summary = "根据报文模版id查询报文数据", description = "根据报文模版id查询报文数据")
public Result<List<MessageValueListResp>> findByTemplateId(@PathVariable("templateId") Long templateId){ public Result<List<MessageValueListResp>> findByTemplateId(@PathVariable("templateId") Long templateId){
List<MessageValueListResp> list = messageValueService.findByTemplateId(templateId); List<MessageValueListResp> list = messageValueService.findByTemplateId(templateId);

View File

@ -100,6 +100,11 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-core</artifactId>
</dependency> </dependency>
<!-- kafka依赖 - 公共依赖 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -1,61 +0,0 @@
package com.muyu.analysis.parsing.MQTT;
import com.muyu.analysis.parsing.controller.ParsingController;
import org.eclipse.paho.client.mqttv3.*;
/**
* MQTT
* @ClassName demo
* @Description
* @Author
* @Date 2024/9/28
*/
public class DemoMQTT {
public void main(String[] args) {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -1,78 +1,86 @@
package com.muyu.analysis.parsing.service.impl; package com.muyu.analysis.parsing.MQTT;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.analysis.parsing.controller.ParsingController;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.analysis.parsing.mapper.ParsingMapper;
import com.muyu.analysis.parsing.service.ParsingService;
import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.car.MessageValue;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List; import java.util.List;
/** /**
* *
* @Author * @Author
* @Packagecom.muyu.analysis.parsing.service.impl * @Packagecom.muyu.analysis.parsing.MQTT
* @Projectcloud-server * @Projectcloud-server
* @nameParsingServiceImpl * @nameParsingMQTT
* @Date2024/9/28 20:53 * @Date2024/9/29 16:08
*/ */
@Log4j2 @Log4j2
@Service @Component
public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue> public class ParsingMQTT {
implements ParsingService
{
@Resource @Resource
private RedisTemplate<String, Object> redisTemplate; private RedisTemplate<String, Object> redisTemplate;
@Autowired @Autowired
private RemoteClientService remoteServiceClientService; private RemoteClientService remoteServiceClient;
@Resource
private KafkaProducer<String, String> kafkaProducer;
/**
@Override *
public void mqtt() { */
@PostConstruct
public void mqttClient() {
String topic = "vehicle"; String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883"; String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample"; String clientId = "JavaSample";
try { try {
// 第三个参数为空,默认持久化策略 // 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId); MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions(); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker); System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts); sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0); sampleClient.subscribe(topic, 0);
sampleClient.setCallback(new MqttCallback() { sampleClient.setCallback(new MqttCallback() {
// 连接丢失 // 连接丢失
@Override @Override
public void connectionLost(Throwable throwable) { public void connectionLost(Throwable throwable) {
} }
// 连接成功 // 连接成功
@Override @Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload())); System.out.println(new String(mqttMessage.getPayload()));
String mqtt= new String(mqttMessage.getPayload()); JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
JSONObject jsonObject = this.protocolParsing(mqtt);
System.out.println("转换后:"+jsonObject); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
System.out.println(entries);
} }
/**
*
* @param messageStr
* @return
*/
public JSONObject protocolParsing(String messageStr) { public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据 //根据空格切割数据
String[] hexArray = messageStr.split(" "); String[] hexArray = messageStr.split(" ");
@ -86,12 +94,12 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
String vehicleVin = result.substring(1, 18); String vehicleVin = result.substring(1, 18);
log.info("车辆VIN码: " + vehicleVin); log.info("车辆VIN码: " + vehicleVin);
//根据车辆VIN码查询报文模板ID //根据车辆VIN码查询报文模板ID
Result<Long> byVehicleVin = remoteServiceClientService.findByVehicleVin(vehicleVin); Result<Long> byVehicleVin = remoteServiceClient.findByVehicleVin(vehicleVin);
Long templateId = byVehicleVin.getData(); Long templateId = byVehicleVin.getData();
List<MessageValueListResp> templateList; List<MessageValueListResp> templateList;
//从redis缓存中获取报文模板数据 //从redis缓存中获取报文模板数据
try { try {
String redisKey = "messageTemplate" + templateId; String redisKey = RedisConstants.MESSAGE_TEMPLATE + templateId;
if (redisTemplate.hasKey(redisKey)) { if (redisTemplate.hasKey(redisKey)) {
List<Object> list = redisTemplate.opsForList().range(redisKey, 0, -1); List<Object> list = redisTemplate.opsForList().range(redisKey, 0, -1);
templateList = list.stream() templateList = list.stream()
@ -99,7 +107,7 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
.toList(); .toList();
log.info("Redis缓存查询成功"); log.info("Redis缓存查询成功");
} else { } else {
Result<List<MessageValueListResp>> byTemplateId = remoteServiceClientService.findByTemplateId(templateId); Result<List<MessageValueListResp>> byTemplateId = remoteServiceClient.findByTemplateId(templateId);
templateList = byTemplateId.getData(); templateList = byTemplateId.getData();
templateList.forEach( templateList.forEach(
listResp -> listResp ->
@ -128,24 +136,27 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
//存入数据 //存入数据
jsonObject.put(messageValue.getMessageLabel(), value); jsonObject.put(messageValue.getMessageLabel(), value);
} }
System.out.println("发发呆沙发斯蒂芬萨达:"+jsonObject.toString());
return jsonObject; return jsonObject;
} }
// 接收信息 // 接收信息
@Override @Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
} }
}); });
} catch(MqttException me) { } catch (MqttException me) {
System.out.println("reason "+me.getReasonCode()); System.out.println("reason " + me.getReasonCode());
System.out.println("msg "+me.getMessage()); System.out.println("msg " + me.getMessage());
System.out.println("loc "+me.getLocalizedMessage()); System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause "+me.getCause()); System.out.println("cause " + me.getCause());
System.out.println("excep "+me); System.out.println("excep " + me);
me.printStackTrace(); me.printStackTrace();
} }
} }
} }

View File

@ -1,87 +0,0 @@
package com.muyu.analysis.parsing.controller;
import cn.hutool.json.JSONObject;
import com.muyu.analysis.parsing.MQTT.DemoMQTT;
import com.muyu.analysis.parsing.service.ParsingService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.controller
* @Projectcloud-server
* @nameParsingController
* @Date2024/9/28 20:36
*/
@RestController
@RequestMapping("/parsing")
public class ParsingController
{
private static final String topic = "vehicle";
private static final String content = "Message from MqttPublishSample";
private static final int qos = 2;
private static final String broker = "tcp://106.15.136.7:1883";
private static final String clientId = "JavaSample";
@Autowired
private ParsingService parsingService;
// /**
// * 协议解析
// * @param messageStr
// * @return
// */
// @PostMapping("/protocolParsing")
// public JSONObject protocolParsing(@RequestParam("messageStr") String messageStr) {
// try {
// // 第三个参数为空,默认持久化策略
// MqttClient sampleClient = new MqttClient(broker, clientId);
// MqttConnectOptions connOpts = new MqttConnectOptions();
// connOpts.setCleanSession(true);
// System.out.println("Connecting to broker: "+broker);
// sampleClient.connect(connOpts);
// sampleClient.subscribe(topic,0);
// sampleClient.setCallback(new MqttCallback() {
// // 连接丢失
// @Override
// public void connectionLost(Throwable throwable) {
//
// }
// // 连接成功
// @Override
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// System.out.println(new String(mqttMessage.getPayload()));
// }
// // 接收信息
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//
// }
// });
// } catch(MqttException me) {
// System.out.println("reason "+me.getReasonCode());
// System.out.println("msg "+me.getMessage());
// System.out.println("loc "+me.getLocalizedMessage());
// System.out.println("cause "+me.getCause());
// System.out.println("excep "+me);
// me.printStackTrace();
// }
// JSONObject messageValue = parsingService.protocolParsing(messageStr);
// return messageValue;
// }
/**
*
*/
@PostMapping("/mqttClient")
public void mqttClient() {
parsingService.mqtt();
}
}

View File

@ -1,17 +0,0 @@
package com.muyu.analysis.parsing.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.enterprise.domain.car.MessageValue;
import org.apache.ibatis.annotations.Mapper;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.mapper
* @Projectcloud-server
* @nameParsingMapper
* @Date2024/9/28 20:54
*/
@Mapper
public interface ParsingMapper extends BaseMapper<MessageValue> {
}

View File

@ -4,6 +4,7 @@ import com.muyu.analysis.parsing.remote.factory.RemoteClientServiceFactory;
import com.muyu.common.core.constant.ServiceNameConstants; import com.muyu.common.core.constant.ServiceNameConstants;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;

View File

@ -1,24 +0,0 @@
package com.muyu.analysis.parsing.service;
import cn.hutool.json.JSONObject;
import com.baomidou.mybatisplus.extension.service.IService;
import com.muyu.enterprise.domain.car.MessageValue;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.service
* @Projectcloud-server
* @nameParsingService
* @Date2024/9/28 20:50
*/
public interface ParsingService extends IService<MessageValue>
{
/**
*
* @return
*/
void mqtt();
}

View File

@ -44,3 +44,5 @@ spring:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置 # 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# kafka共享配置
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}