diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java index c953032..13d3710 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/RemoteVehicleService.java @@ -51,4 +51,7 @@ public interface RemoteVehicleService { @GetMapping("/findByVIN/{vin}") public Result> findByVIN(@PathVariable("vin") String vin); + @GetMapping("onOrOutLineByVIN") + public Integer onOrOutLineByVIN(@RequestParam("params") String params); + } diff --git a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java index 58fa254..8eba12c 100644 --- a/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java +++ b/couplet-common/couplet-common-business/src/main/java/com/couplet/remote/factory/RemoteVehicleFallbackFactory.java @@ -50,6 +50,12 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory> findByVIN(String vin) { return Result.error("车辆服务调用失败:" + cause.getMessage()); } + + @Override + public Integer onOrOutLineByVIN(String params) { + log.error("车辆服务调用失败:" + cause.getMessage()); + return null; + } }; } } diff --git a/couplet-common/couplet-common-core/src/main/java/com/couplet/common/core/constant/ServiceNameConstants.java b/couplet-common/couplet-common-core/src/main/java/com/couplet/common/core/constant/ServiceNameConstants.java index 2ff27ab..3df67ab 100644 --- a/couplet-common/couplet-common-core/src/main/java/com/couplet/common/core/constant/ServiceNameConstants.java +++ b/couplet-common/couplet-common-core/src/main/java/com/couplet/common/core/constant/ServiceNameConstants.java @@ -32,5 +32,4 @@ public class ServiceNameConstants { public static final String VEHICLE_SERVICE = "couplet-vehicle"; public static final String BUSINESS_SERVICE = "couplet-business"; - public static final String VEHICLE_SERVICE = "vehicle-service"; } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/VehicleController.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/VehicleController.java index d7198d5..20c8663 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/VehicleController.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/VehicleController.java @@ -174,8 +174,21 @@ public class VehicleController extends BaseController { + /* + * @Author: LiuYunHu + * @Date: 2024/4/4 11:28 + * @Description: 通过vin修改车辆上下线的状态 + * @Param: + * @Return: + **/ + @GetMapping("onOrOutLineByVIN") + public Integer onOrOutLineByVIN(@RequestParam("params") String params) { + String[] split = params.split(","); + return vehicleService.onOrOutLineByVIN(split[0], Integer.parseInt(split[1])); + } + } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java index 2f3a903..6a7374d 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.couplet.common.domain.Vehicle; import com.couplet.common.domain.VehicleMiddle; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Component; import java.util.List; @@ -24,4 +25,6 @@ public interface VehicleMapper extends BaseMapper { Integer addVehicle(VehicleMiddle vehicleMiddle); List vehicleAll(); + + Integer onOrOutLineByVIN(@Param("vin") String vin, @Param("status") int status); } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java index 954fe8b..0712f58 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java @@ -37,4 +37,6 @@ public interface VehicleService extends IService { List vehicleAll(); + Integer onOrOutLineByVIN(String s, int i); + } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java index 1ee5a95..7968448 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java @@ -282,6 +282,14 @@ public class VehicleServiceImpl extends ServiceImpl impl return vehicleMapper.vehicleAll(); } + + //通过vin修改车辆上下线的状态 + @Override + public Integer onOrOutLineByVIN(String vin, int status) { + + return vehicleMapper.onOrOutLineByVIN(vin, status); + } + @Override public List findByVIN(String vin) { diff --git a/couplet-modules/couplet-business/src/main/resources/mapper/business/VehicleMapper.xml b/couplet-modules/couplet-business/src/main/resources/mapper/business/VehicleMapper.xml index 4eb95a7..0e5bdb8 100644 --- a/couplet-modules/couplet-business/src/main/resources/mapper/business/VehicleMapper.xml +++ b/couplet-modules/couplet-business/src/main/resources/mapper/business/VehicleMapper.xml @@ -45,6 +45,15 @@ (#{userId}, #{vehicleId}, 0) + + + + UPDATE `couplet-cloud`.`couplet_vehicle` + SET `vehicle_state` = #{status} + WHERE `vin` = #{vin}; + + + update couplet_middle set del_flag = '2' diff --git a/couplet-modules/couplet-modules-mq/pom.xml b/couplet-modules/couplet-modules-mq/pom.xml index 9f61159..ea591c0 100644 --- a/couplet-modules/couplet-modules-mq/pom.xml +++ b/couplet-modules/couplet-modules-mq/pom.xml @@ -106,6 +106,15 @@ com.couplet couplet-common-business + + + + + + org.apache.kafka + kafka-clients + 2.8.0 + diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/KafkaProducerConfig.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/KafkaProducerConfig.java deleted file mode 100644 index c39db13..0000000 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/KafkaProducerConfig.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.couplet.mq.config; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka生产者配置类 - */ - -@Configuration -@EnableKafka -public class KafkaProducerConfig { - @Value("${kafka.bootstrap-servers}") - private String bootstrapServers; - - @Value("${kafka.producer.retries}") - private Integer retries; - - @Value("${kafka.producer.batch-size}") - private Integer batchSize; - - @Value("${kafka.producer.buffer-memory}") - private Integer bufferMemory; - - @Value("${kafka.producer.linger}") - private Integer linger; - - private Map producerConfigs() { - HashMap props = new HashMap<>(16); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.RETRIES_CONFIG, retries); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); - props.put(ProducerConfig.LINGER_MS_CONFIG, linger); - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "SCRAM-SHA-512"); - return props; - } - - private ProducerFactory producerFactory() { - DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); - - return producerFactory; - } - - @Bean - public KafkaTemplate KafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } -} diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaController.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaController.java deleted file mode 100644 index 1014535..0000000 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaController.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.couplet.mq.controller; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka - */ - -@Slf4j -@RestController -@RequestMapping("/kafka") -public class KafkaController { - -} diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/Aaa.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java similarity index 98% rename from couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/Aaa.java rename to couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index fd523f6..1a877e1 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/Aaa.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -14,7 +14,7 @@ import java.util.Properties; * @date 2024/4/5 21:38 * @description */ -public class Aaa { +public class KafkaTest { private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/KafkaConsumer.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/KafkaConsumer.java deleted file mode 100644 index d917f7d..0000000 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/KafkaConsumer.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.couplet.mq.service; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka监听者1 - */ - -@Component -public class KafkaConsumer { - -} diff --git a/couplet-modules/couplet-modules-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/couplet-modules/couplet-modules-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 5d1d8ad..84477af 100644 --- a/couplet-modules/couplet-modules-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/couplet-modules/couplet-modules-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1 @@ com.couplet.mq.config.RabbitMQConfig -com.couplet.mq.config.KafkaProducerConfig diff --git a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml index c3c29ed..00ac887 100644 --- a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml @@ -26,27 +26,6 @@ spring: main: allow-bean-definition-overriding: true - #kafka配置信息 - kafka: - bootstrap-servers: 39.103.133.136:9092 - producer: - batch-size: 16384 #一次最多发送数据量 16K - retries: 3 #发送失败后的重复发送次数 - buffer-memory: 33554432 #32M批处理缓冲区 - linger: 5 #延迟发送时间ms,如果未达到batch-size,但是时间达到linger将发送消息 - consumer: - auto-offset-reset: latest #新建消费组时从什么位置开始消费 latest:最近位置 earliest:最早位置 - max-poll-records: 80 #批量消费一次最大拉取的数据量 - enable-auto-commit: false #是否开启自动提交 - auto-commit-interval: 1000 #自动提交的间隔时间,自动提交开启时生效 - session-timeout: 20000 #连接超时时间 - max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。 - max-partition-fetch-bytes: 1048576 #设置拉取数据的大小,1M - group-id: test-group #消费组 - listener: - batch-listener: true #是否开启批量消费,true表示批量消费 - concurrencys: 5 #设置消费的线程数 - poll-timeout: 1500 #只限自动提交 logging: diff --git a/couplet-modules/couplet-modules-onLine/pom.xml b/couplet-modules/couplet-modules-onLine/pom.xml index 5497b91..44947a4 100644 --- a/couplet-modules/couplet-modules-onLine/pom.xml +++ b/couplet-modules/couplet-modules-onLine/pom.xml @@ -97,6 +97,18 @@ couplet-common-business + + + + + + + + org.apache.kafka + kafka-clients + 2.8.0 + + diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index e81d545..56f860b 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -1,17 +1,22 @@ package com.couplet.online.utils; import com.couplet.common.domain.Vehicle; -import com.couplet.common.redis.service.RedisService; import com.couplet.remote.RemoteVehicleService; +import com.fasterxml.jackson.databind.ser.std.StringSerializer; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; /** @@ -66,7 +71,12 @@ public class MqttMonitor { //redis @Autowired - private RedisService redis; + private StringRedisTemplate redis; + + + //Kafka生产者配置 + private static final String TOPIC_NAME = "online"; + private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //随项目启动而执行这个方法 @@ -146,6 +156,7 @@ public class MqttMonitor { //调取接口,通过vin查询车辆 List vehicles = remoteVehicleService.findByVIN(start17).getData(); + System.out.println("**************" + vehicles); //如果不存在这个车 @@ -159,7 +170,7 @@ public class MqttMonitor { log.info("远程调用查询到的车辆数据:" + vehicle); //上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询 - redis.setCacheObject(start17, start17, 6L, TimeUnit.SECONDS); + redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS); log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!"); @@ -169,7 +180,14 @@ public class MqttMonitor { //上线成功 if (0 != i) { log.info("上线成功!"); + try { + produceMessage(message); + } catch (Exception e) { + e.printStackTrace(); + } } + + } } @@ -189,4 +207,27 @@ public class MqttMonitor { } } + + //Kafka生产者 + private static void produceMessage(String message) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaProducer producer = new KafkaProducer<>(props); + //创建生产者 + try { + + //发送消息 + producer.send(new ProducerRecord<>(TOPIC_NAME, message)); + + System.out.println("发送消息:" + message); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + producer.close(); + } + } } diff --git a/couplet-modules/couplet-system/src/main/resources/bootstrap.yml b/couplet-modules/couplet-system/src/main/resources/bootstrap.yml index d9315bd..91453d0 100644 --- a/couplet-modules/couplet-system/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-system/src/main/resources/bootstrap.yml @@ -15,9 +15,11 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 + namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置