事件
master_suzejing
JangCan 2024-04-09 21:14:09 +08:00
parent 7abd6f6523
commit 9478ae16c5
4 changed files with 555 additions and 539 deletions

View File

@ -28,9 +28,4 @@ public class RealTimeTrajectoryController {
return result; return result;
} }
@GetMapping("/findVin/{vin}")
public String findVin(@PathVariable String vin){
String result = realTimeTrajectoryService.findVin(vin);
return result;
}
} }

View File

@ -1,19 +1,21 @@
package com.zhilian.resolver.model; package com.zhilian.resolver.model;
import com.zhilian.common.core.utils.SpringUtils; import com.zhilian.common.core.utils.SpringUtils;
import com.zhilian.common.redis.service.RedisService;
import com.zhilian.common.resolver.domain.ResolverReportData; import com.zhilian.common.resolver.domain.ResolverReportData;
import com.zhilian.resolver.service.ResolverEventService; import com.zhilian.resolver.service.ResolverEventService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.*;
import java.util.Collections; import java.util.stream.Collectors;
import java.util.List;
import java.util.Properties;
import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString; import static com.zhilian.resolver.utils.ConvertUtils.hexStringToString;
import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData; import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData;
@ -26,23 +28,21 @@ import static com.zhilian.resolver.utils.ConvertUtils.parseVehicleData;
@Component @Component
@Slf4j @Slf4j
public class ModelsKafkaMessage { public class ModelsKafkaMessage {
private static final String TOPIC_NAME = "test-topic"; @Autowired
private RedisService redisService;
private static final String TOPIC_NAME = "vehicle-topic";
private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092"; private static final String BOOTSTRAP_SERVERS = "10.10.25.5:9092";
static ArrayList<String> stringEvents = new ArrayList<>() {
{
add("malfunction"); //故障事件
add("event-realTimeTrajectory"); //实时轨迹数事件
add("event-storage"); //存储服务事件
add("geofence"); //电子围栏事件
}
};
/** /**
* *
* @return * @return
*/ */
@Scheduled(fixedDelay = 50) @PostConstruct
private static void consumerMessages() { private void consumerMessages() {
Thread kafkaConsumerThread = new Thread(() -> {
log.info("启动线程");
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
@ -66,22 +66,31 @@ public class ModelsKafkaMessage {
List<ResolverReportData> resolverReportDataList = parseVehicleData(str); List<ResolverReportData> resolverReportDataList = parseVehicleData(str);
for (ResolverReportData vehicleData : resolverReportDataList) { for (ResolverReportData vehicleData : resolverReportDataList) {
log.info("解析到车辆数据:{}", vehicleData); log.info("解析到车辆数据:{}", vehicleData);
//str-->vehicleDataList-->vehicleData实例
//获取vin
String vin = vehicleData.getVin();
//获取事件集
Set<Object> cacheSet = redisService.getCacheSet("vehicle-event:" + vin);
List<String> events = cacheSet.stream().map(item -> {
return String.valueOf(item);
}).collect(Collectors.toList());
log.info("事件集合:{}",events);
log.info("解析到车辆数据:{}", vehicleData); log.info("解析到车辆数据:{}", vehicleData);
for (String stringEvent : stringEvents) { for (String stringEvent : events) {
ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent); ResolverEventService resolverEventService =SpringUtils.getBean(stringEvent);
resolverEventService.execute(vehicleData); resolverEventService.execute(vehicleData);
} }
} }
}); });
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Error occurred in Kafka consumer thread", e);
} finally { } finally {
consumer.close(); consumer.close();
} }
});
kafkaConsumerThread.start();
} }

View File

@ -1,22 +1,33 @@
package com.zhilian.resolver.service.impl.eventRealTime; package com.zhilian.resolver.service.impl.eventRealTime;
import com.zhilian.common.redis.service.RedisService;
import com.zhilian.common.resolver.domain.ResolverReportData; import com.zhilian.common.resolver.domain.ResolverReportData;
import com.zhilian.resolver.service.ResolverEventService; import com.zhilian.resolver.service.ResolverEventService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* @ClassName RealTimeTrajectoryEventImplService * @ClassName RealTimeTrajectoryEventImplService
* @Description * @Description
* @Author Can.J * @Author Can.J
* @Date 2024/4/4 9:49 * @Date 2024/4/4 9:49
*/ */
@Slf4j @Slf4j
@Service("event-realTimeTrajectory") @Service("event-realTimeTrajectory")
public class RealTimeTrajectoryEventImplService implements ResolverEventService { public class RealTimeTrajectoryEventImplService implements ResolverEventService {
@Autowired
private RedisService redisService;
@Override @Override
public void execute(ResolverReportData resolverReportData) { public void execute(ResolverReportData resolverReportData) {
log.info("开始实时数据"); log.info("开始实时数据");
//实时数据 事件名+vin为key 判断有键删除 保持最新数据 前台redis组件获取数据
if(redisService.hasKey("event-real-time:"+resolverReportData.getVin())){
redisService.deleteObject("event-real-time:"+resolverReportData.getVin());
}
redisService.setCacheSet("event-real-time:"+resolverReportData.getVin(),resolverReportData);
log.info("结束实时数据"); log.info("结束实时数据");
} }
@ -29,4 +40,5 @@ public class RealTimeTrajectoryEventImplService implements ResolverEventService
public String getEventName() { public String getEventName() {
return "event-realTimeTrajectory"; return "event-realTimeTrajectory";
} }
} }