feat:() 优化kafka消费者和上线监听 添加注解

dev.processing.optimize
晨哀 2024-10-07 20:46:24 +08:00
parent 7f45b28116
commit 013fe5809d
2 changed files with 23 additions and 13 deletions

View File

@ -51,10 +51,13 @@ public class KafkaConsumerService implements InitializingBean {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
log.info("启动线程监听Topic: {}", "zeshi"); log.info("启动线程监听Topic: {}", "zeshi");
// 延迟1秒
ThreadUtil.sleep(1000); ThreadUtil.sleep(1000);
// 订阅主题
Collection<String> topics = Lists.newArrayList("zeshi"); Collection<String> topics = Lists.newArrayList("zeshi");
kafkaConsumer.subscribe(topics); kafkaConsumer.subscribe(topics);
while (true) { while (true) {
// 轮询消费消息
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord consumerRecord : consumerRecords) { for (ConsumerRecord consumerRecord : consumerRecords) {
try { try {
@ -63,18 +66,21 @@ public class KafkaConsumerService implements InitializingBean {
log.info("从Kafka中消费的原始数据: " + originalMsg); log.info("从Kafka中消费的原始数据: " + originalMsg);
//把消费数据转换为JSON对象 //把消费数据转换为JSON对象
JSONObject jsonObject = JSON.parseObject(originalMsg); JSONObject jsonObject = JSON.parseObject(originalMsg);
// 获取VIN码
String vin = (String) jsonObject.get("vin"); String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin); log.info("vin码为: {}",vin);
// 获取本地缓存中的数据
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin); Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
log.info("map: {}",map); log.info("map: {}",map);
VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); Fence fence = (Fence) map.get("fence");
Object breakdown = map.get("breakdown");
Vehicle vehicle = (Vehicle) map.get("vehicle");
WarnRule warnRule = (WarnRule) map.get("warnRule"); WarnRule warnRule = (WarnRule) map.get("warnRule");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
Vehicle vehicle = (Vehicle) map.get("vehicle"); VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
Object breakdown = map.get("breakdown");
Fence fence = (Fence) map.get("fence");
// eventInterface.handle(jsonObject); // eventInterface.handle(jsonObject);
} catch (Exception e) { } catch (Exception e) {
// 捕获异常
log.info("这个有问题:{}",e.getMessage()); log.info("这个有问题:{}",e.getMessage());
} }
} }

View File

@ -58,25 +58,29 @@ public class OnLineMonitoringConsumer {
try { try {
log.info("添加本地缓存,车辆vin: {}", vin); log.info("添加本地缓存,车辆vin: {}", vin);
VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin); // 获取redis中的数据
Fence fence = fenceCahceService.get(vin);
Object breakdown = faultCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin);
WarnRule warnRule = warnRuleCacheService.get(vin); WarnRule warnRule = warnRuleCacheService.get(vin);
WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); WarnStrategy warnStrategy = warnStrategyCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin); VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin);
Object breakdown = faultCacheService.get(vin); // 封装从redis中获得的数据
Fence fence = fenceCahceService.get(vin);
HashMap<String, Object> map = new HashMap<>(); HashMap<String, Object> map = new HashMap<>();
map.put("vehicleManageResp",vehicleManageResp); map.put("fence",fence);
map.put("breakdown",breakdown);
map.put("vehicle",vehicle);
map.put("warnRule",warnRule); map.put("warnRule",warnRule);
map.put("warnStrategy",warnStrategy); map.put("warnStrategy",warnStrategy);
map.put("vehicle",vehicle); map.put("vehicleManageResp",vehicleManageResp);
map.put("breakdown",breakdown); // 添加到本地缓存中
map.put("fence",fence);
cacheUtil.put(vin,map); cacheUtil.put(vin,map);
log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map); log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
// 手动确认消息发送成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) { } catch (Exception e) {
try { try {
// 手动拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);