diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java index 22a5cd8..73c9c97 100644 --- a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java @@ -10,21 +10,12 @@ import org.springframework.stereotype.Component; */ public interface AnalyzeEventContents { - /** - * 故障 - */ + //故障 static final String BREAKDOWN = "breakdown"; - /** - * 电子围栏 - */ + //电子围栏 static final String ELECTRONIC_FENCE = "electronic_fence"; - /** - * 实时数据 - */ + //实时数据 static final String REAL_TIME_DATA = "real_time_data"; - - /** - * 存储 - */ + //存储 static final String STORED_EVENT = "stored_event"; } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 43a0618..1000f15 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -13,7 +13,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; */ @SpringBootApplication(scanBasePackages = "com.couplet") @EnableScheduling -@EnableFeignClients(basePackages = "com.couplet.**") +@EnableFeignClients(basePackages = "com.couplet") public class CoupletMsgApplication { public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 4929831..469981e 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -50,9 +50,6 @@ public class ModelsKafkaMessage { private AnalyzeEventCache analyzeEventCache; - - - //kafka消费者初始化 @PostConstruct public void initKafkaConsumer() { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index 6b1c72e..a3821dd 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -42,8 +42,12 @@ public class RealTimeDataServiceImpl implements IncidentService { log.info("实时数据事件开始....."); RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); //判断是否有缓存数据 - if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { - log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); + if (redisService.hasKey("vin:query:" + cacheObject.getVin())){ + redisService.deleteObject("vin:query:" + cacheObject.getVin()); + } +// if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { + if (coupletMsgData.getVin().equals(cacheObject.getVin())){ +// log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); //判断数据是否一致, // if (RealTimeJudge.addRealTime(cacheObject)) { log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index ac437ce..2143241 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -52,8 +52,3 @@ mybatis-plus: configuration: map-underscore-to-camel-case: true -# RabbitMQ配置 -#mq: -# queueName: queue -# exchangeName: exchange -# routingKey: routingKey diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index a0f255a..5ff0ec6 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -1,93 +1,93 @@ -package com.couplet.mq.controller; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.PostConstruct; -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka测试类 - */ - -@Slf4j -public class KafkaTest { - private static final String TOPIC_NAME = "online"; - private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; - - public static void main(String[] args) { - //生产者示例 - produceMessage(); - - //消费者示例 - consumerMessages(); - - } - - //生产者 - @PostConstruct - private static void produceMessage() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - KafkaProducer producer = new KafkaProducer<>(props); - //创建生产者 - try { - - //发送消息 - for (int i = 0; i < 10000; i++) { - String message = "佳佳来喽" + (i + 1); - producer.send(new ProducerRecord<>(TOPIC_NAME, message)); - - System.out.println("发送消息:" + message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - producer.close(); - } - } - - //消费者 - private static void consumerMessages() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - - //创建消费者 - KafkaConsumer consumer = new KafkaConsumer<>(props); - - try { - - //订阅主题 - consumer.subscribe(Collections.singletonList(TOPIC_NAME)); - - //持续消费消息 - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - System.out.println("消费者接受到的消息值:" + record.value()); - }); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - consumer.close(); - } - } - -} +//package com.couplet.mq.controller; +// +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.clients.producer.ProducerRecord; +// +//import javax.annotation.PostConstruct; +//import java.time.Duration; +//import java.util.Collections; +//import java.util.Properties; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/4/4 +// * @Description: kafka测试类 +// */ +// +//@Slf4j +//public class KafkaTest { +// private static final String TOPIC_NAME = "online"; +// private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; +// +// public static void main(String[] args) { +// //生产者示例 +//// produceMessage(); +// +// //消费者示例 +//// consumerMessages(); +// +// } +// +// //生产者 +// @PostConstruct +// private static void produceMessage() { +// Properties props = new Properties(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// +// KafkaProducer producer = new KafkaProducer<>(props); +// //创建生产者 +// try { +// +// //发送消息 +// for (int i = 0; i < 10000; i++) { +// String message = "佳佳来喽" + (i + 1); +// producer.send(new ProducerRecord<>(TOPIC_NAME, message)); +// +// System.out.println("发送消息:" + message); +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } finally { +// producer.close(); +// } +// } +// +// //消费者 +//// private static void consumerMessages() { +//// Properties props = new Properties(); +//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// +//// //创建消费者 +//// KafkaConsumer consumer = new KafkaConsumer<>(props); +//// +//// try { +//// +//// //订阅主题 +//// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +//// +//// //持续消费消息 +//// while (true) { +//// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +//// records.forEach(record -> { +//// System.out.println("消费者接受到的消息值:" + record.value()); +//// }); +//// } +//// } catch (Exception e) { +//// e.printStackTrace(); +//// } finally { +//// consumer.close(); +//// } +//// } +// +//}