feat:() 优化kafka消费者

dev.processing.optimize
晨哀 2024-10-07 20:00:24 +08:00
parent 2695602972
commit d3f2444521
2 changed files with 37 additions and 16 deletions

View File

@ -20,6 +20,26 @@ public class VehicleProcessingApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(VehicleProcessingApplication.class, args); SpringApplication.run(VehicleProcessingApplication.class, args);
System.out.println(" _ooOoo_\n" +
" o8888888o\n" +
" 88\" . \"88\n" +
" (| -_- |)\n" +
" O\\ = /O\n" +
" ____/`---'\\____\n" +
" .' \\\\| |// `.\n" +
" / \\\\||| : |||// \\\n" +
" / _||||| -:- |||||- \\\n" +
" | | \\\\\\ - /// | |\n" +
" | \\_| ''\\---/'' | |\n" +
" \\ .-\\__ `-` ___/-. /\n" +
" ___`. .' /--.--\\ `. . __\n" +
" .\"\" '< `.___\\_<|>_/___.' >'\"\".\n" +
" | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n" +
" \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n" +
" ======`-.____`-.___\\_____/___.-`____.-'======\n" +
" `=---='\n" +
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" +
" // 佛祖保佑 永不宕机 永无BUG //");
} }
} }

View File

@ -56,23 +56,24 @@ public class KafkaConsumerService implements InitializingBean {
while (true) { while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord consumerRecord : consumerRecords) { for (ConsumerRecord consumerRecord : consumerRecords) {
//从ConsumerRecord中获取消费数据 try {
String originalMsg = (String) consumerRecord.value(); //从ConsumerRecord中获取消费数据
log.info("从Kafka中消费的原始数据: " + originalMsg); String originalMsg = (String) consumerRecord.value();
//把消费数据转换为JSON对象 log.info("从Kafka中消费的原始数据: " + originalMsg);
JSONObject jsonObject = JSON.parseObject(originalMsg); //把消费数据转换为JSON对象
log.info("消费数据转换为JSON对象: " + jsonObject); JSONObject jsonObject = JSON.parseObject(originalMsg);
log.info("消费数据转换为JSON对象: " + jsonObject.toString()); String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin);
String value = jsonObject.toString(); Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
String vin = value.substring(0, 11); WarnRule warnRule = (WarnRule) map.get("warnRule");
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin); WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
WarnRule warnRule = (WarnRule) map.get("warnRule"); Vehicle vehicle = (Vehicle) map.get("vehicle");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); Object breakdown = map.get("breakdown");
Vehicle vehicle = (Vehicle) map.get("vehicle"); Fence fence = (Fence) map.get("fence");
Object breakdown = map.get("breakdown");
Fence fence = (Fence) map.get("fence");
// eventInterface.handle(jsonObject); // eventInterface.handle(jsonObject);
} catch (Exception e) {
log.info("这个有问题:{}",e.getMessage());
}
} }
} }
}); });