feat():指标预警准备
parent
40ab1e3859
commit
d577eab442
|
@ -5,13 +5,13 @@ import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||||
import com.mobai.domain.StayTime;
|
import com.mobai.domain.StayTime;
|
||||||
import com.mobai.mq.rabbitmq.cofig.MqttFactory;
|
|
||||||
import com.mobai.service.StayTimeService;
|
import com.mobai.service.StayTimeService;
|
||||||
import com.mobai.util.RedisService;
|
import com.mobai.util.RedisService;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.core.Message;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -32,27 +32,14 @@ import java.util.Date;
|
||||||
@Component
|
@Component
|
||||||
public class MessageHandler {
|
public class MessageHandler {
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private MqttFactory mqttFactory;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private StayTimeService stayTimeService;
|
private StayTimeService stayTimeService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisService redisService;
|
private RedisService redisService;
|
||||||
|
|
||||||
// @RabbitListener(queues = {"create.topic"})
|
@Autowired
|
||||||
// private void message(String msg) {
|
private RabbitTemplate rabbitTemplate;
|
||||||
// log.info("消息内容:{}", msg);
|
|
||||||
//// MqttProperties topic0 = MqttProperties.configBuild(
|
|
||||||
//// "39.98.69.92",
|
|
||||||
//// "topic0");
|
|
||||||
//// log.info("接收到消息:{}", topic0);
|
|
||||||
//// MqttClient client = mqttFactory.buildOptions(topic0);
|
|
||||||
//// log.info("client创建:{}", client);
|
|
||||||
//// log.info("clientID创建:{}", client.getClientId());
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上线事件
|
* 上线事件
|
||||||
|
@ -66,9 +53,9 @@ public class MessageHandler {
|
||||||
JSONObject jsonObject = JSON.parseObject(msg);
|
JSONObject jsonObject = JSON.parseObject(msg);
|
||||||
String clientId = JSON.to(String.class, jsonObject.get("clientId"));
|
String clientId = JSON.to(String.class, jsonObject.get("clientId"));
|
||||||
if (!(clientId.contains("-") && clientId.contains("."))) {
|
if (!(clientId.contains("-") && clientId.contains("."))) {
|
||||||
log.error("不是车辆事件::{}",message);
|
log.error("不是车辆事件::{}", message);
|
||||||
try {
|
try {
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
return;
|
return;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
@ -94,6 +81,10 @@ public class MessageHandler {
|
||||||
setVin(vin);
|
setVin(vin);
|
||||||
setUpTime(timestamp);
|
setUpTime(timestamp);
|
||||||
}});
|
}});
|
||||||
|
// 发送事件开启指标预警事件
|
||||||
|
if (save) {
|
||||||
|
rabbitTemplate.convertAndSend("standard_Warn_Event_Start", vin);
|
||||||
|
}
|
||||||
log.info(save ? vin + "上线记录成功" : vin + "上线记录失败");
|
log.info(save ? vin + "上线记录成功" : vin + "上线记录失败");
|
||||||
// 消息消费成功则确认
|
// 消息消费成功则确认
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
@ -121,10 +112,10 @@ public class MessageHandler {
|
||||||
eq(StayTime::getDownTime, 0);
|
eq(StayTime::getDownTime, 0);
|
||||||
}});
|
}});
|
||||||
// 为空抛异常
|
// 为空抛异常
|
||||||
if (vinStayTime==null){
|
if (vinStayTime == null) {
|
||||||
throw new ServletException("{"+vin+"}数据不存在");
|
throw new ServletException("{" + vin + "}数据不存在");
|
||||||
}
|
}
|
||||||
String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()-8*60*60*1000));
|
String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime() - 8 * 60 * 60 * 1000));
|
||||||
vinStayTime.setStayLongTime(format);
|
vinStayTime.setStayLongTime(format);
|
||||||
vinStayTime.setDownTime(timestamp);
|
vinStayTime.setDownTime(timestamp);
|
||||||
// 修改下线时间
|
// 修改下线时间
|
||||||
|
@ -135,6 +126,10 @@ public class MessageHandler {
|
||||||
// 输出在线时长
|
// 输出在线时长
|
||||||
log.info("车辆在线时长为:{}", format);
|
log.info("车辆在线时长为:{}", format);
|
||||||
log.info(update ? vin + "下线记录成功" : vin + "下线记录失败");
|
log.info(update ? vin + "下线记录成功" : vin + "下线记录失败");
|
||||||
|
// 发送事件开启指标预警事件
|
||||||
|
if (update) {
|
||||||
|
rabbitTemplate.convertAndSend("standard_Warn_Event_End", vin);
|
||||||
|
}
|
||||||
redisService.deleteObject(vin);
|
redisService.deleteObject(vin);
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
} catch (ServletException | IOException e) {
|
} catch (ServletException | IOException e) {
|
||||||
|
|
|
@ -31,13 +31,9 @@ public class MqttRunner implements ApplicationRunner {
|
||||||
@Autowired
|
@Autowired
|
||||||
private SelectInstances selectInstances;
|
private SelectInstances selectInstances;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RedisService redisService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
// 存入mqttIp
|
// 存入mqttIp
|
||||||
selectInstances.saveIps();
|
selectInstances.saveIps();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package com.mobai.runner;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Saisai
|
||||||
|
* @className Prop
|
||||||
|
* @description 描述
|
||||||
|
* @date 2024/6/25 20:17
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class Prop {
|
||||||
|
|
||||||
|
private String vin;
|
||||||
|
|
||||||
|
private String key;
|
||||||
|
|
||||||
|
private Long timeLength;
|
||||||
|
|
||||||
|
private Double activeLength;
|
||||||
|
}
|
|
@ -0,0 +1,148 @@
|
||||||
|
package com.mobai.runner;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Saisai
|
||||||
|
* @className PropRunner
|
||||||
|
* @description 描述
|
||||||
|
* @date 2024/6/25 19:57
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class PropRunner implements ApplicationRunner {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisTemplate<String,String> redis;
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
|
List<String> vehicles = new ArrayList<>() {{
|
||||||
|
add("VIN12345678912345");
|
||||||
|
add("VIN123456789DIJE4");
|
||||||
|
add("RQIP0AGC6Z94T1RUS");
|
||||||
|
add("RKGBI2CKLW4T3PM1J");
|
||||||
|
add("BHA38GWIMIMI4ZWKT");
|
||||||
|
add("HX603FUDTV1G8AWKV");
|
||||||
|
add("1HHUXJR83E94JQGAJ");
|
||||||
|
add("BFJVGBO2Q5RAYYU25");
|
||||||
|
add("Z34C1IBPJELI9OYWY");
|
||||||
|
add("35O90QPUFX21N8V4N");
|
||||||
|
add("7YG74PQ38G1E7VIVS");
|
||||||
|
add("OSHY1OFUVQESGRDJ8");
|
||||||
|
add("WZHFHDAIYUVKSHH3G");
|
||||||
|
add("OGV23LLM03B27CLH6");
|
||||||
|
add("TITEFBRAARGOY0XUC");
|
||||||
|
add("UOUVE8I86UYO1K3EH");
|
||||||
|
add("V5RF71OEG3KHBQZWD");
|
||||||
|
add("ICO9G1BONWOTW9M12");
|
||||||
|
add("NAEYC4ZL2NFZ5M71Q");
|
||||||
|
add("6WT4TREX19NZQ77V9");
|
||||||
|
add("9VY825QOP64E3VNIY");
|
||||||
|
add("OB5S8N9H273A1G8LM");
|
||||||
|
add("EYT4VJUKYW9E9TK5N");
|
||||||
|
add("P4G2E395RK19P6XKJ");
|
||||||
|
add("GX9KC2J8JHZ1R8SO8");
|
||||||
|
add("DCORIPLUYDVXU0KGU");
|
||||||
|
add("L17GSFUCFE8S4FP9B");
|
||||||
|
add("NC412M5S1324G9YDM");
|
||||||
|
add("P4UEJAMK7LTAES501");
|
||||||
|
add("61C8G3ANN70LFNEKW");
|
||||||
|
add("MKSMAB8ZFN95MFQ0B");
|
||||||
|
add("TWNBFOED42MVCFZZT");
|
||||||
|
add("H217SHM5A7MG5TZ3S");
|
||||||
|
add("K1M1YZPEQL70P7YN3");
|
||||||
|
add("8VRKWMJFHTVA7MZER");
|
||||||
|
add("Q7974VNRYU2YFEKUY");
|
||||||
|
add("SZYE7YS1WFOHNS0WO");
|
||||||
|
add("CBJ3GCQTZFSA57D23");
|
||||||
|
add("9L95HD8ZEVR699KEV");
|
||||||
|
add("S3LG8SXSAEP0726SI");
|
||||||
|
add("P5YK7NVRG8NTCH47Q");
|
||||||
|
add("YFE693JRRAU6071O7");
|
||||||
|
add("SZGMMFSU4IM8GMKK3");
|
||||||
|
add("JVVBHNUK69I5JHML4");
|
||||||
|
add("M639G6A8JHZLR4063");
|
||||||
|
add("D2V9G1WZYKEEPG2R5");
|
||||||
|
add("30X8PZ4WR3L5WR6DG");
|
||||||
|
add("UNFQSB382G7RGUNIU");
|
||||||
|
add("HTU4R6HA2964MUTE6");
|
||||||
|
add("FML2ZFVWBI1D2F3ZR");
|
||||||
|
add("MU6D6J2A6RMHBLEXL");
|
||||||
|
add("F1CFKRQK8J4NEK7GZ");
|
||||||
|
add("1VR3C0G5DTZPEFWQI");
|
||||||
|
add("LZLXYLRJPTBOZJRLZ");
|
||||||
|
add("83PLIK2ISNEG4033V");
|
||||||
|
add("A4Z8INWK4ZWELXFGQ");
|
||||||
|
add("OC29LYH6WYRMI40D8");
|
||||||
|
add("612131WM4VGW7R3ED");
|
||||||
|
add("HJET0O7W8O2V7WLI6");
|
||||||
|
add("7TZCJ21J2B202MQA3");
|
||||||
|
add("DIGC27FH0U3HHQ908");
|
||||||
|
add("XAVU7RIDUZSNQ1CQX");
|
||||||
|
add("ZV150C64IRG3UL3GI");
|
||||||
|
add("NM2KDD0NQXC0J29EK");
|
||||||
|
add("H4Y8PT0FQ17VIJF0A");
|
||||||
|
add("NKDJYNRXONCI4TO6I");
|
||||||
|
add("SEM06B25293OWC0DM");
|
||||||
|
add("MJARBUOUKDE8Y5ATN");
|
||||||
|
add("ALX5X34MK65O1JOCT");
|
||||||
|
add("E69ISH5TST723TSTF");
|
||||||
|
add("J1KQ3MDXXDTJ48ANJ");
|
||||||
|
add("F7WAZ4JWQTJMMV4D3");
|
||||||
|
add("0MYV55ESMZCEZDDCB");
|
||||||
|
add("670BE4AIWFBYLCQQ2");
|
||||||
|
add("BWBPYZHQN8L5JTHT0");
|
||||||
|
add("9LIBQ5ZKPITT9QRKD");
|
||||||
|
add("CXCYATA7W19KERXYK");
|
||||||
|
add("HI2WKQX3D4NBHEB6O");
|
||||||
|
add("1WYB9O5VPPNEH6ZMX");
|
||||||
|
add("8PT2CFO6U6O55725X");
|
||||||
|
add("LMD36P0ANAVF8K2T0");
|
||||||
|
add("LIN3D2KSZF6NLH60M");
|
||||||
|
add("U7O3UXKISX0BUVDXQ");
|
||||||
|
add("Q8BVQ3QZTLFRMEVE7");
|
||||||
|
add("48GH1IHSVFQ6YGS85");
|
||||||
|
add("JAV0VJUJYOTOK9KSY");
|
||||||
|
add("0Q9P3PVTP2DA8CGW1");
|
||||||
|
add("TFNRFAV9FPKKNL85E");
|
||||||
|
add("D72TKF55550SXZTN3");
|
||||||
|
add("65KXGYUNHF057P3U1");
|
||||||
|
add("DMZTXS8KI9DVUG7XV");
|
||||||
|
add("KRB0Z7V7O8JHX499X");
|
||||||
|
add("JFMTVY934AK0RQMUV");
|
||||||
|
add("FFMXWPWW05L6WHBZM");
|
||||||
|
add("M0SP12TDXYOPAN6HE");
|
||||||
|
add("U30LDVYCMPDQHJIZG");
|
||||||
|
add("CAT9ZPQCR66218L5T");
|
||||||
|
add("659XTVJ2IMA60REM1");
|
||||||
|
add("KRB0Z7V7O8JHX499X");
|
||||||
|
}};
|
||||||
|
vehicles = vehicles.stream().distinct().collect(Collectors.toList());
|
||||||
|
// 公司旗下的车辆
|
||||||
|
redis.opsForList().leftPushAll("vehicles",vehicles);
|
||||||
|
// 公司集
|
||||||
|
redis.opsForList().leftPushAll("company",new ArrayList<String>(){{
|
||||||
|
add("vehicles");
|
||||||
|
add("vehicles1");
|
||||||
|
add("vehicles2");
|
||||||
|
}});
|
||||||
|
// 公司提供的指标
|
||||||
|
redis.opsForValue().set("vehicles", JSON.toJSONString(new Prop(){{
|
||||||
|
setVin(null);
|
||||||
|
setKey(JSON.toJSONString(new ArrayList<String>(){{
|
||||||
|
add("speed:150");
|
||||||
|
add("mileage:150");
|
||||||
|
add("voltage:150");
|
||||||
|
add("current:150");
|
||||||
|
}}));
|
||||||
|
}}));
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,8 +16,6 @@ public interface FluxGetInfoService {
|
||||||
|
|
||||||
Result getInfo(String ip);
|
Result getInfo(String ip);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Result vehicleConnection(VehicleConnectionReq req);
|
Result vehicleConnection(VehicleConnectionReq req);
|
||||||
|
|
||||||
Result<List<MqttServerModel>> getIps();
|
Result<List<MqttServerModel>> getIps();
|
||||||
|
|
|
@ -78,16 +78,7 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result vehicleConnection(VehicleConnectionReq req) {
|
public Result vehicleConnection(VehicleConnectionReq req) {
|
||||||
// "vehicleVin": "VIN1234567894" vin
|
|
||||||
// "timestamp": "11111" new Date().getMillis()
|
|
||||||
// "username": "你好" Vin + timestamp
|
|
||||||
// "nonce": "33" 随机
|
|
||||||
|
|
||||||
log.warn("参数为:{}", req);
|
log.warn("参数为:{}", req);
|
||||||
// String string = redis.opsForList().range("fluxMq", 0, -1).get(0);
|
|
||||||
// List<MqttServerModel> mqtts = JSON.parseArray(string, MqttServerModel.class);
|
|
||||||
// log.info("集合:{}",mqtts);
|
|
||||||
// tcp://192.168.1.1:1883
|
|
||||||
synchronized(this){
|
synchronized(this){
|
||||||
if (redis.hasKey("fluxMqIndex")) {
|
if (redis.hasKey("fluxMqIndex")) {
|
||||||
redis.opsForValue().increment("fluxMqIndex", 1);
|
redis.opsForValue().increment("fluxMqIndex", 1);
|
||||||
|
@ -96,17 +87,13 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
|
||||||
}
|
}
|
||||||
int index = Integer.valueOf(redis.opsForValue().get("fluxMqIndex"));
|
int index = Integer.valueOf(redis.opsForValue().get("fluxMqIndex"));
|
||||||
log.info("下标:{}", index);
|
log.info("下标:{}", index);
|
||||||
// List<String> fluxmq = redis.opsForList().range("mqttIp", 0, 5);
|
|
||||||
String mqttIp = redis.opsForList().index("mqttIp", index);
|
String mqttIp = redis.opsForList().index("mqttIp", index);
|
||||||
|
|
||||||
// log.info(fluxmq);
|
|
||||||
log.info(mqttIp);
|
log.info(mqttIp);
|
||||||
MqttServerModel mqttServerModel = JSON.parseObject(mqttIp, MqttServerModel.class);
|
MqttServerModel mqttServerModel = JSON.parseObject(mqttIp, MqttServerModel.class);
|
||||||
if (index + 1 >= 80) {
|
if (index + 1 >= 80) {
|
||||||
redis.opsForValue().set("fluxMqIndex", 0 + "");
|
redis.opsForValue().set("fluxMqIndex", 0 + "");
|
||||||
}
|
}
|
||||||
log.info("已获取到对象:{}", mqttServerModel);
|
log.info("已获取到对象:{}", mqttServerModel);
|
||||||
|
|
||||||
return Result.success(mqttServerModel);
|
return Result.success(mqttServerModel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,9 +101,8 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<List<MqttServerModel>> getIps() {
|
public Result<List<MqttServerModel>> getIps() {
|
||||||
List<String> ips = new ArrayList<>();
|
|
||||||
DescribeInstancesResponse response = selectInstances.getInfo();
|
DescribeInstancesResponse response = selectInstances.getInfo();
|
||||||
ips = selectInstances.ips(response);
|
List<String> ips = selectInstances.ips(response);
|
||||||
log.info("当前实例ip为{}", ips);
|
log.info("当前实例ip为{}", ips);
|
||||||
List<String> finalIps = ips;
|
List<String> finalIps = ips;
|
||||||
List<MqttServerModel> collect = ips.stream().map(ip ->
|
List<MqttServerModel> collect = ips.stream().map(ip ->
|
||||||
|
|
|
@ -2,7 +2,7 @@ server:
|
||||||
port: 8081
|
port: 8081
|
||||||
spring:
|
spring:
|
||||||
redis:
|
redis:
|
||||||
host: 127.0.0.1
|
host: 175.24.138.82
|
||||||
port: 6379
|
port: 6379
|
||||||
password:
|
password:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
|
|
Loading…
Reference in New Issue