From d3f24445213dbfb1af341e6a2ab5ddc0202afd9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Mon, 7 Oct 2024 20:00:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:()=20=E4=BC=98=E5=8C=96kafka=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../VehicleProcessingApplication.java | 20 +++++++++++ .../consumer/KafkaConsumerService.java | 33 ++++++++++--------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/VehicleProcessingApplication.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/VehicleProcessingApplication.java index 3b32ddb..a59b123 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/VehicleProcessingApplication.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/VehicleProcessingApplication.java @@ -20,6 +20,26 @@ public class VehicleProcessingApplication { public static void main(String[] args) { SpringApplication.run(VehicleProcessingApplication.class, args); + System.out.println(" _ooOoo_\n" + + " o8888888o\n" + + " 88\" . \"88\n" + + " (| -_- |)\n" + + " O\\ = /O\n" + + " ____/`---'\\____\n" + + " .' \\\\| |// `.\n" + + " / \\\\||| : |||// \\\n" + + " / _||||| -:- |||||- \\\n" + + " | | \\\\\\ - /// | |\n" + + " | \\_| ''\\---/'' | |\n" + + " \\ .-\\__ `-` ___/-. /\n" + + " ___`. .' /--.--\\ `. . __\n" + + " .\"\" '< `.___\\_<|>_/___.' >'\"\".\n" + + " | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n" + + " \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n" + + " ======`-.____`-.___\\_____/___.-`____.-'======\n" + + " `=---='\n" + + " ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" + + " // 佛祖保佑 永不宕机 永无BUG //"); } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 1c2d7fd..9a4ed6f 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -56,23 +56,24 @@ public class KafkaConsumerService implements InitializingBean { while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { - //从ConsumerRecord中获取消费数据 - String originalMsg = (String) consumerRecord.value(); - log.info("从Kafka中消费的原始数据: " + originalMsg); - //把消费数据转换为JSON对象 - JSONObject jsonObject = JSON.parseObject(originalMsg); - log.info("消费数据转换为JSON对象: " + jsonObject); - log.info("消费数据转换为JSON对象: " + jsonObject.toString()); - - String value = jsonObject.toString(); - String vin = value.substring(0, 11); - Map map = (Map) cacheUtil.get(vin); - WarnRule warnRule = (WarnRule) map.get("warnRule"); - WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); - Vehicle vehicle = (Vehicle) map.get("vehicle"); - Object breakdown = map.get("breakdown"); - Fence fence = (Fence) map.get("fence"); + try { + //从ConsumerRecord中获取消费数据 + String originalMsg = (String) consumerRecord.value(); + log.info("从Kafka中消费的原始数据: " + originalMsg); + //把消费数据转换为JSON对象 + JSONObject jsonObject = JSON.parseObject(originalMsg); + String vin = (String) jsonObject.get("vin"); + log.info("vin码为: {}",vin); + Map map = (Map) cacheUtil.get(vin); + WarnRule warnRule = (WarnRule) map.get("warnRule"); + WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); + Vehicle vehicle = (Vehicle) map.get("vehicle"); + Object breakdown = map.get("breakdown"); + Fence fence = (Fence) map.get("fence"); // eventInterface.handle(jsonObject); + } catch (Exception e) { + log.info("这个有问题:{}",e.getMessage()); + } } } });