feat(): 添加事件处理业务

dev.vehicleGateway
xinzirun 2024-09-30 00:52:20 +08:00
parent 6594363a91
commit 7100bf71bf
23 changed files with 491 additions and 27 deletions

View File

@ -52,6 +52,12 @@
<artifactId>mysql-connector-j</artifactId> <artifactId>mysql-connector-j</artifactId>
</dependency> </dependency>
<!-- IotDB会话 -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
</dependency>
<!-- MuYu Common DataSource --> <!-- MuYu Common DataSource -->
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
@ -82,10 +88,10 @@
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-core</artifactId>
</dependency> </dependency>
<!-- IotDB会话 --> <!-- kafka模块-->
<dependency> <dependency>
<groupId>org.apache.iotdb</groupId> <groupId>com.muyu</groupId>
<artifactId>iotdb-session</artifactId> <artifactId>cloud-common-kafka</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,37 @@
package com.muyu.event.process.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEvent;
/**
* @Author: zi run
* @Date 2024/9/29 21:19
* @Description
*/
public class CustomEvent extends ApplicationEvent {
/**
*
*/
private final JSONObject data;
/**
*
*
* @param source
* @param data JSON
*/
public CustomEvent(Object source, JSONObject data) {
super(source);
this.data = data;
}
/**
*
*
* @return JSONObject
*/
public JSONObject getData() {
return data;
}
}

View File

@ -0,0 +1,18 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationListener;
/**
* @Author: zi run
* @Date 2024/9/29 21:29
* @Description
*/
public interface EventListener extends ApplicationListener<CustomEvent> {
/**
*
*
* @param event
*/
void onEvent(CustomEvent event);
}

View File

@ -0,0 +1,39 @@
package com.muyu.event.process.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/29 22:01
* @Description
*/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
/**
*
*/
private ApplicationEventPublisher applicationEventPublisher;
/**
*
*
* @param applicationEventPublisher
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
/**
*
*
* @param messages JSON
*/
public void publishEvent(JSONObject messages) {
applicationEventPublisher.publishEvent(new CustomEvent(this, messages));
}
}

View File

@ -0,0 +1,19 @@
package com.muyu.event.process.config;
import com.muyu.event.process.listener.AddDatabaseListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zi run
* @Date 2024/9/29 21:29
* @Description
*/
@Configuration
public class EventListenerConfig {
@Bean
public AddDatabaseListener addDatabaseListener() {
return new AddDatabaseListener();
}
}

View File

@ -0,0 +1,52 @@
package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collection;
/**
* @Author: zi run
* @Date 2024/9/29 16:53
* @Description
*/
@Slf4j
//@Component
@RequiredArgsConstructor
public class TestConsumer implements InitializingBean {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
* kafka
*/
private static final String topicName = "test-topic";
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
log.info("启动线程监听Topic: {}", topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> {
String value = record.value();
log.info("从Kafka中消费的原始数据: {}", value);
});
}
}).start();
}
}

View File

@ -0,0 +1,82 @@
package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.event.process.basic.EventPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/29 23:23
* @Description
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VehicleConsumer implements InitializingBean {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
*
*/
private final EventPublisher eventPublisher;
/**
* ()
*/
public final static String MESSAGE_PARSING = "test-topic";
// @Override
// public void run(ApplicationArguments args) throws Exception {
// log.info("开始监听kafka-topic{}", MESSAGE_PARSING);
// List<String> topicList = Collections.singletonList(MESSAGE_PARSING);
// kafkaConsumer.subscribe(topicList);
//
// while (true) {
// ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
// consumerRecords.forEach(record -> {
// String value = record.value();
// log.info("接收到车辆报文数据,内容:{}", value);
// eventPublisher.publishEvent(JSONObject.parseObject(value));
// });
// }
// }
@Async
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
ThreadUtil.sleep(100);
Collection<String> topics = Lists.newArrayList(MESSAGE_PARSING);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(consumerRecord -> {
String message = consumerRecord.value();
log.info("接收到车辆报文数据,内容:{}", message);
eventPublisher.publishEvent(JSONObject.parseObject(message));
});
}
}).start();
}
}

View File

@ -0,0 +1,88 @@
package com.muyu.event.process.controller;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.core.constant.Constants;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.web.controller.BaseController;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 16:24
* @Description
*/
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/test-event")
public class TestEventController extends BaseController {
/**
* kafka
*/
private final KafkaProducer<String, String> kafkaProducer;
/**
* kafka
*/
private static final String kafkaTopicName = "test-topic";
/**
* IoTDB
*/
private final TestIoTDBService testIoTDBService;
/**
* Kafka
*
* @return
*/
@GetMapping(value = "/sendKafka")
public Result<String> senKafka() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id",1);
jsonObject.put("name","张三");
jsonObject.put("age",18);
jsonObject.put("sex","男");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
kafkaProducer.send(producerRecord);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
* @return
*/
@GetMapping(value = "/list")
public Result<List<Map<String, Object>>> list() {
return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
*
* @return
*/
@PostMapping(value = "/save")
public Result<String> save() {
String deviceId = "root.test";
ArrayList<String> keyList = new ArrayList<>();
ArrayList<String> valueList = new ArrayList<>();
keyList.add("car_vin");
keyList.add("car_name");
valueList.add("VIN123456");
valueList.add("宝马");
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.basic.config; package com.muyu.event.process.iotdb.basic.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.session.pool.SessionPool;

View File

@ -1,7 +1,7 @@
package com.muyu.event.process.basic.service; package com.muyu.event.process.iotdb.basic.service;
import com.muyu.event.process.domain.dto.IoTDbRecordAble; import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO; import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@ -14,9 +14,9 @@ import java.util.Map;
/** /**
* @Author: zi run * @Author: zi run
* @Date 2024/9/28 23:37 * @Date 2024/9/28 23:37
* @Description IoTDB * @Description IoTDB
*/ */
public interface IoTDBService { public interface IService {
/** /**
* Tablet IoTDB * Tablet IoTDB

View File

@ -1,11 +1,10 @@
package com.muyu.event.process.basic.service.impl; package com.muyu.event.process.iotdb.basic.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.muyu.event.process.basic.config.IoTDBSessionConfig; import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig;
import com.muyu.event.process.basic.service.IoTDBService; import com.muyu.event.process.iotdb.basic.service.IService;
import com.muyu.event.process.domain.dto.IoTDbRecordAble; import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO; import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TAggregationType;
@ -17,6 +16,7 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.lang.reflect.Type; import java.lang.reflect.Type;
@ -26,17 +26,17 @@ import java.util.stream.Collectors;
/** /**
* @Author: zi run * @Author: zi run
* @Date 2024/9/28 23:38 * @Date 2024/9/28 23:38
* @Description IoTDB * @Description IoTDB
*/ */
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor public class ServiceImpl implements IService {
public class IoTDBServiceImpl implements IoTDBService {
/** /**
* IoTDB * IoTDB
*/ */
private final IoTDBSessionConfig ioTDBSessionConfig; @Autowired
private IoTDBSessionConfig ioTDBSessionConfig;
/** /**
* Tablet IoTDB * Tablet IoTDB

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain; package com.muyu.event.process.iotdb.domain;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;

View File

@ -1,6 +1,6 @@
package com.muyu.event.process.domain; package com.muyu.event.process.iotdb.domain;
import com.muyu.event.process.domain.dto.IoTDbRecordAble; import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain; package com.muyu.event.process.iotdb.domain;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain.dto; package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain.dto; package com.muyu.event.process.iotdb.domain.dto;
import lombok.Data; import lombok.Data;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain.dto; package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;

View File

@ -0,0 +1,11 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
/**
* @Author: zi run
* @Date 2024/9/29 22:38
* @Description IoTDB
*/
public interface IoTDBService extends IService {
}

View File

@ -0,0 +1,20 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:23
* @Description IoTDB
*/
public interface TestIoTDBService extends IService {
/**
* IoTDB
* @return
*/
List<Map<String, Object>> list();
}

View File

@ -0,0 +1,14 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.IoTDBService;
import org.springframework.stereotype.Service;
/**
* @Author: zi run
* @Date 2024/9/29 22:39
* @Description IoTDB
*/
@Service
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
}

View File

@ -0,0 +1,33 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.isession.SessionDataSet;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:24
* @Description IoTDB
*/
@Slf4j
@Service
public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService {
/**
* IoTDB
* @return
*/
@Override
public List<Map<String, Object>> list() {
String sql = "select * from root.test";
SessionDataSet sessionDataSet = this.executeQueryStatement(sql);
List<Map<String, Object>> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes());
log.info("查询IoTDB数据为{}", list.toString());
return list;
}
}

View File

@ -0,0 +1,43 @@
package com.muyu.event.process.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.process.basic.CustomEvent;
import com.muyu.event.process.basic.EventListener;
import com.muyu.event.process.iotdb.basic.service.IService;
import com.muyu.event.process.iotdb.service.IoTDBService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 22:12
* @Description
*/
public class AddDatabaseListener implements EventListener {
/**
* IoTDB
*/
@Autowired
private IoTDBService ioTDBService;
@Override
public void onEvent(CustomEvent event) {
JSONObject data = event.getData();
List<String> keyList = new ArrayList<>();
List<String> valueList = new ArrayList<>();
data.forEach((key, value) -> {
keyList.add(key);
valueList.add((String) value);
});
ioTDBService.insertStringRecord("root.vehicle", System.currentTimeMillis(), keyList, valueList);
}
@Override
public void onApplicationEvent(CustomEvent event) {
onEvent(event);
}
}

View File

@ -42,4 +42,6 @@ spring:
# 系统共享配置 # 系统共享配置
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置 # 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# kafka共享配置
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}