Compare commits

...

2 Commits

Author SHA1 Message Date
Number7 2e940349ef Merge remote-tracking branch 'refs/remotes/origin/dev.template' into dev
# Conflicts:
#	cloud-auth/src/main/resources/bootstrap.yml
#	cloud-gateway/src/main/resources/bootstrap.yml
#	cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
#	cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml
#	cloud-modules/cloud-modules-vehiclegateway/src/main/resources/bootstrap.yml
#	cloud-modules/saas/saas-server/src/main/java/com/muyu/server/SaasApplication.java
#	cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml
2024-10-10 09:28:44 +08:00
Number7 cf286d48f9 feat():新增数据预警 2024-10-10 09:26:52 +08:00
10 changed files with 254 additions and 12 deletions

View File

@ -1,5 +1,4 @@
package com.muyu.cloud.common.many.datasource; package com.muyu.cloud.common.many.datasource;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration; import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
@ -22,12 +21,10 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* @Author: DongZeLiang * @Author: DongZeLiang
* @date: 2024/6/3 * @date: 2024/6/3
@ -67,7 +64,6 @@ public class ManyDataSource implements ApplicationRunner{
return null; return null;
} }
} }
// private List<EntInfo> dataPrimarySourceInfoList(){ // private List<EntInfo> dataPrimarySourceInfoList(){
// List<EntInfo> list = new ArrayList<>(); // List<EntInfo> list = new ArrayList<>();
// list.add( // list.add(
@ -79,7 +75,6 @@ public class ManyDataSource implements ApplicationRunner{
// ); // );
// return list; // return list;
// } // }
@Bean @Bean
public DynamicDataSource dynamicDataSource(DruidDataSourceFactory druidDataSourceFactory) { public DynamicDataSource dynamicDataSource(DruidDataSourceFactory druidDataSourceFactory) {
// 企业列表 企业CODE端口IP // 企业列表 企业CODE端口IP
@ -98,7 +93,6 @@ public class ManyDataSource implements ApplicationRunner{
dynamicDataSource.setDefineTargetDataSources(dataSourceMap); dynamicDataSource.setDefineTargetDataSources(dataSourceMap);
return dynamicDataSource; return dynamicDataSource;
} }
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
DruidDataSourceFactory druidDataSourceFactory = SpringUtils.getBean(DruidDataSourceFactory.class); DruidDataSourceFactory druidDataSourceFactory = SpringUtils.getBean(DruidDataSourceFactory.class);

View File

@ -114,6 +114,18 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId> <artifactId>cloud-common-rabbit</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu.common</groupId>
<artifactId>saas-common</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>saas-cache</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>

View File

@ -0,0 +1,16 @@
package com.muyu.event.service;
import com.alibaba.fastjson2.JSONObject;
/**
* @author liuxinyue
* @Packagecom.muyu.event.service
* @nameIncidentService
* @Date2024/10/9 15:02
*/
public interface IncidentService {
void warnEventProcess(JSONObject jsonObject) throws Exception;
}

View File

@ -0,0 +1,160 @@
package com.muyu.event.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.cache.MessageTemplateTypeCacheService;
import com.muyu.cache.SysCarCacheService;
import com.muyu.cache.WarnRuleCacheService;
import com.muyu.cache.WarnStrategyCacheService;
import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.domain.resp.WarnRuleResp;
import com.muyu.common.domain.resp.WarnStrategyResp;
import com.muyu.event.consumer.MessageConsumer;
import com.muyu.event.service.IncidentService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author liuxinyue
* @Packagecom.muyu.event.service.impl
* @nameIncidentServiceImpl
* @Date2024/10/9 15:02
*/
@Log4j2
@Service
public class IncidentServiceImpl implements IncidentService {
private static int DURATION_SECONDS = 5;
private static List<JSONObject> receivedStrings = new ArrayList<>();
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int elapsedSeconds = 0;
private static String file="elapsed";
private static List<MessageTemplateType> messageTemplateTypes=null;
private static Long msgTypeId=null;
//滑窗时间
private static Long slideTime=null;
//增长率
private static Long slideFrequency=null;
//预警策略
@Resource
private WarnStrategyCacheService warnStrategyCacheService;
//车辆
@Resource
private SysCarCacheService sysCarCacheService;
//预警规则
@Resource
private WarnRuleCacheService warnRuleCacheService;
//报文模版
@Resource
private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
private final String topic="four_car";
@Autowired
private MessageConsumer messageConsumer;
@Autowired
public KafkaConsumer consumer;
@Override
public void warnEventProcess(JSONObject jsonObject) throws Exception {
receivedStrings.add(jsonObject);
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
//根据这个车辆VIN查询出他对应的车辆类型
String carVin=null;
//报文模版的ID
Integer templateId=null;
for (JSONObject receivedString : receivedStrings) {
carVin = (String) receivedString.get("carVin");
}
SysCar carByVin = null;
List<SysCarVo> carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
Map<String, SysCarVo> carMap = carVoList.stream()
.collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity()));
//获取到了这个车辆的信息
carByVin = carMap.get(carVin);
//获取到这辆车绑定的报文模版
templateId=carByVin.getTemplateId();
//这个是这辆车对应的所有策略
List<WarnStrategyResp> carWithWarnStrategyList=null;
List<WarnStrategyResp> warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre());
for (WarnStrategyResp warnStrategyResp : warnStrategyResps) {
if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){
carWithWarnStrategyList.add(warnStrategyResp);
}
}
//该车对应的所有预警规则
List<WarnRuleResp> warnRuleResp=null;
List<WarnRuleResp> warnRuleResps = warnRuleCacheService.get(warnRuleCacheService.keyPre());
for (WarnStrategyResp warnStrategyResp : carWithWarnStrategyList) {
for (WarnRuleResp ruleResp : warnRuleResps) {
if(warnStrategyResp.getId().equals(ruleResp.getStrategyId())){
warnRuleResp.add(ruleResp);
}
}
}
//报文模版
messageTemplateTypes = messageTemplateTypeCacheService.get(messageTemplateTypeCacheService.keyPre());
for (WarnRuleResp ruleResp : warnRuleResp) {
//每一个规则他绑定了报文模版里面对应的一个配置 比如:电池,或者车速
msgTypeId = ruleResp.getMsgTypeId();
//将规则中对应的滑窗时间赋值为DURATION_SECONDS
DURATION_SECONDS = Math.toIntExact(ruleResp.getSlideTime());
slideFrequency = ruleResp.getSlideFrequency();
}
// 定义一个任务,每秒执行一次
Runnable task = new Runnable() {
@Override
public void run() {
// 清理超过的数据
cleanUpOldStrings();
// 检查超速条件
checkForSpeeding();
}
};
// 每隔1秒执行一次任务
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}
// 清理超过60秒的数据
private static void cleanUpOldStrings() {
long currentTime = System.currentTimeMillis();
receivedStrings.removeIf(jsonObject ->
currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS)
);
}
// 检查是否有超速情况
private static void checkForSpeeding() {
if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回
for (int i = 0; i < receivedStrings.size(); i++) {
JSONObject current = receivedStrings.get(i);
JSONObject next = receivedStrings.get(i + 1);
for (MessageTemplateType messageTemplateType : messageTemplateTypes) {
if(messageTemplateType.getMessageTemplateTypeId().equals(msgTypeId)){
Short currentElapsed = current.getShort(messageTemplateType.getMessageField());
Short nextElapsed = next.getShort(messageTemplateType.getMessageField());
if (nextElapsed > currentElapsed + slideFrequency) {
log.info("出错啦,出错啦,您的"+messageTemplateType.getMessageField()+"不正常,请检查!!!");
}
}
}
}
}
}

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.53.251:8848 addr: 47.101.53.251:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: yzl namespace: lxy
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.53.251:8848 addr: 47.101.53.251:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: yzl namespace: lxy
# Spring # Spring
spring: spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.53.251:8848 addr: 47.101.53.251:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: yzl namespace: lxy
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring:

View File

@ -104,7 +104,6 @@ public class MqttConfigure {
me.printStackTrace(); me.printStackTrace();
} }
} }
public JSONObject messageParsing(String templateMessage) { public JSONObject messageParsing(String templateMessage) {
//给一个JSON对象 //给一个JSON对象
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();

View File

@ -50,6 +50,7 @@ public class test {
//报文模版 //报文模版
@Resource @Resource
private MessageTemplateTypeCacheService messageTemplateTypeCacheService; private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
public void main(String[] args) { public void main(String[] args) {
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings //协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
//根据这个车辆VIN查询出他对应的车辆类型 //根据这个车辆VIN查询出他对应的车辆类型
@ -107,7 +108,6 @@ public class test {
// 每隔1秒执行一次任务 // 每隔1秒执行一次任务
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
} }
// 清理超过60秒的数据 // 清理超过60秒的数据
private static void cleanUpOldStrings() { private static void cleanUpOldStrings() {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
@ -115,7 +115,6 @@ public class test {
currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS) currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS)
); );
} }
// 检查是否有超速情况 // 检查是否有超速情况
private static void checkForSpeeding() { private static void checkForSpeeding() {
if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回 if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回

View File

@ -0,0 +1,62 @@
# Tomcat
server:
port: 15277
# nacos线上地址
nacos:
addr: 47.101.53.251:8848
user-name: nacos
password: nacos
namespace: lxy
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
mvc:
pathmatch:
matching-strategy: ant_path_matcher
amqp:
deserialization:
trust:
all: true
main:
allow-bean-definition-overriding: true
application:
# 应用名称
name: cloud-template
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: ${nacos.addr}
# nacos用户名
username: ${nacos.user-name}
# nacos密码
password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
config:
# 服务注册地址
server-addr: ${nacos.addr}
# nacos用户名
username: ${nacos.user-name}
# nacos密码
password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
# 系统共享配置
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# xxl-job 配置文件
- application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
logging:
level:
com.muyu.system.mapper: DEBUG