feat():修复kafka生产者

dev.gateway
ruyaxie 2024-09-30 16:50:07 +08:00
parent f98c11e825
commit 19f99ac116
3 changed files with 24 additions and 57 deletions

View File

@ -55,10 +55,7 @@ public class MessageValueServiceImpl
}
List<MessageValue> list = this.list(queryWrapper);
// for (MessageValue messageValue : list) {
// messageValueCacheService.put(String.valueOf(messageValue.getTemplateId()),list);
// }
messageValueCacheService.put(String.valueOf(messageValueReq.getMessageTemplateId()),list);
return list.stream()
.map(messageValue -> MessageValueListResp.valueBuild(
messageValue

View File

@ -11,6 +11,10 @@ import com.muyu.enterpise.cache.SysCarTypeCacheService;
import com.muyu.parse.uitl.DataParseUtil;
import com.muyu.remote.RemoteMessageValueService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import javax.annotation.Resource;
import java.util.List;
@ -39,8 +43,10 @@ public class ProcessData {
@Resource
private static SysCarTypeCacheService sysCarTypeCacheService;
// @Resource
// private static KafkaProducerConfig kafkaProducerConfig;
@Resource
private static KafkaProducerConfig kafkaProducerConfig;
private final static String topic = "sysCar_vin_topic";
public static void DataConversion(String jsonVin ) {
//设置数组存储车辆数据
@ -68,58 +74,20 @@ public class ProcessData {
System.out.println("标签"+messageValue.getMessageLabel()+"值"+value);
jsonObject.put(messageValue.getMessageLabel(), value);
}
sendKafkaMessage(jsonObject);
}
// Result<Long> byTemplateId = remoteMessageValueService.findByTemplateId(vin);
//
// Long templateId = byTemplateId.getData();
// List<MessageValueListResp> templateList;
// try{
//从Redis中获取报文模版信息
// if ( enterpiseCacheService.hashKey(String.valueOf(templateId))) {
//
// List<Object> list = Collections.singletonList(enterpiseCacheService.get(String.valueOf(templateId)));
// templateList = list.stream()
// .map(obj -> JSON.parseObject(obj.toString(), MessageValueListResp.class))
// .toList();
// log.info("Redis缓存查询成功");
// } else {
//
// Result<List<MessageValueListResp>> byTemplate = remoteMessageValueService.findAll(MessageValueReq.builder().messageTemplateId(templateId).build());
// templateList = byTemplate.getData();
// templateList.forEach(
// listResp ->{
// enterpiseCacheService.put(String.valueOf(listResp.getMessageId()), MessageValue.addRollback(listResp));
// }
//
// );
// }
// log.info("数据库查询成功");
// } catch (Exception e) {
// throw new RuntimeException("获取报文模板失败");
// }
// //判断报文模板列表不为空
// if (templateList.isEmpty()) {
// throw new RuntimeException("报文模版为空");
// }
// //存储报文模版解析后的数据
// JSONObject jsonObject = new JSONObject();
// for (MessageValueListResp messageValue : templateList) {
// //起始位下标
// Integer startIndex = messageValue.getMessageStartIndex() - 1;
// //结束位下标
// Integer endIndex = messageValue.getMessageEndIndex();
// //根据报文模版截取数据
// String value = vin.substring(startIndex, endIndex);
// //存入数据
// System.out.println("标签"+messageValue.getMessageLabel()+"值"+value);
// jsonObject.put(messageValue.getMessageLabel(), value);
// }
// System.out.println(jsonObject);
}
private static void sendKafkaMessage(JSONObject jsonObject){
ProducerRecord<Object, JSONObject> producerRecord = new ProducerRecord<>(topic, jsonObject);
try {
kafkaProducerConfig.kafkaProducer().send(new ProducerRecord<>(topic,jsonObject.toString()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -53,7 +53,9 @@ spring:
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# # xxl-job 配置文件
# - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
#kafka共享配置
#application-kafka-config-dev.yml
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
logging:
level:
com.muyu.system.mapper: DEBUG