From 05bc3254348bdac32847ef2694f9d3a505f0434e Mon Sep 17 00:00:00 2001
From: Liu Wu <2780205363@qq.com>
Date: Sun, 29 Sep 2024 10:55:32 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86=E6=A8=A1?=
=?UTF-8?q?=E5=9D=97=E5=88=9D=E5=A7=8B=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-modules/cloud-event/pom.xml | 138 ++++++++++++++++++
.../java/com/muyu/event/EventApplication.java | 13 ++
.../com/muyu/event/basics/EventHandler.java | 25 ++++
.../muyu/event/basics/EventProcessBasics.java | 25 ++++
.../muyu/event/basics/EventQueueConfig.java | 35 +++++
.../com/muyu/event/basics/StartEvent.java | 28 ++++
.../com/muyu/event/config/IoTDBConfig.java | 128 ++++++++++++++++
.../event/config/KafkaConsumerConfig.java | 129 ++++++++++++++++
.../event/config/KafkaProviderConfig.java | 127 ++++++++++++++++
.../event/config/KafkaSendResultHandler.java | 65 +++++++++
.../config/MyKafkaListenerErrorHandler.java | 34 +++++
.../muyu/event/constant/EventConstant.java | 15 ++
.../muyu/event/consumer/KafkaConsumer.java | 31 ++++
.../event/controller/IoTDBController.java | 74 ++++++++++
.../muyu/event/domian/AddCarInformation.java | 28 ++++
.../com/muyu/event/domian/CarInformation.java | 45 ++++++
.../java/com/muyu/event/domian/Event.java | 42 ++++++
.../com/muyu/event/domian/EventActuate.java | 28 ++++
.../AutoStartupEventListener.java | 25 ++++
.../muyu/event/eventDispose/StorageEvent.java | 37 +++++
.../com/muyu/event/service/IoTDBService.java | 22 +++
.../event/service/impl/IoTDBServiceImpl.java | 51 +++++++
.../main/java/com/muyu/event/util/Iotdb.java | 66 +++++++++
.../java/com/muyu/event/util/Receive.java | 53 +++++++
.../cloud-event/src/main/resources/banner.txt | 2 +
.../src/main/resources/bootstrap.yml | 64 ++++++++
.../src/main/resources/logback/dev.xml | 74 ++++++++++
.../src/main/resources/logback/prod.xml | 88 +++++++++++
.../src/main/resources/logback/test.xml | 88 +++++++++++
.../cloud-modules-vehiclegateway/pom.xml | 4 +
cloud-modules/cloud-modules-wechat/pom.xml | 4 +
cloud-modules/pom.xml | 1 +
cloud-modules/saas/pom.xml | 4 +
33 files changed, 1593 insertions(+)
create mode 100644 cloud-modules/cloud-event/pom.xml
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaProviderConfig.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaSendResultHandler.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/config/MyKafkaListenerErrorHandler.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/constant/EventConstant.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/KafkaConsumer.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/AddCarInformation.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/CarInformation.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/Event.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/EventActuate.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/AutoStartupEventListener.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/StorageEvent.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IoTDBService.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IoTDBServiceImpl.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Iotdb.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Receive.java
create mode 100644 cloud-modules/cloud-event/src/main/resources/banner.txt
create mode 100644 cloud-modules/cloud-event/src/main/resources/bootstrap.yml
create mode 100644 cloud-modules/cloud-event/src/main/resources/logback/dev.xml
create mode 100644 cloud-modules/cloud-event/src/main/resources/logback/prod.xml
create mode 100644 cloud-modules/cloud-event/src/main/resources/logback/test.xml
diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml
new file mode 100644
index 0000000..eb13417
--- /dev/null
+++ b/cloud-modules/cloud-event/pom.xml
@@ -0,0 +1,138 @@
+
+
+ 4.0.0
+
+ com.muyu
+ cloud-modules
+ 3.6.3
+
+
+ cloud-event
+
+
+ 17
+ 17
+ UTF-8
+
+
+
+ cloud-event 数据处理
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-bootstrap
+ 4.1.2
+
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-discovery
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-config
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-sentinel
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+
+
+ com.mysql
+ mysql-connector-j
+
+
+
+
+ com.muyu
+ cloud-common-datasource
+
+
+
+
+ com.muyu
+ cloud-common-datascope
+
+
+
+
+ com.muyu
+ cloud-common-log
+
+
+
+
+ com.muyu
+ cloud-common-api-doc
+
+
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.1.0
+
+
+
+
+ org.apache.iotdb
+ iotdb-session
+ 1.3.2
+
+
+ org.apache.iotdb
+ node-commons
+ 1.3.2
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+
+
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
+
+
+
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java
new file mode 100644
index 0000000..061c7d7
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java
@@ -0,0 +1,13 @@
+package com.muyu.event;
+
+import com.muyu.common.security.annotation.EnableMyFeignClients;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+@EnableMyFeignClients
+public class EventApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(EventApplication.class,args);
+ }
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java
new file mode 100644
index 0000000..85b3488
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java
@@ -0,0 +1,25 @@
+package com.muyu.event.basics;
+
+/**
+ * @author 刘武
+ * @package:
+ * @name:EventHandler
+ * @date:2024/9/29
+ */
+public class EventHandler {
+
+ private static final ThreadLocal EVENT_THREAD = new ThreadLocal<>();
+
+ public static void set(final EventQueueConfig handler) {
+ EVENT_THREAD.set(handler);
+ }
+
+ public static EventQueueConfig get() {
+ return EVENT_THREAD.get();
+ }
+
+ public static void remove(){
+ EVENT_THREAD.remove();
+ }
+
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java
new file mode 100644
index 0000000..98b7684
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java
@@ -0,0 +1,25 @@
+package com.muyu.event.basics;
+
+
+public abstract class EventProcessBasics {
+
+ /**
+ * 下一个事件对象
+ */
+ protected EventProcessBasics nextEvent;
+
+ /**
+ * 下一个事件
+ * @param nextHandler 下一个事件处理
+ */
+ public void setNextHandler(EventProcessBasics nextHandler) {
+ this.nextEvent = nextHandler;
+ }
+
+ /**
+ * 事件处理抽象类
+ * @param eventKey 事件唯一key
+ */
+ public abstract void handleEvent(String eventKey);
+
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java
new file mode 100644
index 0000000..daf4bb2
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java
@@ -0,0 +1,35 @@
+package com.muyu.event.basics;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * @author 刘武
+ * @package:
+ * @name:EventQueueConfig
+ * @date:2024/9/29
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class EventQueueConfig {
+
+ private LinkedBlockingDeque taskNodeQueue = new LinkedBlockingDeque<>();
+
+ public void addEvent(EventProcessBasics obj){
+ this.taskNodeQueue.add(obj);
+ }
+
+ public boolean hashEventNext(){
+ return !taskNodeQueue.isEmpty();
+ }
+
+ private EventProcessBasics nextTaskNode(){
+ return taskNodeQueue.poll();
+ }
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java
new file mode 100644
index 0000000..74d882b
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java
@@ -0,0 +1,28 @@
+package com.muyu.event.basics;
+
+
+import com.muyu.event.domian.EventActuate;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.List;
+
+/**
+ * @author 刘武
+ * @package:
+ * @name:StartEvent
+ * @date:2024/9/29
+ */
+
+public class StartEvent extends ApplicationEvent {
+
+ private EventActuate eventActuate;
+
+ public StartEvent(EventActuate source) {
+ super(source);
+ this.eventActuate = source;
+ }
+
+ public EventActuate getEventActuate() {
+ return eventActuate;
+ }
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java
new file mode 100644
index 0000000..eb71267
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java
@@ -0,0 +1,128 @@
+package com.muyu.event.config;
+
+import com.muyu.event.domian.CarInformation;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * iotdb数据库 配置
+ * @author 刘武
+ * @package:com.muyu.event.config
+ * @name:IOTDBConfig
+ * @date:2024/9/28 19:35
+ */
+@Configuration
+public class IoTDBConfig {
+
+ public static final String HOST="47.116.173.119";
+ public static final Integer PORT=6667;
+ public static final String USERNAME="root";
+ public static final String PASSWORD="root";
+ public static final String TABLENAME="root.four.car_information";
+
+
+ /**
+ * 初始化连接
+ * @return
+ */
+ public Session initIoTDB(){
+ // 初始化与连接
+ Session session = new Session.Builder()
+ .host(HOST)
+ .port(PORT)
+ .username(USERNAME)
+ .password(PASSWORD)
+ .version(Version.V_1_0)
+ .build();
+ return session;
+ };
+
+
+ /**
+ * 实时数据库添加
+ * @param list
+ */
+ public void insertIoTDB(List list){
+ Session session = initIoTDB();
+ // 开启session Rpc不压缩
+ try {
+ session.open(false);
+
+ //添加字段名称
+ ArrayList measurements = new ArrayList<>();
+ //添加字段类型
+ ArrayList types = new ArrayList<>();
+
+ measurements.add("car_vin");
+ measurements.add("information");
+
+
+ session.insertRecord(TABLENAME,System.currentTimeMillis(),measurements,list);
+
+ //关闭连接
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ } catch (StatementExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public CarInformation queryIoTDB(String carVin) {
+ Session session = initIoTDB();
+ try {
+ session.open(false);
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ Long timeMillis = System.currentTimeMillis();
+
+ CarInformation carInformation = new CarInformation();
+
+ try(SessionDataSet dataSet= session.executeQueryStatement("select * from root.four.car_information where car_vin='"+carVin+"'")){
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024);
+ while (dataSet.hasNext()){
+// List fields = dataSet.next().getFields();
+// carInformation.setCarVin(String.valueOf(fields.get(0)));
+// carInformation.setInformation(String.valueOf(fields.get(1)));
+
+ String[] fields = dataSet.next().toString().split("\t");
+ carInformation.setTime(Long.valueOf(fields[0]));
+ carInformation.setCarVin(fields[1]);
+ carInformation.setInformation(fields[2]);
+ }
+
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ } catch (StatementExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ //关闭连接
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ return carInformation;
+ };
+
+
+
+
+
+
+
+
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..36610d7
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java
@@ -0,0 +1,129 @@
+package com.muyu.event.config;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author 徐一杰
+ * @date 2022/10/31 18:05
+ * kafka配置,也可以写在yml,这个文件会覆盖yml
+ */
+@SpringBootConfiguration
+public class KafkaConsumerConfig {
+
+ /**
+ * 配置 Kafka的 主机地址
+ */
+ @Value("${spring.kafka.consumer.bootstrap-servers}")
+ private String bootstrapServers;
+ /**
+ * 配置分分组
+ */
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+ /**
+ * 是否自动提交 偏移量
+ */
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
+ private boolean enableAutoCommit;
+ /**
+ * 消费者与Kafka的心跳续约的会话超时时间
+ */
+ @Value("${spring.kafka.properties.session.timeout.ms}")
+ private String sessionTimeout;
+ /**
+ * 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ */
+ @Value("${spring.kafka.properties.max.poll.interval.ms}")
+ private String maxPollIntervalTime;
+
+ @Value("${spring.kafka.consumer.max-poll-records}")
+ private String maxPollRecords;
+
+ @Value("${spring.kafka.consumer.auto-offset-reset}")
+ private String autoOffsetReset;
+
+ @Value("${spring.kafka.listener.concurrency}")
+ private Integer concurrency;
+
+ @Value("${spring.kafka.listener.missing-topics-fatal}")
+ private boolean missingTopicsFatal;
+
+ @Value("${spring.kafka.listener.poll-timeout}")
+ private long pollTimeout;
+
+ @Bean
+ public Map consumerConfigs() {
+ Map propsMap = new HashMap<>(16);
+ propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+ propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+ //自动提交的时间间隔,自动提交开启时生效
+ propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
+ //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+ //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
+ //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
+ //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
+ propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+ //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
+ //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
+ //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
+ //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
+ //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
+ //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
+ //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
+ propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+ //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
+ propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
+ //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ return propsMap;
+ }
+
+ @Bean
+ public ConsumerFactory