fix(mcwl): 调整
parent
35ec6d7485
commit
2ef2fabc66
|
@ -1,5 +1,6 @@
|
|||
package com.mcwl.framework.config;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cache.annotation.CachingConfigurerSupport;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
@ -24,6 +25,16 @@ import java.time.Duration;
|
|||
@EnableCaching
|
||||
public class RedisConfig extends CachingConfigurerSupport
|
||||
{
|
||||
|
||||
@Value("${spring.redis.host}")
|
||||
private String host;
|
||||
|
||||
@Value("${spring.redis.port}")
|
||||
private int port;
|
||||
|
||||
@Value("${spring.redis.password}")
|
||||
private String password;
|
||||
|
||||
@Bean
|
||||
@SuppressWarnings(value = { "unchecked", "rawtypes" })
|
||||
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
|
||||
|
@ -80,8 +91,8 @@ public class RedisConfig extends CachingConfigurerSupport
|
|||
*/
|
||||
@Bean
|
||||
public LettuceConnectionFactory redisConnectionFactory() {
|
||||
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("1.13.246.108", 6370);
|
||||
standaloneConfig.setPassword(RedisPassword.of("MuYu_Cloud@Redis"));
|
||||
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(host, port);
|
||||
standaloneConfig.setPassword(RedisPassword.of(password));
|
||||
|
||||
LettuceClientConfiguration lettuceClientConfig = LettuceClientConfiguration.builder()
|
||||
.commandTimeout(Duration.ofSeconds(10))
|
||||
|
|
|
@ -30,8 +30,10 @@ public class AuthenticationEntryPointImpl implements AuthenticationEntryPoint, S
|
|||
throws IOException
|
||||
{
|
||||
int code = HttpStatus.UNAUTHORIZED;
|
||||
if (e.getCause() instanceof RedisCommandTimeoutException) {
|
||||
ServletUtils.renderString(response, JSON.toJSONString(AjaxResult.warn("网络超时,请检查网络稍后重试")));
|
||||
if (e.getCause() != null) {
|
||||
if (e.getCause() instanceof RedisCommandTimeoutException) {
|
||||
ServletUtils.renderString(response, JSON.toJSONString(AjaxResult.warn("网络超时,请检查网络稍后重试")));
|
||||
}
|
||||
}
|
||||
String msg = StringUtils.format("请求访问:{},认证失败,无法访问系统资源", request.getRequestURI());
|
||||
ServletUtils.renderString(response, JSON.toJSONString(AjaxResult.error(code, msg)));
|
||||
|
|
|
@ -52,7 +52,6 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
|
||||
private String clientId;
|
||||
|
||||
//================= 运行时组件 =================//
|
||||
private MqttAsyncClient client;
|
||||
private final Map<String, List<IMessageHandler>> topicHandlers = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
@ -63,7 +62,7 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
this.context = context;
|
||||
}
|
||||
|
||||
//================= 生命周期管理 =================//
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws MqttException {
|
||||
if (StringUtils.isBlank(clientId)) {
|
||||
|
@ -112,7 +111,10 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
});
|
||||
}
|
||||
|
||||
//================= 自动重连机制 =================//
|
||||
/**
|
||||
* 重连管理
|
||||
* @param attempt 重连次数
|
||||
*/
|
||||
private void scheduleReconnect(int attempt) {
|
||||
if (attempt > maxReconnectAttempts) {
|
||||
log.error("达到最大重连次数: {}", maxReconnectAttempts);
|
||||
|
@ -132,6 +134,7 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
client.setCallback(this); // 关键:重新绑定回调
|
||||
client.reconnect();
|
||||
resubscribeAllTopics(); // 重连后立即订阅
|
||||
log.info("重连成功");
|
||||
} catch (MqttException e) {
|
||||
log.error("重连失败", e);
|
||||
scheduleReconnect(attempt + 1);
|
||||
|
@ -139,7 +142,12 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
}, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
//================= 消息发布 =================//
|
||||
/**
|
||||
* 发布消息
|
||||
* @param topic 主题
|
||||
* @param payload 消息体
|
||||
* @param qos QoS等级
|
||||
*/
|
||||
@Async
|
||||
public void publish(String topic, String payload, int qos) {
|
||||
try {
|
||||
|
@ -164,7 +172,12 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
}
|
||||
}
|
||||
|
||||
//================= 订阅管理 =================//
|
||||
/**
|
||||
* 订阅主题
|
||||
* @param topicFilter 主题过滤器
|
||||
* @param qos QoS等级
|
||||
* @param handler 消息处理器
|
||||
*/
|
||||
public void subscribe(String topicFilter, int qos, IMessageHandler handler) {
|
||||
try {
|
||||
if (!client.isConnected()) {
|
||||
|
@ -198,7 +211,11 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
});
|
||||
}
|
||||
|
||||
//================= 回调处理 =================//
|
||||
/**
|
||||
* 消息到达处理
|
||||
* @param topic 主题
|
||||
* @param message 消息
|
||||
*/
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) {
|
||||
|
||||
|
@ -222,7 +239,7 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
// QoS处理逻辑(可选)
|
||||
// QoS处理逻辑
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -234,7 +251,9 @@ public class MqttTemplate implements MqttCallbackExtended, DisposableBean {
|
|||
}
|
||||
}
|
||||
|
||||
//================= 自动注册处理器 =================//
|
||||
/**
|
||||
* 自动注册处理器
|
||||
*/
|
||||
private void autoRegisterHandlers() {
|
||||
context.getBeansOfType(IMessageHandler.class).values()
|
||||
.forEach(handler -> {
|
||||
|
|
Loading…
Reference in New Issue