feat():解析模块的日志记录
parent
f0f6cf006f
commit
0d48eb2e34
|
@ -1,200 +0,0 @@
|
||||||
//package com.muyu.analysis.parsing.mqtt;
|
|
||||||
//
|
|
||||||
//import com.muyu.common.mqtt.MQTTConnect;
|
|
||||||
//import com.muyu.enterprise.cache.CarMessageValueCacheService;
|
|
||||||
//import com.muyu.enterprise.cache.CarVehicleCacheService;
|
|
||||||
//import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
|
|
||||||
//import com.muyu.enterprise.domain.car.MessageValue;
|
|
||||||
//import com.muyu.enterprise.domain.car.Vehicle;
|
|
||||||
//import com.muyu.enterprise.domain.car.VehicleType;
|
|
||||||
//import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
|
|
||||||
//import com.muyu.enterprise.remote.RemoteMessageValueService;
|
|
||||||
//import com.muyu.enterprise.remote.RemoteVehicleService;
|
|
||||||
//import jakarta.annotation.PostConstruct;
|
|
||||||
//import jakarta.annotation.Resource;
|
|
||||||
//import cn.hutool.json.JSONObject;
|
|
||||||
//import lombok.extern.log4j.Log4j2;
|
|
||||||
//import org.apache.kafka.clients.producer.KafkaProducer;
|
|
||||||
//import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
//import org.eclipse.paho.client.mqttv3.*;
|
|
||||||
//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
||||||
//import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
//import org.springframework.stereotype.Component;
|
|
||||||
//
|
|
||||||
//import java.util.List;
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * 协议解析处理数据发送传送到队列
|
|
||||||
// * @Author:李庆帅
|
|
||||||
// * @Package:com.muyu.analysis.parsing.mqtt
|
|
||||||
// * @Project:cloud-server
|
|
||||||
// * @name:AnalysisMQTT
|
|
||||||
// * @Date:2024/10/8 20:52
|
|
||||||
// */
|
|
||||||
//@Log4j2
|
|
||||||
//@Component
|
|
||||||
//public class AnalysisMQTT {
|
|
||||||
//
|
|
||||||
// private final static String topic = "sysCar_vin_topic";
|
|
||||||
//
|
|
||||||
// @Resource
|
|
||||||
// private RedisTemplate<String, Object> redisTemplate;
|
|
||||||
//
|
|
||||||
// @Resource
|
|
||||||
// private KafkaProducer<String, String> kafkaProducer;
|
|
||||||
//
|
|
||||||
// @Resource
|
|
||||||
// private RemoteVehicleService remoteVehicleService;
|
|
||||||
//
|
|
||||||
// @Resource
|
|
||||||
// private RemoteMessageValueService remoteMessageValueService;
|
|
||||||
//
|
|
||||||
// //车辆信息
|
|
||||||
// @Resource
|
|
||||||
// private CarVehicleCacheService vehicleCacheService;
|
|
||||||
//
|
|
||||||
// //车辆类型信息
|
|
||||||
// @Resource
|
|
||||||
// private CarVehicleTypeCacheService vehicleTypeCacheService;
|
|
||||||
//
|
|
||||||
// //报文模版信息
|
|
||||||
// @Resource
|
|
||||||
// private CarMessageValueCacheService allMessageValueCacheService;
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // MQTT主题
|
|
||||||
// private static final String TOPIC = "vehicle";
|
|
||||||
// // MQTT Broker地址
|
|
||||||
// private static final String BROKER = "tcp://106.15.136.7:1883";
|
|
||||||
// // MQTT客户端ID
|
|
||||||
// private static final String CLIENT_ID = "JavaSample";
|
|
||||||
// // MQTT客户端
|
|
||||||
// private MqttClient mqttClient;
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * MQTT连接选项
|
|
||||||
// */
|
|
||||||
// public AnalysisMQTT(CarVehicleCacheService vehicleCacheService, CarVehicleTypeCacheService vehicleTypeCacheService, CarMessageValueCacheService allMessageValueCacheService) {
|
|
||||||
// this.vehicleCacheService = vehicleCacheService;
|
|
||||||
// this.vehicleTypeCacheService = vehicleTypeCacheService;
|
|
||||||
// this.allMessageValueCacheService = allMessageValueCacheService;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 初始化MQTT连接
|
|
||||||
// */
|
|
||||||
// @PostConstruct
|
|
||||||
// public void init() {
|
|
||||||
// connectToMqttBroker();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 连接MQTT Broker
|
|
||||||
// */
|
|
||||||
// private void connectToMqttBroker() {
|
|
||||||
// try {
|
|
||||||
// mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
|
|
||||||
// MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
||||||
// connOpts.setCleanSession(true);
|
|
||||||
// log.info("连接到协议: " + BROKER);
|
|
||||||
// mqttClient.connect(connOpts);
|
|
||||||
// mqttClient.subscribe(TOPIC, 0);
|
|
||||||
// mqttClient.setCallback(new MqttCallbackHandler());
|
|
||||||
// } catch (MqttException me) {
|
|
||||||
// log.error("连接MQTT Broker失败: [{}]", me.getMessage());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * MQTT回调处理器
|
|
||||||
// */
|
|
||||||
// private class MqttCallbackHandler implements MqttCallback {
|
|
||||||
//
|
|
||||||
// // 连接丢失
|
|
||||||
// @Override
|
|
||||||
// public void connectionLost(Throwable throwable) {
|
|
||||||
// log.error("连接丢失: [{}]", throwable.getMessage());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 连接成功
|
|
||||||
// @Override
|
|
||||||
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
||||||
// handleMqttMessage(mqttMessage);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // 接收信息
|
|
||||||
// @Override
|
|
||||||
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 处理MQTT消息
|
|
||||||
// *
|
|
||||||
// * @param mqttMessage
|
|
||||||
// */
|
|
||||||
// private void handleMqttMessage(MqttMessage mqttMessage) {
|
|
||||||
// // 解析MQTT消息
|
|
||||||
// String messageStr = new String(mqttMessage.getPayload());
|
|
||||||
// log.info("接收到MQTT消息: " + messageStr);
|
|
||||||
// // 解析协议
|
|
||||||
// JSONObject parseMessage = parseProtocol(messageStr);
|
|
||||||
// // 发送Kafka消息
|
|
||||||
// sendKafkaMessage(parseMessage);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 解析协议
|
|
||||||
// *
|
|
||||||
// * @param messageStr
|
|
||||||
// * @return
|
|
||||||
// */
|
|
||||||
// private JSONObject parseProtocol(String messageStr) {
|
|
||||||
// String[] hexArray = messageStr.split(" ");
|
|
||||||
// // 遍历十六进制数据转换为字符
|
|
||||||
// StringBuilder stringBuilder = new StringBuilder();
|
|
||||||
// for (String hex : hexArray) {
|
|
||||||
// int decimal = Integer.parseInt(hex, 16);
|
|
||||||
// stringBuilder.append((char) decimal);
|
|
||||||
// }
|
|
||||||
// // 取出车辆VIN码
|
|
||||||
// String vehicleVin = stringBuilder.substring(1, 18);
|
|
||||||
// log.info("车辆VIN码: {}", vehicleVin);
|
|
||||||
// // 根据车辆VIN码查询车辆信息
|
|
||||||
// Vehicle vehicle = vehicleCacheService.get(vehicleVin);
|
|
||||||
// VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
|
|
||||||
// Long templateId = vehicleType.getMessageTemplateId();
|
|
||||||
// List<MessageValueListResp> templateList = allMessageValueCacheService.get(String.valueOf(templateId));
|
|
||||||
// // 判断报文模板列表不为空
|
|
||||||
// if (templateList.isEmpty()) {
|
|
||||||
// throw new RuntimeException("报文模版为空");
|
|
||||||
// }
|
|
||||||
// // 存储报文模版解析后的数据
|
|
||||||
// JSONObject jsonObject = new JSONObject();
|
|
||||||
// for (MessageValueListResp messageValue : templateList) {
|
|
||||||
// // 起始位下标
|
|
||||||
// Integer startIndex = messageValue.getMessageStartIndex() - 1;
|
|
||||||
// // 结束位下标
|
|
||||||
// Integer endIndex = messageValue.getMessageEndIndex();
|
|
||||||
// // 根据报文模版截取数据
|
|
||||||
// String value = stringBuilder.substring(startIndex, endIndex);
|
|
||||||
// // 存入数据
|
|
||||||
// jsonObject.put(messageValue.getMessageLabel(), value);
|
|
||||||
// }
|
|
||||||
// return jsonObject;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * 发送Kafka消息
|
|
||||||
// * @param parseMessage
|
|
||||||
// */
|
|
||||||
// private void sendKafkaMessage(JSONObject parseMessage) {
|
|
||||||
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", parseMessage.toString());
|
|
||||||
// kafkaProducer.send(producerRecord);
|
|
||||||
// log.info("发送Kafka消息: " + parseMessage);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//}
|
|
Loading…
Reference in New Issue