Compare commits

...

2 Commits

Author SHA1 Message Date
xinzirun 576befa0ee fix(): 修改nacos命名空间配置 2024-10-02 16:50:55 +08:00
xinzirun 6f95055a6b feat(): 新增rabbitmq公共模块代码 2024-10-02 16:49:39 +08:00
63 changed files with 397 additions and 2432 deletions

View File

@ -0,0 +1,44 @@
package com.muyu.common.rabbit.callback;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/10/2 9:26
* @Description broker
*/
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData
* @param ack truebrokerfalsebroker
* @param cause ackfalseacktrue
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到broker成功");
} else {
log.info("消息发送到broker失败失败原因{}", cause);
}
}
}

View File

@ -0,0 +1,39 @@
package com.muyu.common.rabbit.callback;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/10/2 9:33
* @Description
*/
@Slf4j
@Component
public class ReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
*/
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(),
returnedMessage.getExchange(), returnedMessage.getReplyText());
}
}

View File

@ -0,0 +1,72 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zi run
* @Date 2024/10/2 16:41
* @Description RabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.password}")
private String password;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitAdmin
*
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* RabbitMQ
*
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}

View File

@ -1,28 +1,41 @@
package com.muyu.common.rabbit;
package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
/**
* @Author: zi run
* @Date 2024/10/2 10:06
* @Description rabbitMQ
*/
@Configuration
public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
public class RabbitListenerConfig implements RabbitListenerConfigurer {
static {
// 设置为信任所有类型的反序列化,确保消息能够正确反序列化
System.setProperty("spring.amqp.deserialization.trust.all", "true");
}
//以下配置RabbitMQ消息服务
/**
* RabbitMQ
*/
@Autowired
public ConnectionFactory connectionFactory;
@Autowired
private MessageConverter jsonMessageConverter;
/**
*
* @return
* bean
*
* @return DefaultMessageHandlerMethodFactory
*/
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
@ -32,8 +45,14 @@ public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit
return factory;
}
/**
* RabbitMQ
*
* @param rabbitListenerEndpointRegistrar
*/
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
// 注册自定义的消息处理方法工厂
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}

View File

@ -0,0 +1,26 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zi run
* @Date 2024/10/2 9:17
* @Description rabbitMQ
*/
@Configuration
public class RabbitMQMessageConverterConfig {
/**
*
*
* @return
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -1 +1,3 @@
com.muyu.common.rabbit.RabbitListenerConfigurer
com.muyu.common.rabbit.config.RabbitAdminConfig
com.muyu.common.rabbit.config.RabbitListenerConfig
com.muyu.common.rabbit.config.RabbitMQMessageConverterConfig

View File

@ -1,74 +0,0 @@
package com.muyu.enterprise.MQTT;
import com.alibaba.nacos.api.remote.PushCallBack;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameClientMQTT
* @Date2024/9/28 12:11
*/
public class ClientMQTT {
//String topic = "vehicle";
// String content = "Message from MqttPublishSample";
// int qos = 2;
// String broker = "tcp://106.15.136.7:1883";
// String clientId = "JavaSample";
//MQTT代理服务器地址
public static final String HOST="tcp://106.15.136.7:1883";
public static final String TOPIC1="pos_message_all";
private static final String clientId="12345678";
private MqttClient client;
private MqttConnectOptions options;
private String userName="mqtt"; //非必须
private String passWord="mqtt"; //非必须
private void start(){
try{
// host为主机名clientid即连接MQTT的客户端ID一般以唯一标识符表示MemoryPersistence设置clientid的保存形式默认为以内存保存
client= new MqttClient(HOST,clientId,new MemoryPersistence());
// MQTT的连接设置
options=new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置断开后重新连接
options.setAutomaticReconnect(true);
// 设置回调
client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC1);
//setWill方法如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//遗嘱
options.setWill(topic,"close".getBytes(),1,true);
client.connect(options);
//订阅消息
int[] Qos = {1} ; //0最多一次 、1最少一次 、2只有一次
String[] topics1 = {TOPIC1};
client.subscribe(topics1,Qos);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args){
ClientMQTT clientMQTT = new ClientMQTT();
clientMQTT.start();
}
}

View File

@ -1,69 +0,0 @@
package com.muyu.enterprise.MQTT;
/**
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameMQTTReceiveCallback
* @Date2024/9/27 22:21
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
*
*
* MqttCallbackCallBack MqttCallBack
*
*
*
*
* (1):public void messageArrived(MqttTopic topic, MqttMessage message)
*
* (2):public void connectionLost(Throwable cause)
*
* (3):public void deliveryComplete(MqttDeliveryToken token))
* QoS 1 QoS 2
* MqttClient.connect
*
*/
public class MQTTReceiveCallback implements MqttCallback {
// @Override
// public void connectionLost(Throwable throwable) {
// //连接丢失后,一般在这里面进行重连
// System.out.println("连接断开,可以做重连");
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// //subscribe后得到的消息会执行到这里面
// System.out.println("接收消息主题:"+topic);
// System.out.println("接收消息Qos"+message.getQos());
// System.out.println("接收消息内容:"+new String(message.getPayload()));
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("deliveryComplete----------"+token.isComplete());
// }
@Override
public void connectionLost(Throwable throwable){
//连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//subscribe后得到的消息会执行到这面
System.out.println("接收消息主题:"+topic);
System.out.println("接收消息Qos"+mqttMessage.getQos());
System.out.println("接收消息内容:"+new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete());
}
}

View File

@ -1,187 +0,0 @@
package com.muyu.enterprise.MQTT;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
* @Author
* @Packagecom.muyu.enterprise.mqtt
* @Projectcloud-server
* @nameMyMqttClient
* @Date2024/9/27 22:20
*/
public class MyMqttClient {
public static MqttClient mqttClient =null;
private static MemoryPersistence memoryPersistence=null;
private static MqttConnectOptions mqttConectOptions=null;
private static String ClinentName=""; //待填 将在服务端出现的名字
private static String IP=""; //待填 服务器IP
public static void main(String[] args) {
start(ClinentName);
}
public static void start(String clientId){
//初始化连接设置对象
mqttConectOptions=new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
//这里设置为true表示每次连接到服务器都以新的身份连接
mqttConectOptions.setCleanSession(true);
//设置连接超时时间,单位是秒
mqttConectOptions.setConnectionTimeout(10);
//设置持久化方式
memoryPersistence=new MemoryPersistence();
if(null!=clientId){
try{
mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
System.out.println("连接状态:"+mqttClient.isConnected());
//设置连接和回调
if(null!=mqttClient){
if(!mqttClient.isConnected()){
//创建回调函数对象
MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
//客户端添加回调函数
mqttClient.setCallback(MQTTReceiveCallback);
//创建连接
try{
System.out.println("创建连接");
mqttClient.connect(mqttConectOptions);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
}else {
System.out.println("mqttClient为空");
}
System.out.println("连接状态"+mqttClient.isConnected());
}
// 关闭连接
public void closeConnect(){
//关闭储存方式
if(null!=memoryPersistence){
try{
memoryPersistence.close();
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("memoryPersistence为空");
}
if(null!=mqttClient){
if(mqttClient.isConnected()){
try{
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient未连接");
}
}else {
System.out.println("mqttClient为空");
}
}
//发布消息
public static void publishMessage(String pubTopic,String message,int qos){
if(null!=mqttClient && mqttClient.isConnected()){
System.out.println("发布消息"+mqttClient.isConnected());
System.out.println("id"+mqttClient.isConnected());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
MqttTopic topic = mqttClient.getTopic(pubTopic);
if(null!=topic){
try{
MqttDeliveryToken publish = topic.publish(mqttMessage);
if(!publish.isComplete()){
System.out.println("消息发布成功");
}
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}
}
}
//重新连接
public static void reConnect(){
if(null!=mqttClient&&mqttClient.isConnected()){
if(!mqttClient.isConnected()){
if(null!=mqttConectOptions){
try{
mqttClient.connect(mqttConectOptions);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttConnectOptions为空");
}
}else {
System.out.println("mqttClient为空或已连接");
}
}else {
start(ClinentName);
}
}
//订阅主题
public static void suvTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try{
mqttClient.subscribe(topic,1);
} catch (MqttException e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient出错");
}
}
//清空主题
public void cleanTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try{
mqttClient.subscribe(topic);
} catch (Exception e) {
// TODO 自动生成的捕获块
throw new RuntimeException(e);
}
}else {
System.out.println("mqttClient出错");
}
}
}

View File

@ -1,52 +0,0 @@
package com.muyu.enterprise.MQTT;
/**
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @namePushCallback
* @Date2024/9/28 14:35
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
*
*
* MqttCallbackCallBack MqttCallBack
*
*
*
*
* public void messageArrived(MqttTopic topic, MqttMessage message)
*
* public void connectionLost(Throwable cause)
*
* public void deliveryComplete(MqttDeliveryToken token))
* QoS 1 QoS 2
* MqttClient.connect
*
*/
public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
}

View File

@ -1,113 +0,0 @@
package com.muyu.enterprise.MQTT;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
*
* @Author
* @Packagecom.muyu.enterprise.MQTT
* @Projectcloud-server
* @nameServerMQTT
* @Date2024/9/28 14:32
*/
@Log4j2
public class ServerMQTT {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
public static final String HOST = "tcp://127.0.0.1:1883";
//定义一个主题
public static final String TOPIC = "pos_message_all";
//定义MQTT的ID可以在MQTT服务配置中指定
private static final String clientId = "server11";
private MqttClient client;
private static MqttTopic topic11;
// private String userName = "mqtt"; //非必须
// private String passWord = "mqtt"; //非必须
private static MqttMessage message;
/**
*
* @throws MqttException
*/
public ServerMQTT() throws MqttException {
// MemoryPersistence设置clientid的保存形式默认为以内存保存
client=new MqttClient(HOST, clientId,new MemoryPersistence());
connect();
}
/**
*
*/
private void connect()
{
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
// options.setUserName(userName);
// options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try{
client.setCallback(new PushCallback());
client.connect(options);
topic11 = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttException
*/
public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{
MqttDeliveryToken token= topic.publish(message);
token.waitForCompletion();
System.out.println("消息已完全发布!"+token.isComplete());
log.info("消息已完全发布!"+token.isComplete());
}
/**
*
* @param clieId
* @param msg
* @throws Exception
*/
public static void sendMessage(String clieId,String msg)throws Exception{
ServerMQTT server = new ServerMQTT();
server.message = new MqttMessage();
server.message.setQos(1); //保证消息能到达一次
server.message.setRetained(true);
String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}";
try{
publish(server.topic11 , server.message);
//断开连接
// server.client.disconnect();
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
sendMessage("123444","哈哈哈");
}
}

View File

@ -81,5 +81,11 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
<!-- rabbit模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -15,11 +15,11 @@ import java.util.Collection;
/**
* @Author: zi run
* @Date 2024/9/29 16:53
* @Description
* @Description Kafka
*/
@Slf4j
@RequiredArgsConstructor
public class TestConsumer implements InitializingBean {
public class TestKafkaConsumer implements InitializingBean {
/**
* kafka

View File

@ -0,0 +1,70 @@
package com.muyu.event.process.consumer;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Author: zi run
* @Date 2024/10/2 10:35
* @Description RabbitMQ
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TestRabbitMQConsumer {
/**
* redis
*/
private final StringRedisTemplate redisTemplate;
/**
*
*/
private static final String queueName = "test-rabbitmq";
/**
* redis
*/
private static final String testRabbitMQKey = "test-rabbitmq";
/**
* RabbitMQ
*
* @param msg String
* @param message
* @param channel RabbitMQ
*/
@RabbitListener(queuesToDeclare = @Queue(name = queueName))
public void consumer(String msg, Message message, Channel channel) {
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(testRabbitMQKey, messageId);
if (count != null && count.intValue() == 1) {
log.info("测试RabbitMQ消费者获取到消息消息内容{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("测试RabbitMQ消费者消费消息成功");
}
} catch (Exception e) {
redisTemplate.opsForSet().remove(testRabbitMQKey, messageId);
log.error("测试RabbitMQ消费者消费消息异常消息内容{},异常信息:{}", msg, e.getMessage());
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ex) {
log.error("测试RabbitMQ消费者回退消息异常消息内容{} 异常信息:{}", msg, ex.getMessage());
}
}
}
}

View File

@ -12,6 +12,7 @@ import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.Duration;
@ -52,6 +53,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener<C
* @param args
* @throws Exception
*/
@Async
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING);
@ -94,7 +96,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener<C
}
try {
kafkaConsumer.close(); // 关闭Kafka消费者
kafkaConsumer.close();
} catch (Exception e) {
log.error("关闭Kafka消费者时发生错误", e);
}

View File

@ -8,6 +8,7 @@ import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -16,6 +17,7 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* @Author: zi run
@ -42,6 +44,16 @@ public class TestEventController extends BaseController {
*/
private final TestIoTDBService testIoTDBService;
/**
* Spring AMQP
*/
private final RabbitTemplate rabbitTemplate;
/**
* rabbitMQ
*/
private static final String rabbitMQQueueName = "test-rabbitmq";
/**
* Kafka
*
@ -59,6 +71,20 @@ public class TestEventController extends BaseController {
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
/**
* RabbitMQ
*
* @return
*/
@GetMapping(value = "/sendRabbitmq")
public Result<String> sendRabbitMQ() {
rabbitTemplate.convertAndSend(rabbitMQQueueName, "测试RabbitMQ队列消息", message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
* @return

View File

@ -45,3 +45,5 @@ spring:
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# kafka共享配置
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# rabbit共享配置
- application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

View File

@ -1,20 +0,0 @@
package com.muyu.event.process;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: zi run
* @Date 2024/9/28 22:31
* @Description
*/
@EnableCustomConfig
@EnableMyFeignClients
@SpringBootApplication
public class CloudEventProcessApplication {
public static void main(String[] args) {
SpringApplication.run(CloudEventProcessApplication.class, args);
}
}

View File

@ -1,37 +0,0 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationEvent;
/**
* @Author: zi run
* @Date 2024/9/30 15:11
* @Description
*/
public class BasicEvent<T> extends ApplicationEvent {
/**
*
*/
private final T data;
/**
*
*
* @param source
* @param data
*/
public BasicEvent(Object source, T data) {
super(source);
this.data = data;
}
/**
*
*
* @return
*/
public T getData() {
return data;
}
}

View File

@ -1,37 +0,0 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/30 15:37
* @Description
*/
@Component
public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>> {
/**
*
*/
private final BasicEventListener<T> listener;
/**
*
*
* @param listener
*/
public BasicEventHandler(BasicEventListener<T> listener) {
this.listener = listener;
}
/**
*
*
* @param event
*/
@Override
public void onApplicationEvent(BasicEvent<T> event) {
listener.onEvent(event);
}
}

View File

@ -1,16 +0,0 @@
package com.muyu.event.process.basic;
/**
* @Author: zi run
* @Date 2024/9/30 15:35
* @Description
*/
public interface BasicEventListener<T> {
/**
*
*
* @param event
*/
void onEvent(BasicEvent<T> event);
}

View File

@ -1,38 +0,0 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/29 22:01
* @Description
*/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
/**
*
*/
private ApplicationEventPublisher publisher;
/**
*
*
* @param publisher
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
/**
*
* @param event
* @param <T>
*/
public <T> void publish(BasicEvent<T> event) {
publisher.publishEvent(event);
}
}

View File

@ -1,52 +0,0 @@
package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collection;
/**
* @Author: zi run
* @Date 2024/9/29 16:53
* @Description
*/
@Slf4j
//@Component
@RequiredArgsConstructor
public class TestConsumer implements InitializingBean {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
* kafka
*/
private static final String topicName = "test-topic";
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
log.info("启动线程监听Topic: {}", topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> {
String value = record.value();
log.info("从Kafka中消费的原始数据: {}", value);
});
}
}).start();
}
}

View File

@ -1,93 +0,0 @@
package com.muyu.event.process.consumer;
import com.muyu.event.process.basic.EventPublisher;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author: zi run
* @Date 2024/9/29 23:23
* @Description
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
*
*/
private final EventPublisher eventPublisher;
/**
* ()
*/
public final static String MESSAGE_PARSING = "MessageParsing";
/**
* 线线
*/
private final ExecutorService executorService =
Executors.newFixedThreadPool(10);
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
List<String> topics = Collections.singletonList(MESSAGE_PARSING);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
}
}
private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
String message = consumerRecord.value();
log.info("接收到车辆报文数据,内容:{}", message);
log.info("------------------------------------------------");
eventPublisher.publish(new IoTDBInsertDataEvent(this, message));
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭线程池和Kafka消费者");
try {
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
log.error("线程池关闭被中断,强制关闭", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
try {
kafkaConsumer.close(); // 关闭Kafka消费者
} catch (Exception e) {
log.error("关闭Kafka消费者时发生错误", e);
}
}
}

View File

@ -1,88 +0,0 @@
package com.muyu.event.process.controller;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.core.constant.Constants;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.web.controller.BaseController;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 16:24
* @Description
*/
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/test-event")
public class TestEventController extends BaseController {
/**
* kafka
*/
private final KafkaProducer<String, String> kafkaProducer;
/**
* kafka
*/
private static final String kafkaTopicName = "test-topic";
/**
* IoTDB
*/
private final TestIoTDBService testIoTDBService;
/**
* Kafka
*
* @return
*/
@GetMapping(value = "/sendKafka")
public Result<String> senKafka() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id","1");
jsonObject.put("name","张三");
jsonObject.put("age","18");
jsonObject.put("sex","男");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
kafkaProducer.send(producerRecord);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
* @return
*/
@GetMapping(value = "/list")
public Result<List<Map<String, Object>>> list() {
return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
*
* @return
*/
@PostMapping(value = "/save")
public Result<String> save() {
String deviceId = "root.test";
ArrayList<String> keyList = new ArrayList<>();
ArrayList<String> valueList = new ArrayList<>();
keyList.add("car_vin");
keyList.add("car_name");
valueList.add("VIN123456");
valueList.add("宝马");
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
}

View File

@ -1,20 +0,0 @@
package com.muyu.event.process.event;
import com.muyu.event.process.basic.BasicEvent;
/**
* @Author: zi run
* @Date 2024/9/29 21:19
* @Description IoTDB
*/
public class IoTDBInsertDataEvent extends BasicEvent<String> {
/**
* IoTDB
*
* @param messsge
*/
public IoTDBInsertDataEvent(Object source, String messsge) {
super(source, messsge);
}
}

View File

@ -1,72 +0,0 @@
package com.muyu.event.process.iotdb.basic.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.session.pool.SessionPool;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/28 22:41
* @Description IoTDB
*/
@Slf4j
@Component
@Configuration
public class IoTDBSessionConfig {
@Value("${spring.iotdb.username}")
private String username;
@Value("${spring.iotdb.password}")
private String password;
@Value("${spring.iotdb.ip}")
private String ip;
@Value("${spring.iotdb.port}")
private int port;
@Value("${spring.iotdb.maxSize}")
private int maxSize;
/**
* IoTDB
*/
private static SessionPool sessionPool = null;
/**
* IoTDB
* @return ioTDB
*/
public SessionPool getSessionPool() {
if (sessionPool == null) {
sessionPool = new SessionPool(ip, port, username, password, maxSize);
}
return sessionPool;
}
/**
* IoTDB
*
* @param deviceId
* @param time
* @param measurements
* @param values
*
* <p> IoTDB
* 便</p>
*/
public void insertRecord(String deviceId, long time, List<String> measurements, List<String> values) {
getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
sessionPool.insertRecord(deviceId, time, measurements, values);
} catch (Exception e) {
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
deviceId, time, measurements, values, e.getMessage());
}
}
}

View File

@ -1,290 +0,0 @@
package com.muyu.event.process.iotdb.basic.service;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/28 23:37
* @Description IoTDB
*/
public interface IService {
/**
* Tablet IoTDB
*
* @param tablet Tablet
*/
void insertTablet(Tablet tablet);
/**
* Tablets IoTDB
*
* @param tablets Map Tablets
*/
void insertTablets(Map<String, Tablet> tablets);
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param values
*/
void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param types
* @param values
*/
void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types, List<Object> values);
/**
* string
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
void insertStringRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList);
/**
*
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList);
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
/**
*
*
* @param path root.ln.wf01.wt01.temperature
* @param endTime
*/
void deleteData(String path, long endTime);
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param endTime
*/
void deleteData(List<String> paths, long endTime);
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param startTime
* @param endTime
* @param outTime
* @return SessionDataSet (Time,paths)
*/
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime);
/**
*
*
* @param paths "root.ln.wf01.wt01.temperature"
* @param startTime
* @param endTime
* @param outTime
* @param clazz
* @param <T>
* @return null
*/
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
Class<? extends IoTDbRecordAble> clazz);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @return SessionDataSet
*/
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @param clazz
* @return null
* @param <T>
*/
<T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz);
/**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @return SessionDataSet
*/
SessionDataSet executeLastDataQueryForOneDevice(String db, String device,
List<String> sensors, boolean isLegalPathNodes);
/**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @param clazz
* @return null
* @param <T>
*/
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz);
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @param slidingStep
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval, long slidingStep);
/**
* SQL
*
* @param sql SQLIotDB
* @return SessionDataSet null
*/
SessionDataSet executeQueryStatement(String sql);
/**
* SQL
*
* @param sql SQLIotDB
*/
void executeNonQueryStatement(String sql);
/**
*
*
* @param sessionDataSet SessionDataSet
* @param titleList
* @return Map
*/
List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList);
/**
*
*
* @param sessionDataSet
* @param titleList
* @param clazz
* @return
* @param <T>
*/
<T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
Class<? extends IoTDbRecordAble> clazz);
/**
* MeasurementSchemas
*
* @param obj
* @return MeasurementSchema
*/
List<MeasurementSchema> buildMeasurementSchemas(Object obj);
/**
* MeasurementSchemaValuesDTO
*
* @param obj
* @return MeasurementSchemaValuesDTO
*/
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj);
}

View File

@ -1,765 +0,0 @@
package com.muyu.event.process.iotdb.basic.service.impl;
import com.alibaba.fastjson.JSON;
import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig;
import com.muyu.event.process.iotdb.basic.service.IService;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.util.*;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/28 23:38
* @Description IoTDB
*/
@Slf4j
@Service
public class ServiceImpl implements IService {
/**
* IoTDB
*/
@Autowired
private IoTDBSessionConfig ioTDBSessionConfig;
/**
* Tablet IoTDB
*
* @param tablet Tablet
*/
@Override
public void insertTablet(Tablet tablet) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库tablet:[{}]", tablet);
sessionPool.insertTablet(tablet);
} catch (Exception e) {
log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage());
}
}
/**
* Tablets IoTDB
*
* @param tablets Map Tablets
*/
@Override
public void insertTablets(Map<String, Tablet> tablets) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库tablets:[{}]", tablets);
sessionPool.insertTablets(tablets);
} catch (Exception e) {
log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage());
}
}
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param values
*/
@Override
public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
sessionPool.insertRecord(deviceId, time, measurements, values);
} catch (Exception e) {
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
deviceId, time, measurements, values, e.getMessage());
}
}
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param types
* @param values
*/
@Override
public void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types, List<Object> values) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], types:[{}], values:[{}]",
deviceId, measurements, types, values);
sessionPool.insertRecord(deviceId, time, measurements, types, values);
} catch (Exception e) {
log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " +
"values={}, error={}", deviceId, time, measurements, types, values, e.getMessage());
}
}
/**
* string
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
@Override
public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], valuesList:[{}]",
deviceIds, measurementsList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " +
"valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage());
}
}
/**
*
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
@Override
public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]",
deviceIds, measurementsList, typesList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " +
"valuesList={}, error={}",
deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
}
}
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
@Override
public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], valuesList:[{}]",
deviceId, measurementsList, valuesList);
sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " +
"measurementsList={}, valuesList={}, error={}",
deviceId, times, measurementsList, valuesList, e.getMessage());
}
}
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
@Override
public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]",
deviceId, measurementsList, typesList, valuesList);
sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " +
"measurementsList={},typesList={},valuesList={}, error={}",
deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
}
}
/**
*
*
* @param path root.ln.wf01.wt01.temperature
* @param endTime
*/
@Override
public void deleteData(String path, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据删除path:[{}], endTime:[{}]", path, endTime);
sessionPool.deleteData(path, endTime);
} catch (Exception e) {
log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
}
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param endTime
*/
@Override
public void deleteData(List<String> paths, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据删除paths:[{}], endTime:[{}]", paths, endTime);
sessionPool.deleteData(paths, endTime);
} catch (Exception e) {
log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
}
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param startTime
* @param endTime
* @param outTime
* @return SessionDataSet (Time,paths)
*/
@Override
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询paths:[{}], startTime[{}], endTime:[{}],outTime:[{}]",
paths, startTime, endTime, outTime);
sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}], " +
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
*
*
* @param paths "root.ln.wf01.wt01.temperature"
* @param startTime
* @param endTime
* @param outTime
* @param clazz
* @param <T>
* @return null
*/
@Override
public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}], " +
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
}
return resultEntities;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @return SessionDataSet
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询paths:[{}], lastTime:[{}]", paths, lastTime);
sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}",
paths, lastTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @param clazz
* @return null
* @param <T>
*/
@Override
public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}",
paths, lastTime, e.getMessage());
}
return resultEntities;
}
/**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @return SessionDataSet
*/
@Override
public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]",
db, device, sensors, isLegalPathNodes);
sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " +
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @param clazz
* @return null
* @param <T>
*/
@Override
public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes,
Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " +
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
}
return resultEntities;
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}]", paths, aggregations);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,error={}",
paths, aggregations, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}]",
paths, aggregations, startTime, endTime);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}], " +
"startTime[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}]",
paths, aggregations, startTime, endTime, interval);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval
);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] , " +
"startTime[{}], endTime:[{}], interval:[{}], error={}",
paths, aggregations, startTime, endTime, interval, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @param slidingStep
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval, long slidingStep) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}], " +
"slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime,
interval, slidingStep);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] , " +
"startTime[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}",
paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* SQL
*
* @param sql SQLIotDB
* @return SessionDataSet null
*/
@Override
public SessionDataSet executeQueryStatement(String sql) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb SQL查询sql:[{}]", sql);
sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* SQL
*
* @param sql SQLIotDB
*/
@Override
public void executeNonQueryStatement(String sql) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb SQL无查询sql:[{}]", sql);
sessionPool.executeNonQueryStatement(sql);
} catch (Exception e) {
log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
}
}
/**
*
*
* @param sessionDataSet SessionDataSet
* @param titleList
* @return Map
*/
@SneakyThrows
@Override
public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
int fetchSize = sessionDataSet.getFetchSize();
List<Map<String, Object>> resultList = new ArrayList<>();
titleList.remove("Time");
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(next.getTimestamp());
resultMap.put("time", timeString);
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
resultMap.put(splitString(titleList.get(i)), null);
} else {
resultMap.put(splitString(titleList.get(i)),
field.getObjectValue(field.getDataType()).toString());
}
}
resultList.add(resultMap);
}
}
return resultList;
}
/**
*
*
* @param sessionDataSet
* @param titleList
* @param clazz
* @return
* @param <T>
*/
@SneakyThrows
@Override
public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
Class<? extends IoTDbRecordAble> clazz) {
int fetchSize = sessionDataSet.getFetchSize();
List<T> resultList = new ArrayList<>();
titleList.remove("Time");
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(next.getTimestamp());
resultMap.put("time", timeString);
if (titleList.stream().anyMatch(str -> str.contains("."))) {
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
String title = titleList.get(i);
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
resultMap.put(splitString(title), null);
} else {
resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
}
}
} else {
Field fieldName = fields.get(0);
Field fieldValue = fields.get(1);
Field fieldDataType = fields.get(2);
if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
Object mapValue = convertStringToType(
fieldValue.getObjectValue(fieldValue.getDataType()).toString(),
fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()
);
resultMap.put(splitString(mapKey), mapValue);
}
}
String jsonString = JSON.toJSONString(resultMap);
resultList.add(JSON.parseObject(jsonString, (Type) clazz));
}
}
return resultList;
}
/**
*
*
* @param str
* @return
*/
public static String splitString(String str) {
String[] parts = str.split("\\.");
if (parts.length <= 0) {
return str;
} else {
return parts[parts.length - 1];
}
}
/**
*
*
* @param value
* @param typeName
* @return
*/
public static Object convertStringToType(String value, String typeName) {
String type = typeName.toLowerCase();
if (type.isEmpty()) {
return value;
}
if ("boolean".equals(type)) {
return Boolean.parseBoolean(value);
} else if ("double".equals(type)) {
return Double.parseDouble(value);
} else if ("int32".equals(type)) {
return Integer.parseInt(value);
} else if ("int64".equals(type)) {
return Long.parseLong(value);
} else if ("float".equals(type)) {
return Float.parseFloat(value);
} else if ("text".equals(type)) {
return value;
} else {
return value;
}
}
/**
* TSDataType
*
* @param type
* @return TSDataType
*/
public static TSDataType getTsDataTypeByString(String type) {
String typeName = splitString(type).toLowerCase();
if ("boolean".equals(typeName)) {
return TSDataType.BOOLEAN;
} else if ("double".equals(typeName)) {
return TSDataType.DOUBLE;
} else if ("int".equals(typeName) || "integer".equals(typeName)) {
return TSDataType.INT32;
} else if ("long".equals(typeName)) {
return TSDataType.INT64;
} else if ("float".equals(typeName)) {
return TSDataType.FLOAT;
} else if ("text".equals(typeName)) {
return TSDataType.TEXT;
} else if ("string".equals(typeName)) {
return TSDataType.TEXT;
} else {
return TSDataType.UNKNOWN;
}
}
/**
* MeasurementSchemas
*
* @param obj
* @return MeasurementSchema
*/
@Override
public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field ->
new MeasurementSchema(field.getName(),
getTsDataTypeByString(
field.getType().getName()
))).
collect(Collectors.toList());
return schemaList;
}
/**
* MeasurementSchemaValuesDTO
*
* @param obj
* @return MeasurementSchemaValuesDTO
*/
@SneakyThrows
@Override
public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
List<MeasurementSchema> schemaList = new ArrayList<>();
List<Object> values = new ArrayList<>();
List<Integer> valuesIsNullIndex = new ArrayList<>();
int valueIndex = 0;
for (java.lang.reflect.Field field : fields) {
MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(),
getTsDataTypeByString(field.getType().getName()));
schemaList.add(measurementSchema);
Object value = field.get(obj);
if (value == null) {
valuesIsNullIndex.add(valueIndex);
}
values.add(value);
valueIndex++;
}
measurementSchemaValuesDTO.setSchemaList(schemaList);
measurementSchemaValuesDTO.setValues(values);
return measurementSchemaValuesDTO;
}
}

View File

@ -1,33 +0,0 @@
package com.muyu.event.process.iotdb.domain;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @Author: zi run
* @Date 2024/9/29 0:11
* @Description JSON
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@Tag(name = "ionDB数据源对象")
public class DataJSON {
/**
*
*/
@Schema(name = "时间戳")
private Long timestamp;
/**
* JSON
*/
@Schema(name = "车辆JSON数据")
private String datasource;
}

View File

@ -1,36 +0,0 @@
package com.muyu.event.process.iotdb.domain;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @Author: zi run
* @Date 2024/9/29 0:18
* @Description
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ResultEntity extends IoTDbRecordAble {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
*
*/
private String time;
}

View File

@ -1,43 +0,0 @@
package com.muyu.event.process.iotdb.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author: zi run
* @Date 2024/9/29 0:22
* @Description
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TestDataType {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
* Double
*/
private Double testDouble;
/**
* Long
*/
private Long testLong;
}

View File

@ -1,67 +0,0 @@
package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 0:12
* @Description
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InsertDataDTO {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
* InsertDataDTO
*
* @return InsertDataDTO
*/
public InsertDataDTO buildOne() {
InsertDataDTO insertDataDTO = new InsertDataDTO();
insertDataDTO.setHardware("ss");
insertDataDTO.setStatus(true);
insertDataDTO.setTemperature(12.0F);
return insertDataDTO;
}
/**
* InsertDataDTO
*
* @return InsertDataDTO
*/
public List<InsertDataDTO> buildList() {
List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
int buildNum = 10;
for (int i = 0; i < buildNum; i++) {
InsertDataDTO insertDataDTO = new InsertDataDTO();
insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
insertDataDTO.setStatus(i % 2 == 0);
insertDataDTO.setTemperature(12.0F + i);
insertDataDTOS.add(insertDataDTO);
}
return insertDataDTOS;
}
}

View File

@ -1,12 +0,0 @@
package com.muyu.event.process.iotdb.domain.dto;
import lombok.Data;
/**
* @Author: zi run
* @Date 2024/9/29 0:23
* @Description IoTDB
*/
@Data
public class IoTDbRecordAble {
}

View File

@ -1,36 +0,0 @@
package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 0:26
* @Description
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class MeasurementSchemaValuesDTO {
/**
*
*/
private List<MeasurementSchema> schemaList;
/**
* schemaList
*/
private List<Object> values;
/**
*
*/
private List<Integer> valueIsNullIndex;
}

View File

@ -1,11 +0,0 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
/**
* @Author: zi run
* @Date 2024/9/29 22:38
* @Description IoTDB
*/
public interface IoTDBService extends IService {
}

View File

@ -1,20 +0,0 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:23
* @Description IoTDB
*/
public interface TestIoTDBService extends IService {
/**
* IoTDB
* @return
*/
List<Map<String, Object>> list();
}

View File

@ -1,14 +0,0 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.IoTDBService;
import org.springframework.stereotype.Service;
/**
* @Author: zi run
* @Date 2024/9/29 22:39
* @Description IoTDB
*/
@Service
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
}

View File

@ -1,33 +0,0 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.isession.SessionDataSet;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:24
* @Description IoTDB
*/
@Slf4j
@Service
public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService {
/**
* IoTDB
* @return
*/
@Override
public List<Map<String, Object>> list() {
String sql = "select * from root.test";
SessionDataSet sessionDataSet = this.executeQueryStatement(sql);
List<Map<String, Object>> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes());
log.info("查询IoTDB数据为{}", list.toString());
return list;
}
}

View File

@ -1,68 +0,0 @@
package com.muyu.event.process.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.process.basic.BasicEvent;
import com.muyu.event.process.basic.BasicEventListener;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import com.muyu.event.process.iotdb.service.IoTDBService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/29 22:12
* @Description IoTDB
*/
@Component
@RequiredArgsConstructor
public class IoTDBInsertDataListener implements BasicEventListener<String> {
/**
* IoTDB
*/
private final IoTDBService ioTDBService;
/**
*
*/
private static final String DEVICE_ID = "root.vehicle";
/**
* IoTDB
*
* @param event
*/
@Override
public void onEvent(BasicEvent<String> event) {
JSONObject data = JSONObject.parseObject(event.getData());
List<String> keyList = extractKeys(data);
List<String> valueList = extractValues(data);
ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList);
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractKeys(JSONObject data) {
return data.keySet().stream().collect(Collectors.toList());
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractValues(JSONObject data) {
return data.values().stream()
.map(Object::toString)
.collect(Collectors.toList());
}
}

View File

@ -54,6 +54,12 @@
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Mqtt3包 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- MuYu Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>

View File

@ -60,10 +60,10 @@
<artifactId>cloud-common-datascope</artifactId>
</dependency>
<!-- Mqtt3包 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- MuYu Common Log -->
@ -84,27 +84,31 @@
<artifactId>ecs20140526</artifactId>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>cloudapi20160714</artifactId>
</dependency>
</dependencies>
<repositories>
<repository>
<id>aliyun-repo</id>

43
pom.xml
View File

@ -51,6 +51,7 @@
<kafka.clients.verison>3.0.0</kafka.clients.verison>
<iotdb.session.verison>1.3.1</iotdb.session.verison>
<mybatis.plus.join.version>1.4.13</mybatis.plus.join.version>
<org.eclipse.paho.client.mqttv3.version>1.2.5</org.eclipse.paho.client.mqttv3.version>
</properties>
<!-- 依赖声明 -->
@ -219,6 +220,48 @@
<version>${iotdb.session.verison}</version>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>ecs20140526</artifactId>
<version>${ecs20140526.version}</version>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>${tea-openapi.version}</version>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
<version>${tea-console.version}</version>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
<version>${tea-util.version}</version>
</dependency>
<!-- 阿里云创建ecs实例 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>cloudapi20160714</artifactId>
<version>${cloudapi20160714.version}</version>
</dependency>
<!-- Mqtt3包 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${org.eclipse.paho.client.mqttv3.version}</version>
</dependency>
<!-- 核心模块 -->
<dependency>
<groupId>com.muyu</groupId>