diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteTroubleService.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteTroubleService.java new file mode 100644 index 0000000..0af7ecc --- /dev/null +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteTroubleService.java @@ -0,0 +1,25 @@ +package com.couplet.remote; + +import com.couplet.common.core.constant.ServiceNameConstants; +import com.couplet.common.core.domain.Result; +import com.couplet.common.domain.CoupletTroubleCode; +import com.couplet.remote.factory.RemoteTroubleFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +@FeignClient(contextId = "remoteTroubleService" , + value = ServiceNameConstants.BUSINESS_SERVICE, + fallbackFactory = RemoteTroubleFallbackFactory.class +) +public interface RemoteTroubleService { + + /** + * 新增故障码 + * @param code + * @return + */ + @PostMapping("/trouble/newFaultData") + public Result newFaultData(@RequestBody CoupletTroubleCode code); + +} diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java index 13d3710..c953032 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java @@ -51,7 +51,4 @@ public interface RemoteVehicleService { @GetMapping("/findByVIN/{vin}") public Result> findByVIN(@PathVariable("vin") String vin); - @GetMapping("onOrOutLineByVIN") - public Integer onOrOutLineByVIN(@RequestParam("params") String params); - } diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteTroubleFallbackFactory.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteTroubleFallbackFactory.java new file mode 100644 index 0000000..e81225c --- /dev/null +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteTroubleFallbackFactory.java @@ -0,0 +1,34 @@ +package com.couplet.remote.factory; + +import com.couplet.common.core.domain.Result; +import com.couplet.common.domain.CoupletTroubleCode; +import com.couplet.remote.RemoteTroubleService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.openfeign.FallbackFactory; +import org.springframework.stereotype.Component; + +/** + * @author fufanrui + * @version 1.0 + * @description: TODO + * @date 2024/4/2 14:46 + */ +@Component +public class RemoteTroubleFallbackFactory implements FallbackFactory { + private static final Logger log = LoggerFactory.getLogger(RemoteTroubleFallbackFactory.class); + + + @Override + public RemoteTroubleService create(Throwable cause) { + return new RemoteTroubleService() { + + + @Override + public Result newFaultData(CoupletTroubleCode code) { + return Result.error("调用失败...."+cause.getMessage()); + } + + }; + } +} diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java index 603b590..58fa254 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java @@ -50,12 +50,6 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory> findByVIN(String vin) { return Result.error("车辆服务调用失败:" + cause.getMessage()); } - - @Override - public Integer onOrOutLineByVIN(String params) { - log.error("车辆服务调用失败:"+cause.getMessage()); - return null; - } }; } } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java new file mode 100644 index 0000000..828df61 --- /dev/null +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java @@ -0,0 +1,77 @@ +package com.couplet.analyze.msg.consumer; + + +import com.alibaba.fastjson.JSON; +import com.couplet.common.domain.request.FenceUpdateRequest; +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/4 + * @Description: + */ +@Log4j2 +@Component +@RabbitListener(queues = "fenceQueue") +public class FenceConsumer { + @Autowired + private StringRedisTemplate redisTemplate; + + @RabbitHandler + public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException { + + log.info("消息进入队列,传入的数据是:[{}]", fenceUpdateRequest); + + String messageId = message.getMessageProperties().getMessageId(); + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + if (!redisTemplate.hasKey("消息不丢失:" + messageId)) { + redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES); + } + + Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId); + redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES); + try { + if (0 < add) { + + HashMap hashMap = new HashMap<>(); + HashSet hashSet = new HashSet<>(); + hashSet.add(fenceUpdateRequest); + hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest); + redisTemplate.opsForValue().set("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES); + //判断车辆是否有实时数据,如果没有则删除数据 + channel.basicAck(deliveryTag, false); + } else { + log.error("消息不能重复消费:[{}]", fenceUpdateRequest); + channel.basicReject(deliveryTag, false); + } + } catch (IOException e) { + log.error("消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest); + String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId); + + Long o = Long.valueOf(s); + if (deliveryTag == o + 2) { + log.error("消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest); + channel.basicNack(deliveryTag, false, false); + } else { + log.error("消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest); + channel.basicNack(deliveryTag, true, false); + } + + } + + + } + +} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java index 0498344..4c85370 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java @@ -7,20 +7,15 @@ import com.couplet.analyze.msg.mapper.IncidentMapper; import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; import com.couplet.common.domain.request.RealTimeDataRequest; import com.rabbitmq.client.Channel; - import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.*; -import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; -import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -30,28 +25,8 @@ import java.util.concurrent.TimeUnit; */ @Log4j2 @Component -//@RabbitListener(queues = "vinQueue") +@RabbitListener(queues = "finByVinQueueName") public class MsgConsumer { - - // 普通交换机名称 - public static final String EXCHANGE_NAME = "confirm_exchange"; - // 队列名称 - public static final String QUEUE_NAME = "vinQueue"; - - public static final String ROUTING_KEY = "key1"; - - @Bean - public DirectExchange confirmExchange() { - return new DirectExchange(EXCHANGE_NAME); - } - @Bean - public Queue confirmQueue() { - return QueueBuilder.durable(QUEUE_NAME).build(); - } - @Bean - public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { - return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY); - } @Autowired private StringRedisTemplate redisTemplate; @Autowired diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java index 28b2399..0590b4f 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java @@ -4,17 +4,18 @@ import com.alibaba.fastjson.JSON; import com.couplet.analyze.msg.contents.StateConstant; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; -import lombok.extern.log4j.Log4j2; -import org.aspectj.bridge.Message; +import com.couplet.common.domain.CoupletTroubleCode; +import com.couplet.remote.RemoteTroubleService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Service; +import java.util.Date; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -30,6 +31,9 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl */ @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private RemoteTroubleService remoteTroubleService; + private static Logger log = LoggerFactory.getLogger(BreakdownServiceImpl.class); @@ -66,17 +70,22 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl || StateConstant.DCDC_STATUS != coupletMsgData.getDcdcStatus() || StateConstant.CHG_STATUS != coupletMsgData.getChgStatus()){ //获取过期的key - String expireKey = coupletMsgData.toString(); - redisTemplate.opsForValue().set(String.valueOf(coupletMsgData),JSON.toJSONString(coupletMsgData),10, TimeUnit.MINUTES); - log.debug("失效+key is:"+ expireKey); + String key = "breakdown"; + log.debug("失效+key is:"+ key); + String value = JSON.toJSONString(coupletMsgData); + redisTemplate.opsForSet().add(key, value); + long expireTime = 30; + redisTemplate.expire(key, expireTime, TimeUnit.MINUTES); + long timeMillis = System.currentTimeMillis(); + scheduledRedis(); + log.info("故障事件结束时间:"+timeMillis); log.info("故障事件检测结束....."); + log.info("故障事件结束....."); } long timeMillis = System.currentTimeMillis(); log.info("故障事件结束时间:"+timeMillis); log.info("故障事件检测结束....."); log.info("故障事件结束....."); - - } /** @@ -86,4 +95,31 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl public String getName() { return "breakdown"; } + + public void scheduledRedis() { + + // Get all members of the set + Set members = redisTemplate.opsForSet().members("breakdown"); + if (members.size()!=0){ + for (String member : members){ + CoupletMsgData code = JSON.parseObject(member, CoupletMsgData.class); + Set breakdownIds = redisTemplate.opsForSet().members(code.getVin()); + if (breakdownIds.size()==0){ + CoupletTroubleCode troubleCode = new CoupletTroubleCode(); + troubleCode.setTroubleStartTime(new Date()); + // 插入数据库 + troubleCode.setTroubleTag(0); + troubleCode.setTroubleVin(code.getVin()); + remoteTroubleService.newFaultData(troubleCode); + redisTemplate.opsForSet().add(code.getVin(), code.getVin()+":"+code); + long expireTime = 30; + redisTemplate.expire(code.getVin(), expireTime, TimeUnit.MINUTES); + + } + + } + + } + + } } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java index bed51e1..d4a9d20 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java @@ -3,6 +3,8 @@ package com.couplet.analyze.msg.service.impl; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; /** @@ -14,6 +16,8 @@ import org.springframework.stereotype.Service; @Log4j2 public class ElectronicFenceServiceImpl implements IncidentService { + @Autowired + private StringRedisTemplate redisTemplate; /** * 电子围栏事件 @@ -24,10 +28,16 @@ public class ElectronicFenceServiceImpl implements IncidentService { public void incident(CoupletMsgData coupletMsgData) { log.info("电子围栏事件开始......."); + + if (redisTemplate.hasKey("fence")) { + log.info("电子围栏事件redis存在......."); + String s = redisTemplate.opsForValue().get("fence"); + log.info("更改的电子围栏内容是:"+s); + log.info("电子围栏事件结束......."); + } + log.info("电子围栏事件结束......."); - - } /** diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index 97e08a3..6e1e511 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -1,19 +1,14 @@ package com.couplet.analyze.msg.service.impl; -import com.alibaba.fastjson.JSON; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.mapper.IncidentMapper; import com.couplet.analyze.msg.service.IncidentService; +import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - /** * @Author: LiJiaYao * @Date: 2024/4/2 @@ -47,7 +42,6 @@ public class RealTimeDataServiceImpl implements IncidentService { } log.info("[{}]开始传输实时数据", coupletMsgData.getVin()); - log.info("实时数据事件结束....."); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java new file mode 100644 index 0000000..67d7dcf --- /dev/null +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java @@ -0,0 +1,19 @@ +package com.couplet.analyze.msg.service.impl.breakdown; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/5 + * @Description: + */ +@Component +public class BreakdownEvent { + @Autowired + private StringRedisTemplate redisTemplate; + + + +}