Data middleware layer

master
冯凯 2023-12-05 09:41:30 +08:00
commit c2cca1c895
14 changed files with 656 additions and 0 deletions

.gitignore vendored 100644

@@ -0,0 +1,46 @@
######################################################################
# Build Tools
.gradle
/build/
!gradle/wrapper/gradle-wrapper.jar
target/
!.mvn/wrapper/maven-wrapper.jar
######################################################################
# IDE
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### JRebel ###
rebel.xml
### NetBeans ###
nbproject/private/
build/*
nbbuild/
dist/
nbdist/
.nb-gradle/
######################################################################
# Others
*.log
*.xml.versionsBackup
*.swp
!*/build/*.java
!*/build/*.html
!*/build/*.xml

Dockerfile 100644

@@ -0,0 +1,18 @@
# Base image
FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6
# Exposed port
EXPOSE 8084
# Mount point for the application log directory
VOLUME /home/logs/mqttDemo
# Copy the built jar into the image
COPY /target/mqttdemo.jar /home/app.jar
# Working directory (the default directory after `docker exec -it` into the container)
WORKDIR /home
# Set the container time zone to UTC+8
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Start the Java application
ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"]

pom.xml 100644

@@ -0,0 +1,123 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafka</groupId>
    <artifactId>mqttdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.7.15</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.15</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.18</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>13.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.validation</groupId>
            <artifactId>jakarta.validation-api</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.15</version>
        </dependency>
        <!-- MyBatis-Plus dependency -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
    </dependencies>
    <distributionManagement>
        <repository>
            <id>dragon-release</id>
            <name>dragon-releases</name>
            <url>http://10.100.1.7:8081/repository/maven-releases/</url>
        </repository>
    </distributionManagement>
    <repositories>
        <repository>
            <id>dragon-public</id>
            <name>dragon-maven</name>
            <url>http://10.100.1.7:8081/repository/maven-public/</url>
        </repository>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://10.100.1.7:8081/repository/maven-releases/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

MqttApplication.java

@@ -0,0 +1,18 @@
package com.durant;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point.
 *
 * @version 1.0
 * @date 2023/11/24 14:23
 */
@SpringBootApplication
public class MqttApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}

KafkaProducerConfig.java

@@ -0,0 +1,43 @@
package com.durant.kafka.config;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

import static com.durant.kafka.constant.KafkaConstants.BOOTSTRAP_SERVERS;

/**
 * Kafka producer configuration.
 *
 * @version 1.0
 * @date 2023/11/25 13:23
 */
@Configuration
@Log4j2
public class KafkaProducerConfig {

    @Bean
    public Producer<String, String> producerInit() {
        // Producer configuration
        Properties properties = new Properties();
        // Kafka broker address
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Serialize both key and value as strings
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner defined in this package
        properties.put("partitioner.class", "com.durant.kafka.config.MyPartitioner");
        log.info("Kafka producer initialized");
        // Return a KafkaProducer built from the configuration above
        return new KafkaProducer<>(properties);
    }
}

MyPartitioner.java

@@ -0,0 +1,43 @@
package com.durant.kafka.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Custom partitioner that chooses a partition from the record key's hash code.
 *
 * @author fst
 * @date 2023/11/25
 */
@Component
public class MyPartitioner implements Partitioner {

    /**
     * Picks the partition for a record. For a topic with 3 partitions the result is 0, 1 or 2.
     *
     * @param topic      topic the record is sent to
     * @param key        record key
     * @param keyBytes   serialized key
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    current cluster metadata
     * @return the chosen partition index
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Look up how many partitions the topic has
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int partitionsNum = partitionInfos.size();
        // Use the key's hash code to pick a partition
        int partition = Math.abs(key.hashCode()) % partitionsNum;
        System.out.println("Chosen partition: " + partition);
        return partition;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
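
For a quick sanity check of the arithmetic in partition(), the following standalone snippet (not part of this commit; the three-partition count and the sample key are assumptions) reproduces the computation:

public class PartitionMathDemo {
    public static void main(String[] args) {
        int partitionsNum = 3;   // the topic is assumed to have 3 partitions, as the javadoc above states
        String key = "abc";      // sample record key
        // "abc".hashCode() == 96354 and 96354 % 3 == 0, so this record lands on partition 0
        int partition = Math.abs(key.hashCode()) % partitionsNum;
        System.out.println("key=" + key + " -> partition " + partition);
    }
}

Note that Math.abs(Integer.MIN_VALUE) is still negative, so a key whose hash code is Integer.MIN_VALUE would produce a negative index with this scheme.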

KafkaConstants.java

@@ -0,0 +1,32 @@
package com.durant.kafka.constant;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Kafka constants plus a small payload describing which topic/partition a consumer should read.
 *
 * @version 1.0
 * @date 2023/11/25 13:20
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class KafkaConstants {

    public static final String BOOTSTRAP_SERVERS = "117.72.43.22:9092";

    public String topic = "top";

    public String partition;

    public static KafkaConstants getBuilder(String topic, String partition) {
        return KafkaConstants.builder()
                .topic(topic)
                .partition(partition)
                .build();
    }
}

KafkaService.java

@@ -0,0 +1,49 @@
package com.durant.kafka.service;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Kafka send service.
 *
 * @version 1.0
 * @date 2023/11/25 13:57
 */
@Service
@Log4j2
public class KafkaService {

    @Autowired
    private Producer<String, String> producer;

    /**
     * Sends a message to Kafka and blocks until the broker acknowledges it.
     *
     * @param topic target topic
     * @param key   record key
     * @param value record value
     * @return metadata of the written record
     */
    public RecordMetadata send(String topic, String key, String value) {
        log.info("Sending message to Kafka: " + value);
        // Partition 0 is targeted explicitly, so the custom partitioner is bypassed for this record
        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, 0, key, value));
        RecordMetadata recordMetadata = null;
        try {
            recordMetadata = result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        return recordMetadata;
    }
}
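
For context, the records written here are meant to be pulled by a separate Kafka consumer (SubscriberRabbit below announces the topic/partition over the kafka_top queue). A minimal sketch of such a consumer — not part of this commit; the class name, group id and poll loop are assumptions — pinned to partition 0 of topA to match the explicit partition used above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopAConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.72.43.22:9092");
        props.put("group.id", "mqtt-demo-consumer"); // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() rather than subscribe(): read exactly the partition KafkaService targets
            consumer.assign(Collections.singletonList(new TopicPartition("topA", 0)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}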

MqttClientConnect.java

@@ -0,0 +1,54 @@
package com.durant.mqtt.config;

import com.durant.mqtt.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;

/**
 * MQTT client connection configuration. The eager @Bean variant is kept commented out;
 * the connection is currently established on demand in SubscriberRabbit.
 *
 * @version 1.0
 * @date 2023/11/24 14:26
 */
@Log4j2
@Configuration
public class MqttClientConnect {

    @Autowired
    private MqttService mqttService;

//    @Bean
//    public MqttClient connect(MqttConfig mqttConfig) {
//        MqttClient mqttClient;
//        try {
//            log.info("Connecting: " + System.currentTimeMillis());
//            // Create the client for the configured broker
//            mqttClient = new MqttClient(mqttConfig.getBroker(), UUID.randomUUID().toString(), new MemoryPersistence());
//            // Connection options (credentials, timeouts)
//            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//            mqttConnectOptions.setUserName("worker");
//            mqttConnectOptions.setPassword("worker".toCharArray());
//            mqttConnectOptions.setConnectionTimeout(60);
//            mqttConnectOptions.setKeepAliveInterval(60);
//            // Connect to the MQTT broker
//            mqttClient.connect();
//            // Register the message callback
//            mqttClient.setCallback(mqttService);
//            // Subscribe to the configured topic
//            mqttClient.subscribe(mqttConfig.getTopic(), 0);
//        } catch (MqttException e) {
//            throw new RuntimeException(e);
//        }
//        log.info("Connected: " + System.currentTimeMillis());
//        return mqttClient;
//    }
}

MqttConfig.java

@@ -0,0 +1,21 @@
package com.durant.mqtt.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * MQTT connection properties bound from the mqtt.config.* entries in the application configuration.
 *
 * @version 1.0
 * @date 2023/11/24 14:24
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt.config")
public class MqttConfig {

    private String broker;

    private String topic;
}

SubscriberRabbit.java

@@ -0,0 +1,85 @@
package com.durant.mqtt.rabbit;

import com.alibaba.fastjson.JSONObject;
import com.durant.kafka.constant.KafkaConstants;
import com.durant.mqtt.config.MqttClientConnect;
import com.durant.mqtt.config.MqttConfig;
import com.durant.mqtt.service.MqttService;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.UUID;

/**
 * RabbitMQ listener that receives an MQTT topic name, connects to the MQTT broker,
 * subscribes to that topic, and tells the Kafka consumer which topic/partition to pull.
 *
 * @version 1.0
 * @date 2023/11/29 15:48
 */
@Component
@Log4j2
public class SubscriberRabbit {

    @Autowired
    private MqttClientConnect mqttClientConnect;

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * RabbitMQ template for publishing messages.
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MqttService mqttService;

    @RabbitListener(queuesToDeclare = {@Queue(value = "sub_top")})
    public void consumerSubscribe(String mesg, Message message, Channel channel) {
        log.info("Received MQTT topic to subscribe to: " + mesg);
        MqttClient mqttClient;
        try {
            log.info("Connecting: " + System.currentTimeMillis());
            // Create the MQTT client for the configured broker
            mqttClient = new MqttClient(mqttConfig.getBroker(), UUID.randomUUID().toString(), new MemoryPersistence());
            // Connection options (credentials, timeouts)
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName("worker");
            mqttConnectOptions.setPassword("worker".toCharArray());
            mqttConnectOptions.setConnectionTimeout(60);
            mqttConnectOptions.setKeepAliveInterval(60);
            // Connect to the MQTT broker with the options above
            mqttClient.connect(mqttConnectOptions);
            // Tell the Kafka consumer (via RabbitMQ) which topic and partition it should pull
            KafkaConstants kafkaConstants = KafkaConstants.getBuilder("topA", "0");
            rabbitTemplate.convertAndSend("kafka_top", JSONObject.toJSONString(kafkaConstants), msg -> {
                msg.getMessageProperties().setMessageId(UUID.randomUUID().toString().replaceAll("-", ""));
                return msg;
            });
            // Register the message callback
            mqttClient.setCallback(mqttService);
            // Subscribe to the requested MQTT topic
            mqttClient.subscribe(mesg, 0);
            log.info("Subscribed to topic: " + mesg);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
        log.info("Connected: " + System.currentTimeMillis());
    }
}
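
The listener above only runs when something publishes an MQTT topic name to the sub_top queue. That publisher is not part of this commit; a minimal sketch of what it could look like (the SubscriptionRequestSender class name is an assumption) is:

package com.durant.mqtt.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SubscriptionRequestSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes the MQTT topic to subscribe to onto the sub_top queue (default exchange, routing key = queue name). */
    public void requestSubscription(String mqttTopic) {
        rabbitTemplate.convertAndSend("sub_top", mqttTopic);
    }
}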

MqttService.java

@@ -0,0 +1,52 @@
package com.durant.mqtt.service;

import com.durant.kafka.service.KafkaService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * MQTT callback that forwards every received message to Kafka.
 *
 * @version 1.0
 * @date 2023/11/24 19:07
 */
@Service
public class MqttService implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(MqttService.class);

    @Autowired
    private KafkaService kafkaService;

    @Override
    public void connectionLost(Throwable throwable) {
    }

    /**
     * Called for every message received on a subscribed topic; the payload is forwarded to the topA Kafka topic.
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Topic: " + topic);
        String msg = new String(mqttMessage.getPayload());
        log.info("Received message: " + msg);
        kafkaService.send("topA", String.valueOf(msg.hashCode()), msg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}

Md5Util.java

@@ -0,0 +1,43 @@
package com.durant.mqtt.utils;

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * MD5 hashing helper.
 *
 * @version 1.0
 * @date 2023/11/27 14:46
 */
public class Md5Util {

    public static String md5Encrypt(String message) {
        try {
            // MD5 message digest
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Feed the input into the digest
            md.update(message.getBytes());
            // Compute the 16-byte digest
            byte[] digest = md.digest();
            // Interpret the bytes as a positive integer
            BigInteger bigInt = new BigInteger(1, digest);
            // Convert to a hexadecimal string
            String encryptedMessage = bigInt.toString(16);
            // Left-pad with zeros to the full 32-character length
            while (encryptedMessage.length() < 32) {
                encryptedMessage = "0" + encryptedMessage;
            }
            return encryptedMessage;
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
            return null;
        }
    }
}
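
A quick usage check for the helper above (standalone, not part of this commit); the expected output is the well-known MD5 digest of "hello":

public class Md5UtilDemo {
    public static void main(String[] args) {
        // prints 5d41402abc4b2a76b9719d911017c592
        System.out.println(com.durant.mqtt.utils.Md5Util.md5Encrypt("hello"));
    }
}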

application.yml

@@ -0,0 +1,29 @@
mqtt:
  config:
    broker: tcp://47.115.228.23:1883
    topic: test
server:
  port: 8084
spring:
  application:
    name: mqttDemo
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://124.221.216.186:3306/mqtt?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
    username: root
    password: 27a9601cb3545824
  rabbitmq:
    host: 182.254.222.21
    port: 5672
    template:
      mandatory: true
    listener:
      simple:
        prefetch: 1 # fetch one message at a time; take the next only after the current one is consumed
        acknowledge-mode: manual # consumer acknowledges messages manually
        retry:
          enabled: true # enable consumer-side retry
    publisher-confirm-type: correlated # confirm that the message reached the exchange
    publisher-returns: true # return messages that could not be routed to a queue
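
The mandatory, publisher-confirm-type and publisher-returns settings above only have a visible effect if callbacks are registered on the RabbitTemplate, and no such registration appears in this commit. A minimal sketch of one (the RabbitCallbackConfig class name is an assumption) could be:

package com.durant.mqtt.rabbit;

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Log4j2
@Configuration
public class RabbitCallbackConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void registerCallbacks() {
        // Invoked when the broker acks/nacks a published message (publisher-confirm-type: correlated)
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                log.info("publisher confirm: ack={} cause={}", ack, cause));
        // Invoked when a mandatory message cannot be routed to any queue (publisher-returns: true)
        rabbitTemplate.setReturnsCallback(returned ->
                log.warn("returned message: {}", returned.getMessage()));
    }
}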