From 54a091e3b6dae2db2cd950192e2c3494427a5f27 Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 09:57:28 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD?= =?UTF-8?q?=E8=AF=AD=E5=8F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../analyze/msg/model/ModelsKafkaMessage.java | 5 +- .../com/couplet/mq/controller/KafkaTest.java | 164 +++++++++--------- 2 files changed, 84 insertions(+), 85 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 934e710..d9782d4 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -76,11 +76,10 @@ public class ModelsKafkaMessage { * * @return */ - @Scheduled(fixedDelay = 50) +// @Scheduled(fixedDelay = 50) + @PostConstruct public void consumerMessages() { executorService.execute(this::consumer); - - } public void consumer() { diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index d078386..5ff0ec6 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -1,93 +1,93 @@ -package com.couplet.mq.controller; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.PostConstruct; -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka测试类 - */ - -@Slf4j -public class KafkaTest { - private static final String TOPIC_NAME = "online"; - private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; - - public static void main(String[] args) { - //生产者示例 -// produceMessage(); - - //消费者示例 -// consumerMessages(); - - } - - //生产者 - @PostConstruct - private static void produceMessage() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - KafkaProducer producer = new KafkaProducer<>(props); - //创建生产者 - try { - - //发送消息 - for (int i = 0; i < 10000; i++) { - String message = "佳佳来喽" + (i + 1); - producer.send(new ProducerRecord<>(TOPIC_NAME, message)); - - System.out.println("发送消息:" + message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - producer.close(); - } - } - - //消费者 -// private static void consumerMessages() { +//package com.couplet.mq.controller; +// +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.clients.producer.ProducerRecord; +// +//import javax.annotation.PostConstruct; +//import java.time.Duration; +//import java.util.Collections; +//import java.util.Properties; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/4/4 +// * @Description: kafka测试类 +// */ +// +//@Slf4j +//public class KafkaTest { +// private static final String TOPIC_NAME = "online"; +// private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; +// +// public static void main(String[] args) { +// //生产者示例 +//// produceMessage(); +// +// //消费者示例 +//// consumerMessages(); +// +// } +// +// //生产者 +// @PostConstruct +// private static void produceMessage() { // Properties props = new Properties(); -// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); -// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); -// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// -// //创建消费者 -// KafkaConsumer consumer = new KafkaConsumer<>(props); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // +// KafkaProducer producer = new KafkaProducer<>(props); +// //创建生产者 // try { // -// //订阅主题 -// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +// //发送消息 +// for (int i = 0; i < 10000; i++) { +// String message = "佳佳来喽" + (i + 1); +// producer.send(new ProducerRecord<>(TOPIC_NAME, message)); // -// //持续消费消息 -// while (true) { -// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); -// records.forEach(record -> { -// System.out.println("消费者接受到的消息值:" + record.value()); -// }); +// System.out.println("发送消息:" + message); // } // } catch (Exception e) { // e.printStackTrace(); // } finally { -// consumer.close(); +// producer.close(); // } // } - -} +// +// //消费者 +//// private static void consumerMessages() { +//// Properties props = new Properties(); +//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// +//// //创建消费者 +//// KafkaConsumer consumer = new KafkaConsumer<>(props); +//// +//// try { +//// +//// //订阅主题 +//// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +//// +//// //持续消费消息 +//// while (true) { +//// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +//// records.forEach(record -> { +//// System.out.println("消费者接受到的消息值:" + record.value()); +//// }); +//// } +//// } catch (Exception e) { +//// e.printStackTrace(); +//// } finally { +//// consumer.close(); +//// } +//// } +// +//} From a8fff6ef7024544e9bdb0e05852337d01872c0e0 Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 10:47:42 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- couplet-auth/src/main/resources/bootstrap.yml | 2 -- .../common/contents/AnalyzeEventContents.java | 17 ++++------------- .../src/main/resources/bootstrap.yml | 2 -- .../analyze/msg/CoupletMsgApplication.java | 2 +- .../analyze/msg/model/ModelsKafkaMessage.java | 10 ++++------ .../service/impl/RealTimeDataServiceImpl.java | 8 ++++++-- .../src/main/resources/bootstrap.yml | 7 ------- .../src/main/resources/bootstrap.yml | 2 -- .../src/main/resources/bootstrap.yml | 2 -- .../src/main/resources/bootstrap.yml | 2 -- .../src/main/resources/bootstrap.yml | 2 -- .../com/couplet/online/utils/MqttMonitor.java | 2 +- 12 files changed, 16 insertions(+), 42 deletions(-) diff --git a/couplet-auth/src/main/resources/bootstrap.yml b/couplet-auth/src/main/resources/bootstrap.yml index b89bb00..427f682 100644 --- a/couplet-auth/src/main/resources/bootstrap.yml +++ b/couplet-auth/src/main/resources/bootstrap.yml @@ -17,11 +17,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java index 22a5cd8..73c9c97 100644 --- a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java @@ -10,21 +10,12 @@ import org.springframework.stereotype.Component; */ public interface AnalyzeEventContents { - /** - * 故障 - */ + //故障 static final String BREAKDOWN = "breakdown"; - /** - * 电子围栏 - */ + //电子围栏 static final String ELECTRONIC_FENCE = "electronic_fence"; - /** - * 实时数据 - */ + //实时数据 static final String REAL_TIME_DATA = "real_time_data"; - - /** - * 存储 - */ + //存储 static final String STORED_EVENT = "stored_event"; } diff --git a/couplet-gateway/src/main/resources/bootstrap.yml b/couplet-gateway/src/main/resources/bootstrap.yml index acd4642..091e68a 100644 --- a/couplet-gateway/src/main/resources/bootstrap.yml +++ b/couplet-gateway/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 43a0618..1000f15 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -13,7 +13,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; */ @SpringBootApplication(scanBasePackages = "com.couplet") @EnableScheduling -@EnableFeignClients(basePackages = "com.couplet.**") +@EnableFeignClients(basePackages = "com.couplet") public class CoupletMsgApplication { public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 429becb..8e64ae3 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -36,7 +36,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "xiaoYao"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; @@ -50,16 +50,13 @@ public class ModelsKafkaMessage { private AnalyzeEventCache analyzeEventCache; - - - //kafka消费者初始化 @PostConstruct public void initKafkaConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "ddd"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "xiaoYao"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -79,7 +76,8 @@ public class ModelsKafkaMessage { // @Scheduled(fixedDelay = 50) @PostConstruct public void consumerMessages() { - executorService.execute(this::consumer); +// executorService.execute(this::consumer); + this.consumer(); } public void consumer() { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index 6b1c72e..a3821dd 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -42,8 +42,12 @@ public class RealTimeDataServiceImpl implements IncidentService { log.info("实时数据事件开始....."); RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); //判断是否有缓存数据 - if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { - log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); + if (redisService.hasKey("vin:query:" + cacheObject.getVin())){ + redisService.deleteObject("vin:query:" + cacheObject.getVin()); + } +// if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { + if (coupletMsgData.getVin().equals(cacheObject.getVin())){ +// log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); //判断数据是否一致, // if (RealTimeJudge.addRealTime(cacheObject)) { log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index b36b039..a2cdc85 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 # 配置文件格式 file-extension: yml # 共享配置 @@ -52,8 +50,3 @@ mybatis-plus: configuration: map-underscore-to-camel-case: true -# RabbitMQ配置 -mq: - queueName: queue - exchangeName: exchange - routingKey: routingKey diff --git a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml index ff5fb2c..d900876 100644 --- a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml @@ -16,11 +16,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-file/src/main/resources/bootstrap.yml b/couplet-modules/couplet-file/src/main/resources/bootstrap.yml index 8681657..fa968e9 100644 --- a/couplet-modules/couplet-file/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-file/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-gen/src/main/resources/bootstrap.yml b/couplet-modules/couplet-gen/src/main/resources/bootstrap.yml index fa5dcb3..e3d77cd 100644 --- a/couplet-modules/couplet-gen/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-gen/src/main/resources/bootstrap.yml @@ -17,11 +17,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml index 8220b33..2351484 100644 --- a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 6f4bfc9..feece3e 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "xiaoYao"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑