feat()解析报文后接入kafka

dev
20300 2024-06-09 20:28:38 +08:00
parent ddef239905
commit 9c72d245a9
3 changed files with 75 additions and 11 deletions

View File

@ -1,25 +1,23 @@
package com.hyc.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hyc.domain.ConnectionParameter;
import com.hyc.domain.SummarizeResp;
import com.hyc.kafka.demo.config.KafkaSendResultHandler;
import com.hyc.result.Result;
import com.hyc.service.SummarizeService;
import com.hyc.util.CalculateCheckDigit;
import com.hyc.util.ConversionUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.*;
/**
* Mqtt
@ -37,6 +35,16 @@ public class SummarizeController {
@Autowired
private SummarizeService summarizeService;
private KafkaTemplate<Object, Object> kafkaTemplate;
private final TransactionTemplate transactionTemplate;
public SummarizeController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler, TransactionTemplate transactionTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.transactionTemplate = transactionTemplate;
this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
}
@PostMapping("/createMqttClient")
public String createMqttClient(@RequestBody List<ConnectionParameter> connectionParameterList){
@ -58,6 +66,7 @@ public class SummarizeController {
@Transactional
public boolean createOneMqttClient(ConnectionParameter connectionParameter){
int qos = 0;
@ -81,7 +90,8 @@ public class SummarizeController {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
cause.printStackTrace();
System.out.println("connectionLost: ");
}
@Override
@ -111,6 +121,7 @@ public class SummarizeController {
"positionStatus","easStatus","ptcStatus","epsStatus","absStatus","mcuStatus",
"heatingStatus","batteryStatus","batteryInsulationStatus","dcdcStatus","chgStatus"};
LinkedHashMap<String, Object> linkedHashMap = new LinkedHashMap<>();
HashMap<String, String> stringStringHashMap = new HashMap<>();
for (int i = 0; i < 47; i++) {
String substring = realString.substring(count, count + intArr[i]);
linkedHashMap.put(strArr[i],substring);
@ -119,8 +130,19 @@ public class SummarizeController {
log.warn("hashMap:{}",linkedHashMap);
ObjectMapper objectMapper = new ObjectMapper();
try {
String json = objectMapper.writeValueAsString(linkedHashMap);
log.error("json格式:{}",json);
String jsonData = objectMapper.writeValueAsString(linkedHashMap);
log.error("json格式:{}",jsonData);
transactionTemplate.execute(status -> {
try {
kafkaTemplate.send("topic1", UUID.randomUUID().toString(), jsonData);
status.flush();
}catch (Exception e){
log.error("kafka生产异常: {}", e.getMessage());
status.setRollbackOnly();
}
return null;
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}

View File

@ -0,0 +1,42 @@
package com.hyc.kafka.demo.controller;
import com.hyc.kafka.demo.config.KafkaSendResultHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* kafka
*
* @author YouChe·He
* @ClassName: KafkaController
* @Description: kafka
* @CreateTime: 2024/6/6 14:42
*/
@RestController
public class KafkaController {
private KafkaTemplate<Object, Object> kafkaTemplate;
public KafkaController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
}
@GetMapping("/send")
@Transactional
public void sendMessage(String message) {
System.out.println("呼呼呼");
kafkaTemplate.send("topic1", UUID.randomUUID().toString(), message);
}
}

View File

@ -113,7 +113,7 @@ spring:
ms: 10000
listener:
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 1
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误