diff --git a/.idea/encodings.xml b/.idea/encodings.xml index 989f68e..1b735e6 100644 --- a/.idea/encodings.xml +++ b/.idea/encodings.xml @@ -1,12 +1,16 @@ + + - - + + + + diff --git a/.idea/misc.xml b/.idea/misc.xml index 82dbec8..c3f3b0a 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,3 @@ - diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 6c0b863..2c9deb1 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,7 @@ - + + \ No newline at end of file diff --git a/mobai-event-modules/pom.xml b/mobai-event-client/pom.xml similarity index 68% rename from mobai-event-modules/pom.xml rename to mobai-event-client/pom.xml index 13f634f..3cb63ae 100644 --- a/mobai-event-modules/pom.xml +++ b/mobai-event-client/pom.xml @@ -6,10 +6,10 @@ com.mobai event-analysis - 1.0-SNAPSHOT + 1.0.0 - mobai-event-modules + mobai-event-client 17 @@ -17,4 +17,13 @@ UTF-8 + + + + com.mobai + mobai-event-common + 1.0.0 + + + diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java new file mode 100644 index 0000000..2cee8d8 --- /dev/null +++ b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java @@ -0,0 +1,17 @@ +package com.mobai.vehicle.event.client; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author Mobai + * @className EventClientApplication + * @description 描述 + * @date 2024/6/14 20:40 + */ +@SpringBootApplication +public class EventClientApplication { + public static void main(String[] args) { + SpringApplication.run(EventClientApplication.class); + } +} diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java new file mode 100644 index 0000000..0cd96e7 --- /dev/null +++ b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java @@ -0,0 +1,26 @@ +package com.mobai.vehicle.event.client.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author Mobai + * @className MsgComponent + * @description 描述 + * @date 2024/6/14 21:47 + */ + +@Component +public class KafkaComponent { + + /** + * 自动删除队列 + * @return + */ + @Bean + public Queue autoDeleteQueue() { + return new AnonymousQueue(); + } + +} diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java new file mode 100644 index 0000000..e25f5cf --- /dev/null +++ b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java @@ -0,0 +1,79 @@ +package com.mobai.vehicle.event.client.config; + +import com.mobai.vehicle.event.client.domain.KafkaConfig; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author Mobai + * @className MsgComponent + * @description 描述 + * @date 2024/6/14 21:47 + */ + +@Log4j2 +@Component +public class MsgComponent { + + /** + * 队列声明 + * + * @param kafkaConfig + * @return + */ + @Bean + public Queue initVehicleEventClientQueue(KafkaConfig kafkaConfig) { + return new Queue(kafkaConfig.queueName(), true, true, true); + } + + + /** + * 绑定交换机和队列 + * + * @param vehicleEventExchange 交换机 + * @param initVehicleEventClientQueue 绑定队列 + * @return 绑定结果 + */ + @Bean + public Binding binding(FanoutExchange vehicleEventExchange, + Queue initVehicleEventClientQueue) { + return BindingBuilder.bind(initVehicleEventClientQueue).to(vehicleEventExchange); + } + + + @Bean + public SimpleMessageListenerContainer messageListenerContainer( + ConnectionFactory connectionFactory, + KafkaConfig kafkaConfig) { + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); + //针对哪些队列(参数为可变参数) + simpleMessageListenerContainer.setQueueNames(kafkaConfig.queueName()); + //同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。 + simpleMessageListenerContainer.setConcurrentConsumers(3); + //最大的消费者线程数 + simpleMessageListenerContainer.setMaxConcurrentConsumers(5); + //设置消息确认方式 NONE=不确认,MANUAL=手动确认,AUTO=自动确认; + //自动确认 + simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); + //simpleMessageListenerContainer.setMessageListener(message -> log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message)); + //手动确认(单条确认) + simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); + simpleMessageListenerContainer.setMessageListener( + (ChannelAwareMessageListener) (message, channel) -> { + log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message); + if (channel != null) { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } + }); + //消费端限流 + simpleMessageListenerContainer.setPrefetchCount(1); + return simpleMessageListenerContainer; + } + + +} diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java new file mode 100644 index 0000000..85b4d2a --- /dev/null +++ b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java @@ -0,0 +1,27 @@ +package com.mobai.vehicle.event.client.demo; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +public class Recv { + + private final static String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("175.24.138.82"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }; + channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); + } +} diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java new file mode 100644 index 0000000..d6314a1 --- /dev/null +++ b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java @@ -0,0 +1,34 @@ +package com.mobai.vehicle.event.client.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * 监听主题 + * @author Mobai + * @className KafkaConfig + * @description 描述 + * @date 2024/6/14 21:51 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Configuration +@ConfigurationProperties(prefix = "kafka") +public class KafkaConfig { + + + private String topic ; + + private int partition ; + + public String queueName() { + return topic + "." + partition; + } +} diff --git a/mobai-event-client/src/main/resources/application.yml b/mobai-event-client/src/main/resources/application.yml new file mode 100644 index 0000000..b1ad429 --- /dev/null +++ b/mobai-event-client/src/main/resources/application.yml @@ -0,0 +1,12 @@ +server: + port: 10001 +kafka: + topic: vehicle-event-topic0 + partition: 0 +spring: + rabbitmq: + host: 175.24.138.82 + stream: + username: guest + password: guest + diff --git a/mobai-event-common/pom.xml b/mobai-event-common/pom.xml index 003b18c..2627813 100644 --- a/mobai-event-common/pom.xml +++ b/mobai-event-common/pom.xml @@ -6,7 +6,7 @@ com.mobai event-analysis - 1.0-SNAPSHOT + 1.0.0 mobai-event-common @@ -17,4 +17,47 @@ UTF-8 + + + com.alibaba + fastjson + 1.2.83 + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.46 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-amqp + + + + + org.projectlombok + lombok + + + + org.springframework.data + spring-data-redis + 2.7.15 + + + + org.apache.poi + poi-ooxml + 4.1.2 + + + + diff --git a/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java b/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java new file mode 100644 index 0000000..05f3fb6 --- /dev/null +++ b/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java @@ -0,0 +1,261 @@ +package com.mobai.utils; + +import org.apache.poi.ss.formula.functions.T; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.core.BoundSetOperations; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ValueOperations; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * spring redis 工具类 + * + * @author muyu + **/ +@SuppressWarnings(value = {"unchecked", "rawtypes"}) +@Component +@Lazy +public class RedisService { + @Autowired + public RedisTemplate redisTemplate; + + /** + * 缓存基本的对象,Integer、String、实体类等 + * + * @param key 缓存的键值 + * @param value 缓存的值 + */ + public void setCacheObject(final String key, final T value) { + redisTemplate.opsForValue().set(key, value); + } + + /** + * 缓存基本的对象,Integer、String、实体类等 + * + * @param key 缓存的键值 + * @param value 缓存的值 + * @param timeout 时间 + * @param timeUnit 时间颗粒度 + */ + public void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit) { + redisTemplate.opsForValue().set(key, value, timeout, timeUnit); + } + + /** + * 设置有效时间 + * + * @param key Redis键 + * @param timeout 超时时间 + * @return true=设置成功;false=设置失败 + */ + public boolean expire(final String key, final long timeout) { + return expire(key, timeout, TimeUnit.SECONDS); + } + + /** + * 设置有效时间 + * + * @param key Redis键 + * @param timeout 超时时间 + * @param unit 时间单位 + * @return true=设置成功;false=设置失败 + */ + public boolean expire(final String key, final long timeout, final TimeUnit unit) { + return redisTemplate.expire(key, timeout, unit); + } + + /** + * 获取有效时间 + * + * @param key Redis键 + * @return 有效时间 + */ + public long getExpire(final String key) { + return redisTemplate.getExpire(key); + } + + /** + * 判断 key是否存在 + * + * @param key 键 + * @return true 存在 false不存在 + */ + public Boolean hasKey(String key) { + return redisTemplate.hasKey(key); + } + + /** + * 获得缓存的基本对象。 + * + * @param key 缓存键值 + * @return 缓存键值对应的数据 + */ + public T getCacheObject(final String key) { + ValueOperations operation = redisTemplate.opsForValue(); + return operation.get(key); + } + + /** + * 删除单个对象 + * + * @param key + */ + public boolean deleteObject(final String key) { + return redisTemplate.delete(key); + } + + /** + * 删除集合对象 + * + * @param collection 多个对象 + * @return + */ + public boolean deleteObject(final Collection collection) { + return redisTemplate.delete(collection) > 0; + } + + /** + * 缓存List数据 + * + * @param key 缓存的键值 + * @param dataList 待缓存的List数据 + * @return 缓存的对象 + */ + public long setCacheList(final String key, final List dataList) { + Long count = redisTemplate.opsForList().rightPushAll(key, dataList); + return count == null ? 0 : count; + } + + /** + * 获得缓存的list对象 + * + * @param key 缓存的键值 + * @return 缓存键值对应的数据 + */ + public List getCacheList(final String key) { + return redisTemplate.opsForList().range(key, 0, -1); + } + + /** + * 缓存Set + * + * @param key 缓存键值 + * @param dataSet 缓存的数据 + * @return 缓存数据的对象 + */ + public BoundSetOperations setCacheSet(final String key, final Set dataSet) { + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + Iterator it = dataSet.iterator(); + while (it.hasNext()) { + setOperation.add(it.next()); + } + return setOperation; + } + + /** + * 获得缓存的set + * + * @param key + * @return + */ + public Set getCacheSet(final String key) { + return redisTemplate.opsForSet().members(key); + } + + /** + * 缓存Map + * + * @param key + * @param dataMap + */ + public void setCacheMap(final String key, final Map dataMap) { + if (dataMap != null) { + redisTemplate.opsForHash().putAll(key, dataMap); + } + } + + /** + * 获得缓存的Map + * + * @param key + * @return + */ + public Map getCacheMap(final String key) { + return redisTemplate.opsForHash().entries(key); + } + + /** + * 往Hash中存入数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @param value 值 + */ + public void setCacheMapValue(final String key, final String hKey, final T value) { + redisTemplate.opsForHash().put(key, hKey, value); + } + + /** + * 获取Hash中的数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @return Hash中的对象 + */ + public T getCacheMapValue(final String key, final String hKey) { + HashOperations opsForHash = redisTemplate.opsForHash(); + return opsForHash.get(key, hKey); + } + + /** + * 获取多个Hash中的数据 + * + * @param key Redis键 + * @param hKeys Hash键集合 + * @return Hash对象集合 + */ + public List getMultiCacheMapValue(final String key, final Collection hKeys) { + return redisTemplate.opsForHash().multiGet(key, hKeys); + } + + /** + * 删除Hash中的某条数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @return 是否成功 + */ + public boolean deleteCacheMapValue(final String key, final String hKey) { + return redisTemplate.opsForHash().delete(key, hKey) > 0; + } + + /** + * 获得缓存的基本对象列表 + * + * @param pattern 字符串前缀 + * @return 对象列表 + */ + public Collection keys(final String pattern) { + return redisTemplate.keys(pattern); + } + + /** + * 存入一个集合 + * @param key + * @param t + * @param + */ + public > void setCacheList(String key, T t) { + redisTemplate.opsForList().leftPush(String.valueOf(key),t); + } + + public long setCacheList(final String key, final T dataList) { + Long count = redisTemplate.opsForList().rightPushAll(key, dataList); + return count == null ? 0 : count; + } +} diff --git a/mobai-event-common/src/main/java/com/mobai/vehcile/config/MsgConfig.java b/mobai-event-common/src/main/java/com/mobai/vehcile/config/MsgConfig.java new file mode 100644 index 0000000..5cc02ce --- /dev/null +++ b/mobai-event-common/src/main/java/com/mobai/vehcile/config/MsgConfig.java @@ -0,0 +1,20 @@ +package com.mobai.vehcile.config; + +import com.mobai.vehcile.event.constants.VehicleConstants; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.context.annotation.Bean; + +/** + * @author Mobai + * @className MsgConfig + * @description 描述 + * @date 2024/6/14 20:38 + */ +public class MsgConfig { + + + @Bean + public FanoutExchange vehicleEventExchange() { + return new FanoutExchange(VehicleConstants.VEHICLE_EVENT_EXCHANGE); + } +} diff --git a/mobai-event-common/src/main/java/com/mobai/vehcile/event/constants/VehicleConstants.java b/mobai-event-common/src/main/java/com/mobai/vehcile/event/constants/VehicleConstants.java new file mode 100644 index 0000000..7e06b31 --- /dev/null +++ b/mobai-event-common/src/main/java/com/mobai/vehcile/event/constants/VehicleConstants.java @@ -0,0 +1,20 @@ +package com.mobai.vehcile.event.constants; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @author Mobai + * @className VehicleConstants + * @description 描述 + * @date 2024/6/14 20:32 + */ + +public class VehicleConstants { + + public static final String VEHICLE_EVENT_EXCHANGE = "vehicle.event" ; + +} diff --git a/mobai-event-common/src/main/java/org/springframework/boot/autoconfigure/AutoConfiguration.java b/mobai-event-common/src/main/java/org/springframework/boot/autoconfigure/AutoConfiguration.java new file mode 100644 index 0000000..6e0aca1 --- /dev/null +++ b/mobai-event-common/src/main/java/org/springframework/boot/autoconfigure/AutoConfiguration.java @@ -0,0 +1,105 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.annotation.ImportCandidates; +import org.springframework.context.annotation.AnnotationBeanNameGenerator; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.AliasFor; +import org.springframework.core.io.support.SpringFactoriesLoader; + +import java.lang.annotation.*; + +/** + * Indicates that a class provides configuration that can be automatically applied by + * Spring Boot. Auto-configuration classes are regular + * {@link Configuration @Configuration} with the exception that + * {@literal Configuration#proxyBeanMethods() proxyBeanMethods} is always {@code false}. + *

+ * They are located using {@link ImportCandidates} and the {@link SpringFactoriesLoader} + * mechanism (keyed against {@link EnableAutoConfiguration}). + *

+ * Generally auto-configuration classes are marked as {@link Conditional @Conditional} + * (most often using {@link ConditionalOnClass @ConditionalOnClass} and + * {@link ConditionalOnMissingBean @ConditionalOnMissingBean} annotations). + * + * @author Moritz Halbritter + * @see EnableAutoConfiguration + * @see AutoConfigureBefore + * @see AutoConfigureAfter + * @see Conditional + * @see ConditionalOnClass + * @see ConditionalOnMissingBean + * @since 2.7.0 + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Configuration(proxyBeanMethods = false) +@AutoConfigureBefore +@AutoConfigureAfter +public @interface AutoConfiguration { + + /** + * Explicitly specify the name of the Spring bean definition associated with the + * {@code @AutoConfiguration} class. If left unspecified (the common case), a bean + * name will be automatically generated. + *

+ * The custom name applies only if the {@code @AutoConfiguration} class is picked up + * through component scanning or supplied directly to an + * {@link AnnotationConfigApplicationContext}. If the {@code @AutoConfiguration} class + * is registered as a traditional XML bean definition, the name/id of the bean element + * will take precedence. + * @return the explicit component name, if any (or empty String otherwise) + * @see AnnotationBeanNameGenerator + */ + @AliasFor(annotation = Configuration.class) + String value() default ""; + + /** + * The auto-configure classes that should have not yet been applied. + * @return the classes + */ + @AliasFor(annotation = AutoConfigureBefore.class, attribute = "value") + Class[] before() default {}; + + /** + * The names of the auto-configure classes that should have not yet been applied. + * @return the class names + */ + @AliasFor(annotation = AutoConfigureBefore.class, attribute = "name") + String[] beforeName() default {}; + + /** + * The auto-configure classes that should have already been applied. + * @return the classes + */ + @AliasFor(annotation = AutoConfigureAfter.class, attribute = "value") + Class[] after() default {}; + + /** + * The names of the auto-configure classes that should have already been applied. + * @return the class names + */ + @AliasFor(annotation = AutoConfigureAfter.class, attribute = "name") + String[] afterName() default {}; + +} diff --git a/mobai-event-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/mobai-event-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..e943b13 --- /dev/null +++ b/mobai-event-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.mobai.vehcile.config.MsgConfig diff --git a/mobai-event-iotDBDemo/pom.xml b/mobai-event-iotDBDemo/pom.xml new file mode 100644 index 0000000..93c6037 --- /dev/null +++ b/mobai-event-iotDBDemo/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + + com.mobai + event-analysis + 1.0.0 + + + mobai-event-iotDBDemo + + + 17 + 17 + UTF-8 + + + + + org.apache.iotdb + iotdb-session + 0.14.0-preview1 + + + + cn.hutool + hutool-all + 5.6.3 + + + + com.alibaba + fastjson + 1.2.83 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + com.mobai + mobai-event-common + 1.0.0 + compile + + + + diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java new file mode 100644 index 0000000..d37022d --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java @@ -0,0 +1,13 @@ +package com.mobai; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class IotDBApplication { + public static void main(String[] args) { + + SpringApplication.run(IotDBApplication.class); + } +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java new file mode 100644 index 0000000..d0d6bf9 --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java @@ -0,0 +1,186 @@ +package com.mobai.config; + +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.session.util.Version; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.List; + +/** + * description: iotdb 配置工具类(常用部分,如需要可以自行扩展) + * 注意:可以不需要创建分组,插入时默认前两个节点名称为分组名称 比如: root.a1eaKSRpRty.CA3013A303A25467 或者 + * root.a1eaKSRpRty.CA3013A303A25467.heart 他们的分组都为 root.a1eaKSRpRty + * author: zhouhong + */ +@Log4j2 +@Component +@Configuration +public class IotDBSessionConfig { + + private static Session session; + private static final String LOCAL_HOST = "175.24.138.82"; + @Bean + public Session getSession() throws IoTDBConnectionException, StatementExecutionException { + if (session == null) { + log.info("正在连接iotdb......."); + session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build(); + session.open(false); + session.setFetchSize(100); + log.info("iotdb连接成功~"); + // 设置时区 + session.setTimeZone("+08:00"); + } + return session; + } + + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * time:时间戳 + * measurementsList:物理量 即:属性 + * type:数据类型: BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6); + * valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecordType(String deviceId, Long time,List measurementsList, TSDataType type,List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() != valuesList.size()) { + throw new ServerException("measurementsList 与 valuesList 值不对应"); + } + List types = new ArrayList<>(); + measurementsList.forEach(item -> { + types.add(type); + }); + session.insertRecord(deviceId, time, measurementsList, types, valuesList); + } + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param measurementsList:物理量 即:属性 + * @param valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecord(deviceId, time, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + /** + * description: 批量插入 + * author: zhouhong + */ + public void insertRecords(List deviceIdList, List timeList, List> measurementsList, List> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecords(deviceIdList, timeList, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + + /** + * description: 插入操作 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param schemaList: 属性值 + 数据类型 例子: List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64)); + * @param maxRowNumber: + * @return + */ + public void insertTablet(String deviceId, Long time,List schemaList, List valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException { + + Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber); + // 向iotdb里面添加数据 + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, time); + for (int i = 0; i < valueList.size(); i++) { + tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i)); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet, true); + tablet.reset(); + } + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + /** + * description: 根据SQL查询 + * author: zhouhong + */ + public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException { + return session.executeQueryStatement(sql); + } + + /** + * description: 删除分组 如 root.a1eaKSRpRty + * author: zhouhong + * @param groupName:分组名称 + * @return + */ + public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroup(groupName); + } + + /** + * description: 根据Timeseries删除 如:root.a1eaKSRpRty.CA3013A303A25467.breath (个人理解:为具体的物理量) + * author: zhouhong + */ + public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseries); + } + /** + * description: 根据Timeseries批量删除 + * author: zhouhong + */ + public void deleteTimeserieList(List timeseriesList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseriesList); + } + + /** + * description: 根据分组批量删除 + * author: zhouhong + */ + public void deleteStorageGroupList(List storageGroupList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroups(storageGroupList); + } + + /** + * description: 根据路径和结束时间删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(path, endTime); + } + /** + * description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathListAndEndTime(List pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, endTime); + } + /** + * description: 根据路径集合和时间段批量删除 + * author: zhouhong + */ + public void deleteDataByPathListAndTime(List pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, startTime, endTime); + } + +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java new file mode 100644 index 0000000..2fb3841 --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java @@ -0,0 +1,60 @@ +package com.mobai.controller; + + +import com.mobai.config.IotDBSessionConfig; +import com.mobai.domain.IotDbParam; +import com.mobai.domain.resp.ResponseData; +import com.mobai.service.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; +import java.rmi.ServerException; + +/** + * description: iotdb 控制层 + * date: 2022/8/15 21:50 + * author: zhouhong + */ +@Log4j2 +@RestController +public class IotDbController { + + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + /** + * 插入数据 + * @param iotDbParam + */ + @PostMapping("/api/device/insert") + public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException { + iotDbServer.insertData(iotDbParam); + return ResponseData.success(); + } + + /** + * 插入数据 + * @param iotDbParam + */ + @PostMapping("/api/device/queryData") + public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception { + return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam)); + } + + /** + * 删除分组 + * @return + */ + @PostMapping("/api/device/deleteGroup") + public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException { + iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty"); + iotDBSessionConfig.deleteStorageGroup("root.smartretirement"); + return ResponseData.success(); + } + +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java new file mode 100644 index 0000000..9a6b4db --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java @@ -0,0 +1,40 @@ +package com.mobai.domain; + +import lombok.Data; +/** + * description: 入参 + * date: 2022/8/15 21:53 + * author: zhouhong + */ +@Data +public class IotDbParam { + /*** + * 产品PK + */ + private String pk; + /*** + * 设备号 + */ + private String sn; + /*** + * 时间 + */ + private Long time; + /*** + * 实时呼吸 + */ + private String breath; + /*** + * 实时心率 + */ + private String heart; + /*** + * 查询开始时间 + */ + private String startTime; + /*** + * 查询结束时间 + */ + private String endTime; + +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java new file mode 100644 index 0000000..b46ce3b --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java @@ -0,0 +1,33 @@ +package com.mobai.domain; + +import lombok.Data; + +/** + * description: 返回结果 + * date: 2022/8/15 21:56 + * author: zhouhong + */ +@Data +public class IotDbResult { + /*** + * 时间 + */ + private String time; + /*** + * 产品PK + */ + private String pk; + /*** + * 设备号 + */ + private String sn; + /*** + * 实时呼吸 + */ + private String breath; + /*** + * 实时心率 + */ + private String heart; + +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java new file mode 100644 index 0000000..e0d474f --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java @@ -0,0 +1,50 @@ +package com.mobai.domain.resp; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassName ResponseData + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/21 16:29 + */ + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ResponseData { + private Integer code; + private String msg; + private Object data; + + public ResponseData(int code, String msg, Object data) { + this.code = code; + this.msg = msg; + this.data = data; + } + + public ResponseData(int code, String msg) { + this.code = code; + this.msg = msg; + this.data = null; + } + + + public static ResponseData success(String msg, Object data) { + return new ResponseData(200, msg, data); + } + + public static ResponseData success(String msg) { + return new ResponseData(200, msg, null); + } + + public static ResponseData success() { + return new ResponseData(200, "请求成功", null); + } + + public static ResponseData success(Object data) { + return new ResponseData(200, "请求成功", data); + } +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java new file mode 100644 index 0000000..9bae9bf --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java @@ -0,0 +1,21 @@ +package com.mobai.service; + +import com.mobai.domain.IotDbParam; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import java.rmi.ServerException; + +/** + * @ClassName IotDbServer + * @Description 描述 + * @Author Mobai + * @Date 2024/6/17 17:20 + */ +public interface IotDbServer { + + void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException; + + Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception; + +} diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java new file mode 100644 index 0000000..a86659b --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java @@ -0,0 +1,107 @@ +package com.mobai.service.impl; + +import com.mobai.config.IotDBSessionConfig; +import com.mobai.domain.IotDbParam; +import com.mobai.domain.IotDbResult; +import com.mobai.service.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * description: iot服务实现类 + * date: 2022/8/15 9:43 + * author: zhouhong + */ + +@Log4j2 +@Service +public class IotDbServerImpl implements IotDbServer { + + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + @Override + public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException { + // iotDbParam: 模拟设备上报消息 + // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码 + String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn(); + // 将设备上报的数据存入数据库(时序数据库) + List measurementsList = new ArrayList<>(); + measurementsList.add("heart"); + measurementsList.add("breath"); + List valuesList = new ArrayList<>(); + valuesList.add(String.valueOf(iotDbParam.getHeart())); + valuesList.add(String.valueOf(iotDbParam.getBreath())); + iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList); + } + + @Override + public List queryDataFromIotDb(IotDbParam iotDbParam) throws Exception { + List iotDbResultList = new ArrayList<>(); + + if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) { + String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= " + + iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime(); + SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql); + List columnNames = sessionDataSet.getColumnNames(); + List titleList = new ArrayList<>(); + // 排除Time字段 -- 方便后面后面拼装数据 + for (int i = 1; i < columnNames.size(); i++) { + String[] temp = columnNames.get(i).split("\\."); + titleList.add(temp[temp.length - 1]); + } + // 封装处理数据 + packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList); + } else { + log.info("PK或者SN不能为空!!"); + } + return iotDbResultList; + } + /** + * 封装处理数据 + * @param iotDbParam + * @param iotDbResultList + * @param sessionDataSet + * @param titleList + * @throws StatementExecutionException + * @throws IoTDBConnectionException + */ + private void packagingData(IotDbParam iotDbParam, List iotDbResultList, SessionDataSet sessionDataSet, List titleList) + throws StatementExecutionException, IoTDBConnectionException { + int fetchSize = sessionDataSet.getFetchSize(); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + IotDbResult iotDbResult = new IotDbResult(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + iotDbResult.setTime(timeString); + Map map = new HashMap<>(); + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + // 这里的需要按照类型获取 + map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString()); + } + iotDbResult.setTime(timeString); + iotDbResult.setPk(iotDbParam.getPk()); + iotDbResult.setSn(iotDbParam.getSn()); + iotDbResult.setHeart(map.get("heart")); + iotDbResult.setBreath(map.get("breath")); + iotDbResultList.add(iotDbResult); + } + } + } +} diff --git a/mobai-event-iotDBDemo/src/main/resources/application.yml b/mobai-event-iotDBDemo/src/main/resources/application.yml new file mode 100644 index 0000000..e5e12e4 --- /dev/null +++ b/mobai-event-iotDBDemo/src/main/resources/application.yml @@ -0,0 +1,11 @@ +server: + port: 8085 +spring: + redis: + host: 127.0.0.1 + rabbitmq: + host: 175.24.138.82 + stream: + username: guest + password: guest + diff --git a/mobai-event-modules/src/main/java/com/mobai/Main.java b/mobai-event-modules/src/main/java/com/mobai/Main.java deleted file mode 100644 index a56227b..0000000 --- a/mobai-event-modules/src/main/java/com/mobai/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.mobai; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/mobai-event-remote/pom.xml b/mobai-event-remote/pom.xml index 8929e8b..f606fb4 100644 --- a/mobai-event-remote/pom.xml +++ b/mobai-event-remote/pom.xml @@ -6,7 +6,7 @@ com.mobai event-analysis - 1.0-SNAPSHOT + 1.0.0 mobai-event-remote diff --git a/mobai-event-remote/src/main/java/com/mobai/Main.java b/mobai-event-remote/src/main/java/com/mobai/Main.java deleted file mode 100644 index a56227b..0000000 --- a/mobai-event-remote/src/main/java/com/mobai/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.mobai; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/mobai-event-service/pom.xml b/mobai-event-service/pom.xml new file mode 100644 index 0000000..a3e23bb --- /dev/null +++ b/mobai-event-service/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + com.mobai + event-analysis + 1.0.0 + + + mobai-event-service + + + 17 + 17 + UTF-8 + + + + + org.springframework.boot + spring-boot-starter + + + + com.mobai + mobai-event-common + 1.0.0 + + + diff --git a/mobai-event-service/src/main/java/com/mobai/EventAnalysisProducerApplication.java b/mobai-event-service/src/main/java/com/mobai/EventAnalysisProducerApplication.java new file mode 100644 index 0000000..51693ea --- /dev/null +++ b/mobai-event-service/src/main/java/com/mobai/EventAnalysisProducerApplication.java @@ -0,0 +1,14 @@ +package com.mobai; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class EventAnalysisProducerApplication { + public static void main(String[] args) { + + SpringApplication.run(EventAnalysisProducerApplication.class,args); + } + + +} diff --git a/mobai-event-service/src/main/java/com/mobai/controller/Producer.java b/mobai-event-service/src/main/java/com/mobai/controller/Producer.java new file mode 100644 index 0000000..9525c14 --- /dev/null +++ b/mobai-event-service/src/main/java/com/mobai/controller/Producer.java @@ -0,0 +1,25 @@ +package com.mobai.controller; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author Mobai + * @className Producer + * @description 描述 + * @date 2024/6/15 18:36 + */ +@RestController +@RequestMapping("/mq-service") +public class Producer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @RequestMapping("/send") + public void send(String message) { + rabbitTemplate.convertAndSend("vehicle.event", "", message); + } +} diff --git a/mobai-event-service/src/main/resources/application.yml b/mobai-event-service/src/main/resources/application.yml new file mode 100644 index 0000000..c28bbd0 --- /dev/null +++ b/mobai-event-service/src/main/resources/application.yml @@ -0,0 +1,11 @@ +server: + port: 8084 +spring: + redis: + host: 127.0.0.1 + rabbitmq: + host: 175.24.138.82 + stream: + username: guest + password: guest + diff --git a/pom.xml b/pom.xml index c595070..3e26f4d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,20 +6,20 @@ com.mobai event-analysis - 1.0-SNAPSHOT + 1.0.0 pom mobai-event-common - mobai-event-modules + mobai-event-service mobai-event-remote + mobai-event-client + mobai-event-iotDBDemo 17 17 UTF-8 - 1.2.83 - 2.0.46 @@ -35,37 +35,4 @@ - - - org.springframework.boot - spring-boot-starter - - - - org.springframework.boot - spring-boot-starter-web - - - - org.springframework.boot - spring-boot-starter-amqp - - - - org.springframework.boot - spring-boot-starter-test - - - - com.alibaba - fastjson - ${fasejson.version} - - - - com.alibaba.fastjson2 - fastjson2 - ${fasejson2.version} - -