feat: 电子围栏报警

server_five_liuyunhu
lijiayao 2024-04-07 22:37:13 +08:00
parent 8bbaadea94
commit 16d72968d4
4 changed files with 246 additions and 242 deletions

View File

@ -92,6 +92,10 @@
<groupId>com.couplet</groupId> <groupId>com.couplet</groupId>
<artifactId>couplet-common-event</artifactId> <artifactId>couplet-common-event</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@ -1,84 +1,84 @@
package com.couplet.analyze.msg.consumer; //package com.couplet.analyze.msg.consumer;
//
import com.couplet.common.core.text.Convert; //import com.couplet.common.core.text.Convert;
import com.couplet.common.domain.request.FenceUpdateRequest; //import com.couplet.common.domain.request.FenceUpdateRequest;
import com.couplet.common.redis.service.RedisService; //import com.couplet.common.redis.service.RedisService;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; //import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundSetOperations; //import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.HashMap; //import java.util.HashMap;
import java.util.HashSet; //import java.util.HashSet;
import java.util.Set; //import java.util.Set;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
//
/** ///**
* @Author: LiJiaYao // * @Author: LiJiaYao
* @Date: 2024/4/4 // * @Date: 2024/4/4
* @Description: // * @Description:
*/ // */
@Log4j2 //@Log4j2
@Component //@Component
@RabbitListener(queues = "fenceQueue") //@RabbitListener(queues = "fenceQueue")
public class FenceConsumer { //public class FenceConsumer {
@Autowired // @Autowired
private RedisService redisService; // private RedisService redisService;
//
@RabbitHandler // @RabbitHandler
public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException { // public void fenceConsumer(FenceUpdateRequest fenceUpdateRequest, Channel channel, Message message) throws IOException {
//
log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest); // log.info("电子围栏消息进入队列,传入的数据是:[{}]", fenceUpdateRequest);
//
String messageId = message.getMessageProperties().getMessageId(); // String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag(); // long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) { // if (!redisService.hasKey("电子围栏消息不丢失:" + messageId)) {
redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag); // redisService.setCacheObject("电子围栏消息不丢失:" + messageId, "" + deliveryTag);
}
// if (redisService.hasKey("fence")){
// redisService.deleteObject("fence");
// } // }
//// if (redisService.hasKey("fence")){
HashSet<String> objects = new HashSet<>(); //// redisService.deleteObject("fence");
objects.add(messageId); //// }
//
BoundSetOperations<String, String> set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects); // HashSet<String> objects = new HashSet<>();
redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES); // objects.add(messageId);
try { //
if (set != null) { // BoundSetOperations<String, String> set = redisService.setCacheSet("电子围栏消息不重复:" + messageId, objects);
HashMap<String, Object> hashMap = new HashMap<>(); // redisService.expire("电子围栏消息不重复:" + messageId, 5, TimeUnit.MINUTES);
HashSet<FenceUpdateRequest> hashSet = new HashSet<>(); // try {
hashSet.add(fenceUpdateRequest); // if (set != null) {
hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest); // HashMap<String, Object> hashMap = new HashMap<>();
// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES); // HashSet<FenceUpdateRequest> hashSet = new HashSet<>();
// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap)); // hashSet.add(fenceUpdateRequest);
// hashMap.put(fenceUpdateRequest.getFenceId()+"",fenceUpdateRequest);
String key = Convert.toStr(fenceUpdateRequest.getFenceId()); //// redisTemplate.opsForH("fence", JSON.toJSONString(hashMap),10,TimeUnit.MINUTES);
redisService.setCacheObject(key,fenceUpdateRequest); //// redisTemplate.opsForHash().put("fence", fenceUpdateRequest.getFenceId()+"", JSON.toJSONString(hashMap));
redisService.expire(key, 10, TimeUnit.MINUTES); //
//判断车辆是否有实时数据,如果没有则删除数据 // String key = Convert.toStr(fenceUpdateRequest.getFenceId());
channel.basicAck(deliveryTag, false); // redisService.setCacheObject(key,fenceUpdateRequest);
} else { // redisService.expire(key, 10, TimeUnit.MINUTES);
log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest); // //判断车辆是否有实时数据,如果没有则删除数据
channel.basicReject(deliveryTag, false); // channel.basicAck(deliveryTag, false);
} // } else {
} catch (IOException e) { // log.error("电子围栏消息不能重复消费:[{}]", fenceUpdateRequest);
log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest); // channel.basicReject(deliveryTag, false);
String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId); // }
// } catch (IOException e) {
Long o = Long.valueOf(s); // log.error("电子围栏消息未进入队列,传入的信息是:【{}】", fenceUpdateRequest);
if (deliveryTag == o + 2) { // String s = redisService.getCacheObject("电子围栏消息不丢失:" + messageId);
log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest); //
channel.basicNack(deliveryTag, false, false); // Long o = Long.valueOf(s);
} else { // if (deliveryTag == o + 2) {
log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest); // log.error("电子围栏消息已丢失,无法传入的信息是:【{}】", fenceUpdateRequest);
channel.basicNack(deliveryTag, true, false); // channel.basicNack(deliveryTag, false, false);
} // } else {
} // log.error("电子围栏消息已丢失,已再次传入的信息是:【{}】", fenceUpdateRequest);
} // channel.basicNack(deliveryTag, true, false);
} // }
// }
// }
//}

View File

@ -1,91 +1,91 @@
package com.couplet.analyze.msg.consumer; //package com.couplet.analyze.msg.consumer;
//
import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; //import com.alibaba.fastjson.JSONObject;
import com.couplet.analyze.msg.domain.CoupletMsgData; //import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.mapper.IncidentMapper; //import com.couplet.analyze.msg.mapper.IncidentMapper;
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge; //import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
import com.couplet.common.domain.request.RealTimeDataRequest; //import com.couplet.common.domain.request.RealTimeDataRequest;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; //import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
//
/** ///**
* @Author: LiJiaYao // * @Author: LiJiaYao
* @Date: 2024/4/4 // * @Date: 2024/4/4
* @Description: // * @Description:
*/ // */
@Log4j2 //@Log4j2
@Component //@Component
@RabbitListener(queues = "finByVinQueueName") //@RabbitListener(queues = "finByVinQueueName")
public class MsgConsumer { //public class MsgConsumer {
@Autowired // @Autowired
private StringRedisTemplate redisTemplate; // private StringRedisTemplate redisTemplate;
@Autowired // @Autowired
private IncidentMapper incidentMapper; // private IncidentMapper incidentMapper;
//
@RabbitHandler // @RabbitHandler
public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException { // public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException {
//
log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest); // log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest);
//
String messageId = message.getMessageProperties().getMessageId(); // String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag(); // long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (!redisTemplate.hasKey("消息不丢失:" + messageId)) { // if (!redisTemplate.hasKey("消息不丢失:" + messageId)) {
redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES); // redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES);
} // }
//
Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId); // Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId);
redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES); // redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES);
try { // try {
if (0 < add) { // if (0 < add) {
JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest)); // JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest));
Long userId = jsonObject.getLong("userId"); // Long userId = jsonObject.getLong("userId");
String vin = jsonObject.getString("vin"); // String vin = jsonObject.getString("vin");
RealTimeDataRequest request = new RealTimeDataRequest(); // RealTimeDataRequest request = new RealTimeDataRequest();
request.setVin(vin); // request.setVin(vin);
request.setUserId(userId); // request.setUserId(userId);
RealTimeJudge.addRealTime(request); // RealTimeJudge.addRealTime(request);
//判断车辆是否有实时数据,如果没有则删除数据 // //判断车辆是否有实时数据,如果没有则删除数据
if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){ // if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){
log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin()); // log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin());
} // }
CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin()); // CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin());
if (incident == null){ // if (incident == null){
log.error("没有数据......"); // log.error("没有数据......");
} // }
redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident)); // redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident));
//
channel.basicAck(deliveryTag, false); // channel.basicAck(deliveryTag, false);
} else { // } else {
log.error("消息不能重复消费:[{}]", realTimeDataRequest); // log.error("消息不能重复消费:[{}]", realTimeDataRequest);
channel.basicReject(deliveryTag, false); // channel.basicReject(deliveryTag, false);
} // }
} catch (IOException e) { // } catch (IOException e) {
//
log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest); // log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest);
String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId); // String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId);
//
Long o = Long.valueOf(s); // Long o = Long.valueOf(s);
if (deliveryTag == o + 2) { // if (deliveryTag == o + 2) {
log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest); // log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest);
channel.basicNack(deliveryTag, false, false); // channel.basicNack(deliveryTag, false, false);
} else { // } else {
log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest); // log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest);
channel.basicNack(deliveryTag, true, false); // channel.basicNack(deliveryTag, true, false);
} // }
//
} // }
//
//
} // }
//
} //}

View File

@ -1,68 +1,68 @@
package com.couplet.analyze.msg.consumer; //package com.couplet.analyze.msg.consumer;
//
import com.couplet.common.core.text.Convert; //import com.couplet.common.core.text.Convert;
import com.couplet.common.redis.service.RedisService; //import com.couplet.common.redis.service.RedisService;
import com.rabbitmq.client.Channel; //import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; //import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; //import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; //import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundSetOperations; //import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.io.IOException; //import java.io.IOException;
import java.util.HashSet; //import java.util.HashSet;
import java.util.List; //import java.util.List;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
//
/** ///**
* @Author: LiJiaYao // * @Author: LiJiaYao
* @Date: 2024/4/4 // * @Date: 2024/4/4
* @Description: // * @Description:
*/ // */
@Log4j2 //@Log4j2
@Component //@Component
@RabbitListener(queues = "vehicleQueue") //@RabbitListener(queues = "vehicleQueue")
public class VehicleConsumer { //public class VehicleConsumer {
@Autowired // @Autowired
private RedisService redisService; // private RedisService redisService;
@RabbitHandler // @RabbitHandler
public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException { // public void vehicleConsumer(String vehicleAndLogo, Channel channel, Message message) throws IOException {
log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo); // log.info("车辆消息进入队列,传入的数据是:[{}]", vehicleAndLogo);
String messageId = message.getMessageProperties().getMessageId(); // String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag(); // long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (!redisService.hasKey("车辆消息不丢失:" + messageId)) { // if (!redisService.hasKey("车辆消息不丢失:" + messageId)) {
redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag); // redisService.setCacheObject("车辆消息不丢失:" + messageId, "" + deliveryTag);
} // }
HashSet<String> objects = new HashSet<>(); // HashSet<String> objects = new HashSet<>();
objects.add(messageId); // objects.add(messageId);
BoundSetOperations<String, String> set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects); // BoundSetOperations<String, String> set = redisService.setCacheSet("车辆信息消息不重复:" + messageId, objects);
redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES); // redisService.expire("车辆信息消息不重复:" + messageId, 5, TimeUnit.MINUTES);
try { // try {
if (set != null) { // if (set != null) {
// String key = Convert.toStr(id); //// String key = Convert.toStr(id);
//
String key = "id"; // String key = "id";
redisService.setCacheObject(key, vehicleAndLogo); // redisService.setCacheObject(key, vehicleAndLogo);
redisService.expire(key, 10, TimeUnit.MINUTES); // redisService.expire(key, 10, TimeUnit.MINUTES);
//判断车辆是否有实时数据,如果没有则删除数据 // //判断车辆是否有实时数据,如果没有则删除数据
channel.basicAck(deliveryTag, false); // channel.basicAck(deliveryTag, false);
} else { // } else {
log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo); // log.error("车辆消息不能重复消费:[{}]", vehicleAndLogo);
channel.basicReject(deliveryTag, false); // channel.basicReject(deliveryTag, false);
} // }
} catch (IOException e) { // } catch (IOException e) {
log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo); // log.error("车辆消息未进入队列,传入的信息是:【{}】", vehicleAndLogo);
String s = redisService.getCacheObject("车辆消息不丢失:" + messageId); // String s = redisService.getCacheObject("车辆消息不丢失:" + messageId);
Long o = Long.valueOf(s); // Long o = Long.valueOf(s);
if (deliveryTag == o + 2) { // if (deliveryTag == o + 2) {
log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo); // log.error("车辆消息已丢失,无法传入的信息是:【{}】", vehicleAndLogo);
channel.basicNack(deliveryTag, false, false); // channel.basicNack(deliveryTag, false, false);
} else { // } else {
log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo); // log.error("车辆消息已丢失,已再次传入的信息是:【{}】", vehicleAndLogo);
channel.basicNack(deliveryTag, true, false); // channel.basicNack(deliveryTag, true, false);
} // }
} // }
} // }
} //}