From efc142e89d88d34289b0ad5b48595efc0fa6d3ed Mon Sep 17 00:00:00 2001 From: wxy Date: Thu, 11 Apr 2024 19:47:11 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=88=9B=E5=BB=BA=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/uiDesigner.xml | 124 ++++++++++++++++++ pom.xml | 45 +++++++ src/main/java/com/bwie/Application.java | 15 +++ src/main/java/com/bwie/admin/Pur.java | 36 +++++ .../bwie/config/ConfirmCallbackConfig.java | 48 +++++++ .../com/bwie/config/RabbitAdminConfig.java | 53 ++++++++ .../java/com/bwie/config/RabbitmqConfig.java | 15 +++ .../com/bwie/config/ReturnCallbackConfig.java | 34 +++++ .../java/com/bwie/consumer/DataConsumer.java | 22 ++++ .../java/com/bwie/consumer/consumeData2.java | 19 +++ .../java/com/bwie/consumer/consumeData3.java | 20 +++ src/main/resources/application.yml | 12 ++ 12 files changed, 443 insertions(+) create mode 100644 .idea/uiDesigner.xml create mode 100644 src/main/java/com/bwie/Application.java create mode 100644 src/main/java/com/bwie/admin/Pur.java create mode 100644 src/main/java/com/bwie/config/ConfirmCallbackConfig.java create mode 100644 src/main/java/com/bwie/config/RabbitAdminConfig.java create mode 100644 src/main/java/com/bwie/config/RabbitmqConfig.java create mode 100644 src/main/java/com/bwie/config/ReturnCallbackConfig.java create mode 100644 src/main/java/com/bwie/consumer/DataConsumer.java create mode 100644 src/main/java/com/bwie/consumer/consumeData2.java create mode 100644 src/main/java/com/bwie/consumer/consumeData3.java create mode 100644 src/main/resources/application.yml diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..2b63946 --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 603082f..7a21e39 100644 --- a/pom.xml +++ b/pom.xml @@ -14,4 +14,49 @@ UTF-8 + + + + org.springframework.boot + spring-boot-starter-jdbc + 2.7.18 + + + + + mysql + mysql-connector-java + 8.0.29 + + + + + org.springframework.boot + spring-boot-starter-amqp + 2.6.4 + + + + ch.qos.logback + logback-classic + 1.2.12 + + + + org.projectlombok + lombok + 1.18.30 + provided + + + + com.fasterxml.jackson.core + jackson-databind + 2.12.5 + + + + + + diff --git a/src/main/java/com/bwie/Application.java b/src/main/java/com/bwie/Application.java new file mode 100644 index 0000000..599cdc0 --- /dev/null +++ b/src/main/java/com/bwie/Application.java @@ -0,0 +1,15 @@ +package com.bwie; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午7:29 + */ +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class); + } +} diff --git a/src/main/java/com/bwie/admin/Pur.java b/src/main/java/com/bwie/admin/Pur.java new file mode 100644 index 0000000..97ffd76 --- /dev/null +++ b/src/main/java/com/bwie/admin/Pur.java @@ -0,0 +1,36 @@ +package com.bwie.admin; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午7:33 + */ + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Pur { + + + private String id; + + private String XM; + + private String Age; + + private Date Time; + + private String plate; + + private String Card; + + private String Photo1; + + private String Photo2; + +} diff --git a/src/main/java/com/bwie/config/ConfirmCallbackConfig.java b/src/main/java/com/bwie/config/ConfirmCallbackConfig.java new file mode 100644 index 0000000..f5c7885 --- /dev/null +++ b/src/main/java/com/bwie/config/ConfirmCallbackConfig.java @@ -0,0 +1,48 @@ +package com.bwie.config; + +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送确认配置 消息发送到交换机的确认 + */ +@Component +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + * @PostConstruct bean 被初始化的时候执行的方法的注解 + * @PreDestory bean 被销毁的时候执行的方法的注解 + */ + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this); + } + + /** + * 交换机不管是否收到消息的一个回调方法 + * + * @param correlationData 消息相关数据 + * @param ack 交换机是否收到消息 + * @param cause 失败原因 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + // 消息投递到 broker 的状态,true表示成功 + System.out.println("消息发送成功!"); + } else { + // 发送异常 + System.out.println("发送异常原因 = " + cause); + // TODO 可以将消息 内容 以及 失败的原因 记录到 日志表中 + } + } + +} diff --git a/src/main/java/com/bwie/config/RabbitAdminConfig.java b/src/main/java/com/bwie/config/RabbitAdminConfig.java new file mode 100644 index 0000000..9b114fd --- /dev/null +++ b/src/main/java/com/bwie/config/RabbitAdminConfig.java @@ -0,0 +1,53 @@ +package com.bwie.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * RabbitAdmin是RabbitMQ的一个Java客户端库,它提供了管理RabbitMQ资源的功能。它是通过与RabbitMQ服务器进行交互来执行管理操作的。 + */ +@Configuration +public class RabbitAdminConfig { + + @Value("${spring.rabbitmq.host}") + private String host; + @Value("${spring.rabbitmq.username}") + private String username; + @Value("${spring.rabbitmq.password}") + private String password; + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 构建 RabbitMQ的连接工厂 + * @return + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + /** + * 自己初始化 RabbitAdmin + * @param connectionFactory + * @return + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } +} diff --git a/src/main/java/com/bwie/config/RabbitmqConfig.java b/src/main/java/com/bwie/config/RabbitmqConfig.java new file mode 100644 index 0000000..6b98815 --- /dev/null +++ b/src/main/java/com/bwie/config/RabbitmqConfig.java @@ -0,0 +1,15 @@ +package com.bwie.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitmqConfig { + // 消息转换配置 + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } +} diff --git a/src/main/java/com/bwie/config/ReturnCallbackConfig.java b/src/main/java/com/bwie/config/ReturnCallbackConfig.java new file mode 100644 index 0000000..55de563 --- /dev/null +++ b/src/main/java/com/bwie/config/ReturnCallbackConfig.java @@ -0,0 +1,34 @@ +package com.bwie.config; + +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送到队列的确认 一旦消息发送到队列失败 则会执行 returnedMessage 方法 + */ +@Component +public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送到 队列失败 执行的 方法 + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("消息" + returnedMessage.getMessage().toString() + "被交换机" + returnedMessage.getExchange() + "回退!" + + "退回原因为:" + returnedMessage.getReplyText()); + // 回退了所有的信息,可做补偿机制 记录到 数据库 + } +} diff --git a/src/main/java/com/bwie/consumer/DataConsumer.java b/src/main/java/com/bwie/consumer/DataConsumer.java new file mode 100644 index 0000000..bb922e7 --- /dev/null +++ b/src/main/java/com/bwie/consumer/DataConsumer.java @@ -0,0 +1,22 @@ +package com.bwie.consumer; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午7:40 + */ +@Component +@Log4j2 +public class DataConsumer { + + @RabbitListener(queues = "data1") + public void consumeData1(String message) { + log.info("Received data1: " + message); + } + + + +} diff --git a/src/main/java/com/bwie/consumer/consumeData2.java b/src/main/java/com/bwie/consumer/consumeData2.java new file mode 100644 index 0000000..f0bf09d --- /dev/null +++ b/src/main/java/com/bwie/consumer/consumeData2.java @@ -0,0 +1,19 @@ +package com.bwie.consumer; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午7:40 + */ +@Component +@Log4j2 +public class consumeData2 { + + @RabbitListener(queues = "data2") + public void consumeData2(String message) { + + } +} diff --git a/src/main/java/com/bwie/consumer/consumeData3.java b/src/main/java/com/bwie/consumer/consumeData3.java new file mode 100644 index 0000000..50f0a77 --- /dev/null +++ b/src/main/java/com/bwie/consumer/consumeData3.java @@ -0,0 +1,20 @@ +package com.bwie.consumer; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午7:40 + */ +@Component +@Log4j2 +public class consumeData3 { + + + @RabbitListener(queues = "data3") + public void consumeData3(String message) { + + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..bc7bfe0 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,12 @@ +spring: + datasource: + url: jdbc:mysql://111.231.174.71:3306/day8?useSSL=false&serverTimezone=UTC + username: root + password: wxy@123 + driver-class-name: com.mysql.cj.jdbc.Driver + + rabbitmq: + host: 111.231.174.71 + port: 5672 + username: guest + password: guest -- 2.40.1 From f911988e744f8ed2d0fa0ffe138fdd24f8a9536d Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 12 Apr 2024 10:30:10 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 63 +++++++++++++++++ src/main/java/com/bwie/Application.java | 2 +- src/main/java/com/bwie/admin/Pur.java | 19 +++-- .../com/bwie/config/RabbitAdminConfig.java | 53 -------------- .../java/com/bwie/consumer/ConsumeData2.java | 62 ++++++++++++++++ .../java/com/bwie/consumer/ConsumeData3.java | 64 +++++++++++++++++ .../java/com/bwie/consumer/ConsumerDate1.java | 70 +++++++++++++++++++ .../java/com/bwie/consumer/DataConsumer.java | 67 ++++++++++++++++-- .../java/com/bwie/consumer/consumeData2.java | 19 ----- .../java/com/bwie/consumer/consumeData3.java | 20 ------ .../com/bwie/controller/PurController.java | 38 ++++++++++ src/main/java/com/bwie/mapper/PurMapper.java | 11 +++ .../com/bwie/service/Impl/PurServiceImpl.java | 52 ++++++++++++++ .../java/com/bwie/service/PurService.java | 20 ++++++ src/main/resources/application.yml | 2 +- 15 files changed, 455 insertions(+), 107 deletions(-) delete mode 100644 src/main/java/com/bwie/config/RabbitAdminConfig.java create mode 100644 src/main/java/com/bwie/consumer/ConsumeData2.java create mode 100644 src/main/java/com/bwie/consumer/ConsumeData3.java create mode 100644 src/main/java/com/bwie/consumer/ConsumerDate1.java delete mode 100644 src/main/java/com/bwie/consumer/consumeData2.java delete mode 100644 src/main/java/com/bwie/consumer/consumeData3.java create mode 100644 src/main/java/com/bwie/controller/PurController.java create mode 100644 src/main/java/com/bwie/mapper/PurMapper.java create mode 100644 src/main/java/com/bwie/service/Impl/PurServiceImpl.java create mode 100644 src/main/java/com/bwie/service/PurService.java diff --git a/pom.xml b/pom.xml index 7a21e39..0a1685a 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,58 @@ UTF-8 + + + + org.springframework.boot + spring-boot-starter + 2.7.18 + + + + org.springframework.boot + spring-boot-starter-batch + 2.5.6 + + + + org.springframework + spring-jdbc + 5.3.31 + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.3.1 + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.13.5 + + + + org.springframework + spring-test + 5.3.23 + + + + junit + junit + 4.12 + test + + + + com.alibaba + fastjson + 1.2.80 + + org.springframework.boot @@ -22,6 +73,12 @@ 2.7.18 + + org.springframework.boot + spring-boot-starter-web + 2.7.18 + + mysql @@ -54,6 +111,12 @@ jackson-databind 2.12.5 + + org.junit.jupiter + junit-jupiter + RELEASE + compile + diff --git a/src/main/java/com/bwie/Application.java b/src/main/java/com/bwie/Application.java index 599cdc0..2045533 100644 --- a/src/main/java/com/bwie/Application.java +++ b/src/main/java/com/bwie/Application.java @@ -5,7 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:29 + * @Date: 2024/4/11 下午9:08 */ @SpringBootApplication public class Application { diff --git a/src/main/java/com/bwie/admin/Pur.java b/src/main/java/com/bwie/admin/Pur.java index 97ffd76..adb8fe2 100644 --- a/src/main/java/com/bwie/admin/Pur.java +++ b/src/main/java/com/bwie/admin/Pur.java @@ -1,5 +1,9 @@ package com.bwie.admin; - +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.alibaba.fastjson.annotation.JSONField; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.deser.std.DateDeserializers; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -19,18 +23,19 @@ public class Pur { private String id; - private String XM; + private String xm; - private String Age; + private Integer age; - private Date Time; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date time; private String plate; - private String Card; + private String card; - private String Photo1; + private String photo1; - private String Photo2; + private String photo2; } diff --git a/src/main/java/com/bwie/config/RabbitAdminConfig.java b/src/main/java/com/bwie/config/RabbitAdminConfig.java deleted file mode 100644 index 9b114fd..0000000 --- a/src/main/java/com/bwie/config/RabbitAdminConfig.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.bwie.config; - -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * RabbitAdmin是RabbitMQ的一个Java客户端库,它提供了管理RabbitMQ资源的功能。它是通过与RabbitMQ服务器进行交互来执行管理操作的。 - */ -@Configuration -public class RabbitAdminConfig { - - @Value("${spring.rabbitmq.host}") - private String host; - @Value("${spring.rabbitmq.username}") - private String username; - @Value("${spring.rabbitmq.password}") - private String password; - @Value("${spring.rabbitmq.virtualhost}") - private String virtualhost; - - /** - * 构建 RabbitMQ的连接工厂 - * @return - */ - @Bean - public ConnectionFactory connectionFactory() { - CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); - connectionFactory.setAddresses(host); - connectionFactory.setUsername(username); - connectionFactory.setPassword(password); - connectionFactory.setVirtualHost(virtualhost); - // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 - connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); - connectionFactory.setPublisherReturns(true); - return connectionFactory; - } - - /** - * 自己初始化 RabbitAdmin - * @param connectionFactory - * @return - */ - @Bean - public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { - RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); - rabbitAdmin.setAutoStartup(true); - return rabbitAdmin; - } -} diff --git a/src/main/java/com/bwie/consumer/ConsumeData2.java b/src/main/java/com/bwie/consumer/ConsumeData2.java new file mode 100644 index 0000000..b55b35f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumeData2.java @@ -0,0 +1,62 @@ +//package com.bwie.consumer; +// +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumeData2 { +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data2") +// public void consumeData2(String message) { +// log.info("Received data2: " + message); +// saveDataToDatabase(message); +// } +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5]; +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +//} diff --git a/src/main/java/com/bwie/consumer/ConsumeData3.java b/src/main/java/com/bwie/consumer/ConsumeData3.java new file mode 100644 index 0000000..3c8d90f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumeData3.java @@ -0,0 +1,64 @@ +//package com.bwie.consumer; +// +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumeData3 { +// +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data3") +// public void consumeData3(String message) { +// log.info("Received data3: " + message); +// saveDataToDatabase(message); +// } +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5];/**/ +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +//} diff --git a/src/main/java/com/bwie/consumer/ConsumerDate1.java b/src/main/java/com/bwie/consumer/ConsumerDate1.java new file mode 100644 index 0000000..da3d06f --- /dev/null +++ b/src/main/java/com/bwie/consumer/ConsumerDate1.java @@ -0,0 +1,70 @@ +//package com.bwie.consumer; +// +//import com.bwie.admin.Pur; +//import lombok.extern.log4j.Log4j2; +//import org.junit.jupiter.api.Test; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.jdbc.core.JdbcTemplate; +//import org.springframework.stereotype.Component; +// +//import java.sql.Timestamp; +//import java.util.Date; +// +///** +// * @Author: wangxinyuan +// * @Date: 2024/4/11 下午7:40 +// */ +////@Component +//@Log4j2 +//public class ConsumerDate1 { +// +// +// //@Autowired +// private JdbcTemplate jdbcTemplate; +// +// //@RabbitListener(queues = "data1") +// public void consumeData1(String message) { +// log.info("Received data1: " + message); +// saveDataToDatabase(message); +// } +// +// +// private void saveDataToDatabase(String message) { +// try { +// String[] parts = message.split(","); +// if (parts.length >= 9) { +// String id = parts[0]; +// String XM = parts[1]; +// String Age = parts[2]; +// String timeStr = parts[3]; +// Timestamp Time = Timestamp.valueOf(timeStr); +// String plate = parts[4]; +// String Card = parts[5]; +// String Photo1 = parts[7]; +// String Photo2 = parts[8]; +// +// String querySql = "SELECT * FROM pur WHERE id = ?"; +// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class); +// +// +// if (count == null || count == 0) { +// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; +// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2); +// log.info("Data saved to database successfully."); +// } else { +// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?"; +// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id); +// log.info("Data updated in database successfully."); +// } +// } else { +// log.info("Data saved to database failed."); +// } +// } catch (Exception e) { +// log.error("Error while saving data to database: " + e.getMessage()); +// } +// } +// +// +// +//} diff --git a/src/main/java/com/bwie/consumer/DataConsumer.java b/src/main/java/com/bwie/consumer/DataConsumer.java index bb922e7..802178a 100644 --- a/src/main/java/com/bwie/consumer/DataConsumer.java +++ b/src/main/java/com/bwie/consumer/DataConsumer.java @@ -1,22 +1,77 @@ package com.bwie.consumer; -import lombok.extern.log4j.Log4j2; +import com.alibaba.fastjson.JSON; +import com.bwie.admin.Pur; +import com.bwie.controller.PurController; +import com.bwie.service.PurService; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.rabbitmq.client.Channel; +import org.springframework.stereotype.Repository; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; /** * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 + * @Date: 2024/4/11 下午9:24 */ @Component -@Log4j2 public class DataConsumer { - @RabbitListener(queues = "data1") - public void consumeData1(String message) { - log.info("Received data1: " + message); + + @Autowired + private PurController purController; + + @Autowired + private PurService purService; + + + @RabbitListener(queuesToDeclare = {@Queue(name ="data1")}) + public void consumeData1(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data1: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 假设 id 是 pur 对象中的一个属性 + String id = pur.getId(); + + // 根据 id 判断数据库中是否存在对应的记录 + Pur existingPur = purService.findById(id); + + if (existingPur != null) { + // 如果存在则进行修改操作 + purService.update(existingPur); // 假设有一个方法用来更新记录 + } else { + // 如果不存在则进行插入操作 + purService.insert(pur); // 假设有一个方法用来插入记录 + } + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + @RabbitListener(queuesToDeclare = {@Queue(name ="data2")}) + public void consumeData2(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data2: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 处理data2数据的逻辑 + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + @RabbitListener(queuesToDeclare = {@Queue(name ="data3")}) + public void consumeData3(String msg, Message message, Channel channel) throws IOException { + System.out.println("Received data3: " + message); + Pur pur = JSON.parseObject(msg, Pur.class); + System.out.println(pur); + // 处理data3数据的逻辑 + //确认消费 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } } diff --git a/src/main/java/com/bwie/consumer/consumeData2.java b/src/main/java/com/bwie/consumer/consumeData2.java deleted file mode 100644 index f0bf09d..0000000 --- a/src/main/java/com/bwie/consumer/consumeData2.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.bwie.consumer; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 - */ -@Component -@Log4j2 -public class consumeData2 { - - @RabbitListener(queues = "data2") - public void consumeData2(String message) { - - } -} diff --git a/src/main/java/com/bwie/consumer/consumeData3.java b/src/main/java/com/bwie/consumer/consumeData3.java deleted file mode 100644 index 50f0a77..0000000 --- a/src/main/java/com/bwie/consumer/consumeData3.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.bwie.consumer; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午7:40 - */ -@Component -@Log4j2 -public class consumeData3 { - - - @RabbitListener(queues = "data3") - public void consumeData3(String message) { - - } -} diff --git a/src/main/java/com/bwie/controller/PurController.java b/src/main/java/com/bwie/controller/PurController.java new file mode 100644 index 0000000..2e006e4 --- /dev/null +++ b/src/main/java/com/bwie/controller/PurController.java @@ -0,0 +1,38 @@ +package com.bwie.controller; + +import com.alibaba.fastjson.JSON; +import com.bwie.admin.Pur; +import com.bwie.service.PurService; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午10:37 + */ +@RestController +public class PurController { + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private PurService purService; + + @PostMapping + public String pur(@RequestBody List list) { + try { + rabbitTemplate.convertAndSend("data1",list); + rabbitTemplate.convertAndSend("data2", list); + rabbitTemplate.convertAndSend("data3", list); + } catch (AmqpException e) { + throw new RuntimeException(e); + } + return "消息发送成功"; + } +} diff --git a/src/main/java/com/bwie/mapper/PurMapper.java b/src/main/java/com/bwie/mapper/PurMapper.java new file mode 100644 index 0000000..f28080a --- /dev/null +++ b/src/main/java/com/bwie/mapper/PurMapper.java @@ -0,0 +1,11 @@ +package com.bwie.mapper; + +import org.apache.ibatis.annotations.Mapper; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:34 + */ +@Mapper +public interface PurMapper { +} diff --git a/src/main/java/com/bwie/service/Impl/PurServiceImpl.java b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java new file mode 100644 index 0000000..b75a24d --- /dev/null +++ b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java @@ -0,0 +1,52 @@ +package com.bwie.service.Impl; + +import com.bwie.admin.Pur; +import com.bwie.mapper.PurMapper; +import com.bwie.service.PurService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:17 + */ +@Service +public class PurServiceImpl implements PurService { + + + @Autowired + private PurMapper purMapper; + + + // 获取数据 + @Override + public Pur findById(String id) { + // 在真实应用中,你可能需要根据 id 去数据库查询数据并返回相应的 Pur 对象 + // 这里为了示例,直接返回 null + return null; + } + + // 更新数据 + @Override + public void update(Pur existingPur) { + // 在真实应用中,你需要将 existingPur 对象的属性值更新到数据库中对应的记录 + // 这里为了示例,仅打印日志 + System.out.println("Updating Pur: " + existingPur); + } + + // 插入数据 + @Override + public void insert(Pur pur) { + // 在真实应用中,你需要将 pur 对象插入到数据库中 + // 这里为了示例,仅打印日志 + System.out.println("Inserting Pur: " + pur); + } + + // 处理 Pur 消息的逻辑 + @Override + public void processPur(String msg) { + // 在真实应用中,你可能需要对接收到的消息进行解析,然后调用相应的方法进行处理 + // 这里为了示例,仅打印日志 + System.out.println("Processing Pur message: " + msg); + } +} diff --git a/src/main/java/com/bwie/service/PurService.java b/src/main/java/com/bwie/service/PurService.java new file mode 100644 index 0000000..49393f9 --- /dev/null +++ b/src/main/java/com/bwie/service/PurService.java @@ -0,0 +1,20 @@ +package com.bwie.service; + +import com.bwie.admin.Pur; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/11 下午11:15 + */ +public interface PurService { + + + Pur findById(String id); + + void update(Pur existingPur); + + void insert(Pur pur); + + void processPur(String msg); +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bc7bfe0..7347d33 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: datasource: - url: jdbc:mysql://111.231.174.71:3306/day8?useSSL=false&serverTimezone=UTC + url: jdbc:mysql://111.231.174.71:3306/day8?useUnicode=true&characterEncoding=true&characterEncoding=utf8 username: root password: wxy@123 driver-class-name: com.mysql.cj.jdbc.Driver -- 2.40.1 From 7a728c676226399cba5e72b95606a4a7f31ad153 Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 12 Apr 2024 20:05:23 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 113 ++++++---------- src/main/java/com/bwie/admin/Pur.java | 11 +- src/main/java/com/bwie/admin/Pur2.java | 27 ++++ src/main/java/com/bwie/admin/Pur3.java | 24 ++++ .../java/com/bwie/consumer/DataConsumer.java | 121 ++++++++++++------ .../com/bwie/controller/PurController.java | 36 ++++-- src/main/java/com/bwie/mapper/PurMapper.java | 23 ++++ .../com/bwie/service/Impl/PurServiceImpl.java | 52 -------- .../java/com/bwie/service/PurService.java | 20 --- src/main/java/com/bwie/task/Task.java | 109 ++++++++++++++++ src/main/resources/application.yml | 63 ++++++++- src/main/resources/mapper/mapper.xml | 54 ++++++++ 12 files changed, 440 insertions(+), 213 deletions(-) create mode 100644 src/main/java/com/bwie/admin/Pur2.java create mode 100644 src/main/java/com/bwie/admin/Pur3.java delete mode 100644 src/main/java/com/bwie/service/Impl/PurServiceImpl.java delete mode 100644 src/main/java/com/bwie/service/PurService.java create mode 100644 src/main/java/com/bwie/task/Task.java create mode 100644 src/main/resources/mapper/mapper.xml diff --git a/pom.xml b/pom.xml index 0a1685a..b329f49 100644 --- a/pom.xml +++ b/pom.xml @@ -15,109 +15,74 @@ + + org.springframework.boot + spring-boot-starter-parent + 2.6.2 + + + - + org.springframework.boot - spring-boot-starter - 2.7.18 - - - - org.springframework.boot - spring-boot-starter-batch - 2.5.6 - - - - org.springframework - spring-jdbc - 5.3.31 - - - - org.mybatis.spring.boot - mybatis-spring-boot-starter - 2.3.1 - - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - 2.13.5 - - - - org.springframework - spring-test - 5.3.23 - - - - junit - junit - 4.12 + spring-boot-starter-test + 3.2.1 test - - - com.alibaba - fastjson - 1.2.80 - - - - - org.springframework.boot - spring-boot-starter-jdbc - 2.7.18 - org.springframework.boot spring-boot-starter-web - 2.7.18 - - - mysql - mysql-connector-java - 8.0.29 + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka - org.springframework.boot spring-boot-starter-amqp - 2.6.4 - ch.qos.logback - logback-classic - 1.2.12 + mysql + mysql-connector-java + 8.0.33 + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 1.3.2 + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.39 org.projectlombok lombok - 1.18.30 - provided - com.fasterxml.jackson.core - jackson-databind - 2.12.5 - - - org.junit.jupiter - junit-jupiter - RELEASE - compile + org.springframework + spring-oxm + diff --git a/src/main/java/com/bwie/admin/Pur.java b/src/main/java/com/bwie/admin/Pur.java index adb8fe2..d695820 100644 --- a/src/main/java/com/bwie/admin/Pur.java +++ b/src/main/java/com/bwie/admin/Pur.java @@ -1,6 +1,6 @@ package com.bwie.admin; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.alibaba.fastjson.annotation.JSONField; + import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.deser.std.DateDeserializers; @@ -25,17 +25,14 @@ public class Pur { private String xm; - private Integer age; + private String age; - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") - private Date time; - - private String plate; + private String time; private String card; private String photo1; - private String photo2; + } diff --git a/src/main/java/com/bwie/admin/Pur2.java b/src/main/java/com/bwie/admin/Pur2.java new file mode 100644 index 0000000..807515a --- /dev/null +++ b/src/main/java/com/bwie/admin/Pur2.java @@ -0,0 +1,27 @@ +package com.bwie.admin; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/12 下午3:10 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Pur2 { + + private String id; + + private String xm; + + + + private String card; + + private String photo1; + + private String photo2; +} diff --git a/src/main/java/com/bwie/admin/Pur3.java b/src/main/java/com/bwie/admin/Pur3.java new file mode 100644 index 0000000..64b286e --- /dev/null +++ b/src/main/java/com/bwie/admin/Pur3.java @@ -0,0 +1,24 @@ +package com.bwie.admin; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/12 下午3:12 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Pur3 { + + private String id; + + + private String time; + + private String plate; + + +} diff --git a/src/main/java/com/bwie/consumer/DataConsumer.java b/src/main/java/com/bwie/consumer/DataConsumer.java index 802178a..ae998d2 100644 --- a/src/main/java/com/bwie/consumer/DataConsumer.java +++ b/src/main/java/com/bwie/consumer/DataConsumer.java @@ -1,9 +1,14 @@ package com.bwie.consumer; -import com.alibaba.fastjson.JSON; + +import com.alibaba.fastjson2.JSONObject; import com.bwie.admin.Pur; +import com.bwie.admin.Pur2; +import com.bwie.admin.Pur3; import com.bwie.controller.PurController; -import com.bwie.service.PurService; +import com.bwie.mapper.PurMapper; + +import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -15,63 +20,97 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; +import java.util.List; +import java.util.Optional; /** * @Author: wangxinyuan * @Date: 2024/4/11 下午9:24 */ @Component +@Log4j2 public class DataConsumer { @Autowired private PurController purController; + @Autowired - private PurService purService; + private PurMapper purMapper; - @RabbitListener(queuesToDeclare = {@Queue(name ="data1")}) - public void consumeData1(String msg, Message message, Channel channel) throws IOException { - System.out.println("Received data1: " + message); - Pur pur = JSON.parseObject(msg, Pur.class); - System.out.println(pur); - // 假设 id 是 pur 对象中的一个属性 - String id = pur.getId(); - - // 根据 id 判断数据库中是否存在对应的记录 - Pur existingPur = purService.findById(id); - - if (existingPur != null) { - // 如果存在则进行修改操作 - purService.update(existingPur); // 假设有一个方法用来更新记录 - } else { - // 如果不存在则进行插入操作 - purService.insert(pur); // 假设有一个方法用来插入记录 + @RabbitListener(queuesToDeclare = {@Queue(name = "data1")}) + public void consumeData1(Object data, Channel channel, Message message) throws IOException { + Pur pur = new Pur(); + if (pur != null) { + pur = (Pur) data; } - //确认消费 - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + log.info("消费的数据为:{}", JSONObject.toJSONString(pur)); + List list = purMapper.list(); + int count = 0; + if (pur.getId() != null && !"".equals(pur.getId())) { + for (String s : list) { + if (pur.getId().equals(s)) { + count++; + } + } + if (count > 0) { + purMapper.updateData1(pur); + } else { + purMapper.insertData1(pur); + } + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } + @RabbitListener(queuesToDeclare = {@Queue(name = "data2")}) + public void consumeData2(Object data, Channel channel, Message message) throws IOException { + Pur2 pur2 = new Pur2(); + if (pur2 != null) { + pur2 = (Pur2) data; + } + log.info("消费的数据为:{}", JSONObject.toJSONString(pur2)); + List list = purMapper.Purlist2(); + int count = 0; + if (pur2.getId() != null && !"".equals(pur2.getId())) { + for (String s : list) { + if (pur2.getId().equals(s)) { + count++; + } + } + if (count > 0) { + purMapper.updateData2(pur2); + } else { + purMapper.insertData2(pur2); + } + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } + + @RabbitListener(queuesToDeclare = {@Queue(name = "data3")}) + public void consumeData3(Object data, Channel channel, Message message) throws IOException { + Pur3 pur3 = new Pur3(); + if (pur3 != null) { + pur3 = (Pur3) data; + } + log.info("消费的数据为:{}", JSONObject.toJSONString(pur3)); + List list = purMapper.Purlist3(); + int count = 0; + if (pur3.getId() != null && !"".equals(pur3.getId())) { + for (String s : list) { + if (pur3.getId().equals(s)) { + count++; + } + } + if (count > 0) { + purMapper.updateData3(pur3); + } else { + purMapper.insertData3(pur3); + } + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } - @RabbitListener(queuesToDeclare = {@Queue(name ="data2")}) - public void consumeData2(String msg, Message message, Channel channel) throws IOException { - System.out.println("Received data2: " + message); - Pur pur = JSON.parseObject(msg, Pur.class); - System.out.println(pur); - // 处理data2数据的逻辑 - //确认消费 - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); - } - - @RabbitListener(queuesToDeclare = {@Queue(name ="data3")}) - public void consumeData3(String msg, Message message, Channel channel) throws IOException { - System.out.println("Received data3: " + message); - Pur pur = JSON.parseObject(msg, Pur.class); - System.out.println(pur); - // 处理data3数据的逻辑 - //确认消费 - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); - } } diff --git a/src/main/java/com/bwie/controller/PurController.java b/src/main/java/com/bwie/controller/PurController.java index 2e006e4..bc76685 100644 --- a/src/main/java/com/bwie/controller/PurController.java +++ b/src/main/java/com/bwie/controller/PurController.java @@ -1,11 +1,16 @@ package com.bwie.controller; -import com.alibaba.fastjson.JSON; + import com.bwie.admin.Pur; -import com.bwie.service.PurService; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.AmqpException; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -18,21 +23,26 @@ import java.util.List; */ @RestController public class PurController { + @Autowired private RabbitTemplate rabbitTemplate; @Autowired - private PurService purService; + private ObjectMapper objectMapper; - @PostMapping - public String pur(@RequestBody List list) { - try { - rabbitTemplate.convertAndSend("data1",list); - rabbitTemplate.convertAndSend("data2", list); - rabbitTemplate.convertAndSend("data3", list); - } catch (AmqpException e) { - throw new RuntimeException(e); - } - return "消息发送成功"; + + + @GetMapping("/data1") + @Transactional + public void sendData1() throws JsonProcessingException { + Pur pur = new Pur(); + pur.setId("593720128240812819055333"); + pur.setXm("dsc"); + pur.setCard("128963"); + pur.setAge("56"); + pur.setTime("2022-10-10 10:10:11"); + String message = objectMapper.writeValueAsString(pur); + rabbitTemplate.convertAndSend("data1", "id", message); } + } diff --git a/src/main/java/com/bwie/mapper/PurMapper.java b/src/main/java/com/bwie/mapper/PurMapper.java index f28080a..cc9066c 100644 --- a/src/main/java/com/bwie/mapper/PurMapper.java +++ b/src/main/java/com/bwie/mapper/PurMapper.java @@ -1,11 +1,34 @@ package com.bwie.mapper; +import com.bwie.admin.Pur; +import com.bwie.admin.Pur2; +import com.bwie.admin.Pur3; import org.apache.ibatis.annotations.Mapper; +import java.util.List; + /** * @Author: wangxinyuan * @Date: 2024/4/11 下午11:34 */ @Mapper public interface PurMapper { + List list(); + + void updateData1(Pur pur); + + void insertData1(Pur pur); + + + List Purlist2(); + + void updateData2(Pur2 pur2); + + void insertData2(Pur2 pur2); + + List Purlist3(); + + void updateData3(Pur3 pur3); + + void insertData3(Pur3 pur3); } diff --git a/src/main/java/com/bwie/service/Impl/PurServiceImpl.java b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java deleted file mode 100644 index b75a24d..0000000 --- a/src/main/java/com/bwie/service/Impl/PurServiceImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.bwie.service.Impl; - -import com.bwie.admin.Pur; -import com.bwie.mapper.PurMapper; -import com.bwie.service.PurService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午11:17 - */ -@Service -public class PurServiceImpl implements PurService { - - - @Autowired - private PurMapper purMapper; - - - // 获取数据 - @Override - public Pur findById(String id) { - // 在真实应用中,你可能需要根据 id 去数据库查询数据并返回相应的 Pur 对象 - // 这里为了示例,直接返回 null - return null; - } - - // 更新数据 - @Override - public void update(Pur existingPur) { - // 在真实应用中,你需要将 existingPur 对象的属性值更新到数据库中对应的记录 - // 这里为了示例,仅打印日志 - System.out.println("Updating Pur: " + existingPur); - } - - // 插入数据 - @Override - public void insert(Pur pur) { - // 在真实应用中,你需要将 pur 对象插入到数据库中 - // 这里为了示例,仅打印日志 - System.out.println("Inserting Pur: " + pur); - } - - // 处理 Pur 消息的逻辑 - @Override - public void processPur(String msg) { - // 在真实应用中,你可能需要对接收到的消息进行解析,然后调用相应的方法进行处理 - // 这里为了示例,仅打印日志 - System.out.println("Processing Pur message: " + msg); - } -} diff --git a/src/main/java/com/bwie/service/PurService.java b/src/main/java/com/bwie/service/PurService.java deleted file mode 100644 index 49393f9..0000000 --- a/src/main/java/com/bwie/service/PurService.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.bwie.service; - -import com.bwie.admin.Pur; -import org.springframework.beans.factory.annotation.Autowired; - -/** - * @Author: wangxinyuan - * @Date: 2024/4/11 下午11:15 - */ -public interface PurService { - - - Pur findById(String id); - - void update(Pur existingPur); - - void insert(Pur pur); - - void processPur(String msg); -} diff --git a/src/main/java/com/bwie/task/Task.java b/src/main/java/com/bwie/task/Task.java new file mode 100644 index 0000000..5c8ba45 --- /dev/null +++ b/src/main/java/com/bwie/task/Task.java @@ -0,0 +1,109 @@ +package com.bwie.task; + +import com.bwie.admin.Pur; +import com.bwie.admin.Pur2; +import com.bwie.admin.Pur3; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.io.*; +import java.net.URL; +import java.util.List; + +/** + * @Author: wangxinyuan + * @Date: 2024/4/12 下午3:35 + */ +@Component +public class Task { + + @Scheduled(cron = "0 0/1 * * * ?") // 每分钟执行一次 + public void doTask() { + // 查询data1的全部数据,得到data1s + List data1s = queryPur1(); + + for (Pur data1 : data1s) { + // 根据id去data2和data3进行查询 + Pur2 data2 = queryPur2(data1.getId()); + Pur3 data3 = queryPur3(data1.getId()); + + // 如果查询到的data2和data3有一个为空,则跳过,等待下次定时调度 + if (data2 == null || data3 == null) { + continue; + } + + // 编写图片保存方法,将图片存储到本地路径 ,并返回图片名称 + String imageName = saveImage(data1.getPhoto1()); + + // 根据data1、2、3及返回的图片名称,拼接字符串写入到bcp文件中,并返回文件名称 + String bcpFileName = writeBcpFile(data1, data2, data3, imageName); + + // 写入规则为字段+"\t" + // 读取索引的模板文件(xml),并替换掉文件名称,并将新的xml内容写到文件中 + String xmlFileName = replaceXmlContent(bcpFileName); + + // 压缩生产的三个文件为zip,压缩到其他目录 + String zipFileName = compressFiles(bcpFileName, xmlFileName, imageName); + + } + } + + private String compressFiles(String bcpFileName, String xmlFileName, String imageName) { + return null; + } + + private String replaceXmlContent(String bcpFileName) { + return null; + } + + private String writeBcpFile(Pur data1, Pur2 data2, Pur3 data3, String imageName) { + // 实现根据data1、2、3及返回的图片名称,拼接字符串写入到bcp文件中,并返回文件名称的逻辑 + try { + String fileName = "D:/save/files/" + data1.getId() + ".bcp"; + BufferedWriter writer = new BufferedWriter(new FileWriter(fileName)); + writer.write(data1.toString() + "\t"); + writer.write(data2.toString() + "\t"); + writer.write(data3.toString() + "\t"); + writer.write(imageName); + writer.close(); + return fileName; + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + private String saveImage(String photo1) { + try { + URL url = new URL(photo1); + String fileName = photo1.substring(photo1.lastIndexOf("/") + 1); + InputStream inputStream = url.openStream(); + FileOutputStream outputStream = new FileOutputStream("D:/to/image/" + fileName); + byte[] buffer = new byte[4096]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + inputStream.close(); + outputStream.close(); + return fileName; + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + private Pur3 queryPur3(String id) { + return null; + } + + private Pur2 queryPur2(String id) { + return null; + } + + private List queryPur1() { + return null; + } + + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7347d33..f111ca4 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,12 +1,63 @@ -spring: - datasource: - url: jdbc:mysql://111.231.174.71:3306/day8?useUnicode=true&characterEncoding=true&characterEncoding=utf8 - username: root - password: wxy@123 - driver-class-name: com.mysql.cj.jdbc.Driver +# Tomcat +server: + port: 9000 + +# Spring +spring: + application: + # 应用名称 + name: data-transmission rabbitmq: host: 111.231.174.71 port: 5672 username: guest password: guest + virtual-host: / + publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue) + template: + # 只要消息抵达Queue,就会异步发送优先回调return firm + mandatory: true + profiles: + # 环境配置 + active: dev + datasource: + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://111.231.174.71:3306/day8?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false + username: root + password: wxy@123 + druid: + # 下面为连接池的补充设置,应用到上面所有数据源中 + # 初始化大小,最小,最大 + initial-size: 5 + min-idle: 5 + max-active: 20 + # 配置获取连接等待超时的时间 + max-wait: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + time-between-eviction-runs-millis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + min-evictable-idle-time-millis: 300000 + validation-query: SELECT 1 FROM DUAL + test-while-idle: true + test-on-borrow: false + test-on-return: false + # 打开PSCache,并且指定每个连接上PSCache的大小 + pool-prepared-statements: true + # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 + max-pool-prepared-statement-per-connection-size: 20 + filters: stat,wall + use-global-data-source-stat: true + # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 + connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 +# mybatis +mybatis: + configuration: + map-underscore-to-camel-case: true + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + mapper-locations: classpath*:mapper/*Mapper.xml + type-aliases-package: com.bwie.datatransmission.domain + global-config: + db-config: + id-type: auto diff --git a/src/main/resources/mapper/mapper.xml b/src/main/resources/mapper/mapper.xml new file mode 100644 index 0000000..254bbef --- /dev/null +++ b/src/main/resources/mapper/mapper.xml @@ -0,0 +1,54 @@ + + + + + + INSERT INTO pur (id, xm, age, time, plate, card, photo1, photo2) + VALUES (#{id}, #{xm}, #{age}, #{time}, #{plate}, #{card}, #{photo1}, #{photo2}); + + + + + + INSERT INTO pur (id, xm, age, time, plate, card, photo1, photo2) + VALUES (#{id}, #{xm}, #{age}, #{time}, #{plate}, #{card}, #{photo1}, #{photo2}); + + + + INSERT INTO pur (id, xm, age, time, plate, card, photo1, photo2) + VALUES (#{id}, #{xm}, #{age}, #{time}, #{plate}, #{card}, #{photo1}, #{photo2}); + + + + -- 更新数据 + UPDATE your_table_name + SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} + WHERE id = 'id'; + + + + + UPDATE your_table_name + SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} + WHERE id = 'id'; + + + + + UPDATE your_table_name + SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} + WHERE id = 'id'; + + + + + + + -- 2.40.1 From eb4bb59f935534e02837af47861d7321a2faea63 Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 12 Apr 2024 21:49:36 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/bwie/config/RabbitAdminConfig.java | 53 +++++++++++++++++++ src/main/resources/application.yml | 4 +- src/main/resources/mapper/mapper.xml | 12 ++--- 3 files changed, 61 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/bwie/config/RabbitAdminConfig.java diff --git a/src/main/java/com/bwie/config/RabbitAdminConfig.java b/src/main/java/com/bwie/config/RabbitAdminConfig.java new file mode 100644 index 0000000..9b114fd --- /dev/null +++ b/src/main/java/com/bwie/config/RabbitAdminConfig.java @@ -0,0 +1,53 @@ +package com.bwie.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * RabbitAdmin是RabbitMQ的一个Java客户端库,它提供了管理RabbitMQ资源的功能。它是通过与RabbitMQ服务器进行交互来执行管理操作的。 + */ +@Configuration +public class RabbitAdminConfig { + + @Value("${spring.rabbitmq.host}") + private String host; + @Value("${spring.rabbitmq.username}") + private String username; + @Value("${spring.rabbitmq.password}") + private String password; + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 构建 RabbitMQ的连接工厂 + * @return + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + /** + * 自己初始化 RabbitAdmin + * @param connectionFactory + * @return + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f111ca4..dfdacd8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,7 +7,7 @@ server: spring: application: # 应用名称 - name: data-transmission + name: boot-4.11z rabbitmq: host: 111.231.174.71 port: 5672 @@ -57,7 +57,7 @@ mybatis: map-underscore-to-camel-case: true log-impl: org.apache.ibatis.logging.stdout.StdOutImpl mapper-locations: classpath*:mapper/*Mapper.xml - type-aliases-package: com.bwie.datatransmission.domain + type-aliases-package: com.bwie.admin global-config: db-config: id-type: auto diff --git a/src/main/resources/mapper/mapper.xml b/src/main/resources/mapper/mapper.xml index 254bbef..da30ff3 100644 --- a/src/main/resources/mapper/mapper.xml +++ b/src/main/resources/mapper/mapper.xml @@ -24,23 +24,23 @@ -- 更新数据 - UPDATE your_table_name + UPDATE pur SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} - WHERE id = 'id'; + WHERE id = #{id}; - UPDATE your_table_name + UPDATE pur SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} - WHERE id = 'id'; + WHERE id = #{id}; - UPDATE your_table_name + UPDATE pur SET xm = #{xm}, age = #{age}, time = #{time}, plate = #{plate}, card = #{card}, photo1 = #{photo1}, photo2 = #{photo2} - WHERE id = 'id'; + WHERE id = #{id};