From f911988e744f8ed2d0fa0ffe138fdd24f8a9536d Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 12 Apr 2024 10:30:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 63 +++++++++++++++++ src/main/java/com/bwie/Application.java | 2 +- src/main/java/com/bwie/admin/Pur.java | 19 +++-- .../com/bwie/config/RabbitAdminConfig.java | 53 -------------- .../java/com/bwie/consumer/ConsumeData2.java | 62 ++++++++++++++++ .../java/com/bwie/consumer/ConsumeData3.java | 64 +++++++++++++++++ .../java/com/bwie/consumer/ConsumerDate1.java | 70 +++++++++++++++++++ .../java/com/bwie/consumer/DataConsumer.java | 67 ++++++++++++++++-- .../java/com/bwie/consumer/consumeData2.java | 19 ----- .../java/com/bwie/consumer/consumeData3.java | 20 ------ .../com/bwie/controller/PurController.java | 38 ++++++++++ src/main/java/com/bwie/mapper/PurMapper.java | 11 +++ .../com/bwie/service/Impl/PurServiceImpl.java | 52 ++++++++++++++ .../java/com/bwie/service/PurService.java | 20 ++++++ src/main/resources/application.yml | 2 +- 15 files changed, 455 insertions(+), 107 deletions(-) delete mode 100644 src/main/java/com/bwie/config/RabbitAdminConfig.java create mode 100644 src/main/java/com/bwie/consumer/ConsumeData2.java create mode 100644 src/main/java/com/bwie/consumer/ConsumeData3.java create mode 100644 src/main/java/com/bwie/consumer/ConsumerDate1.java delete mode 100644 src/main/java/com/bwie/consumer/consumeData2.java delete mode 100644 src/main/java/com/bwie/consumer/consumeData3.java create mode 100644 src/main/java/com/bwie/controller/PurController.java create mode 100644 src/main/java/com/bwie/mapper/PurMapper.java create mode 100644 src/main/java/com/bwie/service/Impl/PurServiceImpl.java create mode 100644 src/main/java/com/bwie/service/PurService.java diff --git a/pom.xml b/pom.xml index 7a21e39..0a1685a 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,58 @@ UTF-8 + + + + org.springframework.boot + spring-boot-starter + 2.7.18 + + + + org.springframework.boot + spring-boot-starter-batch + 2.5.6 + + + + org.springframework + spring-jdbc + 5.3.31 + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.3.1 + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.13.5 + + + + org.springframework + spring-test + 5.3.23 + + + + junit + junit + 4.12 + test + + + + com.alibaba + fastjson + 1.2.80 + + org.springframework.boot @@ -22,6 +73,12 @@ 2.7.18 + + org.springframework.boot + spring-boot-starter-web + 2.7.18 + + mysql @@ -54,6 +111,12 @@ jackson-databind 2.12.5 + + org.junit.jupiter + junit-jupiter + RELEASE + compile + diff --git a/src/main/java/com/bwie/Application.java b/src/main/java/com/bwie/Application.java index 599cdc0..2045533 100644 --- a/src/main/java/com/bwie/Application.java +++ b/src/main/java/com/bwie/Application.java @@ -5,7 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:29 + * @Date: 2024/4/11 下午9:08 */ @SpringBootApplication public class Application { diff --git a/src/main/java/com/bwie/admin/Pur.java b/src/main/java/com/bwie/admin/Pur.java index 97ffd76..adb8fe2 100644 --- a/src/main/java/com/bwie/admin/Pur.java +++ b/src/main/java/com/bwie/admin/Pur.java @@ -1,5 +1,9 @@ package com.bwie.admin; - +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.alibaba.fastjson.annotation.JSONField; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.deser.std.DateDeserializers; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -19,18 +23,19 @@ public class Pur { private String id; - private String XM; + private String xm; - private String Age; + private Integer age; - private Date Time; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date time; private String plate; - private String Card; + private String card; - private String Photo1; + private String photo1; - private String Photo2; + private String photo2; } diff --git a/src/main/java/com/bwie/config/RabbitAdminConfig.java b/src/main/java/com/bwie/config/RabbitAdminConfig.java deleted file mode 100644 index 9b114fd..0000000 --- a/src/main/java/com/bwie/config/RabbitAdminConfig.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.bwie.config; - -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * RabbitAdmin是RabbitMQ的一个Java客户端库,它提供了管理RabbitMQ资源的功能。它是通过与RabbitMQ服务器进行交互来执行管理操作的。 - */ -@Configuration -public class RabbitAdminConfig { - - @Value("${spring.rabbitmq.host}") - private String host; - @Value("${spring.rabbitmq.username}") - private String username; - @Value("${spring.rabbitmq.password}") - private String password; - @Value("${spring.rabbitmq.virtualhost}") - private String virtualhost; - - /** - * 构建 RabbitMQ的连接工厂 - * @return - */ - @Bean - public ConnectionFactory connectionFactory() { - CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); - connectionFactory.setAddresses(host); - connectionFactory.setUsername(username); - connectionFactory.setPassword(password); - connectionFactory.setVirtualHost(virtualhost); - // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 - connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); - connectionFactory.setPublisherReturns(true); - return connectionFactory; - } - - /** - * 自己初始化 RabbitAdmin - * @param connectionFactory - * @return - */ - @Bean - public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { - RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); - rabbitAdmin.setAutoStartup(true); - return rabbitAdmin; - } -} diff --git a/src/main/java/com/bwie/consumer/ConsumeData2.java b/src/main/java/com/bwie/consumer/ConsumeData2.java new file mode 100644 index 0000000..b55b35f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumeData2.java @@ -0,0 +1,62 @@ +//package com.bwie.consumer; +// +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumeData2 { +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data2") +// public void consumeData2(String message) { +// log.info("Received data2: " + message); +// saveDataToDatabase(message); +// } +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5]; +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +//} diff --git a/src/main/java/com/bwie/consumer/ConsumeData3.java b/src/main/java/com/bwie/consumer/ConsumeData3.java new file mode 100644 index 0000000..3c8d90f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumeData3.java @@ -0,0 +1,64 @@ +//package com.bwie.consumer; +// +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumeData3 { +// +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data3") +// public void consumeData3(String message) { +// log.info("Received data3: " + message); +// saveDataToDatabase(message); +// } +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5];/**/ +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +//} diff --git a/src/main/java/com/bwie/consumer/ConsumerDate1.java b/src/main/java/com/bwie/consumer/ConsumerDate1.java new file mode 100644 index 0000000..da3d06f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumerDate1.java @@ -0,0 +1,70 @@ +//package com.bwie.consumer; +// +//import com.bwie.admin.Pur; +//import lombok.extern.log4j.Log4j2; +//import org.junit.jupiter.api.Test; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +//import java.util.Date; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumerDate1 { +// +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data1") +// public void consumeData1(String message) { +// log.info("Received data1: " + message); +// saveDataToDatabase(message); +// } +// +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5]; +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +// +// +// +//} diff --git a/src/main/java/com/bwie/consumer/DataConsumer.java b/src/main/java/com/bwie/consumer/DataConsumer.java index bb922e7..802178a 100644 --- a/src/main/java/com/bwie/consumer/DataConsumer.java +++ b/src/main/java/com/bwie/consumer/DataConsumer.java @@ -1,22 +1,77 @@ package com.bwie.consumer; -import lombok.extern.log4j.Log4j2; +import com.alibaba.fastjson.JSON; +import com.bwie.admin.Pur; +import com.bwie.controller.PurController; +import com.bwie.service.PurService; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.rabbitmq.client.Channel; +import org.springframework.stereotype.Repository; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; /** * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 + * @Date: 2024/4/11 下午9:24 */ @Component -@Log4j2 public class DataConsumer { - @RabbitListener(queues = "data1") - public void consumeData1(String message) { - log.info("Received data1: " + message); + + @Autowired + private PurController purController; + + @Autowired + private PurService purService; + + + @RabbitListener(queuesToDeclare = {@Queue(name ="data1")}) + public void consumeData1(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data1: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 假设 id 是 pur 对象中的一个属性 + String id = pur.getId(); + + // 根据 id 判断数据库中是否存在对应的记录 + Pur existingPur = purService.findById(id); + + if (existingPur != null) { + // 如果存在则进行修改操作 + purService.update(existingPur); // 假设有一个方法用来更新记录 + } else { + // 如果不存在则进行插入操作 + purService.insert(pur); // 假设有一个方法用来插入记录 + } + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + @RabbitListener(queuesToDeclare = {@Queue(name ="data2")}) + public void consumeData2(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data2: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 处理data2数据的逻辑 + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + @RabbitListener(queuesToDeclare = {@Queue(name ="data3")}) + public void consumeData3(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data3: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 处理data3数据的逻辑 + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } } diff --git a/src/main/java/com/bwie/consumer/consumeData2.java b/src/main/java/com/bwie/consumer/consumeData2.java deleted file mode 100644 index f0bf09d..0000000 --- a/src/main/java/com/bwie/consumer/consumeData2.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.bwie.consumer; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 - */ -@Component -@Log4j2 -public class consumeData2 { - - @RabbitListener(queues = "data2") - public void consumeData2(String message) { - - } -} diff --git a/src/main/java/com/bwie/consumer/consumeData3.java b/src/main/java/com/bwie/consumer/consumeData3.java deleted file mode 100644 index 50f0a77..0000000 --- a/src/main/java/com/bwie/consumer/consumeData3.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.bwie.consumer; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 - */ -@Component -@Log4j2 -public class consumeData3 { - - - @RabbitListener(queues = "data3") - public void consumeData3(String message) { - - } -} diff --git a/src/main/java/com/bwie/controller/PurController.java b/src/main/java/com/bwie/controller/PurController.java new file mode 100644 index 0000000..2e006e4 --- /dev/null +++ b/src/main/java/com/bwie/controller/PurController.java @@ -0,0 +1,38 @@ +package com.bwie.controller; + +import com.alibaba.fastjson.JSON; +import com.bwie.admin.Pur; +import com.bwie.service.PurService; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午10:37 + */ +@RestController +public class PurController { + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private PurService purService; + + @PostMapping + public String pur(@RequestBody List list) { + try { + rabbitTemplate.convertAndSend("data1",list); + rabbitTemplate.convertAndSend("data2", list); + rabbitTemplate.convertAndSend("data3", list); + } catch (AmqpException e) { + throw new RuntimeException(e); + } + return "消息发送成功"; + } +} diff --git a/src/main/java/com/bwie/mapper/PurMapper.java b/src/main/java/com/bwie/mapper/PurMapper.java new file mode 100644 index 0000000..f28080a --- /dev/null +++ b/src/main/java/com/bwie/mapper/PurMapper.java @@ -0,0 +1,11 @@ +package com.bwie.mapper; + +import org.apache.ibatis.annotations.Mapper; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:34 + */ +@Mapper +public interface PurMapper { +} diff --git a/src/main/java/com/bwie/service/Impl/PurServiceImpl.java b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java new file mode 100644 index 0000000..b75a24d --- /dev/null +++ b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java @@ -0,0 +1,52 @@ +package com.bwie.service.Impl; + +import com.bwie.admin.Pur; +import com.bwie.mapper.PurMapper; +import com.bwie.service.PurService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:17 + */ +@Service +public class PurServiceImpl implements PurService { + + + @Autowired + private PurMapper purMapper; + + + // 获取数据 + @Override + public Pur findById(String id) { + // 在真实应用中,你可能需要根据 id 去数据库查询数据并返回相应的 Pur 对象 + // 这里为了示例,直接返回 null + return null; + } + + // 更新数据 + @Override + public void update(Pur existingPur) { + // 在真实应用中,你需要将 existingPur 对象的属性值更新到数据库中对应的记录 + // 这里为了示例,仅打印日志 + System.out.println("Updating Pur: " + existingPur); + } + + // 插入数据 + @Override + public void insert(Pur pur) { + // 在真实应用中,你需要将 pur 对象插入到数据库中 + // 这里为了示例,仅打印日志 + System.out.println("Inserting Pur: " + pur); + } + + // 处理 Pur 消息的逻辑 + @Override + public void processPur(String msg) { + // 在真实应用中,你可能需要对接收到的消息进行解析,然后调用相应的方法进行处理 + // 这里为了示例,仅打印日志 + System.out.println("Processing Pur message: " + msg); + } +} diff --git a/src/main/java/com/bwie/service/PurService.java b/src/main/java/com/bwie/service/PurService.java new file mode 100644 index 0000000..49393f9 --- /dev/null +++ b/src/main/java/com/bwie/service/PurService.java @@ -0,0 +1,20 @@ +package com.bwie.service; + +import com.bwie.admin.Pur; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:15 + */ +public interface PurService { + + + Pur findById(String id); + + void update(Pur existingPur); + + void insert(Pur pur); + + void processPur(String msg); +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bc7bfe0..7347d33 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: datasource: - url: jdbc:mysql://111.231.174.71:3306/day8?useSSL=false&serverTimezone=UTC + url: jdbc:mysql://111.231.174.71:3306/day8?useUnicode=true&characterEncoding=true&characterEncoding=utf8 username: root password: wxy@123 driver-class-name: com.mysql.cj.jdbc.Driver