parent
a88e91c139
commit
d7f31e3482
|
@ -22,6 +22,11 @@
|
|||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterprise-cache</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-data-process-common</artifactId>
|
||||
|
|
|
@ -18,21 +18,18 @@
|
|||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterpise-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-cache</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<version>${muyu.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-vehicle-gateway</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<scope>compile</scope>
|
||||
<artifactId>enterpise-common</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
package com.muyu.enterpise.cache;
|
||||
|
||||
import com.muyu.common.cache.CacheAbsBasic;
|
||||
import com.muyu.domain.MessageValue;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @version 1.0
|
||||
* @Author xie ya ru
|
||||
* @Date 2024/9/29 20:04
|
||||
* @注释
|
||||
*/
|
||||
@Component
|
||||
public class MessageValueCacheService extends CacheAbsBasic<String, List<MessageValue>>{
|
||||
|
||||
/**
|
||||
* 前缀
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public String keyPre() {
|
||||
return"messageValue:info:";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String encode(String key) {
|
||||
return super.encode(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 解密
|
||||
* @param key 缓存建
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public String decode(String key) {
|
||||
return key.replace("messageValue:info:","");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package com.muyu.enterpise.cache;
|
||||
|
||||
import com.muyu.common.cache.CacheAbsBasic;
|
||||
import com.muyu.domain.SysCar;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @version 1.0
|
||||
* @Author xie ya ru
|
||||
* @Date 2024/9/30 11:06
|
||||
* @注释
|
||||
*/
|
||||
@Component
|
||||
public class SysCarCacheService extends CacheAbsBasic<String, SysCar> {
|
||||
|
||||
@Override
|
||||
public String keyPre() {
|
||||
return "sysCar:info:";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encode(String key) {
|
||||
return super.encode(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decode(String key) {
|
||||
return super.decode(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.muyu.enterpise.cache;
|
||||
|
||||
import com.muyu.common.cache.CacheAbsBasic;
|
||||
import com.muyu.domain.SysCarType;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @version 1.0
|
||||
* @Author xie ya ru
|
||||
* @Date 2024/9/30 11:18
|
||||
* @注释
|
||||
*/
|
||||
@Component
|
||||
public class SysCarTypeCacheService extends CacheAbsBasic<String, SysCarType> {
|
||||
@Override
|
||||
public String keyPre() {
|
||||
return "sysCarType:info:";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String encode(String key) {
|
||||
return super.encode(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String decode(String key) {
|
||||
return super.decode(key);
|
||||
}
|
||||
}
|
|
@ -1 +1,2 @@
|
|||
com.muyu.enterpise.cache.VehicleCacheService
|
||||
com.muyu.enterpise.cache.MessageValueCacheService
|
||||
com.muyu.enterpise.cache.SysCarCacheService
|
||||
|
|
|
@ -87,7 +87,7 @@
|
|||
<!-- cache缓存框架 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterprise-cache</artifactId>
|
||||
<artifactId>enterpise-cache</artifactId>
|
||||
<version>${muyu.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-enterprise</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>enterprise-cache</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterpise-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-cache</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-vehicle-gateway</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1 @@
|
|||
com.muyu.enterprise.cache.VehicleCacheService
|
|
@ -25,7 +25,7 @@
|
|||
<module>enterpise-common</module>
|
||||
<module>enterpise-remote</module>
|
||||
<module>enterpise-service</module>
|
||||
<module>enterpise-cache</module>
|
||||
<module>enterprise-cache</module>
|
||||
</modules>
|
||||
|
||||
|
||||
|
|
|
@ -89,7 +89,13 @@
|
|||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 实体类 -->
|
||||
<!-- 远调 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterpise-remote</artifactId>
|
||||
<version>${muyu.version}</version>
|
||||
</dependency>
|
||||
<!-- 实体类 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterpise-common</artifactId>
|
||||
|
@ -97,7 +103,7 @@
|
|||
<!-- 缓存框架 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>enterprise-cache</artifactId>
|
||||
<artifactId>enterpise-cache</artifactId>
|
||||
<version>${muyu.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package com.muyu.parse.configure;
|
||||
|
||||
import com.muyu.parse.process.ProcessData;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* @version 1.0
|
||||
* @Author xie ya ru
|
||||
* @Date 2024/9/26 15:31
|
||||
* @注释
|
||||
*/
|
||||
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class MqttConsumerConfig {
|
||||
|
||||
String topic = "xxx";
|
||||
String content = "Message from MqttPublishSample";
|
||||
int qos = 2;
|
||||
String broker = "tcp://123.57.152.124:1883";
|
||||
String clientId = "xyr123456789";
|
||||
|
||||
@PostConstruct
|
||||
public void connect() {
|
||||
try {
|
||||
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
|
||||
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||
//是否清空session
|
||||
connOpts.setCleanSession(false);
|
||||
System.out.println("Connecting to broker: " + broker);
|
||||
//连接
|
||||
sampleClient.connect(connOpts);
|
||||
sampleClient.subscribe(topic, qos);
|
||||
sampleClient.setCallback(new MqttCallback() {
|
||||
//连接丢失(报错)
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
log.error("error:{}", throwable.getMessage(), throwable);
|
||||
}
|
||||
|
||||
//消息已经接收到
|
||||
@Override
|
||||
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
||||
String s1 = new String(mqttMessage.getPayload());
|
||||
System.out.println("接收到的主题是:" + s + "内容是:{}" + s1);
|
||||
ProcessData.DataConversion(s1);
|
||||
|
||||
}
|
||||
|
||||
//交付完成
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
|
||||
}
|
||||
});
|
||||
} catch (MqttException me) {
|
||||
System.out.println("reason " + me.getReasonCode());
|
||||
System.out.println("msg " + me.getMessage());
|
||||
System.out.println("loc " + me.getLocalizedMessage());
|
||||
System.out.println("cause " + me.getCause());
|
||||
System.out.println("excep " + me);
|
||||
me.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
package com.muyu.parse.process;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.muyu.common.kafka.config.KafkaProducerConfig;
|
||||
import com.muyu.domain.MessageValue;
|
||||
import com.muyu.domain.SysCar;
|
||||
import com.muyu.domain.SysCarType;
|
||||
import com.muyu.enterpise.cache.MessageValueCacheService;
|
||||
import com.muyu.enterpise.cache.SysCarCacheService;
|
||||
import com.muyu.enterpise.cache.SysCarTypeCacheService;
|
||||
import com.muyu.parse.uitl.DataParseUtil;
|
||||
import com.muyu.remote.RemoteMessageValueService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* @version 1.0
|
||||
* @Author xie ya ru
|
||||
* @Date 2024/9/28 21:14
|
||||
* @注释
|
||||
*/
|
||||
@Log4j2
|
||||
public class ProcessData {
|
||||
|
||||
@Resource
|
||||
private static RemoteMessageValueService remoteMessageValueService;
|
||||
|
||||
//报文模版信息
|
||||
@Resource
|
||||
private static MessageValueCacheService messageValueCacheService;
|
||||
|
||||
//车辆信息
|
||||
@Resource
|
||||
private static SysCarCacheService sysCarCacheService;
|
||||
|
||||
//车辆类型信息
|
||||
@Resource
|
||||
private static SysCarTypeCacheService sysCarTypeCacheService;
|
||||
|
||||
@Resource
|
||||
private static KafkaProducerConfig kafkaProducerConfig;
|
||||
|
||||
private final static String topic = "sysCar_vin_topic";
|
||||
|
||||
public static void DataConversion(String jsonVin ) {
|
||||
//设置数组存储车辆数据
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
String vin = DataParseUtil.dataParsing(jsonVin);
|
||||
System.out.println("车辆转换的vin是:"+vin);
|
||||
|
||||
//判断vin是否存在缓存中
|
||||
if(sysCarCacheService.hashKey(vin)){
|
||||
//从Redis中获取车辆信息
|
||||
SysCar sysCar = sysCarCacheService.get(vin);
|
||||
//根据缓存车辆类型获取缓存报文
|
||||
SysCarType sysCarType = sysCarTypeCacheService.get(String.valueOf(sysCar.getCarType()));
|
||||
|
||||
//获取报文模版信息
|
||||
List<MessageValue> messageValues = messageValueCacheService.get(String.valueOf(sysCarType.getMessageTemplateId()));
|
||||
for (MessageValue messageValue : messageValues) {
|
||||
//起始位下标
|
||||
Integer startIndex = messageValue.getMessageStartIndex() - 1;
|
||||
//结束位下标
|
||||
Integer endIndex = messageValue.getMessageEndIndex();
|
||||
//根据报文模版截取数据
|
||||
String value = vin.substring(startIndex, endIndex);
|
||||
//存入数据
|
||||
System.out.println("标签"+messageValue.getMessageLabel()+"值"+value);
|
||||
jsonObject.put(messageValue.getMessageLabel(), value);
|
||||
}
|
||||
sendKafkaMessage(jsonObject);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static void sendKafkaMessage(JSONObject jsonObject){
|
||||
ProducerRecord<Object, JSONObject> producerRecord = new ProducerRecord<>(topic, jsonObject);
|
||||
try {
|
||||
kafkaProducerConfig.kafkaProducer().send(new ProducerRecord<>(topic,jsonObject.toString()));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue