fix(): fix message interface field issues

dev.event
Number7 2024-09-30 10:29:58 +08:00
parent 8de7f88912
commit 1f072d4ca2
12 changed files with 131 additions and 329 deletions

View File

@@ -1,5 +1,6 @@
package com.muyu.common.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -7,9 +8,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.io.Serializable;
/**
* Message template table entity.
* @Author liuxinyue
@@ -25,8 +24,6 @@ import java.io.Serializable;
@Tag(name = "Message template table")
@TableName(value = "message_template_type", autoResultMap = true)
public class MessageTemplateType implements Serializable {
/**
*
*/
@@ -59,6 +56,7 @@ public class MessageTemplateType implements Serializable {
/**
* Data type name (not a table column)
*/
@TableField(exist = false)
private String dataTypeName;
/**
*

View File

@@ -1,239 +0,0 @@
package com.muyu.common.util;
import java.util.Collection;
import java.util.Collections;
/**
* Common Kafka producer/consumer configuration properties.
* @author liuxinyue
* @Package com.muyu.common.util
* @name KafkaCommonProperties
* @Date 2024/9/29 12:26
*/
public class KafkaCommonProperties {
/**
* Kafka broker address (host:port).
*/
private String kafkaHost = "47.101.53.251:9092";
/**
* Acknowledgement level required from the leader before a send is considered complete:
*
* 0: do not wait for any acknowledgement; retries have no effect
* 1: the leader acknowledges after writing to its local log, without waiting for followers
* all (equivalent to -1): the leader waits for the full set of in-sync replicas
*/
private String ack = "all";
/**
* Number of retries when a send fails.
*/
private Integer retryTimes = 1;
/**
* Batch size in bytes; 16384 bytes = 16K.
*/
private Integer batchSize = 16384;
/**
* Delay in milliseconds to wait for more records before sending a batch.
*/
private Integer lingerMs = 1;
/**
* Total buffer memory the producer may use, in bytes; 33554432 bytes = 32M.
*/
private Integer bufferMemory = 33554432;
/**
* Serializer class for record keys.
*/
private String keyEncoder = "org.apache.kafka.common.serialization.StringSerializer";
/**
* Serializer class for record values.
*/
private String valueEncoder = "org.apache.kafka.common.serialization.StringSerializer";
/**
* Consumer group ID used when subscribing to topics.
*/
private String groupId = "my-group-id";
/**
* Whether to auto-commit consumer offsets.
*/
private String autoCommit = "true";
/**
* Auto-commit interval for offsets; takes effect only when enable.auto.commit is true.
*/
private String autoCommitIntervalMs = "1000";
/**
* Deserializer class for record keys.
*/
private String keyDecoder = "org.apache.kafka.common.serialization.StringDeserializer";
/**
* Deserializer class for record values.
*/
private String valueDecoder = "org.apache.kafka.common.serialization.StringDeserializer";
/**
* What to do when there is no initial offset in Kafka:
* earliest: reset to the earliest offset of the topic
* latest: reset to the latest offset
* none: throw an exception if no previous offset exists
*/
private String autoOffsetReset = "latest";
/**
* Topics to subscribe to.
*/
private Collection<String> topic = Collections.singleton("my-topic");
public KafkaCommonProperties() {
}
public KafkaCommonProperties(String kafkaHost, String ack, Integer retryTimes, Integer batchSize, Integer lingerMs, Integer bufferMemory, String keyEncoder, String valueEncoder, String groupId, String autoCommit, String autoCommitIntervalMs, String keyDecoder, String valueDecoder, String autoOffsetReset, Collection<String> topic) {
this.kafkaHost = kafkaHost;
this.ack = ack;
this.retryTimes = retryTimes;
this.batchSize = batchSize;
this.lingerMs = lingerMs;
this.bufferMemory = bufferMemory;
this.keyEncoder = keyEncoder;
this.valueEncoder = valueEncoder;
this.groupId = groupId;
this.autoCommit = autoCommit;
this.autoCommitIntervalMs = autoCommitIntervalMs;
this.keyDecoder = keyDecoder;
this.valueDecoder = valueDecoder;
this.autoOffsetReset = autoOffsetReset;
this.topic = topic;
}
public String getKafkaHost() {
return kafkaHost;
}
public void setKafkaHost(String kafkaHost) {
this.kafkaHost = kafkaHost;
}
public String getAck() {
return ack;
}
public void setAck(String ack) {
this.ack = ack;
}
public Integer getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(Integer retryTimes) {
this.retryTimes = retryTimes;
}
public Integer getBatchSize() {
return batchSize;
}
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}
public Integer getLingerMs() {
return lingerMs;
}
public void setLingerMs(Integer lingerMs) {
this.lingerMs = lingerMs;
}
public Integer getBufferMemory() {
return bufferMemory;
}
public void setBufferMemory(Integer bufferMemory) {
this.bufferMemory = bufferMemory;
}
public String getKeyEncoder() {
return keyEncoder;
}
public void setKeyEncoder(String keyEncoder) {
this.keyEncoder = keyEncoder;
}
public String getValueEncoder() {
return valueEncoder;
}
public void setValueEncoder(String valueEncoder) {
this.valueEncoder = valueEncoder;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getAutoCommit() {
return autoCommit;
}
public void setAutoCommit(String autoCommit) {
this.autoCommit = autoCommit;
}
public String getAutoCommitIntervalMs() {
return autoCommitIntervalMs;
}
public void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
this.autoCommitIntervalMs = autoCommitIntervalMs;
}
public String getKeyDecoder() {
return keyDecoder;
}
public void setKeyDecoder(String keyDecoder) {
this.keyDecoder = keyDecoder;
}
public String getValueDecoder() {
return valueDecoder;
}
public void setValueDecoder(String valueDecoder) {
this.valueDecoder = valueDecoder;
}
public String getAutoOffsetReset() {
return autoOffsetReset;
}
public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}
public Collection<String> getTopic() {
return topic;
}
public void setTopic(Collection<String> topic) {
this.topic = topic;
}
}

View File

@@ -0,0 +1,16 @@
package com.muyu.common.util;
/**
* @author liuxinyue
* @Package com.muyu.common.util
* @name KafkaConstants
* @Date 2024/9/29 20:22
*/
public class KafkaConstants {

    public static final String KafkaTopic = "kafka_topic_test";
    public static final String KafkaGroup = "kafka_group_test";
}

View File

@@ -0,0 +1,43 @@
package com.muyu.common.util;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author liuxinyue
* @Package com.muyu.common.util
* @name KafkaConsumerConfig
* @Date 2024/9/29 20:19
*/
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaConsumer<String, String> kafkaConsumer() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("bootstrap.servers", "47.101.53.251:9092");
        map.put("enable.auto.commit", true);
        map.put("auto.commit.interval.ms", 5000);
        map.put("auto.offset.reset", "latest");
        map.put("fetch.max.wait.ms", 500);
        map.put("fetch.min.bytes", 1);
        map.put("heartbeat.interval.ms", 3000);
        map.put("max.poll.records", 500);
        map.put("group.id", KafkaConstants.KafkaGroup);
        // Deserializer for record keys
        Deserializer<String> keyDeserializer = new StringDeserializer();
        // Deserializer for record values
        Deserializer<String> valueDeserializer = new StringDeserializer();
        // Create the Kafka consumer
        return new KafkaConsumer<>(map, keyDeserializer, valueDeserializer);
    }
}
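For reference, a minimal poll loop against the bean above; the example class, topic wiring, and print statement are illustrative only, not part of this commit, and it assumes the same package as KafkaConstants. Note that a raw KafkaConsumer is not thread-safe, so it must be polled from a single thread.

// Illustrative sketch: single-threaded consumption via the kafkaConsumer bean.
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaPollExample {
    public static void pollOnce(KafkaConsumer<String, String> kafkaConsumer) {
        // Subscribe to the topic defined in KafkaConstants
        kafkaConsumer.subscribe(Collections.singleton(KafkaConstants.KafkaTopic));
        // Block for up to 500 ms waiting for records
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}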

View File

@@ -1,64 +0,0 @@
package com.muyu.common.util;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Properties;
/**
* Standalone Kafka producer smoke test.
* @author liuxinyue
* @Package com.muyu.common.util
* @name KafkaProducerTest
* @Date 2024/9/29 12:27
*/
public class KafkaProducerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);
public static KafkaProducer<String, String> getDefaultKafkaProducer(KafkaCommonProperties kafkaCommonProperties) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost());
properties.put(ProducerConfig.ACKS_CONFIG, kafkaCommonProperties.getAck());
properties.put(ProducerConfig.RETRIES_CONFIG, kafkaCommonProperties.getRetryTimes());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaCommonProperties.getBatchSize());
properties.put(ProducerConfig.LINGER_MS_CONFIG, kafkaCommonProperties.getLingerMs());
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaCommonProperties.getBufferMemory());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyEncoder());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueEncoder());
return new KafkaProducer<>(properties);
}
static class MyProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (Objects.nonNull(exception)) {
logger.error(">>>>>>>>>>Producer生产消息异常", exception);
}
if (Objects.nonNull(metadata)) {
logger.info(">>>>>>>>>>Producer生产消息metadata{}partition:{}, offset{}", metadata, metadata.partition(), metadata.offset());
}
}
}
public static void main(String[] args) {
KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties();
KafkaProducer<String, String> producer = getDefaultKafkaProducer(kafkaCommonProperties);
String message = "hello world ";
try {
for (int i = 0; i < 10; i++) {
// Send records asynchronously
String topic = kafkaCommonProperties.getTopic().toArray()[0].toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message + i);
producer.send(producerRecord, new MyProducerCallback());
}
} catch (Exception ex) {
logger.error(">>>>>>>>生产数据异常:", ex);
throw new RuntimeException(ex);
} finally {
producer.close();
}
}
}

View File

@@ -0,0 +1,38 @@
package com.muyu.common.util;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author liuxinyue
* @Package com.muyu.common.util
* @name KafkaProviderConfig
* @Date 2024/9/29 20:15
*/
@Configuration
public class KafkaProviderConfig {

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("bootstrap.servers", "47.101.53.251:9092");
        map.put("retries", "2");
        map.put("batch.size", 16384);
        map.put("buffer.memory", 33554432);
        map.put("acks", "-1");
        // Serializer for record keys
        Serializer<String> keySerializer = new StringSerializer();
        // Serializer for record values
        Serializer<String> valueSerializer = new StringSerializer();
        // Create the Kafka producer
        return new KafkaProducer<>(map, keySerializer, valueSerializer);
    }
}
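A matching send sketch using this bean; the service class and callback body are hypothetical, shown only to illustrate how the producer would be injected and used (again assuming the same package as KafkaConstants):

// Illustrative sketch: asynchronous send through the injected producer bean.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class KafkaSendExample {

    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    public void send(String message) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(KafkaConstants.KafkaTopic, message);
        // Asynchronous send; the callback fires once the broker acknowledges
        kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            }
        });
    }
}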

View File

@@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @Version 1.0
* @Date 2024-09-28 17:34:31
*/
@SpringBootApplication
@EnableMyFeignClients
public class SaasApplication {

View File

@@ -10,8 +10,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
* @author liuxinyue
* @Author liuxinyue
* @Package com.muyu.mqtt.configure
* @Project cloud-server
* @name MqttConfigure
@@ -30,7 +29,7 @@ public class MqttConfigure {
String topic = "vehicle";
int qos = 2;
String broker = "tcp://47.101.53.251:1883";
String clientId = "测试mqtt";
String clientId = "hhhhhh";
try {
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
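The hunk ends before the connect call; a typical Paho (org.eclipse.paho.client.mqttv3) continuation is sketched below. The example class, clean-session choice, and print handler are assumptions, not code from this commit.

// Illustrative sketch of the connect/subscribe flow the diff cuts off.
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttConnectExample {
    public static void main(String[] args) throws MqttException {
        String topic = "vehicle";
        int qos = 2;
        String broker = "tcp://47.101.53.251:1883";
        MqttClient sampleClient = new MqttClient(broker, "hhhhhh");
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        sampleClient.connect(connOpts);
        // Subscribe with QoS 2 and print inbound payloads
        sampleClient.subscribe(topic, qos, (receivedTopic, message) ->
                System.out.println(receivedTopic + " -> " + new String(message.getPayload())));
    }
}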

View File

@@ -6,7 +6,6 @@ import com.muyu.server.service.TemplateService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,8 +19,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
/**
*
* @author liuxinyue
* @Author liuxinyue
* @Package com.template.controller
* @Project cloud-server-c
* @name TemplateController
@@ -31,7 +29,6 @@ import java.util.concurrent.ExecutionException;
@RequestMapping("/template")
@AllArgsConstructor
@Tag(name = "报文模版管理",description = "报文模版管理")
@Log4j2
public class TemplateController {
@Autowired

View File

@@ -0,0 +1,13 @@
package com.muyu.server.mapper;
import org.apache.ibatis.annotations.Mapper;
/**
* @author liuxinyue
* @Package com.muyu.server.mapper
* @name KafkaMapper
* @Date 2024/9/29 20:53
*/
@Mapper
public interface KafkaMapper {
}

View File

@@ -1,5 +1,4 @@
package com.muyu.server.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.domain.MessageTemplateType;
@@ -7,9 +6,7 @@ import com.muyu.server.mapper.MessageTemplateTypeMapper;
import com.muyu.server.service.MessageTemplateTypeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @Author liuxinyue
* @Package com.sheep.message.service.impl
@@ -33,7 +30,7 @@ public class MessageTemplateTypeServiceImpl extends ServiceImpl<MessageTemplate
@Override
public List<MessageTemplateType> findvehicleFoundationData(Integer templatedId, String code) {
QueryWrapper<MessageTemplateType> messageTemplateTypeQueryWrapper = new QueryWrapper<>();
messageTemplateTypeQueryWrapper.eq("templated_id",templatedId);
messageTemplateTypeQueryWrapper.eq("template_id",templatedId);
messageTemplateTypeQueryWrapper.eq("message_class",code);
List<MessageTemplateType> messageTemplateTypes = messageTemplateTypeMapper.selectList(messageTemplateTypeQueryWrapper);
return messageTemplateTypes;
@@ -42,7 +39,7 @@ public class MessageTemplateTypeServiceImpl extends ServiceImpl<MessageTemplate
@Override
public List<MessageTemplateType> findvehicleData(Integer templatedId, String code) {
QueryWrapper<MessageTemplateType> messageTemplateTypeQueryWrapper = new QueryWrapper<>();
messageTemplateTypeQueryWrapper.eq("templated_id",templatedId);
messageTemplateTypeQueryWrapper.eq("template_id",templatedId);
messageTemplateTypeQueryWrapper.eq("message_class",code);
List<MessageTemplateType> messageTemplateTypes = messageTemplateTypeMapper.selectList(messageTemplateTypeQueryWrapper);
return messageTemplateTypes;
@@ -51,7 +48,7 @@ public class MessageTemplateTypeServiceImpl extends ServiceImpl<MessageTemplate
@Override
public List<MessageTemplateType> finddeviceStatusData(Integer templatedId, String code) {
QueryWrapper<MessageTemplateType> messageTemplateTypeQueryWrapper = new QueryWrapper<>();
messageTemplateTypeQueryWrapper.eq("templated_id",templatedId);
messageTemplateTypeQueryWrapper.eq("template_id",templatedId);
messageTemplateTypeQueryWrapper.eq("message_class",code);
List<MessageTemplateType> messageTemplateTypes = messageTemplateTypeMapper.selectList(messageTemplateTypeQueryWrapper);
return messageTemplateTypes;
@@ -60,7 +57,7 @@ public class MessageTemplateTypeServiceImpl extends ServiceImpl<MessageTemplate
@Override
public List<MessageTemplateType> findMessageByTemplateName(Integer templatedId) {
QueryWrapper<MessageTemplateType> messageTemplateTypeQueryWrapper = new QueryWrapper<>();
messageTemplateTypeQueryWrapper.eq("templated_id",templatedId);
messageTemplateTypeQueryWrapper.eq("template_id",templatedId);
List<MessageTemplateType> messageTemplateTypes = messageTemplateTypeMapper.selectList(messageTemplateTypeQueryWrapper);
return messageTemplateTypes;
}
@@ -69,6 +66,7 @@ public class MessageTemplateTypeServiceImpl extends ServiceImpl<MessageTemplate
public List<MessageTemplateType> findTemplateById(Integer templateId) {
QueryWrapper<MessageTemplateType> messageTemplateTypeQueryWrapper = new QueryWrapper<>();
messageTemplateTypeQueryWrapper.eq("template_id",templateId);
- return List.of();
+ List<MessageTemplateType> messageTemplateTypes = messageTemplateTypeMapper.selectList(messageTemplateTypeQueryWrapper);
+ return messageTemplateTypes;
}
}
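All four lookups above now filter on the corrected template_id column; since they share the same query shape, a private helper could fold them together. This is a sketch only: the helper name is hypothetical, and it relies on the messageTemplateTypeMapper field already present in this class.

// Illustrative refactor inside MessageTemplateTypeServiceImpl.
private List<MessageTemplateType> findByTemplateAndClass(Integer templateId, String code) {
    QueryWrapper<MessageTemplateType> wrapper = new QueryWrapper<>();
    wrapper.eq("template_id", templateId);
    if (code != null) {
        // message_class is only constrained by the three find*Data methods
        wrapper.eq("message_class", code);
    }
    return messageTemplateTypeMapper.selectList(wrapper);
}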

View File

@@ -69,38 +69,40 @@ public class TemplateServiceImpl extends ServiceImpl<TemplateMapper, Template> i
log.info("Vehicle info: " + carByVin);
// Message template bound to this vehicle
Integer templateId = carByVin.getTemplateId();
List<MessageTemplateType> templateTypeList;
// Redis cache key
String redisKey = "messageTemplateType" + templateId;
log.info("Redis key: " + redisKey);
// Cache hit: the key already exists in Redis
if (redisTemplate.hasKey(redisKey)) {
List list = redisTemplate.opsForList().range(redisKey, 0, -1);
templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
.toList();
} else {
// Cache miss: load the template fields from the database and write them to Redis
List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
log.info("Cache miss; writing template fields to Redis");
templateTypeList = templateTypeList1;
- templateTypeList.forEach(
+ templateTypeList1.forEach(
templateType ->
redisTemplate.opsForList().rightPush(
redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType)
)
);
}
log.info("Template field list: " + templateTypeList);
// Loop over the field configs defined in the template
for (MessageTemplateType messageTemplateType : templateTypeList) {
// Start position (the template stores 1-based indices)
Integer startIndex = messageTemplateType.getStartIndex() - 1;
// End position (exclusive in String.substring)
Integer endIndex = messageTemplateType.getEndIndex();
String substring = result.substring(startIndex, endIndex);
log.info("Extracted field value: " + substring);
// Put each parsed field into the JSON object
- jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
+ jsonObject.put(messageTemplateType.getMessageField(), substring);
}
log.info("Message parsed into JSON: " + jsonObject);