From 72ac72330eda207d10710221fd180ca7a73e7f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Tue, 8 Oct 2024 11:27:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:()=20=E6=9B=B4=E6=94=B9kafka=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/KafkaConsumerService.java | 59 +++++++++++-------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index cd45c29..578db9d 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import com.muyu.common.core.utils.html.EscapeUtil; import com.muyu.domain.Fence; import com.muyu.domain.Vehicle; import com.muyu.domain.WarnRule; @@ -22,6 +23,8 @@ import javax.annotation.Resource; import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * kafka消费者 @@ -35,6 +38,9 @@ import java.util.Map; @Component public class KafkaConsumerService implements InitializingBean { + /** + * 订阅的Topic + */ private static final String TIPSY = "tipsy"; @Resource @@ -46,6 +52,11 @@ public class KafkaConsumerService implements InitializingBean { @Resource private EventPublisher eventPublisher; + /** + * 线程池 + */ + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + @Override public void afterPropertiesSet() throws Exception { @@ -59,34 +70,32 @@ public class KafkaConsumerService implements InitializingBean { while (true) { // 轮询消费消息 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord consumerRecord : consumerRecords) { - try { - //从ConsumerRecord中获取消费数据 - String originalMsg = (String) consumerRecord.value(); - log.info("从Kafka中消费的原始数据: " + originalMsg); - //把消费数据转换为JSON对象 - JSONObject jsonObject = JSON.parseObject(originalMsg); - // 获取VIN码 - String vin = (String) jsonObject.get("vin"); - log.info("vin码为: {}",vin); - // 获取本地缓存中的数据 - Map map = (Map) cacheUtil.get(vin); - log.info("map: {}",map); - Fence fence = (Fence) map.get("fence"); - Object breakdown = map.get("breakdown"); - Vehicle vehicle = (Vehicle) map.get("vehicle"); - WarnRule warnRule = (WarnRule) map.get("warnRule"); - WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); - VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); - eventPublisher.eventPublish(jsonObject); - } catch (Exception e) { - // 捕获异常 - log.info("这个有问题:{}",e.getMessage()); - } - } + consumerRecords.forEach(record -> executorService.execute(() -> publish(record))); } }); thread.start(); } + + private void publish(ConsumerRecord consumerRecord) { + //从ConsumerRecord中获取消费数据 + String originalMsg = (String) consumerRecord.value(); + log.info("从Kafka中消费的原始数据: " + originalMsg); + //把消费数据转换为JSON对象 + JSONObject jsonObject = JSON.parseObject(originalMsg); + // 获取VIN码 + String vin = (String) jsonObject.get("vin"); + log.info("vin码为: {}",vin); + // 获取本地缓存中的数据 +// Map map = (Map) cacheUtil.get(vin); +// log.info("map: {}",map); +// Fence fence = (Fence) map.get("fence"); +// Object breakdown = map.get("breakdown"); +// Vehicle vehicle = (Vehicle) map.get("vehicle"); +// WarnRule warnRule = (WarnRule) map.get("warnRule"); +// WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); +// VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); + eventPublisher.eventPublish(jsonObject); + } + }