Merge branch 'server_five_liuyunhu' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five
# Conflicts: # couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml # couplet-modules/couplet-business/src/main/java/com/couplet/business/server/CoupletBusinessApplication.java # couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/VehicleController.java # couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java # couplet-modules/couplet-business/src/main/resources/bootstrap.yml # couplet-modules/couplet-business/src/main/resources/mapper/business/VehicleMapper.xml # couplet-modules/couplet-modules-mq/pom.xml # couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/CoupletMqApplatcaion.java # couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqConsumer.javaserver_five_liuyunhu
commit
5c2021d6f2
|
@ -51,4 +51,7 @@ public interface RemoteVehicleService {
|
|||
@GetMapping("/findByVIN/{vin}")
|
||||
public Result<List<Vehicle>> findByVIN(@PathVariable("vin") String vin);
|
||||
|
||||
@GetMapping("onOrOutLineByVIN")
|
||||
public Integer onOrOutLineByVIN(@RequestParam("params") String params);
|
||||
|
||||
}
|
||||
|
|
|
@ -50,6 +50,12 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory<RemoteVehic
|
|||
public Result<List<Vehicle>> findByVIN(String vin) {
|
||||
return Result.error("车辆服务调用失败:" + cause.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer onOrOutLineByVIN(String params) {
|
||||
log.error("车辆服务调用失败:"+cause.getMessage());
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
|
@ -24,7 +23,6 @@ spring:
|
|||
# 共享配置
|
||||
shared-configs:
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
rabbitmq:
|
||||
|
@ -51,8 +49,8 @@ mybatis-plus:
|
|||
configuration:
|
||||
map-underscore-to-camel-case: true
|
||||
|
||||
## RabbitMQ配置
|
||||
#mq:
|
||||
# queueName: queueName
|
||||
# exchangeName: exchangeName
|
||||
# routingKey: routingKey
|
||||
# RabbitMQ配置
|
||||
mq:
|
||||
queueName: queue
|
||||
exchangeName: exchange
|
||||
routingKey: routingKey
|
||||
|
|
|
@ -5,6 +5,7 @@ import com.couplet.common.security.annotation.EnableMyFeignClients;
|
|||
import com.couplet.common.swagger.annotation.EnableCustomSwagger2;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
|
@ -13,8 +14,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
*/
|
||||
@EnableCustomConfig
|
||||
@EnableCustomSwagger2
|
||||
@EnableMyFeignClients(basePackages = ("com.couplet.**"))
|
||||
@SpringBootApplication(scanBasePackages = {"com.couplet.**"})
|
||||
@EnableMyFeignClients(basePackages = ("com.couplet"))
|
||||
@SpringBootApplication(scanBasePackages = {"com.couplet"})
|
||||
@EnableScheduling
|
||||
public class CoupletBusinessApplication {
|
||||
public static void main (String[] args) {
|
||||
SpringApplication.run(CoupletBusinessApplication.class, args);
|
||||
|
|
|
@ -22,10 +22,12 @@ public interface VehicleMapper extends BaseMapper<Vehicle> {
|
|||
|
||||
Integer deleteVehicle(Long middleId);
|
||||
|
||||
|
||||
Integer addVehicle(VehicleMiddle vehicleMiddle);
|
||||
|
||||
List<Vehicle> vehicleAll();
|
||||
|
||||
Integer onOrOutLineByVIN(@Param("vin") String vin, @Param("status") Integer status);
|
||||
|
||||
Integer addVehicle(@Param("userId") Long userId, @Param("vehicleIds") List<Long> vehicleIds);
|
||||
|
||||
}
|
||||
|
|
|
@ -37,4 +37,6 @@ public interface VehicleService extends IService<Vehicle> {
|
|||
|
||||
List<Vehicle> vehicleAll();
|
||||
|
||||
Integer onOrOutLineByVIN(String vin,Integer status);
|
||||
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
**/
|
||||
@Override
|
||||
public String deleteById(Long vehicleId) {
|
||||
String result = "";
|
||||
String result;
|
||||
|
||||
UpdateWrapper<Vehicle> updateWrapper = new UpdateWrapper<>();
|
||||
|
||||
|
@ -126,7 +126,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
**/
|
||||
@Override
|
||||
public String editById(VehicleEditParams editParams) {
|
||||
String result = "";
|
||||
String result;
|
||||
|
||||
if ((editParams.getLogoIds() == null || editParams.getLogoIds().isEmpty())) {
|
||||
result = "未选择电子围栏";
|
||||
|
@ -181,7 +181,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
**/
|
||||
@Override
|
||||
public String insert(VehicleInsertParams insertParams) {
|
||||
String result = "";
|
||||
String result;
|
||||
|
||||
if ((insertParams.getLogoIds() == null || insertParams.getLogoIds().isEmpty())) {
|
||||
result = "未选择电子围栏";
|
||||
|
@ -250,14 +250,13 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
**/
|
||||
@Override
|
||||
public List<Long> getBindLogoById(Long vehicleId) {
|
||||
List<Long> logoIds = vehicleAndLogoService.getBindLogoById(vehicleId);
|
||||
|
||||
return logoIds;
|
||||
return vehicleAndLogoService.getBindLogoById(vehicleId);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param userId:
|
||||
* @return List<Vehicle>
|
||||
* @return List<Vehicle>
|
||||
* @author 付凡芮
|
||||
* @description 根据登入人id查询管理车辆
|
||||
*
|
||||
|
@ -283,6 +282,14 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
return vehicleMapper.vehicleAll();
|
||||
}
|
||||
|
||||
|
||||
//通过vin修改车辆上下线的状态
|
||||
@Override
|
||||
public Integer onOrOutLineByVIN(String vin, Integer status) {
|
||||
|
||||
return vehicleMapper.onOrOutLineByVIN(vin, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Vehicle> findByVIN(String vin) {
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
package com.couplet.business.time;
|
||||
|
||||
import com.couplet.business.server.service.VehicleService;
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.domain.request.VehicleListParams;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: 车辆定时器
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Timer {
|
||||
//redis
|
||||
@Autowired
|
||||
private RedisService redis;
|
||||
//查询车辆列表
|
||||
@Autowired
|
||||
private VehicleService vehicleService;
|
||||
|
||||
|
||||
//判断车辆是否下线
|
||||
@Scheduled(cron = "* * * * * *")
|
||||
public void outLine() {
|
||||
log.info("定时器启动");
|
||||
|
||||
//先查询车辆列表
|
||||
List<Vehicle> list = vehicleService.list(new VehicleListParams(null, null, null, null));
|
||||
|
||||
list.forEach(vehicle -> {
|
||||
|
||||
//只针对已经上线的车辆
|
||||
if (redis.hasKey(vehicle.getVin())) {
|
||||
|
||||
//如果vin的缓存 时间还剩一秒,则判断为已经下线
|
||||
if (redis.getExpire(vehicle.getVin()) <= 3) {
|
||||
log.info(vehicle.getVin() + "的车辆已经下线");
|
||||
|
||||
//执行修改下线状态的方法
|
||||
Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0);
|
||||
|
||||
if (0 == 1) {
|
||||
log.error("下线状态修改失败");
|
||||
}
|
||||
|
||||
log.info("下线状态修改成功");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ spring:
|
|||
application:
|
||||
# 应用名称
|
||||
name: couplet-business
|
||||
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: dev
|
||||
|
@ -15,7 +16,6 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
|
@ -24,7 +24,6 @@ spring:
|
|||
# 共享配置
|
||||
shared-configs:
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
logging:
|
||||
|
|
|
@ -6,35 +6,32 @@
|
|||
|
||||
|
||||
<sql id="selectMiddleUserOrVehicle">
|
||||
SELECT
|
||||
m.middle_id,
|
||||
m.user_id,
|
||||
m.vehicle_id,
|
||||
m.del_flag,
|
||||
v.vehicle_type,
|
||||
v.motor_manufacturer,
|
||||
v.battery_manufacturer,
|
||||
v.motor_number,
|
||||
v.battery_number,
|
||||
v.vin,
|
||||
v.vehicle_state,
|
||||
t.vehicle_type_name
|
||||
FROM
|
||||
`couplet_middle` m
|
||||
LEFT JOIN couplet_vehicle v ON m.vehicle_id = v.vehicle_id
|
||||
LEFT JOIN couplet_vehicle_type t ON v.vehicle_id = t.vehicle_type_id
|
||||
SELECT m.middle_id,
|
||||
m.user_id,
|
||||
m.vehicle_id,
|
||||
m.del_flag,
|
||||
v.vehicle_type,
|
||||
v.motor_manufacturer,
|
||||
v.battery_manufacturer,
|
||||
v.motor_number,
|
||||
v.battery_number,
|
||||
v.vin,
|
||||
v.vehicle_state,
|
||||
t.vehicle_type_name
|
||||
FROM `couplet_middle` m
|
||||
LEFT JOIN couplet_vehicle v ON m.vehicle_id = v.vehicle_id
|
||||
LEFT JOIN couplet_vehicle_type t ON v.vehicle_id = t.vehicle_type_id
|
||||
WHERE m.del_flag = 0
|
||||
</sql>
|
||||
<sql id="selectVehicle">
|
||||
select
|
||||
v.vehicle_id,
|
||||
v.motor_manufacturer,
|
||||
v.battery_manufacturer,
|
||||
v.motor_number,
|
||||
v.battery_number,
|
||||
v.vin,
|
||||
v.vehicle_state,
|
||||
t.vehicle_type_name
|
||||
select v.vehicle_id,
|
||||
v.motor_manufacturer,
|
||||
v.battery_manufacturer,
|
||||
v.motor_number,
|
||||
v.battery_number,
|
||||
v.vin,
|
||||
v.vehicle_state,
|
||||
t.vehicle_type_name
|
||||
from couplet_vehicle v
|
||||
left join couplet_vehicle_type t on v.vehicle_type = t.vehicle_type_id
|
||||
where v.isdelete = 0
|
||||
|
@ -45,6 +42,15 @@
|
|||
(#{userId}, #{singleVehicleId}, 0)
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
|
||||
<update id="onOrOutLineByVIN">
|
||||
UPDATE `couplet-cloud`.`couplet_vehicle`
|
||||
SET `vehicle_state` = #{status}
|
||||
WHERE `vin` = #{vin};
|
||||
</update>
|
||||
|
||||
|
||||
<!-- <insert id="addVehicle" useGeneratedKeys="true" keyProperty="vehicleId">-->
|
||||
<!-- INSERT INTO `couplet-cloud`.`couplet_middle` (`user_id`, `vehicle_id`, `del_flag`) VALUES-->
|
||||
<!-- <foreach collection="vehicleId" item="vehicleId" separator=",">-->
|
||||
|
@ -59,9 +65,9 @@
|
|||
|
||||
<select id="UserUnderTheVehicleList" resultType="com.couplet.common.domain.Vehicle">
|
||||
<include refid="selectMiddleUserOrVehicle"/>
|
||||
<if test="userId!=null">
|
||||
AND m.user_id = #{userId}
|
||||
</if>
|
||||
<if test="userId!=null">
|
||||
AND m.user_id = #{userId}
|
||||
</if>
|
||||
</select>
|
||||
<select id="vehicleAll" resultType="com.couplet.common.domain.Vehicle">
|
||||
<include refid="selectVehicle"/>
|
||||
|
|
|
@ -19,6 +19,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
public class CoupletMqApplatcaion {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(CoupletMqApplatcaion.class, args);
|
||||
System.out.println("获取报文、RabbitMQ模块启动成功");
|
||||
System.out.println("MQ模块启动成功");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
package com.couplet.mq.config;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka生产者配置类
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@EnableKafka
|
||||
public class KafkaProducerConfig {
|
||||
@Value("${kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value("${kafka.producer.retries}")
|
||||
private Integer retries;
|
||||
|
||||
@Value("${kafka.producer.batch-size}")
|
||||
private Integer batchSize;
|
||||
|
||||
@Value("${kafka.producer.buffer-memory}")
|
||||
private Integer bufferMemory;
|
||||
|
||||
@Value("${kafka.producer.linger}")
|
||||
private Integer linger;
|
||||
|
||||
private Map<String, Object> producerConfigs() {
|
||||
HashMap<String, Object> props = new HashMap<>(16);
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, retries);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put("security.protocol", "SASL_PLAINTEXT");
|
||||
props.put("sasl.mechanism", "SCRAM-SHA-512");
|
||||
return props;
|
||||
}
|
||||
|
||||
private ProducerFactory<String, String> producerFactory() {
|
||||
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||
|
||||
return producerFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> KafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package com.couplet.mq.controller;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/kafka")
|
||||
public class KafkaController {
|
||||
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
//package com.couplet.mq.service;
|
||||
//
|
||||
//import com.couplet.mq.domain.User;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @ProjectName: five-groups-couplet
|
||||
// * @Author: LiuYunHu
|
||||
// * @CreateTime: 2024/3/28
|
||||
// * @Description: MQ消费者类
|
||||
// */
|
||||
//
|
||||
//@Component
|
||||
//@Slf4j
|
||||
//@SuppressWarnings("all")
|
||||
//@RabbitListener(queues = "${mq.queueName}")
|
||||
//public class Consumer {
|
||||
// @Autowired
|
||||
// private StringRedisTemplate redis;
|
||||
//
|
||||
// /* 线程池执行
|
||||
//
|
||||
// //创建一个定长线程池
|
||||
// private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
//
|
||||
// @Async
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) {
|
||||
// executor.execute(() -> {
|
||||
// try {
|
||||
// handleMessage(param, channel, message);
|
||||
// } catch (IOException e) {
|
||||
// log.error("处理消息失败:{}", e);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// //处理信息的方法
|
||||
// private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
//**/
|
||||
//
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// //确认消费
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// //拒绝消费
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
//
|
||||
//
|
||||
// //拒绝消费
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// //重新入队
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
|
@ -0,0 +1,17 @@
|
|||
package com.couplet.mq.service;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka监听者1
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class KafkaConsumer {
|
||||
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
package com.couplet.mq.service;
|
||||
|
||||
import com.couplet.mq.domain.User;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/3/28
|
||||
* @Description: MQ消费者类
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@SuppressWarnings("all")
|
||||
@RabbitListener(queues = "${mq.queueName}")
|
||||
public class MqConsumer {
|
||||
@Autowired
|
||||
private StringRedisTemplate redis;
|
||||
|
||||
/* 线程池执行
|
||||
|
||||
//创建一个定长线程池
|
||||
private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
|
||||
@Async
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
handleMessage(param, channel, message);
|
||||
} catch (IOException e) {
|
||||
log.error("处理消息失败:{}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//处理信息的方法
|
||||
private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
**/
|
||||
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
//确认消费
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
//拒绝消费
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
|
||||
|
||||
//拒绝消费
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
//重新入队
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1 +1,2 @@
|
|||
com.couplet.mq.config.RabbitMQConfig
|
||||
com.couplet.mq.config.KafkaProducerConfig
|
||||
|
|
|
@ -27,6 +27,30 @@ spring:
|
|||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
#kafka配置信息
|
||||
kafka:
|
||||
bootstrap-servers: 39.103.133.136:9092
|
||||
producer:
|
||||
batch-size: 16384 #一次最多发送数据量 16K
|
||||
retries: 3 #发送失败后的重复发送次数
|
||||
buffer-memory: 33554432 #32M批处理缓冲区
|
||||
linger: 5 #延迟发送时间ms,如果未达到batch-size,但是时间达到linger将发送消息
|
||||
consumer:
|
||||
auto-offset-reset: latest #新建消费组时从什么位置开始消费 latest:最近位置 earliest:最早位置
|
||||
max-poll-records: 80 #批量消费一次最大拉取的数据量
|
||||
enable-auto-commit: false #是否开启自动提交
|
||||
auto-commit-interval: 1000 #自动提交的间隔时间,自动提交开启时生效
|
||||
session-timeout: 20000 #连接超时时间
|
||||
max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
|
||||
max-partition-fetch-bytes: 1048576 #设置拉取数据的大小,1M
|
||||
group-id: test-group #消费组
|
||||
listener:
|
||||
batch-listener: true #是否开启批量消费,true表示批量消费
|
||||
concurrencys: 5 #设置消费的线程数
|
||||
poll-timeout: 1500 #只限自动提交
|
||||
|
||||
|
||||
logging:
|
||||
level:
|
||||
com.couplet.system.mapper: DEBUG
|
||||
|
@ -47,3 +71,5 @@ mq:
|
|||
queueName: queue
|
||||
exchangeName: exchange
|
||||
routingKey: routingKey
|
||||
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import com.couplet.common.security.annotation.EnableMyFeignClients;
|
|||
import com.couplet.common.swagger.annotation.EnableCustomSwagger2;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* @ProjectName: Default (Template) Project
|
||||
|
@ -17,6 +18,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
@EnableCustomSwagger2
|
||||
@EnableMyFeignClients
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
//@EnableFeignClients
|
||||
public class OnlineApplication {
|
||||
public static void main(String[] args) {
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package com.couplet.online.utils;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import com.couplet.remote.RemoteVehicleService;
|
||||
|
@ -13,6 +12,7 @@ import org.springframework.stereotype.Component;
|
|||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
|
@ -132,33 +132,46 @@ public class MqttMonitor {
|
|||
|
||||
log.info("当前车辆的vin码为:" + start17);
|
||||
|
||||
//判断缓存中是否有这个vin
|
||||
if (redis.hasKey("不存在的车辆VIN:" + start17)) {
|
||||
// //判断缓存中是否有这个vin
|
||||
// if (redis.hasKey("不存在的车辆VIN" + start17)) {
|
||||
//
|
||||
// //可使用RabbitMQ发送消息
|
||||
// log.error("vin码为" + start17 + "的车辆不属于本系统!");
|
||||
//
|
||||
// } else {//如果缓存中没有存这个vin
|
||||
//
|
||||
//
|
||||
// }
|
||||
|
||||
//可使用RabbitMQ发送消息
|
||||
log.error("vin码为" + start17 + "的车辆不属于本系统!");
|
||||
}
|
||||
|
||||
//调取接口,通过vin查询车辆
|
||||
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
|
||||
|
||||
|
||||
//如果不存在这个车
|
||||
if (vehicles.isEmpty()) {
|
||||
if (0 == vehicles.size()) {
|
||||
//将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示
|
||||
redis.setCacheObject("不存在的车辆VIN:" + start17, start17);
|
||||
// redis.setCacheObject("不存在的车辆VIN" + start17, start17);
|
||||
log.error("未找到vin码为" + start17 + "的车辆信息");
|
||||
} else {
|
||||
//如果存在这个车
|
||||
Vehicle vehicle = vehicles.get(0);
|
||||
System.out.println("***********" + vehicle + "***********");
|
||||
//存入redis
|
||||
redis.setCacheObject("存在的车辆VIN:" + start17, JSON.toJSONString(vehicle));
|
||||
log.info("远程调用查询到的车辆数据:" + vehicle);
|
||||
|
||||
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
|
||||
redis.setCacheObject(start17, start17, 6L, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
|
||||
|
||||
//调用上线接口,修改上线状态
|
||||
Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1);
|
||||
//上线成功
|
||||
if (0 != i) {
|
||||
log.info("上线成功!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
76
pom.xml
76
pom.xml
|
@ -211,47 +211,47 @@
|
|||
<artifactId>couplet-modules-system</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-trouble</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-electronic-fence-server</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-electronic-fence-common</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-electronic-fence-remote</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-trouble</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-server</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-common</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-electronic-fence-remote</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- 企业服务 模块 公共依赖 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-enterprisemanagement-common</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!-- <!– 企业服务 模块 公共依赖 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-enterprisemanagement-common</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- 企业服务 模块 远程调用依赖 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-enterprisemanagement-remote</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!-- <!– 企业服务 模块 远程调用依赖 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-enterprisemanagement-remote</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- 车辆管理模块 -->
|
||||
<dependency>
|
||||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-modules-vehicle</artifactId>
|
||||
<version>${couplet.version}</version>
|
||||
</dependency>
|
||||
<!-- <!– 车辆管理模块 –>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>com.couplet</groupId>-->
|
||||
<!-- <artifactId>couplet-modules-vehicle</artifactId>-->
|
||||
<!-- <version>${couplet.version}</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- RabbitMq模块 -->
|
||||
<dependency>
|
||||
|
|
Loading…
Reference in New Issue