Merge remote-tracking branch 'refs/remotes/origin/dev.template' into dev

dev.vehiclegateway
Number7 2024-10-02 16:39:56 +08:00
commit 9846c72b3b
6 changed files with 175 additions and 227 deletions

View File

@ -1,54 +0,0 @@
package com.muyu.template.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka consumer configuration: exposes a ready-to-use {@link KafkaConsumer} bean.
 *
 * @author liuxinyue
 * @date 2024/9/28 23:42
 */
@Configuration
public class KafkaConsumerConfig {

    /**
     * Builds a String/String Kafka consumer.
     * <p>
     * FIX: several property keys were invalid and silently ignored by the client
     * ("auto.commit.interval", "fetch.max.wait", "fetch.min.size",
     * "heartbeat-interval"); they are replaced with the canonical Kafka
     * consumer property names.
     *
     * @return a configured {@link KafkaConsumer} (caller is responsible for closing it)
     */
    @Bean
    public KafkaConsumer<String, String> kafkaConsumer() {
        Map<String, Object> configs = new HashMap<>();
        // Kafka broker address, format: ip:port
        configs.put("bootstrap.servers", "47.101.53.251:9092");
        // Auto-commit consumer offsets back to Kafka
        configs.put("enable.auto.commit", true);
        // Auto-commit interval in milliseconds (correct key: auto.commit.interval.ms)
        configs.put("auto.commit.interval.ms", 5000);
        // Offset reset policy when no valid offset exists:
        //   earliest - reset to the oldest offset
        //   latest   - reset to the newest offset
        //   none     - throw an exception
        configs.put("auto.offset.reset", "latest");
        // Max time (ms) the broker blocks a fetch request (correct key: fetch.max.wait.ms)
        configs.put("fetch.max.wait.ms", 500);
        // Minimum bytes the broker returns per fetch (correct key: fetch.min.bytes)
        configs.put("fetch.min.bytes", 1);
        // Heartbeat interval in milliseconds (correct key: heartbeat.interval.ms)
        configs.put("heartbeat.interval.ms", 3000);
        // Max records returned by a single poll() call
        configs.put("max.poll.records", 500);
        // Consumer group id (typo "kafka_grop" kept intentionally:
        // renaming the group would abandon its committed offsets)
        configs.put("group.id", "kafka_grop");
        // Key/value deserializers
        Deserializer<String> keyDeserializer = new StringDeserializer();
        Deserializer<String> valueDeserializer = new StringDeserializer();
        return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
    }
}

View File

@ -1,49 +0,0 @@
package com.muyu.template.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka producer configuration: exposes a ready-to-use {@link KafkaProducer} bean.
 *
 * @author liuxinyue
 * @date 2024/9/28 23:50
 */
@Configuration
public class KafkaProviderConfig {

    /**
     * Builds a String/String Kafka producer.
     * <p>
     * FIX: "buffer-memory" is not a valid producer property and was silently
     * ignored; replaced with the canonical key "buffer.memory".
     *
     * @return a configured {@link KafkaProducer} (caller is responsible for closing it)
     */
    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configs = new HashMap<>();
        // Kafka broker address, format: ip:port
        configs.put("bootstrap.servers", "47.116.173.119:9092");
        // Number of retries after a failed send
        configs.put("retries", 2);
        // Default batch size in bytes: records headed to the same partition are
        // batched together to reduce request count and improve throughput
        configs.put("batch.size", 16384);
        // Total memory (bytes) available to buffer records awaiting transmission
        // (correct key: buffer.memory, was "buffer-memory")
        configs.put("buffer.memory", 33554432);
        // Acknowledgement level controlling durability of sent records:
        //   acks=0  - fire and forget; no server acknowledgement, retries ineffective
        //   acks=1  - leader writes to its local log and replies without waiting
        //             for followers; record is lost if the leader fails before replication
        //   acks=-1 (all) - leader waits for all in-sync replicas; record survives
        //             as long as one in-sync replica remains alive
        configs.put("acks", "-1");
        // Key/value serializers
        Serializer<String> keySerializer = new StringSerializer();
        Serializer<String> valueSerializer = new StringSerializer();
        return new KafkaProducer<>(configs, keySerializer, valueSerializer);
    }
}

View File

@ -3,12 +3,18 @@ import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.kafka.config.KafkaProducerConfig;
import com.muyu.common.redis.service.RedisService; import com.muyu.common.redis.service.RedisService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
@ -23,129 +29,135 @@ import java.util.List;
@Log4j2 @Log4j2
@Component @Component
public class MqttConfigure { public class MqttConfigure {
// @Autowired
// private RedisService redisService;
//
// @Resource
// private SysCarService sysCarService;
//
// @Resource
// private SysCarServiceImpl service;
//
// @Resource
// private MessageTemplateTypeService messageTemplateTypeService;
//
// @Autowired
// private RedisTemplate redisTemplate;
//
// @PostConstruct
// public void MQTTMonitoring(){
//
// String topic = "vehicle";
// int qos = 2;
// String broker = "tcp://47.101.53.251:1883";
// String clientId = "测试mqtt";
// try {
// MqttClient sampleClient = new MqttClient(broker, clientId);
// MqttConnectOptions connOpts = new MqttConnectOptions();
// //是否清空session
// connOpts.setCleanSession(true);
// log.info("Connecting to broker: " + broker);
// //连接
// sampleClient.connect(connOpts);
// sampleClient.subscribe(topic,qos);
// sampleClient.setCallback(new MqttCallback() {
// //连接丢失(报错)
// @Override
// public void connectionLost(Throwable throwable) {
// log.error("error:"+throwable.getMessage());
// }
// //消息已经接收到
// @Override
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// // 将MQTT消息转换为字符串
// String messageContent = new String(mqttMessage.getPayload());
// // 解析JSON字符串
// JSONObject jsonObject = new JSONObject(messageContent);
// // 从JSON对象中获取"msg"字段的值
// String msgValue = jsonObject.getStr("msg");
// messageParsing(msgValue);
// log.info("接收到的值为:"+msgValue);
// }
// //交付完成
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//
// }
// });
// } catch(MqttException me) {
// System.out.println("reason "+me.getReasonCode());
// System.out.println("msg "+me.getMessage());
// System.out.println("loc "+me.getLocalizedMessage());
// System.out.println("cause "+me.getCause());
// System.out.println("excep "+me);
// me.printStackTrace();
// }
// }
//
// public JSONObject messageParsing(String templateMessage) {
// //给一个JSON对象
// JSONObject jsonObject = new JSONObject();
// //先截取出VIN码 然后根据VIN码查询这个车属于什么类型
// if (templateMessage.length() < 18) {
// throw new RuntimeException("The vehicle message is incorrect");
// }
// //将报文进行切割
// String[] hexArray = templateMessage.split(" ");
// StringBuilder result = new StringBuilder();
// for (String hex : hexArray) {
// int decimal = Integer.parseInt(hex, 16);
// result.append((char) decimal);
// }
// //取出VIN码
// String carVin = result.substring(0, 18 - 1);
// log.info("carVin码为:" + carVin);
// //根据VIN码获取车辆信息
// SysCar carByVin = service.findCarByVin(carVin);
// log.info("车辆信息为:" + carByVin);
// //对应车辆所对应的报文模版
// Integer templateId = carByVin.getTemplateId();
// List<MessageTemplateType> templateTypeList;
// //key
// String redisKey = "messageTemplateType" + templateId;
// //key存在
// if (redisTemplate.hasKey(redisKey)) {
//
// List list = redisTemplate.opsForList().range(redisKey, 0, -1);
//
// templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
// .toList();
// } else {
// List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
// templateTypeList = templateTypeList1;
// templateTypeList.forEach(
// templateType ->
// redisTemplate.opsForList().rightPush(
// redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType)
// )
// );
// }
// //将模版里面有的配置进行循环
// for (MessageTemplateType messageTemplateType : templateTypeList) {
// //开始位置
// Integer startIndex = messageTemplateType.getStartIndex() - 1;
// //结束位置
// Integer endIndex = messageTemplateType.getEndIndex();
// //将每个解析后的字段都存入到JSON对象中
// jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
// }
//
// System.out.println("哈哈哈红红火火恍恍惚惚");
// log.info("解析后的报文是:" + jsonObject);
//
// return jsonObject;
// }
//
// Redis facade used for cached lookups
@Autowired
private RedisService redisService;
// Low-level Redis access for the message-template list cache
@Autowired
private RedisTemplate redisTemplate;
// Producer used to forward parsed vehicle messages to Kafka
@Autowired
private KafkaProducer kafkaProducer;
// Vehicle lookup service (resolves a car by its VIN)
@Autowired
private SysCarService service;
// Loads message-template definitions by template id
@Autowired
private MessageTemplateTypeService messageTemplateTypeService;
/**
 * Connects to the MQTT broker at application startup and subscribes to the
 * "vehicle" topic; received payloads are expected to be JSON with a "msg" field.
 * <p>
 * FIXES:
 * <ul>
 *   <li>the callback is now registered BEFORE connect/subscribe, so messages
 *       arriving immediately after subscription are not silently dropped;</li>
 *   <li>subscribe() now honors the declared QoS (was hard-coded to 0);</li>
 *   <li>failures are routed through the logger instead of System.out /
 *       printStackTrace().</li>
 * </ul>
 */
@PostConstruct
public void MQTTMonitoring(){
    String topic = "vehicle";
    int qos = 2;
    String broker = "tcp://47.101.53.251:1883";
    // NOTE(review): fixed client id — two instances with the same id will
    // evict each other from the broker; confirm this is single-instance
    String clientId = "lxy";
    try {
        MqttClient sampleClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // Keep the session so queued QoS>0 messages survive reconnects
        connOpts.setCleanSession(false);
        // Register callbacks before connecting so no delivery is missed
        sampleClient.setCallback(new MqttCallback() {
            // Connection lost (logged; no automatic reconnect here)
            @Override
            public void connectionLost(Throwable throwable) {
                log.error("error:"+throwable.getMessage());
            }
            // A message has arrived on a subscribed topic
            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // Convert MQTT payload to a string (platform charset —
                // TODO confirm UTF-8 is intended)
                String messageContent = new String(mqttMessage.getPayload());
                // Parse the JSON string
                JSONObject jsonObject = new JSONObject(messageContent);
                // Extract the "msg" field
                String msgValue = jsonObject.getStr("msg");
                // messageParsing(msgValue);
                log.info("接收到的值为:"+msgValue);
            }
            // Delivery of an outbound message completed (nothing to do)
            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
        log.info("Connecting to broker: " + broker);
        // Connect, then subscribe with the requested QoS
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topic, qos);
    } catch(MqttException me) {
        log.error("reason " + me.getReasonCode());
        log.error("msg " + me.getMessage());
        log.error("loc " + me.getLocalizedMessage());
        log.error("cause " + me.getCause());
        log.error("excep " + me, me);
    }
}
/**
 * Parses a space-separated hex vehicle message: decodes each hex byte to an
 * ASCII character, extracts the leading 17-character VIN, resolves the car and
 * its message template (Redis-cached), slices the decoded text into named
 * fields per the template, forwards the result to Kafka, and returns it.
 * <p>
 * FIXES: {@code redisTemplate.hasKey()} returns a nullable Boolean (guarded
 * against NPE on unboxing); an unknown VIN now fails fast with a clear message
 * instead of a NullPointerException.
 *
 * @param templateMessage raw message: hex byte tokens separated by spaces
 * @return JSON object mapping each template field name to its decoded substring
 * @throws RuntimeException if the message is too short or the VIN is unknown
 */
public JSONObject messageParsing(String templateMessage) {
    JSONObject jsonObject = new JSONObject();
    // Reject messages too short to contain a 17-character VIN
    if (templateMessage.length() < 18) {
        throw new RuntimeException("The vehicle message is incorrect");
    }
    // Decode: each space-separated token is one hex byte -> one character
    String[] hexArray = templateMessage.split(" ");
    StringBuilder result = new StringBuilder();
    for (String hex : hexArray) {
        int decimal = Integer.parseInt(hex, 16);
        result.append((char) decimal);
    }
    // The first 17 decoded characters are the VIN
    String carVin = result.substring(0, 18 - 1);
    log.info("carVin码为:" + carVin);
    // Resolve the vehicle by VIN
    SysCar carByVin = service.findCarByVin(carVin);
    log.info("车辆信息为:" + carByVin);
    // Fail fast on unknown VIN instead of NPE on getTemplateId() below
    if (carByVin == null) {
        throw new RuntimeException("No vehicle found for VIN: " + carVin);
    }
    // Template assigned to this vehicle
    Integer templateId = carByVin.getTemplateId();
    List<MessageTemplateType> templateTypeList;
    // Redis cache key for this template's field definitions
    String redisKey = "messageTemplateType" + templateId;
    // hasKey() may return null (e.g. pipeline/transaction) — compare safely
    if (Boolean.TRUE.equals(redisTemplate.hasKey(redisKey))) {
        List list = redisTemplate.opsForList().range(redisKey, 0, -1);
        templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
                .toList();
    } else {
        // Cache miss: load from the database and populate the Redis list
        templateTypeList = messageTemplateTypeService.findTemplateById(templateId);
        templateTypeList.forEach(
                templateType ->
                        redisTemplate.opsForList().rightPush(
                                redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType)
                        )
        );
    }
    // Slice the decoded text into fields per the template definition
    for (MessageTemplateType messageTemplateType : templateTypeList) {
        // Template positions are 1-based; convert start to 0-based
        Integer startIndex = messageTemplateType.getStartIndex() - 1;
        // End position (exclusive for substring)
        Integer endIndex = messageTemplateType.getEndIndex();
        jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
    }
    log.info("解析后的报文是:" + jsonObject);
    // Forward the parsed record downstream
    sendKafka(jsonObject);
    log.info("发送kafka成功");
    return jsonObject;
}
/**
 * Publishes the parsed vehicle message JSON to the "four_car" Kafka topic.
 *
 * @param jsonObject parsed message to publish (serialized via toString())
 */
public void sendKafka(JSONObject jsonObject){
    String payload = jsonObject.toString();
    ProducerRecord<String, String> record = new ProducerRecord<>("four_car", payload);
    kafkaProducer.send(record);
    log.info("kafka发送成功");
}
} }

View File

@ -22,6 +22,12 @@
</description> </description>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<!--mqtt依赖--> <!--mqtt依赖-->
<dependency> <dependency>
<groupId>org.springframework.integration</groupId> <groupId>org.springframework.integration</groupId>
@ -29,6 +35,12 @@
<version>6.2.5</version> <version>6.2.5</version>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
<version>3.6.3</version>
</dependency>
<!--kafka--> <!--kafka-->
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>

View File

@ -24,6 +24,12 @@
<dependencies> <dependencies>
<!-- Spring Boot Starter for AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>saas-cache</artifactId> <artifactId>saas-cache</artifactId>

View File

@ -1,17 +1,25 @@
package com.muyu.server.controller; package com.muyu.server.controller;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.req.SysCarReq; import com.muyu.common.domain.req.SysCarReq;
import com.muyu.common.domain.resp.SysCarFaultLogVo; import com.muyu.common.domain.resp.SysCarFaultLogVo;
import com.muyu.common.kafka.config.KafkaProducerConfig;
import com.muyu.common.redis.service.RedisService;
import com.muyu.server.service.SysCarService; import com.muyu.server.service.SysCarService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.security.auth.callback.Callback;
import java.util.List; import java.util.List;
/** /**
@ -30,6 +38,16 @@ public class SysCarController {
@Autowired @Autowired
private SysCarService sysCarService; private SysCarService sysCarService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisService redisService;
@Autowired
private KafkaProducer kafkaProducer;
/** /**
* *
* @param sysCarReq * @param sysCarReq
@ -91,10 +109,13 @@ public class SysCarController {
* @return * @return
*/ */
@PostMapping("/findCarByVin") @PostMapping("/findCarByVin")
// @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息") @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){ public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){
List<Object> carList = redisService.getCacheList("carList");
log.info("从缓存查到的数据是:"+carList);
return Result.success(sysCarService.findCarByVin(carVin)); return Result.success(sysCarService.findCarByVin(carVin));
} }
} }