kafka连接改动

master
fst1996 2023-12-06 18:55:39 +08:00
parent 745db9d03f
commit fefd255ae0
3 changed files with 61 additions and 14 deletions

View File

@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @description:
* kafka
* @Author fst
* @date 2023/12/5 14:29
*/
@ -23,11 +23,20 @@ public class TopicListener {
@Autowired
private ParseDataService parseDataService;
/**
* kafka
* @param msg
* @param message
* @param channel
*/
@RabbitListener(queuesToDeclare = @Queue("TOPIC_INFORM"))
public void topicData(String msg, Message message, Channel channel){
try {
log.info("监听到需要连接的主题:{},开始建立kafka连接",msg);
parseDataService.start(msg);
//添加主题
parseDataService.addTopic(msg);
//触发连接
parseDataService.start();
log.info("连接完成");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {

View File

@ -11,10 +11,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.*;
/**
@ -33,11 +30,27 @@ public class ParseDataService {
@Autowired
private MessageProcessor messageProcessor;
private static final Set<String> topics = new HashSet<>();
/**
* kafka线
*/
private Thread consumerThread;
/**
*
*/
private int count=0;
public void start(String topic){
/**
* kafka
*/
public void start(){
if (consumerThread != null && consumerThread.isAlive()){
consumerThread.interrupt(); //中断线程
}
new Thread(() -> {
consumer.subscribe(List.of(topic));
consumer.subscribe(topics);
log.info("kafka数据解析服务启动");
while (true){
ConsumerRecords<String, String> records = null;
@ -71,4 +84,21 @@ public class ParseDataService {
}).start();
}
/**
* topic
* @param topic
*/
public void addTopic(String topic) {
topics.add(topic);
}
/**
* topic
* @param topic
*/
public void removeTopic(String topic) {
topics.remove(topic);
}
}

View File

@ -1,5 +1,6 @@
package com.god.data.service.impl;
import com.god.common.core.utils.uuid.UUID;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.BreakConstant;
import com.god.data.common.domain.CarMessage;
@ -49,13 +50,20 @@ public class FaultAlarmEvent implements EventService {
ArrayList<String> strings = new ArrayList<>();
faultCodeStrategy.addFaultCodeIfNecessary(carMessage,strings);
//把对应的故障码集合存入rabbitmq
if (!strings.isEmpty()){
strings.forEach(obj -> {
BreakConstant breakConstant = BreakConstant.valueOf(obj);
String value = breakConstant.value();
});
rabbitTemplate.convertAndSend("god.car.fault.alarm",strings);
HashMap<String, List<String>> stringListHashMap = new HashMap<>();
if (!strings.isEmpty()){
// strings.forEach(obj -> {
// BreakConstant breakConstant = BreakConstant.valueOf(obj);
// String value = breakConstant.value();
// });
// rabbitTemplate.convertAndSend("god.car.fault.alarm",strings,message -> {
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
// return message;
// });
}
stringListHashMap.put(carMessage.getVin(),strings);
rabbitTemplate.convertAndSend("",stringListHashMap);
}