diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java index add2731..f75c32a 100644 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java @@ -17,6 +17,8 @@ import java.util.concurrent.CompletableFuture; /** + * mqtt + * * @ClassName MqttTest * @Description 描述 * @Author Chen @@ -25,10 +27,14 @@ import java.util.concurrent.CompletableFuture; @Slf4j @Component public class MqttTest { + private static final Integer ID = 1; + private static final Integer CODE = 1; + @Resource private KafkaProducer kafkaProducer; @Resource private SysCarMessageServiceImpl sysCarMessageService; + @PostConstruct public void Test() { String topic = "vehicle"; @@ -55,13 +61,12 @@ public class MqttTest { // 连接成功 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - List list = sysCarMessageService.selectSysCarMessageLists(1, 1); + List list = sysCarMessageService.selectSysCarMessageLists(ID, CODE); String string = new String(mqttMessage.getPayload()); - System.out.println(new String(mqttMessage.getPayload())); + log.info(new String(mqttMessage.getPayload())); String[] test = string.split(" "); String[] results = new String[list.size()]; List> futures = new ArrayList<>(); - for (SysCarMessage carMessage : list) { futures.add(CompletableFuture.supplyAsync(() -> { int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; @@ -119,66 +124,6 @@ public class MqttTest { } -// public Result> list(SysCarMessage sysCarMessage) throws ExecutionException, InterruptedException { -// List list = sysCarMessageService.selectSysCarMessageList(sysCarMessage); -// if (list == null || list.isEmpty()) { -// return Result.error(); //为空返回错误信息 -// } -// String[] test = TEST.split(" "); -// String[] results = new String[list.size()]; -// List> futures = new ArrayList<>(); -// -// for (SysCarMessage carMessage : list) { -// futures.add(CompletableFuture.supplyAsync(() -> { -// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; -// int end = Integer.parseInt(carMessage.getMessageEndIndex()); -// StringBuilder hexBuilder = new StringBuilder(); -// for (int i = start; i < end; i++) { -// hexBuilder.append(test[i]); -// } -// String hex = hexBuilder.toString(); -// char[] result = new char[hex.length() / 2]; -// for (int x = 0; x < hex.length(); x += 2) { -// int high = Character.digit(hex.charAt(x), 16); -// int low = Character.digit(hex.charAt(x + 1), 16); -// result[x / 2] = (char) ((high << 4) + low); -// } -// return new String(result); -// })); -// } -// for (int i = 0; i < futures.size(); i++) { -// results[i] = futures.get(i).get(); -// } -// log.info("======================={}", results); -// String jsonString = """ -// [{ -// "key": "vin", -// "label": "VIN码", -// "type": "String", -// "value": "vin131413534474" -// },{ -// "key": "timestamp", -// "label": "时间戳", -// "type": "String", -// "value": "1727525252127" -// },{ -// "key": "latitude", -// "label": "纬度", -// "type": "String", -// "value": "66.898" -// },{ -// "key": "longitude", -// "label": "经度", -// "type": "String", -// "value": "99.124" -// }]"""; -// -// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); -// kafkaProducer.send(producerRecord); -// log.info("消息发送成功:{}", jsonString); -// return Result.success(list); -// } - }); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode());