diff --git a/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/java/com/muyu/enterpise/service/impl/MessageValueServiceImpl.java b/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/java/com/muyu/enterpise/service/impl/MessageValueServiceImpl.java index dda89a1..3f1407f 100644 --- a/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/java/com/muyu/enterpise/service/impl/MessageValueServiceImpl.java +++ b/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/java/com/muyu/enterpise/service/impl/MessageValueServiceImpl.java @@ -55,10 +55,7 @@ public class MessageValueServiceImpl } List list = this.list(queryWrapper); -// for (MessageValue messageValue : list) { -// messageValueCacheService.put(String.valueOf(messageValue.getTemplateId()),list); -// } - + messageValueCacheService.put(String.valueOf(messageValueReq.getMessageTemplateId()),list); return list.stream() .map(messageValue -> MessageValueListResp.valueBuild( messageValue diff --git a/cloud-modules/cloud-modules-parse/src/main/java/com/muyu/parse/process/ProcessData.java b/cloud-modules/cloud-modules-parse/src/main/java/com/muyu/parse/process/ProcessData.java index 3538b69..609bd17 100644 --- a/cloud-modules/cloud-modules-parse/src/main/java/com/muyu/parse/process/ProcessData.java +++ b/cloud-modules/cloud-modules-parse/src/main/java/com/muyu/parse/process/ProcessData.java @@ -11,6 +11,10 @@ import com.muyu.enterpise.cache.SysCarTypeCacheService; import com.muyu.parse.uitl.DataParseUtil; import com.muyu.remote.RemoteMessageValueService; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + import javax.annotation.Resource; import java.util.List; @@ -39,8 +43,10 @@ public class ProcessData { @Resource private static SysCarTypeCacheService sysCarTypeCacheService; -// @Resource -// private static KafkaProducerConfig kafkaProducerConfig; + @Resource + private static KafkaProducerConfig kafkaProducerConfig; + + private final static String topic = "sysCar_vin_topic"; public static void DataConversion(String jsonVin ) { //设置数组存储车辆数据 @@ -68,58 +74,20 @@ public class ProcessData { System.out.println("标签"+messageValue.getMessageLabel()+"值"+value); jsonObject.put(messageValue.getMessageLabel(), value); } - + sendKafkaMessage(jsonObject); } - - -// Result byTemplateId = remoteMessageValueService.findByTemplateId(vin); -// -// Long templateId = byTemplateId.getData(); -// List templateList; -// try{ - //从Redis中获取报文模版信息 -// if ( enterpiseCacheService.hashKey(String.valueOf(templateId))) { -// -// List list = Collections.singletonList(enterpiseCacheService.get(String.valueOf(templateId))); -// templateList = list.stream() -// .map(obj -> JSON.parseObject(obj.toString(), MessageValueListResp.class)) -// .toList(); -// log.info("Redis缓存查询成功"); -// } else { -// -// Result> byTemplate = remoteMessageValueService.findAll(MessageValueReq.builder().messageTemplateId(templateId).build()); -// templateList = byTemplate.getData(); -// templateList.forEach( -// listResp ->{ -// enterpiseCacheService.put(String.valueOf(listResp.getMessageId()), MessageValue.addRollback(listResp)); -// } -// -// ); -// } -// log.info("数据库查询成功"); -// } catch (Exception e) { -// throw new RuntimeException("获取报文模板失败"); -// } -// //判断报文模板列表不为空 -// if (templateList.isEmpty()) { -// throw new RuntimeException("报文模版为空"); -// } -// //存储报文模版解析后的数据 -// JSONObject jsonObject = new JSONObject(); -// for (MessageValueListResp messageValue : templateList) { -// //起始位下标 -// Integer startIndex = messageValue.getMessageStartIndex() - 1; -// //结束位下标 -// Integer endIndex = messageValue.getMessageEndIndex(); -// //根据报文模版截取数据 -// String value = vin.substring(startIndex, endIndex); -// //存入数据 -// System.out.println("标签"+messageValue.getMessageLabel()+"值"+value); -// jsonObject.put(messageValue.getMessageLabel(), value); -// } -// System.out.println(jsonObject); - + } + + + + private static void sendKafkaMessage(JSONObject jsonObject){ + ProducerRecord producerRecord = new ProducerRecord<>(topic, jsonObject); + try { + kafkaProducerConfig.kafkaProducer().send(new ProducerRecord<>(topic,jsonObject.toString())); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/cloud-modules/cloud-modules-parse/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-parse/src/main/resources/bootstrap.yml index 6de1934..3f1f253 100644 --- a/cloud-modules/cloud-modules-parse/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-parse/src/main/resources/bootstrap.yml @@ -53,7 +53,9 @@ spring: - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} # # xxl-job 配置文件 # - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - + #kafka共享配置 + #application-kafka-config-dev.yml + - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} logging: level: com.muyu.system.mapper: DEBUG