feat: 事件系统
parent
ed31af73b5
commit
9e489bd4ec
|
@ -84,10 +84,11 @@ public class Fence extends BaseEntity{
|
||||||
|
|
||||||
private Integer alarmStatus;
|
private Integer alarmStatus;
|
||||||
|
|
||||||
@TableField(exist = false)
|
|
||||||
/**
|
/**
|
||||||
* 标识
|
* 标识
|
||||||
*/
|
*/
|
||||||
|
@TableField(exist = false)
|
||||||
private Integer logoId;
|
private Integer logoId;
|
||||||
@TableField(exist = false)
|
@TableField(exist = false)
|
||||||
private String logoName;
|
private String logoName;
|
||||||
|
|
|
@ -18,7 +18,7 @@ import lombok.ToString;
|
||||||
public class RealTimeDataRequest {
|
public class RealTimeDataRequest {
|
||||||
|
|
||||||
|
|
||||||
private Integer userId;
|
private Long userId;
|
||||||
|
|
||||||
private String vin;
|
private String vin;
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
package com.couplet.analyze.msg.consumer;
|
package com.couplet.analyze.msg.consumer;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||||
|
import com.couplet.analyze.msg.mapper.IncidentMapper;
|
||||||
|
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
|
||||||
|
import com.couplet.common.domain.request.RealTimeDataRequest;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -10,6 +16,13 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Author: LiJiaYao
|
* @Author: LiJiaYao
|
||||||
* @Date: 2024/4/4
|
* @Date: 2024/4/4
|
||||||
|
@ -17,21 +30,69 @@ import org.springframework.stereotype.Component;
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@Component
|
@Component
|
||||||
@RabbitListener(queues = "")
|
@RabbitListener(queues = "vinQueue")
|
||||||
public class MsgConsumer {
|
public class MsgConsumer {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private StringRedisTemplate redisTemplate;
|
private StringRedisTemplate redisTemplate;
|
||||||
|
@Autowired
|
||||||
|
private IncidentMapper incidentMapper;
|
||||||
|
private static final Map<String, Set<Long>> setMap = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
public void realTimeDataConsumer(Channel channel, Message message){
|
public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException {
|
||||||
|
|
||||||
// log.info();
|
log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest);
|
||||||
|
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||||
|
if (!redisTemplate.hasKey("消息不丢失:" + messageId)) {
|
||||||
|
redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId);
|
||||||
|
redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES);
|
||||||
|
try {
|
||||||
|
if (0 < add) {
|
||||||
|
|
||||||
|
JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest));
|
||||||
|
String vin = jsonObject.getString("vin");
|
||||||
|
Long userId = jsonObject.getLong("userId");
|
||||||
|
RealTimeDataRequest request = new RealTimeDataRequest();
|
||||||
|
request.setVin(vin);
|
||||||
|
request.setUserId(userId);
|
||||||
|
RealTimeJudge.addRealTime(request);
|
||||||
|
//判断车辆是否有实时数据,如果没有则删除数据
|
||||||
|
if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){
|
||||||
|
log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin());
|
||||||
|
}
|
||||||
|
CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin());
|
||||||
|
if (incident == null){
|
||||||
|
log.error("没有数据......");
|
||||||
|
}
|
||||||
|
redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident));
|
||||||
|
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
} else {
|
||||||
|
log.error("消息不能重复消费:[{}]", realTimeDataRequest);
|
||||||
|
channel.basicReject(deliveryTag, false);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
|
||||||
|
log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest);
|
||||||
|
String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId);
|
||||||
|
|
||||||
|
Long o = Long.valueOf(s);
|
||||||
|
if (deliveryTag == o + 2) {
|
||||||
|
log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest);
|
||||||
|
channel.basicNack(deliveryTag, false, false);
|
||||||
|
} else {
|
||||||
|
log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest);
|
||||||
|
channel.basicNack(deliveryTag, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,4 +15,11 @@ public interface IncidentMapper {
|
||||||
* @param coupletMsgData
|
* @param coupletMsgData
|
||||||
*/
|
*/
|
||||||
public void reportMapper(CoupletMsgData coupletMsgData);
|
public void reportMapper(CoupletMsgData coupletMsgData);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询是否存在该vin
|
||||||
|
*/
|
||||||
|
// CoupletMsgData queryByIncident(RealTimeDataRequest realTimeDataRequest);
|
||||||
|
CoupletMsgData queryByIncident(String vin);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package com.couplet.analyze.msg.service;
|
||||||
|
|
||||||
import com.couplet.analyze.msg.contents.MsgContent;
|
import com.couplet.analyze.msg.contents.MsgContent;
|
||||||
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||||
|
import com.couplet.common.domain.request.RealTimeDataRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Author: LiJiaYao
|
* @Author: LiJiaYao
|
||||||
|
@ -25,4 +26,5 @@ public interface IncidentService {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class BreakdownServiceImpl extends KeyExpirationEventMessageListener impl
|
||||||
redisTemplate.opsForValue().set(String.valueOf(coupletMsgData),JSON.toJSONString(coupletMsgData),10, TimeUnit.MINUTES);
|
redisTemplate.opsForValue().set(String.valueOf(coupletMsgData),JSON.toJSONString(coupletMsgData),10, TimeUnit.MINUTES);
|
||||||
|
|
||||||
long timeMillis = System.currentTimeMillis();
|
long timeMillis = System.currentTimeMillis();
|
||||||
log.debug("失效+key is:"+ "coupletMsgData");
|
log.debug("失效+key is:"+ expireKey);
|
||||||
log.info("故障事件结束时间:"+timeMillis);
|
log.info("故障事件结束时间:"+timeMillis);
|
||||||
log.info("故障事件检测结束.....");
|
log.info("故障事件检测结束.....");
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,20 @@
|
||||||
package com.couplet.analyze.msg.service.impl;
|
package com.couplet.analyze.msg.service.impl;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||||
import com.couplet.analyze.msg.mapper.IncidentMapper;
|
import com.couplet.analyze.msg.mapper.IncidentMapper;
|
||||||
import com.couplet.analyze.msg.service.IncidentService;
|
import com.couplet.analyze.msg.service.IncidentService;
|
||||||
|
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Author: LiJiaYao
|
* @Author: LiJiaYao
|
||||||
* @Date: 2024/4/2
|
* @Date: 2024/4/2
|
||||||
|
@ -22,6 +30,9 @@ public class RealTimeDataServiceImpl implements IncidentService {
|
||||||
@Autowired
|
@Autowired
|
||||||
private IncidentMapper incidentMapper;
|
private IncidentMapper incidentMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StringRedisTemplate redisTemplate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 实时数据事件
|
* 实时数据事件
|
||||||
*
|
*
|
||||||
|
@ -32,8 +43,15 @@ public class RealTimeDataServiceImpl implements IncidentService {
|
||||||
|
|
||||||
log.info("实时数据事件开始.....");
|
log.info("实时数据事件开始.....");
|
||||||
|
|
||||||
|
if (redisTemplate.hasKey("coupletMsgData")){
|
||||||
|
redisTemplate.delete("coupletMsgData");
|
||||||
|
}
|
||||||
|
// Set<Long> userId = setMap.get(coupletMsgData.getVin());
|
||||||
|
// if (null == userId){
|
||||||
|
// userId = new HashSet<>();
|
||||||
|
// setMap.put(coupletMsgData.getVin(),userId);
|
||||||
|
// }
|
||||||
|
// userId.add(coupletMsgData.getUserId());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
package com.couplet.analyze.msg.service.impl.realTimeData;
|
||||||
|
|
||||||
|
|
||||||
|
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||||
|
import com.couplet.common.domain.request.RealTimeDataRequest;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: LiJiaYao
|
||||||
|
* @Date: 2024/4/4
|
||||||
|
* @Description: 判断实时数据
|
||||||
|
*/
|
||||||
|
public class RealTimeJudge {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 常量
|
||||||
|
*/
|
||||||
|
private static final Map<String, Set<Long>> setMap = new HashMap<>();
|
||||||
|
|
||||||
|
public static boolean isJudge(String vin){
|
||||||
|
return setMap.containsKey(vin);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param realTimeDataRequest
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static boolean addRealTime(RealTimeDataRequest realTimeDataRequest){
|
||||||
|
Set<Long> userIds = setMap.get(realTimeDataRequest.getVin());
|
||||||
|
if (userIds == null){
|
||||||
|
userIds = new HashSet<>();
|
||||||
|
setMap.put(realTimeDataRequest.getVin(),userIds);
|
||||||
|
}
|
||||||
|
userIds.add(realTimeDataRequest.getUserId());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -4,7 +4,6 @@
|
||||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
<mapper namespace="com.couplet.analyze.msg.mapper.IncidentMapper">
|
<mapper namespace="com.couplet.analyze.msg.mapper.IncidentMapper">
|
||||||
|
|
||||||
|
|
||||||
<insert id="reportMapper">
|
<insert id="reportMapper">
|
||||||
INSERT INTO `couplet-cloud`.`couplet_msg_data`
|
INSERT INTO `couplet-cloud`.`couplet_msg_data`
|
||||||
(`vin`, `create_time`, `longitude`, `latitude`,
|
(`vin`, `create_time`, `longitude`, `latitude`,
|
||||||
|
@ -71,5 +70,10 @@
|
||||||
#{brakePedal}
|
#{brakePedal}
|
||||||
);
|
);
|
||||||
</insert>
|
</insert>
|
||||||
|
<select id="queryByIncident" resultType="com.couplet.analyze.msg.domain.CoupletMsgData"
|
||||||
|
parameterType="com.couplet.common.domain.request.RealTimeDataRequest">
|
||||||
|
SELECT * FROM couplet_msg_data WHERE vin =#{vin}
|
||||||
|
</select>
|
||||||
|
|
||||||
|
|
||||||
</mapper>
|
</mapper>
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
|
||||||
public void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest) {
|
public void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest) {
|
||||||
|
|
||||||
//先添加围栏
|
//先添加围栏
|
||||||
int a= fenceMapper.insertFence(fenceRequest);
|
fenceMapper.insertFence(fenceRequest);
|
||||||
String[] logoIds = fenceRequest.getLogoIds();
|
String[] logoIds = fenceRequest.getLogoIds();
|
||||||
String[] parts = new String[0];
|
String[] parts = new String[0];
|
||||||
for (String logoId : logoIds) {
|
for (String logoId : logoIds) {
|
||||||
|
|
Loading…
Reference in New Issue