小改动

master
fst1996 2023-12-05 16:32:42 +08:00
parent 2d4c91e7c1
commit 79cb05529e
5 changed files with 120 additions and 17 deletions

View File

@ -0,0 +1,68 @@
package com.god.data.config;
import com.god.common.redis.service.RedisService;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
/**
* map
* @Author fst
* @date 2023/12/5 13:38
*/
@Component
@Log4j2
public class EventMapData {
@Autowired
private RedisService redisService;
public static Map<String, HashSet<String>> eventMap = new HashMap<>();
/**
*
*/
@Bean
public void eventDataInit(){
log.info("开始初始化事件静态map");
eventMap= redisService.getCacheMap("EVENT");
log.info("静态map初始化完成,当前数据:{}",eventMap);
}
/**
*
* @param msg
* @param message
* @param channel
*/
@RabbitListener(queuesToDeclare = @Queue("EVENT_UPDATE"))
public void eventUpdate(String msg, Message message, Channel channel){
try {
log.info("开始更新车辆事件静态map数据");
//msg其实就是车辆vin,获取缓存中对应车辆的事件集合
HashSet<String> event = redisService.getCacheMapValue("EVENT", msg);
//更新静态map
if(event!=null){
eventMap.put(msg,event);
}
//手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("更新车辆事件静态map数据完成");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,39 @@
package com.god.data.listeners;
import com.god.data.service.ParseDataService;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @description:
* @Author fst
* @date 2023/12/5 14:29
*/
@Component
@Log4j2
public class TopicListener {
@Autowired
private ParseDataService parseDataService;
@RabbitListener(queuesToDeclare = @Queue("TOPIC_INFORM"))
public void topicData(String msg, Message message, Channel channel){
try {
log.info("监听到需要连接的主题:{},开始建立kafka连接",msg);
parseDataService.start(msg);
log.info("连接完成");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -15,10 +15,13 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.god.data.config.EventMapData.eventMap;
/**
*
* @author fst
@ -80,15 +83,13 @@ public class MessageProcessor {
* @param carMessage
*/
private void eventProducer(CarMessage carMessage){
if (!redisService.hasKey("event" + carMessage.getVin())){
redisService.setCacheList("event" + carMessage.getVin(),null);
}
//从本地静态数据中获取事件集合
HashSet<String> set = eventMap.get(carMessage.getVin());
// 根据对象车辆vin获取事件集合从redis中获取
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
log.info("当前车辆 vin{} 绑定事件:{}",carMessage.getVin(),eventList.toString());
log.info("当前车辆 vin{} 绑定事件:{}",carMessage.getVin(),set.toString());
// 执行事件
if(!eventList.isEmpty()) {
for (String event : eventList) {
if(!set.isEmpty()) {
for (String event : set) {
EventService eventService = SpringUtils.getBean(event);
eventService.execute(carMessage);
}

View File

@ -35,13 +35,9 @@ public class ParseDataService {
private int count=0;
@PostConstruct
public void start(){
public void start(String topic){
new Thread(() -> {
consumer.subscribe(List.of("test"));
consumer.subscribe(List.of(topic));
log.info("kafka数据解析服务启动");
while (true){
ConsumerRecords<String, String> records = null;

View File

@ -12,9 +12,8 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
@ -45,7 +44,7 @@ public class ElectronicFenceAnalysis implements EventService {
log.info("电子围栏解析事件开始执行,接受的报文数据是:{}",carMessage);
//获取redis车辆绑定的围栏
List<ElePoint> cacheList = redisService.getCacheList("Fence" + carMessage.getVin());
Set<ElePoint> cacheList = redisService.getCacheSet("Fence" + carMessage.getVin());
//循环取出该车辆绑定的围栏信息
for (ElePoint elePoint : cacheList) {