Merge branch 'server_five_dongxiaodong' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five

# Conflicts:
#	couplet-auth/src/main/resources/bootstrap.yml
#	couplet-gateway/src/main/resources/bootstrap.yml
#	couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/contents/StateConstant.java
#	couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/BreakdownServiceImpl.java
#	couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java
#	couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java
#	couplet-modules/couplet-system/src/main/resources/bootstrap.yml
server_five_liuyunhu
lijiayao 2024-04-06 10:02:18 +08:00
commit 8de32c8223
42 changed files with 944 additions and 346 deletions

View File

@ -17,11 +17,9 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
# 配置文件格式
file-extension: yml
# 共享配置

View File

@ -3,12 +3,15 @@ package com.couplet.common.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.couplet.common.core.annotation.Excel;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.springframework.format.annotation.DateTimeFormat;
import javax.validation.constraints.NotEmpty;
import java.util.Date;
/**
* @author DongXiaoDong
@ -38,34 +41,28 @@ public class CoupletTroubleCode {
private String troubleCode;
/**
*
* vin
*/
@Excel(name = "故障位")
private String troublePosition;
/**
*
*/
@Excel(name = "故障值")
private String troubleValue;
@Excel(name = "vin")
private String troubleVin;
/**
*
*/
@Excel(name = "故障标签")
private String troubleTag;
private Integer troubleTag;
/**
* Id
*
*/
@Excel(name = "故障类型Id")
@NotEmpty(message = "故障类型Id不能为空")
private Integer typeId;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date troubleStartTime;
/**
* Id
*
*/
@Excel(name = "故障等级Id")
@NotEmpty(message = "故障等级Id不能为空")
private Integer gradeId;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date troubleEndTime;
}

View File

@ -19,34 +19,34 @@ import java.util.Date;
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CoupletTroubleLog {
public class CoupletTroubleFault {
/**
* Id
*/
private Integer troubleLogId;
private Integer troubleFaultId;
/**
*
*/
private String troubleLogCode;
private String troubleFaultCode;
/**
* VIN
*
*/
private String troubleLogVin;
private String troubleFaultType;
/**
*
*
*/
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date troubleLogStartTime;
private String troubleFaultTag;
/**
*
*
*/
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date troubleLogEndTime;
private String troubleFaultPosition;
/**
*
*/
private String troubleFaultValue;
}

View File

@ -1,15 +0,0 @@
package com.couplet.common.domain;
import lombok.Data;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/3/26 21:50
* @description
*/
@Data
public class CoupletTroubleType {
private Integer typeId;
private String typeName;
}

View File

@ -84,10 +84,11 @@ public class Fence extends BaseEntity{
private Integer alarmStatus;
@TableField(exist = false)
/**
*
*/
@TableField(exist = false)
private Integer logoId;
@TableField(exist = false)
private String logoName;

View File

@ -1,10 +0,0 @@
package com.couplet.log.annotation;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/3/28 15:39
* @description
*/
public @interface Record {
}

View File

@ -1,10 +0,0 @@
package com.couplet.log.aop;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/3/28 23:12
* @description
*/
public class AopRecord {
}

View File

@ -26,9 +26,11 @@ public class ServiceNameConstants {
* @param null:
* @return null
* @author
* @description couplet-business
* @description
* @date
*/
public static final String VEHICLE_SERVICE = "couplet-vehicle";
public static final String BUSINESS_SERVICE = "couplet-business";
public static final String VEHICLE_SERVICE = "vehicle-service";
}

View File

@ -0,0 +1,33 @@
package com.couplet.common.redis.configure;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* @Author: LiJiaYao
* @Date: 2024/4/4
* @Description: redis
*/
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
@Bean
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
return new KeyExpirationEventMessageListener(listenerContainer);
}
}

View File

@ -23,5 +23,10 @@
<groupId>com.couplet</groupId>
<artifactId>couplet-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-common-business</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,22 @@
package com.couplet.common.system.remote;
import com.couplet.common.core.constant.ServiceNameConstants;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:00
* @description
*/
@FeignClient(contextId = "remoteCodeService",value = ServiceNameConstants.BUSINESS_SERVICE, fallbackFactory = RemoteCodeFallbackFactory.class)
public interface RemoteCodeService {
@PostMapping("trouble/insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode);
}

View File

@ -0,0 +1,30 @@
package com.couplet.common.system.remote.factory;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 16:03
* @description
*/
@Slf4j
@Component
public class RemoteCodeFallbackFactory implements FallbackFactory<RemoteCodeService> {
@Override
public RemoteCodeService create(Throwable cause) {
log.error("调用日志服务异常:{}", cause.getMessage());
return new RemoteCodeService()
{
@Override
public Result<Integer> insertCode(CoupletTroubleCode coupletTroubleCode) {
return null;
}
};
}
}

View File

@ -3,3 +3,4 @@ com.couplet.common.system.remote.factory.RemoteLogFallbackFactory
com.couplet.common.system.remote.factory.RemoteFileFallbackFactory
com.couplet.common.system.remote.factory.RemoteDeptFallbackFactory
com.couplet.common.system.remote.factory.RemoteEmployeeFallbackFactory
com.couplet.common.system.remote.factory.RemoteCodeFallbackFactory

View File

@ -15,11 +15,9 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
# 配置文件格式
file-extension: yml
# 共享配置

View File

@ -86,6 +86,14 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-modules-mq</artifactId>
</dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-common-business</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -11,7 +11,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* @date 2024/4/2 8:39
* @description
*/
@SpringBootApplication
@SpringBootApplication(scanBasePackages = "com.couplet")
@EnableScheduling
@EnableFeignClients(basePackages = "com.couplet.**")
public class CoupletMsgApplication {

View File

@ -0,0 +1,100 @@
package com.couplet.analyze.msg.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.mapper.IncidentMapper;
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
import com.couplet.common.domain.request.RealTimeDataRequest;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @Author: LiJiaYao
* @Date: 2024/4/4
* @Description:
*/
@Log4j2
@Component
//@RabbitListener(queues = "vinQueue")
public class MsgConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IncidentMapper incidentMapper;
private static final Map<String, Set<Long>> setMap = new HashMap<>();
@RabbitHandler
public void realTimeDataConsumer(RealTimeDataRequest realTimeDataRequest, Channel channel, Message message) throws IOException {
log.info("消息进入队列,传入的数据是:[{}]", realTimeDataRequest);
String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (!redisTemplate.hasKey("消息不丢失:" + messageId)) {
redisTemplate.opsForValue().set("消息不丢失:" + messageId, "" + deliveryTag, 1, TimeUnit.MINUTES);
}
Long add = redisTemplate.opsForSet().add("消息不重复:" + messageId, messageId);
redisTemplate.expire("消息不重复:" + messageId, 5, TimeUnit.MINUTES);
try {
if (0 < add) {
JSONObject jsonObject = JSONObject.parseObject(String.valueOf(realTimeDataRequest));
String vin = jsonObject.getString("vin");
Long userId = jsonObject.getLong("userId");
RealTimeDataRequest request = new RealTimeDataRequest();
request.setVin(vin);
request.setUserId(userId);
RealTimeJudge.addRealTime(request);
//判断车辆是否有实时数据,如果没有则删除数据
if (RealTimeJudge.isJudge(realTimeDataRequest.getVin())){
log.info("开始实时数据传输:[{}]",realTimeDataRequest.getVin());
}
CoupletMsgData incident = incidentMapper.queryByIncident(realTimeDataRequest.getVin());
if (incident == null){
log.error("没有数据......");
}
redisTemplate.opsForList().rightPush("coupletMsgData", JSON.toJSONString(incident));
channel.basicAck(deliveryTag, false);
} else {
log.error("消息不能重复消费:[{}]", realTimeDataRequest);
channel.basicReject(deliveryTag, false);
}
} catch (IOException e) {
log.error("消息未进入队列,传入的信息是:【{}】", realTimeDataRequest);
String s = redisTemplate.opsForValue().get("消息不丢失:" + messageId);
Long o = Long.valueOf(s);
if (deliveryTag == o + 2) {
log.error("消息已丢失,无法传入的信息是:【{}】", realTimeDataRequest);
channel.basicNack(deliveryTag, false, false);
} else {
log.error("消息已丢失,已再次传入的信息是:【{}】", realTimeDataRequest);
channel.basicNack(deliveryTag, true, false);
}
}
}
}

View File

@ -0,0 +1,89 @@
package com.couplet.analyze.msg.contents;
import io.swagger.models.auth.In;
import org.springframework.stereotype.Component;
/**
* @Author: LiJiaYao
* @Date: 2024/4/4
* @Description:
*/
@Component
public class StateConstant {
/**
*
*/
public static final Integer VEHICLE_STATUS = 1;
/**
*
*/
public static final Integer CHARGING_STATUS = 1;
/**
*
*/
public static final Integer OPERATING_STATUS = 1;
/**
* soc
*/
public static final Integer SOC_STATUS = 1;
/**
*
*/
public static final Integer CHARGING_ENERGY_STORAGE_STATUS = 1;
/**
*
*/
public static final Integer DRIVE_MOTOR_STATUS = 1;
/**
*
*/
public static final Integer POSITION_STATUS = 1;
/**
* EAS()
*/
public static final Integer EAS_STATUS = 1;
/**
* PTC()
*/
public static final Integer PTC_STATUS = 1;
/**
* ABS()
*/
public static final Integer ABS_STATUS = 1;
/**
* MCU(/)
*/
public static final Integer MCU_STATUS = 1;
/**
*
*/
public static final Integer HEATING_STATUS = 1;
/**
*
*/
public static final Integer BATTERY_STATUS = 1;
/**
*
*/
public static final Integer BATTERY_INSULATION_STATUS = 1;
/**
* DCDC()
*/
public static final Integer DCDC_STATUS = 1;
/**
* CHG()
*/
public static final Integer CHG_STATUS = 1;
}

View File

@ -15,4 +15,11 @@ public interface IncidentMapper {
* @param coupletMsgData
*/
public void reportMapper(CoupletMsgData coupletMsgData);
/**
* vin
*/
// CoupletMsgData queryByIncident(RealTimeDataRequest realTimeDataRequest);
CoupletMsgData queryByIncident(String vin);
}

View File

@ -3,26 +3,20 @@ package com.couplet.analyze.msg.model;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.utils.SpringUtils;
import com.couplet.common.core.utils.uuid.IdUtils;
import com.couplet.common.system.remote.RemoteCodeService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.*;
import static com.couplet.analyze.msg.contents.MsgContent.BROKER_URL;
import static com.couplet.analyze.msg.contents.MsgContent.CLIENT_ID;
import static io.lettuce.core.pubsub.PubSubOutput.Type.message;
/**
@ -35,13 +29,8 @@ import static io.lettuce.core.pubsub.PubSubOutput.Type.message;
@Component
public class ModelMessage {
// @Autowired
// private CoupletMsgService coupletMsgService;
// @Autowired
// public ModelMessage(CoupletMsgService coupletMsgService){
// this.coupletMsgService = coupletMsgService;
// }
@Autowired
private RabbitTemplate rabbitTemplate;
static ArrayList<String> strings = new ArrayList<>() {
{
add("breakdown");
@ -51,16 +40,19 @@ public class ModelMessage {
}
};
@Value("${mq.queueName}")
public String queueName;
@Autowired
private RemoteCodeService remoteCodeService;
//交换机
@Value("${mq.exchangeName}")
public String exchangeName;
//路由键
@Value("${mq.routingKey}")
public String routingKey;
// @Value("${mq.queueName}")
// public String queueName;
//
// //交换机
// @Value("${mq.exchangeName}")
// public String exchangeName;
//
// //路由键
// @Value("${mq.routingKey}")
// public String routingKey;
@Scheduled(cron = "0/5 * * * * ?")
public void startMsg() {
@ -87,7 +79,33 @@ public class ModelMessage {
for (CoupletMsgData msgData : coupletMsgDataList) {
log.info("解析到车辆数据:{}", msgData);
//发送日志到MQ
// // 使用CompletableFuture.runAsync()方法创建一个异步任务,该任务会在一个新的线程中执行
// CompletableFuture.runAsync(() ->{
// // 创建一个CoupletTroubleLog对象用于记录故障日志
// CoupletTroubleCode troubleCode = new CoupletTroubleCode();
// //判断状态是否为异常
// if (msgData.getVehicleStatus() !=1 || msgData.getEasStatus() !=1 || msgData.getHeatingStatus() !=1){
// String code = generateGTA();
// troubleCode.setTroubleCode(code);
// String vin = msgData.getVin();
// troubleCode.setTroubleVin(vin);
// String position = getPosition();
// troubleCode.setTroublePosition(position);
// troubleCode.setTroubleValue("00");
// String tag = getTag();
// troubleCode.setTroubleTag(tag);
// String type = getType();
// troubleCode.setTroubleType(type);
// troubleCode.setTroubleStartTime(new Date());
// // 如果车辆状态为正常状态为1则添加结束时间为当前时间
// if (msgData.getVehicleStatus() == 1){
// troubleCode.setTroubleEndTime(new Date());
// }
// }
// // 调用远程服务插入故障日志信息
// remoteCodeService.insertCode(troubleCode);
// });
// log.info("记录异常成功");
for (String string : strings) {
IncidentService incidentService = SpringUtils.getBean(string);
incidentService.incident(msgData);
@ -108,12 +126,84 @@ public class ModelMessage {
});
mqttClient.subscribe("test",0);
Thread.sleep(1000*6*10);
Thread.sleep(1000*60*10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// /**
// *
// * 拼接GTA字符串
// * @return
// */
// public static String generateGTA() {
// // 生成以GTA开头的字符串
// String codefix = "GTA";
// // 删除4位数随机数字
// String s = generateRandomNumber(4);
// //拼接
// return codefix + s;
// }
//
// /**
// * 随机生成1到10位的数字
// * @param length
// * @return
// */
// public static String generateRandomNumber(int length) {
// Random random = new Random();
// StringBuilder builder = new StringBuilder();
// for (int i = 0; i < length; i++) {
// builder.append(random.nextInt(10));
// }
// return builder.toString();
// }
//
// /**
// * 生成 190 到 209 之间的随机数
// * @param
// * @return
// */
// public static String getPosition() {
// // 生成随机数对象
// Random rand = new Random();
//
// // 生成 190 到 209 之间的随机数
// int randomNumber = rand.nextInt(20) + 190;
//
// // 将随机数转换为字符串形式
// return String.valueOf(randomNumber);
// }
//
// public static String getTag() {
// // 创建一个字符串数组存储三个状态
// String[] statuses = {"车辆状态", "EAS(汽车防盗系统)状态", "DCDC(电力交换系统)"};
//
// // 生成随机数对象
// Random rand = new Random();
//
// // 生成一个范围在 0 到 2 之间的随机整数
// int randomIndex = rand.nextInt(3);
//
// return statuses[randomIndex];
// }
//
// public static String getType() {
// // 创建一个字符串数组存储三个状态
// String[] statuses = {"电池故障", "车体故障", "车尾故障","抽轮故障"};
//
// // 生成随机数对象
// Random rand = new Random();
//
// // 生成一个范围在 0 到 2 之间的随机整数
// int randomIndex = rand.nextInt(4);
//
// return statuses[randomIndex];
// }
/**
* 16ASCII
* @param s 16

View File

@ -2,6 +2,7 @@ package com.couplet.analyze.msg.service;
import com.couplet.analyze.msg.contents.MsgContent;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.common.domain.request.RealTimeDataRequest;
/**
* @Author: LiJiaYao
@ -25,4 +26,5 @@ public interface IncidentService {
}

View File

@ -1,19 +1,43 @@
package com.couplet.analyze.msg.service.impl;
import com.alibaba.fastjson.JSON;
import com.couplet.analyze.msg.contents.StateConstant;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.log.annotation.Log;
import lombok.extern.log4j.Log4j2;
import org.aspectj.bridge.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* @Author: LiJiaYao
* @Date: 2024/4/2
* @Description:
*/
@Service("breakdown")
@Log4j2
public class BreakdownServiceImpl implements IncidentService {
public class BreakdownServiceImpl extends KeyExpirationEventMessageListener implements IncidentService {
/**
* redis
*/
@Autowired
private StringRedisTemplate redisTemplate;
private static Logger log = LoggerFactory.getLogger(BreakdownServiceImpl.class);
public BreakdownServiceImpl(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
*
*
@ -23,9 +47,35 @@ public class BreakdownServiceImpl implements IncidentService {
public void incident(CoupletMsgData coupletMsgData) {
log.info("故障事件开始.....");
log.info("故障事件结束.....");
long l = System.currentTimeMillis();
log.info("开始时间是:"+l);
if (StateConstant.VEHICLE_STATUS != coupletMsgData.getVehicleStatus()
|| StateConstant.CHARGING_STATUS!=coupletMsgData.getChgStatus()
|| StateConstant.OPERATING_STATUS!=coupletMsgData.getOperatingStatus()
|| StateConstant.SOC_STATUS!=coupletMsgData.getSocStatus()
|| StateConstant.CHARGING_ENERGY_STORAGE_STATUS != coupletMsgData.getChargingEnergyStorageStatus()
|| StateConstant.DRIVE_MOTOR_STATUS != coupletMsgData.getDriveMotorStatus()
|| StateConstant.POSITION_STATUS != coupletMsgData.getPositionStatus()
|| StateConstant.EAS_STATUS != coupletMsgData.getEasStatus()
|| StateConstant.PTC_STATUS != coupletMsgData.getPtcStatus()
|| StateConstant.ABS_STATUS != coupletMsgData.getAbsStatus()
|| StateConstant.MCU_STATUS != coupletMsgData.getMcuStatus()
|| StateConstant.HEATING_STATUS != coupletMsgData.getHeatingStatus()
|| StateConstant.BATTERY_STATUS != coupletMsgData.getBatteryStatus()
|| StateConstant.BATTERY_INSULATION_STATUS != coupletMsgData.getBatteryInsulationStatus()
|| StateConstant.DCDC_STATUS != coupletMsgData.getDcdcStatus()
|| StateConstant.CHG_STATUS != coupletMsgData.getChgStatus()){
//获取过期的key
String expireKey = coupletMsgData.toString();
redisTemplate.opsForValue().set(String.valueOf(coupletMsgData),JSON.toJSONString(coupletMsgData),10, TimeUnit.MINUTES);
long timeMillis = System.currentTimeMillis();
log.debug("失效+key is:"+ expireKey);
log.info("故障事件结束时间:"+timeMillis);
log.info("故障事件检测结束.....");
}
log.info("故障事件检测结束.....");
}
/**

View File

@ -1,10 +1,19 @@
package com.couplet.analyze.msg.service.impl;
import com.alibaba.fastjson.JSON;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.mapper.IncidentMapper;
import com.couplet.analyze.msg.service.IncidentService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* @Author: LiJiaYao
* @Date: 2024/4/2
@ -14,6 +23,15 @@ import org.springframework.stereotype.Service;
@Log4j2
public class RealTimeDataServiceImpl implements IncidentService {
/**
*
*/
@Autowired
private IncidentMapper incidentMapper;
@Autowired
private StringRedisTemplate redisTemplate;
/**
*
*
@ -23,6 +41,19 @@ public class RealTimeDataServiceImpl implements IncidentService {
public void incident(CoupletMsgData coupletMsgData) {
log.info("实时数据事件开始.....");
if (redisTemplate.hasKey("coupletMsgData")){
redisTemplate.delete("coupletMsgData");
}
// Set<Long> userId = setMap.get(coupletMsgData.getVin());
// if (null == userId){
// userId = new HashSet<>();
// setMap.put(coupletMsgData.getVin(),userId);
// }
// userId.add(coupletMsgData.getUserId());
log.info("实时数据事件结束.....");
}

View File

@ -0,0 +1,45 @@
package com.couplet.analyze.msg.service.impl.realTimeData;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.common.domain.request.RealTimeDataRequest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* @Author: LiJiaYao
* @Date: 2024/4/4
* @Description:
*/
public class RealTimeJudge {
/**
*
*/
private static final Map<String, Set<Long>> setMap = new HashMap<>();
public static boolean isJudge(String vin){
return setMap.containsKey(vin);
}
/**
*
* @param realTimeDataRequest
* @return
*/
public static boolean addRealTime(RealTimeDataRequest realTimeDataRequest){
Set<Long> userIds = setMap.get(realTimeDataRequest.getVin());
if (userIds == null){
userIds = new HashSet<>();
setMap.put(realTimeDataRequest.getVin(),userIds);
}
userIds.add(realTimeDataRequest.getUserId());
return true;
}
}

View File

@ -15,6 +15,7 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
@ -23,6 +24,7 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
main:
allow-bean-definition-overriding: true
rabbitmq:
@ -49,8 +51,8 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: true
# RabbitMQ配置
mq:
queueName: queue
exchangeName: exchange
routingKey: routingKey
## RabbitMQ配置
#mq:
# queueName: queueName
# exchangeName: exchangeName
# routingKey: routingKey

View File

@ -4,7 +4,6 @@
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.couplet.analyze.msg.mapper.IncidentMapper">
<insert id="reportMapper">
INSERT INTO `couplet-cloud`.`couplet_msg_data`
(`vin`, `create_time`, `longitude`, `latitude`,
@ -71,5 +70,10 @@
#{brakePedal}
);
</insert>
<select id="queryByIncident" resultType="com.couplet.analyze.msg.domain.CoupletMsgData"
parameterType="com.couplet.common.domain.request.RealTimeDataRequest">
SELECT * FROM couplet_msg_data WHERE vin =#{vin}
</select>
</mapper>

View File

@ -1,5 +1,6 @@
package com.couplet.msg;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -11,18 +12,17 @@ import java.util.regex.Pattern;
*/
public class Main {
public static void main(String[] args) {
String msgString = "VIN123456789DIJE41711764104506116.664380039.531990072.00031.3760000022000022000852000000D00809.600940000589066790930000203002030000044282.55000014000080700007440003000400095000058000054000011111111111111111";
// 创建一个字符串数组存储三个状态
String[] statuses = {"电池故障", "车体故障", "车尾故障","抽轮故障"};
//使用正则表达式匹配需要的部分
String pattern = "(.{17})(.{10})(.{4})(.{2})(.{2})";
Pattern compile = Pattern.compile(pattern);
Matcher matcher = compile.matcher(msgString);
// 生成随机数对象
Random rand = new Random();
if (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
System.out.println("Group "+ i + ":" + matcher.group(i));
}
}
// 生成一个范围在 0 到 2 之间的随机整数
int randomIndex = rand.nextInt(4);
// 随机选择一个字符串并输出
String randomStatus = statuses[randomIndex];
System.out.println("随机输出的字符串:" + randomStatus);
}
}

View File

@ -0,0 +1,36 @@
package com.couplet.msg;
import java.util.Random;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/4 11:34
* @description
*/
public class Randoms {
public static void main(String[] args) {
String gtaNumber = generateGTA();
System.out.println("随机生成的GTA开头的七位数字" + gtaNumber);
}
public static String generateGTA() {
// 生成四位以"GTA"开头的字符串
String prefix = "GTA";
// 生成三位随机数字
String randomNumber = generateRandomNumber(4);
// 拼接字符串
return prefix + randomNumber;
}
public static String generateRandomNumber(int length) {
// 生成随机数
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
// 生成0到9之间的随机数字并转换为字符串
sb.append(random.nextInt(10));
}
return sb.toString();
}
}

View File

@ -87,6 +87,17 @@
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-analyze-msg</artifactId>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -13,8 +13,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
*/
@EnableCustomConfig
@EnableCustomSwagger2
@EnableMyFeignClients(basePackages = ("com.couplet"))
@SpringBootApplication(scanBasePackages = {"com.couplet"})
@EnableMyFeignClients(basePackages = ("com.couplet.**"))
@SpringBootApplication(scanBasePackages = {"com.couplet.**"})
public class CoupletBusinessApplication {
public static void main (String[] args) {
SpringApplication.run(CoupletBusinessApplication.class, args);

View File

@ -6,10 +6,10 @@ import com.couplet.common.core.domain.Result;
import com.couplet.common.core.web.controller.BaseController;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
import com.couplet.common.log.annotation.Log;
import com.couplet.common.log.enums.BusinessType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@ -24,6 +24,7 @@ import java.util.List;
*/
@RestController
@RequestMapping("trouble")
@Slf4j
public class SysTroubleController extends BaseController {
@Autowired
private SysTroubleService troubleService;
@ -37,14 +38,6 @@ public class SysTroubleController extends BaseController {
return Result.success(result);
}
/**
*
*/
@GetMapping("/troubleTypeList")
public List<CoupletTroubleType> listType() {
return troubleService.selectTroubleListByType();
}
/**
*
*/
@ -80,4 +73,29 @@ public class SysTroubleController extends BaseController {
troubleService.removeById(troubleId);
return success();
}
/**
*
* @param coupletTroubleLog
* @return
*/
@PostMapping("insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode){
long start = System.currentTimeMillis();
int i = troubleService.insertMsgResq(coupletTroubleCode);
long end = System.currentTimeMillis();
log.info("记录异常信息成功,耗时:{}",(end-start));
return Result.success(i);
}
/**
*
*/
@PostMapping("cleanTroubleCode")
public Result<?> cleanTroubleCode(){
troubleService.cleanTroubleCode();
return Result.success();
}
}

View File

@ -3,7 +3,6 @@ package com.couplet.business.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
import org.apache.ibatis.annotations.Mapper;
@ -19,11 +18,9 @@ import java.util.List;
public interface SysTroubleMapper extends BaseMapper<CoupletTroubleCode> {
List<CoupletTroubleCode> selectTroubleList(TroubleResp troubleReq);
List<CoupletTroubleType> selectTroubleListByType();
List<CoupletTroubleGrade> selectTroubleListByGrade();
// int addTrouble(TroubleAddReq troubleAddReq);
int insertMsgResq(CoupletTroubleCode coupletTroubleCode);
// int updateTrouble(TroubleUpdReq troubleUpdReq);
void cleanTroubleCode();
}

View File

@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.couplet.common.core.domain.PageResult;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
@ -19,11 +18,9 @@ import java.util.List;
public interface SysTroubleService extends IService<CoupletTroubleCode> {
PageResult<CoupletTroubleCode> selectTroubleList(TroubleResp troubleReq);
List<CoupletTroubleType> selectTroubleListByType();
List<CoupletTroubleGrade> selectTroubleListByGrade();
// int addTrouble (TroubleAddReq troubleAddReq);
int insertMsgResq(CoupletTroubleCode coupletTroubleCode);
// int updateTrouble(TroubleUpdReq troubleUpdReq);
void cleanTroubleCode();
}

View File

@ -6,7 +6,6 @@ import com.couplet.business.server.service.SysTroubleService;
import com.couplet.common.core.domain.PageResult;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@ -40,35 +39,26 @@ public class SysTroubleServiceImpl extends ServiceImpl<SysTroubleMapper, Couplet
return PageResult.toPageResult(info.getTotal(),troubleList);
}
@Override
public List<CoupletTroubleType> selectTroubleListByType() {
return sysTroubleMapper.selectTroubleListByType();
}
@Override
public List<CoupletTroubleGrade> selectTroubleListByGrade() {
return sysTroubleMapper.selectTroubleListByGrade();
}
/**
*
// * @param troubleAddReq
*
* @param coupletTroubleLog
* @return
*/
// @Override
// public int addTrouble(TroubleAddReq troubleAddReq) {
// return sysTroubleMapper.addTrouble(troubleAddReq);
// }
//
// /**
// * 修改故障码数据
// * @param troubleUpdReq
// * @return
// */
// @Override
// public int updateTrouble(TroubleUpdReq troubleUpdReq) {
// return sysTroubleMapper.updateTrouble(troubleUpdReq);
// }
@Override
public int insertMsgResq(CoupletTroubleCode coupletTroubleCode) {
return sysTroubleMapper.insertMsgResq(coupletTroubleCode);
}
/**
*
*/
@Override
public void cleanTroubleCode() {
sysTroubleMapper.cleanTroubleCode();
}
}

View File

@ -7,7 +7,6 @@ spring:
application:
# 应用名称
name: couplet-business
profiles:
# 环境配置
active: dev
@ -16,6 +15,7 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
@ -24,6 +24,7 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
main:
allow-bean-definition-overriding: true
logging:

View File

@ -7,18 +7,22 @@
<resultMap type="com.couplet.common.domain.CoupletTroubleCode" id="SysTroubleResult">
<id property="troubleId" column="trouble_id"/>
<result property="troubleCode" column="trouble_code"/>
<result property="troubleVin" column="trouble_vin"/>
<result property="troubleValue" column="trouble_value"/>
<result property="troublePosition" column="trouble_position"/>
<result property="troubleTag" column="trouble_tag"/>
<result property="typeId" column="type_id"/>
<result property="gradeId" column="grade_id"/>
<result property="troubleType" column="trouble_type"/>
<result property="troubleStartTime" column="trouble_start_time"/>
<result property="troubleEndTime" column="trouble_end_time"/>
</resultMap>
<sql id="selectTroubleVo">
select t.*,g.grade_name,y.type_name from couplet_trouble_code t
LEFT JOIN couplet_trouble_grade g on t.grade_id = g.grade_id
LEFT JOIN couplet_trouble_type y on t.type_id= y.type_id
select * from couplet_trouble_code
</sql>
<update id="cleanTroubleCode">
truncate table couplet_trouble_code
</update>
<select id="selectTroubleList" parameterType="com.couplet.business.server.mapper.SysTroubleMapper" resultMap="SysTroubleResult">
<include refid="selectTroubleVo"/>
@ -31,11 +35,17 @@
</if>
</where>
</select>
<select id="selectTroubleListByType" resultType="com.couplet.common.domain.CoupletTroubleType">
select * from couplet_trouble_type
</select>
<select id="selectTroubleListByGrade" resultType="com.couplet.common.domain.CoupletTroubleGrade">
select * from couplet_trouble_grade
</select>
<insert id="insertMsgResq">
insert into couplet_trouble_code(
trouble_code,
trouble_vin,
trouble_position,trouble_value,trouble_tag,trouble_type,trouble_start_time,trouble_end_time)
values(#{troubleCode},#{troubleVin},#{troublePosition},#{troubleValue},#{troubleTag},#{troubleType},#{troubleStartTime},#{troubleEndTime})
</insert>
</mapper>

View File

@ -25,16 +25,16 @@ import org.springframework.context.annotation.Primary;
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
// 通过注入的方式获取队列名、交换机名和路由键
//队列名
@Value("${mq.queueName}")
public String queueName;
// @Value("${mq.queueName}")
public static final String queueName = "queueName";
//交换机
@Value("${mq.exchangeName}")
public String exchangeName;
// @Value("${mq.exchangeName}")
public static final String exchangeName = "exchangeName";
//路由键
@Value("${mq.routingKey}")
public String routingKey;
// @Value("${mq.routingKey}")
public static final String routingKey = "routingKey";
private RabbitTemplate rabbitTemplate;

View File

@ -0,0 +1,54 @@
package com.couplet.mq.controller;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/5 21:38
* @description
*/
public class Aaa {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
public static void main(String[] args) {
aaa();
}
private static void aaa() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
//订阅主题
consumer.subscribe(Collections.singletonList("online"));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("消费者接受到的消息值:" + record.value());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

View File

@ -1,164 +1,164 @@
package com.couplet.mq.service;
import com.couplet.mq.domain.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @ProjectName: five-groups-couplet
* @Author: LiuYunHu
* @CreateTime: 2024/3/28
* @Description: MQ
*/
@Component
@Slf4j
@SuppressWarnings("all")
@RabbitListener(queues = "${mq.queueName}")
public class Consumer {
@Autowired
private StringRedisTemplate redis;
/* 线
//创建一个定长线程池
private final Executor executor = Executors.newFixedThreadPool(5);
@Async
@RabbitHandler
public void process(User param, Channel channel, Message message) {
executor.execute(() -> {
try {
handleMessage(param, channel, message);
} catch (IOException e) {
log.error("处理消息失败:{}", e);
}
});
}
//处理信息的方法
private void handleMessage(User param, Channel channel, Message message) throws IOException {
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
if (!redis.hasKey("value:" + messageId)) {
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
}
// 1 添加成功新数据 0已有重复值,不允许再添加
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
//过期时间
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
try {
if (add == 1) {
//第一次 消费
System.out.println("*****************************");
System.out.println("消费者收到消息:" + param);
System.out.println("*****************************");
log.info("消费结束");
channel.basicAck(deliveryTag, false);
} else {
//重复消费
log.error("重复消费");
channel.basicReject(deliveryTag, false);
//删除缓存
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
}
} catch (Exception e) {
log.error("消息没有成功消费!");
String s = redis.opsForValue().get("value:" + messageId);
long oldTag = Long.parseLong(s);
if (deliveryTag == (oldTag + 2)) {
log.error("确实消费不了,不入队了!");
channel.basicNack(deliveryTag, false, false);
} else {
log.info("消息消费失败,重新入队");
channel.basicNack(deliveryTag, false, true);
}
}
}
**/
@RabbitHandler
public void process(User param, Channel channel, Message message) throws IOException {
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
if (!redis.hasKey("value:" + messageId)) {
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
}
// 1 添加成功新数据 0已有重复值,不允许再添加
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
//过期时间
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
try {
if (add == 1) {
//第一次 消费
System.out.println("*****************************");
System.out.println("消费者收到消息:" + param);
System.out.println("*****************************");
log.info("消费结束");
//确认消费
channel.basicAck(deliveryTag, false);
} else {
//重复消费
log.error("重复消费");
//拒绝消费
channel.basicReject(deliveryTag, false);
//删除缓存
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
}
} catch (Exception e) {
log.error("消息没有成功消费!");
String s = redis.opsForValue().get("value:" + messageId);
long oldTag = Long.parseLong(s);
if (deliveryTag == (oldTag + 2)) {
log.error("确实消费不了,不入队了!");
//拒绝消费
channel.basicNack(deliveryTag, false, false);
} else {
log.info("消息消费失败,重新入队");
//重新入队
channel.basicNack(deliveryTag, false, true);
}
}
}
}
//package com.couplet.mq.service;
//
//import com.couplet.mq.domain.User;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.StringRedisTemplate;
//import org.springframework.stereotype.Component;
//
//import java.io.IOException;
//import java.util.concurrent.TimeUnit;
//
///**
// * @ProjectName: five-groups-couplet
// * @Author: LiuYunHu
// * @CreateTime: 2024/3/28
// * @Description: MQ消费者类
// */
//
//@Component
//@Slf4j
//@SuppressWarnings("all")
//@RabbitListener(queues = "${mq.queueName}")
//public class Consumer {
// @Autowired
// private StringRedisTemplate redis;
//
// /* 线程池执行
//
// //创建一个定长线程池
// private final Executor executor = Executors.newFixedThreadPool(5);
//
// @Async
// @RabbitHandler
// public void process(User param, Channel channel, Message message) {
// executor.execute(() -> {
// try {
// handleMessage(param, channel, message);
// } catch (IOException e) {
// log.error("处理消息失败:{}", e);
// }
// });
// }
//
// //处理信息的方法
// private void handleMessage(User param, Channel channel, Message message) throws IOException {
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
//
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// String messageId = message.getMessageProperties().getMessageId();
//
// if (!redis.hasKey("value:" + messageId)) {
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
// }
//
// // 1 添加成功新数据 0已有重复值,不允许再添加
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
// //过期时间
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
//
//
// try {
// if (add == 1) {
// //第一次 消费
// System.out.println("*****************************");
// System.out.println("消费者收到消息:" + param);
// System.out.println("*****************************");
// log.info("消费结束");
//
// channel.basicAck(deliveryTag, false);
//
// } else {
// //重复消费
// log.error("重复消费");
// channel.basicReject(deliveryTag, false);
//
// //删除缓存
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
// }
//
//
// } catch (Exception e) {
// log.error("消息没有成功消费!");
//
// String s = redis.opsForValue().get("value:" + messageId);
//
// long oldTag = Long.parseLong(s);
//
// if (deliveryTag == (oldTag + 2)) {
// log.error("确实消费不了,不入队了!");
// channel.basicNack(deliveryTag, false, false);
// } else {
// log.info("消息消费失败,重新入队");
// channel.basicNack(deliveryTag, false, true);
// }
// }
//
// }
//
//**/
//
// @RabbitHandler
// public void process(User param, Channel channel, Message message) throws IOException {
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
//
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// String messageId = message.getMessageProperties().getMessageId();
//
// if (!redis.hasKey("value:" + messageId)) {
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
// }
//
// // 1 添加成功新数据 0已有重复值,不允许再添加
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
// //过期时间
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
//
//
// try {
// if (add == 1) {
// //第一次 消费
// System.out.println("*****************************");
// System.out.println("消费者收到消息:" + param);
// System.out.println("*****************************");
// log.info("消费结束");
//
// //确认消费
// channel.basicAck(deliveryTag, false);
//
// } else {
// //重复消费
// log.error("重复消费");
// //拒绝消费
// channel.basicReject(deliveryTag, false);
//
// //删除缓存
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
// }
//
//
// } catch (Exception e) {
// log.error("消息没有成功消费!");
//
// String s = redis.opsForValue().get("value:" + messageId);
//
// long oldTag = Long.parseLong(s);
//
// if (deliveryTag == (oldTag + 2)) {
// log.error("确实消费不了,不入队了!");
//
//
// //拒绝消费
// channel.basicNack(deliveryTag, false, false);
// } else {
// log.info("消息消费失败,重新入队");
// //重新入队
// channel.basicNack(deliveryTag, false, true);
// }
// }
// }
//}

View File

@ -4,7 +4,6 @@ server:
# Spring
spring:
application:
# 应用名称
name: couplet-system
@ -16,11 +15,9 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 968741d4-299d-483c-8d30-ede2aff8cfd4
# 配置文件格式
file-extension: yml
# 共享配置

View File

@ -275,6 +275,13 @@
<version>${couplet.version}</version>
</dependency>
<!--车辆解析模块-->
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-analyze-msg</artifactId>
<version>${couplet.version}</version>
</dependency>
</dependencies>
</dependencyManagement>