diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 0e34c4d..f26aeb7 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -51,10 +51,13 @@ public class KafkaConsumerService implements InitializingBean { Thread thread = new Thread(() -> { log.info("启动线程监听Topic: {}", "zeshi"); + // 延迟1秒 ThreadUtil.sleep(1000); + // 订阅主题 Collection topics = Lists.newArrayList("zeshi"); kafkaConsumer.subscribe(topics); while (true) { + // 轮询消费消息 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { try { @@ -63,18 +66,21 @@ public class KafkaConsumerService implements InitializingBean { log.info("从Kafka中消费的原始数据: " + originalMsg); //把消费数据转换为JSON对象 JSONObject jsonObject = JSON.parseObject(originalMsg); + // 获取VIN码 String vin = (String) jsonObject.get("vin"); log.info("vin码为: {}",vin); + // 获取本地缓存中的数据 Map map = (Map) cacheUtil.get(vin); log.info("map: {}",map); - VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); + Fence fence = (Fence) map.get("fence"); + Object breakdown = map.get("breakdown"); + Vehicle vehicle = (Vehicle) map.get("vehicle"); WarnRule warnRule = (WarnRule) map.get("warnRule"); WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); - Vehicle vehicle = (Vehicle) map.get("vehicle"); - Object breakdown = map.get("breakdown"); - Fence fence = (Fence) map.get("fence"); + VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); // eventInterface.handle(jsonObject); } catch (Exception e) { + // 捕获异常 log.info("这个有问题:{}",e.getMessage()); } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index 6f6a206..43230fe 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -58,25 +58,29 @@ public class OnLineMonitoringConsumer { try { log.info("添加本地缓存,车辆vin: {}", vin); - VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin); + // 获取redis中的数据 + Fence fence = fenceCahceService.get(vin); + Object breakdown = faultCacheService.get(vin); + Vehicle vehicle = vehicleCacheService.get(vin); WarnRule warnRule = warnRuleCacheService.get(vin); WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); - Vehicle vehicle = vehicleCacheService.get(vin); - Object breakdown = faultCacheService.get(vin); - Fence fence = fenceCahceService.get(vin); + VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin); + // 封装从redis中获得的数据 HashMap map = new HashMap<>(); - map.put("vehicleManageResp",vehicleManageResp); + map.put("fence",fence); + map.put("breakdown",breakdown); + map.put("vehicle",vehicle); map.put("warnRule",warnRule); map.put("warnStrategy",warnStrategy); - map.put("vehicle",vehicle); - map.put("breakdown",breakdown); - map.put("fence",fence); + map.put("vehicleManageResp",vehicleManageResp); + // 添加到本地缓存中 cacheUtil.put(vin,map); log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map); + // 手动确认消息发送成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); - } catch (Exception e) { try { + // 手动拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } catch (Exception ex) { throw new RuntimeException(ex);