feat():完善故障报警,mq监听

master
Jiang Peng 2024-06-21 22:37:40 +08:00
parent 9438df576a
commit fabd5e0d5e
19 changed files with 455 additions and 18 deletions

View File

@ -2,7 +2,7 @@ package com.muyu.cloud.many.datasource.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.muyu.cloud.many.datasource.domain.Vehicle; import com.muyu.cloud.many.datasource.domain.Vehicle;
import com.muyu.cloud.many.datasource.mapper.VehicleMapper; import com.muyu.cloud.many.datasource.mapper.Vehicle1Mapper;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
@ -20,10 +20,10 @@ import java.util.List;
@RequestMapping("/vehicle") @RequestMapping("/vehicle")
public class VehicleController { public class VehicleController {
@Autowired @Autowired
private VehicleMapper vehicleMapper; private Vehicle1Mapper vehicle1Mapper;
@GetMapping("/list/all") @GetMapping("/list/all")
public Result<List<Vehicle>> findAll () { public Result<List<Vehicle>> findAll () {
return Result.success(vehicleMapper.selectList(new QueryWrapper<>())); return Result.success(vehicle1Mapper.selectList(new QueryWrapper<>()));
} }
} }

View File

@ -8,6 +8,6 @@ import com.muyu.cloud.many.datasource.domain.Vehicle;
* *
* Date 2024/6/4 14:07 * Date 2024/6/4 14:07
*/ */
public interface VehicleMapper extends BaseMapper<Vehicle> { public interface Vehicle1Mapper extends BaseMapper<Vehicle> {
} }

View File

@ -0,0 +1,22 @@
package com.muyu.customer.business.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* BingRui.Hou
*
* @Description
* @ClassName FaultRecordVo
* @Date 2024/06/21 11:44
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FaultRecordVo {
private String faultCode;
private String vin;
private String count;
}

View File

@ -24,6 +24,11 @@
<artifactId>muyu-customer-business-common</artifactId> <artifactId>muyu-customer-business-common</artifactId>
<version>3.6.3</version> <version>3.6.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-many-datasource</artifactId>
<version>3.6.3</version>
</dependency>
<!-- SpringCloud Alibaba Nacos --> <!-- SpringCloud Alibaba Nacos -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>
@ -36,6 +41,11 @@
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel --> <!-- SpringCloud Alibaba Sentinel -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>
@ -84,6 +94,11 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId> <artifactId>muyu-common-swagger</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies> </dependencies>

View File

@ -109,16 +109,4 @@ public class FaultCodeController extends BaseController {
return toAjax(faultCodeService.removeBatchByIds(ids)); return toAjax(faultCodeService.removeBatchByIds(ids));
} }
/**
*
*/
@RequiresPermissions("customerBusiness:faultCode:delete")
@Log(title = "车辆故障码",businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
@ApiOperation("删除车辆故障码")
@ApiImplicitParam(name = "id",value = "id",required = true,dataType = "Long",paramType = "path",dataTypeClass = String.class,example = "1,2,3,4")
public Result<String> delete(@PathVariable List<Long> ids){
return toAjax(faultCodeService.removeBatchByIds(ids));
}
} }

View File

@ -8,6 +8,7 @@ import com.muyu.common.log.annotation.Log;
import com.muyu.common.log.enums.BusinessType; import com.muyu.common.log.enums.BusinessType;
import com.muyu.common.security.annotation.RequiresPermissions; import com.muyu.common.security.annotation.RequiresPermissions;
import com.muyu.customer.business.domain.FaultRecord; import com.muyu.customer.business.domain.FaultRecord;
import com.muyu.customer.business.domain.FaultRecordVo;
import com.muyu.customer.business.domain.req.FaultRecordEditReq; import com.muyu.customer.business.domain.req.FaultRecordEditReq;
import com.muyu.customer.business.domain.req.FaultRecordQueryReq; import com.muyu.customer.business.domain.req.FaultRecordQueryReq;
import com.muyu.customer.business.domain.req.FaultRecordSaveReq; import com.muyu.customer.business.domain.req.FaultRecordSaveReq;
@ -105,5 +106,11 @@ public class FaultRecordController extends BaseController {
return toAjax(faultRecordService.removeBatchByIds(ids)); return toAjax(faultRecordService.removeBatchByIds(ids));
} }
@Log(title = "柱状图",businessType = BusinessType.DELETE)
@GetMapping("/countList")
@ApiOperation("柱状图展示")
public Result<List<FaultRecordVo>> countList( ) {
return Result.success(faultRecordService.countList());
}
} }

View File

@ -1,6 +1,5 @@
package com.muyu.customer.business.mapper; package com.muyu.customer.business.mapper;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.customer.business.domain.FaultCode; import com.muyu.customer.business.domain.FaultCode;

View File

@ -2,6 +2,9 @@ package com.muyu.customer.business.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.customer.business.domain.FaultRecord; import com.muyu.customer.business.domain.FaultRecord;
import com.muyu.customer.business.domain.FaultRecordVo;
import java.util.List;
/** /**
* Mapper * Mapper
@ -10,4 +13,11 @@ import com.muyu.customer.business.domain.FaultRecord;
* @date 2024-06-20 * @date 2024-06-20
*/ */
public interface FaultRecordMapper extends BaseMapper<FaultRecord> { public interface FaultRecordMapper extends BaseMapper<FaultRecord> {
void updateByFaultRecord(FaultRecord build);
FaultRecord getByFaultRecord(FaultRecord faultRecord);
List<FaultRecordVo> countList();
} }

View File

@ -1,6 +1,5 @@
package com.muyu.customer.business.mapper; package com.muyu.customer.business.mapper;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.customer.business.domain.Vehicle; import com.muyu.customer.business.domain.Vehicle;

View File

@ -0,0 +1,185 @@
package com.muyu.customer.business.produce;
import com.alibaba.fastjson.JSON;
import com.muyu.cloud.many.datasource.config.holder.DynamicDataSourceHolder;
import com.muyu.customer.business.domain.FaultRecord;
import com.muyu.customer.business.service.FaultRecordService;
import com.rabbitmq.client.AMQP;
import lombok.extern.log4j.Log4j2;
//import org.apache.logging.log4j.message.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import java.io.IOException;
//import java.nio.channels.Channel;
@Component
@Log4j2
public class VehicleProducer {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private FaultRecordService faultRecordService;
//调用注解 添加队列名称
@RabbitListener(queuesToDeclare = {@Queue(name = "queue_astatus_abnormal")})
public void smsConfigStart(String msg, Message message, Channel channel){
//获取消息的ID
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add("messageId", messageId);
if (count==1) {
log.info("开始消费!{}", msg);
FaultRecord faultRecord = JSON.parseObject(msg, FaultRecord.class);
//判断车辆属于哪个企业
String s = redisTemplate.opsForValue().get(faultRecord.getVin()+"1");
//选择数据源,切换数据源,
DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+s);
// 进行添加故障表
FaultRecord build = FaultRecord
.builder()
.faultCode(faultRecord.getFaultCode())
.vin(faultRecord.getVin())
.startTime(faultRecord.getStartTime()).build();
faultRecordService.save(build);
// 移除数据源,
DynamicDataSourceHolder.removeDynamicDataSourceKey();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消费成功!数据源为:{}",message);
}
} catch (IOException e) {
log.info("消费失败,{}",e.getMessage());
try {
//回退消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("回退成功");
}catch (IOException ex){
log.info("回退失败:{}",ex.getMessage());
}
throw new RuntimeException(e);
}
}
@RabbitListener(queuesToDeclare = {@Queue(name = "")})
public void smsConfigEnt(String msg, Message message, Channel channel){
//获取消息的ID
String messageId = message.getMessageProperties().getMessageId();
try {
//添加消息id到redis set集合中 添加成功返回1 表示未消费 添加失败返回0 表示已消费
Long count = redisTemplate.opsForSet().add("messageId", messageId);
//添加成功 正常消费信息
if (count == 1) {
log.info("开始消费:{}",msg);
FaultRecord faultRecord = JSON.parseObject(msg, FaultRecord.class);
//判断车辆属于哪个企业
String s = redisTemplate.opsForValue().get(faultRecord.getVin()+"1");
//选择数据源,切换数据源,
DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+s);
// 进行查询故障表
FaultRecord faultRecordOne = faultRecordService.getByFaultRecord(faultRecord);
log.info("查询到的故障为:{}",faultRecordOne);
faultRecordOne.setEndTime(faultRecord.getEndTime());
// 进行修改故障表
faultRecordService.updateByFaultRecord(faultRecordOne);
// 移除数据源,
DynamicDataSourceHolder.removeDynamicDataSourceKey();
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费成功");
}
} catch (Exception e) {
//删除队列ID
log.info("消费失败,{}",e.getMessage());
try {
//回退消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("回退消息");
} catch (IOException ex) {
//回退失败
log.info("回退失败");
}
}
}
}

View File

@ -0,0 +1,48 @@
package com.muyu.customer.business.rabbit;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
* @PostConstruct bean
* @PreDestory bean
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息投递到 broker 的状态true表示成功
System.out.println("消息发送成功!");
} else {
// 发送异常
System.out.println("发送异常原因 = " + cause);
// TODO 可以将消息 内容 以及 失败的原因 记录到 日志表中
}
}
}

View File

@ -0,0 +1,54 @@
package com.muyu.customer.business.rabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
*
* @return
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.customer.business.rabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,34 @@
package com.muyu.customer.business.rabbit;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* returnedMessage
*/
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法也可以理解为在spring容器初始化的时候执
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() + "被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// 回退了所有的信息,可做补偿机制 记录到 数据库
}
}

View File

@ -2,6 +2,7 @@ package com.muyu.customer.business.service;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import com.muyu.customer.business.domain.FaultRecord; import com.muyu.customer.business.domain.FaultRecord;
import com.muyu.customer.business.domain.FaultRecordVo;
import java.util.List; import java.util.List;
@ -21,4 +22,10 @@ public interface FaultRecordService extends IService<FaultRecord> {
*/ */
public List<FaultRecord> list(FaultRecord faultRecord); public List<FaultRecord> list(FaultRecord faultRecord);
void updateByFaultRecord(FaultRecord build);
FaultRecord getByFaultRecord(FaultRecord faultRecord);
List<FaultRecordVo> countList();
} }

View File

@ -4,10 +4,12 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.utils.ObjUtils; import com.muyu.common.core.utils.ObjUtils;
import com.muyu.customer.business.domain.FaultRecord; import com.muyu.customer.business.domain.FaultRecord;
import com.muyu.customer.business.domain.FaultRecordVo;
import com.muyu.customer.business.mapper.FaultRecordMapper; import com.muyu.customer.business.mapper.FaultRecordMapper;
import com.muyu.customer.business.service.FaultRecordService; import com.muyu.customer.business.service.FaultRecordService;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
@ -15,6 +17,10 @@ import java.util.List;
@Slf4j @Slf4j
@Service @Service
public class FaultRecordServiceImpl extends ServiceImpl<FaultRecordMapper, FaultRecord> implements FaultRecordService { public class FaultRecordServiceImpl extends ServiceImpl<FaultRecordMapper, FaultRecord> implements FaultRecordService {
@Autowired
private FaultRecordMapper faultRecordMapper;
/** /**
* *
* *
@ -40,4 +46,20 @@ public class FaultRecordServiceImpl extends ServiceImpl<FaultRecordMapper, Fault
return list(queryWrapper); return list(queryWrapper);
} }
@Override
public void updateByFaultRecord(FaultRecord build) {
faultRecordMapper.updateByFaultRecord(build);
}
@Override
public FaultRecord getByFaultRecord(FaultRecord faultRecord) {
return faultRecordMapper.getByFaultRecord(faultRecord);
}
@Override
public List<FaultRecordVo> countList() {
return faultRecordMapper.countList();
}
} }

View File

@ -4,6 +4,7 @@ import java.util.List;
import com.muyu.common.core.utils.ObjUtils; import com.muyu.common.core.utils.ObjUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.muyu.customer.business.mapper.VehicleMapper; import com.muyu.customer.business.mapper.VehicleMapper;
import com.muyu.customer.business.domain.Vehicle; import com.muyu.customer.business.domain.Vehicle;

View File

@ -4,6 +4,19 @@ server:
# Spring # Spring
spring: spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
port: 5672
host: 101.34.248.9
listener:
simple:
prefetch: 1
publisher-confirm-type: correlated
publisher-returns: true
main:
allow-circular-references: true
application: application:
# 应用名称 # 应用名称
name: muyu-customer-business name: muyu-customer-business

View File

@ -17,4 +17,21 @@
<sql id="selectFaultRecordVo"> <sql id="selectFaultRecordVo">
select id, fault_code, vin, start_time, end_time, fault_level, fault_handle from fault_record select id, fault_code, vin, start_time, end_time, fault_level, fault_handle from fault_record
</sql> </sql>
<update id="updateByFaultRecord">
update fault_record set end_time=#{endTime} where id=#{id}
</update>
<select id="getByFaultRecord" resultType="com.muyu.customer.business.domain.FaultRecord">
SELECT id,fault_code,vin,start_time,end_time,fault_level,fault_handle
FROM fault_record
WHERE fault_code =#{faultCode} AND vin = #{vin} AND end_time is NULL
</select>
<select id="countList" resultType="com.muyu.customer.business.domain.FaultRecordVo">
SELECT vin,fault_code,COUNT(*) as count
FROM fault_record
GROUP BY fault_code
</select>
</mapper> </mapper>