server_five_liuyunhu
liuyunhu 2024-04-08 19:51:32 +08:00
parent be2e99af0c
commit 9327ecae1f
4 changed files with 70 additions and 31 deletions

View File

@ -2,6 +2,7 @@ package com.couplet.analyze.common.event;
import com.couplet.common.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Set;
@ -11,6 +12,7 @@ import java.util.Set;
* @Description:
*/
@Component
public class AnalyzeEventCache {
@Autowired

View File

@ -87,6 +87,11 @@
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-common-event</artifactId>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -3,6 +3,8 @@ package com.couplet.business.server.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.couplet.analyze.common.contents.AnalyzeEventContents;
import com.couplet.analyze.common.event.AnalyzeEventCache;
import com.couplet.business.server.mapper.VehicleMapper;
import com.couplet.business.server.service.FenAndLogoService;
import com.couplet.business.server.service.VehicleAndLogoService;
@ -24,6 +26,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -59,6 +62,9 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
@Autowired
private FenAndLogoService fenAndLogoService;
@Autowired
private AnalyzeEventCache eventCache;
/*
* @Author: LiuYunHu
* @Date: 2024/3/26 22:11
@ -254,6 +260,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
Result.error(result);
}
//获取新增的车辆id值
//执行添加电子围栏
int i = vehicleAndLogoService.vehicleBindLogo(vehicle.getVehicleId(), insertParams.getLogoIds());
@ -262,6 +269,9 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
//刷新set缓存
reCache();
//加入事件缓存
reEvent();
result = "新增成功!";
@ -395,7 +405,10 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
* @Param: []
* @Return: void
**/
@Scheduled(cron = "0/3 * * * * *")
// @Scheduled(cron = "0/3 * * * * *")
//初始化
@PostConstruct
public void reCache() {
//刷新缓存执行开始
@ -431,4 +444,23 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
});
}
/*
* @Author: LiuYunHu
* @Date: 2024/4/8 19:39
* @Description:
* @Param: []
* @Return: void
**/
@PostConstruct
public void reEvent() {
List<Vehicle> list = list(new VehicleListParams());
list.forEach(vehicle -> {
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.STORED_EVENT);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.BREAKDOWN);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.ELECTRONIC_FENCE);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.REAL_TIME_DATA);
});
}
}

View File

@ -27,10 +27,10 @@ public class KafkaTest {
public static void main(String[] args) {
//生产者示例
// produceMessage();
produceMessage();
//消费者示例
// consumerMessages();
consumerMessages();
}
@ -61,33 +61,33 @@ public class KafkaTest {
}
//消费者
// private static void consumerMessages() {
// Properties props = new Properties();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
// //创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
// try {
//
// //订阅主题
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
// //持续消费消息
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// records.forEach(record -> {
// System.out.println("消费者接受到的消息值:" + record.value());
// });
// }
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// consumer.close();
// }
// }
private static void consumerMessages() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println("消费者接受到的消息值:" + record.value());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}