diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java deleted file mode 100644 index 8055e42..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.muyu.common.kafka.config; - -import com.muyu.common.kafka.constants.KafkaConstants; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - * kafka 消息的消费者 配置类 - */ -@Configuration -public class KafkaConsumerConfig { - - @Bean - public KafkaConsumer kafkaConsumer() { - Map configs = new HashMap<>(); - //kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "60.204.221.52:9092"); - //开启consumer的偏移量(offset)自动提交到Kafka - configs.put("enable.auto.commit", true); - //consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒 - configs.put("auto.commit.interval", 5000); - //在Kafka中没有初始化偏移量或者当前偏移量不存在情况 - //earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量 - //latest, 在偏移量无效的情况下, 自动重置为最新的偏移量 - //none, 在偏移量无效的情况下, 抛出异常. - configs.put("auto.offset.reset", "latest"); - //请求阻塞的最大时间(毫秒) - configs.put("fetch.max.wait", 500); - //请求应答的最小字节数 - configs.put("fetch.min.size", 1); - //心跳间隔时间(毫秒) - configs.put("heartbeat-interval", 3000); - //一次调用poll返回的最大记录条数 - configs.put("max.poll.records", 500); - //指定消费组 - configs.put("group.id", KafkaConstants.KafkaGrop); - //指定key使用的反序列化类 - Deserializer keyDeserializer = new StringDeserializer(); - //指定value使用的反序列化类 - Deserializer valueDeserializer = new StringDeserializer(); - //创建Kafka消费者 - KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer); - return kafkaConsumer; - } - -} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java deleted file mode 100644 index 07b56d3..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.muyu.common.kafka.config; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - * kafka 消息的生产者 配置类 - */ -@Configuration -public class KafkaProviderConfig { - - @Bean - public KafkaProducer kafkaProducer() { - Map configs = new HashMap<>(); - //#kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "47.116.173.119:9092"); - //客户端发送服务端失败的重试次数 - configs.put("retries", 2); - //多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求. - //此设置有助于提高客户端和服务器的性能,配置控制默认批量大小(以字节为单位) - configs.put("batch.size", 16384); - //生产者可用于缓冲等待发送到服务器的记录的总内存字节数(以字节为单位) - configs.put("buffer-memory", 33554432); - //生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化 - //acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送.在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1. - //acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失. - //acks=all,acks=-1,leader节点将等待所有同步复制副本完成再确认记录,这保证了只要至少有一个同步复制副本存活,记录就不会丢失. - configs.put("acks", "-1"); - //指定key使用的序列化类 - Serializer keySerializer = new StringSerializer(); - //指定value使用的序列化类 - Serializer valueSerializer = new StringSerializer(); - //创建Kafka生产者 - KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer); - return kafkaProducer; - } - -} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java deleted file mode 100644 index b1b7180..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.muyu.common.kafka.constants; - - -public class KafkaConstants { - - public final static String KafkaTopic = "carJsons"; - -// public final static String KafkaGrop = "kafka_grop"; -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java index b4c997b..ef967fa 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java @@ -1,9 +1,11 @@ package com.muyu.carData; -import com.muyu.carData.listener.MyListener; +import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.scheduling.annotation.EnableAsync; /** * @Author:张腾 @@ -14,11 +16,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; */ @SpringBootApplication @EnableMyFeignClients +@EnableCustomConfig +@EnableAsync +@EnableCaching public class CarDataApplication { public static void main(String[] args) { - SpringApplication application = new SpringApplication(CarDataApplication.class); - application.addListeners(new MyListener()); - application.run(args); + SpringApplication.run(CarDataApplication.class,args); } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java index 9561bb3..989b907 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java @@ -1,15 +1,22 @@ package com.muyu.carData.config.lotdbconfig; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -76,6 +83,66 @@ public class IotDBSessionConfig { } + /** + * 添加操作 + * @param deviceId + * @param time 时间戳 + * @param measurements + * @param values 值 + */ + public void execute(String deviceId,long time,List measurements,List values){ + if (CollectionUtils.isEmpty(measurements) && !CollectionUtils.isEmpty(values)){ + try { + iotSession().insertAlignedRecord(deviceId,time,measurements,values); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 查询操作 + * @param sql + * @return + */ + public List> executeQuery(String sql){ + logger.info("sql:{}",sql); + List> list = new ArrayList<>(); + + try { + SessionDataSet sessionDataSet = iotSession().executeQueryStatement(sql); + int fetchSize = sessionDataSet.getFetchSize(); + List columnNames = sessionDataSet.getColumnNames(); + logger.info("columnNames:{}",columnNames); + List columnTypes = sessionDataSet.getColumnTypes(); + logger.info("columnTypes:{}",columnTypes); + if (fetchSize > 0){ + while (sessionDataSet.hasNext()){ + HashMap map = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + String key = field.getStringValue(); + Object value = field.getObjectValue(field.getDataType()); + map.put(key,value); + } + list.add(map); + } + + sessionDataSet.closeOperationHandle(); + } + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + return list; + } + diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java new file mode 100644 index 0000000..77a77b7 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java @@ -0,0 +1,34 @@ +package com.muyu.carData.consumer; + +import com.muyu.carData.util.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** 车辆下线消费者 + * @Author:张腾 + * @Package:com.muyu.carData.consumer + * @Project:cloud-server-8 + * @name:CarOffConsumer + * @Date:2024/10/4 14:30 + */ +@Log4j2 +@Component +public class CarOffConsumer { + + @Autowired + private CacheUtil cacheUtil; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "CAR_OFFLINE",durable = "true"), + exchange = @Exchange(value = "CAR_OFF_EXCHANGE",type = "fanout") + )) + public void inline(String vin){ + log.info("车辆vin:{},车辆下线成功!开始消费..."); + cacheUtil.remove(vin); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java new file mode 100644 index 0000000..b27827b --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java @@ -0,0 +1,33 @@ +package com.muyu.carData.consumer; + +import com.muyu.carData.util.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/**车辆上线消费者 + * @Author:张腾 + * @Package:com.muyu.carData.consumer + * @Project:cloud-server-8 + * @name:CarOnlineConsumer + * @Date:2024/10/4 14:27 + */ +@Log4j2 +@Component +public class CarOnlineConsumer { + + @Autowired + private CacheUtil cacheUtil; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "CAR_ONLINE",durable = "true"), + exchange = @Exchange(value = "ONLINE_EXCHANGE",type = "fanout") + )) + public void online(String vin){ + log.info("车辆vin:{},已上线,开始消费",vin); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java index 8d0aecc..c7e592c 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java @@ -1,19 +1,23 @@ package com.muyu.carData.consumer; -import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.json.JSONUtil; -import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import com.muyu.carData.pojo.Student; +import com.alibaba.fastjson.JSONObject; +import com.muyu.carData.event.EventPublisher; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import java.time.Duration; -import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /**卡夫卡消费者 * @Author:张腾 @@ -24,33 +28,43 @@ import java.util.Collection; */ @Component @Log4j2 -public class MyKafkaConsumer implements InitializingBean { +public class MyKafkaConsumer implements ApplicationRunner, ApplicationListener { @Autowired private KafkaConsumer kafkaConsumer; + @Autowired + private EventPublisher eventPublisher; + private final String topicName = "carJsons"; + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + @Override - public void afterPropertiesSet() throws Exception { - log.info("启动线程开始监听topic:{}",topicName); - Thread thread = new Thread(() -> { - ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(topicName); - kafkaConsumer.subscribe(topics); - while (true){ - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord consumerRecord : consumerRecords) { - //从consumerRecord中获取消费数据 - String value = consumerRecord.value(); - log.info("从Kafka中消费的原始数据===============>>:{}",value); - } - } - }); - thread.start(); + public void run(ApplicationArguments args) throws Exception { + log.info("开始监听kafka-topic:{}",topicName); + List topics = Collections.singletonList(topicName); + kafkaConsumer.subscribe(topics); - log.info("启动线程结束监听topic:{}",topicName); + while (true){ + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(record ->{ + executorService.submit(() -> handleRecord(record)); + log.info("数据为:{},消费成功!",record); + }); + } + } + private void handleRecord(ConsumerRecord record) { + String value = record.value(); + JSONObject jsonObject = JSONObject.parseObject(value); + eventPublisher.publishEvent(jsonObject); + } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("关闭kafka和线程"); + kafkaConsumer.close(); + executorService.shutdown(); } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java index f2d2c53..bc94adc 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java @@ -1,3 +1,4 @@ +/* package com.muyu.carData.controller; import com.github.benmanes.caffeine.cache.Cache; @@ -8,13 +9,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +*/ /** * @Author:张腾 * @Package:com.muyu.carData.controller * @Project:cloud-server-8 * @name:TestController * @Date:2024/9/26 23:56 - */ + *//* + @RestController @RequestMapping("/testCache") @Log4j2 @@ -42,3 +45,4 @@ public class CacheController { } } +*/ diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java index b167ef7..a0f5bcc 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java @@ -1,3 +1,4 @@ +/* package com.muyu.carData.controller; import com.alibaba.fastjson.JSONObject; @@ -9,13 +10,15 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +*/ /** * @Author:张腾 * @Package:com.muyu.carData.testcontroller * @Project:cloud-server-8 * @name:KafkaProducerController * @Date:2024/9/28 15:10 - */ + *//* + @RestController @RequestMapping("/produce") @Log4j2 @@ -43,3 +46,4 @@ public class KafkaProducerController { } +*/ diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java index f4d689f..fab5a76 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java @@ -15,8 +15,12 @@ public class EsSaveEvent extends ApplicationEvent { private JSONObject data; - public EsSaveEvent(JSONObject source) { + public EsSaveEvent(Object source,JSONObject data) { super(source); - this.data = source; + this.data = data; + } + + public JSONObject getData(){ + return data; } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java new file mode 100644 index 0000000..8bb30a0 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java @@ -0,0 +1,15 @@ +package com.muyu.carData.event; + +import org.springframework.context.ApplicationListener; + +/** 事件监听接口 + * @Author:张腾 + * @Package:com.muyu.carData.event + * @Project:cloud-server-8 + * @name:EventListener + * @Date:2024/10/4 10:39 + */ +public interface EventListener extends ApplicationListener { + + void onEvent(EsSaveEvent event); +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java new file mode 100644 index 0000000..e7bbc8c --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java @@ -0,0 +1,28 @@ +package com.muyu.carData.event; + +import com.alibaba.fastjson.JSONObject; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; + +/** 策略发送事件 + * @Author:张腾 + * @Package:com.muyu.carData.event + * @Project:cloud-server-8 + * @name:EventPublisher + * @Date:2024/10/4 10:40 + */ +@Component +public class EventPublisher implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher publisher; + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } + + public void publishEvent(JSONObject message){ + EsSaveEvent esSaveEvent = new EsSaveEvent(this, message); + publisher.publishEvent(esSaveEvent); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java deleted file mode 100644 index 4d63af0..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.muyu.carData.listener; - -import com.muyu.carData.event.EsSaveEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -/** - * @Author:张腾 - * @Package:com.muyu.carData.listener - * @Project:cloud-server-8 - * @name:CustomEventListener - * @Date:2024/9/29 23:49 - */ -@Component -public class CustomEventListener { - - @EventListener - public void handMyEvent(EsSaveEvent event){ - //处理事件详情 - - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java new file mode 100644 index 0000000..a617dcb --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java @@ -0,0 +1,45 @@ +package com.muyu.carData.listener; + +import com.alibaba.fastjson.JSONObject; +import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig; +import com.muyu.carData.event.EsSaveEvent; +import com.muyu.carData.event.EventListener; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +/** 数据持久化事件 + * @Author:张腾 + * @Package:com.muyu.carData.listener + * @Project:cloud-server-8 + * @name:InsertIotDBListener + * @Date:2024/10/6 9:25 + */ +@Log4j2 +@Component +public class InsertIotDBListener implements EventListener { + @Override + public void onEvent(EsSaveEvent event) { + JSONObject jsonObject = event.getData(); + log.info("持久化:监听到数据:{}", jsonObject); + List keys = new ArrayList<>(); + List values = new ArrayList<>(); + + jsonObject.forEach((key,value) -> { + keys.add(key); + values.add((String) value); + }); + + IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig(); + long time = System.currentTimeMillis(); + iotDBSessionConfig.execute("root.vehicle",time,keys,values); + log.info("数据写入成功"); + } + + @Override + public void onApplicationEvent(EsSaveEvent event) { + onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java deleted file mode 100644 index 5bfcb26..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.carData.listener; - -import com.muyu.carData.event.EsSaveEvent; -import lombok.extern.log4j.Log4j2; -import org.springframework.context.ApplicationListener; - -/**自定义监听器 - * @Author:张腾 - * @Package:com.muyu.carData.listener - * @Project:cloud-server-8 - * @name:MyListener - * @Date:2024/9/29 21:18 - */ -@Log4j2 -public class MyListener implements ApplicationListener { - @Override - public void onApplicationEvent(EsSaveEvent event) { - log.info("监听到自定义事件........"); - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java deleted file mode 100644 index 90d9937..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.muyu.carData.pojo; - -import com.muyu.carData.config.cacheconfig.ExpiryTime; -import lombok.*; -import lombok.experimental.SuperBuilder; - -/** - * @Author:张腾 - * @Package:com.muyu.carData.pojo - * @Project:cloud-server-8 - * @name:Student - * @Date:2024/9/27 0:40 - */ - -@Data -@SuperBuilder -@NoArgsConstructor -@AllArgsConstructor -@EqualsAndHashCode(callSuper = true) -public class Student extends ExpiryTime{ - - /** - * 主键 - */ - private Integer id; - - /** - * 姓名 - */ - private String name; - - /** - * 性别 - */ - private String sex; - - /** - * 插入时间 - */ - private long time = System.currentTimeMillis(); -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java deleted file mode 100644 index 2a32391..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.muyu.carData.pulisher; - -import com.alibaba.fastjson.JSONObject; -import com.muyu.carData.event.EsSaveEvent; -import lombok.AllArgsConstructor; -import lombok.extern.log4j.Log4j2; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.stereotype.Component; - -/**事件发布测试 - * @Author:张腾 - * @Package:com.muyu.carData.pulisher - * @Project:cloud-server-8 - * @name:CustomEventPublisher - * @Date:2024/9/29 23:51 - */ -@Log4j2 -@Component -@AllArgsConstructor -public class CustomEventPublisher { - - private ApplicationEventPublisher applicationEventPublisher; - - public void publish(JSONObject data){ - EsSaveEvent esSaveEvent = new EsSaveEvent(data); - applicationEventPublisher.publishEvent(esSaveEvent); - log.info("事件发布成功 - 消息是:{}",data); - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java new file mode 100644 index 0000000..5501ba4 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java @@ -0,0 +1,37 @@ +package com.muyu.carData.util; + + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.stereotype.Component; + +/** 缓存工具类 + * @Author:张腾 + * @Package:com.muyu.carData.util + * @Project:cloud-server-8 + * @name:CacheUtil + * @Date:2024/10/4 10:42 + */ +@Component +public class CacheUtil { + + private final Cache cache; + + public CacheUtil() { + this.cache = Caffeine.newBuilder() + .maximumSize(500L) + .build(); + } + + public T get(String key) { + return (T)cache.getIfPresent(key); + } + + public void put(String key, T value) { + cache.put(key, value); + } + + public void remove(String key) { + cache.invalidate(key); + } +} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java deleted file mode 100644 index fa5f209..0000000 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.muyu.server.mqtt; - -import com.alibaba.fastjson.JSONObject; - -import com.muyu.domain.CarMessage; -import com.muyu.server.service.CarMessageService; -import jakarta.annotation.PostConstruct; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - - -@Component -public class Demo { - @Resource - private CarMessageService service; - @Resource - private KafkaProducer kafkaProducer; - @PostConstruct - public void test() { - - String topic = "vehicle"; - String content = "Message from MqttPublishSample"; - int qos = 2; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; - - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: "+broker); - sampleClient.connect(connOpts); - sampleClient.subscribe(topic,0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - - } - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - - List list= service.selectCarMessageList(1,2); - String str = new String( mqttMessage.getPayload() ); - System.out.println(str); - String[] test = str.split(" "); - String[] results = new String[list.size()]; - List> futures = new ArrayList<>(); - for (CarMessage carmsg : list) { - futures.add(CompletableFuture.supplyAsync(() -> { - int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1; - int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex())); - StringBuilder hexBuilder = new StringBuilder(); - for (int j = startIndex; j < endIndex; j++) { - hexBuilder.append(test[j]); - } - // 创建16进制的对象 - String hex = hexBuilder.toString(); - // 转橙字符数组 - char[] result = new char[hex.length() / 2]; - for (int x = 0; x < hex.length(); x += 2) { - // 先转十进制 - int high = Character.digit(hex.charAt(x), 16); - // 转二进制 - int low = Character.digit(hex.charAt(x + 1), 16); - // 转字符 - result[x / 2] = (char) ((high << 4) + low); - } - return new String(result); - })); - } - for (int i = 0; i < futures.size(); i++) { - results[i] = futures.get(i).get(); - } - String jsonString = JSONObject.toJSONString( results ); - ProducerRecord producerRecord = new ProducerRecord<>( "carJsons", jsonString); - kafkaProducer.send(producerRecord); - } - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - }); - } catch(MqttException me) { - System.out.println("reason "+me.getReasonCode()); - System.out.println("msg "+me.getMessage()); - System.out.println("loc "+me.getLocalizedMessage()); - System.out.println("cause "+me.getCause()); - System.out.println("excep "+me); - me.printStackTrace(); - } - } - -} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java index 0f0f5de..45e2100 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java @@ -1,7 +1,6 @@ package com.muyu.mqtt; import com.alibaba.fastjson2.JSONObject; -import com.muyu.common.kafka.constants.KafkaConstants; import com.muyu.domain.CarMessage; import com.muyu.domain.KafKaData; @@ -90,7 +89,7 @@ public class MqttTest { .build()); } String jsonString = JSONObject.toJSONString(kafKaDataList); - ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + ProducerRecord producerRecord = new ProducerRecord<>("carJsons", jsonString); kafkaProducer.send(producerRecord); log.info("kafka投产:{}", jsonString); // HashMap stringStringHashMap = new HashMap<>();