mq-load/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java

164 lines
6.0 KiB
Java

package com.mobai.mq.rabbitmq.rabbitMq;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.mobai.domain.StayTime;
import com.mobai.mq.rabbitmq.cofig.MqttFactory;
import com.mobai.mq.rabbitmq.cofig.MqttProperties;
import com.mobai.service.StayTimeService;
import com.mobai.util.RedisService;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.servlet.ServletException;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 消费者:消息处理器
*
* @ClassName MessageHandler
* @Description 描述
* @Author SaiSai.Liu
* @Date 2024/5/31 14:37
*/
@Log4j2
@Component
public class MessageHandler {
@Autowired
private MqttFactory mqttFactory;
@Autowired
private StayTimeService stayTimeService;
@Autowired
private RedisService redisService;
@RabbitListener(queues = {"create.topic"})
private void message(String msg) {
log.info("消息内容:{}", msg);
MqttProperties topic0 = MqttProperties.configBuild(
"39.98.69.92",
"topic0");
log.info("接收到消息:{}", topic0);
MqttClient client = mqttFactory.buildOptions(topic0);
log.info("client创建:{}", client);
log.info("clientID创建:{}", client.getClientId());
}
/**
* 上线事件
*
* @param msg
*/
@RabbitListener(queues = {"event"})
private void trainUp(String msg, Message message, Channel channel) {
log.info("event:{}", msg);
// 链接事件
// event:{
// "protocol":"MQTT",
// "clientIp":"39.144.107.165",
// "nodeIp":"127.0.0.1",
// "clientId":"VIN123456789DIJE4",
// "version":"MQTT_3_1_1",
// "keepalive":20,
// "cleanSession":true,
// "timestamp":1717466764797,
// "auth":{
// "username":"6D7A546314155D43A339EE4C0410613D86C821299316ADECDB871E08",
// "password":"VklOMTIzNDU2Nzg5RElKRTQxNzE3NDY2NzY1MDg3NTgyNDI4QThEQjA0RkU2OTkzNTM5NDIyNTQ2ODIwQzFFNzc3NDUzQTA4NzIzRTU4NUQyNDRBNjY="
// },
// "messageId":0}
JSONObject jsonObject = JSON.parseObject(msg);
String vin = JSON.to(String.class, jsonObject.get("clientId"));
long timestamp = JSON.to(Long.class, jsonObject.get("timestamp"));
if (jsonObject.get("auth") != null) {
try {
log.info("上线事件");
String ip = redisService.getValue(vin);
log.info("上线车辆vin:{}\n\t上线时ip:{}", vin, ip);
if (ip == null) {
throw new ServletException("上线时ip为空");
}
// 方法内有判断,有则自增,无则创建
redisService.increment("onlineCar-" + ip, 1);
boolean save = stayTimeService.save(new StayTime() {{
setIp(ip);
setVin(vin);
setUpTime(timestamp);
}});
log.info(save ? vin + "上线记录成功" : vin + "上线记录失败");
// 消息消费成功则确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException | ServletException e) {
log.error("上线失败");
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
} else {
// event:{
// "protocol":"MQTT",
// "messageId":0,
// "timestamp":1717466777367,
// "reason":"normal",
// "clientId":"VIN123456789DIJE4",
// "nodeIp":"127.0.0.1",
// "id":354,
// "clientIp":"39.144.107.165"
// }
log.info("下线事件");
String ip = redisService.getValue(vin);
try {
log.info("下线车辆vin:{}\n\t下线时ip:{}", vin, ip);
if (ip == null) {
throw new ServletException("下线时ip为空");
}
// 方法内有判断,有则自减,无则创建
redisService.decrement("onlineCar-" + ip, 1);
StayTime vinStayTime = stayTimeService.getOne(new LambdaQueryWrapper<>() {{
eq(StayTime::getVin, vin);
eq(StayTime::getDownTime, null);
}});
vinStayTime.setDownTime(timestamp);
boolean update = stayTimeService.update(vinStayTime, new LambdaUpdateWrapper<>() {{
eq(StayTime::getVin, vin);
eq(StayTime::getDownTime, null);
}});
// 输出在线时长
log.info("车辆在线时长为:{}", new SimpleDateFormat("HH时mm分ss秒").format(new Date(vinStayTime.getUpTime() - timestamp)));
log.info(update ? vin + "上线记录成功" : vin + "上线记录失败");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ServletException | IOException e) {
log.error("下线失败");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
}