diff --git a/mobai-event-common/pom.xml b/mobai-event-common/pom.xml
index 2627813..5527391 100644
--- a/mobai-event-common/pom.xml
+++ b/mobai-event-common/pom.xml
@@ -38,7 +38,6 @@
org.springframework.boot
spring-boot-starter-amqp
-
@@ -46,12 +45,29 @@
lombok
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.5.1
+
+
+
+ com.dtflys.forest
+ forest-spring-boot-starter
+ 1.5.36
+
+
org.springframework.data
spring-data-redis
2.7.15
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
org.apache.poi
poi-ooxml
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/EventReq.java b/mobai-event-common/src/main/java/com/mobai/domain/EventReq.java
new file mode 100644
index 0000000..0ddf42b
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/EventReq.java
@@ -0,0 +1,35 @@
+package com.mobai.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author Saisai
+ * @className EventReq
+ * @description 描述
+ * @date 2024/6/18 14:08
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class EventReq {
+
+ /**
+ * 车辆唯一标识
+ */
+ private String vin;
+
+ /**
+ * 事件类型
+ */
+ private Integer eventType;
+
+ /**
+ * 事件状态
+ */
+ private Integer eventState;
+
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/HttpStatus.java b/mobai-event-common/src/main/java/com/mobai/domain/HttpStatus.java
new file mode 100644
index 0000000..a7f01a7
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/HttpStatus.java
@@ -0,0 +1,93 @@
+package com.mobai.domain;
+
+/**
+ * 返回状态码
+ *
+ * @author ruoyi
+ */
+public class HttpStatus {
+ /**
+ * 操作成功
+ */
+ public static final int SUCCESS = 200;
+
+ /**
+ * 对象创建成功
+ */
+ public static final int CREATED = 201;
+
+ /**
+ * 请求已经被接受
+ */
+ public static final int ACCEPTED = 202;
+
+ /**
+ * 操作已经执行成功,但是没有返回数据
+ */
+ public static final int NO_CONTENT = 204;
+
+ /**
+ * 资源已被移除
+ */
+ public static final int MOVED_PERM = 301;
+
+ /**
+ * 重定向
+ */
+ public static final int SEE_OTHER = 303;
+
+ /**
+ * 资源没有被修改
+ */
+ public static final int NOT_MODIFIED = 304;
+
+ /**
+ * 参数列表错误(缺少,格式不匹配)
+ */
+ public static final int BAD_REQUEST = 400;
+
+ /**
+ * 未授权
+ */
+ public static final int UNAUTHORIZED = 401;
+
+ /**
+ * 访问受限,授权过期
+ */
+ public static final int FORBIDDEN = 403;
+
+ /**
+ * 资源,服务未找到
+ */
+ public static final int NOT_FOUND = 404;
+
+ /**
+ * 不允许的http方法
+ */
+ public static final int BAD_METHOD = 405;
+
+ /**
+ * 资源冲突,或者资源被锁
+ */
+ public static final int CONFLICT = 409;
+
+ /**
+ * 不支持的数据,媒体类型
+ */
+ public static final int UNSUPPORTED_TYPE = 415;
+
+ /**
+ * 系统内部错误
+ */
+ public static final int ERROR = 500;
+
+ /**
+ * 接口未实现
+ */
+ public static final int NOT_IMPLEMENTED = 501;
+
+ /**
+ * 系统警告消息
+ */
+ public static final int WARN = 601;
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/MqttServerModel.java b/mobai-event-common/src/main/java/com/mobai/domain/MqttServerModel.java
new file mode 100644
index 0000000..7c55531
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/MqttServerModel.java
@@ -0,0 +1,28 @@
+package com.mobai.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author DongZl
+ * @description: Mqtt服务器模型
+ * @Date 2024-3-26 上午 09:53
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttServerModel {
+
+ /**
+ * MQTT服务节点
+ */
+ private String broker;
+
+ /**
+ * MQTT订阅主题
+ */
+ private String topic;
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/Result.java b/mobai-event-common/src/main/java/com/mobai/domain/Result.java
new file mode 100644
index 0000000..551d077
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/Result.java
@@ -0,0 +1,101 @@
+package com.mobai.domain;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * 响应信息主体
+ *
+ * @author ruoyi
+ */
+@Data
+public class Result implements Serializable {
+ /**
+ * 成功
+ */
+ public static final int SUCCESS = HttpStatus.SUCCESS;
+ /**
+ * 失败
+ */
+ public static final int FAIL = HttpStatus.ERROR;
+ private static final long serialVersionUID = 1L;
+ /**
+ * 系统警告消息
+ */
+ private static final int WARN = HttpStatus.WARN;
+
+ private int code;
+
+ private String msg;
+
+ private T data;
+
+ public static Result success () {
+ return restResult(null, SUCCESS, "操作成功");
+ }
+
+ public static Result success (T data) {
+ return restResult(data, SUCCESS, "操作成功");
+ }
+
+ public static Result success (T data, String msg) {
+ return restResult(data, SUCCESS, msg);
+ }
+
+ public static Result error () {
+ return restResult(null, FAIL, "操作失败");
+ }
+
+ public static Result error (String msg) {
+ return restResult(null, FAIL, msg);
+ }
+
+ public static Result error (T data) {
+ return restResult(data, FAIL, "操作失败");
+ }
+
+ public static Result error (T data, String msg) {
+ return restResult(data, FAIL, msg);
+ }
+
+ public static Result error (int code, String msg) {
+ return restResult(null, code, msg);
+ }
+
+ public static Result warn () {
+ return restResult(null, WARN, "操作失败");
+ }
+
+ public static Result warn (String msg) {
+ return restResult(null, WARN, msg);
+ }
+
+ public static Result warn (T data) {
+ return restResult(data, WARN, "操作失败");
+ }
+
+ public static Result warn (T data, String msg) {
+ return restResult(data, WARN, msg);
+ }
+
+ public static Result warn (int code, String msg) {
+ return restResult(null, code, msg);
+ }
+
+ private static Result restResult (T data, int code, String msg) {
+ Result apiResult = new Result<>();
+ apiResult.setCode(code);
+ apiResult.setData(data);
+ apiResult.setMsg(msg);
+ return apiResult;
+ }
+
+ public static Boolean isError (Result ret) {
+ return !isSuccess(ret);
+ }
+
+ public static Boolean isSuccess (Result ret) {
+ return Result.SUCCESS == ret.getCode();
+ }
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/VehicleEvent.java b/mobai-event-common/src/main/java/com/mobai/domain/VehicleEvent.java
new file mode 100644
index 0000000..bde4f98
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/VehicleEvent.java
@@ -0,0 +1,26 @@
+package com.mobai.domain;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author Mobai
+ * @className VehivleEvent
+ * @description 描述
+ * @date 2024/6/18 11:23
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class VehicleEvent {
+ @TableId(type= IdType.AUTO)
+ private Long id;
+ private String Vin;
+ private String events;
+
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java b/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java
index 05f3fb6..e7744c4 100644
--- a/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java
+++ b/mobai-event-common/src/main/java/com/mobai/utils/RedisService.java
@@ -3,10 +3,7 @@ package com.mobai.utils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
-import org.springframework.data.redis.core.BoundSetOperations;
-import org.springframework.data.redis.core.HashOperations;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.ValueOperations;
+import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -24,6 +21,9 @@ public class RedisService {
@Autowired
public RedisTemplate redisTemplate;
+ @Autowired
+ private StringRedisTemplate stringRedisTemplate;
+
/**
* 缓存基本的对象,Integer、String、实体类等
*
@@ -258,4 +258,8 @@ public class RedisService {
Long count = redisTemplate.opsForList().rightPushAll(key, dataList);
return count == null ? 0 : count;
}
+
+ public void setList(String vin, String jsonString) {
+ stringRedisTemplate.opsForList().leftPush(vin,jsonString);
+ }
}
diff --git a/mobai-event-service/pom.xml b/mobai-event-service/pom.xml
index 2a51adc..e399a87 100644
--- a/mobai-event-service/pom.xml
+++ b/mobai-event-service/pom.xml
@@ -70,6 +70,31 @@
lombok
true
+
+
+ org.apache.commons
+ commons-lang3
+ 3.13.0
+
+
+
+
+ com.baomidou
+ dynamic-datasource-spring-boot-starter
+ 2.5.8
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.33
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/mobai-event-service/src/main/java/com/mobai/forest/ForestGet.java b/mobai-event-service/src/main/java/com/mobai/forest/ForestGet.java
new file mode 100644
index 0000000..d27a9f1
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/forest/ForestGet.java
@@ -0,0 +1,21 @@
+package com.mobai.forest;
+
+import com.dtflys.forest.annotation.Request;
+import com.mobai.domain.MqttServerModel;
+import com.mobai.domain.Result;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author Saisai
+ * @className ForestGet
+ * @description 描述
+ * @date 2024/6/18 15:40
+ */
+
+@Component
+public interface ForestGet {
+ @Request("http://127.0.0.1:8081/fluxmq/getIps/")
+ Result> getIps();
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
index 0face5a..25e8855 100644
--- a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
@@ -136,7 +136,7 @@ public class IotDbServerImpl implements IotDbServer {
add(String.valueOf(vehicle.getChgStatus()));
}};
- iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicle.getStartTime()), measurementsList, valuesList);
+ iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList);
}
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java
new file mode 100644
index 0000000..4442701
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java
@@ -0,0 +1,99 @@
+package com.mobai.kafka.listener;
+
+import com.mobai.domain.MqttServerModel;
+import com.mobai.domain.Vehicle;
+import com.mobai.forest.ForestGet;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * @author Saisai
+ * @className VinConsumer
+ * @description 描述
+ * @date 2024/6/18 17:18
+ */
+
+@Component
+public class VinConsumer {
+
+ public static void main(String[] args) {
+
+ ArrayList topicPartitions = new ArrayList<>();
+ List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
+ for (String topic : topics) {
+ for (int i = 0; i < 8; i++) {
+ TopicPartition topicPartition = new TopicPartition(topic, i);
+ topicPartitions.add(topicPartition);
+ }
+ }
+
+ new ConsumerRebalanceListener(){
+ @Override
+ public void onPartitionsRevoked(Collection partitions) {
+ // 可以在这里处理分区被撤销前的逻辑
+ System.out.println("Partitions revoked: " + partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection partitions) {
+ // 可以在这里处理分区被分配后的逻辑
+ System.out.println("Partitions assigned: " + partitions);
+ }
+ };
+// 1.参数配置:不是每一非得配置
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("auto.commit.interval.ms", "1000");
+ //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "true");
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.assign(topicPartitions);
+ try {
+// while (true) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
+ if (!records.isEmpty()) { // 检查是否有消息到来
+ // 创建线程异步执行
+ new Thread(() -> {
+ for (TopicPartition partition : records.partitions()) {
+ List> partitionRecords = records.records(partition);
+ for (ConsumerRecord record : partitionRecords) {
+ // 处理每条消息
+ log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
+ Vehicle vehicle = getVehicle(record.value());
+ if (redisService.hasKey(vehicle.getVin())){
+ redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
+ }
+ }
+ }
+ }).start();
+ } else {
+ // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
+ Thread.sleep(10);
+ }
+// }
+ } catch (InterruptedException e) {
+ // 处理解除阻塞时的中断异常,如Thread.sleep被中断
+ Thread.currentThread().interrupt(); // 重新设置中断标志
+ log.error("Consumer was interrupted.", e);
+ }
+ finally {
+ consumer.close();
+ }
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java
index 3ee33e3..83d150a 100644
--- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java
+++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java
@@ -1,22 +1,37 @@
package com.mobai.kafka.listener;
import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.mobai.domain.MqttServerModel;
import com.mobai.domain.Vehicle;
+import com.mobai.forest.ForestGet;
+import com.mobai.iotDB.service.IotDbServer;
+import com.mobai.utils.RedisService;
import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+
import org.springframework.stereotype.Component;
+import java.lang.annotation.Annotation;
+import java.math.BigDecimal;
+import java.rmi.ServerException;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
+import java.util.Properties;
/**
- *
- * @description: kafka 消费者
+ * @description: kafka 消费者
* @copyright: @Copyright (c) 2022
* @company: hmblogs
* @author: heming
@@ -27,25 +42,208 @@ import java.util.stream.Collectors;
@Slf4j
public class kafkaConsumerListenerExample {
- @KafkaListener(topics = {"topic0","topic1"}, groupId = "0")
+
+ @Autowired
+ private RedisService redisService;
+
+
+ @Autowired
+ private IotDbServer iotDbServer;
+
+// @Autowired
+// private KafkaConsumer consumer;
+
+ @Autowired
+ private ForestGet forestGet;
+
+ /**
+ * 监听消费
+ *
+ * @param record
+ */
+ @KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
public void consume(ConsumerRecord record) {
- Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
- // 进行消息处理逻辑
- log.info("车辆报文信息 : " + value);
+ System.out.println(record);
+ Vehicle vehicle = getVehicle(record.value());
+ // 存入iotDB数据库
+ try {
+ iotDbServer.insertData(vehicle);
+ log.info("添加成功");
+ if (redisService.hasKey(vehicle.getVin())) {
+ redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
+ }
+ } catch (StatementExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (ServerException e) {
+ throw new RuntimeException(e);
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ }
}
- //批量消费
- @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
- public void onBatchMessage(List> records) {
- System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
-// List collect = records.stream().map(record -> {
+//// 批量消费
+// @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
+// public void onBatchMessage(List> records) {
+// System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
+// List collect = records.stream().jsonObject(record -> {
// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
// log.info("车辆报文:{}", value);
// return value;
// }).toList();
// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
- System.out.println(records);
+// System.out.println(records);
+// }
+
+ @Bean
+ public void partitionConsumer() {
+ ArrayList topicPartitions = new ArrayList<>();
+ List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
+ for (String topic : topics) {
+ for (int i = 0; i < 8; i++) {
+ TopicPartition topicPartition = new TopicPartition(topic, i);
+ topicPartitions.add(topicPartition);
+ }
+ }
+//
+// new ConsumerRebalanceListener (){
+// @Override
+// public void onPartitionsRevoked(Collection partitions) {
+// // 可以在这里处理分区被撤销前的逻辑
+// System.out.println("Partitions revoked: " + partitions);
+// }
+//
+// @Override
+// public void onPartitionsAssigned(Collection partitions) {
+// // 可以在这里处理分区被分配后的逻辑
+// System.out.println("Partitions assigned: " + partitions);
+// }
+// };
+// //1.参数配置:不是每一非得配置
+// Properties props = new Properties();
+// props.put("bootstrap.servers", "localhost:9092");
+// props.put("auto.commit.interval.ms", "1000");
+// //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
+// props.put("group.id", "test");
+// props.put("enable.auto.commit", "true");
+// props.put("session.timeout.ms", "30000");
+// props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+// props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+// KafkaConsumer consumer = new KafkaConsumer<>(props);
+// consumer.assign(topicPartitions);
+// try {
+//// while (true) {
+// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
+// if (!records.isEmpty()) { // 检查是否有消息到来
+// // 创建线程异步执行
+// new Thread(() -> {
+// for (TopicPartition partition : records.partitions()) {
+// List> partitionRecords = records.records(partition);
+// for (ConsumerRecord record : partitionRecords) {
+// // 处理每条消息
+// log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
+// Vehicle vehicle = getVehicle(record.value());
+// if (redisService.hasKey(vehicle.getVin())){
+// redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
+// }
+// }
+// }
+// }).start();
+// } else {
+// // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
+// Thread.sleep(10);
+// }
+//// }
+// } catch (InterruptedException e) {
+// // 处理解除阻塞时的中断异常,如Thread.sleep被中断
+// Thread.currentThread().interrupt(); // 重新设置中断标志
+// log.error("Consumer was interrupted.", e);
+// }
+// finally {
+// consumer.close();
+// }
+
}
+ // 自定义分区分配监听器
+ public static class MyPartitionAssignmentListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection partitions) {
+ // 可以在这里处理分区被撤销前的逻辑
+ System.out.println("Partitions revoked: " + partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection partitions) {
+ // 可以在这里处理分区被分配后的逻辑
+ System.out.println("Partitions assigned: " + partitions);
+ }
+ }
+
+ /**
+ * 解析kafka拉取数据
+ *
+ * @param jsonString
+ * @return 车辆信息
+ */
+ public Vehicle getVehicle(String jsonString) {
+ JSONObject jsonObject = JSONObject.parseObject(jsonString);
+ // 解析车辆报文对象
+ Vehicle vehicle = new Vehicle() {{
+ setStartTime(Long.valueOf(jsonObject.get("startTime").toString()));
+ setSpeed(new BigDecimal(jsonObject.get("speed").toString()));
+ setLatitude(new BigDecimal(jsonObject.get("latitude").toString()));
+ setLongitude(new BigDecimal(jsonObject.get("longitude").toString()));
+ setMileage(new BigDecimal(jsonObject.get("mileage").toString()));
+ setGear(jsonObject.get("gear").toString());
+ setVehicleStatus(new BigDecimal(jsonObject.get("vehicleStatus").toString()).intValue());
+ setChargingStatus(new BigDecimal(jsonObject.get("chargingStatus").toString()).intValue());
+ setChargingEnergyStorageStatus(new BigDecimal(jsonObject.get("chargingEnergyStorageStatus").toString()).intValue());
+ setEasStatus(new BigDecimal(jsonObject.get("easStatus").toString()).intValue());
+ setMotorControllerTemperature(new BigDecimal(jsonObject.get("motorControllerTemperature").toString()));
+ setTotalBatteryCurrent(new BigDecimal(jsonObject.get("totalBatteryCurrent").toString()));
+ setSingleBatteryMaxVoltage(new BigDecimal(jsonObject.get("singleBatteryMaxVoltage").toString()));
+ setOperatingStatus(new BigDecimal(jsonObject.get("operatingStatus").toString()).intValue());
+ setHeatingStatus(new BigDecimal(jsonObject.get("heatingStatus").toString()).intValue());
+ setDcdcStatus(new BigDecimal(jsonObject.get("dcdcStatus").toString()).intValue());
+ setDriveMotorStatus(new BigDecimal(jsonObject.get("driveMotorStatus").toString()).intValue());
+ setPositionStatus(new BigDecimal(jsonObject.get("positionStatus").toString()).intValue());
+ setPtcStatus(new BigDecimal(jsonObject.get("ptcStatus").toString()).intValue());
+ setEpsStatus(new BigDecimal(jsonObject.get("epsStatus").toString()).intValue());
+ setAbsStatus(new BigDecimal(jsonObject.get("absStatus").toString()).intValue());
+ setMcuStatus(new BigDecimal(jsonObject.get("mcuStatus").toString()).intValue());
+ setBatteryInsulationStatus(new BigDecimal(jsonObject.get("batteryInsulationStatus").toString()).intValue());
+ setBatteryStatus(new BigDecimal(jsonObject.get("batteryStatus").toString()).intValue());
+ setChgStatus(new BigDecimal(jsonObject.get("chgStatus").toString()).intValue());
+ setTotalBatteryVoltage(new BigDecimal(jsonObject.get("totalBatteryVoltage").toString()));
+ setMotorSpeed(new BigDecimal(jsonObject.get("motorSpeed").toString()));
+ setMotorCurrent(new BigDecimal(jsonObject.get("motorCurrent").toString()));
+ setMotorVoltage(new BigDecimal(jsonObject.get("motorVoltage").toString()));
+ setAccelerationPedal(new BigDecimal(jsonObject.get("accelerationPedal").toString()));
+ setBrakePedal(new BigDecimal(jsonObject.get("brakePedal").toString()));
+ setSelfCheckCounter(new BigDecimal(jsonObject.get("selfCheckCounter").toString()));
+ setMotorTemperature(new BigDecimal(jsonObject.get("motorTemperature").toString()));
+ setMaximumDischargePower(new BigDecimal(jsonObject.get("maximumDischargePower").toString()));
+ setMaximumFeedbackPower(new BigDecimal(jsonObject.get("maximumFeedbackPower").toString()));
+ setFuelConsumptionRate(new BigDecimal(jsonObject.get("fuelConsumptionRate").toString()));
+ setVin(jsonObject.get("vin").toString());
+ setVoltage(new BigDecimal(jsonObject.get("voltage").toString()));
+ setCurrent(new BigDecimal(jsonObject.get("current").toString()));
+ setResistance(new BigDecimal(jsonObject.get("resistance").toString()));
+ setMotoTorque(new BigDecimal(jsonObject.get("motoTorque").toString()));
+ setRemainingBattery(new BigDecimal(jsonObject.get("remainingBattery").toString()));
+ setSingleBatteryMinVoltage(new BigDecimal(jsonObject.get("singleBatteryMinVoltage").toString()));
+ setSingleBatteryMaxTemperature(new BigDecimal(jsonObject.get("singleBatteryMaxTemperature").toString()));
+ setSingleBatteryMinTemperature(new BigDecimal(jsonObject.get("singleBatteryMinTemperature").toString()));
+ setAvailableBatteryCapacity(new BigDecimal(jsonObject.get("availableBatteryCapacity").toString()));
+
+ }};
+
+// if (redisService.hasKey(vehicle.getVin())) {
+// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
+// }
+ // 进行消息处理逻辑
+ log.info("车辆报文信息 : " + vehicle);
+ return vehicle;
+ }
}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/controller/EventsController.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/controller/EventsController.java
new file mode 100644
index 0000000..75ff62a
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/controller/EventsController.java
@@ -0,0 +1,53 @@
+package com.mobai.vehicle.event.controller;
+
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+import com.mobai.domain.VehicleEvent;
+import com.mobai.vehicle.event.service.EventsService;
+import com.sun.jdi.request.EventRequest;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author Saisai
+ * @className EventsController
+ * @description 描述
+ * @date 2024/6/18 11:36
+ */
+@Log4j2
+@RestController
+@RequestMapping("/event")
+public class EventsController {
+ @Autowired
+ private EventsService eventsService;
+
+ /**
+ * 车辆注册事件
+ * @param vehicleEvent
+ * @return
+ */
+ @PostMapping("/insert")
+ public Result insertEvent(@RequestBody VehicleEvent vehicleEvent) {
+ if (!StringUtils.isAllBlank(vehicleEvent.getEvents(),vehicleEvent.getVin())){
+ return Result.error("车辆vin或事件为空");
+ }
+ boolean save = eventsService.save(vehicleEvent);
+ return Result.success("","添加成功");
+ }
+
+ /**
+ * 激活事件
+ * @param req
+ * @return
+ */
+ @PostMapping("/event")
+ public Result activeEvent(@RequestBody EventReq req){
+ return eventsService.activeEvent(req);
+ }
+
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/mapper/VehicleEventMapper.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/mapper/VehicleEventMapper.java
new file mode 100644
index 0000000..6afdcda
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/mapper/VehicleEventMapper.java
@@ -0,0 +1,16 @@
+package com.mobai.vehicle.event.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.mobai.domain.VehicleEvent;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @ClassName VehicleEventMapper
+ * @Description 描述
+ * @Author Saisai
+ * @Date 2024/6/18 11:40
+ */
+@Mapper
+public interface VehicleEventMapper extends BaseMapper {
+
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventActiveService.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventActiveService.java
new file mode 100644
index 0000000..f0b0249
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventActiveService.java
@@ -0,0 +1,14 @@
+package com.mobai.vehicle.event.service;
+
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+
+/**
+ * @ClassName EventActiveService
+ * @Description 描述
+ * @Author Saisai
+ * @Date 2024/6/18 14:17
+ */
+public interface EventActiveService {
+ Result realTimeData(EventReq req);
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java
new file mode 100644
index 0000000..a49150f
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java
@@ -0,0 +1,17 @@
+package com.mobai.vehicle.event.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+import com.mobai.domain.VehicleEvent;
+
+/**
+ * @ClassName EventsService
+ * @Description 描述
+ * @Author Saisai
+ * @Date 2024/6/18 11:36
+ */
+public interface EventsService extends IService {
+
+ Result activeEvent(EventReq req);
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java
new file mode 100644
index 0000000..b8a2085
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java
@@ -0,0 +1,40 @@
+package com.mobai.vehicle.event.service.impl;
+
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.event.service.EventActiveService;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Properties;
+
+/**
+ * @author Saisai
+ * @className EventActiveServiceImpl
+ * @description 描述
+ * @date 2024/6/18 14:17
+ */
+
+@Log4j2
+@Service
+public class EventActiveServiceImpl implements EventActiveService {
+
+ @Autowired
+ private RedisService redisService;
+
+ @Override
+ public Result realTimeData(EventReq req) {
+
+ if (req.getEventState()==0){
+ log.info("开始实时数据事件");
+ redisService.setCacheObject(req.getVin(),null);
+ }else {
+ log.info("关闭实时数据事件");
+ redisService.deleteObject(req.getVin());
+ }
+ return null;
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java
new file mode 100644
index 0000000..9ac495f
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java
@@ -0,0 +1,34 @@
+package com.mobai.vehicle.event.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+import com.mobai.domain.VehicleEvent;
+import com.mobai.vehicle.event.mapper.VehicleEventMapper;
+import com.mobai.vehicle.event.service.EventActiveService;
+import com.mobai.vehicle.event.service.EventsService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author Saisai
+ * @className EventsServiceImpl
+ * @description 描述
+ * @date 2024/6/18 11:39
+ */
+
+@Service
+public class EventsServiceImpl extends ServiceImpl implements EventsService {
+
+ @Autowired
+ private EventActiveService eventActiveService;
+
+ @Override
+ public Result activeEvent(EventReq req) {
+ switch (req.getEventType()){
+ case 0:
+ return eventActiveService.realTimeData(req);
+ }
+ return null;
+ }
+}
diff --git a/mobai-event-service/src/main/resources/application.yml b/mobai-event-service/src/main/resources/application.yml
index 53698b8..65ecb22 100644
--- a/mobai-event-service/src/main/resources/application.yml
+++ b/mobai-event-service/src/main/resources/application.yml
@@ -4,14 +4,29 @@ kafka:
topic: vehicle-event-topic0
partition: 0
spring:
+
redis:
- host: 127.0.0.1
+ host: 175.24.138.82
rabbitmq:
host: 175.24.138.82
stream:
username: guest
password: guest
- # kafka 配置
+ datasource:
+ dynamic:
+ primary: mybatis #设置默认的数据源或者数据源组,默认值即为master
+ strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
+ datasource:
+ # gmall:
+ # url: jdbc:mysql://hadoop104:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8
+ # username: root
+ # password: "000000"
+ # driver-class-name: com.mysql.cj.jdbc.Driver
+ mybatis:
+ url: jdbc:mysql://127.0.0.1:3306/train_working?useSSL=false&useUnicode=true&characterEncoding=UTF-8
+ username: root
+ password: 1234
+ driver-class-name: com.mysql.cj.jdbc.Driver
kafka:
producer:
# Kafka服务器
@@ -75,7 +90,7 @@ spring:
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
- max-poll-records: 3
+ max-poll-records: 10
properties:
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
max:
@@ -87,6 +102,7 @@ spring:
timeout:
ms: 10000
listener:
+# type: batch # 开启批量监听
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
@@ -95,3 +111,13 @@ spring:
missing-topics-fatal: false
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000
+
+ # mybatis配置
+ mybatis:
+ # 搜索指定包别名
+ typeAliasesPackage: com.mobai
+ # 配置mapper的扫描,找到所有的mapper.xml映射文件
+ mapperLocations: classpath:mapper/**/*.xml
+ logging:
+ level:
+ com.mobai.mapper: DEBUG
diff --git a/mobai-event-service/src/main/resources/mapper/StayTimeMapper.xml b/mobai-event-service/src/main/resources/mapper/StayTimeMapper.xml
new file mode 100644
index 0000000..ce9fde0
--- /dev/null
+++ b/mobai-event-service/src/main/resources/mapper/StayTimeMapper.xml
@@ -0,0 +1,7 @@
+
+
+
+
+