代码更新

master
one 2023-12-05 09:43:28 +08:00
parent 0111e33f7d
commit 2d4c91e7c1
5 changed files with 11 additions and 7 deletions

View File

@ -24,7 +24,7 @@ public class RedisInitTest {
/**
*
*/
@Bean
// @Bean
public void redisInit()
{
redisService.deleteObject("Fence" + "VIN12345678912345");
@ -42,7 +42,7 @@ public class RedisInitTest {
/**
*
*/
@Bean
// @Bean
public void parseInit()
{
redisService.deleteObject("event" + "VIN12345678912345");

View File

@ -33,7 +33,7 @@ public class KafkaConsumerConfig {
//设置kafka服务器地址
props.put("bootstrap.servers", KAFKA_CON);
//每个消费者分配独立的组号
props.put("group.id", "g2");
props.put("group.id", "sjz");
//如果value合法则自动提交偏移量
props.put("enable.auto.commit", "true");
//设置多久一次更新被消费消息的偏移量

View File

@ -24,7 +24,7 @@
// /**
// * @description: kafka监听
// */
// @KafkaListener(topics = { "test002" },
// @KafkaListener(topics = { "test" },
// containerFactory = "kafkaListenerContainerFactory",
// errorHandler = "myKafkaListenerErrorHandler")
// public void kafkaProducer(ConsumerRecord<Object,Object> record, Acknowledgment acknowledgment){

View File

@ -80,10 +80,14 @@ public class MessageProcessor {
* @param carMessage
*/
private void eventProducer(CarMessage carMessage){
if (!redisService.hasKey("event" + carMessage.getVin())){
redisService.setCacheList("event" + carMessage.getVin(),null);
}
// 根据对象车辆vin获取事件集合从redis中获取
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
log.info("当前车辆 vin{} 绑定事件:{}",carMessage.getVin(),eventList.toString());
// 执行事件
if(!eventList.isEmpty()){
if(!eventList.isEmpty()) {
for (String event : eventList) {
EventService eventService = SpringUtils.getBean(event);
eventService.execute(carMessage);

View File

@ -41,13 +41,13 @@ public class ParseDataService {
public void start(){
new Thread(() -> {
consumer.subscribe(Arrays.asList("test"));
consumer.subscribe(List.of("test"));
log.info("kafka数据解析服务启动");
while (true){
ConsumerRecords<String, String> records = null;
try {
//死循环拉取kafka数据
records = consumer.poll(Duration.ofMillis(1000));
records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
long startTime = System.currentTimeMillis();