From d6167ebfedd030add46a15c048aa03afef033c47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=91=AB?= <1173628408@qq.com> Date: Mon, 30 Sep 2024 16:28:08 +0800 Subject: [PATCH] =?UTF-8?q?fix():=E6=B7=BB=E5=8A=A0rabbitmq=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E7=B1=BB,=E6=B7=BB=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=9F=BA=E7=A1=80=E7=B1=BB=EF=BC=8C=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 4 +- cloud-common/cloud-common-cache/pom.xml | 4 + .../muyu/common/domain/InformationData.java | 21 +++++ .../iotdb/config/IotDBSessionConfig.java | 50 ++++++++--- .../common/iotdb/constant/IotdbConstant.java | 13 +++ .../common/rabbit/config/TopicConfig.java | 75 ++++++++++++++++ .../rabbit/constants/RabbitmqConstants.java | 24 +++-- .../rabbit/consumer/RabbitMQConsumerUtil.java | 59 ++++++++++++ .../rabbit/producer/RabbitMQProducerUtil.java | 4 +- .../src/main/resources/bootstrap.yml | 4 +- .../com/muyu/data/basics/EventHandler.java | 25 ------ .../muyu/data/basics/EventQueueConfig.java | 35 -------- .../java/com/muyu/data/domain/DataJSON.java | 3 +- .../com/muyu/data/domain/EventActuate.java | 3 +- .../com/muyu/data/domain/Information.java | 90 +++++++++++++++++++ .../data/event/AutoStartupEventListener.java | 7 +- .../com/muyu/data/event/EventStrategy.java | 21 +++++ .../muyu/data/event/EventStrategyContext.java | 30 +++++++ .../event/tactics/FenceStrategyEvent.java | 18 ++++ .../data/event/tactics/IotdbStoreEvent.java | 28 ++++++ .../cloud-modules-data-process-server/pom.xml | 27 ++++++ .../java/com/muyu/data/basic/EventCustom.java | 24 +++++ .../com/muyu/data/basic/EventListener.java | 16 ++++ .../com/muyu/data/basic/EventPublisher.java | 32 +++++++ .../muyu/data/consumer/GoOfflineConsumer.java | 38 ++++++++ .../muyu/data/consumer/GoOnlineConsumer.java | 45 ++++++++++ .../muyu/data/consumer/VehicleConsumer.java | 72 +++++++++++++++ .../muyu/data/controller/IotdbController.java | 40 +++++++++ .../data/controller/RabbitController.java | 32 +++++++ .../muyu/data/controller/TestController.java | 40 +++++++++ .../data/listener/AddDatabaseListener.java | 42 +++++++++ .../muyu/data/monitor/CatRabbitMonitor.java | 45 ++++++++++ .../java/com/muyu/data/util/CacheUtil.java | 38 ++++++++ .../src/main/resources/bootstrap.yml | 6 ++ .../enterpise-common/pom.xml | 5 ++ .../java/com/muyu/domain/CarFirmMiddle.java | 27 ++++++ .../src/main/resources/bootstrap.yml | 6 +- .../enterprise-cache/pom.xml | 14 +-- .../enterprise/cache/VehicleCacheService.java | 28 ++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../src/main/resources/bootstrap.yml | 4 +- .../src/main/resources/bootstrap.yml | 4 +- .../src/main/resources/bootstrap.yml | 4 +- .../src/main/resources/bootstrap.yml | 4 +- pom.xml | 30 +++++++ 45 files changed, 1033 insertions(+), 109 deletions(-) create mode 100644 cloud-common/cloud-common-cache/src/main/java/com/muyu/common/domain/InformationData.java create mode 100644 cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/constant/IotdbConstant.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java delete mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventHandler.java delete mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventQueueConfig.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/Information.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategy.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategyContext.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/FenceStrategyEvent.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/IotdbStoreEvent.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventCustom.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventListener.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventPublisher.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOfflineConsumer.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOnlineConsumer.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/VehicleConsumer.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/IotdbController.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/RabbitController.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/TestController.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/listener/AddDatabaseListener.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/monitor/CatRabbitMonitor.java create mode 100644 cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/util/CacheUtil.java create mode 100644 cloud-modules/cloud-modules-enterprise/enterpise-common/src/main/java/com/muyu/domain/CarFirmMiddle.java create mode 100644 cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/java/com/muyu/enterprise/cache/VehicleCacheService.java diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index 1c0718e..83a37f2 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # Spring spring: application: diff --git a/cloud-common/cloud-common-cache/pom.xml b/cloud-common/cloud-common-cache/pom.xml index 488b785..39ddb01 100644 --- a/cloud-common/cloud-common-cache/pom.xml +++ b/cloud-common/cloud-common-cache/pom.xml @@ -27,6 +27,10 @@ com.muyu cloud-common-redis + + com.muyu + enterpise-common + diff --git a/cloud-common/cloud-common-cache/src/main/java/com/muyu/common/domain/InformationData.java b/cloud-common/cloud-common-cache/src/main/java/com/muyu/common/domain/InformationData.java new file mode 100644 index 0000000..69d9b96 --- /dev/null +++ b/cloud-common/cloud-common-cache/src/main/java/com/muyu/common/domain/InformationData.java @@ -0,0 +1,21 @@ +package com.muyu.common.domain; + +import com.muyu.domain.SysCar; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @Author WangXin + * @Data 2024/9/30 + * @Description 消息数据对象 + * @Version 1.0.0 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class InformationData { + private SysCar sysCar; +} diff --git a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java index 042acf0..be4af58 100644 --- a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java +++ b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java @@ -20,6 +20,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.muyu.common.iotdb.constant.IotdbConstant.ROOT_DATA_DATAJSON; +import static com.muyu.common.iotdb.constant.IotdbConstant.SELECT_ROOT_DATA_DATAJSON_DATASOURCE; + /** * @Author WangXin * @Data 2024/9/30 @@ -64,18 +67,23 @@ public class IotDBSessionConfig { * @param deviceId * @param time * @param measurements - * @param values + * @param value */ - public void insertRecord(SessionPool sessionPool,String deviceId, long time, List measurements, List values) { + public void insertRecord(SessionPool sessionPool,String deviceId, + long time, List measurements,List dataTypeList, JSONObject value) { try { - log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); - sessionPool.insertRecord(deviceId, time, measurements, values); + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, value); + sessionPool.insertRecord(deviceId, time, measurements,dataTypeList,new Object[]{value.toJSONString()}); } catch (Exception e) { log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", - deviceId, time, measurements, values, e.getMessage()); + deviceId, time, measurements, value, e.getMessage()); } } + public void JSONObject(JSONObject value){ + + } + public SessionDataSet selectRecord(SessionPool sessionPool,String sql) { log.info("iotdb数据查询:sql:[{}]",sql); SessionDataSetWrapper sessionDataSetWrapper = null; @@ -90,11 +98,29 @@ public class IotDBSessionConfig { } + public JSONObject getJsonFindByTime(SessionPool sessionPool,String fieldName,String deviceId,Long timestamp) { + String sql = String.format("SELECT %s FROM %s WHERE time = %d", fieldName, deviceId, timestamp); + SessionDataSet sessionDataSet = selectRecord(sessionPool, sql); + try { + while (sessionDataSet.hasNext()){ + RowRecord next = sessionDataSet.next(); + for (Field field : next.getFields()) { + String stringValue = field.getStringValue(); + } + } + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + return null; + } + + public static void main(String[] args) { SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 10); - String ROOT_DATA_DATAJSON = "root.car.data.datajson"; - String SELECT_ROOT_DATA_DATAJSON_DATASOURCE = "select * from root.car.data.datajson"; + String jsonValue = """ { @@ -110,14 +136,13 @@ public class IotDBSessionConfig { } } """; + JSONObject value = JSONObject.parseObject(jsonValue); IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig(); - List values = new ArrayList<>(); - values.add(jsonValue); - ArrayList objects = new ArrayList<>(); - objects.add("datasource"); - iotDBSessionConfig.insertRecord(sessionPool,ROOT_DATA_DATAJSON,System.currentTimeMillis(),objects,values); + List measurements = List.of("datasource"); + List datatypeList = List.of(TSDataType.TEXT); + iotDBSessionConfig.insertRecord(sessionPool,ROOT_DATA_DATAJSON,System.currentTimeMillis(),measurements,datatypeList,value); SessionDataSet sessionDataSet = iotDBSessionConfig.selectRecord(sessionPool,SELECT_ROOT_DATA_DATAJSON_DATASOURCE); @@ -140,7 +165,6 @@ public class IotDBSessionConfig { } catch (IoTDBConnectionException e) { throw new RuntimeException(e); } - log.info("数据为:{}", JSONObject.toJSONString(longMapHashMap)); } diff --git a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/constant/IotdbConstant.java b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/constant/IotdbConstant.java new file mode 100644 index 0000000..7cc1e24 --- /dev/null +++ b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/constant/IotdbConstant.java @@ -0,0 +1,13 @@ +package com.muyu.common.iotdb.constant; + +/** + * @Author WangXin + * @Data 2024/9/30 + * @Description iotdb数据库常量 + * @Version 1.0.0 + */ +public interface IotdbConstant { + + String ROOT_DATA_DATAJSON = "root.car.data.datajson"; + String SELECT_ROOT_DATA_DATAJSON_DATASOURCE = "select * from root.car.data.datajson"; +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java new file mode 100644 index 0000000..58d717b --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java @@ -0,0 +1,75 @@ +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author: WangXin + * @Time: 2024/4/22 11:55 + * @Description: 主题模式配置 + */ +@Configuration +public class TopicConfig { + + /** + * 主题模式交换机 + * @return exchange + */ + @Bean(name = "topicExchange") + public Exchange getTopicExchange(){ + return ExchangeBuilder + .topicExchange("exchange_topic") + .build(); + } + + /** + * 主题队列 01 + * @return queue + */ + @Bean(name = "topicQueue01") + public Queue getTopicQueue01(){ + return QueueBuilder + .durable("queue_topic_01") + .build(); + } + + /** + * 主题队列 02 + * @return queue + */ + @Bean(name = "topicQueue02") + public Queue getTopicQueue02(){ + return QueueBuilder + .durable("queue_topic_02") + .build(); + } + + /** + * 绑定队列 01 + * @return binding + */ + @Bean + public Binding getTopicBinding01(){ + return BindingBuilder + .bind(getTopicQueue01()) + .to(getTopicExchange()) + //路由键 队列1接收debug级别的消息 + .with("front.#") + .noargs(); + } + + /** + * 绑定队列 02 + * @return binding + */ + @Bean + public Binding getTopicBinding02(){ + return BindingBuilder + .bind(getTopicQueue02()) + .to(getTopicExchange()) + // 路由键 队列2接收info级别的消息 + .with("back.order.*") + .noargs(); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java index dbd4c9d..8feea85 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java @@ -6,17 +6,29 @@ package com.muyu.common.rabbit.constants; * @Description: rabbitmq常量 * @Version 1.0.0 */ -public class RabbitmqConstants { +public interface RabbitmqConstants { //普通队列 - public static final String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME"; + String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME"; - public static final String lOG_QUEUE_NAME = "LOG_QUEUE_NAME"; + String lOG_QUEUE_NAME = "LOG_QUEUE_NAME"; //延迟队列 //队列名称 - public static final String DELAYED_QUEUE_NAME = "delayed_queue"; + String DELAYED_QUEUE_NAME = "delayed_queue"; //交换机名称 - public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE"; + String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE"; //交换机 - public static final String DELAYED_ROUTING_KEY = "delayed"; + String DELAYED_ROUTING_KEY = "delayed"; + /** + * 上下线监听交换机 + */ + String TOP_BOTTOM_STITCHING = "top_bottom_stitching"; + /** + * 上线规则 + */ + String TOP_RULE = "car.top.data"; + /** + * 车辆下线规则 + */ + String BOTTOM_RULE = "car.bottom.data"; } diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java index 58d0663..2181562 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java @@ -77,5 +77,64 @@ public class RabbitMQConsumerUtil { } + /** + * 普通消费者 + * @param data 数据类型 + * @param message + * @param channel + */ + public void carUpConsumer(String data,Message message , Channel channel) { + log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message); + try { + // 获取到消息 开始消费 + log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data)); + + + Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId()); + + if (add != 1) { + return; + } + + + /** + * -----------------------------------以下为异步业务操作---------------------------- + */ + log.info("[ 根据vin拿到缓存 ] vin为 --》 {}",data); + log.info("[ 存入本地缓存 ] 数据为 --》 {}",data); + log.info("[ 存入本地缓存 ] 数据为 --》 {}",data); + /** + * ------------------------------------------------------------------------------ + */ + // 消费消息成功之后需要确认 + // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 + // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message); + + } catch (Exception e) { + log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e); + // 消息回退 拒绝消费消息 + // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 + // boolean requeue 是否回到原来的队列 + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); +// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); + } catch (IOException ex) { + log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex); + } + }finally { + try { + channel.close(); + } catch (Exception e) { + log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e); + } + } + } + + + + + } diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java index 28d9bdc..fc7c3b8 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java @@ -133,9 +133,9 @@ public class RabbitMQProducerUtil { * @param msg 响应的内容 * @return 结果集 */ - public Result topicSendMessage(String exchange, String rule, Object obj, String msg) { + public Result topicSendMessage(String exchange, String rule, Object obj) { - log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg); + log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj); // 发送简单模型消息 // 第一个参数: 绑定规则 相当于 队列名称 // 第二个参数:消息内容 diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index dfefd86..25f0a82 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # Spring spring: diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventHandler.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventHandler.java deleted file mode 100644 index a4ead9b..0000000 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventHandler.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.muyu.data.basics; - -/** - * @Author WangXin - * @Data 2024/9/29 - * @Description 事件队列 - * @Version 1.0.0 - */ -public class EventHandler { - - private static final ThreadLocal EVENT_THREAD = new ThreadLocal<>(); - - public static void set(final EventQueueConfig handler) { - EVENT_THREAD.set(handler); - } - - public static EventQueueConfig get() { - return EVENT_THREAD.get(); - } - - public static void remove(){ - EVENT_THREAD.remove(); - } - -} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventQueueConfig.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventQueueConfig.java deleted file mode 100644 index 9328177..0000000 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/basics/EventQueueConfig.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.muyu.data.basics; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.concurrent.LinkedBlockingDeque; - -/** - * @Author WangXin - * @Data 2024/9/29 - * @Description 事件队列配置 - * @Version 1.0.0 - */ -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class EventQueueConfig { - - private LinkedBlockingDeque taskNodeQueue = new LinkedBlockingDeque<>(); - - public void addEvent(EventProcessBasics obj){ - this.taskNodeQueue.add(obj); - } - - public boolean hashEventNext(){ - return !taskNodeQueue.isEmpty(); - } - - private EventProcessBasics nextTaskNode(){ - return taskNodeQueue.poll(); - } -} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/DataJSON.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/DataJSON.java index d989468..7b77798 100644 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/DataJSON.java +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/DataJSON.java @@ -1,5 +1,6 @@ package com.muyu.data.domain; +import com.alibaba.fastjson2.JSONObject; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.AllArgsConstructor; @@ -29,5 +30,5 @@ public class DataJSON { * 车辆JSON数据 */ @Schema(name = "车辆JSON数据") - private String datasource; + private JSONObject datasource; } diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/EventActuate.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/EventActuate.java index dad7197..d894a4f 100644 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/EventActuate.java +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/EventActuate.java @@ -1,5 +1,6 @@ package com.muyu.data.domain; +import com.alibaba.fastjson2.JSONObject; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -21,7 +22,7 @@ public class EventActuate { /** * json数据 */ - private String jsonData; + private JSONObject jsonData; /** * 事件驱动key集合 */ diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/Information.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/Information.java new file mode 100644 index 0000000..cdf3a09 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/domain/Information.java @@ -0,0 +1,90 @@ +package com.muyu.data.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.Date; +/** + * @author Bai + * @date 2024/9/20 10:29 + * @description Information: 车辆信息表实体类 + * @version: 1.0 + */ + +/** + * 车辆信息表实体类 + */ + +@Data +@SuperBuilder +@Builder +@AllArgsConstructor +@NoArgsConstructor +@TableName(value = "car_information",autoResultMap = true) +public class Information { + //自增主键 + @TableId(value = "id",type = IdType.AUTO) + private Long id; + //车辆VIn马 + @Schema(name = "车辆VIn马",type = "String") + private String carVin; + //车辆品牌 + @Schema(name = "车辆品牌",type = "String") + private String carBrand; + //车辆类型外键 + @Schema(name = "车辆类型外键",type = "Integer") + private Integer typeId; + //车辆类型名称 + @Schema(type = "String",description = "车辆类型名称") + private String typeName; + //电子围栏外键 + @Schema(name = "电子围栏外键",type = "String") + private String groupId; + //车辆电机厂商 + @Schema(name = "车辆电机厂商",type = "String") + private String carMotorManufacturer; + //电机型号 + @Schema(name = "电机型号",type = "String") + private String carMotorModel; + //车辆电池厂商 + @Schema(name = "车辆电池厂商",type = "String") + private String carBatteryManufacturer; + //电池型号 + @Schema(name = "电池型号",type = "String") + private String carBatteryModel; + //围栏组编码 + @Schema(name = "围栏组编码",type = "String") + private String groupCode; + //启用状态(1.在线 2.离线 3.已断开 4.待连接 5.维修中 + @Schema(name = "启用状态(1.在线 2.离线 3.已断开 4.待连接 5.维修中",type = "String") + private String state; + //创建人 + @Schema(name = "创建人",type = "Integer") + private String createBy; + //创建时间 + @Schema(name = "创建时间",type = "Date") + @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date createTime; + //更新人 + @Schema(name = "更新人",type = "Integer") + private String updateBy; + //更新时间 + @Schema(name = "更新时间",type = "Date") + @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date updateTime; + //备注 + @Schema(name = "备注",type = "String") + private String remark; + //策略id + @Schema(name = "策略id",type = "Integer") + private Integer strategyId; + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/AutoStartupEventListener.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/AutoStartupEventListener.java index 5be3090..be93603 100644 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/AutoStartupEventListener.java +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/AutoStartupEventListener.java @@ -1,6 +1,9 @@ package com.muyu.data.event; +import com.alibaba.fastjson2.JSONObject; import com.muyu.data.basics.StartEvent; +import com.muyu.data.domain.EventActuate; +import com.muyu.data.event.tactics.IotdbStoreEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -16,7 +19,9 @@ public class AutoStartupEventListener implements ApplicationListener @Override public void onApplicationEvent(StartEvent event) { - + EventActuate eventActuate = event.getEventActuate(); + JSONObject jsonData = eventActuate.getJsonData(); + new IotdbStoreEvent().execute(jsonData); } diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategy.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategy.java new file mode 100644 index 0000000..ed6e9a4 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategy.java @@ -0,0 +1,21 @@ +package com.muyu.data.event; + +import com.alibaba.fastjson2.JSONObject; + +import java.util.List; + +/** + * @Author WangXin + * @Data 2024/9/29 + * @Description 事件策略接口 + * @Version 1.0.0 + */ + +public interface EventStrategy { + + /** + * 方法执行 + * @param jsonObject + */ + void execute(JSONObject jsonObject); +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategyContext.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategyContext.java new file mode 100644 index 0000000..466ada8 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/EventStrategyContext.java @@ -0,0 +1,30 @@ +package com.muyu.data.event; + +import com.alibaba.fastjson2.JSONObject; +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * @Author WangXin + * @Data 2024/9/29 + * @Description 事件执行上下文 + * @Version 1.0.0 + */ +@Data +@AllArgsConstructor +public class EventStrategyContext { + /** + * 事件接口 + */ + private EventStrategy eventStrategy; + + /** + * 调用策略类中的方法 + * @param jsonObject json对象 + */ + public void handleEvent(JSONObject jsonObject) { + if (jsonObject != null) { + eventStrategy.execute(jsonObject); + } + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/FenceStrategyEvent.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/FenceStrategyEvent.java new file mode 100644 index 0000000..fcf99a5 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/FenceStrategyEvent.java @@ -0,0 +1,18 @@ +package com.muyu.data.event.tactics; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.data.event.EventStrategy; + +/** + * @Author WangXin + * @Data 2024/9/30 + * @Description 电子围栏事件 + * @Version 1.0.0 + */ +public class FenceStrategyEvent implements EventStrategy { + + @Override + public void execute(JSONObject jsonObject) { + + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/IotdbStoreEvent.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/IotdbStoreEvent.java new file mode 100644 index 0000000..b33d3ad --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-common/src/main/java/com/muyu/data/event/tactics/IotdbStoreEvent.java @@ -0,0 +1,28 @@ +package com.muyu.data.event.tactics; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.data.event.EventStrategy; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Author WangXin + * @Data 2024/9/29 + * @Description Iotdb存储事件 + * @Version 1.0.0 + */ +@Log4j2 +@Component +public class IotdbStoreEvent implements EventStrategy { + + /** + * 执行存储事件 + * @param jsonObject json对象 + */ + @Override + public void execute(JSONObject jsonObject) { + log.info("[存储事件] ---》 json对象:{}", jsonObject); + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/pom.xml b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/pom.xml index f9c8263..6c18295 100644 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/pom.xml +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/pom.xml @@ -22,6 +22,33 @@ + + + com.muyu + enterprise-cache + + + com.muyu + cloud-modules-data-process-common + + + com.muyu + cloud-common-iotdb + + + com.github.ben-manes.caffeine + caffeine + + + + com.muyu + cloud-common-kafka + + + + com.muyu + cloud-common-rabbit + com.alibaba.cloud diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventCustom.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventCustom.java new file mode 100644 index 0000000..cb009a2 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventCustom.java @@ -0,0 +1,24 @@ +package com.muyu.data.basic; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.context.ApplicationEvent; + +/** + * 事件类型 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-29 20:03 + **/ +public class EventCustom extends ApplicationEvent { + + private JSONObject data; + + public EventCustom(Object source, JSONObject data) { + super(source); + this.data = data; + } + public JSONObject getData() { + return data; + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventListener.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventListener.java new file mode 100644 index 0000000..b62ab73 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventListener.java @@ -0,0 +1,16 @@ +package com.muyu.data.basic; + +import org.springframework.context.ApplicationListener; + +/** + * 事件监听接口 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-29 20:05 + **/ +public interface EventListener extends ApplicationListener { + + void onEvent(EventCustom event); + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventPublisher.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventPublisher.java new file mode 100644 index 0000000..65e5c8d --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/basic/EventPublisher.java @@ -0,0 +1,32 @@ +package com.muyu.data.basic; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; + +/** + * 策略发送事件 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-29 17:43 + **/ +@Component +public class EventPublisher implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher publisher; + + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } + + public void publishEvent(JSONObject message) { + EventCustom event = new EventCustom(this, message); + publisher.publishEvent(event); + } + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOfflineConsumer.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOfflineConsumer.java new file mode 100644 index 0000000..8313dfe --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOfflineConsumer.java @@ -0,0 +1,38 @@ +package com.muyu.data.consumer; + + +import com.muyu.data.util.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 下线监听 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-30 10:39 + **/ +@Log4j2 +@Component +public class GoOfflineConsumer { + + + @Autowired + private CacheUtil cacheUtil; + + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "GO_OFFLINE", durable = "true"), + exchange = @Exchange(value = "OFFLINE_EXCHANGE", type = "fanout"))) + public void online(String vin) { + log.info("车辆vin: {},车辆开始消费", vin); + cacheUtil.remove(vin); + } + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOnlineConsumer.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOnlineConsumer.java new file mode 100644 index 0000000..7ea77c7 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/GoOnlineConsumer.java @@ -0,0 +1,45 @@ +package com.muyu.data.consumer; + +import com.muyu.data.domain.Information; +import com.muyu.data.util.CacheUtil; +import com.muyu.domain.CarInformation; +import com.muyu.enterprise.cache.VehicleCacheService; + + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 上线监听 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-29 16:37 + **/ +@Log4j2 +@Component +public class GoOnlineConsumer { + + + @Autowired + private CacheUtil cacheUtil; + + @Autowired + private VehicleCacheService vehicleCacheService; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "GO_ONLINE", durable = "true"), + exchange = @Exchange(value = "ONLINE_EXCHANGE", type = "fanout"))) + public void online(String vin) { + log.info("车辆vin: {},车辆开始消费", vin); + CarInformation carInformation = vehicleCacheService.get(vin); + cacheUtil.put(vin,carInformation); + } + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/VehicleConsumer.java new file mode 100644 index 0000000..6e79973 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/consumer/VehicleConsumer.java @@ -0,0 +1,72 @@ +package com.muyu.data.consumer; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.data.basic.EventPublisher; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** +* 车辆消费者 +* @program: cloud-server +* @author: WangXin +* @create: 2024-09-28 14:55 +**/ +@Log4j2 +@Component +public class VehicleConsumer implements ApplicationRunner, ApplicationListener { + + @Autowired + private KafkaConsumer consumer; + + @Autowired + private EventPublisher eventPublisher; + + private final String topic = "vehicle"; + + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + + + @Async + @Override + public void run(ApplicationArguments args) throws Exception { + log.info("开始监听kafka-topic:{}", topic); + List topics = Collections.singletonList(topic); + consumer.subscribe(topics); + + while (true) { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(record -> { + executorService.submit(() -> handleRecord(record)); + }); + } + } + + private void handleRecord(ConsumerRecord record) { + String value = record.value(); + JSONObject jsonObject = JSONObject.parseObject(value); + log.info("value: {}", value); + eventPublisher.publishEvent(jsonObject); + } + + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("关闭kafka和线程"); + consumer.close(); + executorService.shutdown(); + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/IotdbController.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/IotdbController.java new file mode 100644 index 0000000..ba43830 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/IotdbController.java @@ -0,0 +1,40 @@ +package com.muyu.data.controller; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.common.core.domain.Result; +import com.muyu.common.iotdb.config.IotDBSessionConfig; +import jakarta.annotation.Resource; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +import static com.muyu.common.iotdb.constant.IotdbConstant.ROOT_DATA_DATAJSON; + +/** + * iotdb测试 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-28 19:09 + **/ +@RestController +@RequestMapping("iotdb") +public class IotdbController { + + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + @PostMapping("/add") + public Result addJSON(@RequestBody JSONObject jsonObject) { + iotDBSessionConfig.insertRecord(iotDBSessionConfig.getSessionPool(),ROOT_DATA_DATAJSON,System.currentTimeMillis(), List.of("datasource"),List.of(TSDataType.TEXT),jsonObject); + return Result.success(); + } + + @GetMapping("/findByDataTime/{time}") + public Result findByDataTime(@PathVariable("time") Long time){ + return null; + } + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/RabbitController.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/RabbitController.java new file mode 100644 index 0000000..3e967e5 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/RabbitController.java @@ -0,0 +1,32 @@ +package com.muyu.data.controller; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * rabbit测试 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-30 00:04 + **/ +@RestController +@RequestMapping("rabbit") +public class RabbitController { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @GetMapping("/send") + public void send() { + rabbitTemplate.convertAndSend("ONLINE_EXCHANGE", "", "vin123456"); + } + + + + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/TestController.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/TestController.java new file mode 100644 index 0000000..64bd2f9 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/controller/TestController.java @@ -0,0 +1,40 @@ +package com.muyu.data.controller; + +import cn.hutool.json.JSONObject; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** +* 测试kafka +* @program: cloud-server +* @author: WangXin +* @create: 2024-09-28 14:55 +**/ +@RestController +@RequestMapping +public class TestController { + + @Autowired + private KafkaProducer kafkaProducer; + + + private static final String topic = "vehicle"; + @GetMapping("send") + public String sendKafka(){ + + JSONObject entries = new JSONObject(); + entries.set("car_vin","vin123123"); + entries.set("car_name","奥迪"); + String entriesString = entries.toString(); + ProducerRecord producerRecord = new ProducerRecord(topic,entriesString); + kafkaProducer.send(producerRecord); + + return "success"; + } + + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/listener/AddDatabaseListener.java new file mode 100644 index 0000000..3acac0d --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/listener/AddDatabaseListener.java @@ -0,0 +1,42 @@ +package com.muyu.data.listener; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.data.basic.EventCustom; +import com.muyu.data.basic.EventListener; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; + +/** + * 添加数据库事件 + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-29 17:34 + **/ +@Configuration +public class AddDatabaseListener implements EventListener { + + + @Override + public void onEvent(EventCustom event) { + + JSONObject jsonObject = event.getData(); + List keys = new ArrayList<>(); + List values = new ArrayList<>(); + + jsonObject.forEach((key, value) -> { + keys.add(key); + values.add((String) value); + }); + + long time = System.currentTimeMillis(); + + + } + + @Override + public void onApplicationEvent(EventCustom event) { + onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/monitor/CatRabbitMonitor.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/monitor/CatRabbitMonitor.java new file mode 100644 index 0000000..89f59b7 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/monitor/CatRabbitMonitor.java @@ -0,0 +1,45 @@ +package com.muyu.data.monitor; + +import com.muyu.common.core.domain.Result; +import com.muyu.common.rabbit.constants.RabbitmqConstants; +import com.muyu.common.rabbit.consumer.RabbitMQConsumerUtil; +import com.muyu.common.rabbit.producer.RabbitMQProducerUtil; +import jakarta.annotation.Resource; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @Author WangXin + * @Data 2024/9/30 + * @Description 车辆上下线监听器 + * @Version 1.0.0 + */ +@Component +public class CatRabbitMonitor { + + @Resource + private RabbitMQConsumerUtil rabbitMQConsumerUtil; + @Resource + private RabbitMQProducerUtil rabbitMQProducerUtil; + + @RabbitListener(queues = {"queue_topic_01"}) + public void topicConsumer01(String msg){ + System.out.println("消费者 -01- 接收消息:" + msg); + } + + + /** + * 测试上线方法 + * @param carVin + */ + public void topicConsumer02(String carVin){ + Result result = rabbitMQProducerUtil.topicSendMessage( + RabbitmqConstants.TOP_BOTTOM_STITCHING, + RabbitmqConstants.TOP_RULE, + carVin + ); + if (result.getCode() != Result.SUCCESS){ + throw new RuntimeException(result.getMsg()); + } + } +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/util/CacheUtil.java b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/util/CacheUtil.java new file mode 100644 index 0000000..17facd8 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/java/com/muyu/data/util/CacheUtil.java @@ -0,0 +1,38 @@ +package com.muyu.data.util; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.muyu.data.domain.Information; +import org.springframework.stereotype.Component; + +/** + * 缓存工具类 + * + * @program: cloud-server + * @author: WangXin + * @create: 2024-09-30 10:08 + **/ +@Component +public class CacheUtil { + + private final Cache cache; + + public CacheUtil() { + this.cache = Caffeine.newBuilder() + .maximumSize(500L) + .build(); + } + + public T get(String key) { + return cache.getIfPresent(key); + } + + public void put(String key, T value) { + cache.put(key, value); + } + + public void remove(String key) { + cache.invalidate(key); + } + +} diff --git a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/resources/bootstrap.yml index 50546a7..f65506a 100644 --- a/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-data-process/cloud-modules-data-process-server/src/main/resources/bootstrap.yml @@ -11,6 +11,12 @@ nacos: # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: + iotdb: + ip: 127.0.0.1 + username: root + port: 6667 + password: root + maxSize: 10 amqp: deserialization: trust: diff --git a/cloud-modules/cloud-modules-enterprise/enterpise-common/pom.xml b/cloud-modules/cloud-modules-enterprise/enterpise-common/pom.xml index eafa13e..8d98d66 100644 --- a/cloud-modules/cloud-modules-enterprise/enterpise-common/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/enterpise-common/pom.xml @@ -41,5 +41,10 @@ swagger-annotations-jakarta + + com.muyu + cloud-common-system + + diff --git a/cloud-modules/cloud-modules-enterprise/enterpise-common/src/main/java/com/muyu/domain/CarFirmMiddle.java b/cloud-modules/cloud-modules-enterprise/enterpise-common/src/main/java/com/muyu/domain/CarFirmMiddle.java new file mode 100644 index 0000000..db633c6 --- /dev/null +++ b/cloud-modules/cloud-modules-enterprise/enterpise-common/src/main/java/com/muyu/domain/CarFirmMiddle.java @@ -0,0 +1,27 @@ +package com.muyu.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @Author WangXin + * @Data 2024/9/30 + * @Description 车辆公司中间表 + * @Version 1.0.0 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class CarFirmMiddle { + /** + * 车辆ID + */ + private Long CarId; + /** + * 公司ID + */ + private Long firmId; +} diff --git a/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/resources/bootstrap.yml index 7f2f29b..6057fa1 100644 --- a/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-enterprise/enterpise-service/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: @@ -19,7 +19,7 @@ spring: allow-bean-definition-overriding: true application: # 应用名称 - name: cloud-saas + name: cloud-warn profiles: # 环境配置 active: dev diff --git a/cloud-modules/cloud-modules-enterprise/enterprise-cache/pom.xml b/cloud-modules/cloud-modules-enterprise/enterprise-cache/pom.xml index c3ff392..aa61d15 100644 --- a/cloud-modules/cloud-modules-enterprise/enterprise-cache/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/enterprise-cache/pom.xml @@ -18,19 +18,13 @@ - - - - - com.muyu - cloud-common-cache - ${muyu.version} - - com.muyu enterpise-common - ${muyu.version} + + + com.muyu + cloud-common-cache diff --git a/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/java/com/muyu/enterprise/cache/VehicleCacheService.java b/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/java/com/muyu/enterprise/cache/VehicleCacheService.java new file mode 100644 index 0000000..019fa3a --- /dev/null +++ b/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/java/com/muyu/enterprise/cache/VehicleCacheService.java @@ -0,0 +1,28 @@ +package com.muyu.enterprise.cache; + +import com.muyu.common.cache.CacheAbsBasic; +import com.muyu.domain.CarInformation; + +/** + * 车辆缓存服务 + * @className: VehicleCacheService ️✈️ + * @author: Yang 鹏 🦅 + * @date: 2024/9/30 00:36 ⏰ + * @Version: 1.0 + * @description: + */ +public class VehicleCacheService extends CacheAbsBasic { + + @Override + public String keyPre() { + return "vehicle:info:"; + } + + @Override + public String decode(String key){ + return key.replace("vehicle:info:",""); + } + + +} + diff --git a/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 3257b72..ca0bb51 100644 --- a/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-modules/cloud-modules-enterprise/enterprise-cache/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,3 @@ com.muyu.enterpise.cache.MessageValueCacheService com.muyu.enterpise.cache.SysCarCacheService +com.muyu.enterprise.cache.VehicleCacheService diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index 167c09f..da919cf 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index 6b135ee..ca7c3dd 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index ff1b73b..d749a32 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml index 93371a1..9d829bf 100644 --- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml +++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml @@ -4,10 +4,10 @@ server: # nacos线上地址 nacos: - addr: 123.57.152.124:8848 + addr: 127.0.0.1:8848 user-name: nacos password: nacos - namespace: xyr + namespace: wx # Spring spring: diff --git a/pom.xml b/pom.xml index 144c170..5d9803f 100644 --- a/pom.xml +++ b/pom.xml @@ -307,6 +307,36 @@ caffeine ${caffeine.version} + + + com.muyu + cloud-common-kafka + ${muyu.version} + + + + com.muyu + cloud-common-iotdb + ${muyu.version} + + + + com.muyu + cloud-modules-data-process-common + ${muyu.version} + + + + com.muyu + cloud-common-cache + ${muyu.version} + + + + com.muyu + enterprise-cache + ${muyu.version} +