From 81632a567910c6b95df3b273ac77d6021917d9c1 Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Thu, 11 Apr 2024 09:43:44 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=93=E6=9D=9F=E6=97=B6=E9=97=B4=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../analyze/msg/CoupletMsgApplication.java | 2 +- .../analyze/msg/model/ModelsKafkaMessage.java | 6 +- .../service/impl/BreakdownServiceImpl.java | 17 +- .../couplet/analyze/msg/utils/MsgUtils.java | 18 ++ .../src/test/java/com/couplet/msg/Main.java | 24 +-- .../com/couplet/mq/controller/KafkaTest.java | 186 +++++++++--------- .../com/couplet/online/utils/MqttMonitor.java | 2 +- 7 files changed, 140 insertions(+), 115 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 11b3509..4054b3a 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -20,7 +20,7 @@ public class CoupletMsgApplication { public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); System.out.println("解析系统启动成功"); - new ModelsKafkaMessage().initKafkaConsumer(); +// new ModelsKafkaMessage().initKafkaConsumer(); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 860a3c5..dfb3e93 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -38,7 +38,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "topic_lyh"; + private static final String TOPIC_NAME = "topic_dxd"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; @@ -58,7 +58,7 @@ public class ModelsKafkaMessage { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "fbab"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -76,7 +76,7 @@ public class ModelsKafkaMessage { * @return */ // @Scheduled(fixedDelay = 50) -// @PostConstruct + @PostConstruct public void consumerMessages() { executorService.execute(this::consumer); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java index f754efb..d406e55 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java @@ -80,13 +80,17 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl long expireTime = 30; redisService.expire(key, expireTime, TimeUnit.MINUTES); scheduledRedis(); +// long timeMillis = System.currentTimeMillis(); +// log.info("故障事件结束时间:"+timeMillis); +// log.info("故障事件检测结束....."); + }else { long timeMillis = System.currentTimeMillis(); - log.info("故障事件结束时间:"+timeMillis); - log.info("故障事件检测结束....."); + Date date = MsgUtils.convertMillisToDateTimeSering(timeMillis); + CoupletTroubleCode troubleCode = new CoupletTroubleCode(); + troubleCode.setTroubleEndTime(date); + log.info("故障事件结束时间:"+date); + log.info("故障事件结束....."); } - long timeMillis = System.currentTimeMillis(); - log.info("故障事件结束时间:"+timeMillis); - log.info("故障事件结束....."); } /** @@ -105,6 +109,7 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl for (CoupletMsgData member : members) { Set breakdownIds = redisService.getCacheSet(member.getVin()+":"+key); if (breakdownIds.size()==0){ + //异步执行任务的方法,它可以在后台执行一个无返回值的异步操作。该方法接受一个 Runnable 类型的参数,用于指定要执行的任务 CompletableFuture.runAsync(() -> { CoupletTroubleCode troubleCode = new CoupletTroubleCode(); troubleCode.setTroubleStartTime(new Date()); @@ -251,7 +256,7 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl break; } remoteTroubleService.newFaultData(troubleCode); - }); + }); redisService.setCacheSet(member.getVin()+":"+key, member.getVin()+":"+member); long expireTime = 30; redisService.expire(member.getVin()+":"+key, expireTime, TimeUnit.MINUTES); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/utils/MsgUtils.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/utils/MsgUtils.java index 1e2800a..de0b704 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/utils/MsgUtils.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/utils/MsgUtils.java @@ -5,6 +5,10 @@ import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -19,6 +23,20 @@ import java.util.Random; @Slf4j public class MsgUtils { + /** + * 转换时间戳 + * @return + */ + public static Date convertMillisToDateTimeSering(Long millis) { + Instant instant = Instant.ofEpochMilli(millis); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } + + /*** + * 随机生成故障码 + * @return + */ public static String generateGTA() { // 生成四位以"GTA"开头的字符串 String prefix = "GTA"; diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/Main.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/Main.java index 143796c..25daca8 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/Main.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/test/java/com/couplet/msg/Main.java @@ -1,5 +1,10 @@ package com.couplet.msg; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Date; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -12,17 +17,14 @@ import java.util.regex.Pattern; */ public class Main { public static void main(String[] args) { - // 创建一个字符串数组存储三个状态 - String[] statuses = {"电池故障", "车体故障", "车尾故障","抽轮故障"}; + long timeMillis = System.currentTimeMillis(); + Date dateTime = convertMillisToDate(timeMillis); + System.out.println("Date Time: " + dateTime); + } - // 生成随机数对象 - Random rand = new Random(); - - // 生成一个范围在 0 到 2 之间的随机整数 - int randomIndex = rand.nextInt(4); - - // 随机选择一个字符串并输出 - String randomStatus = statuses[randomIndex]; - System.out.println("随机输出的字符串:" + randomStatus); + private static Date convertMillisToDate(long millis) { + Instant instant = Instant.ofEpochMilli(millis); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); } } diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index 5ff0ec6..86b4c0c 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -1,93 +1,93 @@ -//package com.couplet.mq.controller; -// -//import lombok.extern.slf4j.Slf4j; -//import org.apache.kafka.clients.consumer.ConsumerConfig; -//import org.apache.kafka.clients.consumer.ConsumerRecords; -//import org.apache.kafka.clients.consumer.KafkaConsumer; -//import org.apache.kafka.clients.producer.KafkaProducer; -//import org.apache.kafka.clients.producer.ProducerConfig; -//import org.apache.kafka.clients.producer.ProducerRecord; -// -//import javax.annotation.PostConstruct; -//import java.time.Duration; -//import java.util.Collections; -//import java.util.Properties; -// -///** -// * @ProjectName: five-groups-couplet -// * @Author: LiuYunHu -// * @CreateTime: 2024/4/4 -// * @Description: kafka测试类 -// */ -// -//@Slf4j -//public class KafkaTest { -// private static final String TOPIC_NAME = "online"; -// private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; -// -// public static void main(String[] args) { -// //生产者示例 -//// produceMessage(); -// -// //消费者示例 -//// consumerMessages(); -// -// } -// -// //生产者 -// @PostConstruct -// private static void produceMessage() { -// Properties props = new Properties(); -// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); -// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); -// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); -// -// KafkaProducer producer = new KafkaProducer<>(props); -// //创建生产者 -// try { -// -// //发送消息 -// for (int i = 0; i < 10000; i++) { -// String message = "佳佳来喽" + (i + 1); -// producer.send(new ProducerRecord<>(TOPIC_NAME, message)); -// -// System.out.println("发送消息:" + message); -// } -// } catch (Exception e) { -// e.printStackTrace(); -// } finally { -// producer.close(); -// } -// } -// -// //消费者 -//// private static void consumerMessages() { -//// Properties props = new Properties(); -//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); -//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); -//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -//// -//// //创建消费者 -//// KafkaConsumer consumer = new KafkaConsumer<>(props); -//// -//// try { -//// -//// //订阅主题 -//// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); -//// -//// //持续消费消息 -//// while (true) { -//// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); -//// records.forEach(record -> { -//// System.out.println("消费者接受到的消息值:" + record.value()); -//// }); -//// } -//// } catch (Exception e) { -//// e.printStackTrace(); -//// } finally { -//// consumer.close(); -//// } -//// } -// -//} +package com.couplet.mq.controller; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * @ProjectName: five-groups-couplet + * @Author: LiuYunHu + * @CreateTime: 2024/4/4 + * @Description: kafka测试类 + */ + +@Slf4j +public class KafkaTest { + private static final String TOPIC_NAME = "topic_dxd"; + private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; + + public static void main(String[] args) { + //生产者示例 +// produceMessage(); + + //消费者示例 + consumerMessages(); + + } + + //生产者 + @PostConstruct + private static void produceMessage() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaProducer producer = new KafkaProducer<>(props); + //创建生产者 + try { + + //发送消息 + for (int i = 0; i < 10000; i++) { + String message = "佳佳来喽" + (i + 1); + producer.send(new ProducerRecord<>(TOPIC_NAME, message)); + + System.out.println("发送消息:" + message); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + producer.close(); + } + } + + //消费者 + private static void consumerMessages() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + //创建消费者 + KafkaConsumer consumer = new KafkaConsumer<>(props); + + try { + + //订阅主题 + consumer.subscribe(Collections.singletonList(TOPIC_NAME)); + + //持续消费消息 + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + records.forEach(record -> { + System.out.println("消费者接受到的消息值:" + record.value()); + }); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + consumer.close(); + } + } + +} diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index f908473..99f99e2 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "topic_lhy"; + private static final String TOPIC_NAME = "topic_dxd"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑