Compare commits

..

No commits in common. "db1368379ec2d5a9a0fd1f4e1eedde64f43d4da4" and "0960e8f9fbf2654bf9640947bc5a12935a01204e" have entirely different histories.

6 changed files with 34 additions and 77 deletions

View File

@ -3,7 +3,6 @@ package com.couplet.analyze.common.event;
import com.couplet.common.domain.request.RealTimeDataRequest; import com.couplet.common.domain.request.RealTimeDataRequest;
import com.couplet.common.redis.service.RedisService; import com.couplet.common.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Set; import java.util.Set;
@ -13,7 +12,6 @@ import java.util.Set;
* @Description: * @Description:
*/ */
@Component
public class AnalyzeEventCache { public class AnalyzeEventCache {
@Autowired @Autowired

View File

@ -87,11 +87,6 @@
<artifactId>spring-boot-starter-amqp</artifactId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-common-event</artifactId>
</dependency>
<!-- RabbitMQ依赖--> <!-- RabbitMQ依赖-->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@ -3,8 +3,6 @@ package com.couplet.business.server.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.couplet.analyze.common.contents.AnalyzeEventContents;
import com.couplet.analyze.common.event.AnalyzeEventCache;
import com.couplet.business.server.mapper.VehicleMapper; import com.couplet.business.server.mapper.VehicleMapper;
import com.couplet.business.server.service.FenAndLogoService; import com.couplet.business.server.service.FenAndLogoService;
import com.couplet.business.server.service.VehicleAndLogoService; import com.couplet.business.server.service.VehicleAndLogoService;
@ -26,7 +24,6 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,9 +59,6 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
@Autowired @Autowired
private FenAndLogoService fenAndLogoService; private FenAndLogoService fenAndLogoService;
@Autowired
private AnalyzeEventCache eventCache;
/* /*
* @Author: LiuYunHu * @Author: LiuYunHu
* @Date: 2024/3/26 22:11 * @Date: 2024/3/26 22:11
@ -260,7 +254,6 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
Result.error(result); Result.error(result);
} }
//获取新增的车辆id值 //获取新增的车辆id值
//执行添加电子围栏 //执行添加电子围栏
int i = vehicleAndLogoService.vehicleBindLogo(vehicle.getVehicleId(), insertParams.getLogoIds()); int i = vehicleAndLogoService.vehicleBindLogo(vehicle.getVehicleId(), insertParams.getLogoIds());
@ -269,9 +262,6 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
//刷新set缓存 //刷新set缓存
reCache(); reCache();
//加入事件缓存
reEvent();
result = "新增成功!"; result = "新增成功!";
@ -405,10 +395,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
* @Param: [] * @Param: []
* @Return: void * @Return: void
**/ **/
// @Scheduled(cron = "0/3 * * * * *") @Scheduled(cron = "0/3 * * * * *")
//初始化
@PostConstruct
public void reCache() { public void reCache() {
//刷新缓存执行开始 //刷新缓存执行开始
@ -444,23 +431,4 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
}); });
} }
/*
* @Author: LiuYunHu
* @Date: 2024/4/8 19:39
* @Description:
* @Param: []
* @Return: void
**/
@PostConstruct
public void reEvent() {
List<Vehicle> list = list(new VehicleListParams());
list.forEach(vehicle -> {
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.STORED_EVENT);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.BREAKDOWN);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.ELECTRONIC_FENCE);
eventCache.addEvent(vehicle.getVin(), AnalyzeEventContents.REAL_TIME_DATA);
});
}
} }

View File

@ -16,11 +16,9 @@ spring:
discovery: discovery:
# 服务注册地址 # 服务注册地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: 172469
config: config:
# 配置中心地址 # 配置中心地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式 # 配置文件格式
file-extension: yml file-extension: yml
# 共享配置 # 共享配置

View File

@ -27,10 +27,10 @@ public class KafkaTest {
public static void main(String[] args) { public static void main(String[] args) {
//生产者示例 //生产者示例
produceMessage(); // produceMessage();
//消费者示例 //消费者示例
consumerMessages(); // consumerMessages();
} }
@ -61,33 +61,33 @@ public class KafkaTest {
} }
//消费者 //消费者
private static void consumerMessages() { // private static void consumerMessages() {
Properties props = new Properties(); // Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
//创建消费者 // //创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
try { // try {
//
//订阅主题 // //订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME)); // consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
//持续消费消息 // //持续消费消息
while (true) { // while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> { // records.forEach(record -> {
System.out.println("消费者接受到的消息值:" + record.value()); // System.out.println("消费者接受到的消息值:" + record.value());
}); // });
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
consumer.close(); // consumer.close();
} // }
} // }
} }

View File

@ -15,11 +15,9 @@ spring:
discovery: discovery:
# 服务注册地址 # 服务注册地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: 172469
config: config:
# 配置中心地址 # 配置中心地址
server-addr: 121.89.211.230:8848 server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式 # 配置文件格式
file-extension: yml file-extension: yml
# 共享配置 # 共享配置
@ -38,7 +36,7 @@ mqtt:
# broker: mqtt://115.159.47.13:1883 # broker: mqtt://115.159.47.13:1883
username: username:
password: password:
clientId: liuyunhu clientId: fluxMq
qos: 0 qos: 0
topic: liuyunhu topic: test