From 84b54e9116f59546267f75d8591ab7a2166283d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 09:46:03 +0800 Subject: [PATCH 01/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9EMQ=E5=92=8CCa?= =?UTF-8?q?ffeine=E7=BC=93=E5=AD=98=E5=BA=93=E4=BE=9D=E8=B5=96=20=E5=92=8C?= =?UTF-8?q?=20=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-modules/cloud-modules-processing/pom.xml | 12 ++++++++++++ .../muyu/processing/abstraction/EventProcessor.java | 2 ++ .../src/main/resources/bootstrap.yml | 2 +- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/cloud-modules/cloud-modules-processing/pom.xml b/cloud-modules/cloud-modules-processing/pom.xml index 127f05c..a8cc1f1 100644 --- a/cloud-modules/cloud-modules-processing/pom.xml +++ b/cloud-modules/cloud-modules-processing/pom.xml @@ -107,6 +107,18 @@ 3.6.3 + + + org.springframework.boot + spring-boot-starter-amqp + + + + com.github.ben-manes.caffeine + caffeine + 3.0.5 + + diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/abstraction/EventProcessor.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/abstraction/EventProcessor.java index f404fbc..70fb1ec 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/abstraction/EventProcessor.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/abstraction/EventProcessor.java @@ -2,6 +2,7 @@ package com.muyu.processing.abstraction; import com.alibaba.fastjson.JSONObject; import com.muyu.processing.interfaces.EventInterface; +import lombok.extern.log4j.Log4j2; /** * 事件处理抽象类 @@ -11,6 +12,7 @@ import com.muyu.processing.interfaces.EventInterface; * @name:EventProcessor * @Date:2024/9/28 20:58 */ +@Log4j2 public abstract class EventProcessor implements EventInterface { private EventProcessor eventProcessor; diff --git a/cloud-modules/cloud-modules-processing/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-processing/src/main/resources/bootstrap.yml index 6937759..eda0de7 100644 --- a/cloud-modules/cloud-modules-processing/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-processing/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: dev # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: From 7033d0e9dd9b3e814091732c4ca40666e825a7d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:01:08 +0800 Subject: [PATCH 02/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9EMQ=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=20=E5=92=8C=20MQ=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/ConfirmCallbackConfig.java | 42 ++++++++++++++++ .../processing/config/RabbitAdminConfig.java | 50 +++++++++++++++++++ .../processing/config/RabbitmqConfig.java | 18 +++++++ .../config/ReturnsCallbackConfig.java | 39 +++++++++++++++ .../muyu/processing/consumer/MQConsumer.java | 41 +++++++++++++++ 5 files changed, 190 insertions(+) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java new file mode 100644 index 0000000..4b18ab2 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java @@ -0,0 +1,42 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 确认回调配置 + */ +@Component +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 当前bean初始化的时候执行 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setConfirmCallback(this); + } + + /** + * 确认方法 + * @param correlationData correlation data for the callback. + * @param ack true for ack, false for nack + * @param cause An optional cause, for nack, when available, otherwise null. + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + System.out.println("消息发送到 broker 成功"); + } else { + System.out.println("消息发送到 broker 失败,失败的原因:" + cause); + } + } + +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java new file mode 100644 index 0000000..646886b --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java @@ -0,0 +1,50 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 构建 RabbitAdmin + */ +@Configuration +public class RabbitAdminConfig { + + @Value("${spring.rabbitmq.host}") + private String host; + @Value("${spring.rabbitmq.username}") + private String username; + @Value("${spring.rabbitmq.password}") + private String password; + @Value("${spring.rabbitmq.virtual-host}") + private String virtualhost; + + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + + /** + * rabbitAdmin + * @param connectionFactory + * @return + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java new file mode 100644 index 0000000..4ae5a51 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java @@ -0,0 +1,18 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 消息转换配置 + */ +@Configuration +public class RabbitmqConfig { + // 消息转换配置 + @Bean + public MessageConverter jsonMessageConverter(){ + return new Jackson2JsonMessageConverter(); + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java new file mode 100644 index 0000000..32ddbdd --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java @@ -0,0 +1,39 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送失败返回配置 + */ +@Component +public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 当前bean初始化的时候执行 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送达到 queue 失败执行 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("消息" + returnedMessage.getMessage().toString() + + "被交换机" + returnedMessage.getExchange() + "回退!" + + "退回原因为:" + returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java new file mode 100644 index 0000000..4dbc156 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java @@ -0,0 +1,41 @@ +package com.muyu.processing.consumer; + +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * MQ消费者 + * @Author:杨鹏 + * @Package:com.muyu.processing.consumer + * @Project:car-cloud-server + * @name:MQconsumer + * @Date:2024/9/29 17:19 + */ +@Log4j2 +@Component +public class MQConsumer { + + + + @RabbitListener(queuesToDeclare = @Queue("long_time_no_see")) + public void receive(String haha, Message message, Channel channel){ + + try { + log.info("MQ消费的消息的内容为{}",haha); + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + + } catch (Exception e) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + } + +} From f56787b5a8755899be6f970b4d7a9e6a63525177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:01:38 +0800 Subject: [PATCH 03/10] =?UTF-8?q?feat:()=20=E6=9D=A5=E7=B1=BB=E5=8A=A0?= =?UTF-8?q?=E6=B3=A8=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/muyu/processing/consumer/KafkaConsumerService.java | 1 + .../src/main/java/com/muyu/processing/utils/IotDb.java | 1 + 2 files changed, 2 insertions(+) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 6d92523..835f553 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collection; /** + * kafka消费者 * @Author:杨鹏 * @Package:com.muyu.processing.consumer * @Project:car-cloud-server diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/IotDb.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/IotDb.java index a36e2cb..c5e5cad 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/IotDb.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/IotDb.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.List; /** + * IotDb测试 * @Author:杨鹏 * @Package:com.muyu.processing.utils * @Project:car-cloud-server From 0e391451b0f7f605be1b46842e8aa939b07567f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:08:23 +0800 Subject: [PATCH 04/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9Eiotdb?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6,iotdb=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/processing/config/IotDBConfig.java | 107 ++++++++++++++++++ .../controller/IotDbController.java | 59 ++++++++++ 2 files changed, 166 insertions(+) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/IotDBConfig.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/IotDbController.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/IotDBConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/IotDBConfig.java new file mode 100644 index 0000000..b92d68d --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/IotDBConfig.java @@ -0,0 +1,107 @@ +package com.muyu.processing.config; + +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * iotdb配置类 + * @Author:杨鹏 + * @Package:com.muyu.processing.config + * @Project:car-cloud-server + * @name:IotDBConfig + * @Date:2024/9/30 15:13 + */ +@Log4j2 +@Component +@Configuration +public class IotDBConfig { + + /** + * 创建session + */ + @Bean + public static Session session(){ + Session session = null; + try { + session = new Session( + "47.101.49.53", + 6667, + "root", + "root" + ); + session.open(false); + session.setFetchSize(100); + } catch (Exception e) { + throw new RuntimeException(e); + } + return session; + } + + /** + * 新增数据 + */ + public void execute(String deviceId, Long time, List measurement, List values){ + if (!CollectionUtils.isEmpty(measurement) && !CollectionUtils.isEmpty(values)){ + try { + session().insertAlignedRecord(deviceId,time,measurement,values); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 查询数据 + */ + public List> executeQuery(String sql){ + log.info("sql:{}",sql); + ArrayList> list = new ArrayList<>(); + + try { + SessionDataSet sessionDataSet = session().executeQueryStatement(sql); + int fetchSize = sessionDataSet.getFetchSize(); + List columnNames = sessionDataSet.getColumnNames(); + List columnTypes = sessionDataSet.getColumnTypes(); + System.out.println(columnNames); + System.out.println(columnTypes); + if (fetchSize > 0){ + while (sessionDataSet.hasNext()){ + HashMap map = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + // 查询结果第一个为时间戳 + long timestamp = next.getTimestamp(); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + String key = field.getStringValue(); + // 这里的需要按照类型获取 + Object value = field.getObjectValue(field.getDataType()); + map.put(key, value); + } + list.add(map); + } + } + sessionDataSet.closeOperationHandle(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return list; + } + + +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/IotDbController.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/IotDbController.java new file mode 100644 index 0000000..a8ce170 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/IotDbController.java @@ -0,0 +1,59 @@ +package com.muyu.processing.controller; + +import com.muyu.processing.config.IotDBConfig; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * iotdb测试 + * @Author:杨鹏 + * @Package:com.muyu.processing.controller + * @Project:car-cloud-server + * @name:IotdbController + * @Date:2024/10/2 9:39 + */ +@RestController +@RequestMapping("iotdb") +public class IotDbController { + + @Resource + private IotDBConfig iotDBConfig; + + private String json = "{\n" + " \"carVin\": \"VIN123456\",\n" + + " \"carName\": \"宝马\",\n" + "}"; + + /** + * 添加数据 + */ + @GetMapping("add") + public void add(){ +// Map map = JSON.parseObject(json, Map.class); +// Set set = map.keySet(); + ArrayList key = new ArrayList<>(); + ArrayList value = new ArrayList<>(); + key.add("car_vin"); + key.add("car_name"); + value.add("VIN123456"); + value.add("宝马"); + System.out.println(key); + System.out.println(value); + long l = System.currentTimeMillis(); + iotDBConfig.execute("root.vehicle", l, key, value); + } + + /** + * 查询列表 + */ + @GetMapping("findList") + public void findList(){ + String sql = "select * from root.vehicle"; + List> list = iotDBConfig.executeQuery(sql); + System.out.println(list); + } +} From d02813f7e407ff6e307a2ae1a2f9eac3525a7b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:18:16 +0800 Subject: [PATCH 05/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9EMQ=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E8=80=85=20=E5=92=8C=20=E4=BF=AE=E6=94=B9kafka?= =?UTF-8?q?=E7=94=9F=E4=BA=A7=E8=80=85=E7=9A=84=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/processing/controller/TestKafka.java | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java index 732eb0f..9fcd7e9 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java @@ -1,12 +1,14 @@ package com.muyu.processing.controller; -import com.alibaba.fastjson.JSONObject; +import cn.hutool.json.JSONObject; import com.muyu.common.core.constant.KafkaConstants; +import com.muyu.common.core.utils.uuid.UUID; import com.muyu.common.kafka.config.KafkaProducerConfig; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.protocol.types.Field; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -14,6 +16,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** + * 消息队列测试-生产者 * @Author:杨鹏 * @Package:com.muyu.processing.controller * @Project:car-cloud-server @@ -29,10 +32,44 @@ public class TestKafka { @Resource private KafkaProducer kafkaProducer; + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 发送Kafka消息 + * @return String + */ @GetMapping("/send") - public void sendMsg(){ - ProducerRecord producerRecord = new ProducerRecord<>("zeshi", "你好啊"); + public String sendMsg(){ + JSONObject entries = new JSONObject(); + entries.set("vin","vin123468"); + entries.set("name","宝马"); + String entriesString = entries.toString(); + ProducerRecord producerRecord = new ProducerRecord<>("zeshi", entriesString); kafkaProducer.send(producerRecord); + return "OK"; } + /** + * 发送MQ消息 + * @return String + */ + @GetMapping("/sendMQ") + public String sendMQ(){ + rabbitTemplate.convertAndSend("long_time_no_see","晨哀,好久不见",message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); + return "OK"; + } + + /** + * 发送MQ队列消息 + * @return String + */ + @GetMapping("/sendDui") + public String sedDui() { + rabbitTemplate.convertAndSend("myExchange","Im.fine",""); + return "OK"; + } } From 4e1790a47c7d4cb84ba5d14248331df70cc86974 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:24:15 +0800 Subject: [PATCH 06/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9E=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E7=B1=BB=E5=9E=8B,=E4=BA=8B=E4=BB=B6=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E6=8E=A5=E5=8F=A3,=E7=AD=96=E7=95=A5=E5=8F=91?= =?UTF-8?q?=E9=80=81=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/processing/basic/EventCustom.java | 24 +++++++++++++++++++ .../muyu/processing/basic/EventListener.java | 16 +++++++++++++ .../muyu/processing/basic/EventPublisher.java | 23 ++++++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventListener.java create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java new file mode 100644 index 0000000..83b1028 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java @@ -0,0 +1,24 @@ +package com.muyu.processing.basic; + +import cn.hutool.json.JSONObject; +import org.springframework.context.ApplicationEvent; + +/** + * 事件类型 + * @Author:杨鹏 + * @Package:com.muyu.processing.basic + * @Project:car-cloud-server + * @name:EventCustom + * @Date:2024/9/29 21:18 + */ +public class EventCustom extends ApplicationEvent{ + private JSONObject data; + public EventCustom(Object source, JSONObject data) { + super(source); + this.data = data; + } + + public JSONObject getData(){ + return data; + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventListener.java new file mode 100644 index 0000000..a75baee --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventListener.java @@ -0,0 +1,16 @@ +package com.muyu.processing.basic; + +import org.springframework.context.ApplicationListener; + +/** + * 事件监听接口 + * @Author:杨鹏 + * @Package:com.muyu.processing.basic + * @Project:car-cloud-server + * @name:EventListener + * @Date:2024/9/29 22:29 + */ +public interface EventListener extends ApplicationListener { + void onEvent(EventCustom event); + +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java new file mode 100644 index 0000000..2551004 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java @@ -0,0 +1,23 @@ +package com.muyu.processing.basic; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; + +/** + * 策略发送事件 + * @Author:杨鹏 + * @Package:com.muyu.processing.basic + * @Project:car-cloud-server + * @name:EventPublisher + * @Date:2024/9/29 22:31 + */ +public class EventPublisher implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher publisher; + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher){ + this.publisher = applicationEventPublisher; + } + +} From 98560aacdcb47c443733165a882d8282ab04811b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:24:43 +0800 Subject: [PATCH 07/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9E=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E7=9B=91=E5=90=AC=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/muyu/processing/config/AppConfig.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/AppConfig.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/AppConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/AppConfig.java new file mode 100644 index 0000000..c3ca58a --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/AppConfig.java @@ -0,0 +1,23 @@ +package com.muyu.processing.config; + +import com.muyu.processing.listener.AddDatabaseListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 事件监听器 + * @Author:杨鹏 + * @Package:com.muyu.processing.config + * @Project:car-cloud-server + * @name:AppConfig + * @Date:2024/9/29 22:23 + */ +@Configuration +public class AppConfig { + + @Bean + public AddDatabaseListener addDatabaseListener(){ + return new AddDatabaseListener(); + } + +} From 0a2fc7a9ce1ac3a18601a09a398f67c0158693fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 10:25:26 +0800 Subject: [PATCH 08/10] =?UTF-8?q?feat:()=20=E6=B7=BB=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/AddDatabaseListener.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java new file mode 100644 index 0000000..006d5f2 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java @@ -0,0 +1,34 @@ +package com.muyu.processing.listener; + +import cn.hutool.json.JSONObject; +import com.muyu.processing.basic.EventCustom; +import com.muyu.processing.basic.EventListener; + +import java.util.ArrayList; + +/** + * 添加数据库事件 + * @Author:杨鹏 + * @Package:com.muyu.processing.listener + * @Project:car-cloud-server + * @name:AddDatabaseListener + * @Date:2024/9/29 22:25 + */ +public class AddDatabaseListener implements EventListener { + @Override + public void onEvent(EventCustom event) { + JSONObject jsonObject = event.getData(); + ArrayList keys = new ArrayList<>(); + ArrayList values = new ArrayList<>(); + jsonObject.forEach((key, value) ->{ + keys.add(key); + values.add(value); + }); + // 添加数据库 + } + + @Override + public void onApplicationEvent(EventCustom event) { + onEvent(event); + } +} From eade0c66eac5018c7d99482aef11dbd275ac0976 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Fri, 4 Oct 2024 16:48:40 +0800 Subject: [PATCH 09/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9E=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF=E7=9B=91=E5=90=AC=E5=92=8C=E4=BF=AE=E6=94=B9=E4=B8=8A?= =?UTF-8?q?=E7=BA=BF=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/OfflineMonitoringConsumer.java | 37 +++++++++++++++++++ ...mer.java => OnLineMonitoringConsumer.java} | 18 ++++++--- 2 files changed, 50 insertions(+), 5 deletions(-) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java rename cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/{MQConsumer.java => OnLineMonitoringConsumer.java} (70%) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java new file mode 100644 index 0000000..460fbe5 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -0,0 +1,37 @@ +package com.muyu.processing.consumer; + +import com.muyu.processing.utils.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 下线监听 + * @Author:杨鹏 + * @Package:com.muyu.processing.consumer + * @Project:cloud-vehicle + * @name:OfflineMonitoringConsumer + * @Date:2024/10/4 14:48 + */ +@Log4j2 +@Component +public class OfflineMonitoringConsumer { + + @Resource + private CacheUtil cacheUtil; + + /** + * 接收消息 + * @param vin 车辆vin + */ + @RabbitListener(queuesToDeclare = @Queue("offline_monitoring")) + public void receive(String vin){ + log.info("清除缓存中的数据,车辆vin: {}", vin); + // 清除缓存 + cacheUtil.remove(vin); + } + +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java similarity index 70% rename from cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java rename to cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index 4dbc156..673b466 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -1,5 +1,6 @@ package com.muyu.processing.consumer; +import com.muyu.processing.utils.CacheUtil; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; @@ -7,8 +8,10 @@ import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + /** - * MQ消费者 + * 上线监听 * @Author:杨鹏 * @Package:com.muyu.processing.consumer * @Project:car-cloud-server @@ -17,15 +20,20 @@ import org.springframework.stereotype.Component; */ @Log4j2 @Component -public class MQConsumer { - +public class OnLineMonitoringConsumer { + @Resource + private CacheUtil cacheUtil; + /** + * 上线监听车辆网关中车辆上线时 + */ @RabbitListener(queuesToDeclare = @Queue("long_time_no_see")) - public void receive(String haha, Message message, Channel channel){ + public void receive(String vin, Message message, Channel channel){ try { - log.info("MQ消费的消息的内容为{}",haha); + log.info("添加本地缓存,车辆vin: {}", vin); + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { From 2430d10401036f5d89aa7986f6f6de4da0558399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Mon, 7 Oct 2024 10:10:26 +0800 Subject: [PATCH 10/10] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9E=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E5=92=8C=E4=BF=AE=E6=94=B9=E4=B8=8A=E7=BA=BF=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/KafkaConsumerService.java | 19 +++++- .../consumer/OfflineMonitoringConsumer.java | 8 +++ .../consumer/OnLineMonitoringConsumer.java | 34 ++++++++++- .../processing/interfaces/EventInterface.java | 2 + .../com/muyu/processing/utils/CacheUtil.java | 58 +++++++++++++++++++ 5 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 835f553..1c2d7fd 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -6,7 +6,12 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.muyu.common.core.constant.KafkaConstants; +import com.muyu.domain.Fence; +import com.muyu.domain.Vehicle; +import com.muyu.domain.WarnRule; +import com.muyu.domain.WarnStrategy; import com.muyu.processing.interfaces.EventInterface; +import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -17,6 +22,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.Duration; import java.util.Collection; +import java.util.Map; /** * kafka消费者 @@ -33,6 +39,9 @@ public class KafkaConsumerService implements InitializingBean { @Resource private KafkaConsumer kafkaConsumer; + @Resource + private CacheUtil cacheUtil; + // @Resource // private EventInterface eventInterface; @@ -54,8 +63,16 @@ public class KafkaConsumerService implements InitializingBean { JSONObject jsonObject = JSON.parseObject(originalMsg); log.info("消费数据转换为JSON对象: " + jsonObject); log.info("消费数据转换为JSON对象: " + jsonObject.toString()); -// eventInterface.handle(jsonObject); + String value = jsonObject.toString(); + String vin = value.substring(0, 11); + Map map = (Map) cacheUtil.get(vin); + WarnRule warnRule = (WarnRule) map.get("warnRule"); + WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); + Vehicle vehicle = (Vehicle) map.get("vehicle"); + Object breakdown = map.get("breakdown"); + Fence fence = (Fence) map.get("fence"); +// eventInterface.handle(jsonObject); } } }); diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java index 460fbe5..d3f6232 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -1,5 +1,9 @@ package com.muyu.processing.consumer; +import com.muyu.enterprise.cache.FaultCacheService; +import com.muyu.enterprise.cache.FenceCahceService; +import com.muyu.enterprise.cache.VehicleCacheService; +import com.muyu.enterprise.cache.WarnRuleCacheService; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.annotation.Queue; @@ -23,6 +27,10 @@ public class OfflineMonitoringConsumer { @Resource private CacheUtil cacheUtil; + + + + /** * 接收消息 * @param vin 车辆vin diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index 673b466..4127ede 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -1,5 +1,10 @@ package com.muyu.processing.consumer; +import com.muyu.domain.Fence; +import com.muyu.domain.Vehicle; +import com.muyu.domain.WarnRule; +import com.muyu.domain.WarnStrategy; +import com.muyu.enterprise.cache.*; import com.muyu.processing.utils.CacheUtil; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; @@ -9,6 +14,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.HashMap; /** * 上线监听 @@ -25,6 +31,21 @@ public class OnLineMonitoringConsumer { @Resource private CacheUtil cacheUtil; + @Resource + private VehicleCacheService vehicleCacheService; + + @Resource + private FaultCacheService faultCacheService; + + @Resource + private FenceCahceService fenceCahceService; + + @Resource + private WarnRuleCacheService warnRuleCacheService; + + @Resource + private WarnStrategyCacheService warnStrategyCacheService; + /** * 上线监听车辆网关中车辆上线时 */ @@ -33,7 +54,18 @@ public class OnLineMonitoringConsumer { try { log.info("添加本地缓存,车辆vin: {}", vin); - + WarnRule warnRule = warnRuleCacheService.get(vin); + WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); + Vehicle vehicle = vehicleCacheService.get(vin); + Object breakdown = faultCacheService.get(vin); + Fence fence = fenceCahceService.get(vin); + HashMap map = new HashMap<>(); + map.put("warnRule",warnRule); + map.put("warnStrategy",warnStrategy); + map.put("vehicle",vehicle); + map.put("breakdown",breakdown); + map.put("fence",fence); + cacheUtil.put(vin,map); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java index 7301387..32b77d6 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java @@ -1,6 +1,8 @@ package com.muyu.processing.interfaces; import com.alibaba.fastjson.JSONObject; +import org.checkerframework.checker.units.qual.C; +import org.springframework.stereotype.Component; /** * 事件处理接口 diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java new file mode 100644 index 0000000..46e5e54 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java @@ -0,0 +1,58 @@ +package com.muyu.processing.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.stereotype.Component; + +/** + * 缓存工具类 + * @Author:杨鹏 + * @Package:com.muyu.processing.utils + * @Project:cloud-vehicle + * @name:CacheUtil + * @Date:2024/10/4 15:14 + */ +@Component +public class CacheUtil { + + /** + * 缓存对象 + */ + private final Cache cache; + + /** + * 默认构建函数 + */ + public CacheUtil(){ + this.cache = Caffeine.newBuilder() + .maximumSize(500L) + .build(); + } + + /** + * 获得缓存 + * @param key 键 + * @return 返回的值 + */ + public T get(String key){ + return cache.getIfPresent(key); + } + + /** + * 添加缓存 + * @param key 键 + * @param value 值 + */ + public void put(String key, T value){ + cache.put(key, value); + } + + /** + * 删除缓存 + * @param key 键 + */ + public void remove(String key){ + cache.invalidate(key); + } + +}