feat:() 优化kafka消费者和上线监听

dev.processing.optimize
晨哀 2024-10-07 20:37:45 +08:00
parent d3f2444521
commit 7f45b28116
3 changed files with 12 additions and 2 deletions

View File

@ -10,6 +10,7 @@ import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy;
import com.muyu.domain.resp.VehicleManageResp;
import com.muyu.processing.interfaces.EventInterface;
import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2;
@ -65,6 +66,8 @@ public class KafkaConsumerService implements InitializingBean {
String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin);
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
log.info("map: {}",map);
VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
WarnRule warnRule = (WarnRule) map.get("warnRule");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
Vehicle vehicle = (Vehicle) map.get("vehicle");

View File

@ -4,6 +4,7 @@ import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy;
import com.muyu.domain.resp.VehicleManageResp;
import com.muyu.enterprise.cache.*;
import com.muyu.processing.utils.CacheUtil;
import com.rabbitmq.client.Channel;
@ -34,6 +35,9 @@ public class OnLineMonitoringConsumer {
@Resource
private VehicleCacheService vehicleCacheService;
@Resource
private AllVehicleCacheService allVehicleCacheService;
@Resource
private FaultCacheService faultCacheService;
@ -54,18 +58,21 @@ public class OnLineMonitoringConsumer {
try {
log.info("添加本地缓存,车辆vin: {}", vin);
VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin);
WarnRule warnRule = warnRuleCacheService.get(vin);
WarnStrategy warnStrategy = warnStrategyCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin);
Object breakdown = faultCacheService.get(vin);
Fence fence = fenceCahceService.get(vin);
HashMap<String, Object> map = new HashMap<>();
map.put("vehicleManageResp",vehicleManageResp);
map.put("warnRule",warnRule);
map.put("warnStrategy",warnStrategy);
map.put("vehicle",vehicle);
map.put("breakdown",breakdown);
map.put("fence",fence);
cacheUtil.put(vin,map);
log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {

View File

@ -42,7 +42,7 @@ public class TestKafka {
@GetMapping("/send")
public String sendMsg(){
JSONObject entries = new JSONObject();
entries.set("vin","vin123468");
entries.set("vin","1123wsdfr54323wsd");
entries.set("name","宝马");
String entriesString = entries.toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", entriesString);
@ -56,7 +56,7 @@ public class TestKafka {
*/
@GetMapping("/sendMQ")
public String sendMQ(){
rabbitTemplate.convertAndSend("long_time_no_see","晨哀,好久不见",message -> {
rabbitTemplate.convertAndSend("long_time_no_see","1123wsdfr54323wsd",message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});