diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index 79c42b6..eead750 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -22,6 +22,7 @@ cloud-common-rabbit cloud-common-saas cloud-common-wechat + cloud-common-kafka cloud-common diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml index e0e09eb..b7bfbc8 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml @@ -33,6 +33,22 @@ com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config + + + org.apache.kafka + kafka-clients + + + + org.springframework.cloud + spring-cloud-starter-openfeign + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + com.alibaba.cloud @@ -83,6 +99,12 @@ pagehelper 6.0.0 + + com.muyu + cloud-common-kafka + 3.6.3 + compile + cloud-electronic diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java index 8b350cd..67609f3 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceClazzController.java @@ -37,7 +37,9 @@ public class CarFenceClazzController { @Operation(summary = "查询数据",description = "查询数据") public Result> selectConnect(){ List connects = carFenceClazzService.list(); + log.info("查询数据成功"); return Result.success( + connects, "操作成功" ); } diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java index bb4599f..eb25bc3 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/controller/CarFenceController.java @@ -43,6 +43,7 @@ public class CarFenceController { @Validated @RequestBody CarFenceReq req ){ Page connects = carFenceService.selectCarFence(req); + log.info("查询数据:"+ connects); return Result.success( connects, "操作成功" ); @@ -57,11 +58,13 @@ public class CarFenceController { @Validated @RequestBody CarFence carFence ){ Boolean connects = carFenceService.addCarFence(carFence); + log.info("shd"); return connects?Result.success( null, "操作成功" ):Result.success( null, "操作失败" ); + } /** @@ -73,6 +76,8 @@ public class CarFenceController { @Validated @RequestBody CarGroupReq req ){ Boolean connects = carFenceService.addCarGroup(req); + log.info("添加数据:"+ connects); + return connects?Result.success( null, "操作成功" ):Result.success( diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java new file mode 100644 index 0000000..24e4999 --- /dev/null +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/feign/KafkaClient.java @@ -0,0 +1,18 @@ +//package com.muyu.server.feign; +// +//import org.springframework.cloud.openfeign.FeignClient; +//import org.springframework.web.bind.annotation.GetMapping; +// +//@FeignClient(name = "cloud-modules-carData",value = "KafkaProducerController") +//public abstract class KafkaClient { +// /** +// * 处理"/produceTest"的GET请求,用于执行特定的测试任务 +// * 此方法的具体实现将在子类中定义 +// * @return 返回类型为String,表示该方法的执行结果 +// * @return +// */ +// @GetMapping("/produceTest") +// public abstract String produceTest(); +// +// +//} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java index 1e08ebc..3cf4ba0 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mapper/CarInformationMapper.java @@ -9,6 +9,8 @@ import com.muyu.domain.req.CarInformationListReq; import com.muyu.domain.req.CarInformationUpdReq; import com.muyu.domain.resp.CarInformationResp; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; import java.util.List; @@ -18,4 +20,12 @@ import java.util.List; @Mapper public interface CarInformationMapper extends MPJBaseMapper { + + /** + * 根据车辆VIN码获取车辆类型(报文模板分类用的值) + * @param carInformationVIN + * @return + */ + @Select("SELECT `car_information`.car_Information_Type FROM `car_information` WHERE `car_information`.car_information_VIN = #{carInformationVIN}") + Long selectcarMessageCartype(@Param("carInformationVIN") String carInformationVIN); } diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java new file mode 100644 index 0000000..007c82e --- /dev/null +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/mqtt/Demo.java @@ -0,0 +1,105 @@ +package com.muyu.server.mqtt; + +import com.alibaba.fastjson.JSONObject; + +import com.muyu.common.kafka.constants.KafkaConstants; +import com.muyu.domain.CarMessage; +import com.muyu.server.service.CarMessageService; +import jakarta.annotation.PostConstruct; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@Component +public class Demo { + @Resource + private CarMessageService service; + @Resource + private KafkaProducer kafkaProducer; + @PostConstruct + public void test() { + + String topic = "vehicle"; + String content = "Message from MqttPublishSample"; + int qos = 2; + String broker = "tcp://106.15.136.7:1883"; + String clientId = "JavaSample"; + + try { + // 第三个参数为空,默认持久化策略 + MqttClient sampleClient = new MqttClient(broker, clientId); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + System.out.println("Connecting to broker: "+broker); + sampleClient.connect(connOpts); + sampleClient.subscribe(topic,0); + sampleClient.setCallback(new MqttCallback() { + // 连接丢失 + @Override + public void connectionLost(Throwable throwable) { + + } + // 连接成功 + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + + List list= service.selectCarMessageList(1,2); + String str = new String( mqttMessage.getPayload() ); + System.out.println(str); + String[] test = str.split(" "); + String[] results = new String[list.size()]; + List> futures = new ArrayList<>(); + for (CarMessage carmsg : list) { + futures.add(CompletableFuture.supplyAsync(() -> { + int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1; + int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex())); + StringBuilder hexBuilder = new StringBuilder(); + for (int j = startIndex; j < endIndex; j++) { + hexBuilder.append(test[j]); + } + // 创建16进制的对象 + String hex = hexBuilder.toString(); + // 转橙字符数组 + char[] result = new char[hex.length() / 2]; + for (int x = 0; x < hex.length(); x += 2) { + // 先转十进制 + int high = Character.digit(hex.charAt(x), 16); + // 转二进制 + int low = Character.digit(hex.charAt(x + 1), 16); + // 转字符 + result[x / 2] = (char) ((high << 4) + low); + } + return new String(result); + })); + } + for (int i = 0; i < futures.size(); i++) { + results[i] = futures.get(i).get(); + } + String jsonString = JSONObject.toJSONString( results ); + ProducerRecord producerRecord = new ProducerRecord<>( KafkaConstants.KafkaTopic, jsonString); + kafkaProducer.send(producerRecord); + } + // 接收信息 + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + } catch(MqttException me) { + System.out.println("reason "+me.getReasonCode()); + System.out.println("msg "+me.getMessage()); + System.out.println("loc "+me.getLocalizedMessage()); + System.out.println("cause "+me.getCause()); + System.out.println("excep "+me); + me.printStackTrace(); + } + } + +} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java index 14e60f3..b44b376 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/CarMessageService.java @@ -1,7 +1,9 @@ package com.muyu.server.service; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.service.IService; import com.muyu.domain.CarMessage; +import com.muyu.domain.resp.CarInformationResp; import com.muyu.domain.resp.CarMessageResp; import java.util.List; @@ -43,4 +45,19 @@ public interface CarMessageService extends IService { */ List selectJoinList(Long id); + + //报文切割 + /** + * 分割字符串 + * 解析字符 + * 获取报文最终信息 + */ + JSONObject inciseCarMessage(String testString); + + + List selectCarMessageList(int i, int i1); + + + + } diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java index 3ea977e..74341ef 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/server/service/impl/CarMessageServiceImpl.java @@ -1,5 +1,8 @@ package com.muyu.server.service.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.github.yulichang.wrapper.MPJLambdaWrapper; import com.muyu.common.core.utils.StringUtils; @@ -7,15 +10,21 @@ import com.muyu.domain.CarMessage; import com.muyu.domain.CarMessageType; import com.muyu.domain.CarType; import com.muyu.domain.resp.CarMessageResp; +import com.muyu.server.mapper.CarInformationMapper; import com.muyu.server.mapper.CarMessageMapper; import com.muyu.server.service.CarMessageService; +import lombok.extern.log4j.Log4j2; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Collection; import java.util.List; +import java.util.Objects; /** * 报文模板展示列表业务实现层 */ +@Log4j2 @Service public class CarMessageServiceImpl extends ServiceImpl @@ -23,6 +32,11 @@ public class CarMessageServiceImpl @Resource private CarMessageMapper carMessageMapper; + @Resource + private CarInformationMapper carInformationMapper; + + + /** * 根据所属车类别 解析 车辆报文模板 * @param @@ -91,5 +105,89 @@ public class CarMessageServiceImpl .eq(StringUtils.isNotNull(id),CarMessage::getCarMessageCartype, id)); } + @Override + public JSONObject inciseCarMessage(String testString) { + return null; + } + + @Override + public List selectCarMessageList(int id, int modelCode) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CarMessage::getCarMessageId, id); + queryWrapper.eq(CarMessage::getCarMessageType, modelCode); + return this.list(queryWrapper); + } + + + + //报文处理 +// @Resource +// private RedisTemplate redisTemplate; +// +// /** +// * 报文解析 +// * @param testString +// * @return +// */ +// @Override +// public JSONObject inciseCarMessage(String testString) { +// //根据空格拆分切割数据字符串 +// String[] split = testString.split(" "); +// StringBuilder stringBuilder = new StringBuilder(); +// for (String conversion : split) { +// //将16进制字符串转换为对应的10进制 +// int inciseindex = Integer.parseInt(conversion, 16); +// // 将10进制转换为对应的字符 +// stringBuilder.append((char) inciseindex); +// } +// //切取车辆VIN +// String substring = stringBuilder.substring(1, 18); +// log.info("车辆的VIN码:" + substring); +// //根据给定的vehicleVin(车辆VIN号)获取对应的模板车辆分类carMessageCartype +// String selectcared = carInformationMapper.selectcarMessageCartype(substring); +// //创建接受数据的数组 +// List messagesList ; +// +// try{ +// String redisKey = "carMessageList" + selectcared; +// +// if (redisTemplate.hasKey(redisKey)){ +// List list = redisTemplate.opsForList().range(redisKey , 0, -1); +// messagesList = list.stream() +// .map(objects -> JSON.parseObject(objects.toString(), CarMessage.class)) +// .toList(); +// log.info("Redis缓存查询成功"); +// }else { +// messagesList = carInformationMapper.selectcarMessageCartype(selectcared); +// +// messagesList.forEach( +// listReq -> redisTemplate.opsForList().rightPushAll(redisKey, JSON.toString(listReq) ) +// ); +// log.info("数据库查询成功"); +// } +// }catch(Exception e){ +// throw new RuntimeException("获取报文模板失败"); +// } +// //判断报文模板 列表 不为空 +// if(messagesList.isEmpty()){ +// throw new RuntimeException("报文模版为空"); +// } +// //存储报文模板解析后的数据 +// JSONObject jsonObject = new JSONObject(); +// for (CarMessage carMessage : messagesList) { +// //起始位下标 +// Integer startIndex = carMessage.getCarMessageStartIndex(); +// //结束位下标 +// Integer endIndex = carMessage.getCarMessageEndIndex(); +// //根据报文模板获取保温截取位置 +// String value = stringBuilder.substring(startIndex, endIndex); +// //存入数据 +// jsonObject.put(carMessage.getMessageTypeName(), value); +// +// } +// return jsonObject; +// +// } + }