Compare commits
2 Commits
17245c91b3
...
576befa0ee
Author | SHA1 | Date |
---|---|---|
|
576befa0ee | |
|
6f95055a6b |
|
@ -0,0 +1,44 @@
|
|||
package com.muyu.common.rabbit.callback;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 9:26
|
||||
* @Description 消息发送到broker确认回调
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
|
||||
|
||||
@Resource
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.rabbitTemplate.setConfirmCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 回调确认方法(消息发送之后 消息无论发送成功还是失败都会执行这个回调方法)
|
||||
* @param correlationData 回调的相关数据
|
||||
* @param ack 确认结果(true表示消息已经被broker接收,false表示消息未被broker接收)
|
||||
* @param cause 失败原因(当ack为false时,表示拒绝接收消息的原因;当ack为true时,该值为空)
|
||||
*/
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
if (ack) {
|
||||
log.info("消息发送到broker成功!");
|
||||
} else {
|
||||
log.info("消息发送到broker失败,失败原因:{}", cause);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package com.muyu.common.rabbit.callback;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 9:33
|
||||
* @Description 消息发送失败时回调
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ReturnsCallback implements RabbitTemplate.ReturnsCallback {
|
||||
|
||||
@Resource
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
rabbitTemplate.setReturnsCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息发送到队列失败时执行
|
||||
* @param returnedMessage 返回的消息和元数据
|
||||
*/
|
||||
@Override
|
||||
public void returnedMessage(ReturnedMessage returnedMessage) {
|
||||
log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(),
|
||||
returnedMessage.getExchange(), returnedMessage.getReplyText());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 16:41
|
||||
* @Description RabbitMQ连接和管理功能配置
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitAdminConfig {
|
||||
|
||||
/**
|
||||
* RabbitMQ服务器的主机地址
|
||||
*/
|
||||
@Value("${spring.rabbitmq.host}")
|
||||
private String host;
|
||||
|
||||
/**
|
||||
* RabbitMQ的用户名
|
||||
*/
|
||||
@Value("${spring.rabbitmq.username}")
|
||||
private String username;
|
||||
|
||||
/**
|
||||
* RabbitMQ的密码
|
||||
*/
|
||||
@Value("${spring.rabbitmq.password}")
|
||||
private String password;
|
||||
|
||||
/**
|
||||
* RabbitMQ的虚拟主机
|
||||
*/
|
||||
@Value("${spring.rabbitmq.virtualhost}")
|
||||
private String virtualhost;
|
||||
|
||||
/**
|
||||
* 创建并初始化RabbitAdmin实例
|
||||
*
|
||||
* @return RabbitAdmin 实例
|
||||
*/
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin() {
|
||||
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
|
||||
rabbitAdmin.setAutoStartup(true);
|
||||
return rabbitAdmin;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建RabbitMQ连接工厂
|
||||
*
|
||||
* @return ConnectionFactory 实例
|
||||
*/
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory() {
|
||||
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
||||
connectionFactory.setAddresses(host);
|
||||
connectionFactory.setUsername(username);
|
||||
connectionFactory.setPassword(password);
|
||||
connectionFactory.setVirtualHost(virtualhost);
|
||||
|
||||
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
|
||||
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
||||
connectionFactory.setPublisherReturns(true);
|
||||
return connectionFactory;
|
||||
}
|
||||
}
|
|
@ -1,28 +1,41 @@
|
|||
package com.muyu.common.rabbit;
|
||||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 10:06
|
||||
* @Description rabbitMQ的监听器配置
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
|
||||
public class RabbitListenerConfig implements RabbitListenerConfigurer {
|
||||
|
||||
static {
|
||||
// 设置为信任所有类型的反序列化,确保消息能够正确反序列化
|
||||
System.setProperty("spring.amqp.deserialization.trust.all", "true");
|
||||
}
|
||||
|
||||
//以下配置RabbitMQ消息服务
|
||||
/**
|
||||
* RabbitMQ连接工厂,用于创建连接
|
||||
*/
|
||||
@Autowired
|
||||
public ConnectionFactory connectionFactory;
|
||||
|
||||
@Autowired
|
||||
private MessageConverter jsonMessageConverter;
|
||||
|
||||
/**
|
||||
* 处理器方法工厂
|
||||
* @return
|
||||
* 创建处理器方法工厂的bean
|
||||
*
|
||||
* @return DefaultMessageHandlerMethodFactory 实例,用于处理消息的转换和方法调用
|
||||
*/
|
||||
@Bean
|
||||
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
|
||||
|
@ -32,8 +45,14 @@ public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit
|
|||
return factory;
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置RabbitMQ监听器的消息处理方法工厂。
|
||||
*
|
||||
* @param rabbitListenerEndpointRegistrar 实例,用于注册监听器的配置
|
||||
*/
|
||||
@Override
|
||||
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
|
||||
// 注册自定义的消息处理方法工厂
|
||||
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
|
||||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 9:17
|
||||
* @Description rabbitMQ消息转换器配置
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitMQMessageConverterConfig {
|
||||
|
||||
/**
|
||||
* 消息转换配置
|
||||
*
|
||||
* @return 消息转换器
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter jsonMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
|
@ -1 +1,3 @@
|
|||
com.muyu.common.rabbit.RabbitListenerConfigurer
|
||||
com.muyu.common.rabbit.config.RabbitAdminConfig
|
||||
com.muyu.common.rabbit.config.RabbitListenerConfig
|
||||
com.muyu.common.rabbit.config.RabbitMQMessageConverterConfig
|
|
@ -1,74 +0,0 @@
|
|||
package com.muyu.enterprise.MQTT;
|
||||
|
||||
import com.alibaba.nacos.api.remote.PushCallBack;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttTopic;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
|
||||
/**
|
||||
* 模拟一个客户端接收消息
|
||||
* @Author:李庆帅
|
||||
* @Package:com.muyu.enterprise.MQTT
|
||||
* @Project:cloud-server
|
||||
* @name:ClientMQTT
|
||||
* @Date:2024/9/28 12:11
|
||||
*/
|
||||
public class ClientMQTT {
|
||||
//String topic = "vehicle";
|
||||
// String content = "Message from MqttPublishSample";
|
||||
// int qos = 2;
|
||||
// String broker = "tcp://106.15.136.7:1883";
|
||||
// String clientId = "JavaSample";
|
||||
//MQTT代理服务器地址
|
||||
public static final String HOST="tcp://106.15.136.7:1883";
|
||||
public static final String TOPIC1="pos_message_all";
|
||||
private static final String clientId="12345678";
|
||||
private MqttClient client;
|
||||
private MqttConnectOptions options;
|
||||
private String userName="mqtt"; //非必须
|
||||
private String passWord="mqtt"; //非必须
|
||||
|
||||
private void start(){
|
||||
try{
|
||||
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
||||
client= new MqttClient(HOST,clientId,new MemoryPersistence());
|
||||
// MQTT的连接设置
|
||||
options=new MqttConnectOptions();
|
||||
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
|
||||
options.setCleanSession(false);
|
||||
// 设置连接的用户名
|
||||
options.setUserName(userName);
|
||||
// 设置连接的密码
|
||||
options.setPassword(passWord.toCharArray());
|
||||
// 设置超时时间 单位为秒
|
||||
options.setConnectionTimeout(10);
|
||||
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
|
||||
options.setKeepAliveInterval(20);
|
||||
//设置断开后重新连接
|
||||
options.setAutomaticReconnect(true);
|
||||
// 设置回调
|
||||
client.setCallback(new PushCallback());
|
||||
MqttTopic topic = client.getTopic(TOPIC1);
|
||||
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
|
||||
//遗嘱
|
||||
options.setWill(topic,"close".getBytes(),1,true);
|
||||
client.connect(options);
|
||||
//订阅消息
|
||||
int[] Qos = {1} ; //0:最多一次 、1:最少一次 、2:只有一次
|
||||
String[] topics1 = {TOPIC1};
|
||||
client.subscribe(topics1,Qos);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args){
|
||||
ClientMQTT clientMQTT = new ClientMQTT();
|
||||
clientMQTT.start();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,69 +0,0 @@
|
|||
package com.muyu.enterprise.MQTT;
|
||||
|
||||
/**
|
||||
* @Author:李庆帅
|
||||
* @Package:com.muyu.enterprise.MQTT
|
||||
* @Project:cloud-server
|
||||
* @name:MQTTReceiveCallback
|
||||
* @Date:2024/9/27 22:21
|
||||
*/
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
|
||||
/**
|
||||
* 发布消息的回调类
|
||||
*
|
||||
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
|
||||
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
|
||||
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
|
||||
* 必须在回调类中实现三个方法:
|
||||
*
|
||||
* (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
|
||||
*
|
||||
* (2):public void connectionLost(Throwable cause)在断开连接时调用。
|
||||
*
|
||||
* (3):public void deliveryComplete(MqttDeliveryToken token))
|
||||
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
|
||||
* 由 MqttClient.connect 激活此回调。
|
||||
*
|
||||
*/
|
||||
public class MQTTReceiveCallback implements MqttCallback {
|
||||
// @Override
|
||||
// public void connectionLost(Throwable throwable) {
|
||||
// //连接丢失后,一般在这里面进行重连
|
||||
// System.out.println("连接断开,可以做重连");
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
// //subscribe后得到的消息会执行到这里面
|
||||
// System.out.println("接收消息主题:"+topic);
|
||||
// System.out.println("接收消息Qos:"+message.getQos());
|
||||
// System.out.println("接收消息内容:"+new String(message.getPayload()));
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
// System.out.println("deliveryComplete----------"+token.isComplete());
|
||||
// }
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable){
|
||||
//连接丢失后,一般在这里面进行重连
|
||||
System.out.println("连接断开,可以做重连");
|
||||
}
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
||||
//subscribe后得到的消息会执行到这面
|
||||
System.out.println("接收消息主题:"+topic);
|
||||
System.out.println("接收消息Qos:"+mqttMessage.getQos());
|
||||
System.out.println("接收消息内容:"+new String(mqttMessage.getPayload()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,187 +0,0 @@
|
|||
package com.muyu.enterprise.MQTT;
|
||||
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
|
||||
/**
|
||||
* 客户端类
|
||||
* @Author:李庆帅
|
||||
* @Package:com.muyu.enterprise.mqtt
|
||||
* @Project:cloud-server
|
||||
* @name:MyMqttClient
|
||||
* @Date:2024/9/27 22:20
|
||||
*/
|
||||
public class MyMqttClient {
|
||||
|
||||
public static MqttClient mqttClient =null;
|
||||
private static MemoryPersistence memoryPersistence=null;
|
||||
private static MqttConnectOptions mqttConectOptions=null;
|
||||
private static String ClinentName=""; //待填 将在服务端出现的名字
|
||||
private static String IP=""; //待填 服务器IP
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
start(ClinentName);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static void start(String clientId){
|
||||
//初始化连接设置对象
|
||||
mqttConectOptions=new MqttConnectOptions();
|
||||
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
|
||||
//这里设置为true表示每次连接到服务器都以新的身份连接
|
||||
mqttConectOptions.setCleanSession(true);
|
||||
//设置连接超时时间,单位是秒
|
||||
mqttConectOptions.setConnectionTimeout(10);
|
||||
//设置持久化方式
|
||||
memoryPersistence=new MemoryPersistence();
|
||||
if(null!=clientId){
|
||||
try{
|
||||
mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
System.out.println("连接状态:"+mqttClient.isConnected());
|
||||
//设置连接和回调
|
||||
if(null!=mqttClient){
|
||||
if(!mqttClient.isConnected()){
|
||||
//创建回调函数对象
|
||||
MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
|
||||
//客户端添加回调函数
|
||||
mqttClient.setCallback(MQTTReceiveCallback);
|
||||
//创建连接
|
||||
try{
|
||||
System.out.println("创建连接");
|
||||
mqttClient.connect(mqttConectOptions);
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient为空");
|
||||
}
|
||||
System.out.println("连接状态"+mqttClient.isConnected());
|
||||
}
|
||||
// 关闭连接
|
||||
public void closeConnect(){
|
||||
//关闭储存方式
|
||||
if(null!=memoryPersistence){
|
||||
try{
|
||||
memoryPersistence.close();
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}else {
|
||||
System.out.println("memoryPersistence为空");
|
||||
}
|
||||
|
||||
if(null!=mqttClient){
|
||||
if(mqttClient.isConnected()){
|
||||
try{
|
||||
mqttClient.disconnect();
|
||||
mqttClient.close();
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient未连接");
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient为空");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//发布消息
|
||||
public static void publishMessage(String pubTopic,String message,int qos){
|
||||
if(null!=mqttClient && mqttClient.isConnected()){
|
||||
System.out.println("发布消息"+mqttClient.isConnected());
|
||||
System.out.println("id"+mqttClient.isConnected());
|
||||
MqttMessage mqttMessage = new MqttMessage();
|
||||
mqttMessage.setQos(qos);
|
||||
|
||||
MqttTopic topic = mqttClient.getTopic(pubTopic);
|
||||
|
||||
if(null!=topic){
|
||||
try{
|
||||
MqttDeliveryToken publish = topic.publish(mqttMessage);
|
||||
if(!publish.isComplete()){
|
||||
System.out.println("消息发布成功");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//重新连接
|
||||
public static void reConnect(){
|
||||
if(null!=mqttClient&&mqttClient.isConnected()){
|
||||
if(!mqttClient.isConnected()){
|
||||
if(null!=mqttConectOptions){
|
||||
try{
|
||||
mqttClient.connect(mqttConectOptions);
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttConnectOptions为空");
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient为空或已连接");
|
||||
}
|
||||
}else {
|
||||
start(ClinentName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//订阅主题
|
||||
public static void suvTopic(String topic){
|
||||
if(null!=mqttClient && mqttClient.isConnected()){
|
||||
try{
|
||||
mqttClient.subscribe(topic,1);
|
||||
} catch (MqttException e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient出错");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//清空主题
|
||||
public void cleanTopic(String topic){
|
||||
if(null!=mqttClient && mqttClient.isConnected()){
|
||||
try{
|
||||
mqttClient.subscribe(topic);
|
||||
} catch (Exception e) {
|
||||
// TODO 自动生成的捕获块
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}else {
|
||||
System.out.println("mqttClient出错");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
package com.muyu.enterprise.MQTT;
|
||||
|
||||
/**
|
||||
* 发布消息的回调类
|
||||
* @Author:李庆帅
|
||||
* @Package:com.muyu.enterprise.MQTT
|
||||
* @Project:cloud-server
|
||||
* @name:PushCallback
|
||||
* @Date:2024/9/28 14:35
|
||||
*/
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
|
||||
/**
|
||||
* 发布消息的回调类
|
||||
*
|
||||
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
|
||||
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
|
||||
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
|
||||
* 必须在回调类中实现三个方法:
|
||||
*
|
||||
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
|
||||
*
|
||||
* public void connectionLost(Throwable cause)在断开连接时调用。
|
||||
*
|
||||
* public void deliveryComplete(MqttDeliveryToken token))
|
||||
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
|
||||
* 由 MqttClient.connect 激活此回调。
|
||||
*
|
||||
*/
|
||||
public class PushCallback implements MqttCallback {
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
// 连接丢失后,一般在这里面进行重连
|
||||
System.out.println("连接断开,可以做重连");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
||||
// subscribe后得到的消息会执行到这里面
|
||||
System.out.println("接收消息主题 : " + topic);
|
||||
System.out.println("接收消息Qos : " + mqttMessage.getQos());
|
||||
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
|
||||
}
|
||||
}
|
|
@ -1,113 +0,0 @@
|
|||
package com.muyu.enterprise.MQTT;
|
||||
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
|
||||
/**
|
||||
* 这是发送消息的服务端
|
||||
* 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
|
||||
* @Author:李庆帅
|
||||
* @Package:com.muyu.enterprise.MQTT
|
||||
* @Project:cloud-server
|
||||
* @name:ServerMQTT
|
||||
* @Date:2024/9/28 14:32
|
||||
*/
|
||||
@Log4j2
|
||||
public class ServerMQTT {
|
||||
|
||||
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
|
||||
public static final String HOST = "tcp://127.0.0.1:1883";
|
||||
//定义一个主题
|
||||
public static final String TOPIC = "pos_message_all";
|
||||
//定义MQTT的ID,可以在MQTT服务配置中指定
|
||||
private static final String clientId = "server11";
|
||||
|
||||
private MqttClient client;
|
||||
private static MqttTopic topic11;
|
||||
|
||||
// private String userName = "mqtt"; //非必须
|
||||
// private String passWord = "mqtt"; //非必须
|
||||
|
||||
private static MqttMessage message;
|
||||
|
||||
/**
|
||||
* 构造方法
|
||||
* @throws MqttException
|
||||
*/
|
||||
public ServerMQTT() throws MqttException {
|
||||
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
||||
client=new MqttClient(HOST, clientId,new MemoryPersistence());
|
||||
connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* 用来连接服务器
|
||||
*/
|
||||
private void connect()
|
||||
{
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setCleanSession(false);
|
||||
// options.setUserName(userName);
|
||||
// options.setPassword(passWord.toCharArray());
|
||||
// 设置超时时间
|
||||
options.setConnectionTimeout(10);
|
||||
// 设置会话心跳时间
|
||||
options.setKeepAliveInterval(20);
|
||||
try{
|
||||
client.setCallback(new PushCallback());
|
||||
client.connect(options);
|
||||
topic11 = client.getTopic(TOPIC);
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param topic
|
||||
* @param message
|
||||
* @throws MqttException
|
||||
*/
|
||||
public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{
|
||||
MqttDeliveryToken token= topic.publish(message);
|
||||
|
||||
token.waitForCompletion();
|
||||
System.out.println("消息已完全发布!"+token.isComplete());
|
||||
log.info("消息已完全发布!"+token.isComplete());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param clieId
|
||||
* @param msg
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void sendMessage(String clieId,String msg)throws Exception{
|
||||
ServerMQTT server = new ServerMQTT();
|
||||
server.message = new MqttMessage();
|
||||
server.message.setQos(1); //保证消息能到达一次
|
||||
server.message.setRetained(true);
|
||||
String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}";
|
||||
try{
|
||||
publish(server.topic11 , server.message);
|
||||
//断开连接
|
||||
// server.client.disconnect();
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
sendMessage("123444","哈哈哈");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -81,5 +81,11 @@
|
|||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- rabbit模块 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-rabbit</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -18,4 +18,4 @@ public class CloudEventProcessApplication {
|
|||
public static void main(String[] args) {
|
||||
SpringApplication.run(CloudEventProcessApplication.class, args);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,4 +34,4 @@ public class BasicEvent<T> extends ApplicationEvent {
|
|||
public T getData() {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,4 +34,4 @@ public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>>
|
|||
public void onApplicationEvent(BasicEvent<T> event) {
|
||||
listener.onEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,4 +13,4 @@ public interface BasicEventListener<T> {
|
|||
* @param event 事件对象
|
||||
*/
|
||||
void onEvent(BasicEvent<T> event);
|
||||
}
|
||||
}
|
|
@ -35,4 +35,4 @@ public class EventPublisher implements ApplicationEventPublisherAware {
|
|||
public <T> void publish(BasicEvent<T> event) {
|
||||
publisher.publishEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,11 +15,11 @@ import java.util.Collection;
|
|||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 16:53
|
||||
* @Description 测试消费者
|
||||
* @Description 测试Kafka消费者
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class TestConsumer implements InitializingBean {
|
||||
public class TestKafkaConsumer implements InitializingBean {
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
|
@ -48,4 +48,4 @@ public class TestConsumer implements InitializingBean {
|
|||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
package com.muyu.event.process.consumer;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/10/2 10:35
|
||||
* @Description 测试RabbitMQ消费者
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class TestRabbitMQConsumer {
|
||||
|
||||
/**
|
||||
* redis服务实例
|
||||
*/
|
||||
private final StringRedisTemplate redisTemplate;
|
||||
|
||||
|
||||
/**
|
||||
* 队列名称
|
||||
*/
|
||||
private static final String queueName = "test-rabbitmq";
|
||||
|
||||
/**
|
||||
* redis缓存键名
|
||||
*/
|
||||
private static final String testRabbitMQKey = "test-rabbitmq";
|
||||
|
||||
/**
|
||||
* 消费者方法,用于处理从RabbitMQ队列中接收到的消息
|
||||
*
|
||||
* @param msg 消息内容,类型为String
|
||||
* @param message 原始消息对象,包含消息的属性信息
|
||||
* @param channel RabbitMQ通道,用于进行消息确认和拒绝操作
|
||||
*/
|
||||
@RabbitListener(queuesToDeclare = @Queue(name = queueName))
|
||||
public void consumer(String msg, Message message, Channel channel) {
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
try {
|
||||
Long count = redisTemplate.opsForSet().add(testRabbitMQKey, messageId);
|
||||
|
||||
if (count != null && count.intValue() == 1) {
|
||||
log.info("测试RabbitMQ消费者获取到消息,消息内容:{}", msg);
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
log.info("测试RabbitMQ消费者消费消息成功!");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
redisTemplate.opsForSet().remove(testRabbitMQKey, messageId);
|
||||
log.error("测试RabbitMQ消费者消费消息异常,消息内容:{},异常信息:{}", msg, e.getMessage());
|
||||
|
||||
try {
|
||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||
} catch (IOException ex) {
|
||||
log.error("测试RabbitMQ消费者回退消息异常,消息内容:{}, 异常信息:{}", msg, ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -12,6 +12,7 @@ import org.springframework.boot.ApplicationArguments;
|
|||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
|
@ -52,6 +53,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener<C
|
|||
* @param args 应用程序参数
|
||||
* @throws Exception 如果启动过程中发生错误
|
||||
*/
|
||||
@Async
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING);
|
||||
|
@ -94,9 +96,9 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener<C
|
|||
}
|
||||
|
||||
try {
|
||||
kafkaConsumer.close(); // 关闭Kafka消费者
|
||||
kafkaConsumer.close();
|
||||
} catch (Exception e) {
|
||||
log.error("关闭Kafka消费者时发生错误", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@ import com.muyu.event.process.iotdb.service.TestIoTDBService;
|
|||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
@ -16,6 +17,7 @@ import org.springframework.web.bind.annotation.RestController;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
|
@ -42,6 +44,16 @@ public class TestEventController extends BaseController {
|
|||
*/
|
||||
private final TestIoTDBService testIoTDBService;
|
||||
|
||||
/**
|
||||
* 发送和接收消息的Spring AMQP组件
|
||||
*/
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* rabbitMQ队列名称
|
||||
*/
|
||||
private static final String rabbitMQQueueName = "test-rabbitmq";
|
||||
|
||||
/**
|
||||
* 发送Kafka测试消息
|
||||
*
|
||||
|
@ -59,6 +71,20 @@ public class TestEventController extends BaseController {
|
|||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送RabbitMQ测试消息
|
||||
*
|
||||
* @return 响应结果
|
||||
*/
|
||||
@GetMapping(value = "/sendRabbitmq")
|
||||
public Result<String> sendRabbitMQ() {
|
||||
rabbitTemplate.convertAndSend(rabbitMQQueueName, "测试RabbitMQ队列消息", message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 响应结果
|
||||
|
@ -85,4 +111,4 @@ public class TestEventController extends BaseController {
|
|||
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,4 +17,4 @@ public class IoTDBInsertDataEvent extends BasicEvent<String> {
|
|||
public IoTDBInsertDataEvent(Object source, String messsge) {
|
||||
super(source, messsge);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -69,4 +69,4 @@ public class IoTDBSessionConfig {
|
|||
deviceId, time, measurements, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -287,4 +287,4 @@ public interface IService {
|
|||
* @return MeasurementSchemaValuesDTO 对象
|
||||
*/
|
||||
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj);
|
||||
}
|
||||
}
|
|
@ -762,4 +762,4 @@ public class ServiceImpl implements IService {
|
|||
measurementSchemaValuesDTO.setValues(values);
|
||||
return measurementSchemaValuesDTO;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,4 +30,4 @@ public class DataJSON {
|
|||
*/
|
||||
@Schema(name = "车辆JSON数据")
|
||||
private String datasource;
|
||||
}
|
||||
}
|
|
@ -33,4 +33,4 @@ public class ResultEntity extends IoTDbRecordAble {
|
|||
*/
|
||||
private String time;
|
||||
|
||||
}
|
||||
}
|
|
@ -40,4 +40,4 @@ public class TestDataType {
|
|||
* 测试Long类型
|
||||
*/
|
||||
private Long testLong;
|
||||
}
|
||||
}
|
|
@ -64,4 +64,4 @@ public class InsertDataDTO {
|
|||
}
|
||||
return insertDataDTOS;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -9,4 +9,4 @@ import lombok.Data;
|
|||
*/
|
||||
@Data
|
||||
public class IoTDbRecordAble {
|
||||
}
|
||||
}
|
|
@ -33,4 +33,4 @@ public class MeasurementSchemaValuesDTO {
|
|||
* 存储值为空的索引列表
|
||||
*/
|
||||
private List<Integer> valueIsNullIndex;
|
||||
}
|
||||
}
|
|
@ -8,4 +8,4 @@ import com.muyu.event.process.iotdb.basic.service.IService;
|
|||
* @Description IoTDB业务层
|
||||
*/
|
||||
public interface IoTDBService extends IService {
|
||||
}
|
||||
}
|
|
@ -17,4 +17,4 @@ public interface TestIoTDBService extends IService {
|
|||
* @return 返回结果
|
||||
*/
|
||||
List<Map<String, Object>> list();
|
||||
}
|
||||
}
|
|
@ -11,4 +11,4 @@ import org.springframework.stereotype.Service;
|
|||
*/
|
||||
@Service
|
||||
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
|
||||
}
|
||||
}
|
|
@ -30,4 +30,4 @@ public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBServic
|
|||
log.info("查询IoTDB数据为:{}", list.toString());
|
||||
return list;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -63,4 +63,4 @@ public class IoTDBInsertDataListener implements BasicEventListener<String> {
|
|||
.map(Object::toString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,4 +44,6 @@ spring:
|
|||
# 系统环境Config共享配置
|
||||
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# kafka共享配置
|
||||
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# rabbit共享配置
|
||||
- application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
|
@ -1,20 +0,0 @@
|
|||
package com.muyu.event.process;
|
||||
|
||||
import com.muyu.common.security.annotation.EnableCustomConfig;
|
||||
import com.muyu.common.security.annotation.EnableMyFeignClients;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 22:31
|
||||
* @Description 事件处理微服启动类
|
||||
*/
|
||||
@EnableCustomConfig
|
||||
@EnableMyFeignClients
|
||||
@SpringBootApplication
|
||||
public class CloudEventProcessApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(CloudEventProcessApplication.class, args);
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:11
|
||||
* @Description 基础事件
|
||||
*/
|
||||
public class BasicEvent<T> extends ApplicationEvent {
|
||||
|
||||
/**
|
||||
* 事件携带的数据
|
||||
*/
|
||||
private final T data;
|
||||
|
||||
/**
|
||||
* 构造函数,初始化事件源和数据
|
||||
*
|
||||
* @param source 事件源对象
|
||||
* @param data 事件携带的数据
|
||||
*/
|
||||
public BasicEvent(Object source, T data) {
|
||||
super(source);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取事件携带的数据
|
||||
*
|
||||
* @return 事件数据
|
||||
*/
|
||||
public T getData() {
|
||||
return data;
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:37
|
||||
* @Description 基础事件处理器
|
||||
*/
|
||||
@Component
|
||||
public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>> {
|
||||
|
||||
/**
|
||||
* 具体事件监听器
|
||||
*/
|
||||
private final BasicEventListener<T> listener;
|
||||
|
||||
/**
|
||||
* 构造函数,用于注入具体事件监听器
|
||||
*
|
||||
* @param listener 具体事件监听器
|
||||
*/
|
||||
public BasicEventHandler(BasicEventListener<T> listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理应用事件
|
||||
*
|
||||
* @param event 事件对象
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(BasicEvent<T> event) {
|
||||
listener.onEvent(event);
|
||||
}
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:35
|
||||
* @Description 基础事件监听器
|
||||
*/
|
||||
public interface BasicEventListener<T> {
|
||||
|
||||
/**
|
||||
* 处理事件的方法
|
||||
*
|
||||
* @param event 事件对象
|
||||
*/
|
||||
void onEvent(BasicEvent<T> event);
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:01
|
||||
* @Description 事件发布者
|
||||
*/
|
||||
@Component
|
||||
public class EventPublisher implements ApplicationEventPublisherAware {
|
||||
|
||||
/**
|
||||
* 应用程序事件发布者,用于发布事件
|
||||
*/
|
||||
private ApplicationEventPublisher publisher;
|
||||
|
||||
/**
|
||||
* 设置应用程序事件发布者
|
||||
*
|
||||
* @param publisher 应用程序事件发布者实例
|
||||
*/
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布事件
|
||||
* @param event 要发布的事件
|
||||
* @param <T> 事件数据类型
|
||||
*/
|
||||
public <T> void publish(BasicEvent<T> event) {
|
||||
publisher.publishEvent(event);
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
package com.muyu.event.process.consumer;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 16:53
|
||||
* @Description 测试消费者
|
||||
*/
|
||||
@Slf4j
|
||||
//@Component
|
||||
@RequiredArgsConstructor
|
||||
public class TestConsumer implements InitializingBean {
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
*/
|
||||
private final KafkaConsumer<String, String> kafkaConsumer;
|
||||
|
||||
/**
|
||||
* kafka主题名称
|
||||
*/
|
||||
private static final String topicName = "test-topic";
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
new Thread(() -> {
|
||||
log.info("启动线程监听Topic: {}", topicName);
|
||||
ThreadUtil.sleep(1000);
|
||||
Collection<String> topics = Lists.newArrayList(topicName);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
|
||||
consumerRecords.forEach(record -> {
|
||||
String value = record.value();
|
||||
log.info("从Kafka中消费的原始数据: {}", value);
|
||||
});
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
|
@ -1,93 +0,0 @@
|
|||
package com.muyu.event.process.consumer;
|
||||
|
||||
import com.muyu.event.process.basic.EventPublisher;
|
||||
import com.muyu.event.process.event.IoTDBInsertDataEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 23:23
|
||||
* @Description 车辆消费者
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
*/
|
||||
private final KafkaConsumer<String, String> kafkaConsumer;
|
||||
|
||||
/**
|
||||
* 事件发布者
|
||||
*/
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
/**
|
||||
* 协议解析报文传递数据(队列名称)
|
||||
*/
|
||||
public final static String MESSAGE_PARSING = "MessageParsing";
|
||||
|
||||
/**
|
||||
* 设定固定大小的线程池,线程数量与当前可用的处理器核心数相同
|
||||
*/
|
||||
private final ExecutorService executorService =
|
||||
Executors.newFixedThreadPool(10);
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
|
||||
List<String> topics = Collections.singletonList(MESSAGE_PARSING);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
|
||||
consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
|
||||
String message = consumerRecord.value();
|
||||
log.info("接收到车辆报文数据,内容:{}", message);
|
||||
log.info("------------------------------------------------");
|
||||
eventPublisher.publish(new IoTDBInsertDataEvent(this, message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ContextClosedEvent event) {
|
||||
log.info("关闭线程池和Kafka消费者");
|
||||
|
||||
try {
|
||||
executorService.shutdown();
|
||||
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.error("线程池关闭被中断,强制关闭", e);
|
||||
executorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
try {
|
||||
kafkaConsumer.close(); // 关闭Kafka消费者
|
||||
} catch (Exception e) {
|
||||
log.error("关闭Kafka消费者时发生错误", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
package com.muyu.event.process.controller;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.common.core.constant.Constants;
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.core.web.controller.BaseController;
|
||||
import com.muyu.event.process.iotdb.service.TestIoTDBService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 16:24
|
||||
* @Description 测试事件控制层
|
||||
*/
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
@RequestMapping(value = "/test-event")
|
||||
public class TestEventController extends BaseController {
|
||||
|
||||
/**
|
||||
* kafka生产者
|
||||
*/
|
||||
private final KafkaProducer<String, String> kafkaProducer;
|
||||
|
||||
/**
|
||||
* kafka主题名称
|
||||
*/
|
||||
private static final String kafkaTopicName = "test-topic";
|
||||
|
||||
/**
|
||||
* 测试IoTDB业务层
|
||||
*/
|
||||
private final TestIoTDBService testIoTDBService;
|
||||
|
||||
/**
|
||||
* 发送Kafka测试消息
|
||||
*
|
||||
* @return 响应结果
|
||||
*/
|
||||
@GetMapping(value = "/sendKafka")
|
||||
public Result<String> senKafka() {
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("id","1");
|
||||
jsonObject.put("name","张三");
|
||||
jsonObject.put("age","18");
|
||||
jsonObject.put("sex","男");
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
|
||||
kafkaProducer.send(producerRecord);
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 响应结果
|
||||
*/
|
||||
@GetMapping(value = "/list")
|
||||
public Result<List<Map<String, Object>>> list() {
|
||||
return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向IoTDB添加数据
|
||||
*
|
||||
* @return 响应结果
|
||||
*/
|
||||
@PostMapping(value = "/save")
|
||||
public Result<String> save() {
|
||||
String deviceId = "root.test";
|
||||
ArrayList<String> keyList = new ArrayList<>();
|
||||
ArrayList<String> valueList = new ArrayList<>();
|
||||
keyList.add("car_vin");
|
||||
keyList.add("car_name");
|
||||
valueList.add("VIN123456");
|
||||
valueList.add("宝马");
|
||||
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
package com.muyu.event.process.event;
|
||||
|
||||
import com.muyu.event.process.basic.BasicEvent;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 21:19
|
||||
* @Description 向IoTDB插入数据事件
|
||||
*/
|
||||
public class IoTDBInsertDataEvent extends BasicEvent<String> {
|
||||
|
||||
/**
|
||||
* 构造函数,向IoTDB插入数据创建事件
|
||||
*
|
||||
* @param messsge 消息
|
||||
*/
|
||||
public IoTDBInsertDataEvent(Object source, String messsge) {
|
||||
super(source, messsge);
|
||||
}
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.basic.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.session.pool.SessionPool;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 22:41
|
||||
* @Description IoTDB会话配置
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Configuration
|
||||
public class IoTDBSessionConfig {
|
||||
|
||||
@Value("${spring.iotdb.username}")
|
||||
private String username;
|
||||
|
||||
@Value("${spring.iotdb.password}")
|
||||
private String password;
|
||||
|
||||
@Value("${spring.iotdb.ip}")
|
||||
private String ip;
|
||||
|
||||
@Value("${spring.iotdb.port}")
|
||||
private int port;
|
||||
|
||||
@Value("${spring.iotdb.maxSize}")
|
||||
private int maxSize;
|
||||
|
||||
/**
|
||||
* IoTDB会话池
|
||||
*/
|
||||
private static SessionPool sessionPool = null;
|
||||
|
||||
/**
|
||||
* 获取IoTDB会话对象
|
||||
* @return ioTDB会话对象
|
||||
*/
|
||||
public SessionPool getSessionPool() {
|
||||
if (sessionPool == null) {
|
||||
sessionPool = new SessionPool(ip, port, username, password, maxSize);
|
||||
}
|
||||
return sessionPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* 向IoTDB中插入特定设备的记录
|
||||
*
|
||||
* @param deviceId 设备的唯一标识符
|
||||
* @param time 记录的时间戳,以毫秒为单位
|
||||
* @param measurements 与记录关联的测量名称列表
|
||||
* @param values 每个测量对应的值列表。值的顺序必须与测量名称一一对应
|
||||
*
|
||||
* <p>该方法从会话池中获取一个会话,并尝试将指定的记录插入到 IoTDB 中。
|
||||
* 如果插入失败,将记录错误信息,便于后续排查。</p>
|
||||
*/
|
||||
public void insertRecord(String deviceId, long time, List<String> measurements, List<String> values) {
|
||||
getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
|
||||
deviceId, time, measurements, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,290 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.basic.service;
|
||||
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
import org.apache.iotdb.tsfile.write.record.Tablet;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 23:37
|
||||
* @Description IoTDB基准业务层
|
||||
*/
|
||||
public interface IService {
|
||||
|
||||
/**
|
||||
* 插入一个 Tablet 对象到 IoTDB 数据库
|
||||
*
|
||||
* @param tablet 要插入的 Tablet 对象,包含待写入的数据
|
||||
*/
|
||||
void insertTablet(Tablet tablet);
|
||||
|
||||
/**
|
||||
* 将给定的 Tablets 插入到 IoTDB 数据库中。
|
||||
*
|
||||
* @param tablets 一个 Map,包含要插入的 Tablets
|
||||
*/
|
||||
void insertTablets(Map<String, Tablet> tablets);
|
||||
|
||||
/**
|
||||
* 单条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
|
||||
|
||||
/**
|
||||
* 单条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param types 数据项对应类型列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
void insertRecord(String deviceId, long time, List<String> measurements,
|
||||
List<TSDataType> types, List<Object> values);
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名)root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
void insertStringRecords(List<String> deviceIds, List<Long> times,
|
||||
List<List<String>> measurementsList, List<List<String>> valuesList);
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,
|
||||
List<List<String>> measurementsList, List<List<String>> valuesList);
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
|
||||
|
||||
/**
|
||||
* 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param path 单个字段 root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
void deleteData(String path, long endTime);
|
||||
|
||||
/**
|
||||
* 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
void deleteData(List<String> paths, long endTime);
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param outTime 超时时间
|
||||
* @return SessionDataSet (Time,paths)
|
||||
*/
|
||||
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime);
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature"
|
||||
* @param startTime 查询数据的起始时间(包含该时间点)
|
||||
* @param endTime 查询数据的结束时间(不包含该时间点)
|
||||
* @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间
|
||||
* @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
*/
|
||||
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
|
||||
Class<? extends IoTDbRecordAble> clazz);
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
*/
|
||||
<T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz);
|
||||
|
||||
/**
|
||||
* 最新点查询(快速查询单设备下指定序列最新点)
|
||||
*
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeLastDataQueryForOneDevice(String db, String device,
|
||||
List<String> sensors, boolean isLegalPathNodes);
|
||||
|
||||
/**
|
||||
* 查询单个设备的最新数据(获取指定设备的最新传感器数据)
|
||||
*
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
*/
|
||||
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
|
||||
boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz);
|
||||
|
||||
/**
|
||||
* 聚合查询
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
|
||||
|
||||
/**
|
||||
* 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime);
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval 查询的时间间隔(单位为毫秒)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime, long interval);
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval 查询的时间间隔(单位为毫秒)
|
||||
* @param slidingStep 滑动步长(单位为毫秒)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime, long interval, long slidingStep);
|
||||
|
||||
/**
|
||||
* SQL查询
|
||||
*
|
||||
* @param sql SQL查询语句,支持IotDB的查询语法
|
||||
* @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null
|
||||
*/
|
||||
SessionDataSet executeQueryStatement(String sql);
|
||||
|
||||
|
||||
/**
|
||||
* SQL非查询
|
||||
*
|
||||
* @param sql SQL查询语句,支持IotDB的查询语法
|
||||
*/
|
||||
void executeNonQueryStatement(String sql);
|
||||
|
||||
/**
|
||||
* 封装处理数据
|
||||
*
|
||||
* @param sessionDataSet 包含查询结果的SessionDataSet对象
|
||||
* @param titleList 列标题列表,用于映射字段名称
|
||||
* @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值
|
||||
*/
|
||||
List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList);
|
||||
|
||||
/**
|
||||
* 封装处理数据(不支持聚合查询)
|
||||
*
|
||||
* @param sessionDataSet 查询返回的结果集
|
||||
* @param titleList 查询返回的结果集内的字段名
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 返回封装后的对象列表,每个对象对应一行结果集数据
|
||||
* @param <T> 返回对象的类型
|
||||
*/
|
||||
<T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
|
||||
Class<? extends IoTDbRecordAble> clazz);
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemas
|
||||
*
|
||||
* @param obj 要从中提取字段信息的对象
|
||||
* @return 返回一个包含 MeasurementSchema 的列表
|
||||
*/
|
||||
List<MeasurementSchema> buildMeasurementSchemas(Object obj);
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemaValuesDTO
|
||||
*
|
||||
* @param obj 要从中提取字段信息和对应值的对象
|
||||
* @return MeasurementSchemaValuesDTO 对象
|
||||
*/
|
||||
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj);
|
||||
}
|
|
@ -1,765 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.basic.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig;
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
|
||||
import org.apache.iotdb.session.pool.SessionPool;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
import org.apache.iotdb.tsfile.read.common.Field;
|
||||
import org.apache.iotdb.tsfile.read.common.RowRecord;
|
||||
import org.apache.iotdb.tsfile.write.record.Tablet;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 23:38
|
||||
* @Description IoTDB基准业务实现层
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class ServiceImpl implements IService {
|
||||
|
||||
/**
|
||||
* IoTDB会话配置
|
||||
*/
|
||||
@Autowired
|
||||
private IoTDBSessionConfig ioTDBSessionConfig;
|
||||
|
||||
/**
|
||||
* 插入一个 Tablet 对象到 IoTDB 数据库
|
||||
*
|
||||
* @param tablet 要插入的 Tablet 对象,包含待写入的数据
|
||||
*/
|
||||
@Override
|
||||
public void insertTablet(Tablet tablet) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:tablet:[{}]", tablet);
|
||||
sessionPool.insertTablet(tablet);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将给定的 Tablets 插入到 IoTDB 数据库中。
|
||||
*
|
||||
* @param tablets 一个 Map,包含要插入的 Tablets
|
||||
*/
|
||||
@Override
|
||||
public void insertTablets(Map<String, Tablet> tablets) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:tablets:[{}]", tablets);
|
||||
sessionPool.insertTablets(tablets);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
|
||||
deviceId, time, measurements, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param types 数据项对应类型列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecord(String deviceId, long time, List<String> measurements,
|
||||
List<TSDataType> types, List<Object> values) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]",
|
||||
deviceId, measurements, types, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, types, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " +
|
||||
"values={}, error={}", deviceId, time, measurements, types, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名)root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<String>> valuesList) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]",
|
||||
deviceIds, measurementsList, valuesList);
|
||||
sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " +
|
||||
"valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]",
|
||||
deviceIds, measurementsList, typesList, valuesList);
|
||||
sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " +
|
||||
"valuesList={}, error={}",
|
||||
deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<String>> valuesList) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]",
|
||||
deviceId, measurementsList, valuesList);
|
||||
sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " +
|
||||
"measurementsList={}, valuesList={}, error={}",
|
||||
deviceId, times, measurementsList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
|
||||
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]",
|
||||
deviceId, measurementsList, typesList, valuesList);
|
||||
sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " +
|
||||
"measurementsList={},typesList={},valuesList={}, error={}",
|
||||
deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param path 单个字段 root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
@Override
|
||||
public void deleteData(String path, long endTime) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime);
|
||||
sessionPool.deleteData(path, endTime);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
@Override
|
||||
public void deleteData(List<String> paths, long endTime) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime);
|
||||
sessionPool.deleteData(paths, endTime);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param outTime 超时时间
|
||||
* @return SessionDataSet (Time,paths)
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]",
|
||||
paths, startTime, endTime, outTime);
|
||||
sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " +
|
||||
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature"
|
||||
* @param startTime 查询数据的起始时间(包含该时间点)
|
||||
* @param endTime 查询数据的结束时间(不包含该时间点)
|
||||
* @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间
|
||||
* @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
|
||||
Class<? extends IoTDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " +
|
||||
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime);
|
||||
sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}",
|
||||
paths, lastTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}",
|
||||
paths, lastTime, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(快速查询单设备下指定序列最新点)
|
||||
*
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
|
||||
boolean isLegalPathNodes) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]",
|
||||
db, device, sensors, isLegalPathNodes);
|
||||
sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " +
|
||||
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询单个设备的最新数据(获取指定设备的最新传感器数据)
|
||||
*
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 查询结果的对象列表,如果查询失败则返回 null
|
||||
* @param <T> 返回数据的对象类型泛型
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
|
||||
boolean isLegalPathNodes,
|
||||
Class<? extends IoTDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " +
|
||||
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}",
|
||||
paths, aggregations, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]",
|
||||
paths, aggregations, startTime, endTime);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}], " +
|
||||
"startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval 查询的时间间隔(单位为毫秒)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime, long interval) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]",
|
||||
paths, aggregations, startTime, endTime, interval);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(
|
||||
paths, aggregations, startTime, endTime, interval
|
||||
);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " +
|
||||
"startTime:[{}], endTime:[{}], interval:[{}], error={}",
|
||||
paths, aggregations, startTime, endTime, interval, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval 查询的时间间隔(单位为毫秒)
|
||||
* @param slidingStep 滑动步长(单位为毫秒)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
|
||||
long startTime, long endTime, long interval, long slidingStep) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], " +
|
||||
"slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime,
|
||||
interval, slidingStep);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " +
|
||||
"startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}",
|
||||
paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL查询
|
||||
*
|
||||
* @param sql SQL查询语句,支持IotDB的查询语法
|
||||
* @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeQueryStatement(String sql) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
|
||||
try {
|
||||
log.info("iotdb SQL查询:sql:[{}]", sql);
|
||||
sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL非查询
|
||||
*
|
||||
* @param sql SQL查询语句,支持IotDB的查询语法
|
||||
*/
|
||||
@Override
|
||||
public void executeNonQueryStatement(String sql) {
|
||||
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb SQL无查询:sql:[{}]", sql);
|
||||
sessionPool.executeNonQueryStatement(sql);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 封装处理数据
|
||||
*
|
||||
* @param sessionDataSet 包含查询结果的SessionDataSet对象
|
||||
* @param titleList 列标题列表,用于映射字段名称
|
||||
* @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
|
||||
int fetchSize = sessionDataSet.getFetchSize();
|
||||
List<Map<String, Object>> resultList = new ArrayList<>();
|
||||
titleList.remove("Time");
|
||||
if (fetchSize > 0) {
|
||||
while (sessionDataSet.hasNext()) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
RowRecord next = sessionDataSet.next();
|
||||
List<Field> fields = next.getFields();
|
||||
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
|
||||
.format(next.getTimestamp());
|
||||
resultMap.put("time", timeString);
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Field field = fields.get(i);
|
||||
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
|
||||
resultMap.put(splitString(titleList.get(i)), null);
|
||||
} else {
|
||||
resultMap.put(splitString(titleList.get(i)),
|
||||
field.getObjectValue(field.getDataType()).toString());
|
||||
}
|
||||
}
|
||||
resultList.add(resultMap);
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 封装处理数据(不支持聚合查询)
|
||||
*
|
||||
* @param sessionDataSet 查询返回的结果集
|
||||
* @param titleList 查询返回的结果集内的字段名
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return 返回封装后的对象列表,每个对象对应一行结果集数据
|
||||
* @param <T> 返回对象的类型
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
|
||||
Class<? extends IoTDbRecordAble> clazz) {
|
||||
int fetchSize = sessionDataSet.getFetchSize();
|
||||
List<T> resultList = new ArrayList<>();
|
||||
titleList.remove("Time");
|
||||
if (fetchSize > 0) {
|
||||
while (sessionDataSet.hasNext()) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
RowRecord next = sessionDataSet.next();
|
||||
List<Field> fields = next.getFields();
|
||||
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
|
||||
.format(next.getTimestamp());
|
||||
resultMap.put("time", timeString);
|
||||
if (titleList.stream().anyMatch(str -> str.contains("."))) {
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Field field = fields.get(i);
|
||||
String title = titleList.get(i);
|
||||
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
|
||||
resultMap.put(splitString(title), null);
|
||||
} else {
|
||||
resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Field fieldName = fields.get(0);
|
||||
Field fieldValue = fields.get(1);
|
||||
Field fieldDataType = fields.get(2);
|
||||
if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
|
||||
String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
|
||||
Object mapValue = convertStringToType(
|
||||
fieldValue.getObjectValue(fieldValue.getDataType()).toString(),
|
||||
fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()
|
||||
);
|
||||
resultMap.put(splitString(mapKey), mapValue);
|
||||
}
|
||||
}
|
||||
|
||||
String jsonString = JSON.toJSONString(resultMap);
|
||||
resultList.add(JSON.parseObject(jsonString, (Type) clazz));
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 分割获取字段名
|
||||
*
|
||||
* @param str 输入的字符串
|
||||
* @return 字段名
|
||||
*/
|
||||
public static String splitString(String str) {
|
||||
String[] parts = str.split("\\.");
|
||||
if (parts.length <= 0) {
|
||||
return str;
|
||||
} else {
|
||||
return parts[parts.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据数据值和数据类型返回对应数据类型数据
|
||||
*
|
||||
* @param value 数据值
|
||||
* @param typeName 数据类型
|
||||
* @return 转换后的数据值
|
||||
*/
|
||||
public static Object convertStringToType(String value, String typeName) {
|
||||
String type = typeName.toLowerCase();
|
||||
if (type.isEmpty()) {
|
||||
return value;
|
||||
}
|
||||
if ("boolean".equals(type)) {
|
||||
return Boolean.parseBoolean(value);
|
||||
} else if ("double".equals(type)) {
|
||||
return Double.parseDouble(value);
|
||||
} else if ("int32".equals(type)) {
|
||||
return Integer.parseInt(value);
|
||||
} else if ("int64".equals(type)) {
|
||||
return Long.parseLong(value);
|
||||
} else if ("float".equals(type)) {
|
||||
return Float.parseFloat(value);
|
||||
} else if ("text".equals(type)) {
|
||||
return value;
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象属性的数据类型返回对应的TSDataType
|
||||
*
|
||||
* @param type 属性的数据类型
|
||||
* @return TSDataType
|
||||
*/
|
||||
public static TSDataType getTsDataTypeByString(String type) {
|
||||
String typeName = splitString(type).toLowerCase();
|
||||
if ("boolean".equals(typeName)) {
|
||||
return TSDataType.BOOLEAN;
|
||||
} else if ("double".equals(typeName)) {
|
||||
return TSDataType.DOUBLE;
|
||||
} else if ("int".equals(typeName) || "integer".equals(typeName)) {
|
||||
return TSDataType.INT32;
|
||||
} else if ("long".equals(typeName)) {
|
||||
return TSDataType.INT64;
|
||||
} else if ("float".equals(typeName)) {
|
||||
return TSDataType.FLOAT;
|
||||
} else if ("text".equals(typeName)) {
|
||||
return TSDataType.TEXT;
|
||||
} else if ("string".equals(typeName)) {
|
||||
return TSDataType.TEXT;
|
||||
} else {
|
||||
return TSDataType.UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemas
|
||||
*
|
||||
* @param obj 要从中提取字段信息的对象
|
||||
* @return 返回一个包含 MeasurementSchema 的列表
|
||||
*/
|
||||
@Override
|
||||
public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
|
||||
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
|
||||
List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field ->
|
||||
new MeasurementSchema(field.getName(),
|
||||
getTsDataTypeByString(
|
||||
field.getType().getName()
|
||||
))).
|
||||
collect(Collectors.toList());
|
||||
return schemaList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemaValuesDTO
|
||||
*
|
||||
* @param obj 要从中提取字段信息和对应值的对象
|
||||
* @return MeasurementSchemaValuesDTO 对象
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
|
||||
MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
|
||||
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
|
||||
List<MeasurementSchema> schemaList = new ArrayList<>();
|
||||
List<Object> values = new ArrayList<>();
|
||||
List<Integer> valuesIsNullIndex = new ArrayList<>();
|
||||
int valueIndex = 0;
|
||||
for (java.lang.reflect.Field field : fields) {
|
||||
MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(),
|
||||
getTsDataTypeByString(field.getType().getName()));
|
||||
schemaList.add(measurementSchema);
|
||||
Object value = field.get(obj);
|
||||
if (value == null) {
|
||||
valuesIsNullIndex.add(valueIndex);
|
||||
}
|
||||
values.add(value);
|
||||
valueIndex++;
|
||||
}
|
||||
measurementSchemaValuesDTO.setSchemaList(schemaList);
|
||||
measurementSchemaValuesDTO.setValues(values);
|
||||
return measurementSchemaValuesDTO;
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:11
|
||||
* @Description JSON数据对象
|
||||
*/
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Tag(name = "ionDB数据源对象")
|
||||
public class DataJSON {
|
||||
|
||||
/**
|
||||
* 时间戳
|
||||
*/
|
||||
@Schema(name = "时间戳")
|
||||
private Long timestamp;
|
||||
|
||||
/**
|
||||
* 车辆JSON数据
|
||||
*/
|
||||
@Schema(name = "车辆JSON数据")
|
||||
private String datasource;
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:18
|
||||
* @Description 结果实体
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class ResultEntity extends IoTDbRecordAble {
|
||||
|
||||
/**
|
||||
* 温度值
|
||||
*/
|
||||
private Float temperature;
|
||||
|
||||
/**
|
||||
* 硬件标识
|
||||
*/
|
||||
private String hardware;
|
||||
|
||||
/**
|
||||
* 状态标识
|
||||
*/
|
||||
private Boolean status;
|
||||
|
||||
/**
|
||||
* 时间
|
||||
*/
|
||||
private String time;
|
||||
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:22
|
||||
* @Description 测试数据类型
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class TestDataType {
|
||||
|
||||
/**
|
||||
* 温度值
|
||||
*/
|
||||
private Float temperature;
|
||||
|
||||
/**
|
||||
* 硬件标识
|
||||
*/
|
||||
private String hardware;
|
||||
|
||||
/**
|
||||
* 状态标识
|
||||
*/
|
||||
private Boolean status;
|
||||
|
||||
/**
|
||||
* 测试Double类型
|
||||
*/
|
||||
private Double testDouble;
|
||||
|
||||
/**
|
||||
* 测试Long类型
|
||||
*/
|
||||
private Long testLong;
|
||||
}
|
|
@ -1,67 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:12
|
||||
* @Description 插入数据 数据转换对象
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class InsertDataDTO {
|
||||
|
||||
/**
|
||||
* 温度值
|
||||
*/
|
||||
private Float temperature;
|
||||
|
||||
/**
|
||||
* 硬件标识
|
||||
*/
|
||||
private String hardware;
|
||||
|
||||
/**
|
||||
* 状态标识
|
||||
*/
|
||||
private Boolean status;
|
||||
|
||||
/**
|
||||
* 创建一个单一的InsertDataDTO实例,并设置默认值。
|
||||
*
|
||||
* @return 一个配置好的InsertDataDTO对象
|
||||
*/
|
||||
public InsertDataDTO buildOne() {
|
||||
InsertDataDTO insertDataDTO = new InsertDataDTO();
|
||||
insertDataDTO.setHardware("ss");
|
||||
insertDataDTO.setStatus(true);
|
||||
insertDataDTO.setTemperature(12.0F);
|
||||
return insertDataDTO;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建一个单一的InsertDataDTO实例,并设置默认值。
|
||||
*
|
||||
* @return 一个配置好的InsertDataDTO对象
|
||||
*/
|
||||
public List<InsertDataDTO> buildList() {
|
||||
List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
|
||||
int buildNum = 10;
|
||||
for (int i = 0; i < buildNum; i++) {
|
||||
InsertDataDTO insertDataDTO = new InsertDataDTO();
|
||||
insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
|
||||
insertDataDTO.setStatus(i % 2 == 0);
|
||||
insertDataDTO.setTemperature(12.0F + i);
|
||||
insertDataDTOS.add(insertDataDTO);
|
||||
}
|
||||
return insertDataDTOS;
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:23
|
||||
* @Description IoTDB数据库记录对象
|
||||
*/
|
||||
@Data
|
||||
public class IoTDbRecordAble {
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 0:26
|
||||
* @Description 测量模式及其对应的值 数据传输对象
|
||||
*/
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class MeasurementSchemaValuesDTO {
|
||||
|
||||
/**
|
||||
* 测量模式列表,每个元素表示一个测量的定义,包括名称、数据类型等信息。
|
||||
*/
|
||||
private List<MeasurementSchema> schemaList;
|
||||
|
||||
/**
|
||||
* 对应于测量模式的实际值列表,存储与 schemaList 中每个测量相对应的值。
|
||||
*/
|
||||
private List<Object> values;
|
||||
|
||||
/**
|
||||
* 存储值为空的索引列表
|
||||
*/
|
||||
private List<Integer> valueIsNullIndex;
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.service;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:38
|
||||
* @Description IoTDB业务层
|
||||
*/
|
||||
public interface IoTDBService extends IService {
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.service;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 17:23
|
||||
* @Description 测试IoTDB业务层
|
||||
*/
|
||||
public interface TestIoTDBService extends IService {
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 返回结果
|
||||
*/
|
||||
List<Map<String, Object>> list();
|
||||
}
|
|
@ -1,14 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.service.impl;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
|
||||
import com.muyu.event.process.iotdb.service.IoTDBService;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:39
|
||||
* @Description IoTDB业务实现层
|
||||
*/
|
||||
@Service
|
||||
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
package com.muyu.event.process.iotdb.service.impl;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
|
||||
import com.muyu.event.process.iotdb.service.TestIoTDBService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 17:24
|
||||
* @Description 测试IoTDB业务实现层
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService {
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 返回结果
|
||||
*/
|
||||
@Override
|
||||
public List<Map<String, Object>> list() {
|
||||
String sql = "select * from root.test";
|
||||
SessionDataSet sessionDataSet = this.executeQueryStatement(sql);
|
||||
List<Map<String, Object>> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes());
|
||||
log.info("查询IoTDB数据为:{}", list.toString());
|
||||
return list;
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
package com.muyu.event.process.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.event.process.basic.BasicEvent;
|
||||
import com.muyu.event.process.basic.BasicEventListener;
|
||||
import com.muyu.event.process.event.IoTDBInsertDataEvent;
|
||||
import com.muyu.event.process.iotdb.service.IoTDBService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:12
|
||||
* @Description 向IoTDB插入数据事件监听器
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class IoTDBInsertDataListener implements BasicEventListener<String> {
|
||||
|
||||
/**
|
||||
* IoTDB业务层
|
||||
*/
|
||||
private final IoTDBService ioTDBService;
|
||||
|
||||
/**
|
||||
* 设备名(表名)
|
||||
*/
|
||||
private static final String DEVICE_ID = "root.vehicle";
|
||||
|
||||
/**
|
||||
* 处理接收到的事件,将数据插入到 IoTDB
|
||||
*
|
||||
* @param event 接收到的事件,包含需要插入的数据
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(BasicEvent<String> event) {
|
||||
JSONObject data = JSONObject.parseObject(event.getData());
|
||||
List<String> keyList = extractKeys(data);
|
||||
List<String> valueList = extractValues(data);
|
||||
ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从给定的JSONObject中提取所有的键
|
||||
*
|
||||
* @param data 要提取键的JSONObject
|
||||
* @return 键的列表
|
||||
*/
|
||||
private List<String> extractKeys(JSONObject data) {
|
||||
return data.keySet().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 从给定的 JSONObject 中提取所有的值,并将其转换为字符串
|
||||
*
|
||||
* @param data 要提取值的JSONObject
|
||||
* @return 值的列表,以字符串形式表示
|
||||
*/
|
||||
private List<String> extractValues(JSONObject data) {
|
||||
return data.values().stream()
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
|
@ -54,6 +54,12 @@
|
|||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Mqtt3包 -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.paho</groupId>
|
||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataSource -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
|
|
|
@ -60,10 +60,10 @@
|
|||
<artifactId>cloud-common-datascope</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Mqtt3包 -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.paho</groupId>
|
||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>1.2.5</version>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common Log -->
|
||||
|
@ -84,27 +84,31 @@
|
|||
<artifactId>ecs20140526</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-openapi</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-console</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-util</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>cloudapi20160714</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>aliyun-repo</id>
|
||||
|
|
43
pom.xml
43
pom.xml
|
@ -51,6 +51,7 @@
|
|||
<kafka.clients.verison>3.0.0</kafka.clients.verison>
|
||||
<iotdb.session.verison>1.3.1</iotdb.session.verison>
|
||||
<mybatis.plus.join.version>1.4.13</mybatis.plus.join.version>
|
||||
<org.eclipse.paho.client.mqttv3.version>1.2.5</org.eclipse.paho.client.mqttv3.version>
|
||||
</properties>
|
||||
|
||||
<!-- 依赖声明 -->
|
||||
|
@ -219,6 +220,48 @@
|
|||
<version>${iotdb.session.verison}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>ecs20140526</artifactId>
|
||||
<version>${ecs20140526.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-openapi</artifactId>
|
||||
<version>${tea-openapi.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-console</artifactId>
|
||||
<version>${tea-console.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-util</artifactId>
|
||||
<version>${tea-util.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 阿里云创建ecs实例 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>cloudapi20160714</artifactId>
|
||||
<version>${cloudapi20160714.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Mqtt3包 -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.paho</groupId>
|
||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>${org.eclipse.paho.client.mqttv3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 核心模块 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
|
|
Loading…
Reference in New Issue