Merge remote-tracking branch 'origin/dev.analysis' into dev

dev.vehicleGateway
LQS 2024-09-29 18:05:15 +08:00
commit 497de4ade9
12 changed files with 109 additions and 236 deletions

View File

@ -0,0 +1,19 @@
package com.muyu.common.core.constant;
/**
* kafka
* @Author
* @Packagecom.muyu.common.core.constant
* @Projectcloud-server
* @nameKafkaConstants
* @Date2024/9/29 16:41
*/
public class KafkaConstants {
/**
* ()
*/
public final static String MESSAGE_PARSING = "MessageParsing";
}

View File

@ -0,0 +1,17 @@
package com.muyu.common.core.constant;
/**
* redis
* @Author
* @Packagecom.muyu.common.core.constant
* @Projectcloud-server
* @nameRedisConstants
* @Date2024/9/29 17:28
*/
public class RedisConstants {
/**
* redisKey()
*/
public final static String MESSAGE_TEMPLATE = "messageTemplate";
}

View File

@ -11,6 +11,10 @@
<artifactId>cloud-common-kafka</artifactId>
<description>
cloud-common-kafka
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>

View File

@ -107,7 +107,7 @@ public class MessageValueController extends BaseController
* @param templateId
* @return
*/
@GetMapping("/messageValue/findByTemplateId/{templateId}")
@GetMapping("/findByTemplateId/{templateId}")
@Operation(summary = "根据报文模版id查询报文数据", description = "根据报文模版id查询报文数据")
public Result<List<MessageValueListResp>> findByTemplateId(@PathVariable("templateId") Long templateId){
List<MessageValueListResp> list = messageValueService.findByTemplateId(templateId);

View File

@ -100,6 +100,11 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
<!-- kafka依赖 - 公共依赖 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -1,61 +0,0 @@
package com.muyu.analysis.parsing.MQTT;
import com.muyu.analysis.parsing.controller.ParsingController;
import org.eclipse.paho.client.mqttv3.*;
/**
* MQTT
* @ClassName demo
* @Description
* @Author
* @Date 2024/9/28
*/
public class DemoMQTT {
public void main(String[] args) {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -1,78 +1,87 @@
package com.muyu.analysis.parsing.service.impl;
package com.muyu.analysis.parsing.MQTT;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.analysis.parsing.controller.ParsingController;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.analysis.parsing.mapper.ParsingMapper;
import com.muyu.analysis.parsing.service.ParsingService;
import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.car.MessageValue;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
*
*
* @Author
* @Packagecom.muyu.analysis.parsing.service.impl
* @Packagecom.muyu.analysis.parsing.MQTT
* @Projectcloud-server
* @nameParsingServiceImpl
* @Date2024/9/28 20:53
* @nameParsingMQTT
* @Date2024/9/29 16:08
*/
@Log4j2
@Service
public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
implements ParsingService
{
@Component
public class ParsingMQTT {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RemoteClientService remoteServiceClientService;
@Override
public void mqtt() {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
private RemoteClientService remoteServiceClient;
@Resource
private KafkaProducer<String, String> kafkaProducer;
/**
*
*/
@PostConstruct
public void mqttClient() {
String topic = "vehicle";
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0);
sampleClient.subscribe(topic, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
String mqtt= new String(mqttMessage.getPayload());
JSONObject jsonObject = this.protocolParsing(mqtt);
System.out.println("转换后:"+jsonObject);
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
log.info("解析之后的数据"+entries);
}
/**
*
* @param messageStr
* @return
*/
public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据
String[] hexArray = messageStr.split(" ");
@ -86,12 +95,12 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
String vehicleVin = result.substring(1, 18);
log.info("车辆VIN码: " + vehicleVin);
//根据车辆VIN码查询报文模板ID
Result<Long> byVehicleVin = remoteServiceClientService.findByVehicleVin(vehicleVin);
Result<Long> byVehicleVin = remoteServiceClient.findByVehicleVin(vehicleVin);
Long templateId = byVehicleVin.getData();
List<MessageValueListResp> templateList;
//从redis缓存中获取报文模板数据
try {
String redisKey = "messageTemplate" + templateId;
String redisKey = RedisConstants.MESSAGE_TEMPLATE + templateId;
if (redisTemplate.hasKey(redisKey)) {
List<Object> list = redisTemplate.opsForList().range(redisKey, 0, -1);
templateList = list.stream()
@ -99,7 +108,7 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
.toList();
log.info("Redis缓存查询成功");
} else {
Result<List<MessageValueListResp>> byTemplateId = remoteServiceClientService.findByTemplateId(templateId);
Result<List<MessageValueListResp>> byTemplateId = remoteServiceClient.findByTemplateId(templateId);
templateList = byTemplateId.getData();
templateList.forEach(
listResp ->
@ -107,13 +116,15 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
redisKey, JSON.toJSONString(listResp)
)
);
log.info("数据库查询成功");
log.info("数据库查询成功"+byTemplateId);
}
} catch (Exception e) {
log.info("获取报文模板失败");
throw new RuntimeException("获取报文模板失败");
}
//判断报文模板列表不为空
if (templateList.isEmpty()) {
log.info("报文模版为空");
throw new RuntimeException("报文模版为空");
}
//存储报文模版解析后的数据
@ -128,24 +139,27 @@ public class ParsingServiceImpl extends ServiceImpl<ParsingMapper, MessageValue>
//存入数据
jsonObject.put(messageValue.getMessageLabel(), value);
}
System.out.println("发发呆沙发斯蒂芬萨达:"+jsonObject.toString());
return jsonObject;
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
} catch (MqttException me) {
log.info("reason " + me.getReasonCode());
log.info("msg " + me.getMessage());
log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause());
log.info("excep " + me);
me.printStackTrace();
}
}
}

View File

@ -1,87 +0,0 @@
package com.muyu.analysis.parsing.controller;
import cn.hutool.json.JSONObject;
import com.muyu.analysis.parsing.MQTT.DemoMQTT;
import com.muyu.analysis.parsing.service.ParsingService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.controller
* @Projectcloud-server
* @nameParsingController
* @Date2024/9/28 20:36
*/
@RestController
@RequestMapping("/parsing")
public class ParsingController
{
private static final String topic = "vehicle";
private static final String content = "Message from MqttPublishSample";
private static final int qos = 2;
private static final String broker = "tcp://106.15.136.7:1883";
private static final String clientId = "JavaSample";
@Autowired
private ParsingService parsingService;
// /**
// * 协议解析
// * @param messageStr
// * @return
// */
// @PostMapping("/protocolParsing")
// public JSONObject protocolParsing(@RequestParam("messageStr") String messageStr) {
// try {
// // 第三个参数为空,默认持久化策略
// MqttClient sampleClient = new MqttClient(broker, clientId);
// MqttConnectOptions connOpts = new MqttConnectOptions();
// connOpts.setCleanSession(true);
// System.out.println("Connecting to broker: "+broker);
// sampleClient.connect(connOpts);
// sampleClient.subscribe(topic,0);
// sampleClient.setCallback(new MqttCallback() {
// // 连接丢失
// @Override
// public void connectionLost(Throwable throwable) {
//
// }
// // 连接成功
// @Override
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// System.out.println(new String(mqttMessage.getPayload()));
// }
// // 接收信息
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//
// }
// });
// } catch(MqttException me) {
// System.out.println("reason "+me.getReasonCode());
// System.out.println("msg "+me.getMessage());
// System.out.println("loc "+me.getLocalizedMessage());
// System.out.println("cause "+me.getCause());
// System.out.println("excep "+me);
// me.printStackTrace();
// }
// JSONObject messageValue = parsingService.protocolParsing(messageStr);
// return messageValue;
// }
/**
*
*/
@PostMapping("/mqttClient")
public void mqttClient() {
parsingService.mqtt();
}
}

View File

@ -1,17 +0,0 @@
package com.muyu.analysis.parsing.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.enterprise.domain.car.MessageValue;
import org.apache.ibatis.annotations.Mapper;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.mapper
* @Projectcloud-server
* @nameParsingMapper
* @Date2024/9/28 20:54
*/
@Mapper
public interface ParsingMapper extends BaseMapper<MessageValue> {
}

View File

@ -4,6 +4,7 @@ import com.muyu.analysis.parsing.remote.factory.RemoteClientServiceFactory;
import com.muyu.common.core.constant.ServiceNameConstants;
import com.muyu.common.core.domain.Result;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

View File

@ -1,24 +0,0 @@
package com.muyu.analysis.parsing.service;
import cn.hutool.json.JSONObject;
import com.baomidou.mybatisplus.extension.service.IService;
import com.muyu.enterprise.domain.car.MessageValue;
/**
*
* @Author
* @Packagecom.muyu.analysis.parsing.service
* @Projectcloud-server
* @nameParsingService
* @Date2024/9/28 20:50
*/
public interface ParsingService extends IService<MessageValue>
{
/**
*
* @return
*/
void mqtt();
}

View File

@ -44,3 +44,5 @@ spring:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# kafka共享配置
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}