diff --git a/couplet-auth/src/main/resources/bootstrap.yml b/couplet-auth/src/main/resources/bootstrap.yml index 427f682..68c3c25 100644 --- a/couplet-auth/src/main/resources/bootstrap.yml +++ b/couplet-auth/src/main/resources/bootstrap.yml @@ -17,9 +17,11 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 + namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java index 8f1a275..ec9a53a 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java @@ -4,7 +4,9 @@ import com.couplet.common.core.constant.ServiceNameConstants; import com.couplet.common.core.domain.Result; import com.couplet.common.domain.Vehicle; import com.couplet.common.domain.VehicleMiddle; +import com.couplet.common.domain.request.VehicleListParams; import com.couplet.remote.factory.RemoteVehicleFallbackFactory; +import lombok.extern.java.Log; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.*; @@ -62,4 +64,7 @@ public interface RemoteVehicleService { @PostMapping("/vehicleAndLogo/queryByLogoIds/{vehicleId}") public Result> queryByLogoIds(@PathVariable("vehicleId") Long vehicleId); + @PostMapping("/list") + public Result list(@RequestBody VehicleListParams listParams); + } diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java index 34ed01e..1cda3b4 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java @@ -3,6 +3,7 @@ package com.couplet.remote.factory; import com.couplet.common.core.domain.Result; import com.couplet.common.domain.Vehicle; import com.couplet.common.domain.VehicleMiddle; +import com.couplet.common.domain.request.VehicleListParams; import com.couplet.remote.RemoteVehicleService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,11 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory> queryByLogoIds(Long vehicleId) { return Result.error("车辆服务调用失败:"+cause.getMessage()); } + + @Override + public Result list(VehicleListParams listParams) { + return Result.error("车辆服务调用失败:"+cause.getMessage()); + } }; } } diff --git a/couplet-common/couplet-common-event/.gitignore b/couplet-common/couplet-common-event/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/couplet-common/couplet-common-event/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/couplet-common/couplet-common-event/pom.xml b/couplet-common/couplet-common-event/pom.xml new file mode 100644 index 0000000..f0150b5 --- /dev/null +++ b/couplet-common/couplet-common-event/pom.xml @@ -0,0 +1,28 @@ + + + 4.0.0 + + com.couplet + couplet-common + 3.6.3 + + + couplet-common-event + + 事件系统 + + 17 + 17 + UTF-8 + + + + + com.couplet + couplet-common-redis + + + + diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java new file mode 100644 index 0000000..f920863 --- /dev/null +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/contents/AnalyzeEventContents.java @@ -0,0 +1,27 @@ +package com.couplet.analyze.common.contents; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/7 + * @Description: 事件内容 + */ +public class AnalyzeEventContents { + + /** + * 故障 + */ + String BREAKDOWN = "breakdown"; + /** + * 电子围栏 + */ + String ELECTRONIC_FENCE = "electronic-fence"; + /** + * 实时数据 + */ + String REAL_TIME_DATA = "real-time-data"; + + /** + * 存储 + */ + String STORED_EVENT = "stored-event"; +} diff --git a/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java new file mode 100644 index 0000000..32d043a --- /dev/null +++ b/couplet-common/couplet-common-event/src/main/java/com/couplet/analyze/common/event/AnalyzeEventCache.java @@ -0,0 +1,51 @@ +package com.couplet.analyze.common.event; + +import com.couplet.common.redis.service.RedisService; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Set; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/7 + * @Description: + */ + +public class AnalyzeEventCache { + + @Autowired + private RedisService redisService; + + public String encode(String vin){ + return "event:arr"+vin; + } + /** + * 添加事件 + */ + public void addEvent(String vin, String eventValue) { + redisService.setCacheSet(encode(vin), eventValue); + } + + /** + * 修改事件 + */ + public void updateEvent(String vin,String eventValue){ + redisService.setCacheSet(encode(vin), eventValue); + } + /** + * 删除事件 + */ + public void removeEvent(String vin,String eventName){ + redisService.deleteSet(encode(vin), eventName); + } + /** + * 获取事件集合 + * + * @return + */ + public Set getEventList(String vin){ + + return redisService.getCacheSet(encode(vin)); + } + +} diff --git a/couplet-common/couplet-common-event/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/couplet-common/couplet-common-event/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..cb7030c --- /dev/null +++ b/couplet-common/couplet-common-event/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.couplet.analyze.common.event.AnalyzeEventCache diff --git a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java index 5e5dcd7..003af12 100644 --- a/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java +++ b/couplet-common/couplet-common-redis/src/main/java/com/couplet/common/redis/service/RedisService.java @@ -1,6 +1,7 @@ package com.couplet.common.redis.service; import com.couplet.common.domain.CoupletVehicleData; +import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.*; import org.springframework.stereotype.Component; @@ -179,8 +180,31 @@ public class RedisService { setOperation.add(it.next()); } return setOperation; + } /** + * 缓存Set + * + * @param key 缓存键值 + * @param dataSet 缓存的数据 + * + * @return 缓存数据的对象 + */ + public BoundSetOperations setCacheSet (final String key, final T dataSet) { + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + setOperation.add(dataSet); + return setOperation; } + /** + * 删除set + * @param key + * @param setValue + */ + public void deleteSet(String key, String setValue) { + + //缓存的键值 + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + setOperation.remove(setValue); //缓存的数据 + } /** * 获得缓存的set * @@ -192,6 +216,7 @@ public class RedisService { return redisTemplate.opsForSet().members(key); } + /** * 缓存Map * @@ -273,4 +298,6 @@ public class RedisService { public Collection keys (final String pattern) { return redisTemplate.keys(pattern); } + + } diff --git a/couplet-common/pom.xml b/couplet-common/pom.xml index e1454b0..abf1afa 100644 --- a/couplet-common/pom.xml +++ b/couplet-common/pom.xml @@ -19,6 +19,7 @@ couplet-common-datasource couplet-common-system couplet-common-business + couplet-common-event diff --git a/couplet-gateway/src/main/resources/bootstrap.yml b/couplet-gateway/src/main/resources/bootstrap.yml index 091e68a..6dc39fe 100644 --- a/couplet-gateway/src/main/resources/bootstrap.yml +++ b/couplet-gateway/src/main/resources/bootstrap.yml @@ -15,9 +15,11 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 + namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index 55191f0..006bb88 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -11,6 +11,7 @@ couplet-analyze-msg + 解析系统 17 17 @@ -86,13 +87,14 @@ org.eclipse.paho.client.mqttv3 1.2.5 + com.couplet - couplet-modules-mq + couplet-common-event - com.couplet - couplet-common-business + org.springframework.kafka + spring-kafka diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java index e4e33f1..53716a3 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/FenceConsumer.java @@ -1,84 +1,84 @@ -package com.couplet.analyze.msg.consumer; - -import com.couplet.common.core.text.Convert; -import com.couplet.common.domain.request.FenceUpdateRequest; -import com.couplet.common.redis.service.RedisService; -import com.rabbitmq.client.Channel; -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.BoundSetOperations; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/4 - * @Description: - */ -@Log4j2 -@Component -@RabbitListener(queues = "fenceQueue") -public class FenceConsumer { - @Autowired - private RedisService redisService; - - @RabbitHandler - public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException { - - log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest); - - String messageId = message.getMessageProperties().getMessageId(); - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) { - redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag); - } -// if (redisService.hasKey("fence")){ -// redisService.deleteObject("fence"); +//package com.couplet.analyze.msg.consumer; +// +//import com.couplet.common.core.text.Convert; +//import com.couplet.common.domain.request.FenceUpdateRequest; +//import com.couplet.common.redis.service.RedisService; +//import com.rabbitmq.client.Channel; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.BoundSetOperations; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.HashMap; +//import java.util.HashSet; +//import java.util.Set; +//import java.util.concurrent.TimeUnit; +// +///** +// * @Author: LiJiaYao +// * @Date: 2024/4/4 +// * @Description: +// */ +//@Log4j2 +//@Component +//@RabbitListener(queues = "fenceQueue") +//public class FenceConsumer { +// @Autowired +// private RedisService redisService; +// +// @RabbitHandler +// public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException { +// +// log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest); +// +// String messageId = message.getMessageProperties().getMessageId(); +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) { +// redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag); // } - - HashSet objects = new HashSet<>(); - objects.add(messageId); - - BoundSetOperations set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects); - redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES); - try { - if (set != null) { - HashMap hashMap = new HashMap<>(); - HashSet hashSet = new HashSet<>(); - hashSet.add(fenceUpdateRequest); - hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest); -// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES); -// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap)); - - String key = Convert.toStr(fenceUpdateRequest.getFenceId()); - redisService.setCacheObject(key,fenceUpdateRequest); - redisService.expire(key, 10, TimeUnit.MINUTES); - //判断车辆是否有实时数据,如果没有则删除数据 - channel.basicAck(deliveryTag, false); - } else { - log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest); - channel.basicReject(deliveryTag, false); - } - } catch (IOException e) { - log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest); - String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId); - - Long o = Long.valueOf(s); - if (deliveryTag == o + 2) { - log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest); - channel.basicNack(deliveryTag, false, false); - } else { - log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest); - channel.basicNack(deliveryTag, true, false); - } - } - } -} +//// if (redisService.hasKey("fence")){ +//// redisService.deleteObject("fence"); +//// } +// +// HashSet objects = new HashSet<>(); +// objects.add(messageId); +// +// BoundSetOperations set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects); +// redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES); +// try { +// if (set != null) { +// HashMap hashMap = new HashMap<>(); +// HashSet hashSet = new HashSet<>(); +// hashSet.add(fenceUpdateRequest); +// hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest); +//// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES); +//// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap)); +// +// String key = Convert.toStr(fenceUpdateRequest.getFenceId()); +// redisService.setCacheObject(key,fenceUpdateRequest); +// redisService.expire(key, 10, TimeUnit.MINUTES); +// //判断车辆是否有实时数据,如果没有则删除数据 +// channel.basicAck(deliveryTag, false); +// } else { +// log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest); +// channel.basicReject(deliveryTag, false); +// } +// } catch (IOException e) { +// log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest); +// String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId); +// +// Long o = Long.valueOf(s); +// if (deliveryTag == o + 2) { +// log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest); +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest); +// channel.basicNack(deliveryTag, true, false); +// } +// } +// } +//} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java index feaf03f..f184f7f 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/MsgConsumer.java @@ -1,91 +1,91 @@ -package com.couplet.analyze.msg.consumer; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.couplet.analyze.msg.domain.CoupletMsgData; -import com.couplet.analyze.msg.mapper.IncidentMapper; -import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; -import com.couplet.common.domain.request.RealTimeDataRequest; -import com.rabbitmq.client.Channel; -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/4 - * @Description: - */ -@Log4j2 -@Component -@RabbitListener(queues = "finByVinQueueName") -public class MsgConsumer { - @Autowired - private StringRedisTemplate redisTemplate; - @Autowired - private IncidentMapper incidentMapper; - - @RabbitHandler - public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException { - - log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest); - - String messageId = message.getMessageProperties().getMessageId(); - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - if (!redisTemplate.hasKey("消息不丢失:" + messageId)) { - redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES); - } - - Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId); - redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES); - try { - if (0 < add) { - JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest)); - Long userId = jsonObject.getLong("userId"); - String vin = jsonObject.getString("vin"); - RealTimeDataRequest request = new RealTimeDataRequest(); - request.setVin(vin); - request.setUserId(userId); - RealTimeJudge.addRealTime(request); - //判断车辆是否有实时数据,如果没有则删除数据 - if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){ - log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin()); - } - CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin()); - if (incident == null){ - log.error("没有数据......"); - } - redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident)); - - channel.basicAck(deliveryTag, false); - } else { - log.error("消息不能重复消费:[{}]", realTimeDataRequest); - channel.basicReject(deliveryTag, false); - } - } catch (IOException e) { - - log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest); - String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId); - - Long o = Long.valueOf(s); - if (deliveryTag == o + 2) { - log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest); - channel.basicNack(deliveryTag, false, false); - } else { - log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest); - channel.basicNack(deliveryTag, true, false); - } - - } - - - } - -} +//package com.couplet.analyze.msg.consumer; +// +//import com.alibaba.fastjson.JSON; +//import com.alibaba.fastjson.JSONObject; +//import com.couplet.analyze.msg.domain.CoupletMsgData; +//import com.couplet.analyze.msg.mapper.IncidentMapper; +//import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; +//import com.couplet.common.domain.request.RealTimeDataRequest; +//import com.rabbitmq.client.Channel; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.StringRedisTemplate; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.concurrent.TimeUnit; +// +///** +// * @Author: LiJiaYao +// * @Date: 2024/4/4 +// * @Description: +// */ +//@Log4j2 +//@Component +//@RabbitListener(queues = "finByVinQueueName") +//public class MsgConsumer { +// @Autowired +// private StringRedisTemplate redisTemplate; +// @Autowired +// private IncidentMapper incidentMapper; +// +// @RabbitHandler +// public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException { +// +// log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest); +// +// String messageId = message.getMessageProperties().getMessageId(); +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// if (!redisTemplate.hasKey("消息不丢失:" + messageId)) { +// redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES); +// } +// +// Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId); +// redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES); +// try { +// if (0 < add) { +// JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest)); +// Long userId = jsonObject.getLong("userId"); +// String vin = jsonObject.getString("vin"); +// RealTimeDataRequest request = new RealTimeDataRequest(); +// request.setVin(vin); +// request.setUserId(userId); +// RealTimeJudge.addRealTime(request); +// //判断车辆是否有实时数据,如果没有则删除数据 +// if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){ +// log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin()); +// } +// CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin()); +// if (incident == null){ +// log.error("没有数据......"); +// } +// redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident)); +// +// channel.basicAck(deliveryTag, false); +// } else { +// log.error("消息不能重复消费:[{}]", realTimeDataRequest); +// channel.basicReject(deliveryTag, false); +// } +// } catch (IOException e) { +// +// log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest); +// String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId); +// +// Long o = Long.valueOf(s); +// if (deliveryTag == o + 2) { +// log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest); +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest); +// channel.basicNack(deliveryTag, true, false); +// } +// +// } +// +// +// } +// +//} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/VehicleConsumer.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/VehicleConsumer.java index bf8b0e6..e92dbb4 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/VehicleConsumer.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/consumer/VehicleConsumer.java @@ -1,68 +1,68 @@ -package com.couplet.analyze.msg.consumer; - -import com.couplet.common.core.text.Convert; -import com.couplet.common.redis.service.RedisService; -import com.rabbitmq.client.Channel; -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.BoundSetOperations; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/4 - * @Description: - */ -@Log4j2 -@Component -@RabbitListener(queues = "vehicleQueue") -public class VehicleConsumer { - @Autowired - private RedisService redisService; - @RabbitHandler - public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException { - log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo); - String messageId = message.getMessageProperties().getMessageId(); - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - if (!redisService.hasKey("车辆消息不丢失:" + messageId)) { - redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag); - } - HashSet objects = new HashSet<>(); - objects.add(messageId); - BoundSetOperations set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects); - redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES); - try { - if (set != null) { -// String key = Convert.toStr(id); - - String key = "id"; - redisService.setCacheObject(key, vehicleAndLogo); - redisService.expire(key, 10, TimeUnit.MINUTES); - //判断车辆是否有实时数据,如果没有则删除数据 - channel.basicAck(deliveryTag, false); - } else { - log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo); - channel.basicReject(deliveryTag, false); - } - } catch (IOException e) { - log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo); - String s = redisService.getCacheObject("车辆消息不丢失:" + messageId); - Long o = Long.valueOf(s); - if (deliveryTag == o + 2) { - log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo); - channel.basicNack(deliveryTag, false, false); - } else { - log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo); - channel.basicNack(deliveryTag, true, false); - } - } - } -} +//package com.couplet.analyze.msg.consumer; +// +//import com.couplet.common.core.text.Convert; +//import com.couplet.common.redis.service.RedisService; +//import com.rabbitmq.client.Channel; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.BoundSetOperations; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.HashSet; +//import java.util.List; +//import java.util.concurrent.TimeUnit; +// +///** +// * @Author: LiJiaYao +// * @Date: 2024/4/4 +// * @Description: +// */ +//@Log4j2 +//@Component +//@RabbitListener(queues = "vehicleQueue") +//public class VehicleConsumer { +// @Autowired +// private RedisService redisService; +// @RabbitHandler +// public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException { +// log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo); +// String messageId = message.getMessageProperties().getMessageId(); +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// if (!redisService.hasKey("车辆消息不丢失:" + messageId)) { +// redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag); +// } +// HashSet objects = new HashSet<>(); +// objects.add(messageId); +// BoundSetOperations set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects); +// redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES); +// try { +// if (set != null) { +//// String key = Convert.toStr(id); +// +// String key = "id"; +// redisService.setCacheObject(key, vehicleAndLogo); +// redisService.expire(key, 10, TimeUnit.MINUTES); +// //判断车辆是否有实时数据,如果没有则删除数据 +// channel.basicAck(deliveryTag, false); +// } else { +// log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo); +// channel.basicReject(deliveryTag, false); +// } +// } catch (IOException e) { +// log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo); +// String s = redisService.getCacheObject("车辆消息不丢失:" + messageId); +// Long o = Long.valueOf(s); +// if (deliveryTag == o + 2) { +// log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo); +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo); +// channel.basicNack(deliveryTag, true, false); +// } +// } +// } +//} diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java index b11c941..5563610 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java @@ -74,9 +74,7 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl //获取过期的key String key = "breakdown"; log.debug("失效+key is:"+ key); - HashSet objects = new HashSet<>(); - objects.add(coupletMsgData); - redisService.setCacheSet(key, objects); + redisService.setCacheSet(key, coupletMsgData); long expireTime = 30; redisService.expire(key, expireTime, TimeUnit.MINUTES); scheduledRedis(); @@ -98,112 +96,108 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl } public void scheduledRedis() { - // Get all members of the set - Set members = redisService.getCacheSet("breakdown"); - if (members.size()!=0){ - for (String member : members){ - CoupletMsgData code = JSON.parseObject(member, CoupletMsgData.class); - String vin = code.getVin(); - Set breakdownIds = redisService.getCacheSet(vin+":"+"breakdown"); + String key = "breakdown"; + Set members = redisService.getCacheSet(key); + if (members.size()>0){ + for (CoupletMsgData member : members) { + Set breakdownIds = redisService.getCacheSet(member.getVin()+":"+key); if (breakdownIds.size()==0){ CoupletTroubleCode troubleCode = new CoupletTroubleCode(); troubleCode.setTroubleStartTime(new Date()); - troubleCode.setTroubleVin(code.getVin()); + troubleCode.setTroubleVin(member.getVin()); // 随机生成故障码 String faultCode = MsgUtils.generateGTA(); troubleCode.setTroubleCode(faultCode); // 检查车辆状态,若为0,则设置故障位置为"190" - if(code.getVehicleStatus() == 0) { + if(member.getVehicleStatus() == 0) { troubleCode.setTroublePosition("190"); } // 检查充电状态,若为0,则设置故障位置为"191" - if (code.getChargingStatus() == 0) { + if (member.getChargingStatus() == 0) { troubleCode.setTroublePosition("191"); } // 检查运行状态,若为0,则设置故障位置为"192" - if (code.getOperatingStatus() == 0) { + if (member.getOperatingStatus() == 0) { troubleCode.setTroublePosition("192"); } // 检查电池荷电状态(SOC), 若为0,则设置故障位置为"193" - if (code.getSocStatus() == 0) { + if (member.getSocStatus() == 0) { troubleCode.setTroublePosition("193"); } // 检查充电能源存储状态,若为0,则设置故障位置为"194" - if (code.getChargingEnergyStorageStatus() == 0) { + if (member.getChargingEnergyStorageStatus() == 0) { troubleCode.setTroublePosition("194"); } // 检查驱动电机状态,若为0,则设置故障位置为"195" - if (code.getDriveMotorStatus() == 0) { + if (member.getDriveMotorStatus() == 0) { troubleCode.setTroublePosition("195"); } // 检查定位状态,若为0,则设置故障位置为"196" - if (code.getPositionStatus() == 0) { + if (member.getPositionStatus() == 0) { troubleCode.setTroublePosition("196"); } // 检查电子驻车系统(EAS)状态,若为0,则设置故障位置为"197" - if (code.getEasStatus() == 0) { + if (member.getEasStatus() == 0) { troubleCode.setTroublePosition("197"); } // 检查PTC(正温度系数热敏电阻)状态,若为0,则设置故障位置为"198" - if (code.getPtcStatus() == 0) { + if (member.getPtcStatus() == 0) { troubleCode.setTroublePosition("198"); } // 检查电动助力转向系统(EPS)状态,若为0,则设置故障位置为"199" - if (code.getEpsStatus() == 0) { + if (member.getEpsStatus() == 0) { troubleCode.setTroublePosition("199"); } // 检查防抱死制动系统(ABS)状态,若为0,则设置故障位置为"200" - if (code.getAbsStatus() == 0) { + if (member.getAbsStatus() == 0) { troubleCode.setTroublePosition("200"); } // 检查主控制器(MCU)状态,若为0,则设置故障位置为"201" - if (code.getMcuStatus() == 0) { + if (member.getMcuStatus() == 0) { troubleCode.setTroublePosition("201"); } // 检查加热状态,若为0,则设置故障位置为"202" - if (code.getHeatingStatus() == 0) { + if (member.getHeatingStatus() == 0) { troubleCode.setTroublePosition("202"); } // 检查电池状态,若为0,则设置故障位置为"203" - if (code.getBatteryStatus() == 0) { + if (member.getBatteryStatus() == 0) { troubleCode.setTroublePosition("203"); } // 检查电池绝缘状态,若为0,则设置故障位置为"204" - if (code.getBatteryInsulationStatus() == 0) { + if (member.getBatteryInsulationStatus() == 0) { troubleCode.setTroublePosition("204"); } // 检查直流-直流转换器(DC/DC)状态,若为0,则设置故障位置为"205" - if (code.getDcdcStatus() == 0) { + if (member.getDcdcStatus() == 0) { troubleCode.setTroublePosition("205"); } // 检查充电机(CHG)状态,若为0,则设置故障位置为"206" - if (code.getChgStatus() == 0) { + if (member.getChgStatus() == 0) { troubleCode.setTroublePosition("206"); } remoteTroubleService.newFaultData(troubleCode); - HashSet objects = new HashSet<>(); - objects.add(code.getVin()+":"+code); - redisService.setCacheSet(vin+":"+"breakdown", objects); + redisService.setCacheSet(member.getVin()+":"+key, member.getVin()+":"+member); long expireTime = 30; - redisService.expire(vin+":"+"breakdown", expireTime, TimeUnit.MINUTES); + redisService.expire(member.getVin()+":"+key, expireTime, TimeUnit.MINUTES); } } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java index 4690279..fd228be 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/ElectronicFenceServiceImpl.java @@ -64,10 +64,12 @@ public class ElectronicFenceServiceImpl implements IncidentService { String[] strings = s.split(","); if (strings.length == 2){ + // 经度 Double trim = Double.valueOf(strings[0].trim()); + // 纬度 Double trim1 = Double.valueOf(strings[1].trim()); - boolean a = trim<= Double.valueOf(coupletMsgData.getLongitude()); - boolean b = trim1 < Double.valueOf(coupletMsgData.getLatitude()); + boolean a = trim <= Double.valueOf(coupletMsgData.getLongitude()); + boolean b = trim1 <= Double.valueOf(coupletMsgData.getLatitude()); if (a && b){ log.info("电子围栏报警啦!!!!您的车驶出范围啦!!!"); }else { @@ -78,10 +80,7 @@ public class ElectronicFenceServiceImpl implements IncidentService { }else { throw new RuntimeException("电子围栏经纬度格式错误"+strings); } - - } - } // log.info("更改的电子围栏内容是:"+fence); log.info("电子围栏事件结束......."); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java index 67d7dcf..ddfdd89 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/breakdown/BreakdownEvent.java @@ -1,5 +1,6 @@ package com.couplet.analyze.msg.service.impl.breakdown; +import com.couplet.common.redis.service.RedisService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @@ -12,7 +13,7 @@ import org.springframework.stereotype.Component; @Component public class BreakdownEvent { @Autowired - private StringRedisTemplate redisTemplate; + private RedisService redisService; diff --git a/couplet-modules/couplet-analyze/pom.xml b/couplet-modules/couplet-analyze/pom.xml index 51c0ee2..a616071 100644 --- a/couplet-modules/couplet-analyze/pom.xml +++ b/couplet-modules/couplet-analyze/pom.xml @@ -21,73 +21,4 @@ UTF-8 - - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-discovery - - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-config - - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-sentinel - - - - - org.springframework.boot - spring-boot-starter-actuator - - - - - io.springfox - springfox-swagger-ui - ${swagger.fox.version} - - - - - com.mysql - mysql-connector-j - - - - - com.couplet - couplet-common-datasource - - - - - com.couplet - couplet-common-datascope - - - - - com.couplet - couplet-common-log - - - - - com.couplet - couplet-common-swagger - - - - org.springframework.kafka - spring-kafka - - - diff --git a/couplet-modules/couplet-business/pom.xml b/couplet-modules/couplet-business/pom.xml index 6f04112..2a6efe6 100644 --- a/couplet-modules/couplet-business/pom.xml +++ b/couplet-modules/couplet-business/pom.xml @@ -87,11 +87,6 @@ spring-boot-starter-amqp - - com.couplet - couplet-analyze-msg - - org.springframework.boot diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/FenceController.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/FenceController.java index 5f38304..bf29810 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/FenceController.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/FenceController.java @@ -51,11 +51,11 @@ public class FenceController extends BaseController { @PostMapping("/fenceAdd") @RequiresPermissions("couplet:fence:fenceAdd") @Log(title = "电子围栏新增",businessType = BusinessType.INSERT) - public Result fenceInsert(HttpServletRequest request, @RequestBody FenceRequest fenceRequest){ + public Result fenceInsert(@RequestBody FenceRequest fenceRequest){ // if (!fenceService.checkFenceKeyUnique(fenceRequest.getFenceName())) { // return error("新增参数'" + fenceRequest.getFenceName() + "'失败,参数键名已存在"); // } - fenceService.fenceInsert(request,fenceRequest); + fenceService.fenceInsert(fenceRequest); return Result.success("新增成功"); } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/FenceService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/FenceService.java index 8a05254..b43dfa1 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/FenceService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/FenceService.java @@ -6,7 +6,6 @@ import com.couplet.common.domain.request.FenceConfig; import com.couplet.common.domain.request.FenceRequest; import com.couplet.common.domain.request.FenceUpdateRequest; -import javax.servlet.http.HttpServletRequest; import java.util.List; /** @@ -27,7 +26,7 @@ public interface FenceService extends IService { * * @param fenceRequest */ - void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest); + void fenceInsert(FenceRequest fenceRequest); /** * 删除电子围栏 diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java index c7d3faf..94c1ab1 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java @@ -9,13 +9,12 @@ import com.couplet.common.domain.Fence; import com.couplet.common.domain.request.FenceConfig; import com.couplet.common.domain.request.FenceRequest; import com.couplet.common.domain.request.FenceUpdateRequest; +import com.couplet.common.redis.service.RedisService; import com.couplet.common.security.utils.SecurityUtils; -import com.couplet.mq.remote.RemoteFenceService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; -import javax.servlet.http.HttpServletRequest; import java.util.List; import java.util.concurrent.TimeUnit; @@ -41,12 +40,12 @@ public class FenceServiceImpl extends ServiceImpl implements * 注入redis模板 */ @Autowired - private StringRedisTemplate redisTemplate; + private RedisService redisService; /** * 远程调用队列服务 */ - @Autowired - private RemoteFenceService remoteFenceService; +// @Autowired +// private RemoteFenceService remoteFenceService; @Override public List pageQuery(FenceConfig fenceConfig) { @@ -55,6 +54,7 @@ public class FenceServiceImpl extends ServiceImpl implements } @Override + @Transactional(rollbackFor = Exception.class) public void changeFenceStatus(FenceUpdateRequest fenceUpdateRequest) { String username = SecurityUtils.getUsername(); @@ -63,45 +63,31 @@ public class FenceServiceImpl extends ServiceImpl implements /** * 电子围栏发送改变 */ - redisTemplate.opsForValue().set("changeFenceStatus", JSON.toJSONString(fenceUpdateRequest), 10, TimeUnit.MINUTES); +// redisService.setCacheObject("fence:info"+fenceUpdateRequest.getFenceId(),fenceUpdateRequest, 10, TimeUnit.MINUTES); - remoteFenceService.fenceQueue(fenceUpdateRequest); + redisService.setCacheObject("fence:info:"+fenceUpdateRequest.getFenceId(),fenceUpdateRequest); + redisService.expire("fence:info:"+fenceUpdateRequest.getFenceId(),10,TimeUnit.MINUTES); +// remoteFenceService.fenceQueue(fenceUpdateRequest); } /** * 业务实现:添加围栏 * - * @param request * @param fenceRequest */ + @Transactional(rollbackFor = Exception.class) @Override - public void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest) { + public void fenceInsert(FenceRequest fenceRequest) { String username = SecurityUtils.getUsername(); fenceRequest.setCrateName(username); //先添加围栏 fenceMapper.insertFence(fenceRequest); - String[] logoIds = fenceRequest.getLogoIds(); - String[] parts = new String[0]; - for (String logoId : logoIds) { - //把前台传入的字符串分割成数组 - parts = logoId.split(","); - //再添加围栏和标识中间表 - fenAndLogoService.addBach(fenceRequest.getFenceId(), parts); - } - /** - * 电子围栏发送改变 - */ - redisTemplate.opsForValue().set("fenceInsert", JSON.toJSONString(fenceRequest), 10, TimeUnit.MINUTES); + fenAndLogoService.addBach(fenceRequest.getFenceId(), fenceRequest.getLogoIds()); } @Override public void removeByFenceId(Long fenceId) { fenceMapper.removeByFenceId(fenceId); - /** - * 电子围栏发送改变 - */ - redisTemplate.opsForValue().set("removeByFenceId", JSON.toJSONString(fenceId), 10, TimeUnit.MINUTES); - } @Override diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java index 06fcf6a..5dd54a5 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java @@ -16,9 +16,10 @@ import com.couplet.common.domain.VehicleType; import com.couplet.common.domain.request.VehicleEditParams; import com.couplet.common.domain.request.VehicleInsertParams; import com.couplet.common.domain.request.VehicleListParams; -import com.couplet.mq.remote.RemoteFenceService; +import com.couplet.common.redis.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.List; @@ -36,10 +37,12 @@ public class VehicleServiceImpl extends ServiceImpl impl //车辆mapper @Autowired private VehicleMapper vehicleMapper; + @Autowired + private RedisService redis; //远程发送mq - @Autowired - private RemoteFenceService remoteFenceService; +// @Autowired +// private RemoteFenceService remoteFenceService; //车辆类型服务 @Autowired @@ -133,7 +136,7 @@ public class VehicleServiceImpl extends ServiceImpl impl String result = ""; if ((editParams.getLogoIds() == null || editParams.getLogoIds().isEmpty())) { - result = "未选择电子围栏"; + result = "未选择标识"; Result.error(result); } @@ -172,16 +175,16 @@ public class VehicleServiceImpl extends ServiceImpl impl vehicleAndLogoService.vehicleBindLogo(editParams.getVehicleId(), editParams.getLogoIds()); - //mq - List logoList = getBindLogoById(editParams.getVehicleId()); - if (0 != logoList.size()) { - String ids = ""; - for (Long l : logoList) { - ids = "," + l; - } - ids = ids.substring(1); - remoteFenceService.vehicleQueue(editParams.getVehicleId() + "-" + ids); - } +// //mq +// List logoList = getBindLogoById(editParams.getVehicleId()); +// if (0 != logoList.size()) { +// String ids = ""; +// for (Long l : logoList) { +// ids = "," + l; +// } +// ids = ids.substring(1); +// remoteFenceService.vehicleQueue(editParams.getVehicleId() + "-" + ids); +// } result = "编辑成功!"; @@ -201,7 +204,7 @@ public class VehicleServiceImpl extends ServiceImpl impl String result = ""; if ((insertParams.getLogoIds() == null || insertParams.getLogoIds().isEmpty())) { - result = "未选择电子围栏"; + result = "未选择标识"; Result.error(result); } @@ -251,16 +254,16 @@ public class VehicleServiceImpl extends ServiceImpl impl //执行添加电子围栏 int i = vehicleAndLogoService.vehicleBindLogo(vehicle.getVehicleId(), insertParams.getLogoIds()); - - List logoList = getBindLogoById(vehicle.getVehicleId()); - if (0 != logoList.size()) { - String ids = ""; - for (Long l : logoList) { - ids = "," + l; - } - ids = ids.substring(1); - remoteFenceService.vehicleQueue(vehicle.getVehicleId() + "-" + ids); - } +// +// List logoList = getBindLogoById(vehicle.getVehicleId()); +// if (0 != logoList.size()) { +// String ids = ""; +// for (Long l : logoList) { +// ids = "," + l; +// } +// ids = ids.substring(1); +// remoteFenceService.vehicleQueue(vehicle.getVehicleId() + "-" + ids); +// } result = "新增成功!"; @@ -334,4 +337,46 @@ public class VehicleServiceImpl extends ServiceImpl impl } + + @Scheduled(cron = "0/1 * * * * *") + public void aa() { + System.out.println("********************************************************"); + } + + //判断车辆是否下线 + @Scheduled(cron = "0/1 * * * * *") + public void downLine() { + log.info("定时器启动"); + //先查询车辆列表 + List list = this.list(new VehicleListParams(null, null, null, null)); + + + list.forEach(vehicle -> { + try { + //只针对已经上线的车辆 + if (redis.hasKey(vehicle.getVin())) { + + //如果vin的缓存 时间还剩一秒,则判断为已经下线 + if (redis.getExpire(vehicle.getVin()) <= 3) { + log.info(vehicle.getVin() + "的车辆已经下线"); + + //执行修改下线状态的方法 +// Integer i = this.onOrOutLineByVIN(vehicle.getVin() + "," + 0); + Integer i = this.onOrOutLineByVIN(vehicle.getVin() , 0); + + if (0 == i) { + log.error("下线状态修改失败"); + } else { + log.info("下线状态修改成功"); + } + + } + } + } catch (Exception e) { + log.error(e.getMessage()); + } + }); + + } + } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java deleted file mode 100644 index c2c3925..0000000 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.couplet.business.server.time; - -import com.couplet.business.server.service.VehicleService; -import com.couplet.common.domain.Vehicle; -import com.couplet.common.domain.request.VehicleListParams; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: 车辆定时器 - */ -@Component -@Slf4j -public class Timer { - //redis - @Autowired - private StringRedisTemplate redis; - //查询车辆列表 - @Autowired - private VehicleService vehicleService; - - - @Scheduled(cron = "0/1 * * * * *") - public void aa() { - System.out.println("********************************************************"); - } - - //判断车辆是否下线 - @Scheduled(cron = "0/1 * * * * *") - public void downLine() { - log.info("定时器启动"); - - //先查询车辆列表 - List list = vehicleService.list(new VehicleListParams(null, null, null, null)); - - - list.forEach(vehicle -> { - try { - //只针对已经上线的车辆 - if (redis.hasKey(vehicle.getVin())) { - - //如果vin的缓存 时间还剩一秒,则判断为已经下线 - if (redis.getExpire(vehicle.getVin()) <= 3) { - log.info(vehicle.getVin() + "的车辆已经下线"); - - //执行修改下线状态的方法 - Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0); - - if (0 == i) { - log.error("下线状态修改失败"); - } else { - log.info("下线状态修改成功"); - } - - } - } - } catch (Exception e) { - log.error(e.getMessage()); - } - }); - - } - -} diff --git a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml index b88e464..d900876 100644 --- a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml @@ -16,11 +16,9 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 - namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 - namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.java index 32720f4..626af8d 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.java @@ -1,164 +1,165 @@ -package com.couplet.mq.service; - -import com.couplet.mq.domain.User; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/3/28 - * @Description: MQ消费者类 - */ - -@Component -@Slf4j -@SuppressWarnings("all") -@RabbitListener(queues = "queueName") -public class MqConsumer { - @Autowired - private StringRedisTemplate redis; - - /* 线程池执行 - - //创建一个定长线程池 - private final Executor executor = Executors.newFixedThreadPool(5); - - @Async - @RabbitHandler - public void process(User param, Channel channel, Message message) { - executor.execute(() -> { - try { - handleMessage(param, channel, message); - } catch (IOException e) { - log.error("处理消息失败:{}", e); - } - }); - } - - //处理信息的方法 - private void handleMessage(User param, Channel channel, Message message) throws IOException { - log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); - - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - String messageId = message.getMessageProperties().getMessageId(); - - if (!redis.hasKey("value:" + messageId)) { - redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); - } - - // 1 添加成功新数据 0已有重复值,不允许再添加 - Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); - //过期时间 - redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); - - - try { - if (add == 1) { - //第一次 消费 - System.out.println("*****************************"); - System.out.println("消费者收到消息:" + param); - System.out.println("*****************************"); - log.info("消费结束"); - - channel.basicAck(deliveryTag, false); - - } else { - //重复消费 - log.error("重复消费"); - channel.basicReject(deliveryTag, false); - - //删除缓存 - redis.opsForSet().remove("set:" + messageId, "set:" + messageId); - } - - - } catch (Exception e) { - log.error("消息没有成功消费!"); - - String s = redis.opsForValue().get("value:" + messageId); - - long oldTag = Long.parseLong(s); - - if (deliveryTag == (oldTag + 2)) { - log.error("确实消费不了,不入队了!"); - channel.basicNack(deliveryTag, false, false); - } else { - log.info("消息消费失败,重新入队"); - channel.basicNack(deliveryTag, false, true); - } - } - - } - -**/ - - @RabbitHandler - public void process(User param, Channel channel, Message message) throws IOException { - log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); - - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - String messageId = message.getMessageProperties().getMessageId(); - - if (!redis.hasKey("value:" + messageId)) { - redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); - } - - // 1 添加成功新数据 0已有重复值,不允许再添加 - Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); - //过期时间 - redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); - - - try { - if (add == 1) { - //第一次 消费 - System.out.println("*****************************"); - System.out.println("消费者收到消息:" + param); - System.out.println("*****************************"); - log.info("消费结束"); - - //确认消费 - channel.basicAck(deliveryTag, false); - - } else { - //重复消费 - log.error("重复消费"); - //拒绝消费 - channel.basicReject(deliveryTag, false); - - //删除缓存 - redis.opsForSet().remove("set:" + messageId, "set:" + messageId); - } - - - } catch (Exception e) { - log.error("消息没有成功消费!"); - - String s = redis.opsForValue().get("value:" + messageId); - - long oldTag = Long.parseLong(s); - - if (deliveryTag == (oldTag + 2)) { - log.error("确实消费不了,不入队了!"); - - - //拒绝消费 - channel.basicNack(deliveryTag, false, false); - } else { - log.info("消息消费失败,重新入队"); - //重新入队 - channel.basicNack(deliveryTag, false, true); - } - } - } -} +//package com.couplet.mq.service; +// +//import com.couplet.common.redis.service.RedisService; +//import com.couplet.mq.domain.User; +//import com.rabbitmq.client.Channel; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.StringRedisTemplate; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.concurrent.TimeUnit; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/3/28 +// * @Description: MQ消费者类 +// */ +// +//@Component +//@Slf4j +//@SuppressWarnings("all") +//@RabbitListener(queues = "queueName") +//public class MqConsumer { +// @Autowired +// private RedisService redis; +// +// /* 线程池执行 +// +// //创建一个定长线程池 +// private final Executor executor = Executors.newFixedThreadPool(5); +// +// @Async +// @RabbitHandler +// public void process(User param, Channel channel, Message message) { +// executor.execute(() -> { +// try { +// handleMessage(param, channel, message); +// } catch (IOException e) { +// log.error("处理消息失败:{}", e); +// } +// }); +// } +// +// //处理信息的方法 +// private void handleMessage(User param, Channel channel, Message message) throws IOException { +// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); +// +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// String messageId = message.getMessageProperties().getMessageId(); +// +// if (!redis.hasKey("value:" + messageId)) { +// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); +// } +// +// // 1 添加成功新数据 0已有重复值,不允许再添加 +// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); +// //过期时间 +// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); +// +// +// try { +// if (add == 1) { +// //第一次 消费 +// System.out.println("*****************************"); +// System.out.println("消费者收到消息:" + param); +// System.out.println("*****************************"); +// log.info("消费结束"); +// +// channel.basicAck(deliveryTag, false); +// +// } else { +// //重复消费 +// log.error("重复消费"); +// channel.basicReject(deliveryTag, false); +// +// //删除缓存 +// redis.opsForSet().remove("set:" + messageId, "set:" + messageId); +// } +// +// +// } catch (Exception e) { +// log.error("消息没有成功消费!"); +// +// String s = redis.opsForValue().get("value:" + messageId); +// +// long oldTag = Long.parseLong(s); +// +// if (deliveryTag == (oldTag + 2)) { +// log.error("确实消费不了,不入队了!"); +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.info("消息消费失败,重新入队"); +// channel.basicNack(deliveryTag, false, true); +// } +// } +// +// } +// +//**/ +// +// @RabbitHandler +// public void process(User param, Channel channel, Message message) throws IOException { +// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); +// +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// String messageId = message.getMessageProperties().getMessageId(); +// +// if (!redis.hasKey("value:" + messageId)) { +// redis.setCacheObject("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); +// } +// +// // 1 添加成功新数据 0已有重复值,不允许再添加 +// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); +// //过期时间 +// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); +// +// +// try { +// if (add == 1) { +// //第一次 消费 +// System.out.println("*****************************"); +// System.out.println("消费者收到消息:" + param); +// System.out.println("*****************************"); +// log.info("消费结束"); +// +// //确认消费 +// channel.basicAck(deliveryTag, false); +// +// } else { +// //重复消费 +// log.error("重复消费"); +// //拒绝消费 +// channel.basicReject(deliveryTag, false); +// +// //删除缓存 +// redis.opsForSet().remove("set:" + messageId, "set:" + messageId); +// } +// +// +// } catch (Exception e) { +// log.error("消息没有成功消费!"); +// +// String s = redis.opsForValue().get("value:" + messageId); +// +// long oldTag = Long.parseLong(s); +// +// if (deliveryTag == (oldTag + 2)) { +// log.error("确实消费不了,不入队了!"); +// +// +// //拒绝消费 +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.info("消息消费失败,重新入队"); +// //重新入队 +// channel.basicNack(deliveryTag, false, true); +// } +// } +// } +//} diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index d5c0dc2..16394e6 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -38,7 +38,7 @@ mqtt: # broker: mqtt://115.159.47.13:1883 username: password: - clientId: Mqfghh + clientId: fluxMq qos: 0 - topic: dxd + topic: test diff --git a/couplet-modules/couplet-system/src/main/resources/bootstrap.yml b/couplet-modules/couplet-system/src/main/resources/bootstrap.yml index d9315bd..91453d0 100644 --- a/couplet-modules/couplet-system/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-system/src/main/resources/bootstrap.yml @@ -15,9 +15,11 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 + namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置 diff --git a/pom.xml b/pom.xml index 1829b65..139c8c9 100644 --- a/pom.xml +++ b/pom.xml @@ -211,63 +211,18 @@ couplet-modules-system ${couplet.version} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - com.couplet couplet-modules-mq ${couplet.version} - - com.couplet couplet-common-business ${couplet.version} - com.couplet @@ -281,6 +236,12 @@ couplet-analyze-msg ${couplet.version} + + + com.couplet + couplet-common-event + ${couplet.version} +