feat:多数据源模块mq初始化 链接

master
hbr 2024-06-07 15:27:41 +08:00
parent 2d4153261b
commit d8a8210c60
15 changed files with 353 additions and 49 deletions

View File

@ -20,7 +20,11 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<!--rabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.zhiLian</groupId> <groupId>com.zhiLian</groupId>
<artifactId>zhiLian-common-system</artifactId> <artifactId>zhiLian-common-system</artifactId>

View File

@ -0,0 +1,15 @@
package com.zhiLian.business.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -1,25 +0,0 @@
package com.zhiLian.business.remote.factory;
import com.zhiLian.common.core.domain.Result;
import com.zhiLian.common.log.annotation.Log;
import com.zhiLian.common.log.enums.BusinessType;
import com.zhiLian.common.system.domain.SysUser;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* BingRui.Hou
*
* @Description
* @ClassName RemoteUserLoginFactory
* @Date 2024/05/27 16:31
*/
@FeignClient("zhiLian-system")
public interface RemoteUserLoginFactory {
@Log(title = "用户管理", businessType = BusinessType.INSERT)
@PostMapping("/user")
public Result add (@Validated @RequestBody SysUser user);
}

View File

@ -4,11 +4,12 @@ package com.zhiLian.business.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zhiLian.business.config.RabbitmqConfig;
import com.zhiLian.business.domain.Business; import com.zhiLian.business.domain.Business;
import com.zhiLian.business.domain.Entinfo; import com.zhiLian.business.domain.Entinfo;
import com.zhiLian.business.mapper.BusinessMapper; import com.zhiLian.business.mapper.BusinessMapper;
import com.zhiLian.business.remote.factory.RemoteUserLoginFactory; //import com.zhiLian.business.remote.factory.RemoteUserLoginFactory;
import com.zhiLian.business.service.IBusinessService; import com.zhiLian.business.service.IBusinessService;
import com.zhiLian.common.core.domain.Result; import com.zhiLian.common.core.domain.Result;
import com.zhiLian.common.core.utils.DateUtils; import com.zhiLian.common.core.utils.DateUtils;
@ -18,6 +19,7 @@ import com.zhiLian.common.system.domain.LoginUser;
import com.zhiLian.common.system.domain.SysUser; import com.zhiLian.common.system.domain.SysUser;
import com.zhiLian.common.system.remote.RemoteUserService; import com.zhiLian.common.system.remote.RemoteUserService;
import org.apache.catalina.User; import org.apache.catalina.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -32,6 +34,7 @@ import java.net.ProtocolException;
import java.net.URL; import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -90,8 +93,8 @@ public class BusinessServiceImpl extends ServiceImpl<BusinessMapper, Business>
* @param business * @param business
* @return * @return
*/ */
@Autowired // @Autowired
private RemoteUserLoginFactory remoteUserLoginFactory; // private RemoteUserLoginFactory remoteUserLoginFactory;
@Override @Override
public int insertBusiness(Business business) public int insertBusiness(Business business)
{ {
@ -136,23 +139,20 @@ public class BusinessServiceImpl extends ServiceImpl<BusinessMapper, Business>
redisService.deleteObject("entinfo"); redisService.deleteObject("entinfo");
List<Entinfo> list = entinfoService.list(); List<Entinfo> list = entinfoService.list();
redisService.setCacheList("entinfo",list); redisService.setCacheList("entinfo",list);
if (business.getBusinessStates().equals(2) ) { if (business.getBusinessStates().equals("2")) {
Entinfo build = Entinfo.builder()
.entCode("test_" + business.getId())
.ip("192.168.120.128")
.port(Integer.valueOf(3306 + Integer.valueOf(String.valueOf(business.getId())))).build();
entinfoService.insertEntinfo(build);
// redisService.setCacheObject(String.valueOf(business.getId()),JSON.toJSONString(build));
extracted(business); extracted(business);
} }
} }
@Autowired
private RabbitTemplate rabbitTemplate;
/** /**
* http URL * http URL
* @param business * @param business
*/ */
private static void extracted(Business business) { private void extracted(Business business) {
String postUrl="http://192.168.120.128/webhook/%E6%96%B0%E5%BB%BAmysql%E6%9C%8D%E5%8A%A1"; String postUrl="http://192.168.120.128/webhook/%E6%96%B0%E5%BB%BAmysql%E6%9C%8D%E5%8A%A1";
HashMap<String, String> hashMap = new HashMap<>(); HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("businessId",business.getId()+ business.getName()); hashMap.put("businessId",business.getId()+ business.getName());
@ -185,6 +185,15 @@ public class BusinessServiceImpl extends ServiceImpl<BusinessMapper, Business>
new InputStreamReader(httpConn.getInputStream())); new InputStreamReader(httpConn.getInputStream()));
String resultData = br.readLine(); String resultData = br.readLine();
System.out.println("从服务端返回结果: " + resultData); System.out.println("从服务端返回结果: " + resultData);
Entinfo build = Entinfo.builder()
.entCode("test_" + business.getId())
.ip("192.168.120.128")
.port(Integer.valueOf(3306 + Integer.valueOf(String.valueOf(business.getId())))).build();
entinfoService.insertEntinfo(build);
rabbitTemplate.convertAndSend("zhiLian-vehicle-exchange",JSON.toJSONString(build),message ->{
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
// 7.关闭HttpURLConnection连接 // 7.关闭HttpURLConnection连接
httpConn.disconnect(); httpConn.disconnect();
} catch (Exception e) { } catch (Exception e) {

View File

@ -4,6 +4,17 @@ server:
# Spring # Spring
spring: spring:
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 122.51.111.225
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
application: application:
# 应用名称 # 应用名称
name: zhiLian-business name: zhiLian-business

View File

@ -21,6 +21,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableCustomSwagger2 @EnableCustomSwagger2
@EnableMyFeignClients @EnableMyFeignClients
@SpringBootApplication(exclude = {DynamicDataSourceAutoConfiguration.class, DataSourceAutoConfiguration.class }) @SpringBootApplication(exclude = {DynamicDataSourceAutoConfiguration.class, DataSourceAutoConfiguration.class })
@EnableScheduling
public class ZhiLianVehicleApplication { public class ZhiLianVehicleApplication {
public static void main (String[] args) { public static void main (String[] args) {
SpringApplication.run(ZhiLianVehicleApplication.class, args); SpringApplication.run(ZhiLianVehicleApplication.class, args);

View File

@ -1,24 +1,30 @@
package com.zhiLian.vehicle.datasource; package com.zhiLian.vehicle.datasource;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.zhiLian.common.core.utils.SpringUtils;
import com.zhiLian.common.redis.service.RedisService; import com.zhiLian.common.redis.service.RedisService;
import com.zhiLian.common.system.remote.RemoteUserService;
import com.zhiLian.vehicle.datasource.config.factory.DruidDataSourceFactory; import com.zhiLian.vehicle.datasource.config.factory.DruidDataSourceFactory;
import com.zhiLian.vehicle.datasource.config.role.DynamicDataSource; import com.zhiLian.vehicle.datasource.config.role.DynamicDataSource;
import com.zhiLian.vehicle.datasource.domain.DataSourceInfo; import com.zhiLian.vehicle.datasource.domain.DataSourceInfo;
import com.zhiLian.vehicle.datasource.domain.Entinfo; import com.zhiLian.vehicle.datasource.domain.Entinfo;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import springfox.documentation.spring.web.json.Json;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -38,6 +44,60 @@ public class ManyDataSource {
private RedisTemplate<String,String> redisTemplate; private RedisTemplate<String,String> redisTemplate;
//调用注解 添加队列名称
@RabbitListener(queuesToDeclare = {@Queue(name = "zhiLian-vehicle-exchange")})
public void smsConfig(String msg, Message message, Channel channel){
//获取消息的ID
String messageId = message.getMessageProperties().getMessageId();
try {
//添加消息id到redis set集合中 添加成功返回1 表示未消费 添加失败返回0 表示已消费
Long count = redisTemplate.opsForSet().add("messageId", messageId);
//添加成功 正常消费信息
if (count == 1) {
log.info("开始消费");
DruidDataSourceFactory druidDataSourceFactory = SpringUtils.getBean(DruidDataSourceFactory.class);
DynamicDataSource dynamicDataSource = SpringUtils.getBean(DynamicDataSource.class);
Entinfo entinfo1 = JSON.parseObject(msg, Entinfo.class);
DataSourceInfo dataSourceInfo = DataSourceInfo.hostAndPortBuild(entinfo1.getEntCode(), entinfo1.getIp(), entinfo1.getPort());
DruidDataSource druidDataSource = druidDataSourceFactory.create(dataSourceInfo);
dynamicDataSource.put(dataSourceInfo.getKey(), druidDataSource);
new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// List<String> entinfo = redisTemplate.opsForList().range("entinfo", 0, -1);
// List<Entinfo> databaseNameList = new ArrayList<>();
// entinfo.forEach(string -> {
// Entinfo entInfo = JSON.parseObject(string, Entinfo.class);
// databaseNameList.add(entInfo);
// });
// databaseNameList.forEach(enterpriseInfo -> {
// DataSourceInfo dataSourceInfo = DataSourceInfo.hostAndPortBuild(enterpriseInfo.getEntCode(), enterpriseInfo.getIp(), enterpriseInfo.getPort());
// DruidDataSource druidDataSource = druidDataSourceFactory.create(dataSourceInfo);
// dynamicDataSource.put(dataSourceInfo.getKey(), druidDataSource);
// });
}).start();
//确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费成功");
}
} catch (Exception e) {
//删除队列ID
log.info("消费重复");
try {
//回退消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
log.info("消费失败");
} catch (IOException ex) {
//回退失败
log.info("消费异常");
}
}
}
@Lazy @Lazy
private List<Entinfo> dataSourceInfoList(){ private List<Entinfo> dataSourceInfoList(){
List<Entinfo> databaseNameList = new ArrayList<>(){{ List<Entinfo> databaseNameList = new ArrayList<>(){{
@ -89,7 +149,5 @@ public class ManyDataSource {
return dynamicDataSource; return dynamicDataSource;
} }
public void init(){
}
} }

View File

@ -29,18 +29,11 @@ public class DataSourceAsp {
@Lazy @Lazy
@Autowired @Autowired
private RemoteUserService remoteUserService; private RemoteUserService remoteUserService;
/** /**
* *
*/ */
@Before("pointcut()") @Before("pointcut()")
public void beforeMethod() { public void beforeMethod() {
// EntInfo build = EntInfo.builder()
// .entCode("test_" + 12)
// .ip("192.168.120.128")
// .port(Integer.valueOf(3307)).build();
// redisService.setCacheObject(String.valueOf(12), JSON.toJSONString(build));
Long storeId = SecurityUtils.getLoginUser().getUserid(); Long storeId = SecurityUtils.getLoginUser().getUserid();
SysUser sysUser = remoteUserService.selectByUserId(storeId); SysUser sysUser = remoteUserService.selectByUserId(storeId);
DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+sysUser.getUserType()); DynamicDataSourceHolder.setDynamicDataSourceKey("test_"+sysUser.getUserType());

View File

@ -1,6 +1,7 @@
package com.zhiLian.vehicle.datasource.config.role; package com.zhiLian.vehicle.datasource.config.role;
import com.alibaba.druid.pool.DruidDataSource;
import com.zhiLian.vehicle.datasource.config.holder.DynamicDataSourceHolder; import com.zhiLian.vehicle.datasource.config.holder.DynamicDataSourceHolder;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@ -33,4 +34,14 @@ public class DynamicDataSource extends AbstractRoutingDataSource {
protected Object determineCurrentLookupKey() { protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getDynamicDataSourceKey(); return DynamicDataSourceHolder.getDynamicDataSourceKey();
} }
/**
*
* @param key
* @param value
*/
public void put(String key, DruidDataSource value) {
defineTargetDataSources.put(key, value);
this.afterPropertiesSet();
}
} }

View File

@ -0,0 +1,48 @@
package com.zhiLian.vehicle.rabbitmq.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
* @PostConstruct bean
* @PreDestory bean
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息投递到 broker 的状态true表示成功
System.out.println("消息发送成功!");
} else {
// 发送异常
System.out.println("发送异常原因 = " + cause);
// TODO 可以将消息 内容 以及 失败的原因 记录到 日志表中
}
}
}

View File

@ -0,0 +1,53 @@
package com.zhiLian.vehicle.rabbitmq.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,15 @@
package com.zhiLian.vehicle.rabbitmq.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,34 @@
package com.zhiLian.vehicle.rabbitmq.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* returnedMessage
*/
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法也可以理解为在spring容器初始化的时候执
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() + "被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// 回退了所有的信息,可做补偿机制 记录到 数据库
}
}

View File

@ -0,0 +1,66 @@
//package com.zhiLian.vehicle.rabbitmq.config;//package com.bwie.sms.config;
//
//
//import com.rabbitmq.client.Channel;
//import lombok.extern.log4j.Log4j2;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.stereotype.Component;
//
//import java.io.IOException;
//import java.util.HashMap;
//
///**
// * @BelongsProject: Bob_Up_Like_A_Cork
// * @BelongsPackage: com.bwie.sms.config
// * @Author: zhangquan
// * @CreateTime: 2023/7/30 20:48
// */
//@Component
//@Log4j2
//public class SendCodeConfig {
// @Autowired
// private RedisTemplate<String, String> redisTemplate;
//
// //调用注解 添加队列名称
// @RabbitListener(queuesToDeclare = {@Queue(name = "zhiLian-vehicle-exchange")})
// public void smsConfig(String msg, Message message, Channel channel){
// //获取消息的ID
// String messageId = message.getMessageProperties().getMessageId();
// try {
// //添加消息id到redis set集合中 添加成功返回1 表示未消费 添加失败返回0 表示已消费
// Long count = redisTemplate.opsForSet().add("messageId", messageId);
// //添加成功 正常消费信息
// if (count == 1) {
// log.info("开始消费");
// //将业务层接受的数据反序列为请求类对象
//
// //调用工具类发送验证码
//
// //反序列化
//
//
// //判断是否发送成功 不成功继续发送
//
// //确认消费
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// log.info("消费成功");
// }
// } catch (Exception e) {
// //删除队列ID
//
// log.info("消费重复");
// try {
// //回退消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
// log.info("消费失败");
// } catch (IOException ex) {
// //回退失败
// log.info("消费异常");
// }
// }
// }
//}

View File

@ -4,6 +4,17 @@ server:
# Spring # Spring
spring: spring:
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 122.51.111.225
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确
main: main:
allow-circular-references: true allow-circular-references: true
application: application: