Merge branch 'server_five_liuyunhu' of https://gitea.qinmian.online/five-groups/five-groups-couplet into server_five

server_five_liuyunhu
lijiayao 2024-04-06 16:30:45 +08:00
commit 7aa223c8e4
7 changed files with 109 additions and 89 deletions

View File

@ -0,0 +1,2 @@
com.couplet.remote.factory.RemoteVehicleFallbackFactory
com.couplet.remote.factory.RemoteVehicleFallbackFactory

View File

@ -6,6 +6,7 @@ import com.couplet.common.domain.request.VehicleListParams;
import com.couplet.common.redis.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -22,7 +23,7 @@ import java.util.List;
public class Timer {
//redis
@Autowired
private RedisService redis;
private StringRedisTemplate redis;
//查询车辆列表
@Autowired
private VehicleService vehicleService;
@ -35,9 +36,7 @@ public class Timer {
//先查询车辆列表
List<Vehicle> list = vehicleService.list(new VehicleListParams(null, null, null, null));
list.forEach(vehicle -> {
for (Vehicle vehicle : list) {
//只针对已经上线的车辆
if (redis.hasKey(vehicle.getVin())) {
@ -48,15 +47,15 @@ public class Timer {
//执行修改下线状态的方法
Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0);
if (0 == 1) {
log.error("下线状态修改失败");
}
// if (0 == 1) {
// log.error("下线状态修改失败");
// }
log.info("下线状态修改成功");
}
}
});
}
}
}

View File

@ -16,9 +16,11 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 172469
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式
file-extension: yml
# 共享配置
@ -28,4 +30,4 @@ spring:
allow-bean-definition-overriding: true
logging:
level:
com.couplet.trouble.mapper: DEBUG
com.couplet.business.server.mapper: DEBUG

View File

@ -27,7 +27,7 @@ public class KafkaTest {
public static void main(String[] args) {
//生产者示例
produceMessage();
// produceMessage();
//消费者示例
// consumerMessages();

View File

@ -2,7 +2,6 @@ package com.couplet.online.utils;
import com.couplet.common.domain.Vehicle;
import com.couplet.remote.RemoteVehicleService;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -17,6 +16,8 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -78,6 +79,11 @@ public class MqttMonitor {
private static final String TOPIC_NAME = "online";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
//线程池,用于异步处理消息到来时的业务逻辑
private ExecutorService executorService = Executors.newSingleThreadExecutor();
//Kafka生产者实例化为类成员变量
private KafkaProducer<String, String> kafkaProducer;
//随项目启动而执行这个方法
@PostConstruct
@ -116,7 +122,7 @@ public class MqttMonitor {
client.reconnect();
log.info("重连中...");
} catch (InterruptedException | MqttException e) {
throw new RuntimeException(e);
log.error("重连失败:" + e.getMessage());
}
}
@ -124,14 +130,35 @@ public class MqttMonitor {
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
// log.info("消息已送达");
// log.info("接收消息主题:{}",topic);
// log.info("接收消息qos{}", mqttMessage.getQos());
executorService.execute(() -> processMessageArrival(topic, mqttMessage));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送成功!");
}
});
client.subscribe(topic, qos);
} catch (MqttException e) {
log.error("mqtt监听者启动失败{}", e.getMessage());
// throw new RuntimeException(e);
}
}
//异步处理mqtt消息到达后的逻辑
private void processMessageArrival(String topic, MqttMessage mqttMessage) {
// log.info("消息已送达");
// log.info("接收消息主题:{}",topic);
// log.info("接收消息qos{}", mqttMessage.getQos());
//接收到的原始报文
String message = new String(mqttMessage.getPayload());
// log.info("接收消息原始内容:{}", message);
//log.info("接收消息原始内容:{}", message);
//解析后的字符串
String parseMsg = ParseMessageUtil.parseMsg(message);
@ -183,7 +210,7 @@ public class MqttMonitor {
try {
produceMessage(message);
} catch (Exception e) {
e.printStackTrace();
log.error("发送信息异常:" + e.getMessage());
}
}
@ -192,42 +219,29 @@ public class MqttMonitor {
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送成功!");
}
});
client.subscribe(topic, qos);
} catch (MqttException e) {
log.error("mqtt监听者启动失败{}", e.getMessage());
throw new RuntimeException(e);
}
}
//Kafka生产者
private static void produceMessage(String message) {
//Kafka生产者实例化为类成员变量
@PostConstruct
public void initKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//创建生产者
try {
kafkaProducer = new KafkaProducer<>(props);
}
//发送消息至Kafka
private void produceMessage(String message) {
try {
//发送消息
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, message));
System.out.println("发送消息:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
log.error("消息发送失败:" + e.getMessage());
}
}
}

View File

@ -15,9 +15,11 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 172469
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式
file-extension: yml
# 共享配置