测压一波
parent
07fae80e36
commit
00f08abd18
|
@ -2,6 +2,7 @@ package com.parseSystem;
|
|||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
|
@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
* @date 2023/11/25 15:34
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class ParseSystemApplication {
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,11 +23,10 @@ import java.time.Duration;
|
|||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
*
|
||||
* 监听车辆事件变更消费队列
|
||||
*
|
||||
* @author 冯凯
|
||||
* @version 1.0
|
||||
*
|
||||
* @date 2023/11/27 22:19
|
||||
*/
|
||||
@Component
|
||||
|
@ -52,7 +51,7 @@ public class ListenEventChangeRabbitMq {
|
|||
*/
|
||||
@RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")})
|
||||
public void consumerSubscribe(String mesg, Message message, Channel channel) {
|
||||
log.info("收到准备订阅主题:{}" , mesg);
|
||||
log.info("收到准备订阅主题:{}", mesg);
|
||||
|
||||
// 将接收到的消息解析为kafkaConstants对象
|
||||
kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class);
|
||||
|
@ -67,31 +66,30 @@ public class ListenEventChangeRabbitMq {
|
|||
// 将消费者订阅的主题设置为"kafka_top"队列的第一个分区
|
||||
consumer.assign(Collections.singleton(topicPartition));
|
||||
// 开启线程持续拉取数据
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = null;
|
||||
try {
|
||||
// 每个一秒钟拉取一次数据
|
||||
records = consumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
System.out.println(record.value());
|
||||
long startTime = System.currentTimeMillis();
|
||||
log.info("开始消费时间{}:",startTime);
|
||||
// 解析数据
|
||||
String data = ParseUtil.sixteenToStr(record.value());
|
||||
// 构建数据对象
|
||||
VehicleData vehicleData = VehicleData.getBuild(data);
|
||||
log.info(vehicleData);
|
||||
dataQueueManager.enqueueData(vehicleData);
|
||||
|
||||
log.info("耗费时间{}",System.currentTimeMillis()-startTime);
|
||||
// 调用执行事件
|
||||
eventHandlerService.executeEvent(vehicleData);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("records: {}", records);
|
||||
log.error(e);
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = null;
|
||||
try {
|
||||
// 每个一秒钟拉取一次数据
|
||||
records = consumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
System.out.println(record.value());
|
||||
long startTime = System.currentTimeMillis();
|
||||
log.info("开始消费时间{}:", startTime);
|
||||
// 解析数据
|
||||
String data = ParseUtil.sixteenToStr(record.value());
|
||||
// 构建数据对象
|
||||
VehicleData vehicleData = VehicleData.getBuild(data);
|
||||
log.info(vehicleData);
|
||||
dataQueueManager.enqueueData(vehicleData);
|
||||
// 调用执行事件
|
||||
eventHandlerService.executeEvent(vehicleData);
|
||||
log.info("耗费时间{}", System.currentTimeMillis() - startTime);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("records: {}", records);
|
||||
log.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 订阅特定分区
|
||||
|
|
|
@ -4,14 +4,12 @@ import com.parseSystem.storage.service.StorageDateService;
|
|||
import com.parseSystem.vehicle.VehicleData;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
* @author 冯凯
|
||||
|
@ -23,9 +21,9 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
@Log4j2
|
||||
public class DataQueueManager {
|
||||
|
||||
private int count;
|
||||
@Autowired
|
||||
private StorageDateService storageDateService;
|
||||
private static final int BATCH_SIZE_THRESHOLD = 100; // 批量处理阈值
|
||||
|
||||
|
||||
@Autowired
|
||||
|
@ -49,25 +47,22 @@ public class DataQueueManager {
|
|||
@Scheduled(fixedRate = 5000)
|
||||
public void processQueues() {
|
||||
// 如果第一个队列中的数据数量达到阈值,就从第一个队列中取出数据进行批量持久化操作
|
||||
if (dataQueue1.size() >= BATCH_SIZE_THRESHOLD) {
|
||||
List<VehicleData> batchData = new ArrayList<>();
|
||||
dataQueue1.drainTo(batchData, BATCH_SIZE_THRESHOLD);
|
||||
batchPersistToDatabase(batchData);
|
||||
}
|
||||
if (!dataQueue1.isEmpty()) {
|
||||
|
||||
List<VehicleData> batchData = new ArrayList<>();
|
||||
count+=batchData.size();
|
||||
dataQueue1.drainTo(batchData);
|
||||
storageDateService.saveBatch(batchData);
|
||||
}
|
||||
// 如果第一个队列为空,但是第二个队列不为空,就从第二个队列中取出数据进行持久化操作
|
||||
else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) {
|
||||
log.warn("第一个队列为空,从第二个队列中取出数据进行持久化操作");
|
||||
List<VehicleData> batchData = new ArrayList<>();
|
||||
dataQueue2.drainTo(batchData, BATCH_SIZE_THRESHOLD);
|
||||
batchPersistToDatabase(batchData);
|
||||
count+=batchData.size();
|
||||
dataQueue2.drainTo(batchData);
|
||||
storageDateService.saveBatch(batchData);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void batchPersistToDatabase(List<VehicleData> batchData) {
|
||||
// 批量持久化到数据库的逻辑
|
||||
for (VehicleData vehicleData : batchData) {
|
||||
storageDateService.save(vehicleData); // 持久化到数据库
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ import java.util.Date;
|
|||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@TableName("vehicle_data")
|
||||
@TableName("vehicle_data_copy1")
|
||||
public class VehicleData {
|
||||
|
||||
/**
|
||||
|
|
|
@ -9,7 +9,7 @@ spring:
|
|||
name: parseSystem
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
||||
url: jdbc:mysql://117.72.43.22:4000/test1?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
|
||||
username: root
|
||||
password: 123456
|
||||
rabbitmq:
|
||||
|
|
Loading…
Reference in New Issue