server_five_liuyunhu
parent
a5bf87099d
commit
64caa2296c
|
@ -51,4 +51,7 @@ public interface RemoteVehicleService {
|
|||
@GetMapping("/findByVIN/{vin}")
|
||||
public Result<List<Vehicle>> findByVIN(@PathVariable("vin") String vin);
|
||||
|
||||
@GetMapping("onOrOutLineByVIN")
|
||||
public Integer onOrOutLineByVIN(@RequestParam("params") String params);
|
||||
|
||||
}
|
||||
|
|
|
@ -50,6 +50,12 @@ public class RemoteVehicleFallbackFactory implements FallbackFactory<RemoteVehic
|
|||
public Result<List<Vehicle>> findByVIN(String vin) {
|
||||
return Result.error("车辆服务调用失败:" + cause.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer onOrOutLineByVIN(String params) {
|
||||
log.error("车辆服务调用失败:" + cause.getMessage());
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,5 +32,4 @@ public class ServiceNameConstants {
|
|||
public static final String VEHICLE_SERVICE = "couplet-vehicle";
|
||||
|
||||
public static final String BUSINESS_SERVICE = "couplet-business";
|
||||
public static final String VEHICLE_SERVICE = "vehicle-service";
|
||||
}
|
||||
|
|
|
@ -174,8 +174,21 @@ public class VehicleController extends BaseController {
|
|||
|
||||
|
||||
|
||||
/*
|
||||
* @Author: LiuYunHu
|
||||
* @Date: 2024/4/4 11:28
|
||||
* @Description: 通过vin修改车辆上下线的状态
|
||||
* @Param:
|
||||
* @Return:
|
||||
**/
|
||||
@GetMapping("onOrOutLineByVIN")
|
||||
public Integer onOrOutLineByVIN(@RequestParam("params") String params) {
|
||||
String[] split = params.split(",");
|
||||
|
||||
|
||||
return vehicleService.onOrOutLineByVIN(split[0], Integer.parseInt(split[1]));
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
|||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.domain.VehicleMiddle;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -24,4 +25,6 @@ public interface VehicleMapper extends BaseMapper<Vehicle> {
|
|||
Integer addVehicle(VehicleMiddle vehicleMiddle);
|
||||
|
||||
List<Vehicle> vehicleAll();
|
||||
|
||||
Integer onOrOutLineByVIN(@Param("vin") String vin, @Param("status") int status);
|
||||
}
|
||||
|
|
|
@ -37,4 +37,6 @@ public interface VehicleService extends IService<Vehicle> {
|
|||
|
||||
List<Vehicle> vehicleAll();
|
||||
|
||||
Integer onOrOutLineByVIN(String s, int i);
|
||||
|
||||
}
|
||||
|
|
|
@ -282,6 +282,14 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
|||
return vehicleMapper.vehicleAll();
|
||||
}
|
||||
|
||||
|
||||
//通过vin修改车辆上下线的状态
|
||||
@Override
|
||||
public Integer onOrOutLineByVIN(String vin, int status) {
|
||||
|
||||
return vehicleMapper.onOrOutLineByVIN(vin, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Vehicle> findByVIN(String vin) {
|
||||
|
||||
|
|
|
@ -45,6 +45,15 @@
|
|||
(#{userId}, #{vehicleId}, 0)
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
|
||||
<update id="onOrOutLineByVIN">
|
||||
UPDATE `couplet-cloud`.`couplet_vehicle`
|
||||
SET `vehicle_state` = #{status}
|
||||
WHERE `vin` = #{vin};
|
||||
</update>
|
||||
|
||||
|
||||
<delete id="deleteVehicle">
|
||||
update couplet_middle
|
||||
set del_flag = '2'
|
||||
|
|
|
@ -106,6 +106,15 @@
|
|||
<groupId>com.couplet</groupId>
|
||||
<artifactId>couplet-common-business</artifactId>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.springframework.kafka</groupId>-->
|
||||
<!-- <artifactId>spring-kafka</artifactId>-->
|
||||
<!-- </dependency>-->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
package com.couplet.mq.config;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka生产者配置类
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@EnableKafka
|
||||
public class KafkaProducerConfig {
|
||||
@Value("${kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value("${kafka.producer.retries}")
|
||||
private Integer retries;
|
||||
|
||||
@Value("${kafka.producer.batch-size}")
|
||||
private Integer batchSize;
|
||||
|
||||
@Value("${kafka.producer.buffer-memory}")
|
||||
private Integer bufferMemory;
|
||||
|
||||
@Value("${kafka.producer.linger}")
|
||||
private Integer linger;
|
||||
|
||||
private Map<String, Object> producerConfigs() {
|
||||
HashMap<String, Object> props = new HashMap<>(16);
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, retries);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put("security.protocol", "SASL_PLAINTEXT");
|
||||
props.put("sasl.mechanism", "SCRAM-SHA-512");
|
||||
return props;
|
||||
}
|
||||
|
||||
private ProducerFactory<String, String> producerFactory() {
|
||||
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||
|
||||
return producerFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> KafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
package com.couplet.mq.controller;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/kafka")
|
||||
public class KafkaController {
|
||||
|
||||
}
|
|
@ -14,7 +14,7 @@ import java.util.Properties;
|
|||
* @date 2024/4/5 21:38
|
||||
* @description
|
||||
*/
|
||||
public class Aaa {
|
||||
public class KafkaTest {
|
||||
private static final String TOPIC_NAME = "online";
|
||||
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
package com.couplet.mq.service;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/4/4
|
||||
* @Description: kafka监听者1
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class KafkaConsumer {
|
||||
|
||||
}
|
|
@ -1,2 +1 @@
|
|||
com.couplet.mq.config.RabbitMQConfig
|
||||
com.couplet.mq.config.KafkaProducerConfig
|
||||
|
|
|
@ -26,27 +26,6 @@ spring:
|
|||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
#kafka配置信息
|
||||
kafka:
|
||||
bootstrap-servers: 39.103.133.136:9092
|
||||
producer:
|
||||
batch-size: 16384 #一次最多发送数据量 16K
|
||||
retries: 3 #发送失败后的重复发送次数
|
||||
buffer-memory: 33554432 #32M批处理缓冲区
|
||||
linger: 5 #延迟发送时间ms,如果未达到batch-size,但是时间达到linger将发送消息
|
||||
consumer:
|
||||
auto-offset-reset: latest #新建消费组时从什么位置开始消费 latest:最近位置 earliest:最早位置
|
||||
max-poll-records: 80 #批量消费一次最大拉取的数据量
|
||||
enable-auto-commit: false #是否开启自动提交
|
||||
auto-commit-interval: 1000 #自动提交的间隔时间,自动提交开启时生效
|
||||
session-timeout: 20000 #连接超时时间
|
||||
max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
|
||||
max-partition-fetch-bytes: 1048576 #设置拉取数据的大小,1M
|
||||
group-id: test-group #消费组
|
||||
listener:
|
||||
batch-listener: true #是否开启批量消费,true表示批量消费
|
||||
concurrencys: 5 #设置消费的线程数
|
||||
poll-timeout: 1500 #只限自动提交
|
||||
|
||||
|
||||
logging:
|
||||
|
|
|
@ -97,6 +97,18 @@
|
|||
<artifactId>couplet-common-business</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Kafka依赖-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.springframework.kafka</groupId>-->
|
||||
<!-- <artifactId>spring-kafka</artifactId>-->
|
||||
<!-- </dependency>-->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.0</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,17 +1,22 @@
|
|||
package com.couplet.online.utils;
|
||||
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.common.redis.service.RedisService;
|
||||
import com.couplet.remote.RemoteVehicleService;
|
||||
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -66,7 +71,12 @@ public class MqttMonitor {
|
|||
|
||||
//redis
|
||||
@Autowired
|
||||
private RedisService redis;
|
||||
private StringRedisTemplate redis;
|
||||
|
||||
|
||||
//Kafka生产者配置
|
||||
private static final String TOPIC_NAME = "online";
|
||||
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
|
||||
|
||||
|
||||
//随项目启动而执行这个方法
|
||||
|
@ -146,6 +156,7 @@ public class MqttMonitor {
|
|||
|
||||
//调取接口,通过vin查询车辆
|
||||
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
|
||||
System.out.println("**************" + vehicles);
|
||||
|
||||
|
||||
//如果不存在这个车
|
||||
|
@ -159,7 +170,7 @@ public class MqttMonitor {
|
|||
log.info("远程调用查询到的车辆数据:" + vehicle);
|
||||
|
||||
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
|
||||
redis.setCacheObject(start17, start17, 6L, TimeUnit.SECONDS);
|
||||
redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
|
||||
|
@ -169,7 +180,14 @@ public class MqttMonitor {
|
|||
//上线成功
|
||||
if (0 != i) {
|
||||
log.info("上线成功!");
|
||||
try {
|
||||
produceMessage(message);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -189,4 +207,27 @@ public class MqttMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
//Kafka生产者
|
||||
private static void produceMessage(String message) {
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
//创建生产者
|
||||
try {
|
||||
|
||||
//发送消息
|
||||
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
|
||||
|
||||
System.out.println("发送消息:" + message);
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,9 +15,11 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
Loading…
Reference in New Issue