Merge branch 'dev.carData' into dev

# Conflicts:
#	cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java
dev.carData
张腾 2024-10-08 09:23:27 +08:00
commit fe70b7d3c1
17 changed files with 322 additions and 144 deletions

View File

@ -1,9 +1,11 @@
package com.muyu.carData; package com.muyu.carData;
import com.muyu.carData.listener.MyListener; import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients; import com.muyu.common.security.annotation.EnableMyFeignClients;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.scheduling.annotation.EnableAsync;
/** /**
* @Author * @Author
@ -14,11 +16,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
*/ */
@SpringBootApplication @SpringBootApplication
@EnableMyFeignClients @EnableMyFeignClients
@EnableCustomConfig
@EnableAsync
@EnableCaching
public class CarDataApplication { public class CarDataApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication application = new SpringApplication(CarDataApplication.class); SpringApplication.run(CarDataApplication.class,args);
application.addListeners(new MyListener());
application.run(args);
} }
} }

View File

@ -1,15 +1,22 @@
package com.muyu.carData.config.lotdbconfig; package com.muyu.carData.config.lotdbconfig;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session; import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
/** /**
@ -76,6 +83,66 @@ public class IotDBSessionConfig {
} }
/**
*
* @param deviceId
* @param time
* @param measurements
* @param values
*/
public void execute(String deviceId,long time,List<String> measurements,List<String> values){
if (CollectionUtils.isEmpty(measurements) && !CollectionUtils.isEmpty(values)){
try {
iotSession().insertAlignedRecord(deviceId,time,measurements,values);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
}
}
}
/**
*
* @param sql
* @return
*/
public List<HashMap<String,Object>> executeQuery(String sql){
logger.info("sql:{}",sql);
List<HashMap<String,Object>> list = new ArrayList<>();
try {
SessionDataSet sessionDataSet = iotSession().executeQueryStatement(sql);
int fetchSize = sessionDataSet.getFetchSize();
List<String> columnNames = sessionDataSet.getColumnNames();
logger.info("columnNames:{}",columnNames);
List<String> columnTypes = sessionDataSet.getColumnTypes();
logger.info("columnTypes:{}",columnTypes);
if (fetchSize > 0){
while (sessionDataSet.hasNext()){
HashMap<String, Object> map = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
String key = field.getStringValue();
Object value = field.getObjectValue(field.getDataType());
map.put(key,value);
}
list.add(map);
}
sessionDataSet.closeOperationHandle();
}
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
return list;
}

View File

@ -0,0 +1,34 @@
package com.muyu.carData.consumer;
import com.muyu.carData.util.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** 线
* @Author
* @Packagecom.muyu.carData.consumer
* @Projectcloud-server-8
* @nameCarOffConsumer
* @Date2024/10/4 14:30
*/
@Log4j2
@Component
public class CarOffConsumer {
@Autowired
private CacheUtil cacheUtil;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "CAR_OFFLINE",durable = "true"),
exchange = @Exchange(value = "CAR_OFF_EXCHANGE",type = "fanout")
))
public void inline(String vin){
log.info("车辆vin:{},车辆下线成功!开始消费...");
cacheUtil.remove(vin);
}
}

View File

@ -0,0 +1,33 @@
package com.muyu.carData.consumer;
import com.muyu.carData.util.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**线
* @Author
* @Packagecom.muyu.carData.consumer
* @Projectcloud-server-8
* @nameCarOnlineConsumer
* @Date2024/10/4 14:27
*/
@Log4j2
@Component
public class CarOnlineConsumer {
@Autowired
private CacheUtil cacheUtil;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "CAR_ONLINE",durable = "true"),
exchange = @Exchange(value = "ONLINE_EXCHANGE",type = "fanout")
))
public void online(String vin){
log.info("车辆vin:{},已上线,开始消费",vin);
}
}

View File

@ -1,19 +1,23 @@
package com.muyu.carData.consumer; package com.muyu.carData.consumer;
import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSONObject;
import cn.hutool.json.JSONUtil; import com.muyu.carData.event.EventPublisher;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.carData.pojo.Student;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.Collection; import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* @Author * @Author
@ -24,33 +28,43 @@ import java.util.Collection;
*/ */
@Component @Component
@Log4j2 @Log4j2
public class MyKafkaConsumer implements InitializingBean { public class MyKafkaConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
@Autowired @Autowired
private KafkaConsumer kafkaConsumer; private KafkaConsumer kafkaConsumer;
@Autowired
private EventPublisher eventPublisher;
private final String topicName = "carJsons"; private final String topicName = "carJsons";
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override @Override
public void afterPropertiesSet() throws Exception { public void run(ApplicationArguments args) throws Exception {
log.info("启动线程开始监听topic:{}",topicName); log.info("开始监听kafka-topic:{}",topicName);
Thread thread = new Thread(() -> { List<String> topics = Collections.singletonList(topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics); kafkaConsumer.subscribe(topics);
while (true){ while (true){
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { consumerRecords.forEach(record ->{
//从consumerRecord中获取消费数据 executorService.submit(() -> handleRecord(record));
String value = consumerRecord.value(); log.info("数据为:{},消费成功!",record);
log.info("从Kafka中消费的原始数据===============>>{}",value);
}
}
}); });
thread.start(); }
}
log.info("启动线程结束监听topic:{}",topicName);
private void handleRecord(ConsumerRecord<String,String> record) {
String value = record.value();
JSONObject jsonObject = JSONObject.parseObject(value);
eventPublisher.publishEvent(jsonObject);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭kafka和线程");
kafkaConsumer.close();
executorService.shutdown();
} }
} }

View File

@ -1,3 +1,4 @@
/*
package com.muyu.carData.controller; package com.muyu.carData.controller;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
@ -8,13 +9,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
*/
/** /**
* @Author * @Author
* @Packagecom.muyu.carData.controller * @Packagecom.muyu.carData.controller
* @Projectcloud-server-8 * @Projectcloud-server-8
* @nameTestController * @nameTestController
* @Date2024/9/26 23:56 * @Date2024/9/26 23:56
*/ *//*
@RestController @RestController
@RequestMapping("/testCache") @RequestMapping("/testCache")
@Log4j2 @Log4j2
@ -42,3 +45,4 @@ public class CacheController {
} }
} }
*/

View File

@ -1,3 +1,4 @@
/*
package com.muyu.carData.controller; package com.muyu.carData.controller;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -9,13 +10,15 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
*/
/** /**
* @Author * @Author
* @Packagecom.muyu.carData.testcontroller * @Packagecom.muyu.carData.testcontroller
* @Projectcloud-server-8 * @Projectcloud-server-8
* @nameKafkaProducerController * @nameKafkaProducerController
* @Date2024/9/28 15:10 * @Date2024/9/28 15:10
*/ *//*
@RestController @RestController
@RequestMapping("/produce") @RequestMapping("/produce")
@Log4j2 @Log4j2
@ -43,3 +46,4 @@ public class KafkaProducerController {
} }
*/

View File

@ -15,8 +15,12 @@ public class EsSaveEvent extends ApplicationEvent {
private JSONObject data; private JSONObject data;
public EsSaveEvent(JSONObject source) { public EsSaveEvent(Object source,JSONObject data) {
super(source); super(source);
this.data = source; this.data = data;
}
public JSONObject getData(){
return data;
} }
} }

View File

@ -0,0 +1,15 @@
package com.muyu.carData.event;
import org.springframework.context.ApplicationListener;
/**
* @Author
* @Packagecom.muyu.carData.event
* @Projectcloud-server-8
* @nameEventListener
* @Date2024/10/4 10:39
*/
public interface EventListener extends ApplicationListener<EsSaveEvent> {
void onEvent(EsSaveEvent event);
}

View File

@ -0,0 +1,28 @@
package com.muyu.carData.event;
import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.event
* @Projectcloud-server-8
* @nameEventPublisher
* @Date2024/10/4 10:40
*/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publishEvent(JSONObject message){
EsSaveEvent esSaveEvent = new EsSaveEvent(this, message);
publisher.publishEvent(esSaveEvent);
}
}

View File

@ -1,22 +0,0 @@
package com.muyu.carData.listener;
import com.muyu.carData.event.EsSaveEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameCustomEventListener
* @Date2024/9/29 23:49
*/
@Component
public class CustomEventListener {
@EventListener
public void handMyEvent(EsSaveEvent event){
//处理事件详情
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.carData.listener;
import com.alibaba.fastjson.JSONObject;
import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig;
import com.muyu.carData.event.EsSaveEvent;
import com.muyu.carData.event.EventListener;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameInsertIotDBListener
* @Date2024/10/6 9:25
*/
@Log4j2
@Component
public class InsertIotDBListener implements EventListener {
@Override
public void onEvent(EsSaveEvent event) {
JSONObject jsonObject = event.getData();
log.info("持久化:监听到数据:{}", jsonObject);
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
jsonObject.forEach((key,value) -> {
keys.add(key);
values.add((String) value);
});
IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig();
long time = System.currentTimeMillis();
iotDBSessionConfig.execute("root.vehicle",time,keys,values);
log.info("数据写入成功");
}
@Override
public void onApplicationEvent(EsSaveEvent event) {
onEvent(event);
}
}

View File

@ -1,20 +0,0 @@
package com.muyu.carData.listener;
import com.muyu.carData.event.EsSaveEvent;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.ApplicationListener;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameMyListener
* @Date2024/9/29 21:18
*/
@Log4j2
public class MyListener implements ApplicationListener<EsSaveEvent> {
@Override
public void onApplicationEvent(EsSaveEvent event) {
log.info("监听到自定义事件........");
}
}

View File

@ -1,41 +0,0 @@
package com.muyu.carData.pojo;
import com.muyu.carData.config.cacheconfig.ExpiryTime;
import lombok.*;
import lombok.experimental.SuperBuilder;
/**
* @Author
* @Packagecom.muyu.carData.pojo
* @Projectcloud-server-8
* @nameStudent
* @Date2024/9/27 0:40
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class Student extends ExpiryTime{
/**
*
*/
private Integer id;
/**
*
*/
private String name;
/**
*
*/
private String sex;
/**
*
*/
private long time = System.currentTimeMillis();
}

View File

@ -1,29 +0,0 @@
package com.muyu.carData.pulisher;
import com.alibaba.fastjson.JSONObject;
import com.muyu.carData.event.EsSaveEvent;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.pulisher
* @Projectcloud-server-8
* @nameCustomEventPublisher
* @Date2024/9/29 23:51
*/
@Log4j2
@Component
@AllArgsConstructor
public class CustomEventPublisher {
private ApplicationEventPublisher applicationEventPublisher;
public void publish(JSONObject data){
EsSaveEvent esSaveEvent = new EsSaveEvent(data);
applicationEventPublisher.publishEvent(esSaveEvent);
log.info("事件发布成功 - 消息是:{}",data);
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.carData.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.util
* @Projectcloud-server-8
* @nameCacheUtil
* @Date2024/10/4 10:42
*/
@Component
public class CacheUtil<T> {
private final Cache<String,T> cache;
public CacheUtil() {
this.cache = Caffeine.newBuilder()
.maximumSize(500L)
.build();
}
public T get(String key) {
return (T)cache.getIfPresent(key);
}
public void put(String key, T value) {
cache.put(key, value);
}
public void remove(String key) {
cache.invalidate(key);
}
}

View File

@ -22,6 +22,8 @@ import java.util.List;
* mqtt * mqtt
* *
* @ClassName MqttTest * @ClassName MqttTest
* @Description
* @Date 2024/9/28 23:49
*/ */
@Slf4j @Slf4j
@Component @Component