Merge remote-tracking branch 'origin/dev.analysis' into dev
commit
7c67737481
|
@ -0,0 +1,47 @@
|
||||||
|
package com.muyu.enterprise.domain;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @Author:李庆帅
|
||||||
|
* @Package:com.muyu.enterprise.domain
|
||||||
|
* @Project:cloud-server
|
||||||
|
* @name:MqttProperties
|
||||||
|
* @Date:2024/10/10 20:01
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class MqttProperties
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 节点
|
||||||
|
*/
|
||||||
|
private String broker;
|
||||||
|
/**
|
||||||
|
* 主题
|
||||||
|
*/
|
||||||
|
private String topic;
|
||||||
|
/**
|
||||||
|
* 用户名
|
||||||
|
*/
|
||||||
|
private String userName;
|
||||||
|
/**
|
||||||
|
* 密码
|
||||||
|
*/
|
||||||
|
private String password;
|
||||||
|
/**
|
||||||
|
* 客户端ID
|
||||||
|
*/
|
||||||
|
private String clientId;
|
||||||
|
/**
|
||||||
|
* 上报级别
|
||||||
|
*/
|
||||||
|
private int qos=0;
|
||||||
|
|
||||||
|
}
|
|
@ -136,6 +136,17 @@
|
||||||
<artifactId>cloud-modules-enterprise-cache</artifactId>
|
<artifactId>cloud-modules-enterprise-cache</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- rabbit消息队列模块 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-common-rabbit</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- <!– gateway系统模块 –>-->
|
||||||
|
<!-- <dependency>-->
|
||||||
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
|
<!-- <artifactId>cloud-modules-vehicle-gateway</artifactId>-->
|
||||||
|
<!-- </dependency>-->
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.muyu.analysis.parsing.mqtt;
|
package com.muyu.analysis.parsing.mqtt;
|
||||||
|
|
||||||
|
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
import com.muyu.common.core.constant.KafkaConstants;
|
||||||
import com.muyu.common.core.constant.RedisConstants;
|
import com.muyu.common.core.constant.RedisConstants;
|
||||||
import com.muyu.common.core.domain.Result;
|
import com.muyu.common.core.domain.Result;
|
||||||
|
@ -7,6 +8,7 @@ import com.muyu.common.mqtt.MQTTConnect;
|
||||||
import com.muyu.enterprise.cache.CarMessageValueCacheService;
|
import com.muyu.enterprise.cache.CarMessageValueCacheService;
|
||||||
import com.muyu.enterprise.cache.CarVehicleCacheService;
|
import com.muyu.enterprise.cache.CarVehicleCacheService;
|
||||||
import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
|
import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
|
||||||
|
import com.muyu.enterprise.domain.MqttProperties;
|
||||||
import com.muyu.enterprise.domain.car.Vehicle;
|
import com.muyu.enterprise.domain.car.Vehicle;
|
||||||
import com.muyu.enterprise.domain.car.VehicleType;
|
import com.muyu.enterprise.domain.car.VehicleType;
|
||||||
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
|
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
|
||||||
|
@ -20,6 +22,7 @@ import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -64,49 +67,79 @@ public class ParsingMQTT {
|
||||||
@Resource
|
@Resource
|
||||||
private CarMessageValueCacheService allMessageValueCacheService;
|
private CarMessageValueCacheService allMessageValueCacheService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MqttClientService mqttClientService;
|
||||||
/**
|
/**
|
||||||
* 协议解析
|
* 协议解析
|
||||||
*/
|
*/
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void mqttClient() {
|
public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
|
||||||
// String topic = "vehicle";
|
// String topic = "vehicle";
|
||||||
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
|
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
|
||||||
//// String clientId = "JavaSample";
|
//// String clientId = "JavaSample";
|
||||||
|
|
||||||
try {
|
String payload = new String(mqttMessage.getPayload());
|
||||||
// 第三个参数为空,默认持久化策略
|
log.info("====:{}", payload);
|
||||||
MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
|
|
||||||
MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
||||||
connOpts.setCleanSession(true);
|
|
||||||
log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
|
|
||||||
sampleClient.connect(connOpts);
|
|
||||||
sampleClient.subscribe(MQTTConnect.TOPIC, 0);
|
|
||||||
sampleClient.setCallback(new MqttCallback() {
|
|
||||||
// 连接丢失
|
|
||||||
@Override
|
|
||||||
public void connectionLost(Throwable throwable) {
|
|
||||||
|
|
||||||
}
|
// try {
|
||||||
|
// // 第三个参数为空,默认持久化策略
|
||||||
|
// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
|
||||||
|
// MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
// connOpts.setCleanSession(true);
|
||||||
|
// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
|
||||||
|
// sampleClient.connect(connOpts);
|
||||||
|
// //MQTTConnect.TOPIC
|
||||||
|
// sampleClient.subscribe(MQTTConnect.TOPIC, 0);
|
||||||
|
// sampleClient.setCallback(new MqttCallback() {
|
||||||
|
// // 连接丢失
|
||||||
|
// @Override
|
||||||
|
// public void connectionLost(Throwable throwable) {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // 连接成功
|
||||||
|
// @Override
|
||||||
|
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
||||||
|
// System.out.println(new String(mqttMessage.getPayload()));
|
||||||
|
//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload()));
|
||||||
|
////
|
||||||
|
//// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
||||||
|
//// entries.toString() );
|
||||||
|
//// kafkaProducer.send(producerRecord);
|
||||||
|
//// log.info("解析之后的数据:"+entries);
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// // 接收信息
|
||||||
|
// @Override
|
||||||
|
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
// } catch (MqttException me) {
|
||||||
|
// log.info("reason " + me.getReasonCode());
|
||||||
|
// log.info("msg " + me.getMessage());
|
||||||
|
// log.info("loc " + me.getLocalizedMessage());
|
||||||
|
// log.info("cause " + me.getCause());
|
||||||
|
// log.info("excep " + me);
|
||||||
|
// System.out.println("reason " + me.getReasonCode());
|
||||||
|
// System.out.println("msg " + me.getMessage());
|
||||||
|
// System.out.println("loc " + me.getLocalizedMessage());
|
||||||
|
// System.out.println("cause " + me.getCause());
|
||||||
|
// System.out.println("excep " + me);
|
||||||
|
// me.printStackTrace();
|
||||||
|
// }
|
||||||
|
|
||||||
// 连接成功
|
|
||||||
@Override
|
|
||||||
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
||||||
System.out.println(new String(mqttMessage.getPayload()));
|
|
||||||
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
|
|
||||||
|
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
|
||||||
entries.toString() );
|
|
||||||
kafkaProducer.send(producerRecord);
|
|
||||||
log.info("解析之后的数据:"+entries);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 协议解析
|
* 协议解析
|
||||||
* @param messageStr
|
* @param messageStr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public JSONObject protocolParsing(String messageStr) {
|
// public JSONObject protocolParsing() {
|
||||||
//根据空格切割数据
|
//根据空格切割数据
|
||||||
String[] hexArray = messageStr.split(" ");
|
String[] hexArray = messageStr.split(" ");
|
||||||
StringBuilder result = new StringBuilder();
|
StringBuilder result = new StringBuilder();
|
||||||
|
@ -142,28 +175,10 @@ public class ParsingMQTT {
|
||||||
jsonObject.put(messageValue.getMessageLabel(), value);
|
jsonObject.put(messageValue.getMessageLabel(), value);
|
||||||
}
|
}
|
||||||
return jsonObject;
|
return jsonObject;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// 接收信息
|
|
||||||
@Override
|
|
||||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (MqttException me) {
|
|
||||||
log.info("reason " + me.getReasonCode());
|
|
||||||
log.info("msg " + me.getMessage());
|
|
||||||
log.info("loc " + me.getLocalizedMessage());
|
|
||||||
log.info("cause " + me.getCause());
|
|
||||||
log.info("excep " + me);
|
|
||||||
System.out.println("reason " + me.getReasonCode());
|
|
||||||
System.out.println("msg " + me.getMessage());
|
|
||||||
System.out.println("loc " + me.getLocalizedMessage());
|
|
||||||
System.out.println("cause " + me.getCause());
|
|
||||||
System.out.println("excep " + me);
|
|
||||||
me.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
package com.muyu.analysis.parsing.mqtt.service;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONObject;
|
||||||
|
import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
|
||||||
|
import com.muyu.common.core.constant.KafkaConstants;
|
||||||
|
import com.muyu.enterprise.domain.MqttProperties;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author:李庆帅
|
||||||
|
* @Package:com.muyu.analysis.parsing.mqtt.service
|
||||||
|
* @Project:cloud-server
|
||||||
|
* @name:MqttClientService
|
||||||
|
* @Date:2024/10/10 20:24
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class MqttClientService {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ParsingMQTT messageProcessor;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
private MqttClient sampleClient;
|
||||||
|
|
||||||
|
|
||||||
|
public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
|
||||||
|
executorService.submit(() -> {
|
||||||
|
try {
|
||||||
|
connectAndSubscribe(mqttProperties);
|
||||||
|
} catch (MqttException | IOException e) {
|
||||||
|
log.error("MQTT连接或订阅失败", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
|
||||||
|
if (sampleClient != null && sampleClient.isConnected()) {
|
||||||
|
log.info("MQTT客户端已经连接,跳过重新连接。");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
|
||||||
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
connOpts.setUserName(mqttProperties.getUserName());
|
||||||
|
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
|
||||||
|
connOpts.setCleanSession(true);
|
||||||
|
|
||||||
|
sampleClient.connect(connOpts);
|
||||||
|
sampleClient.subscribe(mqttProperties.getTopic(), 0);
|
||||||
|
|
||||||
|
sampleClient.setCallback(new MqttCallback() {
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable throwable) {
|
||||||
|
log.error("连接丢失:{}", throwable.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String s, MqttMessage mqttMessage) {
|
||||||
|
executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
|
||||||
|
System.out.println(new String(mqttMessage.getPayload()));
|
||||||
|
JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
|
||||||
|
// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
|
||||||
|
|
||||||
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
||||||
|
entries.toString() );
|
||||||
|
kafkaProducer.send(producerRecord);
|
||||||
|
log.info("解析之后的数据:"+entries);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||||
|
log.info("消息发送完成:{}", iMqttDeliveryToken);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
executorService.shutdown();
|
||||||
|
if (sampleClient != null && sampleClient.isConnected()) {
|
||||||
|
try {
|
||||||
|
sampleClient.disconnect();
|
||||||
|
} catch (MqttException e) {
|
||||||
|
log.error("MQTT客户端断开连接失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package com.muyu.analysis.parsing.consumer;
|
||||||
|
|
||||||
|
|
||||||
|
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
|
||||||
|
import com.muyu.enterprise.domain.MqttProperties;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author:李庆帅
|
||||||
|
* @Package:com.muyu.analysis.parsing.consumer
|
||||||
|
* @Project:cloud-server
|
||||||
|
* @name:RabbitListenerComponent
|
||||||
|
* @Date:2024/10/10 20:24
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class RabbitListenerComponent {
|
||||||
|
@Autowired
|
||||||
|
private MqttClientService mqttClientService;
|
||||||
|
|
||||||
|
private static final String FORM_QUEUE = "GO_LINE";
|
||||||
|
|
||||||
|
@RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true"))
|
||||||
|
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
|
||||||
|
try {
|
||||||
|
mqttClientService.connectAndSubscribeAsync(mqttProperties);
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
log.error("处理RabbitMQ消息时发生错误", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
//package com.muyu.analysis.parsing.manager;
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//import com.alibaba.fastjson2.JSONObject;
|
||||||
|
//import lombok.extern.slf4j.Slf4j;
|
||||||
|
//import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
//import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
//import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
//import org.springframework.stereotype.Service;
|
||||||
|
//
|
||||||
|
//import javax.annotation.Resource;
|
||||||
|
//import java.util.ArrayList;
|
||||||
|
//import java.util.List;
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//
|
||||||
|
///**
|
||||||
|
// *
|
||||||
|
// *
|
||||||
|
// * @Author:李庆帅
|
||||||
|
// * @Package:com.muyu.analysis.parsing.manager
|
||||||
|
// * @Project:cloud-server
|
||||||
|
// * @name:MessageProcessor
|
||||||
|
// * @Date:2024/10/10 20:24
|
||||||
|
// */
|
||||||
|
//
|
||||||
|
//@Slf4j
|
||||||
|
//@Service
|
||||||
|
//public class MessageProcessor {
|
||||||
|
// private static final int ID = 1;
|
||||||
|
// @Resource
|
||||||
|
// private SysCarMessageServiceImpl sysCarMessageService;
|
||||||
|
// @Resource
|
||||||
|
// private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// public void processMessage(MqttMessage mqttMessage) {
|
||||||
|
// String payload = new String(mqttMessage.getPayload());
|
||||||
|
// log.info("====:{}", payload);
|
||||||
|
//
|
||||||
|
// List<SysCarMessage> carMessages = sysCarMessageService.selectSysCarMessageLists(ID);
|
||||||
|
// List<KafKaData> kafKaDataList = new ArrayList<>();
|
||||||
|
//
|
||||||
|
// String[] test = payload.split(" ");
|
||||||
|
// for (SysCarMessage carMessage : carMessages) {
|
||||||
|
// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
|
||||||
|
// int end = Integer.parseInt(carMessage.getMessageEndIndex());
|
||||||
|
//
|
||||||
|
// StringBuilder hexBuilder = new StringBuilder();
|
||||||
|
// for (int i = start; i < end; i++) {
|
||||||
|
// hexBuilder.append(test[i]);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// String hex = hexBuilder.toString();
|
||||||
|
// char[] result = new char[hex.length() / 2];
|
||||||
|
// for (int x = 0; x < hex.length(); x += 2) {
|
||||||
|
// int high = Character.digit(hex.charAt(x), 16);
|
||||||
|
// int low = Character.digit(hex.charAt(x + 1), 16);
|
||||||
|
// result[x / 2] = (char) ((high << 4) + low);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// String value = new String(result);
|
||||||
|
// kafKaDataList.add(KafKaData.builder()
|
||||||
|
// .key(carMessage.getMessageTypeCode())
|
||||||
|
// .label(carMessage.getMessageTypeCode())
|
||||||
|
// .value(value)
|
||||||
|
// .type(carMessage.getMessageType())
|
||||||
|
// .build());
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// kafKaDataList.add(KafKaData.builder()
|
||||||
|
// .key("firmCode")
|
||||||
|
// .label("企业编码")
|
||||||
|
// .value("firm01")
|
||||||
|
// .type("String")
|
||||||
|
// .build());
|
||||||
|
// String jsonString = JSONObject.toJSONString(kafKaDataList);
|
||||||
|
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
|
||||||
|
// kafkaProducer.send(producerRecord);
|
||||||
|
// log.info("kafka投产:{}", jsonString);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//}
|
7
pom.xml
7
pom.xml
|
@ -425,6 +425,13 @@
|
||||||
<version>${muyu.version}</version>
|
<version>${muyu.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- gateway系统模块 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-modules-vehicle-gateway</artifactId>
|
||||||
|
<version>${muyu.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
Loading…
Reference in New Issue