From cf286d48f9e14ce4185112397ec1fcc835f3e316 Mon Sep 17 00:00:00 2001 From: Number7 <1845377266@qq.com> Date: Thu, 10 Oct 2024 09:26:52 +0800 Subject: [PATCH] =?UTF-8?q?feat():=E6=96=B0=E5=A2=9E=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=A2=84=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 2 +- .../many/datasource/ManyDataSource.java | 6 - .../src/main/resources/bootstrap.yml | 2 +- cloud-modules/cloud-event/pom.xml | 12 ++ .../muyu/event/consumer/MessageConsumer.java | 21 +-- .../muyu/event/service/IncidentService.java | 16 ++ .../service/impl/IncidentServiceImpl.java | 160 ++++++++++++++++++ .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../muyu/template/config/MqttConfigure.java | 1 - .../src/main/java/com/muyu/template/test.java | 3 +- .../src/main/resources/bootstrap.yml | 62 +++++++ .../src/main/resources/bootstrap.yml | 2 +- .../java/com/muyu/server/SaasApplication.java | 2 - .../src/main/resources/bootstrap.yml | 2 +- 17 files changed, 270 insertions(+), 29 deletions(-) create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java create mode 100644 cloud-modules/cloud-modules-template/src/main/resources/bootstrap.yml diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index 57dca73..7561373 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # Spring spring: application: diff --git a/cloud-common/cloud-common-saas/src/main/java/com/muyu/cloud/common/many/datasource/ManyDataSource.java b/cloud-common/cloud-common-saas/src/main/java/com/muyu/cloud/common/many/datasource/ManyDataSource.java index 04bf3b3..cf5c81b 100644 --- a/cloud-common/cloud-common-saas/src/main/java/com/muyu/cloud/common/many/datasource/ManyDataSource.java +++ b/cloud-common/cloud-common-saas/src/main/java/com/muyu/cloud/common/many/datasource/ManyDataSource.java @@ -1,5 +1,4 @@ package com.muyu.cloud.common.many.datasource; - import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration; @@ -22,12 +21,10 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - /** * @Author: DongZeLiang * @date: 2024/6/3 @@ -62,7 +59,6 @@ public class ManyDataSource implements ApplicationRunner{ return null; } } - // private List dataPrimarySourceInfoList(){ // List list = new ArrayList<>(); // list.add( @@ -74,7 +70,6 @@ public class ManyDataSource implements ApplicationRunner{ // ); // return list; // } - @Bean public DynamicDataSource dynamicDataSource(DruidDataSourceFactory druidDataSourceFactory) { // 企业列表 企业CODE,端口,IP @@ -93,7 +88,6 @@ public class ManyDataSource implements ApplicationRunner{ dynamicDataSource.setDefineTargetDataSources(dataSourceMap); return dynamicDataSource; } - @Override public void run(ApplicationArguments args) { DruidDataSourceFactory druidDataSourceFactory = SpringUtils.getBean(DruidDataSourceFactory.class); diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index de95a70..77dc05c 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # Spring spring: diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml index cae475e..25957cc 100644 --- a/cloud-modules/cloud-event/pom.xml +++ b/cloud-modules/cloud-event/pom.xml @@ -114,6 +114,18 @@ com.muyu cloud-common-rabbit + + com.muyu.common + saas-common + 3.6.3 + compile + + + com.muyu + saas-cache + 3.6.3 + compile + diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java index 3cb39c0..ed71470 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java @@ -1,9 +1,7 @@ package com.muyu.event.consumer; - - - import com.alibaba.fastjson2.JSONObject; import com.muyu.event.basic.EventPublisher; +import com.muyu.event.service.IncidentService; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,14 +9,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; - import java.time.Duration; import java.util.Collections; import java.util.List; - import static org.bouncycastle.asn1.x500.style.RFC4519Style.l; - -/** +/* * kafka监听 * @author 刘武 * @package:com.muyu.event.consumer @@ -36,12 +31,13 @@ public class MessageConsumer implements ApplicationRunner { private final String topic="four_car"; + @Autowired + private IncidentService incidentService; @Override public void run(ApplicationArguments args) throws Exception { List list = Collections.singletonList(topic); consumer.subscribe(list); - while (true){ ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); consumerRecords.forEach(record -> { @@ -49,9 +45,14 @@ public class MessageConsumer implements ApplicationRunner { JSONObject jsonObject = JSONObject.parseObject(value); log.info("value:{}",value); // eventPublisher.publishEvent(jsonObject); + try { + //预警 + incidentService.warnEventProcess(jsonObject); + + } catch (Exception e) { + throw new RuntimeException(e); + } }); - - } } } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java new file mode 100644 index 0000000..4f60079 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java @@ -0,0 +1,16 @@ +package com.muyu.event.service; + +import com.alibaba.fastjson2.JSONObject; + +/** + * @author liuxinyue + * @Package:com.muyu.event.service + * @name:IncidentService + * @Date:2024/10/9 15:02 + */ +public interface IncidentService { + + void warnEventProcess(JSONObject jsonObject) throws Exception; + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java new file mode 100644 index 0000000..fb90eb9 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java @@ -0,0 +1,160 @@ +package com.muyu.event.service.impl; +import com.alibaba.fastjson2.JSONObject; +import com.muyu.cache.MessageTemplateTypeCacheService; +import com.muyu.cache.SysCarCacheService; +import com.muyu.cache.WarnRuleCacheService; +import com.muyu.cache.WarnStrategyCacheService; +import com.muyu.common.domain.MessageTemplateType; +import com.muyu.common.domain.SysCar; +import com.muyu.common.domain.resp.SysCarVo; +import com.muyu.common.domain.resp.WarnRuleResp; +import com.muyu.common.domain.resp.WarnStrategyResp; +import com.muyu.event.consumer.MessageConsumer; +import com.muyu.event.service.IncidentService; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +/** + * @author liuxinyue + * @Package:com.muyu.event.service.impl + * @name:IncidentServiceImpl + * @Date:2024/10/9 15:02 + */ +@Log4j2 +@Service +public class IncidentServiceImpl implements IncidentService { + private static int DURATION_SECONDS = 5; + private static List receivedStrings = new ArrayList<>(); + private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private static int elapsedSeconds = 0; + private static String file="elapsed"; + private static List messageTemplateTypes=null; + private static Long msgTypeId=null; + //滑窗时间 + private static Long slideTime=null; + //增长率 + private static Long slideFrequency=null; + //预警策略 + @Resource + private WarnStrategyCacheService warnStrategyCacheService; + //车辆 + @Resource + private SysCarCacheService sysCarCacheService; + //预警规则 + @Resource + private WarnRuleCacheService warnRuleCacheService; + //报文模版 + @Resource + private MessageTemplateTypeCacheService messageTemplateTypeCacheService; + + private final String topic="four_car"; + + @Autowired + private MessageConsumer messageConsumer; + + @Autowired + public KafkaConsumer consumer; + + @Override + public void warnEventProcess(JSONObject jsonObject) throws Exception { + + receivedStrings.add(jsonObject); + //协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings + //根据这个车辆VIN查询出他对应的车辆类型 + String carVin=null; + //报文模版的ID + Integer templateId=null; + for (JSONObject receivedString : receivedStrings) { + carVin = (String) receivedString.get("carVin"); + } + SysCar carByVin = null; + List carVoList = sysCarCacheService.get(sysCarCacheService.keyPre()); + Map carMap = carVoList.stream() + .collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity())); + //获取到了这个车辆的信息 + carByVin = carMap.get(carVin); + //获取到这辆车绑定的报文模版 + templateId=carByVin.getTemplateId(); + //这个是这辆车对应的所有策略 + List carWithWarnStrategyList=null; + List warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre()); + for (WarnStrategyResp warnStrategyResp : warnStrategyResps) { + if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){ + carWithWarnStrategyList.add(warnStrategyResp); + } + } + //该车对应的所有预警规则 + List warnRuleResp=null; + List warnRuleResps = warnRuleCacheService.get(warnRuleCacheService.keyPre()); + for (WarnStrategyResp warnStrategyResp : carWithWarnStrategyList) { + for (WarnRuleResp ruleResp : warnRuleResps) { + if(warnStrategyResp.getId().equals(ruleResp.getStrategyId())){ + warnRuleResp.add(ruleResp); + } + } + } + //报文模版 + messageTemplateTypes = messageTemplateTypeCacheService.get(messageTemplateTypeCacheService.keyPre()); + for (WarnRuleResp ruleResp : warnRuleResp) { + //每一个规则他绑定了报文模版里面对应的一个配置 比如:电池,或者车速 + msgTypeId = ruleResp.getMsgTypeId(); + //将规则中对应的滑窗时间赋值为DURATION_SECONDS + DURATION_SECONDS = Math.toIntExact(ruleResp.getSlideTime()); + slideFrequency = ruleResp.getSlideFrequency(); + } + // 定义一个任务,每秒执行一次 + Runnable task = new Runnable() { + @Override + public void run() { + // 清理超过的数据 + cleanUpOldStrings(); + // 检查超速条件 + checkForSpeeding(); + } + }; + // 每隔1秒执行一次任务 + scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS); + } + // 清理超过60秒的数据 + private static void cleanUpOldStrings() { + long currentTime = System.currentTimeMillis(); + receivedStrings.removeIf(jsonObject -> + currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS) + ); + } + // 检查是否有超速情况 + private static void checkForSpeeding() { + if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回 + for (int i = 0; i < receivedStrings.size(); i++) { + JSONObject current = receivedStrings.get(i); + JSONObject next = receivedStrings.get(i + 1); + for (MessageTemplateType messageTemplateType : messageTemplateTypes) { + if(messageTemplateType.getMessageTemplateTypeId().equals(msgTypeId)){ + Short currentElapsed = current.getShort(messageTemplateType.getMessageField()); + Short nextElapsed = next.getShort(messageTemplateType.getMessageField()); + if (nextElapsed > currentElapsed + slideFrequency) { + log.info("出错啦,出错啦,您的"+messageTemplateType.getMessageField()+"不正常,请检查!!!"); + } + } + } + } + } + + + +} diff --git a/cloud-modules/cloud-event/src/main/resources/bootstrap.yml b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml index 1bc0fa4..a3e909a 100644 --- a/cloud-modules/cloud-event/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index 3a1eb6a..0b5c562 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index c5e8cb9..a70108e 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index afe1a22..84b2bf9 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java index 9802526..9eb42bd 100644 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java @@ -104,7 +104,6 @@ public class MqttConfigure { me.printStackTrace(); } } - public JSONObject messageParsing(String templateMessage) { //给一个JSON对象 JSONObject jsonObject = new JSONObject(); diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java index ad62d51..753bc2e 100644 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java @@ -50,6 +50,7 @@ public class test { //报文模版 @Resource private MessageTemplateTypeCacheService messageTemplateTypeCacheService; + public void main(String[] args) { //协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings //根据这个车辆VIN查询出他对应的车辆类型 @@ -107,7 +108,6 @@ public class test { // 每隔1秒执行一次任务 scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS); } - // 清理超过60秒的数据 private static void cleanUpOldStrings() { long currentTime = System.currentTimeMillis(); @@ -115,7 +115,6 @@ public class test { currentTime - jsonObject.getLong("time") > TimeUnit.SECONDS.toMillis(DURATION_SECONDS) ); } - // 检查是否有超速情况 private static void checkForSpeeding() { if (receivedStrings.size() < 2) return; // 如果数据不足,直接返回 diff --git a/cloud-modules/cloud-modules-template/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-template/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..1ff0cd8 --- /dev/null +++ b/cloud-modules/cloud-modules-template/src/main/resources/bootstrap.yml @@ -0,0 +1,62 @@ +# Tomcat +server: + port: 15277 + +# nacos线上地址 +nacos: + addr: 47.101.53.251:8848 + user-name: nacos + password: nacos + namespace: lxy +# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all +# Spring +spring: + mvc: + pathmatch: + matching-strategy: ant_path_matcher + amqp: + deserialization: + trust: + all: true + main: + allow-bean-definition-overriding: true + application: + # 应用名称 + name: cloud-template + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + config: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + # 系统共享配置 + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # 系统环境Config共享配置 + - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # xxl-job 配置文件 + - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + +logging: + level: + com.muyu.system.mapper: DEBUG diff --git a/cloud-modules/cloud-modules-vehiclegateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-vehiclegateway/src/main/resources/bootstrap.yml index b027b47..4eecbb4 100644 --- a/cloud-modules/cloud-modules-vehiclegateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-vehiclegateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # Spring spring: diff --git a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/SaasApplication.java b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/SaasApplication.java index 7186808..4d2fd21 100644 --- a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/SaasApplication.java +++ b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/SaasApplication.java @@ -1,6 +1,4 @@ package com.muyu.server; - - import com.muyu.common.security.annotation.EnableMyFeignClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml b/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml index faf0576..0ee7641 100644 --- a/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.53.251:8848 user-name: nacos password: nacos - namespace: yzl + namespace: lxy # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: