pull/1/head
wxy 2024-04-12 10:30:10 +08:00
parent efc142e89d
commit f911988e74
15 changed files with 455 additions and 107 deletions

63
pom.xml
View File

@ -14,7 +14,58 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.3.31</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.23</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- Alibaba Fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!-- Spring Boot Starter for JDBC, includes HikariCP connection pool --> <!-- Spring Boot Starter for JDBC, includes HikariCP connection pool -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
@ -22,6 +73,12 @@
<version>2.7.18</version> <version>2.7.18</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.18</version>
</dependency>
<!-- MySQL Connector Java --> <!-- MySQL Connector Java -->
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
@ -54,6 +111,12 @@
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.12.5</version> <!-- 替换为你需要的版本 --> <version>2.12.5</version> <!-- 替换为你需要的版本 -->
</dependency> </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>

View File

@ -5,7 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/** /**
* @Author: wangxinyuan * @Author: wangxinyuan
* @Date: 2024/4/11 7:29 * @Date: 2024/4/11 9:08
*/ */
@SpringBootApplication @SpringBootApplication
public class Application { public class Application {

View File

@ -1,5 +1,9 @@
package com.bwie.admin; package com.bwie.admin;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.DateDeserializers;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -19,18 +23,19 @@ public class Pur {
private String id; private String id;
private String XM; private String xm;
private String Age; private Integer age;
private Date Time; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date time;
private String plate; private String plate;
private String Card; private String card;
private String Photo1; private String photo1;
private String Photo2; private String photo2;
} }

View File

@ -1,53 +0,0 @@
package com.bwie.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,62 @@
//package com.bwie.consumer;
//
//import lombok.extern.log4j.Log4j2;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.jdbc.core.JdbcTemplate;
//import org.springframework.stereotype.Component;
//
//import java.sql.Timestamp;
//
///**
// * @Author: wangxinyuan
// * @Date: 2024/4/11 下午7:40
// */
////@Component
//@Log4j2
//public class ConsumeData2 {
//
// //@Autowired
// private JdbcTemplate jdbcTemplate;
//
// //@RabbitListener(queues = "data2")
// public void consumeData2(String message) {
// log.info("Received data2: " + message);
// saveDataToDatabase(message);
// }
//
// private void saveDataToDatabase(String message) {
// try {
// String[] parts = message.split(",");
// if (parts.length >= 9) {
// String id = parts[0];
// String XM = parts[1];
// String Age = parts[2];
// String timeStr = parts[3];
// Timestamp Time = Timestamp.valueOf(timeStr);
// String plate = parts[4];
// String Card = parts[5];
// String Photo1 = parts[7];
// String Photo2 = parts[8];
//
//
// String querySql = "SELECT * FROM pur WHERE id = ?";
// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
//
// if (count == null || count == 0) {
// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
// log.info("Data saved to database successfully.");
// } else {
// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
// log.info("Data updated in database successfully.");
// }
// } else {
// log.info("Data saved to database failed.");
// }
// } catch (Exception e) {
// log.error("Error while saving data to database: " + e.getMessage());
// }
// }
//}

View File

@ -0,0 +1,64 @@
//package com.bwie.consumer;
//
//import lombok.extern.log4j.Log4j2;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.jdbc.core.JdbcTemplate;
//import org.springframework.stereotype.Component;
//
//import java.sql.Timestamp;
//
///**
// * @Author: wangxinyuan
// * @Date: 2024/4/11 下午7:40
// */
////@Component
//@Log4j2
//public class ConsumeData3 {
//
//
// //@Autowired
// private JdbcTemplate jdbcTemplate;
//
// //@RabbitListener(queues = "data3")
// public void consumeData3(String message) {
// log.info("Received data3: " + message);
// saveDataToDatabase(message);
// }
//
// private void saveDataToDatabase(String message) {
// try {
// String[] parts = message.split(",");
// if (parts.length >= 9) {
// String id = parts[0];
// String XM = parts[1];
// String Age = parts[2];
// String timeStr = parts[3];
// Timestamp Time = Timestamp.valueOf(timeStr);
// String plate = parts[4];
// String Card = parts[5];/**/
// String Photo1 = parts[7];
// String Photo2 = parts[8];
//
//
// String querySql = "SELECT * FROM pur WHERE id = ?";
// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
//
//
// if (count == null || count == 0) {
// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
// log.info("Data saved to database successfully.");
// } else {
// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
// log.info("Data updated in database successfully.");
// }
// } else {
// log.info("Data saved to database failed.");
// }
// } catch (Exception e) {
// log.error("Error while saving data to database: " + e.getMessage());
// }
// }
//}

View File

@ -0,0 +1,70 @@
//package com.bwie.consumer;
//
//import com.bwie.admin.Pur;
//import lombok.extern.log4j.Log4j2;
//import org.junit.jupiter.api.Test;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.jdbc.core.JdbcTemplate;
//import org.springframework.stereotype.Component;
//
//import java.sql.Timestamp;
//import java.util.Date;
//
///**
// * @Author: wangxinyuan
// * @Date: 2024/4/11 下午7:40
// */
////@Component
//@Log4j2
//public class ConsumerDate1 {
//
//
// //@Autowired
// private JdbcTemplate jdbcTemplate;
//
// //@RabbitListener(queues = "data1")
// public void consumeData1(String message) {
// log.info("Received data1: " + message);
// saveDataToDatabase(message);
// }
//
//
// private void saveDataToDatabase(String message) {
// try {
// String[] parts = message.split(",");
// if (parts.length >= 9) {
// String id = parts[0];
// String XM = parts[1];
// String Age = parts[2];
// String timeStr = parts[3];
// Timestamp Time = Timestamp.valueOf(timeStr);
// String plate = parts[4];
// String Card = parts[5];
// String Photo1 = parts[7];
// String Photo2 = parts[8];
//
// String querySql = "SELECT * FROM pur WHERE id = ?";
// Integer count = jdbcTemplate.queryForObject(querySql, new Object[]{id}, Integer.class);
//
//
// if (count == null || count == 0) {
// String insertSql = "INSERT INTO pur (id, XM, Age, Time, plate, Card, Photo1, Photo2) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
// jdbcTemplate.update(insertSql, id, XM, Age, Time, plate, Card, Photo1, Photo2);
// log.info("Data saved to database successfully.");
// } else {
// String updateSql = "UPDATE pur SET XM = ?, Age = ?, Time = ?, plate = ?, Card = ?, Photo1 = ?, Photo2 = ? WHERE id = ?";
// jdbcTemplate.update(updateSql, XM, Age, Time, plate, Card, Photo1, Photo2, id);
// log.info("Data updated in database successfully.");
// }
// } else {
// log.info("Data saved to database failed.");
// }
// } catch (Exception e) {
// log.error("Error while saving data to database: " + e.getMessage());
// }
// }
//
//
//
//}

View File

@ -1,22 +1,77 @@
package com.bwie.consumer; package com.bwie.consumer;
import lombok.extern.log4j.Log4j2; import com.alibaba.fastjson.JSON;
import com.bwie.admin.Pur;
import com.bwie.controller.PurController;
import com.bwie.service.PurService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.stereotype.Repository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
/** /**
* @Author: wangxinyuan * @Author: wangxinyuan
* @Date: 2024/4/11 7:40 * @Date: 2024/4/11 9:24
*/ */
@Component @Component
@Log4j2
public class DataConsumer { public class DataConsumer {
@RabbitListener(queues = "data1")
public void consumeData1(String message) { @Autowired
log.info("Received data1: " + message); private PurController purController;
@Autowired
private PurService purService;
@RabbitListener(queuesToDeclare = {@Queue(name ="data1")})
public void consumeData1(String msg, Message message, Channel channel) throws IOException {
System.out.println("Received data1: " + message);
Pur pur = JSON.parseObject(msg, Pur.class);
System.out.println(pur);
// 假设 id 是 pur 对象中的一个属性
String id = pur.getId();
// 根据 id 判断数据库中是否存在对应的记录
Pur existingPur = purService.findById(id);
if (existingPur != null) {
// 如果存在则进行修改操作
purService.update(existingPur); // 假设有一个方法用来更新记录
} else {
// 如果不存在则进行插入操作
purService.insert(pur); // 假设有一个方法用来插入记录
}
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} }
@RabbitListener(queuesToDeclare = {@Queue(name ="data2")})
public void consumeData2(String msg, Message message, Channel channel) throws IOException {
System.out.println("Received data2: " + message);
Pur pur = JSON.parseObject(msg, Pur.class);
System.out.println(pur);
// 处理data2数据的逻辑
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitListener(queuesToDeclare = {@Queue(name ="data3")})
public void consumeData3(String msg, Message message, Channel channel) throws IOException {
System.out.println("Received data3: " + message);
Pur pur = JSON.parseObject(msg, Pur.class);
System.out.println(pur);
// 处理data3数据的逻辑
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
} }

View File

@ -1,19 +0,0 @@
package com.bwie.consumer;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 7:40
*/
@Component
@Log4j2
public class consumeData2 {
@RabbitListener(queues = "data2")
public void consumeData2(String message) {
}
}

View File

@ -1,20 +0,0 @@
package com.bwie.consumer;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 7:40
*/
@Component
@Log4j2
public class consumeData3 {
@RabbitListener(queues = "data3")
public void consumeData3(String message) {
}
}

View File

@ -0,0 +1,38 @@
package com.bwie.controller;
import com.alibaba.fastjson.JSON;
import com.bwie.admin.Pur;
import com.bwie.service.PurService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 10:37
*/
@RestController
public class PurController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private PurService purService;
@PostMapping
public String pur(@RequestBody List<Pur> list) {
try {
rabbitTemplate.convertAndSend("data1",list);
rabbitTemplate.convertAndSend("data2", list);
rabbitTemplate.convertAndSend("data3", list);
} catch (AmqpException e) {
throw new RuntimeException(e);
}
return "消息发送成功";
}
}

View File

@ -0,0 +1,11 @@
package com.bwie.mapper;
import org.apache.ibatis.annotations.Mapper;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 11:34
*/
@Mapper
public interface PurMapper {
}

View File

@ -0,0 +1,52 @@
package com.bwie.service.Impl;
import com.bwie.admin.Pur;
import com.bwie.mapper.PurMapper;
import com.bwie.service.PurService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 11:17
*/
@Service
public class PurServiceImpl implements PurService {
@Autowired
private PurMapper purMapper;
// 获取数据
@Override
public Pur findById(String id) {
// 在真实应用中,你可能需要根据 id 去数据库查询数据并返回相应的 Pur 对象
// 这里为了示例,直接返回 null
return null;
}
// 更新数据
@Override
public void update(Pur existingPur) {
// 在真实应用中,你需要将 existingPur 对象的属性值更新到数据库中对应的记录
// 这里为了示例,仅打印日志
System.out.println("Updating Pur: " + existingPur);
}
// 插入数据
@Override
public void insert(Pur pur) {
// 在真实应用中,你需要将 pur 对象插入到数据库中
// 这里为了示例,仅打印日志
System.out.println("Inserting Pur: " + pur);
}
// 处理 Pur 消息的逻辑
@Override
public void processPur(String msg) {
// 在真实应用中,你可能需要对接收到的消息进行解析,然后调用相应的方法进行处理
// 这里为了示例,仅打印日志
System.out.println("Processing Pur message: " + msg);
}
}

View File

@ -0,0 +1,20 @@
package com.bwie.service;
import com.bwie.admin.Pur;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @Author: wangxinyuan
* @Date: 2024/4/11 11:15
*/
public interface PurService {
Pur findById(String id);
void update(Pur existingPur);
void insert(Pur pur);
void processPur(String msg);
}

View File

@ -1,6 +1,6 @@
spring: spring:
datasource: datasource:
url: jdbc:mysql://111.231.174.71:3306/day8?useSSL=false&serverTimezone=UTC url: jdbc:mysql://111.231.174.71:3306/day8?useUnicode=true&characterEncoding=true&characterEncoding=utf8
username: root username: root
password: wxy@123 password: wxy@123
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver