4.11/src/main/java/com/bwie/consumer/DataConsumer.java

117 lines
3.6 KiB
Java

package com.bwie.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.bwie.admin.Pur;
import com.bwie.admin.Pur2;
import com.bwie.admin.Pur3;
import com.bwie.controller.PurController;
import com.bwie.mapper.PurMapper;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 下午9:24
*/
@Component
@Log4j2
public class DataConsumer {
@Autowired
private PurController purController;
@Autowired
private PurMapper purMapper;
@RabbitListener(queuesToDeclare = {@Queue(name = "data1")})
public void consumeData1(Object data, Channel channel, Message message) throws IOException {
Pur pur = new Pur();
if (pur != null) {
pur = (Pur) data;
}
log.info("消费的数据为:{}", JSONObject.toJSONString(pur));
List<String> list = purMapper.list();
int count = 0;
if (pur.getId() != null && !"".equals(pur.getId())) {
for (String s : list) {
if (pur.getId().equals(s)) {
count++;
}
}
if (count > 0) {
purMapper.updateData1(pur);
} else {
purMapper.insertData1(pur);
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queuesToDeclare = {@Queue(name = "data2")})
public void consumeData2(Object data, Channel channel, Message message) throws IOException {
Pur2 pur2 = new Pur2();
if (pur2 != null) {
pur2 = (Pur2) data;
}
log.info("消费的数据为:{}", JSONObject.toJSONString(pur2));
List<String> list = purMapper.Purlist2();
int count = 0;
if (pur2.getId() != null && !"".equals(pur2.getId())) {
for (String s : list) {
if (pur2.getId().equals(s)) {
count++;
}
}
if (count > 0) {
purMapper.updateData2(pur2);
} else {
purMapper.insertData2(pur2);
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queuesToDeclare = {@Queue(name = "data3")})
public void consumeData3(Object data, Channel channel, Message message) throws IOException {
Pur3 pur3 = new Pur3();
if (pur3 != null) {
pur3 = (Pur3) data;
}
log.info("消费的数据为:{}", JSONObject.toJSONString(pur3));
List<String> list = purMapper.Purlist3();
int count = 0;
if (pur3.getId() != null && !"".equals(pur3.getId())) {
for (String s : list) {
if (pur3.getId().equals(s)) {
count++;
}
}
if (count > 0) {
purMapper.updateData3(pur3);
} else {
purMapper.insertData3(pur3);
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}