feat():存入redis和动态监听
parent
78d86a2a69
commit
71539e9660
|
@ -38,7 +38,6 @@
|
|||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -46,12 +45,29 @@
|
|||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.baomidou</groupId>
|
||||
<artifactId>mybatis-plus-boot-starter</artifactId>
|
||||
<version>3.5.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.dtflys.forest</groupId>
|
||||
<artifactId>forest-spring-boot-starter</artifactId>
|
||||
<version>1.5.36</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-redis</artifactId>
|
||||
<version>2.7.15</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.poi</groupId>
|
||||
<artifactId>poi-ooxml</artifactId>
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className EventReq
|
||||
* @description 描述
|
||||
* @date 2024/6/18 14:08
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class EventReq {
|
||||
|
||||
/**
|
||||
* 车辆唯一标识
|
||||
*/
|
||||
private String vin;
|
||||
|
||||
/**
|
||||
* 事件类型
|
||||
*/
|
||||
private Integer eventType;
|
||||
|
||||
/**
|
||||
* 事件状态
|
||||
*/
|
||||
private Integer eventState;
|
||||
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
/**
|
||||
* 返回状态码
|
||||
*
|
||||
* @author ruoyi
|
||||
*/
|
||||
public class HttpStatus {
|
||||
/**
|
||||
* 操作成功
|
||||
*/
|
||||
public static final int SUCCESS = 200;
|
||||
|
||||
/**
|
||||
* 对象创建成功
|
||||
*/
|
||||
public static final int CREATED = 201;
|
||||
|
||||
/**
|
||||
* 请求已经被接受
|
||||
*/
|
||||
public static final int ACCEPTED = 202;
|
||||
|
||||
/**
|
||||
* 操作已经执行成功,但是没有返回数据
|
||||
*/
|
||||
public static final int NO_CONTENT = 204;
|
||||
|
||||
/**
|
||||
* 资源已被移除
|
||||
*/
|
||||
public static final int MOVED_PERM = 301;
|
||||
|
||||
/**
|
||||
* 重定向
|
||||
*/
|
||||
public static final int SEE_OTHER = 303;
|
||||
|
||||
/**
|
||||
* 资源没有被修改
|
||||
*/
|
||||
public static final int NOT_MODIFIED = 304;
|
||||
|
||||
/**
|
||||
* 参数列表错误(缺少,格式不匹配)
|
||||
*/
|
||||
public static final int BAD_REQUEST = 400;
|
||||
|
||||
/**
|
||||
* 未授权
|
||||
*/
|
||||
public static final int UNAUTHORIZED = 401;
|
||||
|
||||
/**
|
||||
* 访问受限,授权过期
|
||||
*/
|
||||
public static final int FORBIDDEN = 403;
|
||||
|
||||
/**
|
||||
* 资源,服务未找到
|
||||
*/
|
||||
public static final int NOT_FOUND = 404;
|
||||
|
||||
/**
|
||||
* 不允许的http方法
|
||||
*/
|
||||
public static final int BAD_METHOD = 405;
|
||||
|
||||
/**
|
||||
* 资源冲突,或者资源被锁
|
||||
*/
|
||||
public static final int CONFLICT = 409;
|
||||
|
||||
/**
|
||||
* 不支持的数据,媒体类型
|
||||
*/
|
||||
public static final int UNSUPPORTED_TYPE = 415;
|
||||
|
||||
/**
|
||||
* 系统内部错误
|
||||
*/
|
||||
public static final int ERROR = 500;
|
||||
|
||||
/**
|
||||
* 接口未实现
|
||||
*/
|
||||
public static final int NOT_IMPLEMENTED = 501;
|
||||
|
||||
/**
|
||||
* 系统警告消息
|
||||
*/
|
||||
public static final int WARN = 601;
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author DongZl
|
||||
* @description: Mqtt服务器模型
|
||||
* @Date 2024-3-26 上午 09:53
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MqttServerModel {
|
||||
|
||||
/**
|
||||
* MQTT服务节点
|
||||
*/
|
||||
private String broker;
|
||||
|
||||
/**
|
||||
* MQTT订阅主题
|
||||
*/
|
||||
private String topic;
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 响应信息主体
|
||||
*
|
||||
* @author ruoyi
|
||||
*/
|
||||
@Data
|
||||
public class Result<T> implements Serializable {
|
||||
/**
|
||||
* 成功
|
||||
*/
|
||||
public static final int SUCCESS = HttpStatus.SUCCESS;
|
||||
/**
|
||||
* 失败
|
||||
*/
|
||||
public static final int FAIL = HttpStatus.ERROR;
|
||||
private static final long serialVersionUID = 1L;
|
||||
/**
|
||||
* 系统警告消息
|
||||
*/
|
||||
private static final int WARN = HttpStatus.WARN;
|
||||
|
||||
private int code;
|
||||
|
||||
private String msg;
|
||||
|
||||
private T data;
|
||||
|
||||
public static <T> Result<T> success () {
|
||||
return restResult(null, SUCCESS, "操作成功");
|
||||
}
|
||||
|
||||
public static <T> Result<T> success (T data) {
|
||||
return restResult(data, SUCCESS, "操作成功");
|
||||
}
|
||||
|
||||
public static <T> Result<T> success (T data, String msg) {
|
||||
return restResult(data, SUCCESS, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> error () {
|
||||
return restResult(null, FAIL, "操作失败");
|
||||
}
|
||||
|
||||
public static <T> Result<T> error (String msg) {
|
||||
return restResult(null, FAIL, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> error (T data) {
|
||||
return restResult(data, FAIL, "操作失败");
|
||||
}
|
||||
|
||||
public static <T> Result<T> error (T data, String msg) {
|
||||
return restResult(data, FAIL, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> error (int code, String msg) {
|
||||
return restResult(null, code, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> warn () {
|
||||
return restResult(null, WARN, "操作失败");
|
||||
}
|
||||
|
||||
public static <T> Result<T> warn (String msg) {
|
||||
return restResult(null, WARN, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> warn (T data) {
|
||||
return restResult(data, WARN, "操作失败");
|
||||
}
|
||||
|
||||
public static <T> Result<T> warn (T data, String msg) {
|
||||
return restResult(data, WARN, msg);
|
||||
}
|
||||
|
||||
public static <T> Result<T> warn (int code, String msg) {
|
||||
return restResult(null, code, msg);
|
||||
}
|
||||
|
||||
private static <T> Result<T> restResult (T data, int code, String msg) {
|
||||
Result<T> apiResult = new Result<>();
|
||||
apiResult.setCode(code);
|
||||
apiResult.setData(data);
|
||||
apiResult.setMsg(msg);
|
||||
return apiResult;
|
||||
}
|
||||
|
||||
public static <T> Boolean isError (Result<T> ret) {
|
||||
return !isSuccess(ret);
|
||||
}
|
||||
|
||||
public static <T> Boolean isSuccess (Result<T> ret) {
|
||||
return Result.SUCCESS == ret.getCode();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author Mobai
|
||||
* @className VehivleEvent
|
||||
* @description 描述
|
||||
* @date 2024/6/18 11:23
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class VehicleEvent {
|
||||
@TableId(type= IdType.AUTO)
|
||||
private Long id;
|
||||
private String Vin;
|
||||
private String events;
|
||||
|
||||
}
|
|
@ -3,10 +3,7 @@ package com.mobai.utils;
|
|||
import org.apache.poi.ss.formula.functions.T;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.core.BoundSetOperations;
|
||||
import org.springframework.data.redis.core.HashOperations;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.data.redis.core.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -24,6 +21,9 @@ public class RedisService {
|
|||
@Autowired
|
||||
public RedisTemplate redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 缓存基本的对象,Integer、String、实体类等
|
||||
*
|
||||
|
@ -258,4 +258,8 @@ public class RedisService {
|
|||
Long count = redisTemplate.opsForList().rightPushAll(key, dataList);
|
||||
return count == null ? 0 : count;
|
||||
}
|
||||
|
||||
public void setList(String vin, String jsonString) {
|
||||
stringRedisTemplate.opsForList().leftPush(vin,jsonString);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,31 @@
|
|||
<artifactId>lombok</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.13.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.baomidou</groupId>
|
||||
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
|
||||
<version>2.5.8</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>8.0.33</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package com.mobai.forest;
|
||||
|
||||
import com.dtflys.forest.annotation.Request;
|
||||
import com.mobai.domain.MqttServerModel;
|
||||
import com.mobai.domain.Result;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className ForestGet
|
||||
* @description 描述
|
||||
* @date 2024/6/18 15:40
|
||||
*/
|
||||
|
||||
@Component
|
||||
public interface ForestGet {
|
||||
@Request("http://127.0.0.1:8081/fluxmq/getIps/")
|
||||
Result<List<MqttServerModel>> getIps();
|
||||
}
|
|
@ -136,7 +136,7 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
add(String.valueOf(vehicle.getChgStatus()));
|
||||
}};
|
||||
|
||||
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicle.getStartTime()), measurementsList, valuesList);
|
||||
iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
package com.mobai.kafka.listener;
|
||||
|
||||
import com.mobai.domain.MqttServerModel;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.forest.ForestGet;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className VinConsumer
|
||||
* @description 描述
|
||||
* @date 2024/6/18 17:18
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class VinConsumer {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
|
||||
for (String topic : topics) {
|
||||
for (int i = 0; i < 8; i++) {
|
||||
TopicPartition topicPartition = new TopicPartition(topic, i);
|
||||
topicPartitions.add(topicPartition);
|
||||
}
|
||||
}
|
||||
|
||||
new ConsumerRebalanceListener(){
|
||||
@Override
|
||||
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// 可以在这里处理分区被撤销前的逻辑
|
||||
System.out.println("Partitions revoked: " + partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
// 可以在这里处理分区被分配后的逻辑
|
||||
System.out.println("Partitions assigned: " + partitions);
|
||||
}
|
||||
};
|
||||
// 1.参数配置:不是每一非得配置
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("auto.commit.interval.ms", "1000");
|
||||
//因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
|
||||
props.put("group.id", "test");
|
||||
props.put("enable.auto.commit", "true");
|
||||
props.put("session.timeout.ms", "30000");
|
||||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.assign(topicPartitions);
|
||||
try {
|
||||
// while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
|
||||
if (!records.isEmpty()) { // 检查是否有消息到来
|
||||
// 创建线程异步执行
|
||||
new Thread(() -> {
|
||||
for (TopicPartition partition : records.partitions()) {
|
||||
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
||||
for (ConsumerRecord<String, String> record : partitionRecords) {
|
||||
// 处理每条消息
|
||||
log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
|
||||
Vehicle vehicle = getVehicle(record.value());
|
||||
if (redisService.hasKey(vehicle.getVin())){
|
||||
redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
} else {
|
||||
// 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
|
||||
Thread.sleep(10);
|
||||
}
|
||||
// }
|
||||
} catch (InterruptedException e) {
|
||||
// 处理解除阻塞时的中断异常,如Thread.sleep被中断
|
||||
Thread.currentThread().interrupt(); // 重新设置中断标志
|
||||
log.error("Consumer was interrupted.", e);
|
||||
}
|
||||
finally {
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,21 +1,36 @@
|
|||
package com.mobai.kafka.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.mobai.domain.MqttServerModel;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.forest.ForestGet;
|
||||
import com.mobai.iotDB.service.IotDbServer;
|
||||
import com.mobai.utils.RedisService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.math.BigDecimal;
|
||||
import java.rmi.ServerException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*
|
||||
* @description: kafka 消费者
|
||||
* @copyright: @Copyright (c) 2022
|
||||
* @company: hmblogs
|
||||
|
@ -27,25 +42,208 @@ import java.util.stream.Collectors;
|
|||
@Slf4j
|
||||
public class kafkaConsumerListenerExample {
|
||||
|
||||
@KafkaListener(topics = {"topic0","topic1"}, groupId = "0")
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
|
||||
@Autowired
|
||||
private IotDbServer iotDbServer;
|
||||
|
||||
// @Autowired
|
||||
// private KafkaConsumer<String, String> consumer;
|
||||
|
||||
@Autowired
|
||||
private ForestGet forestGet;
|
||||
|
||||
/**
|
||||
* 监听消费
|
||||
*
|
||||
* @param record
|
||||
*/
|
||||
@KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
|
||||
public void consume(ConsumerRecord<String, String> record) {
|
||||
Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
|
||||
// 进行消息处理逻辑
|
||||
log.info("车辆报文信息 : " + value);
|
||||
System.out.println(record);
|
||||
Vehicle vehicle = getVehicle(record.value());
|
||||
// 存入iotDB数据库
|
||||
try {
|
||||
iotDbServer.insertData(vehicle);
|
||||
log.info("添加成功");
|
||||
if (redisService.hasKey(vehicle.getVin())) {
|
||||
redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
|
||||
}
|
||||
} catch (StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ServerException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IoTDBConnectionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
//批量消费
|
||||
@KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
|
||||
public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
|
||||
System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
|
||||
// List<Vehicle> collect = records.stream().map(record -> {
|
||||
//// 批量消费
|
||||
// @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
|
||||
// public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
|
||||
// System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
|
||||
// List<Vehicle> collect = records.stream().jsonObject(record -> {
|
||||
// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
|
||||
// log.info("车辆报文:{}", value);
|
||||
// return value;
|
||||
// }).toList();
|
||||
// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
|
||||
System.out.println(records);
|
||||
// System.out.println(records);
|
||||
// }
|
||||
|
||||
@Bean
|
||||
public void partitionConsumer() {
|
||||
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
|
||||
for (String topic : topics) {
|
||||
for (int i = 0; i < 8; i++) {
|
||||
TopicPartition topicPartition = new TopicPartition(topic, i);
|
||||
topicPartitions.add(topicPartition);
|
||||
}
|
||||
}
|
||||
//
|
||||
// new ConsumerRebalanceListener (){
|
||||
// @Override
|
||||
// public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被撤销前的逻辑
|
||||
// System.out.println("Partitions revoked: " + partitions);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被分配后的逻辑
|
||||
// System.out.println("Partitions assigned: " + partitions);
|
||||
// }
|
||||
// };
|
||||
// //1.参数配置:不是每一非得配置
|
||||
// Properties props = new Properties();
|
||||
// props.put("bootstrap.servers", "localhost:9092");
|
||||
// props.put("auto.commit.interval.ms", "1000");
|
||||
// //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
|
||||
// props.put("group.id", "test");
|
||||
// props.put("enable.auto.commit", "true");
|
||||
// props.put("session.timeout.ms", "30000");
|
||||
// props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
// props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
// consumer.assign(topicPartitions);
|
||||
// try {
|
||||
//// while (true) {
|
||||
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
|
||||
// if (!records.isEmpty()) { // 检查是否有消息到来
|
||||
// // 创建线程异步执行
|
||||
// new Thread(() -> {
|
||||
// for (TopicPartition partition : records.partitions()) {
|
||||
// List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
||||
// for (ConsumerRecord<String, String> record : partitionRecords) {
|
||||
// // 处理每条消息
|
||||
// log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
|
||||
// Vehicle vehicle = getVehicle(record.value());
|
||||
// if (redisService.hasKey(vehicle.getVin())){
|
||||
// redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }).start();
|
||||
// } else {
|
||||
// // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
|
||||
// Thread.sleep(10);
|
||||
// }
|
||||
//// }
|
||||
// } catch (InterruptedException e) {
|
||||
// // 处理解除阻塞时的中断异常,如Thread.sleep被中断
|
||||
// Thread.currentThread().interrupt(); // 重新设置中断标志
|
||||
// log.error("Consumer was interrupted.", e);
|
||||
// }
|
||||
// finally {
|
||||
// consumer.close();
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
// 自定义分区分配监听器
|
||||
public static class MyPartitionAssignmentListener implements ConsumerRebalanceListener {
|
||||
@Override
|
||||
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// 可以在这里处理分区被撤销前的逻辑
|
||||
System.out.println("Partitions revoked: " + partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
// 可以在这里处理分区被分配后的逻辑
|
||||
System.out.println("Partitions assigned: " + partitions);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析kafka拉取数据
|
||||
*
|
||||
* @param jsonString
|
||||
* @return 车辆信息
|
||||
*/
|
||||
public Vehicle getVehicle(String jsonString) {
|
||||
JSONObject jsonObject = JSONObject.parseObject(jsonString);
|
||||
// 解析车辆报文对象
|
||||
Vehicle vehicle = new Vehicle() {{
|
||||
setStartTime(Long.valueOf(jsonObject.get("startTime").toString()));
|
||||
setSpeed(new BigDecimal(jsonObject.get("speed").toString()));
|
||||
setLatitude(new BigDecimal(jsonObject.get("latitude").toString()));
|
||||
setLongitude(new BigDecimal(jsonObject.get("longitude").toString()));
|
||||
setMileage(new BigDecimal(jsonObject.get("mileage").toString()));
|
||||
setGear(jsonObject.get("gear").toString());
|
||||
setVehicleStatus(new BigDecimal(jsonObject.get("vehicleStatus").toString()).intValue());
|
||||
setChargingStatus(new BigDecimal(jsonObject.get("chargingStatus").toString()).intValue());
|
||||
setChargingEnergyStorageStatus(new BigDecimal(jsonObject.get("chargingEnergyStorageStatus").toString()).intValue());
|
||||
setEasStatus(new BigDecimal(jsonObject.get("easStatus").toString()).intValue());
|
||||
setMotorControllerTemperature(new BigDecimal(jsonObject.get("motorControllerTemperature").toString()));
|
||||
setTotalBatteryCurrent(new BigDecimal(jsonObject.get("totalBatteryCurrent").toString()));
|
||||
setSingleBatteryMaxVoltage(new BigDecimal(jsonObject.get("singleBatteryMaxVoltage").toString()));
|
||||
setOperatingStatus(new BigDecimal(jsonObject.get("operatingStatus").toString()).intValue());
|
||||
setHeatingStatus(new BigDecimal(jsonObject.get("heatingStatus").toString()).intValue());
|
||||
setDcdcStatus(new BigDecimal(jsonObject.get("dcdcStatus").toString()).intValue());
|
||||
setDriveMotorStatus(new BigDecimal(jsonObject.get("driveMotorStatus").toString()).intValue());
|
||||
setPositionStatus(new BigDecimal(jsonObject.get("positionStatus").toString()).intValue());
|
||||
setPtcStatus(new BigDecimal(jsonObject.get("ptcStatus").toString()).intValue());
|
||||
setEpsStatus(new BigDecimal(jsonObject.get("epsStatus").toString()).intValue());
|
||||
setAbsStatus(new BigDecimal(jsonObject.get("absStatus").toString()).intValue());
|
||||
setMcuStatus(new BigDecimal(jsonObject.get("mcuStatus").toString()).intValue());
|
||||
setBatteryInsulationStatus(new BigDecimal(jsonObject.get("batteryInsulationStatus").toString()).intValue());
|
||||
setBatteryStatus(new BigDecimal(jsonObject.get("batteryStatus").toString()).intValue());
|
||||
setChgStatus(new BigDecimal(jsonObject.get("chgStatus").toString()).intValue());
|
||||
setTotalBatteryVoltage(new BigDecimal(jsonObject.get("totalBatteryVoltage").toString()));
|
||||
setMotorSpeed(new BigDecimal(jsonObject.get("motorSpeed").toString()));
|
||||
setMotorCurrent(new BigDecimal(jsonObject.get("motorCurrent").toString()));
|
||||
setMotorVoltage(new BigDecimal(jsonObject.get("motorVoltage").toString()));
|
||||
setAccelerationPedal(new BigDecimal(jsonObject.get("accelerationPedal").toString()));
|
||||
setBrakePedal(new BigDecimal(jsonObject.get("brakePedal").toString()));
|
||||
setSelfCheckCounter(new BigDecimal(jsonObject.get("selfCheckCounter").toString()));
|
||||
setMotorTemperature(new BigDecimal(jsonObject.get("motorTemperature").toString()));
|
||||
setMaximumDischargePower(new BigDecimal(jsonObject.get("maximumDischargePower").toString()));
|
||||
setMaximumFeedbackPower(new BigDecimal(jsonObject.get("maximumFeedbackPower").toString()));
|
||||
setFuelConsumptionRate(new BigDecimal(jsonObject.get("fuelConsumptionRate").toString()));
|
||||
setVin(jsonObject.get("vin").toString());
|
||||
setVoltage(new BigDecimal(jsonObject.get("voltage").toString()));
|
||||
setCurrent(new BigDecimal(jsonObject.get("current").toString()));
|
||||
setResistance(new BigDecimal(jsonObject.get("resistance").toString()));
|
||||
setMotoTorque(new BigDecimal(jsonObject.get("motoTorque").toString()));
|
||||
setRemainingBattery(new BigDecimal(jsonObject.get("remainingBattery").toString()));
|
||||
setSingleBatteryMinVoltage(new BigDecimal(jsonObject.get("singleBatteryMinVoltage").toString()));
|
||||
setSingleBatteryMaxTemperature(new BigDecimal(jsonObject.get("singleBatteryMaxTemperature").toString()));
|
||||
setSingleBatteryMinTemperature(new BigDecimal(jsonObject.get("singleBatteryMinTemperature").toString()));
|
||||
setAvailableBatteryCapacity(new BigDecimal(jsonObject.get("availableBatteryCapacity").toString()));
|
||||
|
||||
}};
|
||||
|
||||
// if (redisService.hasKey(vehicle.getVin())) {
|
||||
// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
|
||||
// }
|
||||
// 进行消息处理逻辑
|
||||
log.info("车辆报文信息 : " + vehicle);
|
||||
return vehicle;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package com.mobai.vehicle.event.controller;
|
||||
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.vehicle.event.service.EventsService;
|
||||
import com.sun.jdi.request.EventRequest;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className EventsController
|
||||
* @description 描述
|
||||
* @date 2024/6/18 11:36
|
||||
*/
|
||||
@Log4j2
|
||||
@RestController
|
||||
@RequestMapping("/event")
|
||||
public class EventsController {
|
||||
@Autowired
|
||||
private EventsService eventsService;
|
||||
|
||||
/**
|
||||
* 车辆注册事件
|
||||
* @param vehicleEvent
|
||||
* @return
|
||||
*/
|
||||
@PostMapping("/insert")
|
||||
public Result insertEvent(@RequestBody VehicleEvent vehicleEvent) {
|
||||
if (!StringUtils.isAllBlank(vehicleEvent.getEvents(),vehicleEvent.getVin())){
|
||||
return Result.error("车辆vin或事件为空");
|
||||
}
|
||||
boolean save = eventsService.save(vehicleEvent);
|
||||
return Result.success("","添加成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* 激活事件
|
||||
* @param req
|
||||
* @return
|
||||
*/
|
||||
@PostMapping("/event")
|
||||
public Result activeEvent(@RequestBody EventReq req){
|
||||
return eventsService.activeEvent(req);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.mobai.vehicle.event.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
/**
|
||||
* @ClassName VehicleEventMapper
|
||||
* @Description 描述
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/18 11:40
|
||||
*/
|
||||
@Mapper
|
||||
public interface VehicleEventMapper extends BaseMapper<VehicleEvent> {
|
||||
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.mobai.vehicle.event.service;
|
||||
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
|
||||
/**
|
||||
* @ClassName EventActiveService
|
||||
* @Description 描述
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/18 14:17
|
||||
*/
|
||||
public interface EventActiveService {
|
||||
Result realTimeData(EventReq req);
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package com.mobai.vehicle.event.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
|
||||
/**
|
||||
* @ClassName EventsService
|
||||
* @Description 描述
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/18 11:36
|
||||
*/
|
||||
public interface EventsService extends IService<VehicleEvent> {
|
||||
|
||||
Result activeEvent(EventReq req);
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.mobai.vehicle.event.service.impl;
|
||||
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.event.service.EventActiveService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className EventActiveServiceImpl
|
||||
* @description 描述
|
||||
* @date 2024/6/18 14:17
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
public class EventActiveServiceImpl implements EventActiveService {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@Override
|
||||
public Result realTimeData(EventReq req) {
|
||||
|
||||
if (req.getEventState()==0){
|
||||
log.info("开始实时数据事件");
|
||||
redisService.setCacheObject(req.getVin(),null);
|
||||
}else {
|
||||
log.info("关闭实时数据事件");
|
||||
redisService.deleteObject(req.getVin());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.mobai.vehicle.event.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.vehicle.event.mapper.VehicleEventMapper;
|
||||
import com.mobai.vehicle.event.service.EventActiveService;
|
||||
import com.mobai.vehicle.event.service.EventsService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className EventsServiceImpl
|
||||
* @description 描述
|
||||
* @date 2024/6/18 11:39
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class EventsServiceImpl extends ServiceImpl<VehicleEventMapper,VehicleEvent> implements EventsService {
|
||||
|
||||
@Autowired
|
||||
private EventActiveService eventActiveService;
|
||||
|
||||
@Override
|
||||
public Result activeEvent(EventReq req) {
|
||||
switch (req.getEventType()){
|
||||
case 0:
|
||||
return eventActiveService.realTimeData(req);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -4,14 +4,29 @@ kafka:
|
|||
topic: vehicle-event-topic0
|
||||
partition: 0
|
||||
spring:
|
||||
|
||||
redis:
|
||||
host: 127.0.0.1
|
||||
host: 175.24.138.82
|
||||
rabbitmq:
|
||||
host: 175.24.138.82
|
||||
stream:
|
||||
username: guest
|
||||
password: guest
|
||||
# kafka 配置
|
||||
datasource:
|
||||
dynamic:
|
||||
primary: mybatis #设置默认的数据源或者数据源组,默认值即为master
|
||||
strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
|
||||
datasource:
|
||||
# gmall:
|
||||
# url: jdbc:mysql://hadoop104:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8
|
||||
# username: root
|
||||
# password: "000000"
|
||||
# driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
mybatis:
|
||||
url: jdbc:mysql://127.0.0.1:3306/train_working?useSSL=false&useUnicode=true&characterEncoding=UTF-8
|
||||
username: root
|
||||
password: 1234
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
kafka:
|
||||
producer:
|
||||
# Kafka服务器
|
||||
|
@ -75,7 +90,7 @@ spring:
|
|||
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
|
||||
# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
|
||||
# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
|
||||
max-poll-records: 3
|
||||
max-poll-records: 10
|
||||
properties:
|
||||
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
|
||||
max:
|
||||
|
@ -87,6 +102,7 @@ spring:
|
|||
timeout:
|
||||
ms: 10000
|
||||
listener:
|
||||
# type: batch # 开启批量监听
|
||||
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
|
||||
concurrency: 4
|
||||
# 自动提交关闭,需要设置手动消息确认
|
||||
|
@ -95,3 +111,13 @@ spring:
|
|||
missing-topics-fatal: false
|
||||
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
|
||||
poll-timeout: 600000
|
||||
|
||||
# mybatis配置
|
||||
mybatis:
|
||||
# 搜索指定包别名
|
||||
typeAliasesPackage: com.mobai
|
||||
# 配置mapper的扫描,找到所有的mapper.xml映射文件
|
||||
mapperLocations: classpath:mapper/**/*.xml
|
||||
logging:
|
||||
level:
|
||||
com.mobai.mapper: DEBUG
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE mapper
|
||||
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.mobai.vehicle.event.mapper.VehicleEventMapper">
|
||||
|
||||
</mapper>
|
Loading…
Reference in New Issue