feat: 完善事件处理系统的故障报警功能

master
yaoxin 2024-06-23 22:37:12 +08:00
parent 2dd3a41d62
commit 52dbe5ea8f
7 changed files with 61 additions and 8 deletions

View File

@ -14,7 +14,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;

View File

@ -7,8 +7,8 @@ package com.muyu.eventdriven.constants;
* @Date 2024/6/21 11:39
*/
public class RabbitConstants {
public static final String QUEUE_STATUS_ABNORMAL = "queue_status_abnormal";
public static final String QUEUE_STATUS_NORMAL = "queue_status_normal";
public static final String QUEUE_STATUS_ABNORMAL = "zhiLian-vehicle-start";
public static final String QUEUE_STATUS_NORMAL = "zhiLian-vehicle-end";
public static final String EXCHANGE_STATUS = "exchange_status";
public static final String STATUS_ABNORMAL = "abnormal";
public static final String STATUS_NORMAL = "normal";

View File

@ -3,7 +3,9 @@ package com.muyu.eventdriven.consumer.redis;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.constants.RabbitConstants;
import com.muyu.eventdriven.constants.RedisConstants;
import com.muyu.eventdriven.domain.RabbitFalut;
import com.muyu.eventdriven.domain.VehicleFaultStatus;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
@ -12,6 +14,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
/**
* @ClassName RedisKeyExpirationListener
@ -20,6 +23,7 @@ import java.util.Date;
* @Date 2024/6/21 11:27
*/
@Component
@Log4j2
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@ -30,11 +34,21 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
//拿到过期key的信息并做处理
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
public void onMessage(Message messages, byte[] pattern) {
String key = messages.toString();
if (key.contains(RedisConstants.VEHICLE_FAULT_KEY)){
String[] split = key.split(":");
rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.STATUS_NORMAL, JSON.toJSONString(new VehicleFaultStatus(split[1],new Date().getTime(),split[2],"1")));
RabbitFalut rabbitFalut = new RabbitFalut();
rabbitFalut.setEndTime(new Date());
rabbitFalut.setVin(split[1]);
rabbitFalut.setFaultCode(split[2]);
rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.STATUS_NORMAL, JSON.toJSONString(rabbitFalut),message ->{
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
//设置消息延迟时间为5秒
message.getMessageProperties().setDelay(5000);
log.info ( "消息发送成功" );
return message;
});
}
}
}

View File

@ -0,0 +1,25 @@
package com.muyu.eventdriven.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Date;
/**
* @ClassName RabbitFalut
* @Description
* @Author Xin.Yao
* @Date 2024/6/22 1:12
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class RabbitFalut {
private String faultCode;
private String vin;
private Date startTime;
private Date endTime;
}

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.muyu.eventdriven.constants.FaultCodeConstants;
import com.muyu.eventdriven.constants.RabbitConstants;
import com.muyu.eventdriven.constants.RedisConstants;
import com.muyu.eventdriven.domain.RabbitFalut;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.domain.VehicleFaultStatus;
import com.muyu.eventdriven.tactics.EventTactics;
@ -17,6 +18,7 @@ import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
@ -123,8 +125,19 @@ public class FaultAlarmEvent implements EventTactics {
public void hasLocalCache(VehicleData vehicleData,String faultCode){
Object o = caffeineCache.get(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode, key -> vehicleData.getDrivingRoute());
if (o.toString().equals(vehicleData.getDrivingRoute())){
rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.STATUS_ABNORMAL, JSON.toJSONString(new VehicleFaultStatus(vehicleData.getVin(),new Date().getTime(),faultCode,"0")));
RabbitFalut rabbitFalut = new RabbitFalut();
rabbitFalut.setStartTime(new Date());
rabbitFalut.setVin(vehicleData.getVin());
rabbitFalut.setFaultCode(faultCode);
rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.STATUS_ABNORMAL, JSON.toJSONString(rabbitFalut),message ->{
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
//设置消息延迟时间为5秒
message.getMessageProperties().setDelay(5000);
log.info ( "消息发送成功" );
return message;
});
}
redisTemplate.opsForValue().set(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode,vehicleData.getDrivingRoute(),10, TimeUnit.SECONDS);
}
}

View File

@ -24,5 +24,6 @@ public class IndexWarningEvent implements EventTactics {
@Override
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
log.info("车辆{}执行指标预警事件",vin);
}
}

View File

@ -7,7 +7,7 @@ spring:
password: guest
virtualHost: /
port: 5672
host: 47.99.219.99
host: 43.142.12.243
kafka:
#config/consumer.properties配置的bootstrap.servers
bootstrap-servers: 47.98.170.220:9092