test:(编写消费者消费基础事件的的数据)

dev
zhang xu 2024-06-21 22:29:09 +08:00
parent e381220a72
commit dc0aa66dcf
5 changed files with 151 additions and 2 deletions

View File

@ -3,6 +3,7 @@ package com.muyu.networking.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.muyu.domain.FaultCode;
import com.muyu.domain.FaultRecord;
import java.util.List;
@ -21,4 +22,6 @@ public interface FaultCodeService extends IService<FaultCode> {
*/
public List<FaultCode> list(FaultCode faultCode);
}

View File

@ -25,4 +25,7 @@ public interface FaultRecordService extends IService<FaultRecord>
public List<FaultRecord> list(FaultRecord faultRecord);
FaultRecord getByFaultRecord(FaultRecord faultRecord);
void updateByFaultRecord(FaultRecord byFaultRecord);
}

View File

@ -7,6 +7,8 @@ import com.muyu.networking.mapper.FaultRecordMapper;
import com.muyu.networking.service.FaultRecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateConfigurer;
import org.springframework.stereotype.Service;
import java.util.List;
@ -21,6 +23,11 @@ import java.util.List;
@Slf4j
public class FaultRecordServiceImpl extends ServiceImpl<FaultRecordMapper, FaultRecord> implements FaultRecordService {
/**
*
*
@ -34,4 +41,7 @@ public class FaultRecordServiceImpl extends ServiceImpl<FaultRecordMapper, Fault
}
}

View File

@ -1,9 +1,15 @@
# Tomcat
server:
port: 9204
# Spring
spring:
# 配置RabbitMQ的基本信息 ip 端口 username password..
rabbitmq:
host: 47.93.162.81 # ip
port: 5672
username: guest
password: guest
# virtual-host: /baiqi
application:
# 应用名称
name: muyu-networking
@ -25,7 +31,6 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
logging:
level:
com.muyu.networking.mapper: DEBUG

View File

@ -0,0 +1,128 @@
package com.muyu.vehicle.rabbit;
import com.alibaba.fastjson2.JSON;
import com.muyu.domain.FaultRecord;
import com.muyu.networking.service.FaultRecordService;
import com.muyu.vehicle.myDatasource.holder.DynamicDataSourceHolder;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* @ClassDescription:
* @JdkVersion: 17
* @Author: zhangxu
* @Created: 2024/6/21 9:41
*/
@Component
@Log4j2
public class Consumer {
private static final String QUEUE_NAME_EVENT_START = "event_exception_start";
private static final String QUEUE_NAME_EVENT_ENT = "event_exception_ent";
@Autowired
private FaultRecordService recordService;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@RabbitListener( queuesToDeclare= {@Queue(name = QUEUE_NAME_EVENT_START)})
public void receive(String msg, Channel channel, Message message) {
//获取消息的id
String messageId = message.getMessageProperties().getMessageId();
log.info("接收到消息是:{}", message);
Long messageId1 = redisTemplate.opsForSet().add("messageId", messageId);
try {
if (messageId1 == 1) {
//如果消息id不存在则进行消费
log.info("消息id不存在进行消费");
//进行消费
FaultRecord faultCode = JSON.parseObject(msg, FaultRecord.class);
//判断车辆数据那个企业
String s = redisTemplate.opsForValue().get(faultCode.getVin() + "1");
//...选择数据源
DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+s);
//进行添加故障
FaultRecord build = FaultRecord.builder()
.faultCode(faultCode.getFaultCode())
.startTime(new Date())
.vin(faultCode.getVin())
.build();
recordService.save(build);
//移除数据源
DynamicDataSourceHolder.removeDynamicDataSourceKey();
//确认消息消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.error("消费成功", message);
}
} catch (IOException e) {
log.error("消费失败{}", e.getMessage());
try {
//回退消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.error("消费回退{}乘公共");
} catch (IOException ex) {
log.info("回退失败:{}",ex.getMessage());
}
throw new RuntimeException(e);
}
}
@RabbitListener(queuesToDeclare = {@Queue(name = QUEUE_NAME_EVENT_ENT)})
public void receive2(String msg, Channel channel, Message message) {
//获取消息是
String messageId = message.getMessageProperties().getMessageId();
try {
//将消息添加redis里面
Long messageId1 = redisTemplate.opsForSet().add("messageIdEnt", messageId);
//添加乘公共
if (messageId1 == 1){
log.info("消息id不存在进行消费");
FaultRecord faultRecord = JSON.parseObject(msg, FaultRecord.class);
//还是判断车辆数据那个企业
String s = redisTemplate.opsForValue().get(faultRecord.getVin() + "1");
DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+s);
//进行查询修改故障表
FaultRecord byFaultRecord = recordService.getByFaultRecord(faultRecord);
log.info("进行查询的故障为:{}",byFaultRecord);
byFaultRecord.setEndTime(faultRecord.getEndTime());
//修改故障表
recordService.updateByFaultRecord(byFaultRecord);
//移除数据源
DynamicDataSourceHolder.removeDynamicDataSourceKey();
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消费成功");
}
}catch (Exception e){
log.error("消费失败{}",e.getMessage());
try {
//回退消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.info("消费回退{}乘公共");
} catch (IOException ex) {
log.error("回退失败:{}",ex.getMessage());
}
}
}
}