package com.muyu.eventdriven;

import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.server.EventInfoService;
import com.muyu.eventdriven.server.impl.EventInfoServiceImpl;
import com.muyu.eventdriven.tactics.EventTactics;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * @ClassName EventDrivenRunner
 * @Description Starts, once per instance, a background thread that polls a fixed set of
 *              Kafka topic partitions and hands every non-empty batch to the
 *              {@link EventTactics} beans looked up by name in the Spring context.
 * @Author YunFei.Du
 * @Date 2024/6/25 10:03
 */
@Log4j2
@Component
public class EventDrivenRunner implements ApplicationRunner {

    /** Number of partitions assigned per topic (partitions 0 .. 7). */
    private static final int PARTITIONS_PER_TOPIC = 8;

    /** Maximum time a single {@code poll} call may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /** Pause inserted after a poll that returned no records. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 10L;

    /** Bean names of the {@link EventTactics} implementations, addressed by 1-based event id. */
    private static final List<String> classNameList = Arrays.asList(
            "StorageEvent",
            "ElectronicFenceEvent",
            "FaultAlarmEvent",
            "RealTimeDataEvent",
            "IndexWarningEvent");

    @Autowired
    private EventInfoServiceImpl eventInfoService;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private EventTactics eventTactics;

    /** 0 until the consumer thread has been started, 1 afterwards. */
    private final AtomicInteger start = new AtomicInteger();

    /**
     * Starts the Kafka consumer thread the first time it is called; later calls return
     * immediately.
     *
     * @param args application arguments (not used)
     * @throws Exception declared by {@link ApplicationRunner}; not thrown by this implementation
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // A single atomic 0 -> 1 transition gives the same "first caller wins" guarantee as the
        // former synchronized double-check, without holding a lock.
        if (!start.compareAndSet(0, 1)) {
            return;
        }
        new Thread(this::consumeLoop).start();
    }

    /** Builds the list of partitions (0 .. PARTITIONS_PER_TOPIC - 1 of every topic) to assign. */
    private List<TopicPartition> buildAssignedPartitions() {
        // TODO: the topic names are hard-coded; a commented-out lookup of the Redis list
        // "ipList" (de-duplicated) was left here previously as the apparent intended source.
        List<String> topics = Arrays.asList("47.92.213.100", "47.92.95.98");

        List<TopicPartition> topicPartitions = new ArrayList<>(topics.size() * PARTITIONS_PER_TOPIC);
        for (String topic : topics) {
            for (int partition = 0; partition < PARTITIONS_PER_TOPIC; partition++) {
                topicPartitions.add(new TopicPartition(topic, partition));
            }
        }
        return topicPartitions;
    }

    /** Builds the Kafka consumer configuration. */
    private Properties buildConsumerProperties() {
        // NOTE(review): broker address and group id are hard-coded; consider externalising them.
        // NOTE(review): with enable.auto.commit=true offsets are committed regardless of whether
        // the asynchronous handlers below succeed - confirm that this is acceptable.
        Properties props = new Properties();
        props.put("bootstrap.servers", "43.142.12.243:9092");
        props.put("auto.commit.interval.ms", "1000");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Polls Kafka forever and hands each non-empty batch to a freshly started handler thread. */
    private void consumeLoop() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildConsumerProperties());
        try {
            // assign() is inside the try so the consumer is closed even if assignment fails.
            consumer.assign(buildAssignedPartitions());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                ConcurrentHashMap<String, ArrayList<VehicleData>> target = new ConcurrentHashMap<>();
                ConcurrentHashMap<String, ArrayList<VehicleData>> vehicleDataByKey =
                        eventInfoService.getVehicleData(records, target);

                if (records.isEmpty()) {
                    Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    continue;
                }
                new Thread(() -> handleBatch(records, vehicleDataByKey)).start();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Consumer was interrupted.", e);
        } finally {
            consumer.close();
        }
    }

    /** Logs every record of the batch, then dispatches the grouped data exactly once. */
    private void handleBatch(ConsumerRecords<String, String> records,
                             ConcurrentHashMap<String, ArrayList<VehicleData>> vehicleDataByKey) {
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
            }
        }
        // Dispatch after the partition loop: the map already covers the whole batch, so
        // dispatching inside the loop would process every entry once per partition.
        vehicleDataByKey.forEach(this::dispatchEvents);
    }

    /** Asynchronously runs every configured event handler for one map entry. */
    private void dispatchEvents(String key, ArrayList<VehicleData> vehicleData) {
        // TODO: the event ids are hard-coded; a commented-out Redis hash lookup
        // (RedisConstants.VEHICLE_EVENT, keyed by this entry's key) was left here previously
        // as the apparent intended source.
        String vehicleEventString = "1,2,3,4,5";

        for (String eventId : vehicleEventString.split(",")) {
            int index;
            try {
                // NOTE(review): ids are treated as 1-based positions in classNameList; used as
                // 0-based indices, id 5 is out of range and "StorageEvent" is never reached.
                // Confirm against the real data source.
                index = Integer.parseInt(eventId.trim()) - 1;
            } catch (NumberFormatException e) {
                log.warn("Ignoring non-numeric event id '{}' for key {}", eventId, key);
                continue;
            }
            if (index < 0 || index >= classNameList.size()) {
                log.warn("Ignoring unknown event id '{}' for key {}", eventId, key);
                continue;
            }

            String beanName = classNameList.get(index);
            CompletableFuture
                    .runAsync(() -> applicationContext
                            .getBean(beanName, EventTactics.class)
                            .eventManage(key, vehicleData))
                    // Without this, a failing handler would disappear with the discarded future.
                    .exceptionally(ex -> {
                        log.error("Event handler {} failed for key {}", beanName, key, ex);
                        return null;
                    });
        }
    }
}