dev.protocol.parsing
chentaisen 2024-09-29 08:43:52 +08:00
parent 1a3b267685
commit c90f3fa056
1 changed files with 8 additions and 63 deletions

View File

@ -17,6 +17,8 @@ import java.util.concurrent.CompletableFuture;
/**
* mqtt
*
* @ClassName MqttTest
* @Description
* @Author Chen
@ -25,10 +27,14 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
public class MqttTest {
private static final Integer ID = 1;
private static final Integer CODE = 1;
@Resource
private KafkaProducer<String, String> kafkaProducer;
@Resource
private SysCarMessageServiceImpl sysCarMessageService;
@PostConstruct
public void Test() {
String topic = "vehicle";
@ -55,13 +61,12 @@ public class MqttTest {
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageLists(1, 1);
List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageLists(ID, CODE);
String string = new String(mqttMessage.getPayload());
System.out.println(new String(mqttMessage.getPayload()));
log.info(new String(mqttMessage.getPayload()));
String[] test = string.split(" ");
String[] results = new String[list.size()];
List<CompletableFuture<String>> futures = new ArrayList<>();
for (SysCarMessage carMessage : list) {
futures.add(CompletableFuture.supplyAsync(() -> {
int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
@ -119,66 +124,6 @@ public class MqttTest {
}
// public Result<List<SysCarMessage>> list(SysCarMessage sysCarMessage) throws ExecutionException, InterruptedException {
// List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageList(sysCarMessage);
// if (list == null || list.isEmpty()) {
// return Result.error(); //为空返回错误信息
// }
// String[] test = TEST.split(" ");
// String[] results = new String[list.size()];
// List<CompletableFuture<String>> futures = new ArrayList<>();
//
// for (SysCarMessage carMessage : list) {
// futures.add(CompletableFuture.supplyAsync(() -> {
// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
// int end = Integer.parseInt(carMessage.getMessageEndIndex());
// StringBuilder hexBuilder = new StringBuilder();
// for (int i = start; i < end; i++) {
// hexBuilder.append(test[i]);
// }
// String hex = hexBuilder.toString();
// char[] result = new char[hex.length() / 2];
// for (int x = 0; x < hex.length(); x += 2) {
// int high = Character.digit(hex.charAt(x), 16);
// int low = Character.digit(hex.charAt(x + 1), 16);
// result[x / 2] = (char) ((high << 4) + low);
// }
// return new String(result);
// }));
// }
// for (int i = 0; i < futures.size(); i++) {
// results[i] = futures.get(i).get();
// }
// log.info("======================={}", results);
// String jsonString = """
// [{
// "key": "vin",
// "label": "VIN码",
// "type": "String",
// "value": "vin131413534474"
// },{
// "key": "timestamp",
// "label": "时间戳",
// "type": "String",
// "value": "1727525252127"
// },{
// "key": "latitude",
// "label": "纬度",
// "type": "String",
// "value": "66.898"
// },{
// "key": "longitude",
// "label": "经度",
// "type": "String",
// "value": "99.124"
// }]""";
//
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
// kafkaProducer.send(producerRecord);
// log.info("消息发送成功:{}", jsonString);
// return Result.success(list);
// }
});
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());