From 60aa7650f0994f4b846edf2b7b8fd1ef9045e392 Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 11:34:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet/analyze/msg/model/ModelsKafkaMessage.java | 6 +++--- .../msg/service/impl/RealTimeDataServiceImpl.java | 9 ++++----- .../couplet-analyze-msg/src/main/resources/bootstrap.yml | 5 +++++ couplet-modules/couplet-business/pom.xml | 1 + .../main/java/com/couplet/online/utils/MqttMonitor.java | 2 +- .../src/main/resources/bootstrap.yml | 1 - 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 0a40e9a..9536426 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -56,7 +57,7 @@ public class ModelsKafkaMessage { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "ddd"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -78,7 +79,6 @@ public class ModelsKafkaMessage { public void consumerMessages() { executorService.execute(this::consumer); } - public void consumer() { try { @@ -86,7 +86,7 @@ public class ModelsKafkaMessage { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { - System.out.println("接收到的数据:" + record.value()); + log.info("接收到的数据:" + record.value()); String str = hexToString(record.value()); List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index a3821dd..c50b6ef 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -42,8 +42,8 @@ public class RealTimeDataServiceImpl implements IncidentService { log.info("实时数据事件开始....."); RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); //判断是否有缓存数据 - if (redisService.hasKey("vin:query:" + cacheObject.getVin())){ - redisService.deleteObject("vin:query:" + cacheObject.getVin()); + if (redisService.hasKey("vin:query:" + coupletMsgData.getVin())){ + redisService.deleteObject("vin:query:" + coupletMsgData.getVin()); } // if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { if (coupletMsgData.getVin().equals(cacheObject.getVin())){ @@ -51,9 +51,8 @@ public class RealTimeDataServiceImpl implements IncidentService { //判断数据是否一致, // if (RealTimeJudge.addRealTime(cacheObject)) { log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); - CoupletMsgData query = incidentMapper.queryByIncident(coupletMsgData.getVin()); - redisService.setCacheObject("vin:query:" + cacheObject.getVin(), query); - redisService.expire("vin:"+cacheObject.getVin(),10, TimeUnit.MINUTES); + redisService.setCacheObject("vin:query:" + coupletMsgData.getVin(), cacheObject); + redisService.expire("vin:"+coupletMsgData.getVin(),10, TimeUnit.MINUTES); // } else { // log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据不一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); // } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index a2cdc85..b2e9ca1 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -43,6 +43,11 @@ spring: template: # 只要消息抵达Queue,就会异步发送优先回调return firm mandatory: true + kafka: + bootstrap-servers: 115.159.47.13:9092 + consumer: + group-id: group + auto-offset-reset: earliest logging: level: com.couplet.analyze.msg.mapper: DEBUG diff --git a/couplet-modules/couplet-business/pom.xml b/couplet-modules/couplet-business/pom.xml index cc83ffe..da1fe95 100644 --- a/couplet-modules/couplet-business/pom.xml +++ b/couplet-modules/couplet-business/pom.xml @@ -114,6 +114,7 @@ com.couplet couplet-analyze-msg + diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index feece3e..6f4bfc9 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "xiaoYao"; + private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index fc1f9a1..ddf4716 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -28,7 +28,6 @@ spring: logging: level: com.couplet.system.mapper: DEBUG - # 订阅端配置 mqtt: server: