diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaConsumerConfig.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaConsumerConfig.java deleted file mode 100644 index 921077f..0000000 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.muyu.template.config; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - *Kafka消费者配置 - * @author liuxinyue - * @Package:com.muyu.mqtt.configure - * @Project:cloud-server - * @name:KafkaConsumerConfig - * @Date:2024/9/28 23:42 - */ -@Configuration -public class KafkaConsumerConfig { - @Bean - public KafkaConsumer kafkaConsumer() { - Map configs = new HashMap<>(); - //kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "47.101.53.251:9092"); - //开启consumer的偏移量(offset)自动提交到Kafka - configs.put("enable.auto.commit", true); - //consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒 - configs.put("auto.commit.interval", 5000); - //在Kafka中没有初始化偏移量或者当前偏移量不存在情况 - //earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量 - //latest, 在偏移量无效的情况下, 自动重置为最新的偏移量 - //none, 在偏移量无效的情况下, 抛出异常. - configs.put("auto.offset.reset", "latest"); - //请求阻塞的最大时间(毫秒) - configs.put("fetch.max.wait", 500); - //请求应答的最小字节数 - configs.put("fetch.min.size", 1); - //心跳间隔时间(毫秒) - configs.put("heartbeat-interval", 3000); - //一次调用poll返回的最大记录条数 - configs.put("max.poll.records", 500); - //指定消费组 - configs.put("group.id", "kafka_grop"); - //指定key使用的反序列化类 - Deserializer keyDeserializer = new StringDeserializer(); - //指定value使用的反序列化类 - Deserializer valueDeserializer = new StringDeserializer(); - //创建Kafka消费者 - KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer); - return kafkaConsumer; - } -} diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaProviderConfig.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaProviderConfig.java deleted file mode 100644 index 00f5617..0000000 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/KafkaProviderConfig.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.muyu.template.config; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - * Kafka生产者配置 - * @author liuxinyue - * @Package:com.muyu.mqtt.configure - * @Project:cloud-server - * @name:KafkaProviderConfig - * @Date:2024/9/28 23:50 - */ -@Configuration -public class KafkaProviderConfig { - - @Bean - public KafkaProducer kafkaProducer() { - Map configs = new HashMap<>(); - //#kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "47.116.173.119:9092"); - //客户端发送服务端失败的重试次数 - configs.put("retries", 2); - //多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求. - //此设置有助于提高客户端和服务器的性能,配置控制默认批量大小(以字节为单位) - configs.put("batch.size", 16384); - //生产者可用于缓冲等待发送到服务器的记录的总内存字节数(以字节为单位) - configs.put("buffer-memory", 33554432); - //生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化 - //acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送.在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1. - //acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失. - //acks=all,acks=-1,leader节点将等待所有同步复制副本完成再确认记录,这保证了只要至少有一个同步复制副本存活,记录就不会丢失. - configs.put("acks", "-1"); - //指定key使用的序列化类 - Serializer keySerializer = new StringSerializer(); - //指定value使用的序列化类 - Serializer valueSerializer = new StringSerializer(); - //创建Kafka生产者 - KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer); - return kafkaProducer; - } - -} diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java index 7593fd5..b72a7fa 100644 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java @@ -3,12 +3,18 @@ import cn.hutool.json.JSONObject; import com.alibaba.fastjson2.JSON; import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.SysCar; +import com.muyu.common.kafka.config.KafkaProducerConfig; import com.muyu.common.redis.service.RedisService; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; + import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.List; @@ -23,129 +29,135 @@ import java.util.List; @Log4j2 @Component public class MqttConfigure { -// @Autowired -// private RedisService redisService; -// -// @Resource -// private SysCarService sysCarService; -// -// @Resource -// private SysCarServiceImpl service; -// -// @Resource -// private MessageTemplateTypeService messageTemplateTypeService; -// -// @Autowired -// private RedisTemplate redisTemplate; -// -// @PostConstruct -// public void MQTTMonitoring(){ -// -// String topic = "vehicle"; -// int qos = 2; -// String broker = "tcp://47.101.53.251:1883"; -// String clientId = "测试mqtt"; -// try { -// MqttClient sampleClient = new MqttClient(broker, clientId); -// MqttConnectOptions connOpts = new MqttConnectOptions(); -// //是否清空session -// connOpts.setCleanSession(true); -// log.info("Connecting to broker: " + broker); -// //连接 -// sampleClient.connect(connOpts); -// sampleClient.subscribe(topic,qos); -// sampleClient.setCallback(new MqttCallback() { -// //连接丢失(报错) -// @Override -// public void connectionLost(Throwable throwable) { -// log.error("error:"+throwable.getMessage()); -// } -// //消息已经接收到 -// @Override -// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { -// // 将MQTT消息转换为字符串 -// String messageContent = new String(mqttMessage.getPayload()); -// // 解析JSON字符串 -// JSONObject jsonObject = new JSONObject(messageContent); -// // 从JSON对象中获取"msg"字段的值 -// String msgValue = jsonObject.getStr("msg"); -// messageParsing(msgValue); -// log.info("接收到的值为:"+msgValue); -// } -// //交付完成 -// @Override -// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { -// -// } -// }); -// } catch(MqttException me) { -// System.out.println("reason "+me.getReasonCode()); -// System.out.println("msg "+me.getMessage()); -// System.out.println("loc "+me.getLocalizedMessage()); -// System.out.println("cause "+me.getCause()); -// System.out.println("excep "+me); -// me.printStackTrace(); -// } -// } -// -// public JSONObject messageParsing(String templateMessage) { -// //给一个JSON对象 -// JSONObject jsonObject = new JSONObject(); -// //先截取出VIN码 然后根据VIN码查询这个车属于什么类型 -// if (templateMessage.length() < 18) { -// throw new RuntimeException("The vehicle message is incorrect"); -// } -// //将报文进行切割 -// String[] hexArray = templateMessage.split(" "); -// StringBuilder result = new StringBuilder(); -// for (String hex : hexArray) { -// int decimal = Integer.parseInt(hex, 16); -// result.append((char) decimal); -// } -// //取出VIN码 -// String carVin = result.substring(0, 18 - 1); -// log.info("carVin码为:" + carVin); -// //根据VIN码获取车辆信息 -// SysCar carByVin = service.findCarByVin(carVin); -// log.info("车辆信息为:" + carByVin); -// //对应车辆所对应的报文模版 -// Integer templateId = carByVin.getTemplateId(); -// List templateTypeList; -// //key -// String redisKey = "messageTemplateType" + templateId; -// //key存在 -// if (redisTemplate.hasKey(redisKey)) { -// -// List list = redisTemplate.opsForList().range(redisKey, 0, -1); -// -// templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class)) -// .toList(); -// } else { -// List templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId); -// templateTypeList = templateTypeList1; -// templateTypeList.forEach( -// templateType -> -// redisTemplate.opsForList().rightPush( -// redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType) -// ) -// ); -// } -// //将模版里面有的配置进行循环 -// for (MessageTemplateType messageTemplateType : templateTypeList) { -// //开始位置 -// Integer startIndex = messageTemplateType.getStartIndex() - 1; -// //结束位置 -// Integer endIndex = messageTemplateType.getEndIndex(); -// //将每个解析后的字段都存入到JSON对象中 -// jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex)); -// } -// -// System.out.println("哈哈哈红红火火恍恍惚惚"); -// log.info("解析后的报文是:" + jsonObject); -// -// return jsonObject; -// } -// + @Autowired + private RedisService redisService; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private KafkaProducer kafkaProducer; + + @Autowired + private SysCarService service; + + @Autowired + private MessageTemplateTypeService messageTemplateTypeService; + + @PostConstruct + public void MQTTMonitoring(){ + + String topic = "vehicle"; + int qos = 2; + String broker = "tcp://47.101.53.251:1883"; + String clientId = "lxy"; + try { + MqttClient sampleClient = new MqttClient(broker, clientId); + MqttConnectOptions connOpts = new MqttConnectOptions(); + //是否清空session + connOpts.setCleanSession(false); + log.info("Connecting to broker: " + broker); + //连接 + sampleClient.connect(connOpts); + sampleClient.subscribe(topic,0); + sampleClient.setCallback(new MqttCallback() { + //连接丢失(报错) + @Override + public void connectionLost(Throwable throwable) { + log.error("error:"+throwable.getMessage()); + } + //消息已经接收到 + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + // 将MQTT消息转换为字符串 + String messageContent = new String(mqttMessage.getPayload()); + // 解析JSON字符串 + JSONObject jsonObject = new JSONObject(messageContent); + // 从JSON对象中获取"msg"字段的值 + String msgValue = jsonObject.getStr("msg"); +// messageParsing(msgValue); + log.info("接收到的值为:"+msgValue); + } + //交付完成 + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + } catch(MqttException me) { + System.out.println("reason "+me.getReasonCode()); + System.out.println("msg "+me.getMessage()); + System.out.println("loc "+me.getLocalizedMessage()); + System.out.println("cause "+me.getCause()); + System.out.println("excep "+me); + me.printStackTrace(); + } + } + + public JSONObject messageParsing(String templateMessage) { + //给一个JSON对象 + JSONObject jsonObject = new JSONObject(); + //先截取出VIN码 然后根据VIN码查询这个车属于什么类型 + if (templateMessage.length() < 18) { + throw new RuntimeException("The vehicle message is incorrect"); + } + //将报文进行切割 + String[] hexArray = templateMessage.split(" "); + StringBuilder result = new StringBuilder(); + for (String hex : hexArray) { + int decimal = Integer.parseInt(hex, 16); + result.append((char) decimal); + } + //取出VIN码 + String carVin = result.substring(0, 18 - 1); + log.info("carVin码为:" + carVin); + //根据VIN码获取车辆信息 + SysCar carByVin = service.findCarByVin(carVin); + log.info("车辆信息为:" + carByVin); + //对应车辆所对应的报文模版 + Integer templateId = carByVin.getTemplateId(); + List templateTypeList; + //key + String redisKey = "messageTemplateType" + templateId; + //key存在 + if (redisTemplate.hasKey(redisKey)) { + + List list = redisTemplate.opsForList().range(redisKey, 0, -1); + + templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class)) + .toList(); + } else { + List templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId); + templateTypeList = templateTypeList1; + templateTypeList.forEach( + templateType -> + redisTemplate.opsForList().rightPush( + redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType) + ) + ); + } + //将模版里面有的配置进行循环 + for (MessageTemplateType messageTemplateType : templateTypeList) { + //开始位置 + Integer startIndex = messageTemplateType.getStartIndex() - 1; + //结束位置 + Integer endIndex = messageTemplateType.getEndIndex(); + //将每个解析后的字段都存入到JSON对象中 + jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex)); + } + + log.info("解析后的报文是:" + jsonObject); + sendKafka(jsonObject); + log.info("发送kafka成功"); + return jsonObject; + } + + + public void sendKafka(JSONObject jsonObject){ + ProducerRecord stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); + kafkaProducer.send(stringStringProducerRecord); + log.info("kafka发送成功"); + } } diff --git a/cloud-modules/saas/pom.xml b/cloud-modules/saas/pom.xml index da04f8a..6810929 100644 --- a/cloud-modules/saas/pom.xml +++ b/cloud-modules/saas/pom.xml @@ -22,6 +22,12 @@ + + + org.springframework.amqp + spring-rabbit + + org.springframework.integration @@ -29,6 +35,12 @@ 6.2.5 + + com.muyu + cloud-common-kafka + 3.6.3 + + org.springframework.kafka diff --git a/cloud-modules/saas/saas-server/pom.xml b/cloud-modules/saas/saas-server/pom.xml index 5d7bfd0..bb9544a 100644 --- a/cloud-modules/saas/saas-server/pom.xml +++ b/cloud-modules/saas/saas-server/pom.xml @@ -24,6 +24,12 @@ + + + org.springframework.boot + spring-boot-starter-amqp + + com.muyu saas-cache diff --git a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java index 440b010..1717862 100644 --- a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java +++ b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java @@ -1,17 +1,25 @@ package com.muyu.server.controller; +import com.alibaba.fastjson2.JSONObject; import com.muyu.common.core.domain.Result; import com.muyu.common.domain.SysCar; import com.muyu.common.domain.req.SysCarReq; import com.muyu.common.domain.resp.SysCarFaultLogVo; +import com.muyu.common.kafka.config.KafkaProducerConfig; +import com.muyu.common.redis.service.RedisService; import com.muyu.server.service.SysCarService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import javax.security.auth.callback.Callback; import java.util.List; /** @@ -30,6 +38,16 @@ public class SysCarController { @Autowired private SysCarService sysCarService; + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private RedisService redisService; + + @Autowired + private KafkaProducer kafkaProducer; + + /** * 车辆列表 * @param sysCarReq 参数 @@ -91,10 +109,13 @@ public class SysCarController { * @return */ @PostMapping("/findCarByVin") -// @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息") + @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息") public Result findCarByVin(@RequestParam("carVin") String carVin){ + List carList = redisService.getCacheList("carList"); + log.info("从缓存查到的数据是:"+carList); return Result.success(sysCarService.findCarByVin(carVin)); } + }