feat:() 更改kafka模式

dev.processing.optimize
晨哀 2024-10-08 11:27:39 +08:00
parent 1e9c92e4d3
commit 72ac72330e
1 changed files with 34 additions and 25 deletions

View File

@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.common.core.utils.html.EscapeUtil;
import com.muyu.domain.Fence; import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule; import com.muyu.domain.WarnRule;
@ -22,6 +23,8 @@ import javax.annotation.Resource;
import java.time.Duration; import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* kafka * kafka
@ -35,6 +38,9 @@ import java.util.Map;
@Component @Component
public class KafkaConsumerService implements InitializingBean { public class KafkaConsumerService implements InitializingBean {
/**
* Topic
*/
private static final String TIPSY = "tipsy"; private static final String TIPSY = "tipsy";
@Resource @Resource
@ -46,6 +52,11 @@ public class KafkaConsumerService implements InitializingBean {
@Resource @Resource
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
/**
* 线
*/
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@ -59,34 +70,32 @@ public class KafkaConsumerService implements InitializingBean {
while (true) { while (true) {
// 轮询消费消息 // 轮询消费消息
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord consumerRecord : consumerRecords) { consumerRecords.forEach(record -> executorService.execute(() -> publish(record)));
try {
//从ConsumerRecord中获取消费数据
String originalMsg = (String) consumerRecord.value();
log.info("从Kafka中消费的原始数据: " + originalMsg);
//把消费数据转换为JSON对象
JSONObject jsonObject = JSON.parseObject(originalMsg);
// 获取VIN码
String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin);
// 获取本地缓存中的数据
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
log.info("map: {}",map);
Fence fence = (Fence) map.get("fence");
Object breakdown = map.get("breakdown");
Vehicle vehicle = (Vehicle) map.get("vehicle");
WarnRule warnRule = (WarnRule) map.get("warnRule");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
eventPublisher.eventPublish(jsonObject);
} catch (Exception e) {
// 捕获异常
log.info("这个有问题:{}",e.getMessage());
}
}
} }
}); });
thread.start(); thread.start();
} }
private void publish(ConsumerRecord consumerRecord) {
//从ConsumerRecord中获取消费数据
String originalMsg = (String) consumerRecord.value();
log.info("从Kafka中消费的原始数据: " + originalMsg);
//把消费数据转换为JSON对象
JSONObject jsonObject = JSON.parseObject(originalMsg);
// 获取VIN码
String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin);
// 获取本地缓存中的数据
// Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
// log.info("map: {}",map);
// Fence fence = (Fence) map.get("fence");
// Object breakdown = map.get("breakdown");
// Vehicle vehicle = (Vehicle) map.get("vehicle");
// WarnRule warnRule = (WarnRule) map.get("warnRule");
// WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
// VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
eventPublisher.eventPublish(jsonObject);
}
} }