diff --git a/src/main/java/com/hyc/controller/SummarizeController.java b/src/main/java/com/hyc/controller/SummarizeController.java index 1bfef50..48b50b7 100644 --- a/src/main/java/com/hyc/controller/SummarizeController.java +++ b/src/main/java/com/hyc/controller/SummarizeController.java @@ -1,25 +1,23 @@ package com.hyc.controller; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.hyc.domain.ConnectionParameter; import com.hyc.domain.SummarizeResp; +import com.hyc.kafka.demo.config.KafkaSendResultHandler; import com.hyc.result.Result; import com.hyc.service.SummarizeService; -import com.hyc.util.CalculateCheckDigit; import com.hyc.util.ConversionUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; /** * 创建Mqtt客户端 @@ -37,6 +35,16 @@ public class SummarizeController { @Autowired private SummarizeService summarizeService; + + private KafkaTemplate kafkaTemplate; + private final TransactionTemplate transactionTemplate; + + public SummarizeController(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler, TransactionTemplate transactionTemplate) { + this.kafkaTemplate = kafkaTemplate; + this.transactionTemplate = transactionTemplate; + this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); + } + @PostMapping("/createMqttClient") public String createMqttClient(@RequestBody List connectionParameterList){ @@ -58,6 +66,7 @@ public class SummarizeController { + @Transactional public boolean createOneMqttClient(ConnectionParameter connectionParameter){ int qos = 0; @@ -81,7 +90,8 @@ public class SummarizeController { @Override public void connectionLost(Throwable cause) { - System.out.println("connectionLost: " + cause.getMessage()); + cause.printStackTrace(); + System.out.println("connectionLost: "); } @Override @@ -111,6 +121,7 @@ public class SummarizeController { "positionStatus","easStatus","ptcStatus","epsStatus","absStatus","mcuStatus", "heatingStatus","batteryStatus","batteryInsulationStatus","dcdcStatus","chgStatus"}; LinkedHashMap linkedHashMap = new LinkedHashMap<>(); + HashMap stringStringHashMap = new HashMap<>(); for (int i = 0; i < 47; i++) { String substring = realString.substring(count, count + intArr[i]); linkedHashMap.put(strArr[i],substring); @@ -119,8 +130,19 @@ public class SummarizeController { log.warn("hashMap:{}",linkedHashMap); ObjectMapper objectMapper = new ObjectMapper(); try { - String json = objectMapper.writeValueAsString(linkedHashMap); - log.error("json格式:{}",json); + String jsonData = objectMapper.writeValueAsString(linkedHashMap); + log.error("json格式:{}",jsonData); + transactionTemplate.execute(status -> { + try { + kafkaTemplate.send("topic1", UUID.randomUUID().toString(), jsonData); + status.flush(); + }catch (Exception e){ + log.error("kafka生产异常: {}", e.getMessage()); + status.setRollbackOnly(); + } + return null; + }); + } catch (JsonProcessingException e) { e.printStackTrace(); } diff --git a/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java b/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java new file mode 100644 index 0000000..c5c5273 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java @@ -0,0 +1,42 @@ +package com.hyc.kafka.demo.controller; + +import com.hyc.kafka.demo.config.KafkaSendResultHandler; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +/** + * 测试kafka控制层 + * + * @author YouChe·He + * @ClassName: KafkaController + * @Description: 测试kafka控制层 + * @CreateTime: 2024/6/6 14:42 + */ +@RestController +public class KafkaController { + + + private KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) { + this.kafkaTemplate = kafkaTemplate; + this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); + } + + @GetMapping("/send") + @Transactional + public void sendMessage(String message) { + System.out.println("呼呼呼"); + + kafkaTemplate.send("topic1", UUID.randomUUID().toString(), message); + } + +} + + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e43241f..0accf78 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -113,7 +113,7 @@ spring: ms: 10000 listener: # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 - concurrency: 1 + concurrency: 4 # 自动提交关闭,需要设置手动消息确认 ack-mode: manual_immediate # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误