初始化

master
fst1996 2023-11-28 09:44:21 +08:00
commit 3894f18c54
17 changed files with 727 additions and 0 deletions

38
.gitignore vendored 100644
View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

8
.idea/.gitignore vendored 100644
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

14
.idea/misc.xml 100644
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>

17
Dockerfile 100644
View File

@ -0,0 +1,17 @@
# 基础镜像
FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6
#暴露端口位置
EXPOSE 9999
# 挂载目录
VOLUME /home/logs/god-data-server
# 复制jar文件到docker内部
COPY /target/mqtt-redis-demo-1.0-SNAPSHOT.jar /home/app.jar
#工作目录 exec -it 进来默认就是这个目
WORKDIR /home
# 启动java程序
ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"]

76
pom.xml 100644
View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.god</groupId>
<artifactId>mqtt-kafkademo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
</parent>
<repositories>
<repository>
<id>menghang-public</id>
<name>梦航-public</name>
<url>http://10.100.1.6:8081/repository/maven-public/</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>menghang-releases</id>
<name>梦航-releases</name>
<url>http://10.100.1.6:8081/repository/maven-releases/</url>
</repository>
</distributionManagement>
<!-- kafka-->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,11 @@
package com.god;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqttRedisApplication {
public static void main(String[] args) {
SpringApplication.run(MqttRedisApplication.class);
}
}

View File

@ -0,0 +1,81 @@
package com.god.kafka.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* kafka
*/
@Data
@Builder
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "spring.kafka.producer")
public class KafkaProviderConfig {
private String bootstrapServers;
private String transactionIdPrefix;
private String acks;
private String retries;
private String batchSize;
private String bufferMemory;
private String keySerializer;
private String valueSerializer;
/**
* Map Kafka
* @return
*/
@Bean
public KafkaProducer<String, String> producerConfigs() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
//acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
//开启事务必须设为all
props.put(ProducerConfig.ACKS_CONFIG, acks);
//发生错误后消息重发的次数开启事务必须大于0
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
//批次的大小可以通过batch.size 参数设置.默认是16KB
//较小的批次大小有可能降低吞吐量批次大小为0则完全禁用批处理
//比如说kafka里的消息5秒钟Batch才凑满了16KB才能发送出去。那这些消息的延迟就是5秒钟
//实测batchSize这个参数没有用
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
//即使数据没达到16KB,也将这个批次发送出去
props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
//生产者内存缓冲区的大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//反序列化,和生产者的序列化方式对应
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,13 @@
package com.god.kafka.contents;
/**
* @author DongZl
* @description: kafka
* @Date 2023/8/25 18:47
*/
public class KafkaContent {
public static final String TOPIC = "top";
public static final String KAFKA_CON = "117.72.43.22:9092";
}

View File

@ -0,0 +1,43 @@
package com.god.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @authorfst
* @date2023/11/25
* @aim
*/
@Component
public class MyPartitioner implements Partitioner {
/**
* kafka 3
* @param topic
* @param key key
* @param keyBytes key
* @param value value
* @param valueBytes value
* @param cluster kafka
* @return 3, 0 1 2
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取topic的partitions信息
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionsNum = partitionInfos.size();
// 这里以 key 的哈希值作为分区选择依据
System.out.println("================================");
System.out.println(Math.abs(key.hashCode()) % partitionsNum);
return Math.abs(key.hashCode()) % partitionsNum;
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}

View File

@ -0,0 +1,69 @@
package com.god.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.god.kafka.contents.KafkaContent.KAFKA_CON;
import static com.god.kafka.contents.KafkaContent.TOPIC;
/**
* @author DongZl
* @description:
* @Date 2023/8/25 18:50
*/
public class ProducerTest {
private static Producer<String, String> producer;
public static void KfkProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
// props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
private static void close() {
producer.close();
}
private static RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;
try {
meta = result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return meta;
}
public static void main (String[] args) {
KfkProducer();
new Thread(() -> {
int i = 0;
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
send(TOPIC, UUID.randomUUID().toString(), format);
}while (i++ < 1000);
close();
}).start();
}
}

View File

@ -0,0 +1,33 @@
package com.god.kafka.service;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @description:
* @Author fst
* @date 2023/11/26 15:06
*/
@Service
public class KafkaProducerService {
@Autowired
private Producer<String, String> producer;
public RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;
try {
meta = result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return meta;
}
}

View File

@ -0,0 +1,44 @@
package com.god.mqtt.config;
import com.god.mqtt.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
* @author DongZl
* @description: Mqtt
* @Date 2023-11-24 02:06
*/
@Log4j2
@Configuration
public class MqttConfig {
@Bean
public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
try {
log.info("mqtt服务器初始化开始");
long startTime = System.currentTimeMillis();
MqttClient client = new MqttClient(mqttProper.getBroker(),
UUID.randomUUID().toString(),
new MemoryPersistence());
MemoryPersistence memoryPersistence = new MemoryPersistence();
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
log.info("mqtt服务器初始化结束 耗时:[{}MS]", System.currentTimeMillis() - startTime);
client.connect(options);
client.setCallback(mqttService);
client.subscribe(mqttProper.getTopic(), 0);
return client;
}catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,32 @@
package com.god.mqtt.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author DongZl
* @description: mqtt
* @Date 2023-11-24 02:04
*/
@Data
@Builder
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "mqtt.config")
public class MqttProper {
/**
*
*/
private String broker;
/**
*
*/
private String topic;
}

View File

@ -0,0 +1,60 @@
package com.god.mqtt.service;
import com.god.kafka.contents.KafkaContent;
import com.god.kafka.service.KafkaProducerService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author DongZl
* @description: Mqtt
* @Date 2023-11-24 02:10
*/
@Log4j2
@Service
public class MqttService implements MqttCallback {
@Autowired
private KafkaProducerService kafkaProducerService;
/**
* This method is called when the connection to the server is lost.
*
* @param cause the reason behind the loss of connection.
*/
@Override
public void connectionLost (Throwable cause) {
}
/**
* mqttkafka
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg);
kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}

View File

@ -0,0 +1,57 @@
mqtt:
config:
broker: tcp://47.115.225.229:1883
topic: test
# Tomcat
server:
port: 9999
# Spring
spring:
application:
# 应用名称
name: mqtt-kafka
profiles:
# 环境配置
active: dev
# kafka
kafka:
producer:
# Kafka生产者服务器
bootstrap-servers: 10.100.1.8:9092
transaction-id-prefix: kafkaTx-
retries: 3
# acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应
# acks=1 : 只要集群的master收到消息生产者就会收到一个来自服务器的响应。
# acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应
# 开启事务时 必须设置为all
acks: all
# 当有多个消息需要被发送到同一分区时,生产者会把他们放在同一批次里。
batch-size: 16384
# 生产者内存缓冲区的大小
buffer-memory: 1024000
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
max:
poll:
interval:
ms: 600000
# 当broker多久没有收到consumer的心跳请求后就出发reBalance,默认值是10s
session:
timeout:
ms: 10000
listener:
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略报错
missing-topics-fatal: false