diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml new file mode 100644 index 0000000..eb13417 --- /dev/null +++ b/cloud-modules/cloud-event/pom.xml @@ -0,0 +1,138 @@ + + + 4.0.0 + + com.muyu + cloud-modules + 3.6.3 + + + cloud-event + + + 17 + 17 + UTF-8 + + + + cloud-event 数据处理 + + + + + + org.springframework.cloud + spring-cloud-starter-bootstrap + 4.1.2 + + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + org.springframework.kafka + spring-kafka + + + + + + com.mysql + mysql-connector-j + + + + + com.muyu + cloud-common-datasource + + + + + com.muyu + cloud-common-datascope + + + + + com.muyu + cloud-common-log + + + + + com.muyu + cloud-common-api-doc + + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.1.0 + + + + + org.apache.iotdb + iotdb-session + 1.3.2 + + + org.apache.iotdb + node-commons + 1.3.2 + + + org.springframework.kafka + spring-kafka + + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java new file mode 100644 index 0000000..061c7d7 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/EventApplication.java @@ -0,0 +1,13 @@ +package com.muyu.event; + +import com.muyu.common.security.annotation.EnableMyFeignClients; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableMyFeignClients +public class EventApplication { + public static void main(String[] args) { + SpringApplication.run(EventApplication.class,args); + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java new file mode 100644 index 0000000..85b3488 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventHandler.java @@ -0,0 +1,25 @@ +package com.muyu.event.basics; + +/** + * @author 刘武 + * @package: + * @name:EventHandler + * @date:2024/9/29 + */ +public class EventHandler { + + private static final ThreadLocal EVENT_THREAD = new ThreadLocal<>(); + + public static void set(final EventQueueConfig handler) { + EVENT_THREAD.set(handler); + } + + public static EventQueueConfig get() { + return EVENT_THREAD.get(); + } + + public static void remove(){ + EVENT_THREAD.remove(); + } + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java new file mode 100644 index 0000000..98b7684 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventProcessBasics.java @@ -0,0 +1,25 @@ +package com.muyu.event.basics; + + +public abstract class EventProcessBasics { + + /** + * 下一个事件对象 + */ + protected EventProcessBasics nextEvent; + + /** + * 下一个事件 + * @param nextHandler 下一个事件处理 + */ + public void setNextHandler(EventProcessBasics nextHandler) { + this.nextEvent = nextHandler; + } + + /** + * 事件处理抽象类 + * @param eventKey 事件唯一key + */ + public abstract void handleEvent(String eventKey); + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java new file mode 100644 index 0000000..daf4bb2 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/EventQueueConfig.java @@ -0,0 +1,35 @@ +package com.muyu.event.basics; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.concurrent.LinkedBlockingDeque; + +/** + * @author 刘武 + * @package: + * @name:EventQueueConfig + * @date:2024/9/29 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class EventQueueConfig { + + private LinkedBlockingDeque taskNodeQueue = new LinkedBlockingDeque<>(); + + public void addEvent(EventProcessBasics obj){ + this.taskNodeQueue.add(obj); + } + + public boolean hashEventNext(){ + return !taskNodeQueue.isEmpty(); + } + + private EventProcessBasics nextTaskNode(){ + return taskNodeQueue.poll(); + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java new file mode 100644 index 0000000..74d882b --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basics/StartEvent.java @@ -0,0 +1,28 @@ +package com.muyu.event.basics; + + +import com.muyu.event.domian.EventActuate; +import org.springframework.context.ApplicationEvent; + +import java.util.List; + +/** + * @author 刘武 + * @package: + * @name:StartEvent + * @date:2024/9/29 + */ + +public class StartEvent extends ApplicationEvent { + + private EventActuate eventActuate; + + public StartEvent(EventActuate source) { + super(source); + this.eventActuate = source; + } + + public EventActuate getEventActuate() { + return eventActuate; + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java new file mode 100644 index 0000000..eb71267 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/IoTDBConfig.java @@ -0,0 +1,128 @@ +package com.muyu.event.config; + +import com.muyu.event.domian.CarInformation; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; + +/** + * iotdb数据库 配置 + * @author 刘武 + * @package:com.muyu.event.config + * @name:IOTDBConfig + * @date:2024/9/28 19:35 + */ +@Configuration +public class IoTDBConfig { + + public static final String HOST="47.116.173.119"; + public static final Integer PORT=6667; + public static final String USERNAME="root"; + public static final String PASSWORD="root"; + public static final String TABLENAME="root.four.car_information"; + + + /** + * 初始化连接 + * @return + */ + public Session initIoTDB(){ + // 初始化与连接 + Session session = new Session.Builder() + .host(HOST) + .port(PORT) + .username(USERNAME) + .password(PASSWORD) + .version(Version.V_1_0) + .build(); + return session; + }; + + + /** + * 实时数据库添加 + * @param list + */ + public void insertIoTDB(List list){ + Session session = initIoTDB(); + // 开启session Rpc不压缩 + try { + session.open(false); + + //添加字段名称 + ArrayList measurements = new ArrayList<>(); + //添加字段类型 + ArrayList types = new ArrayList<>(); + + measurements.add("car_vin"); + measurements.add("information"); + + + session.insertRecord(TABLENAME,System.currentTimeMillis(),measurements,list); + + //关闭连接 + session.close(); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } + } + + public CarInformation queryIoTDB(String carVin) { + Session session = initIoTDB(); + try { + session.open(false); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + Long timeMillis = System.currentTimeMillis(); + + CarInformation carInformation = new CarInformation(); + + try(SessionDataSet dataSet= session.executeQueryStatement("select * from root.four.car_information where car_vin='"+carVin+"'")){ + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); + while (dataSet.hasNext()){ +// List fields = dataSet.next().getFields(); +// carInformation.setCarVin(String.valueOf(fields.get(0))); +// carInformation.setInformation(String.valueOf(fields.get(1))); + + String[] fields = dataSet.next().toString().split("\t"); + carInformation.setTime(Long.valueOf(fields[0])); + carInformation.setCarVin(fields[1]); + carInformation.setInformation(fields[2]); + } + + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } + + //关闭连接 + try { + session.close(); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + return carInformation; + }; + + + + + + + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..36610d7 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaConsumerConfig.java @@ -0,0 +1,129 @@ +package com.muyu.event.config; + + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.support.serializer.JsonDeserializer; + + +import java.util.HashMap; +import java.util.Map; + +/** + * @author 徐一杰 + * @date 2022/10/31 18:05 + * kafka配置,也可以写在yml,这个文件会覆盖yml + */ +@SpringBootConfiguration +public class KafkaConsumerConfig { + + /** + * 配置 Kafka的 主机地址 + */ + @Value("${spring.kafka.consumer.bootstrap-servers}") + private String bootstrapServers; + /** + * 配置分分组 + */ + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + /** + * 是否自动提交 偏移量 + */ + @Value("${spring.kafka.consumer.enable-auto-commit}") + private boolean enableAutoCommit; + /** + * 消费者与Kafka的心跳续约的会话超时时间 + */ + @Value("${spring.kafka.properties.session.timeout.ms}") + private String sessionTimeout; + /** + * 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + */ + @Value("${spring.kafka.properties.max.poll.interval.ms}") + private String maxPollIntervalTime; + + @Value("${spring.kafka.consumer.max-poll-records}") + private String maxPollRecords; + + @Value("${spring.kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + + @Value("${spring.kafka.listener.concurrency}") + private Integer concurrency; + + @Value("${spring.kafka.listener.missing-topics-fatal}") + private boolean missingTopicsFatal; + + @Value("${spring.kafka.listener.poll-timeout}") + private long pollTimeout; + + @Bean + public Map consumerConfigs() { + Map propsMap = new HashMap<>(16); + propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + //自动提交的时间间隔,自动提交开启时生效 + propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); + //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); + //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); + //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); + //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + return propsMap; + } + + @Bean + public ConsumerFactory consumerFactory() { + // 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 + try (JsonDeserializer deserializer = new JsonDeserializer<>()) { + deserializer.trustedPackages("*"); + return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); + } + } + + /** + * kafka监听容器工厂 负责 从 Kafka的主题中 取出消息进行消费 可以设置消费者的配置 + * @return + */ + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 + factory.setConcurrency(concurrency); + // 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 + factory.setMissingTopicsFatal(missingTopicsFatal); + // 自动提交关闭,需要设置手动消息确认 + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.getContainerProperties().setPollTimeout(pollTimeout); + // 设置为批量监听,需要用List接收 + // factory.setBatchListener(true); + return factory; + } + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaProviderConfig.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaProviderConfig.java new file mode 100644 index 0000000..1fdcf05 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaProviderConfig.java @@ -0,0 +1,127 @@ +package com.muyu.event.config; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; + +import java.util.HashMap; +import java.util.Map; + +/** + * 主题生产者的配置类 + */ +@Configuration +public class KafkaProviderConfig { + + /** + * kafka 的主机地址 + */ + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + /** + * 配置 Kafka的事务 + */ + @Value("${spring.kafka.producer.transaction-id-prefix}") + private String transactionIdPrefix; + /** + * 发送确认机制 + */ + @Value("${spring.kafka.producer.acks}") + private String acks; + /** + * 发送重试 + */ + @Value("${spring.kafka.producer.retries}") + private String retries; + /** + * 发送消息的批次大小 + */ + @Value("${spring.kafka.producer.batch-size}") + private String batchSize; + /** + * 消息的缓冲区内存大小 + */ + @Value("${spring.kafka.producer.buffer-memory}") + private String bufferMemory; + + /** + * 设置 健的序列化方式 + */ + @Value("${spring.kafka.producer.key-serializer}") + private String keySerializer; + + /** + * 设置 值的序列化方式 + */ + @Value("${spring.kafka.producer.value-serializer}") + private String valueSerializer; + + /** + * 构建 map 配置消息生产者对象的配置 + * @return + */ + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(16); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + //开启事务必须设为all + props.put(ProducerConfig.ACKS_CONFIG, acks); + //发生错误后,消息重发的次数,开启事务必须大于0 + props.put(ProducerConfig.RETRIES_CONFIG, retries); + //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送 + //批次的大小可以通过batch.size 参数设置.默认是16KB + //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 + //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 + //实测batchSize这个参数没有用 + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间, + //即使数据没达到16KB,也将这个批次发送出去 + props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); + //生产者内存缓冲区的大小 + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + //反序列化,和生产者的序列化方式对应 + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + return props; + } + + /** + * 构建 主题生产者工厂 + * @return + */ + @Bean + public ProducerFactory producerFactory() { + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); + //开启事务,会导致 LINGER_MS_CONFIG 配置失效 + factory.setTransactionIdPrefix(transactionIdPrefix); + return factory; + } + + /** + * 配置 Kafka的事务管理器 + * @param producerFactory + * @return + */ + @Bean + public KafkaTransactionManager kafkaTransactionManager(ProducerFactory producerFactory) { + return new KafkaTransactionManager<>(producerFactory); + } + + /** + * 构建 KafkaTemplate + * @return + */ + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaSendResultHandler.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaSendResultHandler.java new file mode 100644 index 0000000..c0fa863 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/KafkaSendResultHandler.java @@ -0,0 +1,65 @@ +package com.muyu.event.config; + +import jakarta.annotation.Nullable; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.ProducerListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Component +public class KafkaSendResultHandler implements ProducerListener { + + @Autowired + private KafkaTemplate kafkaTemplate; + + /** + * bean 初始化方法 + */ + @PostConstruct + public void init(){ + this.kafkaTemplate.setProducerListener(this); + } + + /** + * 消息发送到Kafka成功的回调 + * @param producerRecord + * @param recordMetadata + */ + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata){ + System.out.println("信息发送成功:"+ producerRecord.toString()); + } + + /** + * 息发送到 Kafka 失败的回调 + * @param producerRecord the failed record + * @param recordMetadata The metadata for the record that was sent (i.e. the partition + * and offset). If an error occurred, metadata will contain only valid topic and maybe + * the partition. If the partition is not provided in the ProducerRecord and an error + * occurs before partition is assigned, then the partition will be set to + * RecordMetadata.UNKNOWN_PARTITION. + * @param exception the exception thrown + */ + @Override + public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, + Exception exception){ + System.out.println("消息发送失败: "+ producerRecord.toString()); + } + + + + + + + + + + + + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/MyKafkaListenerErrorHandler.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/MyKafkaListenerErrorHandler.java new file mode 100644 index 0000000..7c72f48 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/config/MyKafkaListenerErrorHandler.java @@ -0,0 +1,34 @@ +package com.muyu.event.config; + + +import lombok.NonNull; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Component +public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { + + @Override + @NonNull + public Object handleError(@NonNull Message message, + ListenerExecutionFailedException exception) { + return new Object(); + } + + @Override + @NonNull + public Object handleError(@NonNull Message message, + @NonNull ListenerExecutionFailedException exception, + Consumer consumer) { + System.out.println("消息详情:"+ message); + System.out.println("异常信息:"+ exception); + System.out.println("消费者详情:" +consumer.groupMetadata()); + System.out.println("监听主题:"+ consumer.listTopics()); + return KafkaListenerErrorHandler.super.handleError(message, exception, consumer); + } + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/constant/EventConstant.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/constant/EventConstant.java new file mode 100644 index 0000000..fa152ac --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/constant/EventConstant.java @@ -0,0 +1,15 @@ +package com.muyu.event.constant; + + +/** + * 事件常量 + * @author 刘武 + * @package:com.muyu.event.constant + * @name:EventConstant + * @date:2024/9/28 19:25 + */ + +public interface EventConstant { + + String STORAGE_EVENT = "storageEvent"; +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/KafkaConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/KafkaConsumer.java new file mode 100644 index 0000000..6fe4eb6 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/KafkaConsumer.java @@ -0,0 +1,31 @@ +package com.muyu.event.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; + +/** + * kafka监听 + * @author 刘武 + * @package:com.muyu.event.consumer + * @name:KafkaConsumer + * @date:2024/9/28 23:34 + */ + +public class KafkaConsumer { + + + @KafkaListener(topics = "data") + public void dataKafkaConsumer(ConsumerRecord consumerRecord, Acknowledgment acknowledgment){ + Object key = consumerRecord.key(); + Object value = consumerRecord.value(); + + //事件调用 + + + //消息确认消费 + acknowledgment.acknowledge(); + } + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java new file mode 100644 index 0000000..3fa091c --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java @@ -0,0 +1,74 @@ +package com.muyu.event.controller; + +import com.muyu.common.core.domain.Result; +import com.muyu.event.domian.AddCarInformation; +import com.muyu.event.domian.CarInformation; +import com.muyu.event.service.IoTDBService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * iotdb数据CRUD + * @author 刘武 + * @package:com.muyu.event.controller + * @name:ItodbController + * @date:2024/9/28 19:17 + */ +@RestController() +public class IoTDBController { + + @Autowired + private IoTDBService tdbService; + + + /** + * 查询实时车辆信息列表 + * @return list + */ + @GetMapping("findIoTDBList") + private Result> findIoTDBList(){ + List list=tdbService.findIoTDBList(); + return Result.success(list); + }; + + /** + * 根据车辆vin 查询车辆实时信息 + * @param carVin + * @return + */ + @GetMapping("findIoTDBByVin") + public Result findIoTDBByVin(@RequestParam("carVin") String carVin){ + CarInformation carInformation=tdbService.findIoTDBByVin(carVin); + return Result.success(carInformation); + }; + + + /** + * 车辆添加 + * @param addCarInformation + * @return + */ + @GetMapping("insertIoTDB") + public Result insertIoTDB(@RequestBody AddCarInformation addCarInformation){ + tdbService.insertIoTDB(addCarInformation); + return Result.success("添加成功"); + }; + + + + + + + + + + + + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/AddCarInformation.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/AddCarInformation.java new file mode 100644 index 0000000..de8df2f --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/AddCarInformation.java @@ -0,0 +1,28 @@ +package com.muyu.event.domian; + +import com.muyu.common.core.annotation.Excel; +import lombok.Data; + +/** + * 数据添加实体类 + * @author 刘武 + * @package:com.muyu.event.domian + * @name:AddCarInformation + * @date:2024/9/28 23:10 + */ + +@Data +public class AddCarInformation { + /** + * 车辆vin码 + */ + @Excel(name = "车辆vin码") + private String carVin; + + /** + * 车辆vin码 + */ + @Excel(name = "车辆实时信息") + private String information; + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/CarInformation.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/CarInformation.java new file mode 100644 index 0000000..86dd53f --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/CarInformation.java @@ -0,0 +1,45 @@ +package com.muyu.event.domian; + +import com.alibaba.fastjson.JSON; +import com.muyu.common.core.annotation.Excel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 车辆实时信息实体类 + * @author 刘武 + * @package:com.muyu.event.domian + * @name:CarInformation + * @date:2024/9/28 19:25 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class CarInformation { + + /** + * 时间戳 + */ + @Excel(name = "时间戳") + private Long Time; + + /** + * 车辆vin码 + */ + @Excel(name = "车辆vin码") + private String carVin; + + /** + * 车辆实时信息 + */ + @Excel(name = "车辆实时信息") + private String information; + + + + + + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/Event.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/Event.java new file mode 100644 index 0000000..8111375 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/Event.java @@ -0,0 +1,42 @@ +package com.muyu.event.domian; + + +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.muyu.common.core.annotation.Excel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + + +@Data +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +@TableName("event") +public class Event { + + @TableId(value ="event_id" ) + @Excel(name = "事件id") + private Integer eventId; + + @Excel(name = "事件名称") + private String eventName; + + @Excel(name = "车辆vin") + private String carVin; + + private String createBy; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/EventActuate.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/EventActuate.java new file mode 100644 index 0000000..9ccd57e --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/domian/EventActuate.java @@ -0,0 +1,28 @@ +package com.muyu.event.domian; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.List; + +/** + * 事件驱动对象 + * @Author 刘武 + * @Data 2024/9/29 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class EventActuate { + /** + * json数据 + */ + private String jsonData; + /** + * 事件驱动key集合 + */ + private List eventKeys; +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/AutoStartupEventListener.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/AutoStartupEventListener.java new file mode 100644 index 0000000..6eec2db --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/AutoStartupEventListener.java @@ -0,0 +1,25 @@ +package com.muyu.event.eventDispose; + + +import com.muyu.event.basics.StartEvent; +import com.muyu.event.domian.EventActuate; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +/** + * @author 刘武 + * @package: + * @name:AutoStartupEventListener + * @date:2024/9/29 + */ +@Component +public class AutoStartupEventListener implements ApplicationListener { + + @Override + public void onApplicationEvent(StartEvent event) { + + EventActuate eventActuate = event.getEventActuate(); + + } + +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/StorageEvent.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/StorageEvent.java new file mode 100644 index 0000000..70d6e79 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/eventDispose/StorageEvent.java @@ -0,0 +1,37 @@ +package com.muyu.event.eventDispose; + + +import com.muyu.event.basics.EventProcessBasics; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.log4j.Log4j2; + +/** + * @author 刘武 + * @package: + * @name:StorageEvent + * @date:2024/9/29 + */ +@EqualsAndHashCode(callSuper = true) +@Log4j2 +@Data +@AllArgsConstructor +public class StorageEvent extends EventProcessBasics { + /** + * 事件名称 + */ + private String eventName; + + @Override + public void handleEvent(String eventKey) { + if (eventKey.equals(eventName)){ + log.info("开始执行 [{}] 事件", eventKey); + + }else if (nextEvent != null){ + nextEvent.handleEvent(eventKey); + }else { + log.info("处理结束,最后处理的事件为 [{}]", eventKey); + } + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IoTDBService.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IoTDBService.java new file mode 100644 index 0000000..1c8ef3f --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IoTDBService.java @@ -0,0 +1,22 @@ +package com.muyu.event.service; + +import com.muyu.event.domian.AddCarInformation; +import com.muyu.event.domian.CarInformation; + +import java.util.List; + +/** + * @author 刘武 + * @package:com.muyu.event.service + * @name:IotdbService + * @date:2024/9/28 19:19 + */ +public interface IoTDBService { + + + List findIoTDBList(); + + CarInformation findIoTDBByVin(String carVin); + + void insertIoTDB(AddCarInformation addCarInformation); +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IoTDBServiceImpl.java new file mode 100644 index 0000000..9ed111c --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IoTDBServiceImpl.java @@ -0,0 +1,51 @@ +package com.muyu.event.service.impl; + +import com.muyu.event.config.IoTDBConfig; +import com.muyu.event.domian.AddCarInformation; +import com.muyu.event.domian.CarInformation; +import com.muyu.event.service.IoTDBService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.sql.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * @author 刘武 + * @package:com.muyu.event.service + * @name:IotdbService + * @date:2024/9/28 19:19 + */ +@Service +public class IoTDBServiceImpl implements IoTDBService { + + @Autowired + private IoTDBConfig ioTDBConfig; + + + @Override + public List findIoTDBList() { + + + + return null; + } + + @Override + public CarInformation findIoTDBByVin(String carVin) { + CarInformation carInformation = ioTDBConfig.queryIoTDB(carVin); + return carInformation; + } + + @Override + public void insertIoTDB(AddCarInformation addCarInformation) { + ArrayList list = new ArrayList<>(); + if (addCarInformation!=null){ + list.add(addCarInformation.getCarVin()); + list.add(addCarInformation.getInformation()); + } + + ioTDBConfig.insertIoTDB(list); + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Iotdb.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Iotdb.java new file mode 100644 index 0000000..4fae5e3 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Iotdb.java @@ -0,0 +1,66 @@ +package com.muyu.event.util; + +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.util.ArrayList; +import java.util.List; + +public class Iotdb { + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + + System.out.println("测试数据库开始~~~~~"); + + // 初始化与连接 + Session session = new Session.Builder() + .host("47.116.173.119") + .port(6667) + .username("root") + .password("root") + .version(Version.V_1_0) + .build(); + + // 开启session Rpc不压缩 + session.open(false); + + // 写入数据 + ArrayList list = new ArrayList<>(); + list.add(100); + insertRecord(session,list); + + //查询数据 + queryRecord(session); + + //关闭连接 + session.close(); + + } + + + private static void insertRecord(Session session, List values) throws IoTDBConnectionException, StatementExecutionException { + ArrayList measurements = new ArrayList<>(); + ArrayList types = new ArrayList<>(); + + measurements.add("status"); + types.add(TSDataType.INT32); + session.insertRecord("root.four.test",System.currentTimeMillis(),measurements,types,values); + System.out.println("————————————————写入数据成功————————————————"); + } + + private static void queryRecord(Session session) throws IoTDBConnectionException, StatementExecutionException { + System.out.println("————————————————查询数据开始————————————————"); + try(SessionDataSet dataSet= session.executeQueryStatement("select * from root.four.test")){ + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); + while (dataSet.hasNext()){ + System.out.println(dataSet.next()); + } + + } + System.out.println("————————————————查询数据结束————————————————"); + } +} diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Receive.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Receive.java new file mode 100644 index 0000000..3dbe8d5 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/Receive.java @@ -0,0 +1,53 @@ +package com.muyu.event.util; + +import org.eclipse.paho.client.mqttv3.*; + +public class Receive { + + public static void main(String[] args) { + String topic = "vehicle"; + String broker = "tcp://47.101.53.251:1883"; + String clientId="lw"; + + try { + MqttClient mqttClient= new MqttClient(broker,clientId); + MqttConnectOptions connectOptions=new MqttConnectOptions(); + connectOptions.setCleanSession(true); + System.out.println("Connecting to broker:" + broker); + mqttClient.connect(connectOptions); + System.out.println("已连接"); + mqttClient.setCallback(new MqttCallback(){ + + @Override + public void connectionLost(Throwable throwable) { + System.out.println("Connect lost!"); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + System.out.println("Message arrived. topic:"+topic); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + + mqttClient.subscribe(topic); + System.out.println("Subscribed to topic " + topic); + } catch (MqttException e) { + System.out.println("reason "+e.getReasonCode()); + System.out.println("msg " +e.getMessage()); + System.out.println("loc " +e.getLocalizedMessage()); + System.out.println("cause "+e.getCause()); + System.out.println("excep "+e); + e.printStackTrace(); + } + + + } + + + +} diff --git a/cloud-modules/cloud-event/src/main/resources/banner.txt b/cloud-modules/cloud-event/src/main/resources/banner.txt new file mode 100644 index 0000000..0dd5eee --- /dev/null +++ b/cloud-modules/cloud-event/src/main/resources/banner.txt @@ -0,0 +1,2 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} diff --git a/cloud-modules/cloud-event/src/main/resources/bootstrap.yml b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..32cb41a --- /dev/null +++ b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml @@ -0,0 +1,64 @@ +# Tomcat +server: + port: 10003 +# nacos线上地址 +nacos: + addr: 47.101.53.251:8848 + user-name: nacos + password: nacos + namespace: four +# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all +# Spring +spring: + iotdb: + + + + amqp: + deserialization: + trust: + all: true + main: + allow-bean-definition-overriding: true + application: + # 应用名称 + name: cloud-data + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + config: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + # 系统共享配置 + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # 系统环境Config共享配置 + - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # xxl-job 配置文件 + - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # rabbit 配置文件 + - application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} +logging: + level: + com.muyu.fence.mapper: DEBUG + diff --git a/cloud-modules/cloud-event/src/main/resources/logback/dev.xml b/cloud-modules/cloud-event/src/main/resources/logback/dev.xml new file mode 100644 index 0000000..9060013 --- /dev/null +++ b/cloud-modules/cloud-event/src/main/resources/logback/dev.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-event/src/main/resources/logback/prod.xml b/cloud-modules/cloud-event/src/main/resources/logback/prod.xml new file mode 100644 index 0000000..8b47d5a --- /dev/null +++ b/cloud-modules/cloud-event/src/main/resources/logback/prod.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-event/src/main/resources/logback/test.xml b/cloud-modules/cloud-event/src/main/resources/logback/test.xml new file mode 100644 index 0000000..8b47d5a --- /dev/null +++ b/cloud-modules/cloud-event/src/main/resources/logback/test.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-vehiclegateway/pom.xml b/cloud-modules/cloud-modules-vehiclegateway/pom.xml index 6be8747..eac551f 100644 --- a/cloud-modules/cloud-modules-vehiclegateway/pom.xml +++ b/cloud-modules/cloud-modules-vehiclegateway/pom.xml @@ -11,6 +11,10 @@ cloud-modules-vehiclegateway + + cloud-modules-vehiclegateway 车辆网关 + + 17 17 diff --git a/cloud-modules/cloud-modules-wechat/pom.xml b/cloud-modules/cloud-modules-wechat/pom.xml index 8d94920..2b4888b 100644 --- a/cloud-modules/cloud-modules-wechat/pom.xml +++ b/cloud-modules/cloud-modules-wechat/pom.xml @@ -11,6 +11,10 @@ cloud-modules-wechat + + cloud-modules-wechat 微信公众号 + + 17 17 diff --git a/cloud-modules/pom.xml b/cloud-modules/pom.xml index d504b42..a384525 100644 --- a/cloud-modules/pom.xml +++ b/cloud-modules/pom.xml @@ -23,6 +23,7 @@ saas cloud-modules-vehiclegateway + cloud-event cloud-modules diff --git a/cloud-modules/saas/pom.xml b/cloud-modules/saas/pom.xml index 47561c5..0b9b259 100644 --- a/cloud-modules/saas/pom.xml +++ b/cloud-modules/saas/pom.xml @@ -16,6 +16,10 @@ saas-server + + saas 企业业务平台 + +