diff --git a/pom.xml b/pom.xml
index 7a21e39..0a1685a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,58 @@
UTF-8
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.7.18
+
+
+
+ org.springframework.boot
+ spring-boot-starter-batch
+ 2.5.6
+
+
+
+ org.springframework
+ spring-jdbc
+ 5.3.31
+
+
+
+ org.mybatis.spring.boot
+ mybatis-spring-boot-starter
+ 2.3.1
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ 2.13.5
+
+
+
+ org.springframework
+ spring-test
+ 5.3.23
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.80
+
+
org.springframework.boot
@@ -22,6 +73,12 @@
2.7.18
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.7.18
+
+
mysql
@@ -54,6 +111,12 @@
jackson-databind
2.12.5
+
+ org.junit.jupiter
+ junit-jupiter
+ RELEASE
+ compile
+
diff --git a/src/main/java/com/bwie/Application.java b/src/main/java/com/bwie/Application.java
index 599cdc0..2045533 100644
--- a/src/main/java/com/bwie/Application.java
+++ b/src/main/java/com/bwie/Application.java
@@ -5,7 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: wangxinyuan
- * @Date: 2024/4/11 下午7:29
+ * @Date: 2024/4/11 下午9:08
*/
@SpringBootApplication
public class Application {
diff --git a/src/main/java/com/bwie/admin/Pur.java b/src/main/java/com/bwie/admin/Pur.java
index 97ffd76..adb8fe2 100644
--- a/src/main/java/com/bwie/admin/Pur.java
+++ b/src/main/java/com/bwie/admin/Pur.java
@@ -1,5 +1,9 @@
package com.bwie.admin;
-
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.alibaba.fastjson.annotation.JSONField;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.deser.std.DateDeserializers;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -19,18 +23,19 @@ public class Pur {
private String id;
- private String XM;
+ private String xm;
- private String Age;
+ private Integer age;
- private Date Time;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date time;
private String plate;
- private String Card;
+ private String card;
- private String Photo1;
+ private String photo1;
- private String Photo2;
+ private String photo2;
}
diff --git a/src/main/java/com/bwie/config/RabbitAdminConfig.java b/src/main/java/com/bwie/config/RabbitAdminConfig.java
deleted file mode 100644
index 9b114fd..0000000
--- a/src/main/java/com/bwie/config/RabbitAdminConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.bwie.config;
-
-import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.core.RabbitAdmin;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * RabbitAdmin是RabbitMQ的一个Java客户端库,它提供了管理RabbitMQ资源的功能。它是通过与RabbitMQ服务器进行交互来执行管理操作的。
- */
-@Configuration
-public class RabbitAdminConfig {
-
- @Value("${spring.rabbitmq.host}")
- private String host;
- @Value("${spring.rabbitmq.username}")
- private String username;
- @Value("${spring.rabbitmq.password}")
- private String password;
- @Value("${spring.rabbitmq.virtualhost}")
- private String virtualhost;
-
- /**
- * 构建 RabbitMQ的连接工厂
- * @return
- */
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses(host);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualhost);
- // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
- connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
- connectionFactory.setPublisherReturns(true);
- return connectionFactory;
- }
-
- /**
- * 自己初始化 RabbitAdmin
- * @param connectionFactory
- * @return
- */
- @Bean
- public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
- RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
- rabbitAdmin.setAutoStartup(true);
- return rabbitAdmin;
- }
-}
diff --git a/src/main/java/com/bwie/consumer/ConsumeData2.java b/src/main/java/com/bwie/consumer/ConsumeData2.java
new file mode 100644
index 0000000..b55b35f
--- /dev/null
+++ b/src/main/java/com/bwie/consumer/ConsumeData2.java
@@ -0,0 +1,62 @@
+//package com.bwie.consumer;
+//
+//import lombok.extern.log4j.Log4j2;
+//import org.springframework.amqp.rabbit.annotation.RabbitListener;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.jdbc.core.JdbcTemplate;
+//import org.springframework.stereotype.Component;
+//
+//import java.sql.Timestamp;
+//
+///**
+// * @Author: wangxinyuan
+// * @Date: 2024/4/11 下午7:40
+// */
+////@Component
+//@Log4j2
+//public class ConsumeData2 {
+//
+// //@Autowired
+// private JdbcTemplate jdbcTemplate;
+//
+// //@RabbitListener(queues = "data2")
+// public void consumeData2(String message) {
+// log.info("Received data2: " + message);
+// saveDataToDatabase(message);
+// }
+//
+// private void saveDataToDatabase(String message) {
+// try {
+// String[] parts = message.split(",");
+// if (parts.length >= 9) {
+// String id = parts[0];
+// String XM = parts[1];
+// String Age = parts[2];
+// String timeStr = parts[3];
+// Timestamp Time = Timestamp.valueOf(timeStr);
+// String plate = parts[4];
+// String Card = parts[5];
+// String Photo1 = parts[7];
+// String Photo2 = parts[8];
+//
+//
+// String querySql = "SELECT * FROM pur WHERE id = ?";
+// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
+//
+// if (count == null || count == 0) {
+// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
+// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
+// log.info("Data saved to database successfully.");
+// } else {
+// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
+// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
+// log.info("Data updated in database successfully.");
+// }
+// } else {
+// log.info("Data saved to database failed.");
+// }
+// } catch (Exception e) {
+// log.error("Error while saving data to database: " + e.getMessage());
+// }
+// }
+//}
diff --git a/src/main/java/com/bwie/consumer/ConsumeData3.java b/src/main/java/com/bwie/consumer/ConsumeData3.java
new file mode 100644
index 0000000..3c8d90f
--- /dev/null
+++ b/src/main/java/com/bwie/consumer/ConsumeData3.java
@@ -0,0 +1,64 @@
+//package com.bwie.consumer;
+//
+//import lombok.extern.log4j.Log4j2;
+//import org.springframework.amqp.rabbit.annotation.RabbitListener;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.jdbc.core.JdbcTemplate;
+//import org.springframework.stereotype.Component;
+//
+//import java.sql.Timestamp;
+//
+///**
+// * @Author: wangxinyuan
+// * @Date: 2024/4/11 下午7:40
+// */
+////@Component
+//@Log4j2
+//public class ConsumeData3 {
+//
+//
+// //@Autowired
+// private JdbcTemplate jdbcTemplate;
+//
+// //@RabbitListener(queues = "data3")
+// public void consumeData3(String message) {
+// log.info("Received data3: " + message);
+// saveDataToDatabase(message);
+// }
+//
+// private void saveDataToDatabase(String message) {
+// try {
+// String[] parts = message.split(",");
+// if (parts.length >= 9) {
+// String id = parts[0];
+// String XM = parts[1];
+// String Age = parts[2];
+// String timeStr = parts[3];
+// Timestamp Time = Timestamp.valueOf(timeStr);
+// String plate = parts[4];
+// String Card = parts[5];/**/
+// String Photo1 = parts[7];
+// String Photo2 = parts[8];
+//
+//
+// String querySql = "SELECT * FROM pur WHERE id = ?";
+// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
+//
+//
+// if (count == null || count == 0) {
+// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
+// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
+// log.info("Data saved to database successfully.");
+// } else {
+// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
+// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
+// log.info("Data updated in database successfully.");
+// }
+// } else {
+// log.info("Data saved to database failed.");
+// }
+// } catch (Exception e) {
+// log.error("Error while saving data to database: " + e.getMessage());
+// }
+// }
+//}
diff --git a/src/main/java/com/bwie/consumer/ConsumerDate1.java b/src/main/java/com/bwie/consumer/ConsumerDate1.java
new file mode 100644
index 0000000..da3d06f
--- /dev/null
+++ b/src/main/java/com/bwie/consumer/ConsumerDate1.java
@@ -0,0 +1,70 @@
+//package com.bwie.consumer;
+//
+//import com.bwie.admin.Pur;
+//import lombok.extern.log4j.Log4j2;
+//import org.junit.jupiter.api.Test;
+//import org.springframework.amqp.rabbit.annotation.RabbitListener;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.jdbc.core.JdbcTemplate;
+//import org.springframework.stereotype.Component;
+//
+//import java.sql.Timestamp;
+//import java.util.Date;
+//
+///**
+// * @Author: wangxinyuan
+// * @Date: 2024/4/11 下午7:40
+// */
+////@Component
+//@Log4j2
+//public class ConsumerDate1 {
+//
+//
+// //@Autowired
+// private JdbcTemplate jdbcTemplate;
+//
+// //@RabbitListener(queues = "data1")
+// public void consumeData1(String message) {
+// log.info("Received data1: " + message);
+// saveDataToDatabase(message);
+// }
+//
+//
+// private void saveDataToDatabase(String message) {
+// try {
+// String[] parts = message.split(",");
+// if (parts.length >= 9) {
+// String id = parts[0];
+// String XM = parts[1];
+// String Age = parts[2];
+// String timeStr = parts[3];
+// Timestamp Time = Timestamp.valueOf(timeStr);
+// String plate = parts[4];
+// String Card = parts[5];
+// String Photo1 = parts[7];
+// String Photo2 = parts[8];
+//
+// String querySql = "SELECT * FROM pur WHERE id = ?";
+// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
+//
+//
+// if (count == null || count == 0) {
+// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
+// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
+// log.info("Data saved to database successfully.");
+// } else {
+// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
+// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
+// log.info("Data updated in database successfully.");
+// }
+// } else {
+// log.info("Data saved to database failed.");
+// }
+// } catch (Exception e) {
+// log.error("Error while saving data to database: " + e.getMessage());
+// }
+// }
+//
+//
+//
+//}
diff --git a/src/main/java/com/bwie/consumer/DataConsumer.java b/src/main/java/com/bwie/consumer/DataConsumer.java
index bb922e7..802178a 100644
--- a/src/main/java/com/bwie/consumer/DataConsumer.java
+++ b/src/main/java/com/bwie/consumer/DataConsumer.java
@@ -1,22 +1,77 @@
package com.bwie.consumer;
-import lombok.extern.log4j.Log4j2;
+import com.alibaba.fastjson.JSON;
+import com.bwie.admin.Pur;
+import com.bwie.controller.PurController;
+import com.bwie.service.PurService;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import com.rabbitmq.client.Channel;
+import org.springframework.stereotype.Repository;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
/**
* @Author: wangxinyuan
- * @Date: 2024/4/11 下午7:40
+ * @Date: 2024/4/11 下午9:24
*/
@Component
-@Log4j2
public class DataConsumer {
- @RabbitListener(queues = "data1")
- public void consumeData1(String message) {
- log.info("Received data1: " + message);
+
+ @Autowired
+ private PurController purController;
+
+ @Autowired
+ private PurService purService;
+
+
+ @RabbitListener(queuesToDeclare = {@Queue(name ="data1")})
+ public void consumeData1(String msg, Message message, Channel channel) throws IOException {
+ System.out.println("Received data1: " + message);
+ Pur pur = JSON.parseObject(msg, Pur.class);
+ System.out.println(pur);
+ // 假设 id 是 pur 对象中的一个属性
+ String id = pur.getId();
+
+ // 根据 id 判断数据库中是否存在对应的记录
+ Pur existingPur = purService.findById(id);
+
+ if (existingPur != null) {
+ // 如果存在则进行修改操作
+ purService.update(existingPur); // 假设有一个方法用来更新记录
+ } else {
+ // 如果不存在则进行插入操作
+ purService.insert(pur); // 假设有一个方法用来插入记录
+ }
+ //确认消费
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
+
}
+ @RabbitListener(queuesToDeclare = {@Queue(name ="data2")})
+ public void consumeData2(String msg, Message message, Channel channel) throws IOException {
+ System.out.println("Received data2: " + message);
+ Pur pur = JSON.parseObject(msg, Pur.class);
+ System.out.println(pur);
+ // 处理data2数据的逻辑
+ //确认消费
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
+ }
+ @RabbitListener(queuesToDeclare = {@Queue(name ="data3")})
+ public void consumeData3(String msg, Message message, Channel channel) throws IOException {
+ System.out.println("Received data3: " + message);
+ Pur pur = JSON.parseObject(msg, Pur.class);
+ System.out.println(pur);
+ // 处理data3数据的逻辑
+ //确认消费
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
+ }
}
diff --git a/src/main/java/com/bwie/consumer/consumeData2.java b/src/main/java/com/bwie/consumer/consumeData2.java
deleted file mode 100644
index f0bf09d..0000000
--- a/src/main/java/com/bwie/consumer/consumeData2.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.bwie.consumer;
-
-import lombok.extern.log4j.Log4j2;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Component;
-
-/**
- * @Author: wangxinyuan
- * @Date: 2024/4/11 下午7:40
- */
-@Component
-@Log4j2
-public class consumeData2 {
-
- @RabbitListener(queues = "data2")
- public void consumeData2(String message) {
-
- }
-}
diff --git a/src/main/java/com/bwie/consumer/consumeData3.java b/src/main/java/com/bwie/consumer/consumeData3.java
deleted file mode 100644
index 50f0a77..0000000
--- a/src/main/java/com/bwie/consumer/consumeData3.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.bwie.consumer;
-
-import lombok.extern.log4j.Log4j2;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Component;
-
-/**
- * @Author: wangxinyuan
- * @Date: 2024/4/11 下午7:40
- */
-@Component
-@Log4j2
-public class consumeData3 {
-
-
- @RabbitListener(queues = "data3")
- public void consumeData3(String message) {
-
- }
-}
diff --git a/src/main/java/com/bwie/controller/PurController.java b/src/main/java/com/bwie/controller/PurController.java
new file mode 100644
index 0000000..2e006e4
--- /dev/null
+++ b/src/main/java/com/bwie/controller/PurController.java
@@ -0,0 +1,38 @@
+package com.bwie.controller;
+
+import com.alibaba.fastjson.JSON;
+import com.bwie.admin.Pur;
+import com.bwie.service.PurService;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * @Author: wangxinyuan
+ * @Date: 2024/4/11 下午10:37
+ */
+@RestController
+public class PurController {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ @Autowired
+ private PurService purService;
+
+ @PostMapping
+ public String pur(@RequestBody List list) {
+ try {
+ rabbitTemplate.convertAndSend("data1",list);
+ rabbitTemplate.convertAndSend("data2", list);
+ rabbitTemplate.convertAndSend("data3", list);
+ } catch (AmqpException e) {
+ throw new RuntimeException(e);
+ }
+ return "消息发送成功";
+ }
+}
diff --git a/src/main/java/com/bwie/mapper/PurMapper.java b/src/main/java/com/bwie/mapper/PurMapper.java
new file mode 100644
index 0000000..f28080a
--- /dev/null
+++ b/src/main/java/com/bwie/mapper/PurMapper.java
@@ -0,0 +1,11 @@
+package com.bwie.mapper;
+
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @Author: wangxinyuan
+ * @Date: 2024/4/11 下午11:34
+ */
+@Mapper
+public interface PurMapper {
+}
diff --git a/src/main/java/com/bwie/service/Impl/PurServiceImpl.java b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java
new file mode 100644
index 0000000..b75a24d
--- /dev/null
+++ b/src/main/java/com/bwie/service/Impl/PurServiceImpl.java
@@ -0,0 +1,52 @@
+package com.bwie.service.Impl;
+
+import com.bwie.admin.Pur;
+import com.bwie.mapper.PurMapper;
+import com.bwie.service.PurService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Author: wangxinyuan
+ * @Date: 2024/4/11 下午11:17
+ */
+@Service
+public class PurServiceImpl implements PurService {
+
+
+ @Autowired
+ private PurMapper purMapper;
+
+
+ // 获取数据
+ @Override
+ public Pur findById(String id) {
+ // 在真实应用中,你可能需要根据 id 去数据库查询数据并返回相应的 Pur 对象
+ // 这里为了示例,直接返回 null
+ return null;
+ }
+
+ // 更新数据
+ @Override
+ public void update(Pur existingPur) {
+ // 在真实应用中,你需要将 existingPur 对象的属性值更新到数据库中对应的记录
+ // 这里为了示例,仅打印日志
+ System.out.println("Updating Pur: " + existingPur);
+ }
+
+ // 插入数据
+ @Override
+ public void insert(Pur pur) {
+ // 在真实应用中,你需要将 pur 对象插入到数据库中
+ // 这里为了示例,仅打印日志
+ System.out.println("Inserting Pur: " + pur);
+ }
+
+ // 处理 Pur 消息的逻辑
+ @Override
+ public void processPur(String msg) {
+ // 在真实应用中,你可能需要对接收到的消息进行解析,然后调用相应的方法进行处理
+ // 这里为了示例,仅打印日志
+ System.out.println("Processing Pur message: " + msg);
+ }
+}
diff --git a/src/main/java/com/bwie/service/PurService.java b/src/main/java/com/bwie/service/PurService.java
new file mode 100644
index 0000000..49393f9
--- /dev/null
+++ b/src/main/java/com/bwie/service/PurService.java
@@ -0,0 +1,20 @@
+package com.bwie.service;
+
+import com.bwie.admin.Pur;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * @Author: wangxinyuan
+ * @Date: 2024/4/11 下午11:15
+ */
+public interface PurService {
+
+
+ Pur findById(String id);
+
+ void update(Pur existingPur);
+
+ void insert(Pur pur);
+
+ void processPur(String msg);
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index bc7bfe0..7347d33 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,6 +1,6 @@
spring:
datasource:
- url: jdbc:mysql://111.231.174.71:3306/day8?useSSL=false&serverTimezone=UTC
+ url: jdbc:mysql://111.231.174.71:3306/day8?useUnicode=true&characterEncoding=true&characterEncoding=utf8
username: root
password: wxy@123
driver-class-name: com.mysql.cj.jdbc.Driver