commit 80a6fb69921c3e665046b679e331440b63283957 Author: Yunfei Du <278774021@qq.com> Date: Tue Jun 25 13:11:51 2024 +0800 事件:实时数据+故障报警 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..63574ec --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..8d66637 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..82dbec8 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9faf85d --- /dev/null +++ b/pom.xml @@ -0,0 +1,162 @@ + + + 4.0.0 + + com.muyu + event + 1.0-SNAPSHOT + 事件驱动系统 + + + 17 + 17 + UTF-8 + 2.3.7.RELEASE + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.47 + + + org.springframework.amqp + spring-rabbit-test + test + + + + org.springframework.boot + spring-boot-starter-amqp + + + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-clients + 3.3.0 + + + org.springframework.kafka + spring-kafka-test + test + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + mysql + mysql-connector-java + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.2.2 + + + + org.apache.iotdb + iotdb-session + 0.14.0-preview1 + + + + cn.hutool + hutool-all + 5.6.3 + + + + com.alibaba + fastjson + 1.2.83 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + 1.18.22 + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + UTF-8 + + + + org.springframework.boot + spring-boot-maven-plugin + 2.3.7.RELEASE + + com.zhouhong.iotdbdemo.IotdbDemoApplication + + + + repackage + + repackage + + + + + + + diff --git a/src/main/java/com/muyu/eventdriven/EventDrivenApplication.java b/src/main/java/com/muyu/eventdriven/EventDrivenApplication.java new file mode 100644 index 0000000..e395388 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/EventDrivenApplication.java @@ -0,0 +1,15 @@ +package com.muyu.eventdriven; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cache.annotation.EnableCaching; + +@SpringBootApplication +@EnableCaching +public class EventDrivenApplication { + + public static void main(String[] args) { + SpringApplication.run(EventDrivenApplication.class, args); + } + +} diff --git a/src/main/java/com/muyu/eventdriven/EventDrivenRunner.java b/src/main/java/com/muyu/eventdriven/EventDrivenRunner.java new file mode 100644 index 0000000..5421441 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/EventDrivenRunner.java @@ -0,0 +1,151 @@ +package com.muyu.eventdriven; + +import com.alibaba.fastjson.JSON; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.server.EventInfoService; +import com.muyu.eventdriven.server.impl.EventInfoServiceImpl; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.checkerframework.checker.units.qual.A; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationContext; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + + +/** + * @ClassName EventDriveenApplication + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/6/25 10:03 + */ + +@Log4j2 +@Component +public class EventDrivenRunner implements ApplicationRunner { + @Autowired + private EventInfoServiceImpl eventInfoService; + @Autowired + private RedisTemplate< String, String > redisTemplate; + + @Autowired + private ApplicationContext applicationContext; + @Autowired + private EventTactics eventTactics; + private final AtomicInteger start = new AtomicInteger(); + + private static List classNameList = new ArrayList<>((Arrays.asList( + "StorageEvent", + "ElectronicFenceEvent", + "FaultAlarmEvent", + "RealTimeDataEvent", + "IndexWarningEvent"))); + + @Override + public void run(ApplicationArguments args) throws Exception { + if (start.get() != 0) { + return; + } + synchronized (this) { + if (start.get() != 0) { + return; + } + start.set(1); + + // kafka分区监听器 + new Thread(() -> { + ArrayList< TopicPartition > topicPartitions = new ArrayList<>(); +// List< String > topics = redisTemplate.opsForList ( ).range ( "ipList", 0, -1 ).stream ().distinct ().collect( Collectors.toList()); + List topics = new ArrayList ( ){{ + add ( "47.92.213.100" ); + add ( "47.92.95.98" ); + }}; + for (String topic : topics) { + for (int i = 0; i < 8; i++) { + TopicPartition topicPartition = new TopicPartition(topic, i); + topicPartitions.add(topicPartition); + } + } + Properties props = new Properties(){{ + put("bootstrap.servers", "43.142.12.243:9092"); + put("auto.commit.interval.ms", "1000"); + put("group.id", "test"); + put("enable.auto.commit", "true"); + put("session.timeout.ms", "30000"); + put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + }}; + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.assign(topicPartitions); + try { + while (true) { + ConcurrentHashMap< String, ArrayList< VehicleData > > stringListHashMap = new ConcurrentHashMap<>(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + ConcurrentHashMap< String, ArrayList< VehicleData > > map = new ConcurrentHashMap<> ( ); + stringListHashMap= eventInfoService.getVehicleData(records, map); + if (!records.isEmpty()) { + ArrayList< VehicleData > vehicleData1 = new ArrayList<> ( ); + ConcurrentHashMap< String, ArrayList< VehicleData > > finalStringListHashMap = stringListHashMap; + new Thread(() -> { + for (TopicPartition partition : records.partitions()) { + String value=""; + String key1=""; + + List< ConsumerRecord > partitionRecords = records.records(partition); + for (ConsumerRecord record : partitionRecords) { + key1=record.key ( ); + log.info("Offset = {}, Key = {}, Value = {}", record.offset(), key1, record.value()); + value= record.value ( ); + System.out.println (value ); + VehicleData vehicleData = JSON.parseObject ( value, VehicleData.class ); + vehicleData1.add ( vehicleData ); +// VehicleData vehicle = kafkaConsumerListenerExample.getVehicle(record.value()); +// VehicleEvent events = eventsService.getEvents(vehicle.getVin()); +// HandlerHelper.doHandler(events, vehicle, redisService); + } + // 处理拉取到的消息,将消息按车辆事件类型分类 + + finalStringListHashMap.forEach((key, value1) -> { + // 从 Redis 中获取车辆事件处理类的列表 +// String vehicleEventString = redisTemplate.opsForHash().get(RedisConstants.VEHICLE_EVENT, key).toString(); + String vehicleEventString = "1,2,3,4,5"; + for (String str : vehicleEventString.split(",")) { + CompletableFuture.runAsync(() -> { + applicationContext.getBean(classNameList.get(Integer.parseInt(str)), EventTactics.class).eventManage(key,value1); + }); + } + }); + + } + }).start(); + + } else { + Thread.sleep(10); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Consumer was interrupted.", e); + } finally { + consumer.close(); + } + }).start(); + } + } +} diff --git a/src/main/java/com/muyu/eventdriven/config/CaffeineCacheConfig.java b/src/main/java/com/muyu/eventdriven/config/CaffeineCacheConfig.java new file mode 100644 index 0000000..25d1fe9 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/CaffeineCacheConfig.java @@ -0,0 +1,28 @@ +package com.muyu.eventdriven.config; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.context.annotation.Bean; + +import java.util.concurrent.TimeUnit; + +/** + * @ClassName CaffeineCacheConfig + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/6/25 11:41 + */ + +public class CaffeineCacheConfig { + @Bean + public Cache caffeineCache() { + return Caffeine.newBuilder() + // 设置最后一次写入或访问后经过固定时间过期 + .expireAfterWrite(5, TimeUnit.SECONDS) + // 初始的缓存空间大小 + .initialCapacity(100) + // 缓存的最大条数 + .maximumSize(1000) + .build(); + } +} diff --git a/src/main/java/com/muyu/eventdriven/config/EventDrivenConfig.java b/src/main/java/com/muyu/eventdriven/config/EventDrivenConfig.java new file mode 100644 index 0000000..ea60070 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/EventDrivenConfig.java @@ -0,0 +1,12 @@ +package com.muyu.eventdriven.config; + +/** + * @ClassName DataAccessClientConfig + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/5/9 19:52 + */ +//@ComponentScan +//@Import({EventDrivenRunner.class}) +//public class EventDrivenConfig { +//} diff --git a/src/main/java/com/muyu/eventdriven/config/EventDrivenRunner.java b/src/main/java/com/muyu/eventdriven/config/EventDrivenRunner.java new file mode 100644 index 0000000..1ccd25c --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/EventDrivenRunner.java @@ -0,0 +1,20 @@ +package com.muyu.eventdriven.config; + + +/** + * @ClassName DataAccessClientRunner + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/5/9 19:53 + */ +//@Log4j2 +//@Component +//public class EventDrivenRunner implements ApplicationRunner { +// @Autowired +// private EventTacticsManage eventTacticsManage; +// +// @Override +// public void run(ApplicationArguments args){ +// eventTacticsManage.initEventTacticsManage(); +// } +//} diff --git a/src/main/java/com/muyu/eventdriven/config/caffeine/CaffeineCacheConfig.java b/src/main/java/com/muyu/eventdriven/config/caffeine/CaffeineCacheConfig.java new file mode 100644 index 0000000..66c872d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/caffeine/CaffeineCacheConfig.java @@ -0,0 +1,29 @@ +package com.muyu.eventdriven.config.caffeine; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.TimeUnit; + +/** + * @ClassName CaffeineCacheConfig + * @Description 本地缓存配置类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午9:39 + */ +@Configuration +public class CaffeineCacheConfig { + @Bean + public Cache caffeineCache() { + return Caffeine.newBuilder() + // 设置最后一次写入或访问后经过固定时间过期 + .expireAfterWrite(5, TimeUnit.SECONDS) + // 初始的缓存空间大小 + .initialCapacity(100) + // 缓存的最大条数 + .maximumSize(1000) + .build(); + } +} diff --git a/src/main/java/com/muyu/eventdriven/config/iotdb/IotDBSessionConfig.java b/src/main/java/com/muyu/eventdriven/config/iotdb/IotDBSessionConfig.java new file mode 100644 index 0000000..a29df97 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/iotdb/IotDBSessionConfig.java @@ -0,0 +1,189 @@ +package com.muyu.eventdriven.config.iotdb; + +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.session.util.Version; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.List; + +/** + * @ClassName AsVehicleEvent + * @Description iotdb配置工具类 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +@Log4j2 +@Component +@Configuration +public class IotDBSessionConfig { + + private static Session session; + + @Value("${spring.iotdb.ip}") + private String LOCAL_HOST ; + @Bean + public Session getSession() throws IoTDBConnectionException, StatementExecutionException { + if (session == null) { + log.info("正在连接iotdb......."); + session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build(); + session.open(false); + session.setFetchSize(100); + log.info("iotdb连接成功~"); + // 设置时区 + session.setTimeZone("+08:00"); + } + return session; + } + + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * time:时间戳 + * measurementsList:物理量 即:属性 + * type:数据类型: BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6); + * valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecordType(String deviceId, Long time,List measurementsList, TSDataType type,List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() != valuesList.size()) { + throw new ServerException("measurementsList 与 valuesList 值不对应"); + } + List types = new ArrayList<>(); + measurementsList.forEach(item -> { + types.add(type); + }); + session.insertRecord(deviceId, time, measurementsList, types, valuesList); + } + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param measurementsList:物理量 即:属性 + * @param valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecord(deviceId, time, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + /** + * description: 批量插入 + * author: zhouhong + */ + public void insertRecords(List deviceIdList, List timeList, List> measurementsList, List> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecords(deviceIdList, timeList, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + + /** + * description: 插入操作 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param schemaList: 属性值 + 数据类型 例子: List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64)); + * @param maxRowNumber: + * @return + */ + public void insertTablet(String deviceId, Long time,List schemaList, List valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException { + + Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber); + // 向iotdb里面添加数据 + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, time); + for (int i = 0; i < valueList.size(); i++) { + tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i)); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet, true); + tablet.reset(); + } + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + /** + * description: 根据SQL查询 + * author: zhouhong + */ + public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException { + return session.executeQueryStatement(sql); + } + + /** + * description: 删除分组 如 root.a1eaKSRpRty + * author: zhouhong + * @param groupName:分组名称 + * @return + */ + public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroup(groupName); + } + + /** + * description: 根据Timeseries删除 如:root.a1eaKSRpRty.CA3013A303A25467.breath (个人理解:为具体的物理量) + * author: zhouhong + */ + public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseries); + } + /** + * description: 根据Timeseries批量删除 + * author: zhouhong + */ + public void deleteTimeserieList(List timeseriesList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseriesList); + } + + /** + * description: 根据分组批量删除 + * author: zhouhong + */ + public void deleteStorageGroupList(List storageGroupList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroups(storageGroupList); + } + + /** + * description: 根据路径和结束时间删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(path, endTime); + } + /** + * description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathListAndEndTime(List pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, endTime); + } + /** + * description: 根据路径集合和时间段批量删除 + * author: zhouhong + */ + public void deleteDataByPathListAndTime(List pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, startTime, endTime); + } + +} diff --git a/src/main/java/com/muyu/eventdriven/config/rabbit/RabbitRoutingConfig.java b/src/main/java/com/muyu/eventdriven/config/rabbit/RabbitRoutingConfig.java new file mode 100644 index 0000000..2ebf884 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/rabbit/RabbitRoutingConfig.java @@ -0,0 +1,51 @@ +package com.muyu.eventdriven.config.rabbit; + +import com.muyu.eventdriven.constants.RabbitConstants; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @ClassName RabbitRoutingConfig + * @Description rabbit配置类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:35 + */ +@Configuration +public class RabbitRoutingConfig { + /** + * Queue 可以有4个参数 + * 1.name 队列名 + * 2.durable 持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true + * 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false + * 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false + */ + @Bean + public Queue createRoutingQueueA() { + return new Queue(RabbitConstants.QUEUE_STATUS_ABNORMAL, true); + } + + @Bean + public Queue createRoutingQueueB() { + return new Queue(RabbitConstants.QUEUE_STATUS_NORMAL, true); + } + + @Bean + public DirectExchange routingExchange() { + //配置广播路由器 + return new DirectExchange(RabbitConstants.EXCHANGE_STATUS); + } + + @Bean + public Binding bingQueueAToRoutingExchange() { + return BindingBuilder.bind(createRoutingQueueA()).to(routingExchange()).with(RabbitConstants.STATUS_ABNORMAL); + } + + @Bean + public Binding bingQueueBToRoutingExchange() { + return BindingBuilder.bind(createRoutingQueueB()).to(routingExchange()).with(RabbitConstants.STATUS_NORMAL); + } +} diff --git a/src/main/java/com/muyu/eventdriven/config/redis/RedisListenerConfig.java b/src/main/java/com/muyu/eventdriven/config/redis/RedisListenerConfig.java new file mode 100644 index 0000000..8c32c63 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/config/redis/RedisListenerConfig.java @@ -0,0 +1,22 @@ +package com.muyu.eventdriven.config.redis; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +/** + * @ClassName RedisListenerConfig + * @Description redis键过期监听配置类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:23 + */ +@Configuration +public class RedisListenerConfig { + @Bean + public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + return container; + } +} diff --git a/src/main/java/com/muyu/eventdriven/constants/Constants.java b/src/main/java/com/muyu/eventdriven/constants/Constants.java new file mode 100644 index 0000000..b6675da --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/constants/Constants.java @@ -0,0 +1,134 @@ +package com.muyu.eventdriven.constants; + +/** + * 通用常量信息 + * + * @author muyu + */ +public class Constants { + /** + * UTF-8 字符集 + */ + public static final String UTF8 = "UTF-8"; + + /** + * GBK 字符集 + */ + public static final String GBK = "GBK"; + + /** + * www主域 + */ + public static final String WWW = "www."; + + /** + * RMI 远程方法调用 + */ + public static final String LOOKUP_RMI = "rmi:"; + + /** + * LDAP 远程方法调用 + */ + public static final String LOOKUP_LDAP = "ldap:"; + + /** + * LDAPS 远程方法调用 + */ + public static final String LOOKUP_LDAPS = "ldaps:"; + + /** + * http请求 + */ + public static final String HTTP = "http://"; + + /** + * https请求 + */ + public static final String HTTPS = "https://"; + + /** + * 成功标记 + */ + public static final Integer SUCCESS = 200; + + /** + * 失败标记 + */ + public static final Integer FAIL = 500; + + /** + * 登录成功状态 + */ + public static final String LOGIN_SUCCESS_STATUS = "0"; + + /** + * 登录失败状态 + */ + public static final String LOGIN_FAIL_STATUS = "1"; + + /** + * 登录成功 + */ + public static final String LOGIN_SUCCESS = "Success"; + + /** + * 注销 + */ + public static final String LOGOUT = "Logout"; + + /** + * 注册 + */ + public static final String REGISTER = "Register"; + + /** + * 登录失败 + */ + public static final String LOGIN_FAIL = "Error"; + + /** + * 当前记录起始索引 + */ + public static final String PAGE_NUM = "pageNum"; + + /** + * 每页显示记录数 + */ + public static final String PAGE_SIZE = "pageSize"; + + /** + * 排序列 + */ + public static final String ORDER_BY_COLUMN = "orderByColumn"; + + /** + * 排序的方向 "desc" 或者 "asc". + */ + public static final String IS_ASC = "isAsc"; + + /** + * 验证码有效期(分钟) + */ + public static final long CAPTCHA_EXPIRATION = 2; + + /** + * 资源映射路径 前缀 + */ + public static final String RESOURCE_PREFIX = "/profile"; + + /** + * 自动识别json对象白名单配置(仅允许解析的包名,范围越小越安全) + */ + public static final String[] JSON_WHITELIST_STR = {"org.springframework", "com.muyu"}; + + /** + * 定时任务白名单配置(仅允许访问的包名,如其他需要可以自行添加) + */ + public static final String[] JOB_WHITELIST_STR = {"com.muyu"}; + + /** + * 定时任务违规的字符 + */ + public static final String[] JOB_ERROR_STR = {"java.net.URL", "javax.naming.InitialContext", "org.yaml.snakeyaml", + "org.springframework", "org.apache", "com.muyu.common.core.utils.file"}; +} diff --git a/src/main/java/com/muyu/eventdriven/constants/FaultCodeConstants.java b/src/main/java/com/muyu/eventdriven/constants/FaultCodeConstants.java new file mode 100644 index 0000000..3815b06 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/constants/FaultCodeConstants.java @@ -0,0 +1,27 @@ +package com.muyu.eventdriven.constants; + +/** + * @ClassName FaultCodeConstants + * @Description 故障码常量 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:49 + */ +public class FaultCodeConstants { + public static final String VEHICLESTATUS = "GZ001"; + public static final String CHARGINGSTATUS = "GZ002"; + public static final String OPERATINGSTATUS = "GZ003"; + public static final String SOCSTATUS = "GZ004"; + public static final String CHARGINGENERGYSTORAGESTATUS = "GZ005"; + public static final String DRIVEMOTORSTATUS = "GZ006"; + public static final String POSITIONSTATUS = "GZ007"; + public static final String EASSTATUS = "GZ008"; + public static final String PTCSTATUS = "GZ009"; + public static final String EPSSTATUS = "GZ010"; + public static final String ABSSTATUS = "GZ011"; + public static final String MCUSTATUS = "GZ012"; + public static final String HEATINGSTATUS = "GZ013"; + public static final String BATTERYSTATUS = "GZ014"; + public static final String BATTERYINSULATIONSTATUS = "GZ015"; + public static final String DCDCSTATUS = "GZ016"; + public static final String CHGSTATUS = "GZ017"; +} diff --git a/src/main/java/com/muyu/eventdriven/constants/HttpStatus.java b/src/main/java/com/muyu/eventdriven/constants/HttpStatus.java new file mode 100644 index 0000000..670fc96 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/constants/HttpStatus.java @@ -0,0 +1,93 @@ +package com.muyu.eventdriven.constants; + +/** + * 返回状态码 + * + * @author muyu + */ +public class HttpStatus { + /** + * 操作成功 + */ + public static final int SUCCESS = 200; + + /** + * 对象创建成功 + */ + public static final int CREATED = 201; + + /** + * 请求已经被接受 + */ + public static final int ACCEPTED = 202; + + /** + * 操作已经执行成功,但是没有返回数据 + */ + public static final int NO_CONTENT = 204; + + /** + * 资源已被移除 + */ + public static final int MOVED_PERM = 301; + + /** + * 重定向 + */ + public static final int SEE_OTHER = 303; + + /** + * 资源没有被修改 + */ + public static final int NOT_MODIFIED = 304; + + /** + * 参数列表错误(缺少,格式不匹配) + */ + public static final int BAD_REQUEST = 400; + + /** + * 未授权 + */ + public static final int UNAUTHORIZED = 401; + + /** + * 访问受限,授权过期 + */ + public static final int FORBIDDEN = 403; + + /** + * 资源,服务未找到 + */ + public static final int NOT_FOUND = 404; + + /** + * 不允许的http方法 + */ + public static final int BAD_METHOD = 405; + + /** + * 资源冲突,或者资源被锁 + */ + public static final int CONFLICT = 409; + + /** + * 不支持的数据,媒体类型 + */ + public static final int UNSUPPORTED_TYPE = 415; + + /** + * 系统内部错误 + */ + public static final int ERROR = 500; + + /** + * 接口未实现 + */ + public static final int NOT_IMPLEMENTED = 501; + + /** + * 系统警告消息 + */ + public static final int WARN = 601; +} diff --git a/src/main/java/com/muyu/eventdriven/constants/RabbitConstants.java b/src/main/java/com/muyu/eventdriven/constants/RabbitConstants.java new file mode 100644 index 0000000..ae62d4f --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/constants/RabbitConstants.java @@ -0,0 +1,15 @@ +package com.muyu.eventdriven.constants; + +/** + * @ClassName RabbitConstants + * @Description Rabbit常量类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:39 + */ +public class RabbitConstants { + public static final String QUEUE_STATUS_ABNORMAL = "zhiLian-vehicle-end"; + public static final String QUEUE_STATUS_NORMAL = "zhiLian-vehicle-start"; + public static final String EXCHANGE_STATUS = "exchange_status"; + public static final String STATUS_ABNORMAL = "abnormal"; + public static final String STATUS_NORMAL = "normal"; +} diff --git a/src/main/java/com/muyu/eventdriven/constants/RedisConstants.java b/src/main/java/com/muyu/eventdriven/constants/RedisConstants.java new file mode 100644 index 0000000..bfce36e --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/constants/RedisConstants.java @@ -0,0 +1,12 @@ +package com.muyu.eventdriven.constants; + +/** + * @ClassName RedisConstants + * @Description Redis常量 + * @Author Yunfei.Du + * @Date 2024/6/20 下午3:30 + */ +public class RedisConstants { + public static final String VEHICLE_EVENT = "vehicle_event"; + public static final String VEHICLE_FAULT_KEY = "vehicle_fault_key:"; +} diff --git a/src/main/java/com/muyu/eventdriven/consumer/KafkaConsumers.java b/src/main/java/com/muyu/eventdriven/consumer/KafkaConsumers.java new file mode 100644 index 0000000..237dcf5 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/consumer/KafkaConsumers.java @@ -0,0 +1,49 @@ +package com.muyu.eventdriven.consumer; + +import com.muyu.eventdriven.domain.VehicleKafka; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * @ClassName KafkaConsumers + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/6/9 上午9:54 + */ +@Component +public class KafkaConsumers { + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + + + + + public KafkaConsumer kafkaConsumer(VehicleKafka vehicleKafka){ + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + // 指定分区策略 + properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); + // 指定消费者组,必须参数 + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test1"); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); + KafkaConsumer consumer = new KafkaConsumer<>(properties); + // 订阅主题分区 + List topicPartitions = new ArrayList<>(); + //第一个是主题名 第二个是分区 + topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions())); + consumer.assign(topicPartitions); + return consumer; + + } +} diff --git a/src/main/java/com/muyu/eventdriven/consumer/redis/RedisKeyExpirationListener.java b/src/main/java/com/muyu/eventdriven/consumer/redis/RedisKeyExpirationListener.java new file mode 100644 index 0000000..40adc5e --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/consumer/redis/RedisKeyExpirationListener.java @@ -0,0 +1,54 @@ +package com.muyu.eventdriven.consumer.redis; + +import com.alibaba.fastjson.JSON; +import com.muyu.eventdriven.constants.RabbitConstants; +import com.muyu.eventdriven.constants.RedisConstants; +import com.muyu.eventdriven.domain.RabbitFalut; +import com.muyu.eventdriven.domain.VehicleFaultStatus; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.UUID; + +/** + * @ClassName RedisKeyExpirationListener + * @Description redsi键过期监听类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:27 + */ +@Log4j2 +@Component +public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { + @Autowired + private RabbitTemplate rabbitTemplate; + + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + //拿到过期key的信息并做处理 + @Override + public void onMessage(Message messages, byte[] pattern) { + String key = messages.toString(); + if (key.contains(RedisConstants.VEHICLE_FAULT_KEY)){ + String[] split = key.split(":"); + RabbitFalut rabbitFalut = new RabbitFalut(); + rabbitFalut.setEndTime(new Date()); + rabbitFalut.setVin(split[1]); + rabbitFalut.setFaultCode(split[2]); + rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.QUEUE_STATUS_ABNORMAL, JSON.toJSONString(rabbitFalut),message ->{ + message.getMessageProperties().setMessageId( UUID.randomUUID().toString()); + //设置消息延迟时间为5秒 + message.getMessageProperties().setDelay(5000); + log.info ( "消息发送成功" ); + return message; + }); + } + } +} diff --git a/src/main/java/com/muyu/eventdriven/controller/EventInfoController.java b/src/main/java/com/muyu/eventdriven/controller/EventInfoController.java new file mode 100644 index 0000000..01f121b --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/controller/EventInfoController.java @@ -0,0 +1,33 @@ +package com.muyu.eventdriven.controller; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.muyu.eventdriven.domain.rest.Result; +import com.muyu.eventdriven.server.EventInfoService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * @ClassName EventInfoController + * @Description 事件信息控制层 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:58 + */ +@RestController +@RequestMapping("/eventInfo") +public class EventInfoController { + @Autowired + private EventInfoService eventInfoService; + + @GetMapping("/CreatKafkaConsumer") + public void creatKafkaConsumer(@RequestParam("vin") String vin) throws JsonProcessingException { + eventInfoService.creatKafkaConsumer(vin); + } + + @GetMapping("/CloseKafkaConsumer") + public Result closeKafkaConsumer(@RequestParam("vin") String vin) { + return eventInfoService.closeKafkaConsumer(vin); + } +} diff --git a/src/main/java/com/muyu/eventdriven/controller/IotDbController.java b/src/main/java/com/muyu/eventdriven/controller/IotDbController.java new file mode 100644 index 0000000..1381929 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/controller/IotDbController.java @@ -0,0 +1,63 @@ +package com.muyu.eventdriven.controller; + +import com.muyu.eventdriven.config.iotdb.IotDBSessionConfig; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.model.param.IotDbParam; +import com.muyu.eventdriven.response.ResponseData; +import com.muyu.eventdriven.server.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.rmi.ServerException; + +/** + * @ClassName AsVehicleEvent + * @Description iotdb控制层 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +@Log4j2 +@RestController +public class IotDbController { + + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + /** + * 插入数据 + * @param iotDbParam + */ + @PostMapping("/api/device/insert") + public ResponseData insert(@RequestBody VehicleData vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException { + iotDbServer.insertData(vehicleData); + return ResponseData.success(); + } + + /** + * 查询数据 + * @param iotDbParam + */ + @PostMapping("/api/device/queryData") + public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception { + return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam)); + } + + /** + * 删除分组 + * @return + */ + @PostMapping("/api/device/deleteGroup") + public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException { + iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty"); + iotDBSessionConfig.deleteStorageGroup("root.smartretirement"); + return ResponseData.success(); + } + +} diff --git a/src/main/java/com/muyu/eventdriven/domain/AsVehicleEvent.java b/src/main/java/com/muyu/eventdriven/domain/AsVehicleEvent.java new file mode 100644 index 0000000..d4f8c57 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/AsVehicleEvent.java @@ -0,0 +1,22 @@ +package com.muyu.eventdriven.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ClassName AsVehicleEvent + * @Description 车辆拥有事件 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class AsVehicleEvent { + private Integer id; + private String vehicleVin; + private Integer eventInfoId; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/EventInfo.java b/src/main/java/com/muyu/eventdriven/domain/EventInfo.java new file mode 100644 index 0000000..55b4bf5 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/EventInfo.java @@ -0,0 +1,22 @@ +package com.muyu.eventdriven.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ClassName EventInfo + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:14 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class EventInfo { + private Integer id; + private String eventName; + private Integer eventTypeId; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/EventTacticsManage.java b/src/main/java/com/muyu/eventdriven/domain/EventTacticsManage.java new file mode 100644 index 0000000..2cee917 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/EventTacticsManage.java @@ -0,0 +1,33 @@ +package com.muyu.eventdriven.domain; + +import org.springframework.stereotype.Component; + +/** + * @ClassName EventTactics + * @Description 事件 + * @Author Yunfei.Du + * @Date 2024/6/20 下午3:57 + */ +@Component +public class EventTacticsManage { + +// public void initEventTacticsManage(){ +// ArrayList strings = new ArrayList<>(); +// strings.add("com.muyu.eventdriven.tactics.basics.StorageEvent"); +// strings.add("com.muyu.eventdriven.tactics.system.ElectronicFenceEvent"); +// strings.add("com.muyu.eventdriven.tactics.system.FaultAlarmEvent"); +// strings.add("com.muyu.eventdriven.tactics.system.RealTimeDataEvent"); +// strings.add("com.muyu.eventdriven.tactics.system.IndexWarningEvent"); +// for (int i = 1; i <= strings.size(); i++) { +// eventContextMap.put(String.valueOf(i),new EventContext(strings.get(i-1))); +// } +// } +// +// public EventContext getEventContext(String key){ +// return eventContextMap.get(key); +// } +// +// public void setEventContext(String key,EventContext eventContext){ +// eventContextMap.put(key,eventContext); +// } +} diff --git a/src/main/java/com/muyu/eventdriven/domain/EventType.java b/src/main/java/com/muyu/eventdriven/domain/EventType.java new file mode 100644 index 0000000..791bdd2 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/EventType.java @@ -0,0 +1,21 @@ +package com.muyu.eventdriven.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ClassName EventType + * @Description 事件类型实体类 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:15 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class EventType { + private Integer id; + private String EventTypeName; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/RabbitFalut.java b/src/main/java/com/muyu/eventdriven/domain/RabbitFalut.java new file mode 100644 index 0000000..0d840d9 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/RabbitFalut.java @@ -0,0 +1,19 @@ +package com.muyu.eventdriven.domain; + +import lombok.Data; + +import java.util.Date; + +/** + * @ClassName RabbitFalut + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/6/25 11:42 + */ +@Data +public class RabbitFalut { + private String faultCode; + private String vin; + private Date startTime; + private Date endTime; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/VehicleData.java b/src/main/java/com/muyu/eventdriven/domain/VehicleData.java new file mode 100644 index 0000000..7de74d3 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/VehicleData.java @@ -0,0 +1,261 @@ +package com.muyu.eventdriven.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.math.BigDecimal; + +/** + * @ClassName VehicleData + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/6/5 下午6:52 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class VehicleData { + + /** + * VIN + */ + private String vin; + + /** + * 行驶路线 + */ + private String drivingRoute; + + + /** + * 经度 + */ + private String longitude; + + /** + * 维度 + */ + private String latitude; + /** + * 速度 + */ + private String speed; + + /** + * 里程 + */ + private BigDecimal mileage; + + /** + * 总电压 + */ + private String voltage; + + /** + * 总电流 + */ + private String current; + + /** + * 绝缘电阻 + */ + private String resistance; + + /** + * 档位 + */ + private String gear = "P"; + + /** + * 加速踏板行程值 + */ + private String accelerationPedal; + + /** + * 制动踏板行程值 + */ + private String brakePedal; + + /** + * 燃料消耗率 + */ + private String fuelConsumptionRate; + + /** + * 电机控制器温度 + */ + private String motorControllerTemperature; + + /** + * 电机转速 + */ + private String motorSpeed; + + /** + * 电机转矩 + */ + private String motorTorque; + + /** + * 电机温度 + */ + private String motorTemperature; + + /** + * 电机电压 + */ + private String motorVoltage; + + /** + * 电机电流 + */ + private String motorCurrent; + + /** + * 动力电池剩余电量SOC + */ + private BigDecimal remainingBattery; + + /** + * 电池总容量 + */ + private BigDecimal batteryLevel; + + /** + * 当前状态允许的最大反馈功率 + */ + private String maximumFeedbackPower; + + /** + * 当前状态允许最大放电功率 + */ + private String maximumDischargePower; + + /** + * BMS自检计数器 + */ + private String selfCheckCounter; + + /** + * 动力电池充放电电流 + */ + private String totalBatteryCurrent; + + /** + * 动力电池负载端总电压V3 + */ + private String totalBatteryVoltage; + + /** + * 单次最大电压 + */ + private String singleBatteryMaxVoltage; + + /** + * 单体电池最低电压 + */ + private String singleBatteryMinVoltage; + + /** + * 单体电池最高温度 + */ + private String singleBatteryMaxTemperature; + + /** + * 单体电池最低温度 + */ + private String singleBatteryMinTemperature; + + /** + * 动力电池可用容量 + */ + private String availableBatteryCapacity; + + /** + * 车辆状态 + */ + private int vehicleStatus = 1; + + /** + * 充电状态 + */ + private int chargingStatus = 1; + + /** + * 运行状态 + */ + private int operatingStatus = 1; + + /** + * SOC + */ + private int socStatus = 1; + + /** + * 可充电储能装置工作状态 + */ + private int chargingEnergyStorageStatus = 1; + + /** + * 驱动电机状态 + */ + private int driveMotorStatus = 1; + + /** + * 定位是否有效 + */ + private int positionStatus = 1; + + /** + * EAS(汽车防盗系统)状态 + */ + private int easStatus = 1; + + /** + * PTC(电动加热器)状态 + */ + private int ptcStatus = 1; + + /** + * EPS(电动助力系统)状态 + */ + private int epsStatus = 1; + + /** + * ABS(防抱死)状态 + */ + private int absStatus = 1; + + /** + * MCU(电机/逆变器)状态 + */ + private int mcuStatus = 1; + + /** + * 动力电池加热状态 + */ + private int heatingStatus = 1; + + /** + * 动力电池当前状态 + */ + private int batteryStatus = 1; + + /** + * 动力电池保温状态 + */ + private int batteryInsulationStatus = 1; + + /** + * DCDC(电力交换系统)状态 + */ + private int dcdcStatus = 1; + + /** + * CHG(充电机)状态 + */ + private int chgStatus = 1; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/VehicleFaultStatus.java b/src/main/java/com/muyu/eventdriven/domain/VehicleFaultStatus.java new file mode 100644 index 0000000..8169cad --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/VehicleFaultStatus.java @@ -0,0 +1,23 @@ +package com.muyu.eventdriven.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ClassName VehicleFaultStatus + * @Description 车辆状态报警类 + * @Author Yunfei.Du + * @Date 2024/6/21 上午11:46 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class VehicleFaultStatus { + public String vin; + public Long timestamp; + public String faultCode; + public String faultType; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/VehicleKafka.java b/src/main/java/com/muyu/eventdriven/domain/VehicleKafka.java new file mode 100644 index 0000000..ff578b0 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/VehicleKafka.java @@ -0,0 +1,19 @@ +package com.muyu.eventdriven.domain; + +import lombok.Data; + +/** + * @ClassName Test + * @Description 描述 + * @Author Yunfei.Du + * @Date 2024/6/9 上午10:56 + */ +@Data +public class VehicleKafka { + //分区 + private Integer partitions; + + private String key; + //主题 + private String consumerName; +} diff --git a/src/main/java/com/muyu/eventdriven/domain/rest/Result.java b/src/main/java/com/muyu/eventdriven/domain/rest/Result.java new file mode 100644 index 0000000..171e52c --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/domain/rest/Result.java @@ -0,0 +1,112 @@ +package com.muyu.eventdriven.domain.rest; + +import com.muyu.eventdriven.constants.Constants; +import com.muyu.eventdriven.constants.HttpStatus; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 响应信息主体 + * + * @author muyu + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Result implements Serializable { + /** + * 成功 + */ + public static final int SUCCESS = Constants.SUCCESS; + /** + * 失败 + */ + public static final int FAIL = Constants.FAIL; + /** + * 警告 + */ + public static final int WARN = HttpStatus.WARN; + + private static final long serialVersionUID = 1L; + private int code; + + private String msg; + + private T data; + + public static Result success () { + return restResult(null, SUCCESS, null); + } + + public static Result success (T data) { + return restResult(data, SUCCESS, null); + } + + public static Result success (T data, String msg) { + return restResult(data, SUCCESS, msg); + } + + public static Result error () { + return restResult(null, FAIL, null); + } + + public static Result error (String msg) { + return restResult(null, FAIL, msg); + } + + public static Result error (T data) { + return restResult(data, FAIL, null); + } + + public static Result error (T data, String msg) { + return restResult(data, FAIL, msg); + } + + public static Result error (int code, String msg) { + return restResult(null, code, msg); + } + + + + public static Result warn () { + return restResult(null, WARN, null); + } + + public static Result warn (String msg) { + return restResult(null, WARN, msg); + } + + public static Result warn (T data) { + return restResult(data, WARN, null); + } + + public static Result warn (T data, String msg) { + return restResult(data, WARN, msg); + } + + public static Result warn (int code, String msg) { + return restResult(null, code, msg); + } + + private static Result restResult (T data, int code, String msg) { + return Result.builder() + .code(code) + .data(data) + .msg(msg) + .build(); + } + + public static Boolean isError (Result ret) { + return !isSuccess(ret); + } + + public static Boolean isSuccess (Result ret) { + return Result.SUCCESS == ret.getCode(); + } + +} diff --git a/src/main/java/com/muyu/eventdriven/listener/RedisExpirationListener.java b/src/main/java/com/muyu/eventdriven/listener/RedisExpirationListener.java new file mode 100644 index 0000000..a2db53d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/listener/RedisExpirationListener.java @@ -0,0 +1,48 @@ +package com.muyu.eventdriven.listener; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +/** + * RedisKeyExpirationListener + * + * @author Yangle + * Date 2024/6/21 14:12 + */ +@Log4j2 +@Configuration +public class RedisExpirationListener extends KeyExpirationEventMessageListener { + + @Autowired + private RabbitTemplate rabbitTemplate; + + + public RedisExpirationListener(RedisMessageListenerContainer listenerContainer) { + super ( listenerContainer ); + } + + + /** + * 针对redis数据失效事件,进行数据处理 + * @param message 失效的key + */ + @Override + public void onMessage(Message message, byte[] pattern) { + log.info("过期redis数据:" + message.toString()); + try { + String key = message.toString(); + log.info("过期redis数据:" + key); + rabbitTemplate.convertAndSend("disconnect_connect","resolve_time",key); + } catch (Exception e) { + e.printStackTrace(); + log.error("【修改支付订单过期状态异常】:" + e.getMessage()); + } + } + + +} diff --git a/src/main/java/com/muyu/eventdriven/listener/VinConsumerRunner.java b/src/main/java/com/muyu/eventdriven/listener/VinConsumerRunner.java new file mode 100644 index 0000000..02d1e1d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/listener/VinConsumerRunner.java @@ -0,0 +1,111 @@ +//package com.mobai.kafka.listener; +// +//import com.mobai.domain.MqttServerModel; +//import com.mobai.domain.Vehicle; +//import com.mobai.domain.VehicleEvent; +//import com.mobai.forest.ForestGet; +//import com.mobai.utils.RedisService; +//import com.mobai.vehicle.event.service.EventsService; +//import com.mobai.vehicle.ownModel.HandlerHelper; +//import lombok.extern.log4j.Log4j2; +//import org.apache.kafka.clients.consumer.ConsumerRecord; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.common.TopicPartition; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.boot.ApplicationArguments; +//import org.springframework.boot.ApplicationRunner; +//import org.springframework.stereotype.Component; +// +//import java.time.Duration; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.Properties; +//import java.util.concurrent.atomic.AtomicInteger; +// +///** +// * @author Saisai +// * @className VinConsumerRunner +// * @description 描述 +// * @date 2024/6/18 17:18 +// */ +// +//@Log4j2 +//@Component +//public class VinConsumerRunner implements ApplicationRunner { +// +// @Autowired +// private ForestGet forestGet; +// +// @Autowired +// private KafkaConsumerListenerExample kafkaConsumerListenerExample; +// +// @Autowired +// private RedisService redisService; +// +// @Autowired +// private EventsService eventsService; +// +// +// private final AtomicInteger start = new AtomicInteger(); +// @Override +// public void run(ApplicationArguments args) throws Exception { +// if (start.get() != 0) { +// return; +// } +// synchronized (this) { +// if (start.get() != 0) { +// return; +// } +// start.set(1); +// +// // kafka分区监听器 +// new Thread(() -> { +// ArrayList topicPartitions = new ArrayList<>(); +// List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList(); +// for (String topic : topics) { +// for (int i = 0; i < 8; i++) { +// TopicPartition topicPartition = new TopicPartition(topic, i); +// topicPartitions.add(topicPartition); +// } +// } +// Properties props = new Properties(){{ +// put("bootstrap.servers", "localhost:9092"); +// put("auto.commit.interval.ms", "1000"); +// put("group.id", "test"); +// put("enable.auto.commit", "true"); +// put("session.timeout.ms", "30000"); +// put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); +// put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); +// }}; +// KafkaConsumer consumer = new KafkaConsumer<>(props); +// consumer.assign(topicPartitions); +// try { +// while (true) { +// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +// if (!records.isEmpty()) { +// new Thread(() -> { +// for (TopicPartition partition : records.partitions()) { +// List> partitionRecords = records.records(partition); +// for (ConsumerRecord record : partitionRecords) { +// log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value()); +// Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value()); +// VehicleEvent events = eventsService.getEvents(vehicle.getVin()); +// HandlerHelper.doHandler(events, vehicle, redisService); +// } +// } +// }).start(); +// } else { +// Thread.sleep(10); +// } +// } +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// log.error("Consumer was interrupted.", e); +// } finally { +// consumer.close(); +// } +// }).start(); +// } +// } +//} diff --git a/src/main/java/com/muyu/eventdriven/mapper/EventDrivenMapper.java b/src/main/java/com/muyu/eventdriven/mapper/EventDrivenMapper.java new file mode 100644 index 0000000..672d427 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/mapper/EventDrivenMapper.java @@ -0,0 +1,14 @@ +package com.muyu.eventdriven.mapper; + +import org.apache.ibatis.annotations.Mapper; + +/** + * @ClassName EventDrivenMapper + * @Description 事件系统DAO层 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:47 + */ +@Mapper +public interface EventDrivenMapper { + +} diff --git a/src/main/java/com/muyu/eventdriven/model/param/IotDbParam.java b/src/main/java/com/muyu/eventdriven/model/param/IotDbParam.java new file mode 100644 index 0000000..a9ba6f0 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/model/param/IotDbParam.java @@ -0,0 +1,41 @@ +package com.muyu.eventdriven.model.param; + +import lombok.Data; +/** + * @ClassName AsVehicleEvent + * @Description iotdb控制层入参 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +@Data +public class IotDbParam { + /*** + * 产品PK + */ + private String pk; + /*** + * 设备号 + */ + private String sn; + /*** + * 时间 + */ + private Long time; + /*** + * 实时呼吸 + */ + private String breath; + /*** + * 实时心率 + */ + private String heart; + /*** + * 查询开始时间 + */ + private String startTime; + /*** + * 查询结束时间 + */ + private String endTime; + +} diff --git a/src/main/java/com/muyu/eventdriven/model/result/IotDbResult.java b/src/main/java/com/muyu/eventdriven/model/result/IotDbResult.java new file mode 100644 index 0000000..ccb214d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/model/result/IotDbResult.java @@ -0,0 +1,34 @@ +package com.muyu.eventdriven.model.result; + +import lombok.Data; + +/** + * @ClassName AsVehicleEvent + * @Description iotdb控制层返回结果 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +@Data +public class IotDbResult { + /*** + * 时间 + */ + private String time; + /*** + * 产品PK + */ + private String pk; + /*** + * 设备号 + */ + private String sn; + /*** + * 实时呼吸 + */ + private String breath; + /*** + * 实时心率 + */ + private String heart; + +} diff --git a/src/main/java/com/muyu/eventdriven/response/ErrorResponseData.java b/src/main/java/com/muyu/eventdriven/response/ErrorResponseData.java new file mode 100644 index 0000000..41894c3 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/response/ErrorResponseData.java @@ -0,0 +1,78 @@ +package com.muyu.eventdriven.response; + +/** + * @ClassName AsVehicleEvent + * @Description 错误返回封装 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +public class ErrorResponseData extends ResponseData { + private String exceptionClazz; + + ErrorResponseData(String message) { + super(false, DEFAULT_ERROR_CODE, message, message, (Object)null); + } + + public ErrorResponseData(Integer code, String message) { + super(false, code, message, message, (Object)null); + } + + ErrorResponseData(Integer code, String message, Object object) { + super(false, code, message, object); + } + + ErrorResponseData(Integer code, String message, String localizedMsg, Object object) { + super(false, code, message, localizedMsg, object); + } + + public boolean equals(final Object o) { + if (o == this) { + return true; + } else if (!(o instanceof ErrorResponseData)) { + return false; + } else { + ErrorResponseData other = (ErrorResponseData)o; + if (!other.canEqual(this)) { + return false; + } else if (!super.equals(o)) { + return false; + } else { + Object this$exceptionClazz = this.getExceptionClazz(); + Object other$exceptionClazz = other.getExceptionClazz(); + if (this$exceptionClazz == null) { + if (other$exceptionClazz != null) { + return false; + } + } else if (!this$exceptionClazz.equals(other$exceptionClazz)) { + return false; + } + + return true; + } + } + } + + protected boolean canEqual(final Object other) { + return other instanceof ErrorResponseData; + } + + public int hashCode() { + int result = super.hashCode(); + Object $exceptionClazz = this.getExceptionClazz(); + result = result * 59 + ($exceptionClazz == null ? 43 : $exceptionClazz.hashCode()); + return result; + } + + public String getExceptionClazz() { + return this.exceptionClazz; + } + + public void setExceptionClazz(final String exceptionClazz) { + this.exceptionClazz = exceptionClazz; + } + + public String toString() { + return "ErrorResponseData(exceptionClazz=" + this.getExceptionClazz() + ")"; + } +} + diff --git a/src/main/java/com/muyu/eventdriven/response/ResponseData.java b/src/main/java/com/muyu/eventdriven/response/ResponseData.java new file mode 100644 index 0000000..b3ff368 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/response/ResponseData.java @@ -0,0 +1,215 @@ +package com.muyu.eventdriven.response; + +/** + * @ClassName AsVehicleEvent + * @Description 返回结果封装 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ +public class ResponseData { + public static final String DEFAULT_SUCCESS_MESSAGE = "请求成功"; + public static final String DEFAULT_ERROR_MESSAGE = "网络异常"; + public static final Integer DEFAULT_SUCCESS_CODE = 200; + public static final Integer DEFAULT_ERROR_CODE = 500; + private Boolean success; + private Integer code; + private String message; + private String localizedMsg; + private Object data; + + public ResponseData() { + } + + public ResponseData(Boolean success, Integer code, String message, Object data) { + this.success = success; + this.code = code; + this.message = message; + this.data = data; + } + + public ResponseData(Boolean success, Integer code, String message, String localizedMsg, Object data) { + this.success = success; + this.code = code; + this.message = message; + this.localizedMsg = localizedMsg; + this.data = data; + } + + public ResponseData(Boolean success, Integer code, String message) { + this.success = success; + this.code = code; + this.message = message; + } + + public static SuccessResponseData success() { + return new SuccessResponseData(); + } + + public static SuccessResponseData success(Object object) { + return new SuccessResponseData(object); + } + + public static SuccessResponseData success(Integer code, String message, Object object) { + return new SuccessResponseData(code, message, object); + } + + public static SuccessResponseData success(Integer code, String message) { + return new SuccessResponseData(code, message); + } + + public static SuccessResponseData success(Integer code, String message, String localizedMsg, Object object) { + return new SuccessResponseData(code, message, localizedMsg, object); + } + + public static ErrorResponseData error(String message) { + return new ErrorResponseData(message); + } + + public static ErrorResponseData error(Integer code, String message) { + return new ErrorResponseData(code, message); + } + + public static ErrorResponseData error(Integer code, String message, Object object) { + return new ErrorResponseData(code, message, object); + } + + public static ErrorResponseData error(Integer code, String message, String localizedMsg, Object object) { + return new ErrorResponseData(code, message, localizedMsg, object); + } + + public Boolean getSuccess() { + return this.success; + } + + public Integer getCode() { + return this.code; + } + + public String getMessage() { + return this.message; + } + + public String getLocalizedMsg() { + return this.localizedMsg; + } + + public Object getData() { + return this.data; + } + + public void setSuccess(final Boolean success) { + this.success = success; + } + + public void setCode(final Integer code) { + this.code = code; + } + + public void setMessage(final String message) { + this.message = message; + } + + public void setLocalizedMsg(final String localizedMsg) { + this.localizedMsg = localizedMsg; + } + + public void setData(final Object data) { + this.data = data; + } + + public boolean equals(final Object o) { + if (o == this) { + return true; + } else if (!(o instanceof ResponseData)) { + return false; + } else { + ResponseData other = (ResponseData)o; + if (!other.canEqual(this)) { + return false; + } else { + label71: { + Object this$success = this.getSuccess(); + Object other$success = other.getSuccess(); + if (this$success == null) { + if (other$success == null) { + break label71; + } + } else if (this$success.equals(other$success)) { + break label71; + } + + return false; + } + + Object this$code = this.getCode(); + Object other$code = other.getCode(); + if (this$code == null) { + if (other$code != null) { + return false; + } + } else if (!this$code.equals(other$code)) { + return false; + } + + label57: { + Object this$message = this.getMessage(); + Object other$message = other.getMessage(); + if (this$message == null) { + if (other$message == null) { + break label57; + } + } else if (this$message.equals(other$message)) { + break label57; + } + + return false; + } + + Object this$localizedMsg = this.getLocalizedMsg(); + Object other$localizedMsg = other.getLocalizedMsg(); + if (this$localizedMsg == null) { + if (other$localizedMsg != null) { + return false; + } + } else if (!this$localizedMsg.equals(other$localizedMsg)) { + return false; + } + + Object this$data = this.getData(); + Object other$data = other.getData(); + if (this$data == null) { + if (other$data == null) { + return true; + } + } else if (this$data.equals(other$data)) { + return true; + } + + return false; + } + } + } + + protected boolean canEqual(final Object other) { + return other instanceof ResponseData; + } + + public int hashCode() { + int result1 = 1; + Object $success = this.getSuccess(); + int result = result1 * 59 + ($success == null ? 43 : $success.hashCode()); + Object $code = this.getCode(); + result = result * 59 + ($code == null ? 43 : $code.hashCode()); + Object $message = this.getMessage(); + result = result * 59 + ($message == null ? 43 : $message.hashCode()); + Object $localizedMsg = this.getLocalizedMsg(); + result = result * 59 + ($localizedMsg == null ? 43 : $localizedMsg.hashCode()); + Object $data = this.getData(); + result = result * 59 + ($data == null ? 43 : $data.hashCode()); + return result; + } + + public String toString() { + return "ResponseData(success=" + this.getSuccess() + ", code=" + this.getCode() + ", message=" + this.getMessage() + ", localizedMsg=" + this.getLocalizedMsg() + ", data=" + this.getData() + ")"; + } +} diff --git a/src/main/java/com/muyu/eventdriven/response/SuccessResponseData.java b/src/main/java/com/muyu/eventdriven/response/SuccessResponseData.java new file mode 100644 index 0000000..6cdc3f5 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/response/SuccessResponseData.java @@ -0,0 +1,29 @@ +package com.muyu.eventdriven.response; +/** + * @ClassName AsVehicleEvent + * @Description 正确返回结果封装 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ + +public class SuccessResponseData extends ResponseData { + public SuccessResponseData() { + super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", (Object)null); + } + + public SuccessResponseData(Object object) { + super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", object); + } + + public SuccessResponseData(Integer code, String message, Object object) { + super(true, code, message, message, object); + } + + public SuccessResponseData(Integer code, String message, String localizedMsg, Object object) { + super(true, code, message, localizedMsg, object); + } + + public SuccessResponseData(Integer code, String message) { + super(true, code, message); + } +} diff --git a/src/main/java/com/muyu/eventdriven/server/EventInfoService.java b/src/main/java/com/muyu/eventdriven/server/EventInfoService.java new file mode 100644 index 0000000..84a8105 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/server/EventInfoService.java @@ -0,0 +1,15 @@ +package com.muyu.eventdriven.server; + +import com.muyu.eventdriven.domain.rest.Result; + +/** + * @ClassName EventInfo + * @Description 事件接口 + * @Author Yunfei.Du + * @Date 2024/6/18 上午9:34 + */ +public interface EventInfoService { + void creatKafkaConsumer(String vin); + + Result closeKafkaConsumer(String vin); +} diff --git a/src/main/java/com/muyu/eventdriven/server/IotDbServer.java b/src/main/java/com/muyu/eventdriven/server/IotDbServer.java new file mode 100644 index 0000000..4a27326 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/server/IotDbServer.java @@ -0,0 +1,24 @@ +package com.muyu.eventdriven.server; + + +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.model.param.IotDbParam; + +/** + * @ClassName AsVehicleEvent + * @Description iotdb接口类 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ + +public interface IotDbServer { + /** + * 添加数据 + */ + void insertData(VehicleData vehicleData); + + /** + * 查询数据 + */ + Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception; +} diff --git a/src/main/java/com/muyu/eventdriven/server/impl/EventInfoServiceImpl.java b/src/main/java/com/muyu/eventdriven/server/impl/EventInfoServiceImpl.java new file mode 100644 index 0000000..62890dd --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/server/impl/EventInfoServiceImpl.java @@ -0,0 +1,127 @@ +package com.muyu.eventdriven.server.impl; + +import com.alibaba.fastjson2.JSON; +import com.muyu.eventdriven.consumer.KafkaConsumers; +import com.muyu.eventdriven.domain.EventTacticsManage; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.domain.VehicleKafka; +import com.muyu.eventdriven.domain.rest.Result; +import com.muyu.eventdriven.server.EventInfoService; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @ClassName EventInfoServiceImpl + * @Description 事件实现层 + * @Author Yunfei.Du + * @Date 2024/6/18 上午9:35 + */ +@Service +@Log4j2 +public class EventInfoServiceImpl implements EventInfoService { + private static Map kafkaConsumerMap = new HashMap<>(); + + private static List classNameList = new ArrayList<>((Arrays.asList( + "StorageEvent", + "ElectronicFenceEvent", + "FaultAlarmEvent", + "RealTimeDataEvent", + "IndexWarningEvent"))); + + @Autowired + private KafkaConsumers kafkaConsumers; + @Autowired + private RedisTemplate redisTemplate; + @Autowired + private EventTacticsManage eventTacticsManage; + @Autowired + private ApplicationContext applicationContext; + + /** + * 创建 Kafka 消费者实例,用于消费与特定车辆 VIN 相关联的 Kafka 主题。 + * + * @param vehicleVin 车辆的唯一识别码(VIN),用于确定要消费的主题。 + */ + @Override + public void creatKafkaConsumer(String vehicleVin) { + // 从 Redis 中获取车辆 Kafka 配置信息 + Object o = redisTemplate.opsForHash().get("vehicleKafka", vehicleVin); + VehicleKafka vehicleKafka = JSON.parseObject ( o.toString (), VehicleKafka.class ); + + // 将车辆 VIN 添加到 Redis 列表中,标记该 VIN 有活跃的消费者 + redisTemplate.opsForList().rightPush(vehicleVin,""); + // 设置 VIN 在 Redis 中的过期时间,避免消费者长时间不活跃但仍被保留 + redisTemplate.expire(vehicleVin, 10, TimeUnit.HOURS); + // 检查是否存在已为当前车辆配置的 Kafka 消费者 + if (!kafkaConsumerMap.containsKey(vehicleKafka.getConsumerName() + "-" + vehicleKafka.getPartitions())) { + // 根据车辆 Kafka 配置创建一个新的 Kafka 分区消费者 + KafkaConsumer kafkaConsumer = kafkaConsumers.kafkaConsumer(vehicleKafka); + // 将新创建的消费者添加到映射中,键为消费者名称和分区组合 + new Thread ( () -> { + kafkaConsumerMap.put(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions(),kafkaConsumer); + // 持续检查消费者映射中是否存在当前车辆的消费者,存在则处理消息 + while (true){ + // 使用 ConcurrentHashMap 来保证线程安全 + ConcurrentHashMap> stringListHashMap = new ConcurrentHashMap<>(); + // 从 Kafka 拉取消息 + // 拉取消息 + ConsumerRecords msg = kafkaConsumer.poll(Duration.ofSeconds(100)); + // 处理拉取到的消息,将消息按车辆事件类型分类 + stringListHashMap = getVehicleData(msg, stringListHashMap); + // 对于每种车辆事件类型,异步处理相关消息 + stringListHashMap.forEach((key,value) -> { + // 从 Redis 中获取车辆事件处理类的列表 +// String vehicleEventString = redisTemplate.opsForHash().get(RedisConstants.VEHICLE_EVENT, key).toString(); + String vehicleEventString = "1,2,3,4,5"; + for (String str : vehicleEventString.split(",")) { + CompletableFuture.runAsync(() -> { + if (vehicleEventString.contains ( str )){ + applicationContext.getBean(classNameList.get(Integer.parseInt(str)), EventTactics.class).eventManage(key,value); + } + + }); + } + }); + } + }).start (); + } + } + + public ConcurrentHashMap> getVehicleData(ConsumerRecords msg,ConcurrentHashMap> stringListHashMap) { + for (ConsumerRecord consumerRecord : msg) { + try { + VehicleData vehicleData = JSON.parseObject(consumerRecord.value(), VehicleData.class); + // 使用 compute 方法简化数据添加逻辑 + stringListHashMap.compute(vehicleData.getVin(), (vin, list) -> { + if (list == null) { + list = new ArrayList<>(); + } + list.add(vehicleData); + return list; + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + return stringListHashMap; + } + + @Override + public Result closeKafkaConsumer(String vin) { + redisTemplate.delete(vin); + return Result.success("释放消费者"); + } +} diff --git a/src/main/java/com/muyu/eventdriven/server/impl/IotDbServerImpl.java b/src/main/java/com/muyu/eventdriven/server/impl/IotDbServerImpl.java new file mode 100644 index 0000000..28d32d0 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/server/impl/IotDbServerImpl.java @@ -0,0 +1,111 @@ +package com.muyu.eventdriven.server.impl; + +import com.alibaba.fastjson.JSON; +import com.muyu.eventdriven.config.iotdb.IotDBSessionConfig; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.model.param.IotDbParam; +import com.muyu.eventdriven.model.result.IotDbResult; +import com.muyu.eventdriven.server.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @ClassName AsVehicleEvent + * @Description iotdb实现层 + * @Author Yunfei.Du + * @Date 2024/6/16 下午3:29 + */ + +@Log4j2 +@Service +public class IotDbServerImpl implements IotDbServer { + + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + @Override + public void insertData(VehicleData vehicleData){ + + try { + // iotDbParam: 模拟设备上报消息 + String deviceId = "root.vehicle."+ vehicleData.getVin(); + // 将设备上报的数据存入数据库(时序数据库) + List measurementsList = new ArrayList<>(); + measurementsList.add("data"); + List valuesList = new ArrayList<>(); + valuesList.add(String.valueOf(JSON.toJSON(vehicleData))); + iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicleData.getDrivingRoute()), measurementsList, valuesList); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public List queryDataFromIotDb(IotDbParam iotDbParam) throws Exception { + List iotDbResultList = new ArrayList<>(); + + if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) { + String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= " + + iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime(); + SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql); + List columnNames = sessionDataSet.getColumnNames(); + List titleList = new ArrayList<>(); + // 排除Time字段 -- 方便后面后面拼装数据 + for (int i = 1; i < columnNames.size(); i++) { + String[] temp = columnNames.get(i).split("\\."); + titleList.add(temp[temp.length - 1]); + } + // 封装处理数据 + packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList); + } else { + log.info("PK或者SN不能为空!!"); + } + return iotDbResultList; + } + /** + * 封装处理数据 + * @param iotDbParam + * @param iotDbResultList + * @param sessionDataSet + * @param titleList + * @throws StatementExecutionException + * @throws IoTDBConnectionException + */ + private void packagingData(IotDbParam iotDbParam, List iotDbResultList, SessionDataSet sessionDataSet, List titleList) + throws StatementExecutionException, IoTDBConnectionException { + int fetchSize = sessionDataSet.getFetchSize(); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + IotDbResult iotDbResult = new IotDbResult(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + iotDbResult.setTime(timeString); + Map map = new HashMap<>(); + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + // 这里的需要按照类型获取 + map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString()); + } + iotDbResult.setTime(timeString); + iotDbResult.setPk(iotDbParam.getPk()); + iotDbResult.setSn(iotDbParam.getSn()); + iotDbResult.setHeart(map.get("heart")); + iotDbResult.setBreath(map.get("breath")); + iotDbResultList.add(iotDbResult); + } + } + } +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/EventTactics.java b/src/main/java/com/muyu/eventdriven/tactics/EventTactics.java new file mode 100644 index 0000000..2f25414 --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/EventTactics.java @@ -0,0 +1,16 @@ +package com.muyu.eventdriven.tactics; + +import com.muyu.eventdriven.domain.VehicleData; + +import java.util.List; + +/** + * @ClassName EventTactics + * @Description 事件处理系统策略模式接口 + * @Author Yunfei.Du + * @Date 2024/6/20 上午10:33 + */ +public interface EventTactics { + void eventManage(VehicleData vehicleData); + void eventManage(String vin,List vehicleDataList); +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/basics/StorageEvent.java b/src/main/java/com/muyu/eventdriven/tactics/basics/StorageEvent.java new file mode 100644 index 0000000..70c6c1b --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/basics/StorageEvent.java @@ -0,0 +1,48 @@ +package com.muyu.eventdriven.tactics.basics; + +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.server.IotDbServer; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @ClassName StorageEvent + * @Description 存储事件 + * @Author Yunfei.Du + * @Date 2024/6/20 上午10:38 + */ +@Service("StorageEvent") +@Log4j2 +@Primary +public class StorageEvent implements EventTactics { + @Autowired + private IotDbServer iotDbServer; + + /** + * 单条数据的处理 + * @param vehicleData + */ + @Override + public void eventManage(VehicleData vehicleData) { + iotDbServer.insertData(vehicleData); + log.info("车辆{}执行存储事件",vehicleData.getVin()); + } + + /** + * 多条数据的处理 + * @param vin + * @param vehicleDataList + */ + @Override + public void eventManage(String vin, List vehicleDataList) { + log.info("车辆{}执行存储事件",vin); + vehicleDataList.forEach(vehicleData -> { + iotDbServer.insertData(vehicleData); + }); + } +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/system/ElectronicFenceEvent.java b/src/main/java/com/muyu/eventdriven/tactics/system/ElectronicFenceEvent.java new file mode 100644 index 0000000..771325c --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/system/ElectronicFenceEvent.java @@ -0,0 +1,37 @@ +package com.muyu.eventdriven.tactics.system; + +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @ClassName ElectronicFenceEvent + * @Description 电子围栏 + * @Author Yunfei.Du + * @Date 2024/6/20 上午10:42 + */ +@Service("ElectronicFenceEvent") +@Log4j2 +public class ElectronicFenceEvent implements EventTactics { + /** + * 单条数据的处理 + * @param vehicleData + */ + @Override + public void eventManage(VehicleData vehicleData) { + log.info("车辆{}执行电子围栏事件",vehicleData.getVin()); + } + + /** + * 多条数据的处理 + * @param vin + * @param vehicleDataList + */ + @Override + public void eventManage(String vin, List vehicleDataList) { + log.info("车辆{}执行电子围栏事件",vin); + } +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/system/FaultAlarmEvent.java b/src/main/java/com/muyu/eventdriven/tactics/system/FaultAlarmEvent.java new file mode 100644 index 0000000..85d122d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/system/FaultAlarmEvent.java @@ -0,0 +1,149 @@ +package com.muyu.eventdriven.tactics.system; + +import com.alibaba.fastjson.JSON; +import com.github.benmanes.caffeine.cache.Cache; +import com.muyu.eventdriven.constants.FaultCodeConstants; +import com.muyu.eventdriven.constants.RabbitConstants; +import com.muyu.eventdriven.constants.RedisConstants; +import com.muyu.eventdriven.domain.RabbitFalut; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.domain.VehicleFaultStatus; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * @ClassName FaultAlarmEvent + * @Description 故障报警 + * @Author Yunfei.Du + * @Date 2024/6/20 上午10:58 + */ +@Service("FaultAlarmEvent") +@Log4j2 +//@RequiredArgsConstructor +public class FaultAlarmEvent implements EventTactics { + @Autowired + private Cache caffeineCache; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private RabbitTemplate rabbitTemplate; + + + @Override + public void eventManage(VehicleData vehicleData) { + log.info("车辆{}执行故障报警事件",vehicleData.getVin()); + } + + @Override + public void eventManage(String vin, List vehicleDataList) { + vehicleDataList.stream().forEach(vehicleData -> { + //车辆状态 + if (vehicleData.getVehicleStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.VEHICLESTATUS); + } + //充电状态 + if (vehicleData.getChargingStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGSTATUS); + } + //运行状态 + if (vehicleData.getOperatingStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.OPERATINGSTATUS); + } + //SOC + if (vehicleData.getSocStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.SOCSTATUS); + } + //可充电储能装置工作状态 + if (vehicleData.getChargingEnergyStorageStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGENERGYSTORAGESTATUS); + } + //驱动电机状态 + if (vehicleData.getDriveMotorStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.DRIVEMOTORSTATUS); + } + //定位是否有效 + if (vehicleData.getPositionStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.POSITIONSTATUS); + } + //EAS(汽车防盗系统)状态 + if (vehicleData.getEasStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.EASSTATUS); + } + //PTC(电动加热器)状态 + if (vehicleData.getPtcStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.PTCSTATUS); + } + //EPS(电动助力系统)状态 + if (vehicleData.getEpsStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.EPSSTATUS); + } + //ABS(防抱死)状态 + if (vehicleData.getAbsStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.ABSSTATUS); + } + //MCU(电机/逆变器)状态 + if (vehicleData.getMcuStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.MCUSTATUS); + } + //动力电池加热状态 + if (vehicleData.getHeatingStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.HEATINGSTATUS); + } + //动力电池当前状态 + if (vehicleData.getBatteryStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.BATTERYSTATUS); + } + //动力电池保温状态 + if (vehicleData.getBatteryInsulationStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.BATTERYINSULATIONSTATUS); + } + //DCDC(电力交换系统)状态 + if (vehicleData.getDcdcStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.DCDCSTATUS); + } + //CHG(充电机)状态 + if (vehicleData.getChgStatus() == 0){ + hasLocalCache(vehicleData, FaultCodeConstants.CHGSTATUS); + } + + }); + log.info("车辆{}执行故障报警事件",vin); + } + + /** + * 检查车辆故障数据是否已在本地缓存中,并根据情况更新缓存和发送故障状态消息。 + * 如果缓存中已存在车辆的故障数据,则不进行任何操作。 + * 如果缓存中不存在车辆的故障数据,则将数据存入缓存,并发送车辆故障状态消息。 + * + * @param vehicleData 车辆数据,包含车辆vin和行驶路线等信息。 + * @param faultCode 故障代码,用于标识具体的故障类型。 + */ + public void hasLocalCache(VehicleData vehicleData,String faultCode){ + Object o = caffeineCache.get(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode, key -> vehicleData.getDrivingRoute()); + if (o.toString().equals(vehicleData.getDrivingRoute())){ + RabbitFalut rabbitFalut = new RabbitFalut(); + rabbitFalut.setStartTime(new Date()); + rabbitFalut.setVin(vehicleData.getVin()); + rabbitFalut.setFaultCode(faultCode); + rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.QUEUE_STATUS_NORMAL, JSON.toJSONString(rabbitFalut),message ->{ + message.getMessageProperties().setMessageId( UUID.randomUUID().toString()); + //设置消息延迟时间为5秒 + message.getMessageProperties().setDelay(5000); + log.info ( "消息发送成功" ); + return message; + }); + } + redisTemplate.opsForValue().set(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode,vehicleData.getDrivingRoute(),10, TimeUnit.SECONDS); + } +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/system/IndexWarningEvent.java b/src/main/java/com/muyu/eventdriven/tactics/system/IndexWarningEvent.java new file mode 100644 index 0000000..ea2ec0d --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/system/IndexWarningEvent.java @@ -0,0 +1,28 @@ +package com.muyu.eventdriven.tactics.system; + +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @ClassName IndexWarningEvent + * @Description 指标预警 + * @Author Yunfei.Du + * @Date 2024/6/20 上午11:00 + */ +@Service("IndexWarningEvent") +@Log4j2 +public class IndexWarningEvent implements EventTactics { + @Override + public void eventManage(VehicleData vehicleData) { + log.info("车辆{}执行指标预警事件",vehicleData.getVin()); + } + + @Override + public void eventManage(String vin, List vehicleDataList) { + log.info("车辆{}执行指标预警事件",vin); + } +} diff --git a/src/main/java/com/muyu/eventdriven/tactics/system/RealTimeDataEvent.java b/src/main/java/com/muyu/eventdriven/tactics/system/RealTimeDataEvent.java new file mode 100644 index 0000000..809004f --- /dev/null +++ b/src/main/java/com/muyu/eventdriven/tactics/system/RealTimeDataEvent.java @@ -0,0 +1,44 @@ +package com.muyu.eventdriven.tactics.system; + +import com.alibaba.fastjson.JSON; +import com.muyu.eventdriven.domain.VehicleData; +import com.muyu.eventdriven.tactics.EventTactics; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @ClassName RealTimeDataEvent + * @Description 实时数据 + * @Author Yunfei.Du + * @Date 2024/6/20 上午10:47 + */ +@Service("RealTimeDataEvent") +@Log4j2 +public class RealTimeDataEvent implements EventTactics { + @Autowired + private RedisTemplate redisTemplate; + + @Override + public void eventManage(VehicleData vehicleData) { + if (redisTemplate.hasKey(vehicleData.getVin())){ + log.info("{}监听到的消息内容: {}", vehicleData.getVin(),vehicleData); + redisTemplate.opsForList().rightPush(vehicleData.getVin(), JSON.toJSONString(vehicleData)); + } + log.info("车辆{}执行实时数据事件",vehicleData.getVin()); + } + + @Override + public void eventManage(String vin, List vehicleDataList) { + log.info("车辆{}执行实时数据事件",vin); + if (redisTemplate.hasKey(vin)){ + log.info("{}监听到的消息内容: {}", vin,vehicleDataList); + vehicleDataList.forEach(vehicleData -> { + redisTemplate.opsForList().rightPush(vin, JSON.toJSONString(vehicleData)); + }); + } + } +} diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..9ef9f96 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,51 @@ +server: + port: 9006 + +spring: + rabbitmq: + username: guest + password: guest + virtualHost: / + port: 5672 + host: 43.142.12.243 + kafka: + #config/consumer.properties配置的bootstrap.servers + bootstrap-servers: 43.142.12.243:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + #这个可以和config/consumer.properties里的group.id不同 + group-id: test-consumer-group + redis: + host: 127.0.0.1 + port: 6379 + password: dyf@123 + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://127.0.0.1:3306/event-driven?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + username: root + password: 1234 + + # mybatis配置 + mybatis: + # 搜索指定包别名 + typeAliasesPackage: com.muyu.eventdriven.domain + # 配置mapper的扫描,找到所有的mapper.xml映射文件 + mapperLocations: classpath:mapper/**/*.xml + configuration: + map-underscore-to-camel-case: true + # 将mapper接口所在包的日志级别改成debug,可以在控制台打印sql + logging: + level: + com.bwie.**: debug + application: + name: event-driven + iotdb: + username: root + password: root + ip: 43.142.12.243 + port: 6667 + fetchSize: 10000 diff --git a/src/main/resources/mapper/EventDrivenMapper.xml b/src/main/resources/mapper/EventDrivenMapper.xml new file mode 100644 index 0000000..622f457 --- /dev/null +++ b/src/main/resources/mapper/EventDrivenMapper.xml @@ -0,0 +1,5 @@ + + + + +