server_five_liuyunhu
parent
b655ca9c30
commit
78f59d7392
|
@ -6,6 +6,7 @@ import com.couplet.common.domain.request.VehicleListParams;
|
|||
import com.couplet.common.redis.service.RedisService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -22,7 +23,7 @@ import java.util.List;
|
|||
public class Timer {
|
||||
//redis
|
||||
@Autowired
|
||||
private RedisService redis;
|
||||
private StringRedisTemplate redis;
|
||||
//查询车辆列表
|
||||
@Autowired
|
||||
private VehicleService vehicleService;
|
||||
|
@ -35,9 +36,7 @@ public class Timer {
|
|||
|
||||
//先查询车辆列表
|
||||
List<Vehicle> list = vehicleService.list(new VehicleListParams(null, null, null, null));
|
||||
|
||||
list.forEach(vehicle -> {
|
||||
|
||||
for (Vehicle vehicle : list) {
|
||||
//只针对已经上线的车辆
|
||||
if (redis.hasKey(vehicle.getVin())) {
|
||||
|
||||
|
@ -48,15 +47,15 @@ public class Timer {
|
|||
//执行修改下线状态的方法
|
||||
Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0);
|
||||
|
||||
if (0 == 1) {
|
||||
log.error("下线状态修改失败");
|
||||
}
|
||||
// if (0 == 1) {
|
||||
// log.error("下线状态修改失败");
|
||||
// }
|
||||
|
||||
log.info("下线状态修改成功");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ public class KafkaTest {
|
|||
|
||||
public static void main(String[] args) {
|
||||
//生产者示例
|
||||
produceMessage();
|
||||
// produceMessage();
|
||||
|
||||
//消费者示例
|
||||
consumerMessages();
|
||||
|
|
|
@ -2,7 +2,6 @@ package com.couplet.online.utils;
|
|||
|
||||
import com.couplet.common.domain.Vehicle;
|
||||
import com.couplet.remote.RemoteVehicleService;
|
||||
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
|
@ -17,6 +16,8 @@ import org.springframework.stereotype.Component;
|
|||
import javax.annotation.PostConstruct;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -78,6 +79,11 @@ public class MqttMonitor {
|
|||
private static final String TOPIC_NAME = "online";
|
||||
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
|
||||
|
||||
//线程池,用于异步处理消息到来时的业务逻辑
|
||||
private ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
|
||||
//Kafka生产者实例化为类成员变量
|
||||
private KafkaProducer<String, String> kafkaProducer;
|
||||
|
||||
//随项目启动而执行这个方法
|
||||
@PostConstruct
|
||||
|
@ -116,7 +122,7 @@ public class MqttMonitor {
|
|||
client.reconnect();
|
||||
log.info("重连中...");
|
||||
} catch (InterruptedException | MqttException e) {
|
||||
throw new RuntimeException(e);
|
||||
log.error("重连失败:" + e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -124,72 +130,7 @@ public class MqttMonitor {
|
|||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) {
|
||||
// log.info("消息已送达");
|
||||
// log.info("接收消息主题:{}",topic);
|
||||
// log.info("接收消息qos:{}", mqttMessage.getQos());
|
||||
|
||||
//接收到的原始报文
|
||||
String message = new String(mqttMessage.getPayload());
|
||||
|
||||
// log.info("接收消息原始内容:{}", message);
|
||||
|
||||
//解析后的字符串
|
||||
String parseMsg = ParseMessageUtil.parseMsg(message);
|
||||
|
||||
//拿到前17位(车辆vin码)
|
||||
String start17 = parseMsg.substring(0, 17);
|
||||
|
||||
|
||||
log.info("当前车辆的vin码为:" + start17);
|
||||
|
||||
// //判断缓存中是否有这个vin
|
||||
// if (redis.hasKey("不存在的车辆VIN" + start17)) {
|
||||
//
|
||||
// //可使用RabbitMQ发送消息
|
||||
// log.error("vin码为" + start17 + "的车辆不属于本系统!");
|
||||
//
|
||||
// } else {//如果缓存中没有存这个vin
|
||||
//
|
||||
//
|
||||
// }
|
||||
|
||||
|
||||
//调取接口,通过vin查询车辆
|
||||
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
|
||||
System.out.println("**************" + vehicles);
|
||||
|
||||
|
||||
//如果不存在这个车
|
||||
if (0 == vehicles.size()) {
|
||||
//将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示
|
||||
// redis.setCacheObject("不存在的车辆VIN" + start17, start17);
|
||||
log.error("未找到vin码为" + start17 + "的车辆信息");
|
||||
} else {
|
||||
//如果存在这个车
|
||||
Vehicle vehicle = vehicles.get(0);
|
||||
log.info("远程调用查询到的车辆数据:" + vehicle);
|
||||
|
||||
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
|
||||
redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
|
||||
|
||||
//调用上线接口,修改上线状态
|
||||
Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1);
|
||||
//上线成功
|
||||
if (0 != i) {
|
||||
log.info("上线成功!");
|
||||
try {
|
||||
produceMessage(message);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
executorService.execute(() -> processMessageArrival(topic, mqttMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -203,31 +144,104 @@ public class MqttMonitor {
|
|||
|
||||
} catch (MqttException e) {
|
||||
log.error("mqtt监听者启动失败,{}", e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
// throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//Kafka生产者
|
||||
private static void produceMessage(String message) {
|
||||
//异步处理,mqtt消息到达后的逻辑
|
||||
private void processMessageArrival(String topic, MqttMessage mqttMessage) {
|
||||
// log.info("消息已送达");
|
||||
// log.info("接收消息主题:{}",topic);
|
||||
// log.info("接收消息qos:{}", mqttMessage.getQos());
|
||||
|
||||
//接收到的原始报文
|
||||
String message = new String(mqttMessage.getPayload());
|
||||
|
||||
//log.info("接收消息原始内容:{}", message);
|
||||
|
||||
//解析后的字符串
|
||||
String parseMsg = ParseMessageUtil.parseMsg(message);
|
||||
|
||||
//拿到前17位(车辆vin码)
|
||||
String start17 = parseMsg.substring(0, 17);
|
||||
|
||||
|
||||
log.info("当前车辆的vin码为:" + start17);
|
||||
|
||||
// //判断缓存中是否有这个vin
|
||||
// if (redis.hasKey("不存在的车辆VIN" + start17)) {
|
||||
//
|
||||
// //可使用RabbitMQ发送消息
|
||||
// log.error("vin码为" + start17 + "的车辆不属于本系统!");
|
||||
//
|
||||
// } else {//如果缓存中没有存这个vin
|
||||
//
|
||||
//
|
||||
// }
|
||||
|
||||
|
||||
//调取接口,通过vin查询车辆
|
||||
List<Vehicle> vehicles = remoteVehicleService.findByVIN(start17).getData();
|
||||
System.out.println("**************" + vehicles);
|
||||
|
||||
|
||||
//如果不存在这个车
|
||||
if (0 == vehicles.size()) {
|
||||
//将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示
|
||||
// redis.setCacheObject("不存在的车辆VIN" + start17, start17);
|
||||
log.error("未找到vin码为" + start17 + "的车辆信息");
|
||||
} else {
|
||||
//如果存在这个车
|
||||
Vehicle vehicle = vehicles.get(0);
|
||||
log.info("远程调用查询到的车辆数据:" + vehicle);
|
||||
|
||||
//上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询
|
||||
redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!");
|
||||
|
||||
//调用上线接口,修改上线状态
|
||||
Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1);
|
||||
//上线成功
|
||||
if (0 != i) {
|
||||
log.info("上线成功!");
|
||||
try {
|
||||
produceMessage(message);
|
||||
} catch (Exception e) {
|
||||
log.error("发送信息异常:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Kafka生产者实例化为类成员变量
|
||||
@PostConstruct
|
||||
public void initKafkaProducer() {
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
//创建生产者
|
||||
try {
|
||||
kafkaProducer = new KafkaProducer<>(props);
|
||||
}
|
||||
|
||||
|
||||
//发送消息至Kafka
|
||||
private void produceMessage(String message) {
|
||||
|
||||
try {
|
||||
//发送消息
|
||||
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
|
||||
kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, message));
|
||||
|
||||
System.out.println("发送消息:" + message);
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
producer.close();
|
||||
log.error("消息发送失败:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,9 +15,11 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: 172469
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
|
|
Loading…
Reference in New Issue