master
fst1996 2023-11-30 13:28:44 +08:00
parent 263aaaa18b
commit a457390ac9
6 changed files with 79 additions and 11 deletions

27
pom.xml
View File

@ -14,8 +14,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
@ -73,4 +71,29 @@
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -24,17 +24,23 @@ public class KafkaProducerConfig {
public Producer<String, String> producerInit() {
// 创建一个KafkaProducer的配置对象
Properties properties = new Properties();
Properties props = new Properties();
// 设置Kafka服务器的地址
properties.put("bootstrap.servers", "10.100.1.8:9092");
props.put("bootstrap.servers", "10.100.1.8:9092");
props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
props.put("retries", 3);// 请求失败重试的次数
props.put("batch.size", 16384);// batch的大小
props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间也会立即发送请求设置一段时间用来等待从而将缓冲区填的更多单位为毫秒producer发送数据会延迟1ms可以减少发送到kafka服务器的请求数据
props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
// 设置消息的key和value的序列化方式为StringSerializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置分区策略
props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
// 输出日志信息
log.info("你好kafka");
// 返回一个使用给定配置对象创建的KafkaProducer实例
return new KafkaProducer<>(properties);
return new KafkaProducer<>(props);
}

View File

@ -31,8 +31,10 @@ public class MyPartitioner implements Partitioner {
int partitionsNum = partitionInfos.size();
// 这里以 key 的哈希值作为分区选择依据
System.out.println("================================");
System.out.println(Math.abs(key.hashCode()) % partitionsNum);
return Math.abs(key.hashCode()) % partitionsNum;
int partitionsNum1 = Math.abs(key.hashCode()) % partitionsNum;
System.out.println(partitionsNum1);
//
return partitionsNum1;
}
public void close() {

View File

@ -20,6 +20,14 @@ public class KafkaProducerService {
@Autowired
private Producer<String, String> producer;
/**
*
*
* @param topic
* @param key
* @param value
* @return RecordMetadata
*/
public RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;

View File

@ -0,0 +1,19 @@
package com.god.kafka.utils;
/**
* @description: 1610
* @Author fst
* @date 2023/11/29 13:44
*/
public class DecryptUtil {
public static String hexToString(String hex) {
StringBuilder output = new StringBuilder();
for (int i = 0; i < hex.length(); i += 2) {
String str = hex.substring(i, i + 2);
output.append((char) Integer.parseInt(str, 16));
}
return output.toString();
}
}

View File

@ -2,7 +2,9 @@ package com.god.mqtt.service;
import com.god.kafka.contents.KafkaContent;
import com.god.kafka.partitioner.MyPartitioner;
import com.god.kafka.service.KafkaProducerService;
import com.god.kafka.utils.DecryptUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -49,7 +51,15 @@ public class MqttService implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg);
kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), "如果是这个错,我一头撞死");
//去除空格
String inputString = msg.replaceAll(" ", "");
log.info("去除空格后:{}", inputString);
//解密 16转10
String str = DecryptUtil.hexToString(inputString);
//去除空格
log.info("解密后:{}", str);
//发送消息到kafka
kafkaProducerService.send(topic, String.valueOf(msg.hashCode()), str);
}
@Override