From fe2ae95a94765934eb3b675b65ffc968292eeb9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=86=99=E6=9C=9D?= <13694051+wangxizhao123@user.noreply.gitee.com> Date: Mon, 17 Jun 2024 22:09:15 +0800 Subject: [PATCH] =?UTF-8?q?fast()=E8=8E=B7=E5=8F=96=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../business/common/domain/VehicleInfo.java | 211 ++++++++++++++++++ .../muyu-business/muyu-business-kafka/pom.xml | 37 +++ .../business/kafka/common/KafkaMonitor.java | 13 ++ .../muyu-business-server/pom.xml | 17 ++ .../com/muyu/goods/client/KafkaConsumer.java | 21 ++ .../com/muyu/goods/client/MqConsumer.java | 24 ++ .../muyu/goods/controller/CarController.java | 32 +++ .../com/muyu/goods/incident/mq/Producer.java | 16 ++ .../java/com/muyu/goods/mapper/CarMapper.java | 6 + .../java/com/muyu/goods/mapper/MapMapper.java | 2 + .../com/muyu/goods/service/ICarService.java | 6 + .../goods/service/impl/CarServiceImpl.java | 5 + .../muyu/goods/service/impl/MapService.java | 33 +++ .../main/java/com/muyu/goods/timer/Timer.java | 43 ++++ .../src/main/resources/bootstrap.yml | 8 + .../main/resources/mapper/rule/CarMapper.xml | 3 + muyu-modules/muyu-business/pom.xml | 1 + .../controller/EnterpriseController.java | 7 + .../main/java/com/muyu/goods/mq/Producer.java | 8 + .../goods/service/IEnterpriseService.java | 1 + .../service/impl/EnterpriseServiceImpl.java | 7 + .../muyu-incident-client/pom.xml | 35 +++ .../client/VehicleEventClientApplication.java | 14 ++ .../incident/client/config/KafkaConfig.java | 32 +++ .../incident/client/config/MsgComponent.java | 124 ++++++++++ .../src/main/resources/application.yml | 11 + .../muyu-incident-common/pom.xml | 51 +++++ .../incident/common/config/MsgConfig.java | 21 ++ .../common/constants/VehicleConstant.java | 14 ++ ...ot.autoconfigure.AutoConfiguration.imports | 0 .../muyu-incident-server/pom.xml | 28 +++ .../server/VehicleEventSeverApplication.java | 11 + .../server/controller/IncidentController.java | 26 +++ .../incident/server/model/IncidentModel.java | 22 ++ .../src/main/resources/application.yml | 7 + muyu-modules/muyu-incident/pom.xml | 27 +++ .../muyu-moudels-many-datasource/pom.xml | 7 +- .../cloud/CloudManyDataSourceApplication.java | 2 + .../main/java/com/muyu/cloud/mq/Consumer.java | 14 +- pom.xml | 1 + 40 files changed, 946 insertions(+), 2 deletions(-) create mode 100644 muyu-modules/muyu-business/muyu-business-common/src/main/java/com/business/common/domain/VehicleInfo.java create mode 100644 muyu-modules/muyu-business/muyu-business-kafka/pom.xml create mode 100644 muyu-modules/muyu-business/muyu-business-kafka/src/main/java/muyu/business/kafka/common/KafkaMonitor.java create mode 100644 muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/KafkaConsumer.java create mode 100644 muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/MqConsumer.java create mode 100644 muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/incident/mq/Producer.java create mode 100644 muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/timer/Timer.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-client/pom.xml create mode 100644 muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/VehicleEventClientApplication.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/KafkaConfig.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/MsgComponent.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-client/src/main/resources/application.yml create mode 100644 muyu-modules/muyu-incident/muyu-incident-common/pom.xml create mode 100644 muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/config/MsgConfig.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/constants/VehicleConstant.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 muyu-modules/muyu-incident/muyu-incident-server/pom.xml create mode 100644 muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/VehicleEventSeverApplication.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/controller/IncidentController.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/model/IncidentModel.java create mode 100644 muyu-modules/muyu-incident/muyu-incident-server/src/main/resources/application.yml create mode 100644 muyu-modules/muyu-incident/pom.xml diff --git a/muyu-modules/muyu-business/muyu-business-common/src/main/java/com/business/common/domain/VehicleInfo.java b/muyu-modules/muyu-business/muyu-business-common/src/main/java/com/business/common/domain/VehicleInfo.java new file mode 100644 index 0000000..b84c85a --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-common/src/main/java/com/business/common/domain/VehicleInfo.java @@ -0,0 +1,211 @@ +package com.business.common.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * 车辆信息 + * @ClassName VehicleInfo + * @Author GuanTieLin + * @Date 2024/5/30 18:40 + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class VehicleInfo { + /** + * vin + */ + private String vin; + /** + * 时间 + */ + private String time; + /** + * 经度 + */ + private String longitude; + /** + * 纬度 + */ + private String latitude; + /** + * 车速 + */ + private String speed; + /** + * 总里程 + */ + private String totalMileage; + /** + * 总电压 + */ + private String totalVoltage; + /** + * 总电流 + */ + private String totalCurrent; + /** + * 绝缘电阻 + */ + private String insulationResistance; + /** + * 档位 + */ + private String gears; + /** + * 加速踏板行程值 + */ + private String acceleratorPedal; + /** + * 制动踏板行程值 + */ + private String brakePedal; + /** + * 燃料消耗率 + */ + private String fuelConsumptionRate; + /** + * 电机控制器温度 + */ + private String MotorControllerTemperature; + /** + * 电机转速 + */ + private String motorSpeed; + /** + * 电机转矩 + */ + private String motorTorque; + /** + * 电机温度 + */ + private String motorTemperature; + /** + * 电机电压 + */ + private String motorVoltage; + /** + * 电机电流 + */ + private String motorCurrent; + /** + * 动力电池剩余电量SOC + */ + private String remainingPower; + /** + * 当前状态允许的最大反馈功率 + */ + private String maximumFeedbackPower; + /** + * 当前状态允许最大放电功率 + */ + private String maximumDischargePower; + /** + * BMS自检计数器 + */ + private String selfTestCounter; + /** + * 动力电池充放电电流 + */ + private String batteryChargeOrDischargeCurrent; + /** + * 动力电池负载端总电压V3 + */ + private String totalVoltageV3; + /** + * 单次最大电压 + */ + private String maximumVoltage; + /** + * 单体电池最低电压 + */ + private String minimumBatteryVoltage; + /** + * 单体电池最高温度 + */ + private String maximumBatteryTemperature; + /** + * 单体电池最低温度 + */ + private String minimumBatteryTemperature; + /** + * 动力电池可用容量 + */ + private String usableBatteryCapacity; + /** + * 车辆状态 + */ + private String vehicleStatus; + /** + * 充电状态 + */ + private String chargeStatus; + /** + * 运行状态 + */ + private String operationalStatus; + /** + * SOC + */ + private String soc; + /** + * 可充电储能装置工作状态 + */ + private String energyStorageDeviceWorkingStatus; + /** + * 驱动电机状态 + */ + private String driveMotorStatus; + /** + * 定位是否有效 + */ + private String positioningIsOrNotEffective; + /** + * EAS + */ + private String eas; + /** + * PTC + */ + private String ptc; + /** + * EPS + */ + private String eps; + /** + * ABS + */ + private String abs; + /** + * MCU + */ + private String mcu; + /** + * 动力电池加热状态 + */ + private String batteryHeatingStatus; + /** + * 动力电池当前状态 + */ + private String batteryCurrentStatus; + /** + * 动力电池保温状态 + */ + private String batteryInsulationStatus; + /** + * DCDC + */ + private String dcdc; + /** + * CHG + */ + private String chg; + /** + * 校验位 + */ + private String checkDigit; +} diff --git a/muyu-modules/muyu-business/muyu-business-kafka/pom.xml b/muyu-modules/muyu-business/muyu-business-kafka/pom.xml new file mode 100644 index 0000000..5d0aa0f --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-kafka/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.muyu + muyu + 3.6.3 + ../../../pom.xml + + + muyu-business-kafka + + + 17 + 17 + UTF-8 + + + + + com.muyu + muyu-business-common + 3.6.3 + + + + org.springframework.kafka + spring-kafka + + + + + + + diff --git a/muyu-modules/muyu-business/muyu-business-kafka/src/main/java/muyu/business/kafka/common/KafkaMonitor.java b/muyu-modules/muyu-business/muyu-business-kafka/src/main/java/muyu/business/kafka/common/KafkaMonitor.java new file mode 100644 index 0000000..0a1934a --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-kafka/src/main/java/muyu/business/kafka/common/KafkaMonitor.java @@ -0,0 +1,13 @@ +package muyu.business.kafka.common; + +import org.springframework.stereotype.Component; +import org.springframework.kafka.annotation.KafkaListener; + +@Component +public class KafkaMonitor { + + @KafkaListener(topics = "${kafka.topic}") + public void receive(String message){ + System.out.println("监听到:" + message); + } +} diff --git a/muyu-modules/muyu-business/muyu-business-server/pom.xml b/muyu-modules/muyu-business/muyu-business-server/pom.xml index dcda4fe..509d4b5 100644 --- a/muyu-modules/muyu-business/muyu-business-server/pom.xml +++ b/muyu-modules/muyu-business/muyu-business-server/pom.xml @@ -95,6 +95,23 @@ com.muyu muyu-common-system + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.kafka + spring-kafka + + + + + org.springframework.boot + spring-boot-starter + diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/KafkaConsumer.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/KafkaConsumer.java new file mode 100644 index 0000000..9eb4a58 --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/KafkaConsumer.java @@ -0,0 +1,21 @@ +//package com.muyu.goods.client; +// +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.stereotype.Component; +// +//@Component +//public class KafkaConsumer { +// +// @KafkaListener(topics = "my-topic") +// public void listenToMessage(String message) { +// System.out.println("监听到kafka: " + message); +// // 在这里处理消息逻辑 +// } +// +// // 如果需要监听多个主题,可以添加更多的@KafkaListener注解方法 +// @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}") +// public void listenToMultipleTopics(String message) { +// // ... +// } +// +//} diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/MqConsumer.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/MqConsumer.java new file mode 100644 index 0000000..1095f20 --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/client/MqConsumer.java @@ -0,0 +1,24 @@ +package com.muyu.goods.client; + +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +public class MqConsumer { + + +// @RabbitListener(queuesToDeclare = {@Queue("vehicle.event")}) + //监听交换机 Queue(队列名) @Exchange(value = "交换机名称", type = ExchangeTypes.FANOUT) + @RabbitListener(bindings = {@QueueBinding(value = @Queue("vehicle.event"), + exchange = @Exchange(value = "vehicle.event", type = ExchangeTypes.FANOUT))}) + public void handleMessage(String message) { + System.out.println("Received message: " + message); + // 在这里处理接收到的消息 + } +} diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/controller/CarController.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/controller/CarController.java index 582436e..8a8d74b 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/controller/CarController.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/controller/CarController.java @@ -6,6 +6,7 @@ import javax.servlet.http.HttpServletResponse; import com.business.common.domain.Car; import com.muyu.goods.service.ICarService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; @@ -34,6 +35,8 @@ public class CarController extends BaseController { @Autowired private ICarService carService; + @Autowired + private RedisTemplate redisTemplate; /** * 查询车辆信息列表 @@ -47,6 +50,15 @@ public class CarController extends BaseController return getDataTable(list); } + /** + * + * @return + */ + @PostMapping("/lists") + public Result> lists() { + return success(carService.lists()); + } + /** * 导出车辆信息列表 */ @@ -107,4 +119,24 @@ public class CarController extends BaseController { return toAjax(carService.deleteCarByCarIds(carIds)); } + + /** + * 启动实时数据事件 + * @return + */ + @PostMapping("activate") + public Result activate() { + redisTemplate.opsForList(); + return null; + } + + /** + * 结束实时数据事件 + * @return + */ + @PostMapping("finish") + public Result finish() { + String s = redisTemplate.opsForValue().get("01"); + return Result.success(s); + } } diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/incident/mq/Producer.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/incident/mq/Producer.java new file mode 100644 index 0000000..46aa5f5 --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/incident/mq/Producer.java @@ -0,0 +1,16 @@ +package com.muyu.goods.incident.mq; + +import com.muyu.common.core.domain.Result; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class Producer { + @Autowired + private RabbitTemplate rabbitTemplate; + + public void electronic(Integer id) { + rabbitTemplate.convertAndSend("vehicle.event","",id); + } +} diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/CarMapper.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/CarMapper.java index b947181..1d4ba9d 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/CarMapper.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/CarMapper.java @@ -20,6 +20,11 @@ public interface CarMapper */ public Car selectCarByCarId(Long carId); + /** + * 车辆列表 + * @return + */ + List lists(); /** * 查询车辆信息列表 * @@ -63,4 +68,5 @@ public interface CarMapper Car query(Long carId); Car carByCar(); + } diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/MapMapper.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/MapMapper.java index 7dea062..416e874 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/MapMapper.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/mapper/MapMapper.java @@ -1,5 +1,6 @@ package com.muyu.goods.mapper; +import com.business.common.domain.Car; import com.business.common.domain.Fence; import com.business.common.domain.FenceGroups; import org.apache.ibatis.annotations.Param; @@ -20,4 +21,5 @@ public interface MapMapper { Fence queryFence(@Param("fenceId") Long fenceId); int updateFence(@Param("fenceId") Long fenceId, @Param("status") String status); + } diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/ICarService.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/ICarService.java index b233465..44c2d35 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/ICarService.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/ICarService.java @@ -20,6 +20,11 @@ public interface ICarService */ public Car selectCarByCarId(Long carId); + /** + * 车辆列表 + * @return + */ + List lists(); /** * 查询车辆信息列表 * @@ -61,4 +66,5 @@ public interface ICarService public int deleteCarByCarId(Long carId); Car query(Long carId); + } diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/CarServiceImpl.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/CarServiceImpl.java index cb7651f..d2f6f53 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/CarServiceImpl.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/CarServiceImpl.java @@ -40,6 +40,11 @@ public class CarServiceImpl implements ICarService return carMapper.selectCarByCarId(carId); } + @Override + public List lists() { + return carMapper.lists(); + } + /** * 查询车辆信息列表 * diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/MapService.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/MapService.java index 765116c..294a023 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/MapService.java +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/service/impl/MapService.java @@ -22,6 +22,10 @@ public class MapService implements IMapService { @Autowired private MapMapper mapMapper; + /** + * 围栏组 + * @return + */ @Override public List selectFenceGroups() { List fenceGroupsList = mapMapper.selectFenceGroups(); @@ -35,18 +39,32 @@ public class MapService implements IMapService { return fenceGroupsList; } + /** + * 查看电子围栏 + * @return + */ @Override public List selectFence() { List fenceList = mapMapper.selectFence(); return fenceList; } + /** + * 围栏组详情 + * @param carId + * @return + */ @Override public FenceGroups selectFenceGroupsById(Long carId) { List groups = selectFenceGroups().stream().filter(car -> car.getCarId() == carId).collect(Collectors.toList()); return groups.get(0); } + /** + * 传入电子围栏 + * @param pences + * @return + */ @Override public String getSel(Pences pences) { System.out.println(pences); @@ -68,6 +86,11 @@ public class MapService implements IMapService { return "传入电子围栏失败"; } + /** + * 删除电子围栏 + * @param fenceId + * @return + */ @Override public String deleteFence(Long fenceId) { int i = mapMapper.deleteFence(fenceId); @@ -77,11 +100,21 @@ public class MapService implements IMapService { return "无效"; } + /** + * 电子围栏详情 + * @param fenceId + * @return + */ @Override public Fence queryFence(Long fenceId) { return mapMapper.queryFence(fenceId); } + /** + * 修改电子围栏状态 + * @param fenceId + * @return + */ @Override public String updateFence(Long fenceId) { Fence fence = queryFence(fenceId); diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/timer/Timer.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/timer/Timer.java new file mode 100644 index 0000000..74814d5 --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/goods/timer/Timer.java @@ -0,0 +1,43 @@ +package com.muyu.goods.timer; + +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.common.protocol.types.Field; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@Component +@EnableScheduling +@Log4j2 +public class Timer { + @Autowired + private RedisTemplate redisTemplate; + + @Scheduled(fixedDelay = 1000) + public void generateRandomNumber() { +// Random random = new Random(); +// int i = random.nextInt(100); +// System.out.println(i); + String rightPopAndLeftPush = null; + try { + rightPopAndLeftPush = redisTemplate.opsForList().leftPop("event_handler:realtime_data:JAV0VJUJYOTOK9KSY" + , 3, TimeUnit.SECONDS); + if (rightPopAndLeftPush != null) { + log.info("处理消息: {}", rightPopAndLeftPush); + } + } catch (Exception e) { + // 处理超时异常,进行持久化操作,消息没处理可进行后续处理 +// if (rightPopAndLeftPush != null) { +// redisTemplate.opsForList().leftPush("backupList", rightPopAndLeftPush); +// } + log.error(e.getMessage()); + } + + } +} diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/resources/bootstrap.yml b/muyu-modules/muyu-business/muyu-business-server/src/main/resources/bootstrap.yml index 5806aa5..ef538bf 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/resources/bootstrap.yml +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/resources/bootstrap.yml @@ -23,6 +23,14 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + kafka: + bootstrap-servers: 129.211.23.219:9200 # kafka集群地址 + consumer: + group-id: my-group #消费者组id + auto-offset-reset: earliest #默认偏移量位置 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #键序列化器 + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值序列化器 + enable-auto-commit: false # 是否自动提交offset logging: level: com.muyu.rule.mapper: DEBUG diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/resources/mapper/rule/CarMapper.xml b/muyu-modules/muyu-business/muyu-business-server/src/main/resources/mapper/rule/CarMapper.xml index c56e94a..3ef5dab 100644 --- a/muyu-modules/muyu-business/muyu-business-server/src/main/resources/mapper/rule/CarMapper.xml +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/resources/mapper/rule/CarMapper.xml @@ -46,6 +46,9 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" ORDER BY car_id DESC LIMIT 1; + insert into car diff --git a/muyu-modules/muyu-business/pom.xml b/muyu-modules/muyu-business/pom.xml index 974736e..6d0cee4 100644 --- a/muyu-modules/muyu-business/pom.xml +++ b/muyu-modules/muyu-business/pom.xml @@ -16,6 +16,7 @@ muyu-business-common muyu-business-server muyu-business-client + muyu-business-kafka diff --git a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/controller/EnterpriseController.java b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/controller/EnterpriseController.java index d79c8f1..13d3ced 100644 --- a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/controller/EnterpriseController.java +++ b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/controller/EnterpriseController.java @@ -8,6 +8,7 @@ import com.muyu.common.system.domain.SysUser; import com.muyu.goods.domain.Custom; import com.muyu.goods.domain.Enterprise; import com.muyu.goods.domain.Sources; +import com.muyu.goods.mq.Producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import com.muyu.common.log.annotation.Log; @@ -32,6 +33,12 @@ public class EnterpriseController extends BaseController @Autowired private IEnterpriseService enterpriseService; + + @PostMapping("rest") + public void send() { + enterpriseService.send(); + } + /** * 查询企业列表 */ diff --git a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/mq/Producer.java b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/mq/Producer.java index 6f29383..a20d151 100644 --- a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/mq/Producer.java +++ b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/mq/Producer.java @@ -14,5 +14,13 @@ public class Producer { rabbitTemplate.convertAndSend("datasource",ip); } + public void exchangeSource(String ip) { + rabbitTemplate.convertAndSend("datasource","",ip); + } + + public void vehicleEvent(String name) { + rabbitTemplate.convertAndSend("vehicle.event","",name); + } + } diff --git a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/IEnterpriseService.java b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/IEnterpriseService.java index 7a554ae..e3aa9c6 100644 --- a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/IEnterpriseService.java +++ b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/IEnterpriseService.java @@ -97,4 +97,5 @@ public interface IEnterpriseService List listCustom(); + void send(); } diff --git a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/impl/EnterpriseServiceImpl.java b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/impl/EnterpriseServiceImpl.java index 197af60..8d7663c 100644 --- a/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/impl/EnterpriseServiceImpl.java +++ b/muyu-modules/muyu-goods-enterprise/muyu-goods-enterprise-server/src/main/java/com/muyu/goods/service/impl/EnterpriseServiceImpl.java @@ -252,4 +252,11 @@ public class EnterpriseServiceImpl implements IEnterpriseService return enterpriseMapper.listCustom(); } + @Override + public void send() { + List lists = lists(); + String jsonString = JSONObject.toJSONString(lists); + producer.vehicleEvent(jsonString); + } + } diff --git a/muyu-modules/muyu-incident/muyu-incident-client/pom.xml b/muyu-modules/muyu-incident/muyu-incident-client/pom.xml new file mode 100644 index 0000000..5cbe07b --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-client/pom.xml @@ -0,0 +1,35 @@ + + + 4.0.0 + + com.muyu + muyu-incident + 3.6.3 + + + muyu-incident-client + + + 17 + 17 + UTF-8 + + + + + com.muyu + muyu-incident-common + 3.6.3 + + + + com.muyu + muyu-incident-server + 3.6.3 + + + + + diff --git a/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/VehicleEventClientApplication.java b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/VehicleEventClientApplication.java new file mode 100644 index 0000000..2b357fd --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/VehicleEventClientApplication.java @@ -0,0 +1,14 @@ +package com.muyu.incident.client; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * 事件系统启动类 + */ +@SpringBootApplication +public class VehicleEventClientApplication { + public static void main(String[] args) { + SpringApplication.run(VehicleEventClientApplication.class,args); + } +} diff --git a/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/KafkaConfig.java b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/KafkaConfig.java new file mode 100644 index 0000000..4f2554b --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/KafkaConfig.java @@ -0,0 +1,32 @@ +package com.muyu.incident.client.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * kafka配置 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Configuration +@ConfigurationProperties(prefix = "kafka") +public class KafkaConfig { + private static final String queueName = null; + /** + * 主题 + */ + private String topic; + /** + * 分区 + */ + private Integer partition; + public String queueName() { + return topic + "." + partition; + } +} diff --git a/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/MsgComponent.java b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/MsgComponent.java new file mode 100644 index 0000000..857b6c9 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-client/src/main/java/com/muyu/incident/client/config/MsgComponent.java @@ -0,0 +1,124 @@ +package com.muyu.incident.client.config; + +import com.alibaba.fastjson2.JSONObject; +import com.incident.server.model.IncidentModel; +import com.muyu.incident.common.config.MsgConfig; +import com.muyu.incident.common.constants.VehicleConstant; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.Arrays; + + +/** + * 消息处理配置 + */ +@Component +@Log4j2 +public class MsgComponent { + /** + * 队列声明 + * @param kafkaConfig 名称依据 + * @return 队列对象 + */ + @Bean + public Queue initCehicleEventClientQueue(KafkaConfig kafkaConfig) { + return new Queue(kafkaConfig.queueName(),true,true,true); + } + + /** + * 绑定交换机 +// * @param vehicleEventExchange 交换机 + * @param initCehicleEventClientQueue 队列 + * @return 绑定结果 + */ + @Bean + public Binding binding1(Queue initCehicleEventClientQueue) { + MsgConfig msgConfig = new MsgConfig(); + FanoutExchange fanoutExchange = msgConfig.VehicleEvent(); + System.out.println(fanoutExchange); + System.out.println(initCehicleEventClientQueue); + return BindingBuilder.bind(initCehicleEventClientQueue).to(fanoutExchange); + } + + /** + * 自定义rabbitmq + * @param connectionFactory + * @param kafkaConfig + * @return + */ + @Bean + public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,KafkaConfig kafkaConfig) { + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); + //针对哪些队列(参数为可变参数) + simpleMessageListenerContainer.setQueueNames(kafkaConfig.queueName()); + //同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。 + simpleMessageListenerContainer.setConcurrentConsumers(3); + //最大的消费者线程数 + simpleMessageListenerContainer.setMaxConcurrentConsumers(5); + //设置消息确认方式 NONE=不确认,MANUAL=手动确认,AUTO=自动确认; + //自动确认 + simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); +// simpleMessageListenerContainer.setMessageListener(message -> log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message.toString())); + //手动确认(单条确认) + simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); + simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { + log.info("springboot.rabbitmq-queue接收到的消息:[{}]", + JSONObject.parseObject(new String(message.getBody()), IncidentModel.class) + ); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + }); + //消费端限流 + simpleMessageListenerContainer.setPrefetchCount(1); + return simpleMessageListenerContainer; + } + + @Bean + public ConnectionFactory connectionFactory(RabbitProperties rabbitProperties) { + CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); + cachingConnectionFactory.setHost(rabbitProperties.getHost()); + cachingConnectionFactory.setPort(rabbitProperties.getPort()); + cachingConnectionFactory.setUsername(rabbitProperties.getUsername()); + cachingConnectionFactory.setPassword(rabbitProperties.getPassword()); + //开启发送确认 + cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + //开启消息返回 + cachingConnectionFactory.setPublisherReturns(true); + cachingConnectionFactory.createConnection(); + return cachingConnectionFactory; + } + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMandatory(true); + //消息返回 + rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { + log.info("消息返回实现,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", message, i, s, s1, s2); + }); + //发送端确认 + rabbitTemplate.setConfirmCallback((correlationData, b, s) -> { + log.info("发送端确认,correlationData:{},ack:{},cause:{}", correlationData, b, s); + }); + return rabbitTemplate; + } + + +} diff --git a/muyu-modules/muyu-incident/muyu-incident-client/src/main/resources/application.yml b/muyu-modules/muyu-incident/muyu-incident-client/src/main/resources/application.yml new file mode 100644 index 0000000..65b0540 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-client/src/main/resources/application.yml @@ -0,0 +1,11 @@ +#server: +# port: 8088 +kafka: + topic: vehicle.gateway.001 + partition: 0 + +spring: + rabbitmq: + host: 129.211.23.219 + port: 5672 + diff --git a/muyu-modules/muyu-incident/muyu-incident-common/pom.xml b/muyu-modules/muyu-incident/muyu-incident-common/pom.xml new file mode 100644 index 0000000..4fc2ec9 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-common/pom.xml @@ -0,0 +1,51 @@ + + + 4.0.0 + + com.muyu + muyu + 3.6.3 + ../../../pom.xml + + + muyu-incident-common + + + 17 + 17 + UTF-8 + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.projectlombok + lombok + + + + com.alibaba.fastjson2 + fastjson2 + + + + com.alibaba.fastjson2 + fastjson2 + + + + + + diff --git a/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/config/MsgConfig.java b/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/config/MsgConfig.java new file mode 100644 index 0000000..ae569e0 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/config/MsgConfig.java @@ -0,0 +1,21 @@ +package com.muyu.incident.common.config; + +import com.muyu.incident.common.constants.VehicleConstant; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * 工具类 + */ +@Component +public class MsgConfig { + /** + * 创建交换机 + * @return + */ + @Bean + public FanoutExchange VehicleEvent() { + return new FanoutExchange(VehicleConstant.VEHICLE_EVENT_EXCHANGE); + } +} diff --git a/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/constants/VehicleConstant.java b/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/constants/VehicleConstant.java new file mode 100644 index 0000000..1e17065 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-common/src/main/java/com/muyu/incident/common/constants/VehicleConstant.java @@ -0,0 +1,14 @@ +package com.muyu.incident.common.constants; + +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.context.annotation.Bean; + +/** + * 事件系统常量 + */ +public class VehicleConstant { + /** + * 车辆事件系统交换机 + */ + public static String VEHICLE_EVENT_EXCHANGE = "vehicle.event"; +} diff --git a/muyu-modules/muyu-incident/muyu-incident-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/muyu-modules/muyu-incident/muyu-incident-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..e69de29 diff --git a/muyu-modules/muyu-incident/muyu-incident-server/pom.xml b/muyu-modules/muyu-incident/muyu-incident-server/pom.xml new file mode 100644 index 0000000..e5de0dc --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-server/pom.xml @@ -0,0 +1,28 @@ + + + 4.0.0 + + com.muyu + muyu-incident + 3.6.3 + + + muyu-incident-server + + + 17 + 17 + UTF-8 + + + + + com.muyu + muyu-incident-common + 3.6.3 + + + + diff --git a/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/VehicleEventSeverApplication.java b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/VehicleEventSeverApplication.java new file mode 100644 index 0000000..0cb30f1 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/VehicleEventSeverApplication.java @@ -0,0 +1,11 @@ +package com.incident.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class VehicleEventSeverApplication { + public static void main(String[] args) { + SpringApplication.run(VehicleEventSeverApplication.class,args); + } +} diff --git a/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/controller/IncidentController.java b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/controller/IncidentController.java new file mode 100644 index 0000000..fd60769 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/controller/IncidentController.java @@ -0,0 +1,26 @@ +package com.incident.server.controller; + +import com.alibaba.fastjson2.JSONObject; +import com.incident.server.model.IncidentModel; +import com.muyu.incident.common.constants.VehicleConstant; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/test") +public class IncidentController { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @PostMapping + public String sendMsg(@RequestBody IncidentModel incidentModel) { + rabbitTemplate.convertSendAndReceive(VehicleConstant.VEHICLE_EVENT_EXCHANGE,"", + JSONObject.toJSONString(incidentModel)); + return null; + } +} diff --git a/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/model/IncidentModel.java b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/model/IncidentModel.java new file mode 100644 index 0000000..18e77f8 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-server/src/main/java/com/incident/server/model/IncidentModel.java @@ -0,0 +1,22 @@ +package com.incident.server.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class IncidentModel { + /** + * 唯一键 + */ + private String vin; + /** + * 消息类型 + */ + private String type; + +} diff --git a/muyu-modules/muyu-incident/muyu-incident-server/src/main/resources/application.yml b/muyu-modules/muyu-incident/muyu-incident-server/src/main/resources/application.yml new file mode 100644 index 0000000..fb59ef5 --- /dev/null +++ b/muyu-modules/muyu-incident/muyu-incident-server/src/main/resources/application.yml @@ -0,0 +1,7 @@ +server: + port: 8087 +spring: + rabbitmq: + host: 129.211.23.219 + port: 5672 + diff --git a/muyu-modules/muyu-incident/pom.xml b/muyu-modules/muyu-incident/pom.xml new file mode 100644 index 0000000..2b7181a --- /dev/null +++ b/muyu-modules/muyu-incident/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.muyu + muyu + 3.6.3 + ../../pom.xml + + + muyu-incident + pom + + muyu-incident-common + muyu-incident-server + muyu-incident-client + + + + 17 + 17 + UTF-8 + + + diff --git a/muyu-modules/muyu-moudels-many-datasource/pom.xml b/muyu-modules/muyu-moudels-many-datasource/pom.xml index 1b24eee..81e326b 100644 --- a/muyu-modules/muyu-moudels-many-datasource/pom.xml +++ b/muyu-modules/muyu-moudels-many-datasource/pom.xml @@ -89,12 +89,17 @@ muyu-common-swagger - + org.springframework.boot spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-integration + diff --git a/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/CloudManyDataSourceApplication.java b/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/CloudManyDataSourceApplication.java index f8057b6..28c461b 100644 --- a/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/CloudManyDataSourceApplication.java +++ b/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/CloudManyDataSourceApplication.java @@ -4,6 +4,7 @@ import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSour import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import com.muyu.common.swagger.annotation.EnableCustomSwagger2; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; @@ -14,6 +15,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; @EnableCustomConfig @EnableCustomSwagger2 @EnableMyFeignClients +@EnableRabbit @SpringBootApplication(exclude = {DynamicDataSourceAutoConfiguration.class, DataSourceAutoConfiguration.class}) public class CloudManyDataSourceApplication { public static void main(String[] args) { diff --git a/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/mq/Consumer.java b/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/mq/Consumer.java index 2ebb781..4f5170f 100644 --- a/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/mq/Consumer.java +++ b/muyu-modules/muyu-moudels-many-datasource/src/main/java/com/muyu/cloud/mq/Consumer.java @@ -8,7 +8,11 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -22,14 +26,22 @@ import static java.lang.Thread.sleep; @Component @Log4j2 public class Consumer { + @Autowired + private AmqpAdmin amqpAdmin; @Autowired private DruidConfig druidConfig; @Autowired private CloudController controller; - @RabbitListener(queuesToDeclare = {@Queue("datasource")}) + + +// @RabbitListener(queuesToDeclare = {@Queue("datasource")}) + @RabbitListener(bindings = {@QueueBinding(value = @Queue("datasource"), + exchange = @Exchange(value = "datasource", type = ExchangeTypes.FANOUT))}) public void dataSource(String ip){ + System.out.println(ip); controller.selectIp(ip); + amqpAdmin.deleteQueue("datasource"); } } diff --git a/pom.xml b/pom.xml index 13e7880..e933c45 100644 --- a/pom.xml +++ b/pom.xml @@ -218,6 +218,7 @@ muyu-modules/muyu-goods-enterprise muyu-modules/muyu-business muyu-modules/muyu-moudels-many-datasource + muyu-modules/muyu-incident pom