Merge remote-tracking branch 'origin/server_five' into server_five_dongxiaodong
# Conflicts: # couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.ymlserver_five_liuyunhu
commit
f104a50c27
|
@ -17,9 +17,11 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
|
@ -4,7 +4,9 @@ import com.couplet.common.core.constant.ServiceNameConstants;
|
|||
import com.couplet.common.core.domain.Result;
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.domain.VehicleMiddle;
|
||||
import com.couplet.common.domain.request.VehicleListParams;
|
||||
import com.couplet.remote.factory.RemoteVehicleFallbackFactory;
|
||||
import lombok.extern.java.Log;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
|
@ -62,4 +64,7 @@ public interface RemoteVehicleService {
|
|||
@PostMapping("/vehicleAndLogo/queryByLogoIds/{vehicleId}")
|
||||
public Result<List<Long>> queryByLogoIds(@PathVariable("vehicleId") Long vehicleId);
|
||||
|
||||
@PostMapping("/list")
|
||||
public Result list(@RequestBody VehicleListParams listParams);
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package com.couplet.remote.factory;
|
|||
import com.couplet.common.core.domain.Result;
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.domain.VehicleMiddle;
|
||||
import com.couplet.common.domain.request.VehicleListParams;
|
||||
import com.couplet.remote.RemoteVehicleService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -61,6 +62,11 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory<RemoteVehic
|
|||
public Result<List<Long>> queryByLogoIds(Long vehicleId) {
|
||||
return Result.error("车辆服务调用失败:"+cause.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result list(VehicleListParams listParams) {
|
||||
return Result.error("车辆服务调用失败:"+cause.getMessage());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea/modules.xml
|
||||
.idea/jarRepositories.xml
|
||||
.idea/compiler.xml
|
||||
.idea/libraries/
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### Eclipse ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
|
||||
### Mac OS ###
|
||||
.DS_Store
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>couplet-common-event</artifactId>
|
||||
|
||||
<description>事件系统</description>
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-redis</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,27 @@
|
|||
package com.couplet.analyze.common.contents;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/7
|
||||
* @Description: 事件内容
|
||||
*/
|
||||
public class AnalyzeEventContents {
|
||||
|
||||
/**
|
||||
* 故障
|
||||
*/
|
||||
String BREAKDOWN = "breakdown";
|
||||
/**
|
||||
* 电子围栏
|
||||
*/
|
||||
String ELECTRONIC_FENCE = "electronic-fence";
|
||||
/**
|
||||
* 实时数据
|
||||
*/
|
||||
String REAL_TIME_DATA = "real-time-data";
|
||||
|
||||
/**
|
||||
* 存储
|
||||
*/
|
||||
String STORED_EVENT = "stored-event";
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package com.couplet.analyze.common.event;
|
||||
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/7
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
public class AnalyzeEventCache {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
public String encode(String vin){
|
||||
return "event:arr"+vin;
|
||||
}
|
||||
/**
|
||||
* 添加事件
|
||||
*/
|
||||
public void addEvent(String vin, String eventValue) {
|
||||
redisService.setCacheSet(encode(vin), eventValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改事件
|
||||
*/
|
||||
public void updateEvent(String vin,String eventValue){
|
||||
redisService.setCacheSet(encode(vin), eventValue);
|
||||
}
|
||||
/**
|
||||
* 删除事件
|
||||
*/
|
||||
public void removeEvent(String vin,String eventName){
|
||||
redisService.deleteSet(encode(vin), eventName);
|
||||
}
|
||||
/**
|
||||
* 获取事件集合
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public Set<Object> getEventList(String vin){
|
||||
|
||||
return redisService.getCacheSet(encode(vin));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
com.couplet.analyze.common.event.AnalyzeEventCache
|
|
@ -1,6 +1,7 @@
|
|||
package com.couplet.common.redis.service;
|
||||
|
||||
import com.couplet.common.domain.CoupletVehicleData;
|
||||
import org.apache.poi.ss.formula.functions.T;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
@ -179,8 +180,31 @@ public class RedisService {
|
|||
setOperation.add(it.next());
|
||||
}
|
||||
return setOperation;
|
||||
} /**
|
||||
* 缓存Set
|
||||
*
|
||||
* @param key 缓存键值
|
||||
* @param dataSet 缓存的数据
|
||||
*
|
||||
* @return 缓存数据的对象
|
||||
*/
|
||||
public <T> BoundSetOperations<String, T> setCacheSet (final String key, final T dataSet) {
|
||||
BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
|
||||
setOperation.add(dataSet);
|
||||
return setOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除set
|
||||
* @param key
|
||||
* @param setValue
|
||||
*/
|
||||
public <T> void deleteSet(String key, String setValue) {
|
||||
|
||||
//缓存的键值
|
||||
BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
|
||||
setOperation.remove(setValue); //缓存的数据
|
||||
}
|
||||
/**
|
||||
* 获得缓存的set
|
||||
*
|
||||
|
@ -192,6 +216,7 @@ public class RedisService {
|
|||
return redisTemplate.opsForSet().members(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 缓存Map
|
||||
*
|
||||
|
@ -273,4 +298,6 @@ public class RedisService {
|
|||
public Collection<String> keys (final String pattern) {
|
||||
return redisTemplate.keys(pattern);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
<module>couplet-common-datasource</module>
|
||||
<module>couplet-common-system</module>
|
||||
<module>couplet-common-business</module>
|
||||
<module>couplet-common-event</module>
|
||||
|
||||
</modules>
|
||||
|
||||
|
|
|
@ -15,9 +15,11 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
<artifactId>couplet-analyze-msg</artifactId>
|
||||
|
||||
<description>解析系统</description>
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
|
@ -86,13 +87,14 @@
|
|||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>1.2.5</version>
|
||||
</dependency>
|
||||
<!-- 事件核心配置 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-modules-mq</artifactId>
|
||||
<artifactId>couplet-common-event</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-business</artifactId>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -1,84 +1,84 @@
|
|||
package com.couplet.analyze.msg.consumer;
|
||||
|
||||
import com.couplet.common.core.text.Convert;
|
||||
import com.couplet.common.domain.request.FenceUpdateRequest;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.BoundSetOperations;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/4
|
||||
* @Description:
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RabbitListener(queues = "fenceQueue")
|
||||
public class FenceConsumer {
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@RabbitHandler
|
||||
public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException {
|
||||
|
||||
log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest);
|
||||
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) {
|
||||
redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag);
|
||||
}
|
||||
// if (redisService.hasKey("fence")){
|
||||
// redisService.deleteObject("fence");
|
||||
//package com.couplet.analyze.msg.consumer;
|
||||
//
|
||||
//import com.couplet.common.core.text.Convert;
|
||||
//import com.couplet.common.domain.request.FenceUpdateRequest;
|
||||
//import com.couplet.common.redis.service.RedisService;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.log4j.Log4j2;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.BoundSetOperations;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.HashMap;
|
||||
//import java.util.HashSet;
|
||||
//import java.util.Set;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @Author: LiJiaYao
|
||||
// * @Date: 2024/4/4
|
||||
// * @Description:
|
||||
// */
|
||||
//@Log4j2
|
||||
//@Component
|
||||
//@RabbitListener(queues = "fenceQueue")
|
||||
//public class FenceConsumer {
|
||||
// @Autowired
|
||||
// private RedisService redisService;
|
||||
//
|
||||
// @RabbitHandler
|
||||
// public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException {
|
||||
//
|
||||
// log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest);
|
||||
//
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) {
|
||||
// redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag);
|
||||
// }
|
||||
|
||||
HashSet<String> objects = new HashSet<>();
|
||||
objects.add(messageId);
|
||||
|
||||
BoundSetOperations<String, String> set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects);
|
||||
redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
try {
|
||||
if (set != null) {
|
||||
HashMap<String, Object> hashMap = new HashMap<>();
|
||||
HashSet<FenceUpdateRequest> hashSet = new HashSet<>();
|
||||
hashSet.add(fenceUpdateRequest);
|
||||
hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest);
|
||||
// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES);
|
||||
// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap));
|
||||
|
||||
String key = Convert.toStr(fenceUpdateRequest.getFenceId());
|
||||
redisService.setCacheObject(key,fenceUpdateRequest);
|
||||
redisService.expire(key, 10, TimeUnit.MINUTES);
|
||||
//判断车辆是否有实时数据,如果没有则删除数据
|
||||
channel.basicAck(deliveryTag, false);
|
||||
} else {
|
||||
log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest);
|
||||
channel.basicReject(deliveryTag, false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest);
|
||||
String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId);
|
||||
|
||||
Long o = Long.valueOf(s);
|
||||
if (deliveryTag == o + 2) {
|
||||
log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest);
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest);
|
||||
channel.basicNack(deliveryTag, true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//// if (redisService.hasKey("fence")){
|
||||
//// redisService.deleteObject("fence");
|
||||
//// }
|
||||
//
|
||||
// HashSet<String> objects = new HashSet<>();
|
||||
// objects.add(messageId);
|
||||
//
|
||||
// BoundSetOperations<String, String> set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects);
|
||||
// redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
// try {
|
||||
// if (set != null) {
|
||||
// HashMap<String, Object> hashMap = new HashMap<>();
|
||||
// HashSet<FenceUpdateRequest> hashSet = new HashSet<>();
|
||||
// hashSet.add(fenceUpdateRequest);
|
||||
// hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest);
|
||||
//// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES);
|
||||
//// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap));
|
||||
//
|
||||
// String key = Convert.toStr(fenceUpdateRequest.getFenceId());
|
||||
// redisService.setCacheObject(key,fenceUpdateRequest);
|
||||
// redisService.expire(key, 10, TimeUnit.MINUTES);
|
||||
// //判断车辆是否有实时数据,如果没有则删除数据
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
// } else {
|
||||
// log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest);
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
// }
|
||||
// } catch (IOException e) {
|
||||
// log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest);
|
||||
// String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId);
|
||||
//
|
||||
// Long o = Long.valueOf(s);
|
||||
// if (deliveryTag == o + 2) {
|
||||
// log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest);
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest);
|
||||
// channel.basicNack(deliveryTag, true, false);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
|
|
@ -1,91 +1,91 @@
|
|||
package com.couplet.analyze.msg.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||
import com.couplet.analyze.msg.mapper.IncidentMapper;
|
||||
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
|
||||
import com.couplet.common.domain.request.RealTimeDataRequest;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/4
|
||||
* @Description:
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RabbitListener(queues = "finByVinQueueName")
|
||||
public class MsgConsumer {
|
||||
@Autowired
|
||||
private StringRedisTemplate redisTemplate;
|
||||
@Autowired
|
||||
private IncidentMapper incidentMapper;
|
||||
|
||||
@RabbitHandler
|
||||
public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException {
|
||||
|
||||
log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest);
|
||||
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
if (!redisTemplate.hasKey("消息不丢失:" + messageId)) {
|
||||
redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId);
|
||||
redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
try {
|
||||
if (0 < add) {
|
||||
JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest));
|
||||
Long userId = jsonObject.getLong("userId");
|
||||
String vin = jsonObject.getString("vin");
|
||||
RealTimeDataRequest request = new RealTimeDataRequest();
|
||||
request.setVin(vin);
|
||||
request.setUserId(userId);
|
||||
RealTimeJudge.addRealTime(request);
|
||||
//判断车辆是否有实时数据,如果没有则删除数据
|
||||
if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){
|
||||
log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin());
|
||||
}
|
||||
CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin());
|
||||
if (incident == null){
|
||||
log.error("没有数据......");
|
||||
}
|
||||
redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident));
|
||||
|
||||
channel.basicAck(deliveryTag, false);
|
||||
} else {
|
||||
log.error("消息不能重复消费:[{}]", realTimeDataRequest);
|
||||
channel.basicReject(deliveryTag, false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
||||
log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest);
|
||||
String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId);
|
||||
|
||||
Long o = Long.valueOf(s);
|
||||
if (deliveryTag == o + 2) {
|
||||
log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest);
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest);
|
||||
channel.basicNack(deliveryTag, true, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
//package com.couplet.analyze.msg.consumer;
|
||||
//
|
||||
//import com.alibaba.fastjson.JSON;
|
||||
//import com.alibaba.fastjson.JSONObject;
|
||||
//import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||
//import com.couplet.analyze.msg.mapper.IncidentMapper;
|
||||
//import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
|
||||
//import com.couplet.common.domain.request.RealTimeDataRequest;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.log4j.Log4j2;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @Author: LiJiaYao
|
||||
// * @Date: 2024/4/4
|
||||
// * @Description:
|
||||
// */
|
||||
//@Log4j2
|
||||
//@Component
|
||||
//@RabbitListener(queues = "finByVinQueueName")
|
||||
//public class MsgConsumer {
|
||||
// @Autowired
|
||||
// private StringRedisTemplate redisTemplate;
|
||||
// @Autowired
|
||||
// private IncidentMapper incidentMapper;
|
||||
//
|
||||
// @RabbitHandler
|
||||
// public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException {
|
||||
//
|
||||
// log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest);
|
||||
//
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// if (!redisTemplate.hasKey("消息不丢失:" + messageId)) {
|
||||
// redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId);
|
||||
// redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
// try {
|
||||
// if (0 < add) {
|
||||
// JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest));
|
||||
// Long userId = jsonObject.getLong("userId");
|
||||
// String vin = jsonObject.getString("vin");
|
||||
// RealTimeDataRequest request = new RealTimeDataRequest();
|
||||
// request.setVin(vin);
|
||||
// request.setUserId(userId);
|
||||
// RealTimeJudge.addRealTime(request);
|
||||
// //判断车辆是否有实时数据,如果没有则删除数据
|
||||
// if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){
|
||||
// log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin());
|
||||
// }
|
||||
// CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin());
|
||||
// if (incident == null){
|
||||
// log.error("没有数据......");
|
||||
// }
|
||||
// redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident));
|
||||
//
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
// } else {
|
||||
// log.error("消息不能重复消费:[{}]", realTimeDataRequest);
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
// }
|
||||
// } catch (IOException e) {
|
||||
//
|
||||
// log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest);
|
||||
// String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId);
|
||||
//
|
||||
// Long o = Long.valueOf(s);
|
||||
// if (deliveryTag == o + 2) {
|
||||
// log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest);
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest);
|
||||
// channel.basicNack(deliveryTag, true, false);
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
//
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
|
|
@ -1,68 +1,68 @@
|
|||
package com.couplet.analyze.msg.consumer;
|
||||
|
||||
import com.couplet.common.core.text.Convert;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.BoundSetOperations;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/4
|
||||
* @Description:
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RabbitListener(queues = "vehicleQueue")
|
||||
public class VehicleConsumer {
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
@RabbitHandler
|
||||
public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException {
|
||||
log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo);
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
if (!redisService.hasKey("车辆消息不丢失:" + messageId)) {
|
||||
redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag);
|
||||
}
|
||||
HashSet<String> objects = new HashSet<>();
|
||||
objects.add(messageId);
|
||||
BoundSetOperations<String, String> set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects);
|
||||
redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
try {
|
||||
if (set != null) {
|
||||
// String key = Convert.toStr(id);
|
||||
|
||||
String key = "id";
|
||||
redisService.setCacheObject(key, vehicleAndLogo);
|
||||
redisService.expire(key, 10, TimeUnit.MINUTES);
|
||||
//判断车辆是否有实时数据,如果没有则删除数据
|
||||
channel.basicAck(deliveryTag, false);
|
||||
} else {
|
||||
log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo);
|
||||
channel.basicReject(deliveryTag, false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo);
|
||||
String s = redisService.getCacheObject("车辆消息不丢失:" + messageId);
|
||||
Long o = Long.valueOf(s);
|
||||
if (deliveryTag == o + 2) {
|
||||
log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo);
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo);
|
||||
channel.basicNack(deliveryTag, true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//package com.couplet.analyze.msg.consumer;
|
||||
//
|
||||
//import com.couplet.common.core.text.Convert;
|
||||
//import com.couplet.common.redis.service.RedisService;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.log4j.Log4j2;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.BoundSetOperations;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.HashSet;
|
||||
//import java.util.List;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @Author: LiJiaYao
|
||||
// * @Date: 2024/4/4
|
||||
// * @Description:
|
||||
// */
|
||||
//@Log4j2
|
||||
//@Component
|
||||
//@RabbitListener(queues = "vehicleQueue")
|
||||
//public class VehicleConsumer {
|
||||
// @Autowired
|
||||
// private RedisService redisService;
|
||||
// @RabbitHandler
|
||||
// public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException {
|
||||
// log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo);
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// if (!redisService.hasKey("车辆消息不丢失:" + messageId)) {
|
||||
// redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag);
|
||||
// }
|
||||
// HashSet<String> objects = new HashSet<>();
|
||||
// objects.add(messageId);
|
||||
// BoundSetOperations<String, String> set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects);
|
||||
// redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||
// try {
|
||||
// if (set != null) {
|
||||
//// String key = Convert.toStr(id);
|
||||
//
|
||||
// String key = "id";
|
||||
// redisService.setCacheObject(key, vehicleAndLogo);
|
||||
// redisService.expire(key, 10, TimeUnit.MINUTES);
|
||||
// //判断车辆是否有实时数据,如果没有则删除数据
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
// } else {
|
||||
// log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo);
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
// }
|
||||
// } catch (IOException e) {
|
||||
// log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo);
|
||||
// String s = redisService.getCacheObject("车辆消息不丢失:" + messageId);
|
||||
// Long o = Long.valueOf(s);
|
||||
// if (deliveryTag == o + 2) {
|
||||
// log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo);
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo);
|
||||
// channel.basicNack(deliveryTag, true, false);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
|
|
@ -74,9 +74,7 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl
|
|||
//获取过期的key
|
||||
String key = "breakdown";
|
||||
log.debug("失效+key is:"+ key);
|
||||
HashSet<CoupletMsgData> objects = new HashSet<>();
|
||||
objects.add(coupletMsgData);
|
||||
redisService.setCacheSet(key, objects);
|
||||
redisService.setCacheSet(key, coupletMsgData);
|
||||
long expireTime = 30;
|
||||
redisService.expire(key, expireTime, TimeUnit.MINUTES);
|
||||
scheduledRedis();
|
||||
|
@ -98,112 +96,108 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl
|
|||
}
|
||||
|
||||
public void scheduledRedis() {
|
||||
|
||||
// Get all members of the set
|
||||
Set<String> members = redisService.getCacheSet("breakdown");
|
||||
if (members.size()!=0){
|
||||
for (String member : members){
|
||||
CoupletMsgData code = JSON.parseObject(member, CoupletMsgData.class);
|
||||
String vin = code.getVin();
|
||||
Set<String> breakdownIds = redisService.getCacheSet(vin+":"+"breakdown");
|
||||
String key = "breakdown";
|
||||
Set<CoupletMsgData> members = redisService.getCacheSet(key);
|
||||
if (members.size()>0){
|
||||
for (CoupletMsgData member : members) {
|
||||
Set<String> breakdownIds = redisService.getCacheSet(member.getVin()+":"+key);
|
||||
if (breakdownIds.size()==0){
|
||||
CoupletTroubleCode troubleCode = new CoupletTroubleCode();
|
||||
troubleCode.setTroubleStartTime(new Date());
|
||||
troubleCode.setTroubleVin(code.getVin());
|
||||
troubleCode.setTroubleVin(member.getVin());
|
||||
// 随机生成故障码
|
||||
String faultCode = MsgUtils.generateGTA();
|
||||
troubleCode.setTroubleCode(faultCode);
|
||||
|
||||
// 检查车辆状态,若为0,则设置故障位置为"190"
|
||||
if(code.getVehicleStatus() == 0) {
|
||||
if(member.getVehicleStatus() == 0) {
|
||||
troubleCode.setTroublePosition("190");
|
||||
}
|
||||
|
||||
// 检查充电状态,若为0,则设置故障位置为"191"
|
||||
if (code.getChargingStatus() == 0) {
|
||||
if (member.getChargingStatus() == 0) {
|
||||
troubleCode.setTroublePosition("191");
|
||||
}
|
||||
|
||||
// 检查运行状态,若为0,则设置故障位置为"192"
|
||||
if (code.getOperatingStatus() == 0) {
|
||||
if (member.getOperatingStatus() == 0) {
|
||||
troubleCode.setTroublePosition("192");
|
||||
}
|
||||
|
||||
// 检查电池荷电状态(SOC), 若为0,则设置故障位置为"193"
|
||||
if (code.getSocStatus() == 0) {
|
||||
if (member.getSocStatus() == 0) {
|
||||
troubleCode.setTroublePosition("193");
|
||||
}
|
||||
|
||||
// 检查充电能源存储状态,若为0,则设置故障位置为"194"
|
||||
if (code.getChargingEnergyStorageStatus() == 0) {
|
||||
if (member.getChargingEnergyStorageStatus() == 0) {
|
||||
troubleCode.setTroublePosition("194");
|
||||
}
|
||||
|
||||
// 检查驱动电机状态,若为0,则设置故障位置为"195"
|
||||
if (code.getDriveMotorStatus() == 0) {
|
||||
if (member.getDriveMotorStatus() == 0) {
|
||||
troubleCode.setTroublePosition("195");
|
||||
}
|
||||
|
||||
// 检查定位状态,若为0,则设置故障位置为"196"
|
||||
if (code.getPositionStatus() == 0) {
|
||||
if (member.getPositionStatus() == 0) {
|
||||
troubleCode.setTroublePosition("196");
|
||||
}
|
||||
|
||||
// 检查电子驻车系统(EAS)状态,若为0,则设置故障位置为"197"
|
||||
if (code.getEasStatus() == 0) {
|
||||
if (member.getEasStatus() == 0) {
|
||||
troubleCode.setTroublePosition("197");
|
||||
}
|
||||
|
||||
// 检查PTC(正温度系数热敏电阻)状态,若为0,则设置故障位置为"198"
|
||||
if (code.getPtcStatus() == 0) {
|
||||
if (member.getPtcStatus() == 0) {
|
||||
troubleCode.setTroublePosition("198");
|
||||
}
|
||||
|
||||
// 检查电动助力转向系统(EPS)状态,若为0,则设置故障位置为"199"
|
||||
if (code.getEpsStatus() == 0) {
|
||||
if (member.getEpsStatus() == 0) {
|
||||
troubleCode.setTroublePosition("199");
|
||||
}
|
||||
|
||||
// 检查防抱死制动系统(ABS)状态,若为0,则设置故障位置为"200"
|
||||
if (code.getAbsStatus() == 0) {
|
||||
if (member.getAbsStatus() == 0) {
|
||||
troubleCode.setTroublePosition("200");
|
||||
}
|
||||
|
||||
// 检查主控制器(MCU)状态,若为0,则设置故障位置为"201"
|
||||
if (code.getMcuStatus() == 0) {
|
||||
if (member.getMcuStatus() == 0) {
|
||||
troubleCode.setTroublePosition("201");
|
||||
}
|
||||
|
||||
// 检查加热状态,若为0,则设置故障位置为"202"
|
||||
if (code.getHeatingStatus() == 0) {
|
||||
if (member.getHeatingStatus() == 0) {
|
||||
troubleCode.setTroublePosition("202");
|
||||
}
|
||||
|
||||
// 检查电池状态,若为0,则设置故障位置为"203"
|
||||
if (code.getBatteryStatus() == 0) {
|
||||
if (member.getBatteryStatus() == 0) {
|
||||
troubleCode.setTroublePosition("203");
|
||||
}
|
||||
|
||||
// 检查电池绝缘状态,若为0,则设置故障位置为"204"
|
||||
if (code.getBatteryInsulationStatus() == 0) {
|
||||
if (member.getBatteryInsulationStatus() == 0) {
|
||||
troubleCode.setTroublePosition("204");
|
||||
}
|
||||
|
||||
// 检查直流-直流转换器(DC/DC)状态,若为0,则设置故障位置为"205"
|
||||
if (code.getDcdcStatus() == 0) {
|
||||
if (member.getDcdcStatus() == 0) {
|
||||
troubleCode.setTroublePosition("205");
|
||||
}
|
||||
|
||||
// 检查充电机(CHG)状态,若为0,则设置故障位置为"206"
|
||||
if (code.getChgStatus() == 0) {
|
||||
if (member.getChgStatus() == 0) {
|
||||
troubleCode.setTroublePosition("206");
|
||||
}
|
||||
remoteTroubleService.newFaultData(troubleCode);
|
||||
HashSet<Object> objects = new HashSet<>();
|
||||
objects.add(code.getVin()+":"+code);
|
||||
redisService.setCacheSet(vin+":"+"breakdown", objects);
|
||||
redisService.setCacheSet(member.getVin()+":"+key, member.getVin()+":"+member);
|
||||
long expireTime = 30;
|
||||
redisService.expire(vin+":"+"breakdown", expireTime, TimeUnit.MINUTES);
|
||||
redisService.expire(member.getVin()+":"+key, expireTime, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -64,10 +64,12 @@ public class ElectronicFenceServiceImpl implements IncidentService {
|
|||
|
||||
String[] strings = s.split(",");
|
||||
if (strings.length == 2){
|
||||
// 经度
|
||||
Double trim = Double.valueOf(strings[0].trim());
|
||||
// 纬度
|
||||
Double trim1 = Double.valueOf(strings[1].trim());
|
||||
boolean a = trim<= Double.valueOf(coupletMsgData.getLongitude());
|
||||
boolean b = trim1 < Double.valueOf(coupletMsgData.getLatitude());
|
||||
boolean a = trim <= Double.valueOf(coupletMsgData.getLongitude());
|
||||
boolean b = trim1 <= Double.valueOf(coupletMsgData.getLatitude());
|
||||
if (a && b){
|
||||
log.info("电子围栏报警啦!!!!您的车驶出范围啦!!!");
|
||||
}else {
|
||||
|
@ -78,10 +80,7 @@ public class ElectronicFenceServiceImpl implements IncidentService {
|
|||
}else {
|
||||
throw new RuntimeException("电子围栏经纬度格式错误"+strings);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
// log.info("更改的电子围栏内容是:"+fence);
|
||||
log.info("电子围栏事件结束.......");
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.couplet.analyze.msg.service.impl.breakdown;
|
||||
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
@ -12,7 +13,7 @@ import org.springframework.stereotype.Component;
|
|||
@Component
|
||||
public class BreakdownEvent {
|
||||
@Autowired
|
||||
private StringRedisTemplate redisTemplate;
|
||||
private RedisService redisService;
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -21,73 +21,4 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- SpringCloud Alibaba Nacos -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Nacos Config -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Sentinel -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringBoot Actuator -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Swagger UI -->
|
||||
<dependency>
|
||||
<groupId>io.springfox</groupId>
|
||||
<artifactId>springfox-swagger-ui</artifactId>
|
||||
<version>${swagger.fox.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Mysql Connector -->
|
||||
<dependency>
|
||||
<groupId>com.mysql</groupId>
|
||||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataSource -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-datasource</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataScope -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-datascope</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common Log -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-log</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common Swagger -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-swagger</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -87,11 +87,6 @@
|
|||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-analyze-msg</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- RabbitMQ依赖-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
|
|
@ -51,11 +51,11 @@ public class FenceController extends BaseController {
|
|||
@PostMapping("/fenceAdd")
|
||||
@RequiresPermissions("couplet:fence:fenceAdd")
|
||||
@Log(title = "电子围栏新增",businessType = BusinessType.INSERT)
|
||||
public Result<?> fenceInsert(HttpServletRequest request, @RequestBody FenceRequest fenceRequest){
|
||||
public Result<?> fenceInsert(@RequestBody FenceRequest fenceRequest){
|
||||
// if (!fenceService.checkFenceKeyUnique(fenceRequest.getFenceName())) {
|
||||
// return error("新增参数'" + fenceRequest.getFenceName() + "'失败,参数键名已存在");
|
||||
// }
|
||||
fenceService.fenceInsert(request,fenceRequest);
|
||||
fenceService.fenceInsert(fenceRequest);
|
||||
return Result.success("新增成功");
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,6 @@ import com.couplet.common.domain.request.FenceConfig;
|
|||
import com.couplet.common.domain.request.FenceRequest;
|
||||
import com.couplet.common.domain.request.FenceUpdateRequest;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -27,7 +26,7 @@ public interface FenceService extends IService<Fence> {
|
|||
*
|
||||
* @param fenceRequest
|
||||
*/
|
||||
void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest);
|
||||
void fenceInsert(FenceRequest fenceRequest);
|
||||
|
||||
/**
|
||||
* 删除电子围栏
|
||||
|
|
|
@ -9,13 +9,12 @@ import com.couplet.common.domain.Fence;
|
|||
import com.couplet.common.domain.request.FenceConfig;
|
||||
import com.couplet.common.domain.request.FenceRequest;
|
||||
import com.couplet.common.domain.request.FenceUpdateRequest;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import com.couplet.common.security.utils.SecurityUtils;
|
||||
import com.couplet.mq.remote.RemoteFenceService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -41,12 +40,12 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
|
|||
* 注入redis模板
|
||||
*/
|
||||
@Autowired
|
||||
private StringRedisTemplate redisTemplate;
|
||||
private RedisService redisService;
|
||||
/**
|
||||
* 远程调用队列服务
|
||||
*/
|
||||
@Autowired
|
||||
private RemoteFenceService remoteFenceService;
|
||||
// @Autowired
|
||||
// private RemoteFenceService remoteFenceService;
|
||||
|
||||
@Override
|
||||
public List<Fence> pageQuery(FenceConfig fenceConfig) {
|
||||
|
@ -55,6 +54,7 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
|
|||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void changeFenceStatus(FenceUpdateRequest fenceUpdateRequest) {
|
||||
|
||||
String username = SecurityUtils.getUsername();
|
||||
|
@ -63,45 +63,31 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
|
|||
/**
|
||||
* 电子围栏发送改变
|
||||
*/
|
||||
redisTemplate.opsForValue().set("changeFenceStatus", JSON.toJSONString(fenceUpdateRequest), 10, TimeUnit.MINUTES);
|
||||
// redisService.setCacheObject("fence:info"+fenceUpdateRequest.getFenceId(),fenceUpdateRequest, 10, TimeUnit.MINUTES);
|
||||
|
||||
remoteFenceService.fenceQueue(fenceUpdateRequest);
|
||||
redisService.setCacheObject("fence:info:"+fenceUpdateRequest.getFenceId(),fenceUpdateRequest);
|
||||
redisService.expire("fence:info:"+fenceUpdateRequest.getFenceId(),10,TimeUnit.MINUTES);
|
||||
// remoteFenceService.fenceQueue(fenceUpdateRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* 业务实现:添加围栏
|
||||
*
|
||||
* @param request
|
||||
* @param fenceRequest
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest) {
|
||||
public void fenceInsert(FenceRequest fenceRequest) {
|
||||
String username = SecurityUtils.getUsername();
|
||||
fenceRequest.setCrateName(username);
|
||||
//先添加围栏
|
||||
fenceMapper.insertFence(fenceRequest);
|
||||
String[] logoIds = fenceRequest.getLogoIds();
|
||||
String[] parts = new String[0];
|
||||
for (String logoId : logoIds) {
|
||||
//把前台传入的字符串分割成数组
|
||||
parts = logoId.split(",");
|
||||
//再添加围栏和标识中间表
|
||||
fenAndLogoService.addBach(fenceRequest.getFenceId(), parts);
|
||||
}
|
||||
/**
|
||||
* 电子围栏发送改变
|
||||
*/
|
||||
redisTemplate.opsForValue().set("fenceInsert", JSON.toJSONString(fenceRequest), 10, TimeUnit.MINUTES);
|
||||
fenAndLogoService.addBach(fenceRequest.getFenceId(), fenceRequest.getLogoIds());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeByFenceId(Long fenceId) {
|
||||
fenceMapper.removeByFenceId(fenceId);
|
||||
/**
|
||||
* 电子围栏发送改变
|
||||
*/
|
||||
redisTemplate.opsForValue().set("removeByFenceId", JSON.toJSONString(fenceId), 10, TimeUnit.MINUTES);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,9 +16,10 @@ import com.couplet.common.domain.VehicleType;
|
|||
import com.couplet.common.domain.request.VehicleEditParams;
|
||||
import com.couplet.common.domain.request.VehicleInsertParams;
|
||||
import com.couplet.common.domain.request.VehicleListParams;
|
||||
import com.couplet.mq.remote.RemoteFenceService;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -36,10 +37,12 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
//车辆mapper
|
||||
@Autowired
|
||||
private VehicleMapper vehicleMapper;
|
||||
@Autowired
|
||||
private RedisService redis;
|
||||
|
||||
//远程发送mq
|
||||
@Autowired
|
||||
private RemoteFenceService remoteFenceService;
|
||||
// @Autowired
|
||||
// private RemoteFenceService remoteFenceService;
|
||||
|
||||
//车辆类型服务
|
||||
@Autowired
|
||||
|
@ -133,7 +136,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
String result = "";
|
||||
|
||||
if ((editParams.getLogoIds() == null || editParams.getLogoIds().isEmpty())) {
|
||||
result = "未选择电子围栏";
|
||||
result = "未选择标识";
|
||||
Result.error(result);
|
||||
}
|
||||
|
||||
|
@ -172,16 +175,16 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
vehicleAndLogoService.vehicleBindLogo(editParams.getVehicleId(), editParams.getLogoIds());
|
||||
|
||||
|
||||
//mq
|
||||
List<Long> logoList = getBindLogoById(editParams.getVehicleId());
|
||||
if (0 != logoList.size()) {
|
||||
String ids = "";
|
||||
for (Long l : logoList) {
|
||||
ids = "," + l;
|
||||
}
|
||||
ids = ids.substring(1);
|
||||
remoteFenceService.vehicleQueue(editParams.getVehicleId() + "-" + ids);
|
||||
}
|
||||
// //mq
|
||||
// List<Long> logoList = getBindLogoById(editParams.getVehicleId());
|
||||
// if (0 != logoList.size()) {
|
||||
// String ids = "";
|
||||
// for (Long l : logoList) {
|
||||
// ids = "," + l;
|
||||
// }
|
||||
// ids = ids.substring(1);
|
||||
// remoteFenceService.vehicleQueue(editParams.getVehicleId() + "-" + ids);
|
||||
// }
|
||||
|
||||
|
||||
result = "编辑成功!";
|
||||
|
@ -201,7 +204,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
String result = "";
|
||||
|
||||
if ((insertParams.getLogoIds() == null || insertParams.getLogoIds().isEmpty())) {
|
||||
result = "未选择电子围栏";
|
||||
result = "未选择标识";
|
||||
Result.error(result);
|
||||
}
|
||||
|
||||
|
@ -251,16 +254,16 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
//执行添加电子围栏
|
||||
int i = vehicleAndLogoService.vehicleBindLogo(vehicle.getVehicleId(), insertParams.getLogoIds());
|
||||
|
||||
|
||||
List<Long> logoList = getBindLogoById(vehicle.getVehicleId());
|
||||
if (0 != logoList.size()) {
|
||||
String ids = "";
|
||||
for (Long l : logoList) {
|
||||
ids = "," + l;
|
||||
}
|
||||
ids = ids.substring(1);
|
||||
remoteFenceService.vehicleQueue(vehicle.getVehicleId() + "-" + ids);
|
||||
}
|
||||
//
|
||||
// List<Long> logoList = getBindLogoById(vehicle.getVehicleId());
|
||||
// if (0 != logoList.size()) {
|
||||
// String ids = "";
|
||||
// for (Long l : logoList) {
|
||||
// ids = "," + l;
|
||||
// }
|
||||
// ids = ids.substring(1);
|
||||
// remoteFenceService.vehicleQueue(vehicle.getVehicleId() + "-" + ids);
|
||||
// }
|
||||
|
||||
|
||||
result = "新增成功!";
|
||||
|
@ -334,4 +337,46 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
}
|
||||
|
||||
|
||||
|
||||
@Scheduled(cron = "0/1 * * * * *")
|
||||
public void aa() {
|
||||
System.out.println("********************************************************");
|
||||
}
|
||||
|
||||
//判断车辆是否下线
|
||||
@Scheduled(cron = "0/1 * * * * *")
|
||||
public void downLine() {
|
||||
log.info("定时器启动");
|
||||
//先查询车辆列表
|
||||
List<Vehicle> list = this.list(new VehicleListParams(null, null, null, null));
|
||||
|
||||
|
||||
list.forEach(vehicle -> {
|
||||
try {
|
||||
//只针对已经上线的车辆
|
||||
if (redis.hasKey(vehicle.getVin())) {
|
||||
|
||||
//如果vin的缓存 时间还剩一秒,则判断为已经下线
|
||||
if (redis.getExpire(vehicle.getVin()) <= 3) {
|
||||
log.info(vehicle.getVin() + "的车辆已经下线");
|
||||
|
||||
//执行修改下线状态的方法
|
||||
// Integer i = this.onOrOutLineByVIN(vehicle.getVin() + "," + 0);
|
||||
Integer i = this.onOrOutLineByVIN(vehicle.getVin() , 0);
|
||||
|
||||
if (0 == i) {
|
||||
log.error("下线状态修改失败");
|
||||
} else {
|
||||
log.info("下线状态修改成功");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,72 +0,0 @@
|
|||
package com.couplet.business.server.time;
|
||||
|
||||
import com.couplet.business.server.service.VehicleService;
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.domain.request.VehicleListParams;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: 车辆定时器
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Timer {
|
||||
//redis
|
||||
@Autowired
|
||||
private StringRedisTemplate redis;
|
||||
//查询车辆列表
|
||||
@Autowired
|
||||
private VehicleService vehicleService;
|
||||
|
||||
|
||||
@Scheduled(cron = "0/1 * * * * *")
|
||||
public void aa() {
|
||||
System.out.println("********************************************************");
|
||||
}
|
||||
|
||||
//判断车辆是否下线
|
||||
@Scheduled(cron = "0/1 * * * * *")
|
||||
public void downLine() {
|
||||
log.info("定时器启动");
|
||||
|
||||
//先查询车辆列表
|
||||
List<Vehicle> list = vehicleService.list(new VehicleListParams(null, null, null, null));
|
||||
|
||||
|
||||
list.forEach(vehicle -> {
|
||||
try {
|
||||
//只针对已经上线的车辆
|
||||
if (redis.hasKey(vehicle.getVin())) {
|
||||
|
||||
//如果vin的缓存 时间还剩一秒,则判断为已经下线
|
||||
if (redis.getExpire(vehicle.getVin()) <= 3) {
|
||||
log.info(vehicle.getVin() + "的车辆已经下线");
|
||||
|
||||
//执行修改下线状态的方法
|
||||
Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0);
|
||||
|
||||
if (0 == i) {
|
||||
log.error("下线状态修改失败");
|
||||
} else {
|
||||
log.info("下线状态修改成功");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -16,11 +16,9 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
|
@ -1,164 +1,165 @@
|
|||
package com.couplet.mq.service;
|
||||
|
||||
import com.couplet.mq.domain.User;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/3/28
|
||||
* @Description: MQ消费者类
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@SuppressWarnings("all")
|
||||
@RabbitListener(queues = "queueName")
|
||||
public class MqConsumer {
|
||||
@Autowired
|
||||
private StringRedisTemplate redis;
|
||||
|
||||
/* 线程池执行
|
||||
|
||||
//创建一个定长线程池
|
||||
private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
|
||||
@Async
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
handleMessage(param, channel, message);
|
||||
} catch (IOException e) {
|
||||
log.error("处理消息失败:{}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//处理信息的方法
|
||||
private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
**/
|
||||
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
//确认消费
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
//拒绝消费
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
|
||||
|
||||
//拒绝消费
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
//重新入队
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//package com.couplet.mq.service;
|
||||
//
|
||||
//import com.couplet.common.redis.service.RedisService;
|
||||
//import com.couplet.mq.domain.User;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @ProjectName: five-groups-couplet
|
||||
// * @Author: LiuYunHu
|
||||
// * @CreateTime: 2024/3/28
|
||||
// * @Description: MQ消费者类
|
||||
// */
|
||||
//
|
||||
//@Component
|
||||
//@Slf4j
|
||||
//@SuppressWarnings("all")
|
||||
//@RabbitListener(queues = "queueName")
|
||||
//public class MqConsumer {
|
||||
// @Autowired
|
||||
// private RedisService redis;
|
||||
//
|
||||
// /* 线程池执行
|
||||
//
|
||||
// //创建一个定长线程池
|
||||
// private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
//
|
||||
// @Async
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) {
|
||||
// executor.execute(() -> {
|
||||
// try {
|
||||
// handleMessage(param, channel, message);
|
||||
// } catch (IOException e) {
|
||||
// log.error("处理消息失败:{}", e);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// //处理信息的方法
|
||||
// private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
//**/
|
||||
//
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.setCacheObject("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// //确认消费
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// //拒绝消费
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
//
|
||||
//
|
||||
// //拒绝消费
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// //重新入队
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
|
|
@ -38,7 +38,7 @@ mqtt:
|
|||
# broker: mqtt://115.159.47.13:1883
|
||||
username:
|
||||
password:
|
||||
clientId: Mqfghh
|
||||
clientId: fluxMq
|
||||
qos: 0
|
||||
topic: dxd
|
||||
topic: test
|
||||
|
||||
|
|
|
@ -15,9 +15,11 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
51
pom.xml
51
pom.xml
|
@ -211,63 +211,18 @@
|
|||
<artifactId>couplet-modules-system</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-trouble</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-server</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-common</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-remote</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- <!– 企业服务 模块 公共依赖 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-enterprisemanagement-common</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- <!– 企业服务 模块 远程调用依赖 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-enterprisemanagement-remote</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- <!– 车辆管理模块 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-modules-vehicle</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- RabbitMq模块 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-modules-mq</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- 业务系统核心模块 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-business</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 车辆上线模块-->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
|
@ -281,6 +236,12 @@
|
|||
<artifactId>couplet-analyze-msg</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!--车辆解析核心模块-->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-event</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
|
Loading…
Reference in New Issue