feat commit

网关节点上线,车辆上线,负载均衡功能
公共发邮件接口模块创建任务(yyh)
master_suzejing
玉安君 2024-04-12 11:41:59 +08:00
parent 66ca1afa4a
commit 6454d72b8d
18 changed files with 448 additions and 160 deletions

View File

@ -1,6 +1,5 @@
package com.zhilian.common.redis.service;
import io.lettuce.core.output.DoubleListOutput;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
@ -272,6 +271,9 @@ public class RedisService {
redisTemplate.opsForZSet().add(zkey, collect);
}
/**
* Zset
*
@ -326,7 +328,7 @@ public class RedisService {
}
/**
*
* ,delta,
* @param zkey
* @param value
* @param delta
@ -371,12 +373,13 @@ public class RedisService {
/**
*
*
* @param zkey
* @return
*/
public Map<Object, Double> getCacheZsetMin(final String zkey){
public ZSetOperations.TypedTuple getCacheZsetMin(final String zkey){
ZSetOperations.TypedTuple typedTuple = redisTemplate.opsForZSet().popMin(zkey);
return new HashMap<Object,Double>(){{put(typedTuple.getValue(),typedTuple.getScore());}};
return typedTuple;
}
@ -390,8 +393,13 @@ public class RedisService {
return redisTemplate.keys(pattern);
}
public void deleteCacheSet(String s) {
public void deleteCacheSet(String key) {
SetOperations setOperations = redisTemplate.opsForSet();
setOperations.remove(s);
setOperations.remove(key);
}
public <T> void deleteCacheSetValue(String key, T value) {
SetOperations setOperations = redisTemplate.opsForSet();
setOperations.remove(key,value);
}
}

View File

@ -12,7 +12,6 @@ import org.springframework.web.multipart.MultipartFile;
/**
*
*
* @author zhilian
*/
@FeignClient(contextId = "remoteFileService", value = ServiceNameConstants.FILE_SERVICE, fallbackFactory = RemoteFileFallbackFactory.class)

View File

@ -17,6 +17,7 @@
<module>zhilian-resolver</module>
<module>zhilian-business</module>
<module>zhilian-manager</module>
<module>zhilian-message</module>
</modules>
<artifactId>zhilian-modules</artifactId>

View File

@ -12,7 +12,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @Description
* @Author: LiYuan
* @CreateTime: 2024-03-31 10:13
* @Description: TODO
* @Description:
* @Version: 1.0
*/
@EnableCustomConfig

View File

@ -5,6 +5,7 @@ import java.util.List;
import com.zhilian.business.domain.BreakLog;
import com.zhilian.business.domain.BusinessBreak;
import com.zhilian.business.domain.request.BreakReq;
import org.apache.ibatis.annotations.Mapper;
/**
* Mapper
@ -12,6 +13,7 @@ import com.zhilian.business.domain.request.BreakReq;
* @author Yy
* @date 2024-04-07
*/
@Mapper
public interface BusinessBreakMapper
{
/**

View File

@ -1,2 +1,3 @@
Spring Boot Version: ${spring-boot.version}
Spring Application Name: ${spring.application.name}

View File

@ -8,7 +8,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @BelongsPackage: com.zhilian.manager
* @Author: LiYuan
* @CreateTime: 2024-03-31 10:17
* @Description: TODO
* @Description:
* @Version: 1.0
*/
@SpringBootApplication

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zhilian</groupId>
<artifactId>zhilian-modules</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>zhilian-message</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.zhilian</groupId>
<artifactId>zhilian-common-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -3,6 +3,10 @@ package com.zhilian.online;
import com.zhilian.common.security.annotation.EnableCustomConfig;
import com.zhilian.common.security.annotation.EnableMyFeignClients;
import com.zhilian.common.swagger.annotation.EnableCustomSwagger2;
import com.zhilian.online.controller.OnlineLoadCenterController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -21,6 +25,4 @@ public class ZhiLianOnlineApplication{
SpringApplication.run(ZhiLianOnlineApplication.class,args);
}
}

View File

@ -63,6 +63,36 @@ public class RabbitConfig {
*/
public static final String DEAD_ROUTING_KEY = "dead_routing";
/**
*
*/
public static final String DELAY_QUEUE_FOR_CAR = "delay_queue_for_car";
/**
*
*/
public static final String DELAY_EXCHANGE_FOR_CAR = "delay_exchange_for_car";
/**
*
*/
public static final String DELAY_ROUTING_FOR_CAR = "delay_routing_for_car";
/**
*
*/
public static final String DEAD_QUEUE_FOR_CAR = "dead_queue_for_car";
/**
*
*/
public static final String DEAD_EXCHANGE_FOR_CAR = "dead_exchange_for_car";
/**
*
*/
public static final String DEAD_ROUTING_FOR_CAR = "dead_routing_for_car";
/**
*
*
@ -153,11 +183,89 @@ public class RabbitConfig {
// }
// @Bean
// public MessageConverter jsonMessageConverter(){
// return new Jackson2JsonMessageConverter();
// }
/**
*
*
* @return Queue
*/
@Bean
Queue deadQueueForCar() {
log.info("死信队列ForCar创建成功");
return new Queue(DEAD_QUEUE_FOR_CAR, true, false, false);
}
/**
*
*
* @return DirectExchange
*/
@Bean
DirectExchange deadExchangeForCar() {
log.info("死信交换机ForCar创建成功");
return new DirectExchange(DEAD_EXCHANGE_FOR_CAR, true, false);
}
/**
*
*
* @return Binding
*/
@Bean
Binding deadBindingForCar() {
log.info("死信通道ForCar建立成功");
return BindingBuilder.bind(deadQueue())
.to(deadExchange())
.with(DEAD_ROUTING_FOR_CAR);
}
/**
*
*
* @return Queue
*/
@Bean
Queue delayQueueForCar() {
Map<String, Object> map = new HashMap<>();
//设置消息过期时间为30秒
map.put("x-message-ttl", 1000 * 30);
//设置消息过期后存储的死信交换机
map.put("x-dead-letter-exchange", DEAD_EXCHANGE_FOR_CAR);
//设置死信路由键
map.put("x-dead-letter-routing-key", DEAD_ROUTING_FOR_CAR);
log.info("延迟队列ForCar创建成功");
return new Queue(DELAY_QUEUE_FOR_CAR, true, false, false, map);
}
/**
*
*
* @return DirectExchange
*/
@Bean
DirectExchange delayExchangeForCar() {
log.info("延迟交换机ForCar创建成功");
return new DirectExchange(DELAY_EXCHANGE_FOR_CAR, true, false);
}
/**
*
*
* @return
*/
@Bean
Binding delayBindingForCar() {
log.info("延迟通道ForCar创建成功");
return BindingBuilder.bind(deadQueue())
.to(delayExchange())
.with(DELAY_ROUTING_FOR_CAR);
}
}

View File

@ -1,6 +1,5 @@
package com.zhilian.online.constans;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
@ -27,7 +26,11 @@ public class OnlineConstants {
/**
*
*/
public static final String NODE_LOAD_PREFIX = "node_load";
public static final String GATHER_LOAD_CONTROL = "gather_info_control";
/**
*
*/
public static final String ONLINE_VEHICLE = "online_vehicle:";

View File

@ -1,12 +1,26 @@
package com.zhilian.online.consumer;
import com.alibaba.fastjson.JSON;
import com.zhilian.common.core.constant.Constants;
import com.zhilian.common.core.domain.Result;
import com.zhilian.common.redis.service.RedisService;
import com.zhilian.online.config.RabbitConfig;
import com.zhilian.online.constans.OnlineConstants;
import com.zhilian.online.domain.Gather;
import com.zhilian.online.domain.req.GatherRegReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @BelongsProject: smart-cloud-server
* @BelongsPackage: com.zhilian.online.consumer
@ -19,21 +33,72 @@ import org.springframework.stereotype.Component;
@Slf4j
public class DeadQueueConsumer {
/**
* redis
*/
@Autowired
private RedisService redisService;
/**
* ,线
* fluxMQhttp,线
*/
@RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)
public void SecureOnline(String gatherRegReqMsg) {
GatherRegReq gatherRegReq = JSON.parseObject(gatherRegReqMsg, GatherRegReq.class);
log.info("开始检查节点{}的上线状态......",
gatherRegReq.getClientId());
public void SecureOnline(String gatherMsg) {
Gather gather = JSON.parseObject(gatherMsg, Gather.class);
log.info("开始检查节点{}的上线状态......", gatherMsg);
String ipAddress = gather.getIpAddress();
HttpURLConnection connection = null;
try {
ipAddress = "http://" + ipAddress;
URL url = new URL(ipAddress);
connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("GET");
int responseCode = connection.getResponseCode();
if (Constants.SUCCESS == responseCode){
log.info("节点上线成功");
log.info("节点上线失败");
}
if (Constants.FAIL == responseCode){
log.error("节点{}上线失败",gatherMsg);
//上线失败需要将该节点的负载均衡缓存删除
if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())){
redisService.removeCacheZsetBatch(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId());
}
}
} catch (Exception e) {
log.error("节点上线失败{}",e.getMessage());
e.printStackTrace();
}finally {
if (null != connection){
connection.disconnect();
}
}
}
// /**
// * 确定车辆上线成功
// * @param vin
// */
// @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_FOR_CAR)
// public void checkedConnectionOfVehicle(String vin){
// Set<Object> cacheSet = redisService.getCacheSet(OnlineConstants.ONLINE_VEHICLE);
// List<String> list= cacheSet.stream().map(item -> {
// return String.valueOf(item);
// }).collect(Collectors.toList());
// if (list.contains(vin)){
// log.info("{}车辆上线成功",vin);
// }else {
// log.error("{}车辆上线失败",vin);
//
//
// String clientId = (String) redisService.getCacheObject(OnlineConstants.ONLINE_VEHICLE + vin);
//
// redisService.deleteObject(OnlineConstants.ONLINE_VEHICLE+vin);
// redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_CONTROL);
// redisService.incrementScore(OnlineConstants.GATHER_LOAD_CONTROL + clientId ,gather,-1.0);
// }
// }
}

View File

@ -20,9 +20,9 @@ import java.util.function.Consumer;
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "test-topic")
public void handlerMsg(ConsumerRecord<Object,Object> record){
log.info("消费者消费消息信息为:{}", JSON.toJSONString(record));
}
// @KafkaListener(topics = "test-topic")
// public void handlerMsg(ConsumerRecord<Object,Object> record){
// log.info("消费者消费消息信息为:{}", JSON.toJSONString(record));
// }
}

View File

@ -1,5 +1,6 @@
package com.zhilian.online.controller;
import com.alibaba.fastjson.JSON;
import com.zhilian.common.core.domain.Result;
import com.zhilian.common.core.utils.ip.IpUtils;
import com.zhilian.common.core.web.controller.BaseController;
@ -9,10 +10,7 @@ import com.zhilian.online.service.OnlineLoadCenterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@ -47,9 +45,9 @@ public class OnlineLoadCenterController extends BaseController {
* @return: Result<OnlineAccount>
**/
@GetMapping("/applyForReg")
public Result<String> applyForReg(){
log.info("申请注册令牌");
return onlineLoadCenterService.applyForReg();
public Result<String> applyForReg(Gather gather) {
log.info("申请注册令牌{}", JSON.toJSONString(gather));
return onlineLoadCenterService.applyForReg(gather);
}
@ -60,21 +58,37 @@ public class OnlineLoadCenterController extends BaseController {
* @return: Result
**/
@PostMapping("/regGather")
public Result regGather(@Validated Gather gather){
public Result regGather(@Validated @RequestBody Gather gather) {
String ipAddr = IpUtils.getIpAddr(request);
gather.setIpAddress(ipAddr);
log.info("节点{}正在上线",gather);
log.info("节点{}正在上线", JSON.toJSONString(gather));
return onlineLoadCenterService.regGather(gather);
}
/**
* 线
* @return null
* 线
*
* @return gather
* @author: LiYuan
* @param: vin
* @return: Result
*/
public Result getConnectionOption(){
@GetMapping("/applyForConnectToGather")
public Result applyForConnectToGather(@RequestParam("vin") String vin) {
return Result.success(onlineLoadCenterService.applyForConnectToGather(vin));
}
return null;
/**
* @description:线
* @author: LiYuan
* @param: vin
* @return: Result
**/
@GetMapping("/applyForDisconnectToGather")
public Result applyForDisconnectToGather(@RequestParam("vin") String vin) {
onlineLoadCenterService.applyForDisconnectToGather(vin);
return Result.success();
}

View File

@ -25,14 +25,12 @@ public class Gather extends BaseEntity {
/**
* ID
*/
@TableId(type = IdType.INPUT)
private String clientId;
// /**
// * 收集节点登录令牌
// */
// @NotBlank
// private String token;
/**
*
*/
private String token;
/**
* broker

View File

@ -20,7 +20,7 @@ public interface OnlineLoadCenterService extends IService<VehicleAccount> {
* @param: vehicle
* @return: Result<OnlineAccount>
**/
Result<String> applyForReg();
Result<String> applyForReg(Gather gather);
/**
* @description: 使
@ -29,4 +29,22 @@ public interface OnlineLoadCenterService extends IService<VehicleAccount> {
* @return: Result
**/
Result regGather(Gather gather);
/**
* 线
* @author: LiYuan
* @param: vin
* @return gather
*/
Gather applyForConnectToGather(String vin);
/**
* @description:线
* @author: LiYuan
* @param: vin
* @return: Result
**/
void applyForDisconnectToGather(String vin);
}

View File

@ -9,15 +9,18 @@ import com.zhilian.online.config.RabbitConfig;
import com.zhilian.online.constans.OnlineConstants;
import com.zhilian.online.domain.Gather;
import com.zhilian.online.domain.VehicleAccount;
import com.zhilian.online.domain.req.GatherRegReq;
import com.zhilian.online.mapper.OnlineLoadCenterMapper;
import com.zhilian.online.service.OnlineLoadCenterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @version 1.0
@ -54,43 +57,82 @@ public class OnlineLoadCenterServiceImpl extends ServiceImpl<OnlineLoadCenterMap
* @return: Result<OnlineAccount>
**/
@Override
public Result<String> applyForReg() {
public Result<String> applyForReg(Gather gather) {
//生成一次性令牌
String token = IdUtils.fastSimpleUUID();
//将令牌信息缓存到Redis中
redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX , token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS);
redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX+gather.getClientId(),token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS);
//将账户信息返回客户端
//将令牌信息返回客户端
return Result.success(token);
}
/**
* @description: 线
* @description: 线
* @author: LiYuan
* @param: Gather
* @return: Result
**/
@Override
public Result regGather(Gather gather) {
// //判断登录令牌是否过期,一致
// if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) {
// return Result.error("令牌已过期");
// }
//判断登录令牌是否过期,一致
if (!redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) {
return Result.error("令牌已过期");
}
String token = redisService.getCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId());
if (!token.equals(gather.getToken())){
return Result.error("令牌错误");
}
//为该节点创建负载均衡缓存
if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) {
redisService.deleteObject(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId());
}
redisService.setCacheZsetValue(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId(), gather, 0.0);
redisService.setCacheZsetValue(OnlineConstants.GATHER_LOAD_CONTROL + gather.getClientId(), JSON.toJSONString(gather), 0.0);
// //向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线
// rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(gather));
//向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(gather));
return Result.success("节点上线");
}
/**
* 线
* @author: LiYuan
* @param: vin
* @return gather
*/
@Override
public Gather applyForConnectToGather(String vin) {
//判断车辆是否是我们的车
List<Object> cacheList = redisService.getCacheList("our_car");
List<String> list = cacheList.stream().map(item -> {
return String.valueOf(item);
}).collect(Collectors.toList());
if (!list.contains(vin)){
throw new RuntimeException("车辆未登记");
}
//获取负载最少的车辆进行链接
ZSetOperations.TypedTuple cacheZsetMin = redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_CONTROL);
Gather gather = JSON.parseObject((String) cacheZsetMin.getValue(), Gather.class);
//存放节点车辆信息
redisService.setCacheObject(OnlineConstants.ONLINE_VEHICLE+vin,gather.getClientId());
//发送延迟队列确定车辆上线
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_FOR_CAR, RabbitConfig.DELAY_ROUTING_FOR_CAR,vin);
return gather;
}
/**
* @description:线
* @author: LiYuan
* @param: vin
* @return: Result
**/
@Override
public void applyForDisconnectToGather(String vin) {
}
}

View File

@ -1,97 +1,97 @@
//package com.zhilian.resolver.model;
//import com.zhilian.common.core.utils.SpringUtils;
//import com.zhilian.common.redis.service.RedisService;
//import com.zhilian.common.resolver.domain.ResolverReportData;
//import com.zhilian.resolver.service.ResolverEventService;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import java.time.Duration;
//import java.util.*;
//import java.util.stream.Collectors;
//
//import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString;
//import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData;
//
///**
// * @ClassName ModelsKafkaMessage
// * @Description 描述
// * @Author Can.J
// * @Date 2024/4/8
// */
//@Component
//@Slf4j
//public class ModelsKafkaMessage {
// @Autowired
// private RedisService redisService;
// private static final String TOPIC_NAME = "vehicle-topic";
// private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092";
//
//
//
// /**
// * 消费者配置
// * @return
// */
// @PostConstruct
// private void consumerMessages() {
// Thread kafkaConsumerThread = new Thread(() -> {
// log.info("启动线程");
// Properties props = new Properties();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
// //创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
// try {
//
// //订阅主题
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
// //持续消费消息
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// records.forEach(record -> {
// System.out.println("接收到的数据:" + record.value());
// String str = hexStringToString(record.value());
// List<ResolverReportData> resolverReportDataList = parseVehicleData(str);
// for (ResolverReportData vehicleData : resolverReportDataList) {
// log.info("解析到车辆数据:{}", vehicleData);
//
// //获取vin
// String vin = vehicleData.getVin();
// //获取事件集
// Set<Object> cacheSet = redisService.getCacheSet("vehicle-event:" + vin);
// List<String> events = cacheSet.stream().map(item -> {
// return String.valueOf(item);
// }).collect(Collectors.toList());
// log.info("事件集合:{}",events);
//
// log.info("解析到车辆数据:{}", vehicleData);
// for (String stringEvent : events) {
// ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent);
// resolverEventService.execute(vehicleData);
// }
// }
// });
// }
// } catch (Exception e) {
// log.error("Error occurred in Kafka consumer thread", e);
// } finally {
// consumer.close();
// }
// });
// kafkaConsumerThread.start();
// }
//
//
//}
package com.zhilian.resolver.model;
import com.zhilian.common.core.utils.SpringUtils;
import com.zhilian.common.redis.service.RedisService;
import com.zhilian.common.resolver.domain.ResolverReportData;
import com.zhilian.resolver.service.ResolverEventService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString;
import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData;
/**
* @ClassName ModelsKafkaMessage
* @Description
* @Author Can.J
* @Date 2024/4/8
*/
@Component
@Slf4j
public class ModelsKafkaMessage {
@Autowired
private RedisService redisService;
private static final String TOPIC_NAME = "vehicle-topic";
private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092";
/**
*
* @return
*/
@PostConstruct
private void consumerMessages() {
Thread kafkaConsumerThread = new Thread(() -> {
log.info("启动线程");
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("接收到的数据:" + record.value());
String str = hexStringToString(record.value());
List<ResolverReportData> resolverReportDataList = parseVehicleData(str);
for (ResolverReportData vehicleData : resolverReportDataList) {
log.info("解析到车辆数据:{}", vehicleData);
//获取vin
String vin = vehicleData.getVin();
//获取事件集
Set<Object> cacheSet = redisService.getCacheSet("vehicle-event:" + vin);
List<String> events = cacheSet.stream().map(item -> {
return String.valueOf(item);
}).collect(Collectors.toList());
log.info("事件集合:{}",events);
log.info("解析到车辆数据:{}", vehicleData);
for (String stringEvent : events) {
ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent);
resolverEventService.execute(vehicleData);
}
}
});
}
} catch (Exception e) {
log.error("Error occurred in Kafka consumer thread", e);
} finally {
consumer.close();
}
});
kafkaConsumerThread.start();
}
}