小改动
parent
0a10ab2dbc
commit
1c0137990f
|
@ -11,7 +11,6 @@ import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.scheduling.annotation.EnableAsync;
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
@ -20,6 +19,10 @@ import java.util.List;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 队列消息处理
|
||||||
|
* @author fst
|
||||||
|
*/
|
||||||
@Component
|
@Component
|
||||||
@EnableAsync
|
@EnableAsync
|
||||||
@Data
|
@Data
|
||||||
|
|
|
@ -1,10 +1,6 @@
|
||||||
package com.god.data.service;
|
package com.god.data.service;
|
||||||
|
|
||||||
import com.god.common.core.utils.SpringUtils;
|
|
||||||
import com.god.common.redis.service.RedisService;
|
|
||||||
import com.god.data.common.domain.CarMessage;
|
|
||||||
import com.god.data.queue.MessageProcessor;
|
import com.god.data.queue.MessageProcessor;
|
||||||
import com.god.data.utils.AnalyzeUtils;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
@ -15,7 +11,6 @@ import org.springframework.stereotype.Component;
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -34,6 +29,8 @@ public class ParseDataService {
|
||||||
@Autowired
|
@Autowired
|
||||||
private MessageProcessor messageProcessor;
|
private MessageProcessor messageProcessor;
|
||||||
|
|
||||||
|
private int count=0;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
|
@ -50,7 +47,7 @@ public class ParseDataService {
|
||||||
for (ConsumerRecord<String, String> record : records) {
|
for (ConsumerRecord<String, String> record : records) {
|
||||||
String value = record.value();
|
String value = record.value();
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
log.info("{}为从kafak拉取到的10进制报文",value);
|
log.info("{}为从kafka拉取到的10进制报文",value);
|
||||||
//判断往那条队列发送消息
|
//判断往那条队列发送消息
|
||||||
if (!messageProcessor.getIsQueueABlocked().get()) {
|
if (!messageProcessor.getIsQueueABlocked().get()) {
|
||||||
log.info("将消息放入队列A");
|
log.info("将消息放入队列A");
|
||||||
|
@ -63,6 +60,8 @@ public class ParseDataService {
|
||||||
log.info("队列A和队列B都被阻塞,根据自定义的策略进行处理");
|
log.info("队列A和队列B都被阻塞,根据自定义的策略进行处理");
|
||||||
}
|
}
|
||||||
log.info("消息消费时间:{}", System.currentTimeMillis() - startTime);
|
log.info("消息消费时间:{}", System.currentTimeMillis() - startTime);
|
||||||
|
count++;
|
||||||
|
log.info("消费了{}条消息",count);
|
||||||
}
|
}
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.info("records: {}", records);
|
log.info("records: {}", records);
|
||||||
|
|
Loading…
Reference in New Issue