diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml
index 79c42b6..eead750 100644
--- a/cloud-common/pom.xml
+++ b/cloud-common/pom.xml
@@ -22,6 +22,7 @@
cloud-common-rabbit
cloud-common-saas
cloud-common-wechat
+ cloud-common-kafka
cloud-common
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
index e0e09eb..b7bfbc8 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
@@ -33,6 +33,22 @@
com.alibaba.cloud
spring-cloud-starter-alibaba-nacos-config
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-openfeign
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.2
+
com.alibaba.cloud
@@ -83,6 +99,12 @@
pagehelper
6.0.0
+
+ com.muyu
+ cloud-common-kafka
+ 3.6.3
+ compile
+
cloud-electronic
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java
index 8b350cd..67609f3 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java
@@ -37,7 +37,9 @@ public class CarFenceClazzController {
@Operation(summary = "查询数据",description = "查询数据")
public Result> selectConnect(){
List connects = carFenceClazzService.list();
+ log.info("查询数据成功");
return Result.success(
+
connects, "操作成功"
);
}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java
index bb4599f..eb25bc3 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java
@@ -43,6 +43,7 @@ public class CarFenceController {
@Validated @RequestBody CarFenceReq req
){
Page connects = carFenceService.selectCarFence(req);
+ log.info("查询数据:"+ connects);
return Result.success(
connects, "操作成功"
);
@@ -57,11 +58,13 @@ public class CarFenceController {
@Validated @RequestBody CarFence carFence
){
Boolean connects = carFenceService.addCarFence(carFence);
+ log.info("shd");
return connects?Result.success(
null, "操作成功"
):Result.success(
null, "操作失败"
);
+
}
/**
@@ -73,6 +76,8 @@ public class CarFenceController {
@Validated @RequestBody CarGroupReq req
){
Boolean connects = carFenceService.addCarGroup(req);
+ log.info("添加数据:"+ connects);
+
return connects?Result.success(
null, "操作成功"
):Result.success(
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java
new file mode 100644
index 0000000..24e4999
--- /dev/null
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java
@@ -0,0 +1,18 @@
+//package com.muyu.server.feign;
+//
+//import org.springframework.cloud.openfeign.FeignClient;
+//import org.springframework.web.bind.annotation.GetMapping;
+//
+//@FeignClient(name = "cloud-modules-carData",value = "KafkaProducerController")
+//public abstract class KafkaClient {
+// /**
+// * 处理"/produceTest"的GET请求,用于执行特定的测试任务
+// * 此方法的具体实现将在子类中定义
+// * @return 返回类型为String,表示该方法的执行结果
+// * @return
+// */
+// @GetMapping("/produceTest")
+// public abstract String produceTest();
+//
+//
+//}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java
index 1e08ebc..3cf4ba0 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java
@@ -9,6 +9,8 @@ import com.muyu.domain.req.CarInformationListReq;
import com.muyu.domain.req.CarInformationUpdReq;
import com.muyu.domain.resp.CarInformationResp;
import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
import java.util.List;
@@ -18,4 +20,12 @@ import java.util.List;
@Mapper
public interface CarInformationMapper extends MPJBaseMapper {
+
+ /**
+ * 根据车辆VIN码获取车辆类型(报文模板分类用的值)
+ * @param carInformationVIN
+ * @return
+ */
+ @Select("SELECT `car_information`.car_Information_Type FROM `car_information` WHERE `car_information`.car_information_VIN = #{carInformationVIN}")
+ Long selectcarMessageCartype(@Param("carInformationVIN") String carInformationVIN);
}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java
new file mode 100644
index 0000000..007c82e
--- /dev/null
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java
@@ -0,0 +1,105 @@
+package com.muyu.server.mqtt;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.muyu.common.kafka.constants.KafkaConstants;
+import com.muyu.domain.CarMessage;
+import com.muyu.server.service.CarMessageService;
+import jakarta.annotation.PostConstruct;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@Component
+public class Demo {
+ @Resource
+ private CarMessageService service;
+ @Resource
+ private KafkaProducer kafkaProducer;
+ @PostConstruct
+ public void test() {
+
+ String topic = "vehicle";
+ String content = "Message from MqttPublishSample";
+ int qos = 2;
+ String broker = "tcp://106.15.136.7:1883";
+ String clientId = "JavaSample";
+
+ try {
+ // 第三个参数为空,默认持久化策略
+ MqttClient sampleClient = new MqttClient(broker, clientId);
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ System.out.println("Connecting to broker: "+broker);
+ sampleClient.connect(connOpts);
+ sampleClient.subscribe(topic,0);
+ sampleClient.setCallback(new MqttCallback() {
+ // 连接丢失
+ @Override
+ public void connectionLost(Throwable throwable) {
+
+ }
+ // 连接成功
+ @Override
+ public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+
+ List list= service.selectCarMessageList(1,2);
+ String str = new String( mqttMessage.getPayload() );
+ System.out.println(str);
+ String[] test = str.split(" ");
+ String[] results = new String[list.size()];
+ List> futures = new ArrayList<>();
+ for (CarMessage carmsg : list) {
+ futures.add(CompletableFuture.supplyAsync(() -> {
+ int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1;
+ int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex()));
+ StringBuilder hexBuilder = new StringBuilder();
+ for (int j = startIndex; j < endIndex; j++) {
+ hexBuilder.append(test[j]);
+ }
+ // 创建16进制的对象
+ String hex = hexBuilder.toString();
+ // 转橙字符数组
+ char[] result = new char[hex.length() / 2];
+ for (int x = 0; x < hex.length(); x += 2) {
+ // 先转十进制
+ int high = Character.digit(hex.charAt(x), 16);
+ // 转二进制
+ int low = Character.digit(hex.charAt(x + 1), 16);
+ // 转字符
+ result[x / 2] = (char) ((high << 4) + low);
+ }
+ return new String(result);
+ }));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ results[i] = futures.get(i).get();
+ }
+ String jsonString = JSONObject.toJSONString( results );
+ ProducerRecord producerRecord = new ProducerRecord<>( KafkaConstants.KafkaTopic, jsonString);
+ kafkaProducer.send(producerRecord);
+ }
+ // 接收信息
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+ }
+ });
+ } catch(MqttException me) {
+ System.out.println("reason "+me.getReasonCode());
+ System.out.println("msg "+me.getMessage());
+ System.out.println("loc "+me.getLocalizedMessage());
+ System.out.println("cause "+me.getCause());
+ System.out.println("excep "+me);
+ me.printStackTrace();
+ }
+ }
+
+}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java
index 3199524..598f6f7 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java
@@ -1,8 +1,9 @@
package com.muyu.server.service;
+import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.IService;
-import com.muyu.common.core.domain.Result;
import com.muyu.domain.CarMessage;
+import com.muyu.domain.resp.CarInformationResp;
import com.muyu.domain.resp.CarMessageResp;
import java.util.List;
@@ -44,4 +45,19 @@ public interface CarMessageService extends IService {
*/
List selectJoinList(Long id);
+
+ //报文切割
+ /**
+ * 分割字符串
+ * 解析字符
+ * 获取报文最终信息
+ */
+ JSONObject inciseCarMessage(String testString);
+
+
+ List selectCarMessageList(int i, int i1);
+
+
+
+
}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java
index 4044570..404db2d 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java
@@ -1,5 +1,8 @@
package com.muyu.server.service.impl;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import com.muyu.common.core.utils.StringUtils;
@@ -7,15 +10,21 @@ import com.muyu.domain.CarMessage;
import com.muyu.domain.CarMessageType;
import com.muyu.domain.CarType;
import com.muyu.domain.resp.CarMessageResp;
+import com.muyu.server.mapper.CarInformationMapper;
import com.muyu.server.mapper.CarMessageMapper;
import com.muyu.server.service.CarMessageService;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.util.Collection;
import java.util.List;
+import java.util.Objects;
/**
* 报文模板展示列表业务实现层
*/
+@Log4j2
@Service
public class CarMessageServiceImpl
extends ServiceImpl
@@ -23,6 +32,11 @@ public class CarMessageServiceImpl
@Resource
private CarMessageMapper carMessageMapper;
+ @Resource
+ private CarInformationMapper carInformationMapper;
+
+
+
/**
* 根据所属车类别 解析 车辆报文模板
* @param
@@ -91,5 +105,89 @@ public class CarMessageServiceImpl
.eq(StringUtils.isNotNull(id),CarMessage::getCarMessageCartype, id));
}
+ @Override
+ public JSONObject inciseCarMessage(String testString) {
+ return null;
+ }
+
+ @Override
+ public List selectCarMessageList(int id, int modelCode) {
+ LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.eq(CarMessage::getCarMessageId, id);
+ queryWrapper.eq(CarMessage::getCarMessageType, modelCode);
+ return this.list(queryWrapper);
+ }
+
+
+
+ //报文处理
+// @Resource
+// private RedisTemplate redisTemplate;
+//
+// /**
+// * 报文解析
+// * @param testString
+// * @return
+// */
+// @Override
+// public JSONObject inciseCarMessage(String testString) {
+// //根据空格拆分切割数据字符串
+// String[] split = testString.split(" ");
+// StringBuilder stringBuilder = new StringBuilder();
+// for (String conversion : split) {
+// //将16进制字符串转换为对应的10进制
+// int inciseindex = Integer.parseInt(conversion, 16);
+// // 将10进制转换为对应的字符
+// stringBuilder.append((char) inciseindex);
+// }
+// //切取车辆VIN
+// String substring = stringBuilder.substring(1, 18);
+// log.info("车辆的VIN码:" + substring);
+// //根据给定的vehicleVin(车辆VIN号)获取对应的模板车辆分类carMessageCartype
+// String selectcared = carInformationMapper.selectcarMessageCartype(substring);
+// //创建接受数据的数组
+// List messagesList ;
+//
+// try{
+// String redisKey = "carMessageList" + selectcared;
+//
+// if (redisTemplate.hasKey(redisKey)){
+// List list = redisTemplate.opsForList().range(redisKey , 0, -1);
+// messagesList = list.stream()
+// .map(objects -> JSON.parseObject(objects.toString(), CarMessage.class))
+// .toList();
+// log.info("Redis缓存查询成功");
+// }else {
+// messagesList = carInformationMapper.selectcarMessageCartype(selectcared);
+//
+// messagesList.forEach(
+// listReq -> redisTemplate.opsForList().rightPushAll(redisKey, JSON.toString(listReq) )
+// );
+// log.info("数据库查询成功");
+// }
+// }catch(Exception e){
+// throw new RuntimeException("获取报文模板失败");
+// }
+// //判断报文模板 列表 不为空
+// if(messagesList.isEmpty()){
+// throw new RuntimeException("报文模版为空");
+// }
+// //存储报文模板解析后的数据
+// JSONObject jsonObject = new JSONObject();
+// for (CarMessage carMessage : messagesList) {
+// //起始位下标
+// Integer startIndex = carMessage.getCarMessageStartIndex();
+// //结束位下标
+// Integer endIndex = carMessage.getCarMessageEndIndex();
+// //根据报文模板获取保温截取位置
+// String value = stringBuilder.substring(startIndex, endIndex);
+// //存入数据
+// jsonObject.put(carMessage.getMessageTypeName(), value);
+//
+// }
+// return jsonObject;
+//
+// }
+
}