diff --git a/zhilian-common/zhilian-common-redis/src/main/java/com/zhilian/common/redis/service/RedisService.java b/zhilian-common/zhilian-common-redis/src/main/java/com/zhilian/common/redis/service/RedisService.java index 5d0996f..b00f85d 100644 --- a/zhilian-common/zhilian-common-redis/src/main/java/com/zhilian/common/redis/service/RedisService.java +++ b/zhilian-common/zhilian-common-redis/src/main/java/com/zhilian/common/redis/service/RedisService.java @@ -1,6 +1,5 @@ package com.zhilian.common.redis.service; -import io.lettuce.core.output.DoubleListOutput; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.*; @@ -272,6 +271,9 @@ public class RedisService { redisTemplate.opsForZSet().add(zkey, collect); } + + + /** * 获取Zset集合大小 * @@ -326,7 +328,7 @@ public class RedisService { } /** - * 增加分数 + * 修改对应键值分数,delta为正数则增加,为负数则减少 * @param zkey * @param value * @param delta @@ -371,12 +373,13 @@ public class RedisService { /** * 获取最小值 + * * @param zkey * @return */ - public Map getCacheZsetMin(final String zkey){ + public ZSetOperations.TypedTuple getCacheZsetMin(final String zkey){ ZSetOperations.TypedTuple typedTuple = redisTemplate.opsForZSet().popMin(zkey); - return new HashMap(){{put(typedTuple.getValue(),typedTuple.getScore());}}; + return typedTuple; } @@ -390,8 +393,13 @@ public class RedisService { return redisTemplate.keys(pattern); } - public void deleteCacheSet(String s) { + public void deleteCacheSet(String key) { SetOperations setOperations = redisTemplate.opsForSet(); - setOperations.remove(s); + setOperations.remove(key); + } + + public void deleteCacheSetValue(String key, T value) { + SetOperations setOperations = redisTemplate.opsForSet(); + setOperations.remove(key,value); } } diff --git a/zhilian-common/zhilian-common-system/src/main/java/com/zhilian/common/system/remote/RemoteFileService.java b/zhilian-common/zhilian-common-system/src/main/java/com/zhilian/common/system/remote/RemoteFileService.java index 889aa28..aeebfbb 100644 --- a/zhilian-common/zhilian-common-system/src/main/java/com/zhilian/common/system/remote/RemoteFileService.java +++ b/zhilian-common/zhilian-common-system/src/main/java/com/zhilian/common/system/remote/RemoteFileService.java @@ -12,7 +12,6 @@ import org.springframework.web.multipart.MultipartFile; /** * 文件服务 - * * @author zhilian */ @FeignClient(contextId = "remoteFileService", value = ServiceNameConstants.FILE_SERVICE, fallbackFactory = RemoteFileFallbackFactory.class) diff --git a/zhilian-modules/pom.xml b/zhilian-modules/pom.xml index da36d68..b79f7d4 100644 --- a/zhilian-modules/pom.xml +++ b/zhilian-modules/pom.xml @@ -17,6 +17,7 @@ zhilian-resolver zhilian-business zhilian-manager + zhilian-message zhilian-modules diff --git a/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/ZhilianBusinessApplication.java b/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/ZhilianBusinessApplication.java index cf49a36..92a1cc0 100644 --- a/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/ZhilianBusinessApplication.java +++ b/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/ZhilianBusinessApplication.java @@ -12,7 +12,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @Description 启动类 * @Author: LiYuan * @CreateTime: 2024-03-31 10:13 - * @Description: TODO + * @Description: 业务模块启动类 * @Version: 1.0 */ @EnableCustomConfig diff --git a/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/mapper/BusinessBreakMapper.java b/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/mapper/BusinessBreakMapper.java index 65409ba..fa46ccf 100644 --- a/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/mapper/BusinessBreakMapper.java +++ b/zhilian-modules/zhilian-business/src/main/java/com/zhilian/business/mapper/BusinessBreakMapper.java @@ -5,6 +5,7 @@ import java.util.List; import com.zhilian.business.domain.BreakLog; import com.zhilian.business.domain.BusinessBreak; import com.zhilian.business.domain.request.BreakReq; +import org.apache.ibatis.annotations.Mapper; /** * 故障Mapper接口 @@ -12,6 +13,7 @@ import com.zhilian.business.domain.request.BreakReq; * @author Yy * @date 2024-04-07 */ +@Mapper public interface BusinessBreakMapper { /** diff --git a/zhilian-modules/zhilian-business/src/main/resources/banner.txt b/zhilian-modules/zhilian-business/src/main/resources/banner.txt index 0dd5eee..8b647ec 100644 --- a/zhilian-modules/zhilian-business/src/main/resources/banner.txt +++ b/zhilian-modules/zhilian-business/src/main/resources/banner.txt @@ -1,2 +1,3 @@ Spring Boot Version: ${spring-boot.version} Spring Application Name: ${spring.application.name} + diff --git a/zhilian-modules/zhilian-manager/src/main/java/com/zhilian/manager/ZhilianMagagerApplication.java b/zhilian-modules/zhilian-manager/src/main/java/com/zhilian/manager/ZhilianMagagerApplication.java index e0aa58e..3167c50 100644 --- a/zhilian-modules/zhilian-manager/src/main/java/com/zhilian/manager/ZhilianMagagerApplication.java +++ b/zhilian-modules/zhilian-manager/src/main/java/com/zhilian/manager/ZhilianMagagerApplication.java @@ -8,7 +8,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @BelongsPackage: com.zhilian.manager * @Author: LiYuan * @CreateTime: 2024-03-31 10:17 - * @Description: TODO + * @Description: 管理模块启动类 * @Version: 1.0 */ @SpringBootApplication diff --git a/zhilian-modules/zhilian-message/pom.xml b/zhilian-modules/zhilian-message/pom.xml new file mode 100644 index 0000000..2ef2ee8 --- /dev/null +++ b/zhilian-modules/zhilian-message/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.zhilian + zhilian-modules + 3.6.3 + + + zhilian-message + + + 17 + 17 + UTF-8 + + + + + com.zhilian + zhilian-common-core + + + + \ No newline at end of file diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java index 8bd0e3a..d01eef2 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java @@ -3,6 +3,10 @@ package com.zhilian.online; import com.zhilian.common.security.annotation.EnableCustomConfig; import com.zhilian.common.security.annotation.EnableMyFeignClients; import com.zhilian.common.swagger.annotation.EnableCustomSwagger2; +import com.zhilian.online.controller.OnlineLoadCenterController; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -21,6 +25,4 @@ public class ZhiLianOnlineApplication{ SpringApplication.run(ZhiLianOnlineApplication.class,args); } - - } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java index 1d26a43..ae87a1a 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java @@ -63,6 +63,36 @@ public class RabbitConfig { */ public static final String DEAD_ROUTING_KEY = "dead_routing"; + /** + * 延迟队列名称 + */ + public static final String DELAY_QUEUE_FOR_CAR = "delay_queue_for_car"; + + /** + * 延迟交换机名称 + */ + public static final String DELAY_EXCHANGE_FOR_CAR = "delay_exchange_for_car"; + + /** + * 延迟路由键 + */ + public static final String DELAY_ROUTING_FOR_CAR = "delay_routing_for_car"; + + /** + * 死信队列名称 + */ + public static final String DEAD_QUEUE_FOR_CAR = "dead_queue_for_car"; + + /** + * 死信交换机名称 + */ + public static final String DEAD_EXCHANGE_FOR_CAR = "dead_exchange_for_car"; + + /** + * 死信路由键 + */ + public static final String DEAD_ROUTING_FOR_CAR = "dead_routing_for_car"; + /** * 注入死信队列实例 * @@ -153,11 +183,89 @@ public class RabbitConfig { // } - // @Bean // public MessageConverter jsonMessageConverter(){ // return new Jackson2JsonMessageConverter(); // } + /** + * 注入死信队列实例 + * + * @return Queue + */ + @Bean + Queue deadQueueForCar() { + log.info("死信队列ForCar创建成功"); + return new Queue(DEAD_QUEUE_FOR_CAR, true, false, false); + } + + /** + * 注入死信交换机实例 + * + * @return DirectExchange + */ + @Bean + DirectExchange deadExchangeForCar() { + log.info("死信交换机ForCar创建成功"); + return new DirectExchange(DEAD_EXCHANGE_FOR_CAR, true, false); + } + + /** + * 将死信队列与死信交换机绑定 + * + * @return Binding + */ + @Bean + Binding deadBindingForCar() { + log.info("死信通道ForCar建立成功"); + return BindingBuilder.bind(deadQueue()) + .to(deadExchange()) + .with(DEAD_ROUTING_FOR_CAR); + } + + + /** + * 注入延迟队列实例 + * + * @return Queue + */ + @Bean + Queue delayQueueForCar() { + Map map = new HashMap<>(); + //设置消息过期时间为30秒 + map.put("x-message-ttl", 1000 * 30); + //设置消息过期后存储的死信交换机 + map.put("x-dead-letter-exchange", DEAD_EXCHANGE_FOR_CAR); + //设置死信路由键 + map.put("x-dead-letter-routing-key", DEAD_ROUTING_FOR_CAR); + log.info("延迟队列ForCar创建成功"); + return new Queue(DELAY_QUEUE_FOR_CAR, true, false, false, map); + } + + /** + * 注入延迟交换机实例 + * + * @return DirectExchange + */ + @Bean + DirectExchange delayExchangeForCar() { + log.info("延迟交换机ForCar创建成功"); + return new DirectExchange(DELAY_EXCHANGE_FOR_CAR, true, false); + } + + /** + * 将延迟队列与延迟交换机绑定 + * + * @return + */ + @Bean + Binding delayBindingForCar() { + log.info("延迟通道ForCar创建成功"); + return BindingBuilder.bind(deadQueue()) + .to(delayExchange()) + .with(DELAY_ROUTING_FOR_CAR); + } + + } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java index 4cbde11..9de409f 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java @@ -1,6 +1,5 @@ package com.zhilian.online.constans; -import lombok.AllArgsConstructor; import lombok.Data; /** @@ -27,7 +26,11 @@ public class OnlineConstants { /** * 节点负载前缀 */ - public static final String NODE_LOAD_PREFIX = "node_load"; + public static final String GATHER_LOAD_CONTROL = "gather_info_control"; + /** + * 节点负载信息 + */ + public static final String ONLINE_VEHICLE = "online_vehicle:"; diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/DeadQueueConsumer.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/DeadQueueConsumer.java index f01f88b..c136ddb 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/DeadQueueConsumer.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/DeadQueueConsumer.java @@ -1,12 +1,26 @@ package com.zhilian.online.consumer; import com.alibaba.fastjson.JSON; +import com.zhilian.common.core.constant.Constants; +import com.zhilian.common.core.domain.Result; +import com.zhilian.common.redis.service.RedisService; import com.zhilian.online.config.RabbitConfig; +import com.zhilian.online.constans.OnlineConstants; +import com.zhilian.online.domain.Gather; import com.zhilian.online.domain.req.GatherRegReq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + /** * @BelongsProject: smart-cloud-server * @BelongsPackage: com.zhilian.online.consumer @@ -19,21 +33,72 @@ import org.springframework.stereotype.Component; @Slf4j public class DeadQueueConsumer { + /** + * redis服务 + */ + @Autowired + private RedisService redisService; + /** * 消费死信队列中的消息,确保车辆上线成功 + * 向节点地址的fluxMQ发送http请求,通过响应码来确定节点是否成功上线 */ @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME) - public void SecureOnline(String gatherRegReqMsg) { - GatherRegReq gatherRegReq = JSON.parseObject(gatherRegReqMsg, GatherRegReq.class); - log.info("开始检查节点{}的上线状态......", - gatherRegReq.getClientId()); - - - log.info("节点上线成功"); - - log.info("节点上线失败"); + public void SecureOnline(String gatherMsg) { + Gather gather = JSON.parseObject(gatherMsg, Gather.class); + log.info("开始检查节点{}的上线状态......", gatherMsg); + String ipAddress = gather.getIpAddress(); + HttpURLConnection connection = null; + try { + ipAddress = "http://" + ipAddress; + URL url = new URL(ipAddress); + connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("GET"); + int responseCode = connection.getResponseCode(); + if (Constants.SUCCESS == responseCode){ + log.info("节点上线成功"); + } + if (Constants.FAIL == responseCode){ + log.error("节点{}上线失败",gatherMsg); + //上线失败需要将该节点的负载均衡缓存删除 + if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())){ + redisService.removeCacheZsetBatch(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId()); + } + } + } catch (Exception e) { + log.error("节点上线失败{}",e.getMessage()); + e.printStackTrace(); + }finally { + if (null != connection){ + connection.disconnect(); + } + } } +// /** +// * 确定车辆上线成功 +// * @param vin +// */ +// @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_FOR_CAR) +// public void checkedConnectionOfVehicle(String vin){ +// Set cacheSet = redisService.getCacheSet(OnlineConstants.ONLINE_VEHICLE); +// List list= cacheSet.stream().map(item -> { +// return String.valueOf(item); +// }).collect(Collectors.toList()); +// if (list.contains(vin)){ +// log.info("{}车辆上线成功",vin); +// }else { +// log.error("{}车辆上线失败",vin); +// +// +// String clientId = (String) redisService.getCacheObject(OnlineConstants.ONLINE_VEHICLE + vin); +// +// redisService.deleteObject(OnlineConstants.ONLINE_VEHICLE+vin); +// redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_CONTROL); +// redisService.incrementScore(OnlineConstants.GATHER_LOAD_CONTROL + clientId ,gather,-1.0); +// } +// } + } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java index d244ad7..e1acaf9 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java @@ -20,9 +20,9 @@ import java.util.function.Consumer; @Slf4j public class KafkaConsumer { - @KafkaListener(topics = "test-topic") - public void handlerMsg(ConsumerRecord record){ - log.info("消费者消费消息信息为:{}", JSON.toJSONString(record)); - } +// @KafkaListener(topics = "test-topic") +// public void handlerMsg(ConsumerRecord record){ +// log.info("消费者消费消息信息为:{}", JSON.toJSONString(record)); +// } } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java index f0757fe..bf95c96 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java @@ -1,5 +1,6 @@ package com.zhilian.online.controller; +import com.alibaba.fastjson.JSON; import com.zhilian.common.core.domain.Result; import com.zhilian.common.core.utils.ip.IpUtils; import com.zhilian.common.core.web.controller.BaseController; @@ -9,10 +10,7 @@ import com.zhilian.online.service.OnlineLoadCenterService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; @@ -41,15 +39,15 @@ public class OnlineLoadCenterController extends BaseController { /** - * @description: 节点申请注解令牌接口,仅限内网访问 + * @description: 节点申请注解令牌接口, 仅限内网访问 * @author: LiYuan * @param: vehicle * @return: Result **/ @GetMapping("/applyForReg") - public Result applyForReg(){ - log.info("申请注册令牌"); - return onlineLoadCenterService.applyForReg(); + public Result applyForReg(Gather gather) { + log.info("申请注册令牌{}", JSON.toJSONString(gather)); + return onlineLoadCenterService.applyForReg(gather); } @@ -60,21 +58,37 @@ public class OnlineLoadCenterController extends BaseController { * @return: Result **/ @PostMapping("/regGather") - public Result regGather(@Validated Gather gather){ + public Result regGather(@Validated @RequestBody Gather gather) { String ipAddr = IpUtils.getIpAddr(request); gather.setIpAddress(ipAddr); - log.info("节点{}正在上线",gather); + log.info("节点{}正在上线", JSON.toJSONString(gather)); return onlineLoadCenterService.regGather(gather); } /** - * 车辆上线至收集节点 - * @return null + * 车辆申请上线收集节点 + * + * @return gather + * @author: LiYuan + * @param: vin + * @return: Result */ - public Result getConnectionOption(){ + @GetMapping("/applyForConnectToGather") + public Result applyForConnectToGather(@RequestParam("vin") String vin) { + return Result.success(onlineLoadCenterService.applyForConnectToGather(vin)); + } - return null; + /** + * @description:车辆下线接口 + * @author: LiYuan + * @param: vin + * @return: Result + **/ + @GetMapping("/applyForDisconnectToGather") + public Result applyForDisconnectToGather(@RequestParam("vin") String vin) { + onlineLoadCenterService.applyForDisconnectToGather(vin); + return Result.success(); } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/domain/Gather.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/domain/Gather.java index 251cbdc..3a2e127 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/domain/Gather.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/domain/Gather.java @@ -25,14 +25,12 @@ public class Gather extends BaseEntity { /** * 收集节点ID */ - @TableId(type = IdType.INPUT) private String clientId; -// /** -// * 收集节点登录令牌 -// */ -// @NotBlank -// private String token; + /** + * 收集节点登录令牌 + */ + private String token; /** * broker diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/OnlineLoadCenterService.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/OnlineLoadCenterService.java index b1a3e35..c664fde 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/OnlineLoadCenterService.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/OnlineLoadCenterService.java @@ -20,7 +20,7 @@ public interface OnlineLoadCenterService extends IService { * @param: vehicle * @return: Result **/ - Result applyForReg(); + Result applyForReg(Gather gather); /** * @description: 车辆使用申请的一次性账户连接收集节点 @@ -29,4 +29,22 @@ public interface OnlineLoadCenterService extends IService { * @return: Result **/ Result regGather(Gather gather); + + + /** + * 车辆申请上线收集节点 + * @author: LiYuan + * @param: vin + * @return gather + */ + Gather applyForConnectToGather(String vin); + + + /** + * @description:车辆下线接口 + * @author: LiYuan + * @param: vin + * @return: Result + **/ + void applyForDisconnectToGather(String vin); } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java index c2ef6a2..6761d16 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java @@ -9,15 +9,18 @@ import com.zhilian.online.config.RabbitConfig; import com.zhilian.online.constans.OnlineConstants; import com.zhilian.online.domain.Gather; import com.zhilian.online.domain.VehicleAccount; -import com.zhilian.online.domain.req.GatherRegReq; import com.zhilian.online.mapper.OnlineLoadCenterMapper; import com.zhilian.online.service.OnlineLoadCenterService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Service; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @version 1.0 @@ -54,43 +57,82 @@ public class OnlineLoadCenterServiceImpl extends ServiceImpl **/ @Override - public Result applyForReg() { + public Result applyForReg(Gather gather) { //生成一次性令牌 String token = IdUtils.fastSimpleUUID(); //将令牌信息缓存到Redis中 - redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX , token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS); + redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX+gather.getClientId(),token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS); - //将账户信息返回客户端 + //将令牌信息返回客户端 return Result.success(token); } /** - * @description: 节点上线成功 + * @description: 节点上线 * @author: LiYuan * @param: Gather * @return: Result **/ @Override public Result regGather(Gather gather) { -// //判断登录令牌是否过期,一致 -// if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) { -// return Result.error("令牌已过期"); -// } + //判断登录令牌是否过期,一致 + if (!redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) { + return Result.error("令牌已过期"); + } + String token = redisService.getCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId()); + if (!token.equals(gather.getToken())){ + return Result.error("令牌错误"); + } //为该节点创建负载均衡缓存 - if (redisService.hasKey(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId())) { - redisService.deleteObject(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId()); - } - redisService.setCacheZsetValue(OnlineConstants.ONLINE_TOKEN_PREFIX + gather.getClientId(), gather, 0.0); + redisService.setCacheZsetValue(OnlineConstants.GATHER_LOAD_CONTROL + gather.getClientId(), JSON.toJSONString(gather), 0.0); -// //向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线 -// rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(gather)); + //向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线 + rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(gather)); return Result.success("节点上线"); } + /** + * 车辆申请上线收集节点 + * @author: LiYuan + * @param: vin + * @return gather + */ + @Override + public Gather applyForConnectToGather(String vin) { + //判断车辆是否是我们的车 + List cacheList = redisService.getCacheList("our_car"); + List list = cacheList.stream().map(item -> { + return String.valueOf(item); + }).collect(Collectors.toList()); + if (!list.contains(vin)){ + throw new RuntimeException("车辆未登记"); + } + //获取负载最少的车辆进行链接 + ZSetOperations.TypedTuple cacheZsetMin = redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_CONTROL); + Gather gather = JSON.parseObject((String) cacheZsetMin.getValue(), Gather.class); + //存放节点车辆信息 + redisService.setCacheObject(OnlineConstants.ONLINE_VEHICLE+vin,gather.getClientId()); + //发送延迟队列确定车辆上线 + rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_FOR_CAR, RabbitConfig.DELAY_ROUTING_FOR_CAR,vin); + + return gather; + } + + /** + * @description:车辆下线接口 + * @author: LiYuan + * @param: vin + * @return: Result + **/ + @Override + public void applyForDisconnectToGather(String vin) { + + } + } diff --git a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/model/ModelsKafkaMessage.java b/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/model/ModelsKafkaMessage.java index 909ac62..5821be9 100644 --- a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/model/ModelsKafkaMessage.java +++ b/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/model/ModelsKafkaMessage.java @@ -1,97 +1,97 @@ -//package com.zhilian.resolver.model; -//import com.zhilian.common.core.utils.SpringUtils; -//import com.zhilian.common.redis.service.RedisService; -//import com.zhilian.common.resolver.domain.ResolverReportData; -//import com.zhilian.resolver.service.ResolverEventService; -//import lombok.extern.slf4j.Slf4j; -//import org.apache.kafka.clients.consumer.ConsumerConfig; -//import org.apache.kafka.clients.consumer.ConsumerRecords; -//import org.apache.kafka.clients.consumer.KafkaConsumer; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.scheduling.annotation.Scheduled; -//import org.springframework.stereotype.Component; -// -//import javax.annotation.PostConstruct; -//import java.time.Duration; -//import java.util.*; -//import java.util.stream.Collectors; -// -//import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString; -//import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData; -// -///** -// * @ClassName ModelsKafkaMessage -// * @Description 描述 -// * @Author Can.J -// * @Date 2024/4/8 -// */ -//@Component -//@Slf4j -//public class ModelsKafkaMessage { -// @Autowired -// private RedisService redisService; -// private static final String TOPIC_NAME = "vehicle-topic"; -// private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092"; -// -// -// -// /** -// * 消费者配置 -// * @return -// */ -// @PostConstruct -// private void consumerMessages() { -// Thread kafkaConsumerThread = new Thread(() -> { -// log.info("启动线程"); -// Properties props = new Properties(); -// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); -// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); -// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// -// //创建消费者 -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// -// try { -// -// //订阅主题 -// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); -// -// //持续消费消息 -// while (true) { -// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); -// records.forEach(record -> { -// System.out.println("接收到的数据:" + record.value()); -// String str = hexStringToString(record.value()); -// List resolverReportDataList = parseVehicleData(str); -// for (ResolverReportData vehicleData : resolverReportDataList) { -// log.info("解析到车辆数据:{}", vehicleData); -// -// //获取vin -// String vin = vehicleData.getVin(); -// //获取事件集 -// Set cacheSet = redisService.getCacheSet("vehicle-event:" + vin); -// List events = cacheSet.stream().map(item -> { -// return String.valueOf(item); -// }).collect(Collectors.toList()); -// log.info("事件集合:{}",events); -// -// log.info("解析到车辆数据:{}", vehicleData); -// for (String stringEvent : events) { -// ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent); -// resolverEventService.execute(vehicleData); -// } -// } -// }); -// } -// } catch (Exception e) { -// log.error("Error occurred in Kafka consumer thread", e); -// } finally { -// consumer.close(); -// } -// }); -// kafkaConsumerThread.start(); -// } -// -// -//} +package com.zhilian.resolver.model; +import com.zhilian.common.core.utils.SpringUtils; +import com.zhilian.common.redis.service.RedisService; +import com.zhilian.common.resolver.domain.ResolverReportData; +import com.zhilian.resolver.service.ResolverEventService; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; + +import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString; +import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData; + +/** + * @ClassName ModelsKafkaMessage + * @Description 描述 + * @Author Can.J + * @Date 2024/4/8 + */ +@Component +@Slf4j +public class ModelsKafkaMessage { + @Autowired + private RedisService redisService; + private static final String TOPIC_NAME = "vehicle-topic"; + private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092"; + + + + /** + * 消费者配置 + * @return + */ + @PostConstruct + private void consumerMessages() { + Thread kafkaConsumerThread = new Thread(() -> { + log.info("启动线程"); + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + //创建消费者 + KafkaConsumer consumer = new KafkaConsumer<>(props); + + try { + + //订阅主题 + consumer.subscribe(Collections.singletonList(TOPIC_NAME)); + + //持续消费消息 + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + records.forEach(record -> { + System.out.println("接收到的数据:" + record.value()); + String str = hexStringToString(record.value()); + List resolverReportDataList = parseVehicleData(str); + for (ResolverReportData vehicleData : resolverReportDataList) { + log.info("解析到车辆数据:{}", vehicleData); + + //获取vin + String vin = vehicleData.getVin(); + //获取事件集 + Set cacheSet = redisService.getCacheSet("vehicle-event:" + vin); + List events = cacheSet.stream().map(item -> { + return String.valueOf(item); + }).collect(Collectors.toList()); + log.info("事件集合:{}",events); + + log.info("解析到车辆数据:{}", vehicleData); + for (String stringEvent : events) { + ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent); + resolverEventService.execute(vehicleData); + } + } + }); + } + } catch (Exception e) { + log.error("Error occurred in Kafka consumer thread", e); + } finally { + consumer.close(); + } + }); + kafkaConsumerThread.start(); + } + + +}