Compare commits
2 Commits
88fdcfcbab
...
2e940349ef
Author | SHA1 | Date |
---|---|---|
|
2e940349ef | |
|
cf286d48f9 |
|
@ -1,5 +1,4 @@
|
|||
package com.muyu.cloud.common.many.datasource;
|
||||
|
||||
import com.alibaba.druid.pool.DruidDataSource;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
|
||||
|
@ -22,12 +21,10 @@ import org.springframework.boot.ApplicationRunner;
|
|||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: DongZeLiang
|
||||
* @date: 2024/6/3
|
||||
|
@ -67,7 +64,6 @@ public class ManyDataSource implements ApplicationRunner{
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// private List<EntInfo> dataPrimarySourceInfoList(){
|
||||
// List<EntInfo> list = new ArrayList<>();
|
||||
// list.add(
|
||||
|
@ -79,7 +75,6 @@ public class ManyDataSource implements ApplicationRunner{
|
|||
// );
|
||||
// return list;
|
||||
// }
|
||||
|
||||
@Bean
|
||||
public DynamicDataSource dynamicDataSource(DruidDataSourceFactory druidDataSourceFactory) {
|
||||
// 企业列表 企业CODE,端口,IP
|
||||
|
@ -98,7 +93,6 @@ public class ManyDataSource implements ApplicationRunner{
|
|||
dynamicDataSource.setDefineTargetDataSources(dataSourceMap);
|
||||
return dynamicDataSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
DruidDataSourceFactory druidDataSourceFactory = SpringUtils.getBean(DruidDataSourceFactory.class);
|
||||
|
|
|
@ -114,6 +114,18 @@
|
|||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-rabbit</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.muyu.common</groupId>
|
||||
<artifactId>saas-common</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>saas-cache</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package com.muyu.event.service;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
|
||||
/**
|
||||
* @author liuxinyue
|
||||
* @Package:com.muyu.event.service
|
||||
* @name:IncidentService
|
||||
* @Date:2024/10/9 15:02
|
||||
*/
|
||||
public interface IncidentService {
|
||||
|
||||
void warnEventProcess(JSONObject jsonObject) throws Exception;
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
package com.muyu.event.service.impl;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.cache.MessageTemplateTypeCacheService;
|
||||
import com.muyu.cache.SysCarCacheService;
|
||||
import com.muyu.cache.WarnRuleCacheService;
|
||||
import com.muyu.cache.WarnStrategyCacheService;
|
||||
import com.muyu.common.domain.MessageTemplateType;
|
||||
import com.muyu.common.domain.SysCar;
|
||||
import com.muyu.common.domain.resp.SysCarVo;
|
||||
import com.muyu.common.domain.resp.WarnRuleResp;
|
||||
import com.muyu.common.domain.resp.WarnStrategyResp;
|
||||
import com.muyu.event.consumer.MessageConsumer;
|
||||
import com.muyu.event.service.IncidentService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.stereotype.Service;
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
/**
|
||||
* @author liuxinyue
|
||||
* @Package:com.muyu.event.service.impl
|
||||
* @name:IncidentServiceImpl
|
||||
* @Date:2024/10/9 15:02
|
||||
*/
|
||||
@Log4j2
|
||||
@Service
|
||||
public class IncidentServiceImpl implements IncidentService {
|
||||
private static int DURATION_SECONDS = 5;
|
||||
private static List<JSONObject> receivedStrings = new ArrayList<>();
|
||||
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
private static int elapsedSeconds = 0;
|
||||
private static String file="elapsed";
|
||||
private static List<MessageTemplateType> messageTemplateTypes=null;
|
||||
private static Long msgTypeId=null;
|
||||
//滑窗时间
|
||||
private static Long slideTime=null;
|
||||
//增长率
|
||||
private static Long slideFrequency=null;
|
||||
//预警策略
|
||||
@Resource
|
||||
private WarnStrategyCacheService warnStrategyCacheService;
|
||||
//车辆
|
||||
@Resource
|
||||
private SysCarCacheService sysCarCacheService;
|
||||
//预警规则
|
||||
@Resource
|
||||
private WarnRuleCacheService warnRuleCacheService;
|
||||
//报文模版
|
||||
@Resource
|
||||
private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
|
||||
|
||||
private final String topic="four_car";
|
||||
|
||||
@Autowired
|
||||
private MessageConsumer messageConsumer;
|
||||
|
||||
@Autowired
|
||||
public KafkaConsumer consumer;
|
||||
|
||||
@Override
|
||||
public void warnEventProcess(JSONObject jsonObject) throws Exception {
|
||||
|
||||
receivedStrings.add(jsonObject);
|
||||
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
|
||||
//根据这个车辆VIN查询出他对应的车辆类型
|
||||
String carVin=null;
|
||||
//报文模版的ID
|
||||
Integer templateId=null;
|
||||
for (JSONObject receivedString : receivedStrings) {
|
||||
carVin = (String) receivedString.get("carVin");
|
||||
}
|
||||
SysCar carByVin = null;
|
||||
List<SysCarVo> carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
|
||||
Map<String, SysCarVo> carMap = carVoList.stream()
|
||||
.collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity()));
|
||||
//获取到了这个车辆的信息
|
||||
carByVin = carMap.get(carVin);
|
||||
//获取到这辆车绑定的报文模版
|
||||
templateId=carByVin.getTemplateId();
|
||||
//这个是这辆车对应的所有策略
|
||||
List<WarnStrategyResp> carWithWarnStrategyList=null;
|
||||
List<WarnStrategyResp> warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre());
|
||||
for (WarnStrategyResp warnStrategyResp : warnStrategyResps) {
|
||||
if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){
|
||||
carWithWarnStrategyList.add(warnStrategyResp);
|
||||
}
|
||||
}
|
||||
//该车对应的所有预警规则
|
||||
List<WarnRuleResp> warnRuleResp=null;
|
||||
List<WarnRuleResp> warnRuleResps = warnRuleCacheService.get(warnRuleCacheService.keyPre());
|
||||
for (WarnStrategyResp warnStrategyResp : carWithWarnStrategyList) {
|
||||
for (WarnRuleResp ruleResp : warnRuleResps) {
|
||||
if(warnStrategyResp.getId().equals(ruleResp.getStrategyId())){
|
||||
warnRuleResp.add(ruleResp);
|
||||
}
|
||||
}
|
||||
}
|
||||
//报文模版
|
||||
messageTemplateTypes = messageTemplateTypeCacheService.get(messageTemplateTypeCacheService.keyPre());
|
||||
for (WarnRuleResp ruleResp : warnRuleResp) {
|
||||
//每一个规则他绑定了报文模版里面对应的一个配置 比如:电池,或者车速
|
||||
msgTypeId = ruleResp.getMsgTypeId();
|
||||
//将规则中对应的滑窗时间赋值为DURATION_SECONDS
|
||||
DURATION_SECONDS = Math.toIntExact(ruleResp.getSlideTime());
|
||||
slideFrequency = ruleResp.getSlideFrequency();
|
||||
}
|
||||
// 定义一个任务,每秒执行一次
|
||||
Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// 清理超过的数据
|
||||
cleanUpOldStrings();
|
||||
// 检查超速条件
|
||||
checkForSpeeding();
|
||||
}
|
||||
};
|
||||
// 每隔1秒执行一次任务
|
||||
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
// 清理超过60秒的数据
|
||||
private static void cleanUpOldStrings() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
receivedStrings.removeIf(jsonObject ->
|
||||
currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS)
|
||||
);
|
||||
}
|
||||
// 检查是否有超速情况
|
||||
private static void checkForSpeeding() {
|
||||
if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回
|
||||
for (int i = 0; i < receivedStrings.size(); i++) {
|
||||
JSONObject current = receivedStrings.get(i);
|
||||
JSONObject next = receivedStrings.get(i + 1);
|
||||
for (MessageTemplateType messageTemplateType : messageTemplateTypes) {
|
||||
if(messageTemplateType.getMessageTemplateTypeId().equals(msgTypeId)){
|
||||
Short currentElapsed = current.getShort(messageTemplateType.getMessageField());
|
||||
Short nextElapsed = next.getShort(messageTemplateType.getMessageField());
|
||||
if (nextElapsed > currentElapsed + slideFrequency) {
|
||||
log.info("出错啦,出错啦,您的"+messageTemplateType.getMessageField()+"不正常,请检查!!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -7,7 +7,7 @@ nacos:
|
|||
addr: 47.101.53.251:8848
|
||||
user-name: nacos
|
||||
password: nacos
|
||||
namespace: yzl
|
||||
namespace: lxy
|
||||
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
||||
# Spring
|
||||
spring:
|
||||
|
|
|
@ -7,7 +7,7 @@ nacos:
|
|||
addr: 47.101.53.251:8848
|
||||
user-name: nacos
|
||||
password: nacos
|
||||
namespace: yzl
|
||||
namespace: lxy
|
||||
|
||||
# Spring
|
||||
spring:
|
||||
|
|
|
@ -7,7 +7,7 @@ nacos:
|
|||
addr: 47.101.53.251:8848
|
||||
user-name: nacos
|
||||
password: nacos
|
||||
namespace: yzl
|
||||
namespace: lxy
|
||||
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
||||
# Spring
|
||||
spring:
|
||||
|
|
|
@ -104,7 +104,6 @@ public class MqttConfigure {
|
|||
me.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public JSONObject messageParsing(String templateMessage) {
|
||||
//给一个JSON对象
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
|
|
|
@ -50,6 +50,7 @@ public class test {
|
|||
//报文模版
|
||||
@Resource
|
||||
private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
|
||||
|
||||
public void main(String[] args) {
|
||||
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
|
||||
//根据这个车辆VIN查询出他对应的车辆类型
|
||||
|
@ -107,7 +108,6 @@ public class test {
|
|||
// 每隔1秒执行一次任务
|
||||
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
// 清理超过60秒的数据
|
||||
private static void cleanUpOldStrings() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
@ -115,7 +115,6 @@ public class test {
|
|||
currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS)
|
||||
);
|
||||
}
|
||||
|
||||
// 检查是否有超速情况
|
||||
private static void checkForSpeeding() {
|
||||
if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
# Tomcat
|
||||
server:
|
||||
port: 15277
|
||||
|
||||
# nacos线上地址
|
||||
nacos:
|
||||
addr: 47.101.53.251:8848
|
||||
user-name: nacos
|
||||
password: nacos
|
||||
namespace: lxy
|
||||
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
||||
# Spring
|
||||
spring:
|
||||
mvc:
|
||||
pathmatch:
|
||||
matching-strategy: ant_path_matcher
|
||||
amqp:
|
||||
deserialization:
|
||||
trust:
|
||||
all: true
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
application:
|
||||
# 应用名称
|
||||
name: cloud-template
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: dev
|
||||
cloud:
|
||||
nacos:
|
||||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: ${nacos.addr}
|
||||
# nacos用户名
|
||||
username: ${nacos.user-name}
|
||||
# nacos密码
|
||||
password: ${nacos.password}
|
||||
# 命名空间
|
||||
namespace: ${nacos.namespace}
|
||||
config:
|
||||
# 服务注册地址
|
||||
server-addr: ${nacos.addr}
|
||||
# nacos用户名
|
||||
username: ${nacos.user-name}
|
||||
# nacos密码
|
||||
password: ${nacos.password}
|
||||
# 命名空间
|
||||
namespace: ${nacos.namespace}
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
shared-configs:
|
||||
# 系统共享配置
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# 系统环境Config共享配置
|
||||
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# xxl-job 配置文件
|
||||
- application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
|
||||
logging:
|
||||
level:
|
||||
com.muyu.system.mapper: DEBUG
|
Loading…
Reference in New Issue