feat 故障报警

master
rouchen 2024-06-20 22:42:11 +08:00
parent 8493f8b817
commit d0da430ffc
12 changed files with 3347 additions and 8948 deletions

File diff suppressed because it is too large Load Diff

View File

@ -223,6 +223,12 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!--caffeine-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
</dependencies>
<dependencyManagement>

View File

@ -6,7 +6,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* CarEvent
* VehicleEvent
*
* @author Yangle
* Date 2024/6/17 19:55
@ -15,10 +15,12 @@ import lombok.experimental.SuperBuilder;
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class CarEvent {
public class VehicleEvent {
private Integer id;
private String vin;
private String event;
private Boolean states=false;
}

View File

@ -1,13 +1,12 @@
package com.muyu.event.controller;
import com.muyu.demos.model.Result;
import com.muyu.event.common.CarEvent;
import com.muyu.event.common.VehicleEvent;
import com.muyu.event.service.EventService;
import com.muyu.mqtt.dao.MessageData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@ -25,14 +24,30 @@ public class EventController {
@Autowired
private EventService eventService;
@PostMapping("/selectVehicleEvent")
public Result<List<VehicleEvent>> selectVehicleEvent() {
return Result.success(eventService.selectVehicleEvent());
}
@PostMapping("/bindingEvent")
public Result<List<CarEvent>> bindingEvent(@RequestBody CarEvent carEvent) {
return Result.success(eventService.getCarEventList(carEvent));
public Result<List<VehicleEvent>> bindingEvent(@RequestBody VehicleEvent vehicleEvent) {
return Result.success(eventService.getCarEventList(vehicleEvent));
}
@PostMapping("/selectEvent")
public Result<MessageData> selectEvent(@RequestBody CarEvent carEvent) {
Result<MessageData> messageDataResult = eventService.selectEvent(carEvent);
public Result selectEvent(@RequestBody VehicleEvent vehicleEvent) {
Result<MessageData> messageDataResult = eventService.selectEvent(vehicleEvent);
return messageDataResult;
}
@PostMapping("/eventAddRedis")
public Result eventAddRedis(@RequestBody VehicleEvent vehicleEvent) {
eventService.eventAddRedis(vehicleEvent);
return Result.success();
}
@PostMapping("/getMaximumCoordinate")
public Result getMaximumCoordinate(@RequestBody VehicleEvent vehicleEvent) {
return eventService.getMaximumCoordinate(vehicleEvent.getVin());
}
}

View File

@ -1,6 +1,6 @@
package com.muyu.event.mapper;
import com.muyu.event.common.CarEvent;
import com.muyu.event.common.VehicleEvent;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@ -13,7 +13,10 @@ import java.util.List;
*/
@Mapper
public interface EventMapper {
List<CarEvent> getCarEventList(CarEvent carEvent);
List<VehicleEvent> getCarEventList(VehicleEvent vehicleEvent);
List<VehicleEvent> selectVehicleEvent();
}

View File

@ -1,7 +1,7 @@
package com.muyu.event.service;
import com.muyu.demos.model.Result;
import com.muyu.event.common.CarEvent;
import com.muyu.event.common.VehicleEvent;
import com.muyu.mqtt.dao.MessageData;
import java.util.List;
@ -13,7 +13,17 @@ import java.util.List;
* Date 2024/6/17 19:57
*/
public interface EventService {
List<CarEvent> getCarEventList(CarEvent carEvent);
List<VehicleEvent> getCarEventList(VehicleEvent vehicleEvent);
Result<MessageData> selectEvent(VehicleEvent vehicleEvent);
void eventAddRedis(VehicleEvent vehicleEvent);
Result getMaximumCoordinate(String vin);
List<VehicleEvent> selectVehicleEvent();
Result<MessageData> selectEvent(CarEvent carEvent);
}

View File

@ -1,20 +1,36 @@
package com.muyu.event.service.impl;
import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSON;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.demos.model.Result;
import com.muyu.event.common.CarEvent;
import com.muyu.event.common.VehicleEvent;
import com.muyu.event.mapper.EventMapper;
import com.muyu.event.service.EventService;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* EventServiceImpl
@ -30,38 +46,132 @@ public class EventServiceImpl implements EventService {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public List<CarEvent> getCarEventList(CarEvent carEvent) {
return eventMapper.getCarEventList(carEvent);
public List<VehicleEvent> getCarEventList(VehicleEvent vehicleEvent) {
return eventMapper.getCarEventList(vehicleEvent);
}
@Override
public Result<MessageData> selectEvent(CarEvent carEvent) {
if (carEvent.getEvent() != null) {
public Result<MessageData> selectEvent(VehicleEvent vehicleEvent) {
if (vehicleEvent.getEvent() != null) {
if (carEvent.getEvent().contains("1")) {
System.out.println("1");
}
if (carEvent.getEvent().contains("2")) {
String maximumCoordinate = getMaximumCoordinate(carEvent.getVin());
MessageData messageData = JSON.parseObject(maximumCoordinate, MessageData.class);
return Result.success(messageData);
}
if (carEvent.getEvent().contains("3")) {
if (vehicleEvent.getEvent() != null) {
if (vehicleEvent.getEvent().contains("1")) {
KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer();
// 订阅主题
consumer.subscribe(Collections.singletonList("test1"));
// 创建新线程来处理消息
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
String message = record.value();
MessageData messageData = JSON.parseObject(message, MessageData.class);
// 将毫秒级时间戳转换为LocalDateTime
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault());
// 格式化输出日期时间
String formattedTime = dateTime.format(DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH时mm分ss秒"));
if (messageData.getVehicleStatus().equals("0")){
if (localCache.getIfPresent(messageData.getVin())==null){
if (redisTemplate.hasKey(messageData.getVin())){
redisTemplate.opsForValue().set(messageData.getVin(),"vin:"+messageData.getVin()+",故障名称:"+messageData.getVehicleStatus()+",时间:"+formattedTime,10, TimeUnit.MINUTES);
log.info("该车没有存到缓存中");
}
localCache.put(messageData.getVin(),"vin:"+messageData.getVin()+",故障名称:GZ001"+",时间:"+formattedTime);
log.info("该车没有存到缓存中");
}
}else {
redisTemplate.delete(messageData.getVin());
localCache.invalidate(messageData.getVin());
log.info("该车故障已修复");
// rabbitTemplate.send(messageData.getVin(),);
}
}
}
}).start();
}
if (vehicleEvent.getEvent().contains("2")) {
if(vehicleEvent.getStates()){
if (redisTemplate.hasKey(vehicleEvent.getVin())){
KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer();
// 订阅主题
consumer.subscribe(Collections.singletonList("test1"));
// 创建新线程来处理消息
new Thread(() -> {
while (true) {
// 发送拉取请求
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 设置拉取超时时间为100毫秒
// 处理消息
for (ConsumerRecord<String, String> record : records) {
redisTemplate.opsForValue().set(vehicleEvent.getVin(),record.value());
System.out.println("Received message: " + record.value());
// 在这里添加消息处理逻辑
}
}
}).start();
}
}
return Result.success();
}
if (vehicleEvent.getEvent().contains("3")) {
}
}
}
return Result.error("车辆没有绑定事件");
}
/**
*
*
*/
Cache<String, String> localCache = Caffeine.newBuilder()
.initialCapacity(5)
.maximumSize(10)
//过期时间3秒钟
.expireAfterWrite(3, TimeUnit.HOURS)
.build();
/**
* kafka
* @return
*/
public String getMaximumCoordinate (String vin) {
String index = redisTemplate.opsForList().index(vin, -1);
return index;
private static KafkaConsumer<String, String> getStringStringKafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka服务器地址
props.put("group.id", "Partitions"); // 消费者组ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值反序列化器
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
@Override
public void eventAddRedis(VehicleEvent vehicleEvent) {
redisTemplate.opsForValue().set(vehicleEvent.getVin(),"");
}
@Override
public Result getMaximumCoordinate(String vin) {
//获取本地缓存
String ifPresent = localCache.getIfPresent(vin);
return Result.error(ifPresent);
}
@Override
public List<VehicleEvent> selectVehicleEvent() {
return eventMapper.selectVehicleEvent();
}
// //创建分区
// NewTopic newTopic = new NewTopic(topic, 8, (short) 1);
// kafkaAdmin.createOrModifyTopics(newTopic);

View File

@ -1,8 +1,11 @@
package com.muyu.kafka;
import com.alibaba.fastjson.JSON;
import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
@ -15,16 +18,24 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.rmi.ServerException;
import java.util.*;
@Log4j2
@Service
public class KafkaPCUtils {
@Resource
private IotDbServer iotDbServer;
@Autowired
private RedisTemplate<String,String> redisTemplate;
public void sendCallbackOneMessage(String topic, MessageData vehicle) {
String vin = vehicle.getVin();
@ -86,13 +97,13 @@ public class KafkaPCUtils {
}
//监听消费
@KafkaListener(topics = {"test1"},groupId = "Topics")
public void onNormalMessage1(ConsumerRecord<String, Object> record) {
public void onNormalMessage1(ConsumerRecord<String, Object> record) throws ServerException, IoTDBConnectionException, StatementExecutionException {
String value = (String) record.value();
JSON.parseObject(value, MessageData.class);
MessageData messageData = JSON.parseObject(value, MessageData.class);
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
iotDbServer.add(messageData);
}
}

View File

@ -47,7 +47,6 @@ public class SimpleKafkaConsumer {
MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
log.info("messageData1:{}", messageData1);
iotDbServer.add(messageData1);
redisTemplate.opsForList().rightPush(messageData1.getVin(), JSON.toJSONString(messageData1));
} catch (Exception e) {
log.error("Error consuming Kafka message", e);
// 处理异常,可能需要重试或其他逻辑
@ -64,7 +63,6 @@ public class SimpleKafkaConsumer {
MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
log.info("messageData1:{}", messageData1);
iotDbServer.add(messageData1);
redisTemplate.opsForList().rightPush(messageData1.getVin(), JSON.toJSONString(messageData1));
} catch (Exception e) {
log.error("Error consuming Kafka message", e);
// 处理异常,可能需要重试或其他逻辑

View File

@ -22,6 +22,7 @@ public class MsgHandler {
log.info("接收到消息:{}",msg);
String[] split = msg.split(",");
for (String s : split) {
MqttProperties mqttProperties = MqttProperties.configBuild(
s,
@ -35,21 +36,5 @@ public class MsgHandler {
}
@RabbitListener(queues = "ip")
public void msg1(String msg){
log.info("接收到消息:{}",msg);
String[] split = msg.split(",");
for (String s : split) {
MqttProperties mqttProperties = MqttProperties.configBuild(
s,
"test2"
);
log.error("接收到消息初始化信息:{}",mqttProperties);
MqttClient mqttClient = mqttFactory.creatClient(mqttProperties);
log.error("client创建成功:{}",mqttClient.getClientId());
}
}
}

View File

@ -3,7 +3,11 @@
<mapper namespace="com.muyu.event.mapper.EventMapper">
<select id="getCarEventList" resultType="com.muyu.event.common.CarEvent">
select * from car_event where vin = #{vin}
<select id="getCarEventList" resultType="com.muyu.event.common.VehicleEvent">
select * from vehicle_event where vin = #{vin}
</select>
<select id="selectVehicleEvent" resultType="com.muyu.event.common.VehicleEvent">
select * from vehicle_event
</select>
</mapper>

View File

@ -1,13 +1,45 @@
package com.muyu;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class IotDbApplicationTests {
@Test
void contextLoads() {
void contextLoads() throws InterruptedException {
Cache<String, String> built = Caffeine.newBuilder()
.initialCapacity(5)
.maximumSize(10)
//过期时间3秒钟
.expireAfterWrite(3, TimeUnit.SECONDS)
.build();
// //写入缓存数据
// built.put("userName","张三");
//
// //读取缓存数据
// String string = built.get("userName", (key) -> {
// return "key已过期";
// });
//
// System.out.println("第一次查询结果:"+string);
// Thread.sleep(4000);
//
// //读取量
// String name = built.get("userName", (key) -> {
// return "key已过期";
// });
// System.out.println("第二次查询结果:"+name);
}
}