初始化中间件

master
张海宁 2023-11-26 19:00:37 +08:00 committed by fuck_zhn
commit 43d6d29ffb
14 changed files with 440 additions and 0 deletions

31
.gitignore vendored 100644
View File

@ -0,0 +1,31 @@
# IntelliJ IDEA
.idea/*
!/.idea/misc.xml
!/.idea/modules.xml
!/.idea/workspace.xml
!/.idea/tasks.xml
!/.idea/runConfigurations.xml
!/.idea/deployment.xml
!/.idea/jsLibraryMappings.xml
# Ignore all .iml files except the ones in the .idea directory
*.iml
!.idea/*.iml
# 排除编译输出的目录
/target/
# 排除IDE生成的文件和目录
/.idea/
/.vscode/
*.iml
# 排除构建工具生成的文件
/mvnw
/mvnw.cmd
/.mvn/
# 排除日志文件
*.log
# 排除临时文件
*.tmp

0
.idea/.gitignore vendored 100644
View File

19
.idea/compiler.xml 100644
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="mqtt_to_kafka" />
</profile>
</annotationProcessing>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="mqtt_to_kafka" options="-parameters" />
</option>
</component>
</project>

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding" defaultCharsetForPropertiesFiles="UTF-8">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="PROJECT" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,8 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="JavadocDeclaration" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ADDITIONAL_TAGS" value="Description" />
</inspection_tool>
</profile>
</component>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="http://10.100.1.7:8081/repository/maven-public/" />
</remote-repository>
</component>
</project>

17
.idea/misc.xml 100644
View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

6
.idea/vcs.xml 100644
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

133
pom.xml 100644
View File

@ -0,0 +1,133 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhn</groupId>
<artifactId>mqtt_to_kafka</artifactId>
<version>1.0.0</version>
<!-- 非Spring Boot的项目可以不加-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
<relativePath/>
</parent>
<!-- 所有Maven项目都应该加上下面的properties可以减少很多警告 -->
<properties>
<java.version>17</java.version>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MQTT3依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<!-- fastjson依赖 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.42</version>
</dependency>
<!-- MySQL JDBC驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<!-- Kafka依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包Spring Boot 项目的时候需要放开下面的注释 -->
<!--<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
指定JDK编译版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 打包跳过测试 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- 避免font文件的二进制文件格式压缩破坏 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>woff</nonFilteredFileExtension>
<nonFilteredFileExtension>woff2</nonFilteredFileExtension>
<nonFilteredFileExtension>eot</nonFilteredFileExtension>
<nonFilteredFileExtension>ttf</nonFilteredFileExtension>
<nonFilteredFileExtension>svg</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
</plugins>
<!-- 避免mybatis报错无效的绑定语句 -->
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>**/*.ftl</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,16 @@
package com.zhn.decode;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author : Administrator
* @Description : mqtt
*/
@SpringBootApplication
public class MqttDecodeMain {
public static void main (String[] args) {
SpringApplication.run (MqttDecodeMain.class, args);
}
}

View File

@ -0,0 +1,71 @@
package com.zhn.decode.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value ("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value ("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public KafkaAdmin kafkaAdmin () {
Map<String, Object> configs = new HashMap<> ();
configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin (configs);
}
@Bean
public ProducerFactory<String, Object> producerFactory () {
Map<String, Object> configs = new HashMap<> ();
configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<> (configs);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate (ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<> (producerFactory);
}
@Bean
public KafkaMessageListenerContainer<String, Object> messageListenerContainer () {
ContainerProperties containerProps = new ContainerProperties ("my_topic_name");
containerProps.setAckMode (AckMode.MANUAL_IMMEDIATE);
containerProps.setMessageListener (new KafkaMessageListener ());
return new KafkaMessageListenerContainer<> (consumerFactory (), containerProps);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory () {
Map<String, Object> props = new HashMap<> ();
props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put (JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<> (props);
}
}

View File

@ -0,0 +1,29 @@
package com.zhn.decode.config;
/**
* @author : Administrator
* @description : kafka
*/
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
public class KafkaMessageListener implements MessageListener<String, Object> {
/**
*
*
* @param record
*/
@Override
public void onMessage (ConsumerRecord<String, Object> record) {
// 获取消息的键
String key = record.key ();
// 获取消息的值
Object value = record.value ();
// 在这里处理接收到的消息
}
}

View File

@ -0,0 +1,58 @@
package com.zhn.decode.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* MQTTMQTT
*
* @author Administrator
*/
@Configuration
public class MqttConfig {
/**
* Kafka
*/
public static final String BOOTSTRAP_SERVERS = "123.207.204.152:9092";
/**
* Kafka
*/
public static final String KAFKA_TOPIC = "test";
/**
* MQTT
*/
private static final String BROKER = "tcp://fluxmq.muyu.icu:1883";
/**
* MQTTID
*/
private static final String CLIENT_ID = "mqttx_049cd728";
/**
* MQTT
*/
private static final String TOPIC = "test";
/**
* MQTT
*
* @return MQTT
*
* @throws MqttException MQTT
*/
@Bean
public MqttClient mqttClient () throws MqttException {
MqttClient client = new MqttClient (BROKER, CLIENT_ID, new MemoryPersistence ());
MqttConnectOptions options = new MqttConnectOptions ();
options.setCleanSession (true);
client.connect (options);
client.subscribe (TOPIC, 1);
return client;
}
}

View File

@ -0,0 +1,25 @@
server.port=8080
# 应用名称
spring.application.name=mqtt-decode
# Spring Kafka配置
spring.kafka.bootstrap-servers=123.207.204.152:9092
num.partitions=1 #默认Topic分区数
num.replica.fetchers=1 #默认副本数
# Producer配置
spring.kafka.producer.transaction-id-prefix=kafka_tx.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.group-id=${group.id}
spring.kafka.consumer.auto-offset-reset=earliest
# MQTT配置
mqtt.host=tcp://localhost:1883
mqtt.username=my_username
mqtt.password=my_password
mqtt.topic=my_mqtt_topic
# 其他配置
spring.mvc.path-matching.matching-strategy=prefix