master
fst1996 2023-12-02 08:01:20 +08:00
parent 3a9f1a7ef1
commit e942dc4a65
12 changed files with 270 additions and 25 deletions

View File

@ -5,6 +5,7 @@ import com.god.common.security.annotation.EnableMyFeignClients;
import com.god.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
*
@ -15,6 +16,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableCustomSwagger2
@EnableMyFeignClients
@SpringBootApplication
@EnableScheduling
public class GodCarDataApplication {
public static void main (String[] args) {
SpringApplication.run(GodCarDataApplication.class);

View File

@ -1,5 +1,8 @@
package com.god.data.common.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -16,7 +19,14 @@ import java.math.BigDecimal;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_message_model")
public class CarMessage {
/**
*
*/
@TableId(value = "message_id",type = IdType.AUTO)
private Long messageId;
/**
* VIN
*/

View File

@ -3,10 +3,8 @@ package com.god.data.config;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.ElePoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@ -21,7 +19,10 @@ public class RedisInitTest {
private RedisService redisService;
/**
*
*/
@Bean
public void redisInit()
{
redisService.deleteObject("Fence" + "VIN12345678912345");
@ -36,6 +37,10 @@ public class RedisInitTest {
}
/**
*
*/
@Bean
public void parseInit()
{
redisService.deleteObject("event" + "VIN12345678912345");

View File

@ -33,7 +33,7 @@ public class KafkaConsumerConfig {
//设置kafka服务器地址
props.put("bootstrap.servers", KAFKA_CON);
//每个消费者分配独立的组号
props.put("group.id", "g2");
props.put("group.id", "fst1");
//如果value合法则自动提交偏移量
props.put("enable.auto.commit", "true");
//设置多久一次更新被消费消息的偏移量

View File

@ -0,0 +1,14 @@
package com.god.data.controller;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @Author fst
* @date 2023/12/1 20:51
*/
@RestController
public class CarMsgController {
}

View File

@ -0,0 +1,14 @@
package com.god.data.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.god.data.common.domain.CarMessage;
import org.apache.ibatis.annotations.Mapper;
/**
*
* @Author fst
* @date 2023/12/1 20:53
*/
@Mapper
public interface CarMsgMapper extends BaseMapper<CarMessage> {
}

View File

@ -0,0 +1,152 @@
package com.god.data.queue;
import com.god.common.core.utils.SpringUtils;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.CarMessage;
import com.god.data.service.CarMsgService;
import com.god.data.service.EventService;
import com.god.data.utils.AnalyzeUtils;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
@EnableAsync
@Data
@Log4j2
public class MessageProcessor {
@Autowired
private CarMsgService carMsgService;
@Autowired
private RedisService redisService;
private ConcurrentLinkedQueue<String> queueA = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<String> queueB = new ConcurrentLinkedQueue<>();
private AtomicBoolean isQueueABlocked = new AtomicBoolean(false);
private AtomicBoolean isQueueBBlocked = new AtomicBoolean(false);
// 定义一个类成员变量,用于执行定时任务
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* A
* @param message
*/
@Async
public void processFromQueueA(String message) {
log.info("处理队列A中的消息" + message);
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
this.eventProducer(carMessage);
//数据缓存
carMsgService.insert(carMessage);
}
/**
* B
* @param message
*/
@Async
public void processFromQueueB(String message) {
log.info("处理队列B中的消息" + message);
CarMessage carMessage = AnalyzeUtils.parseVehicleData(message);
this.eventProducer(carMessage);
carMsgService.insert(carMessage);
}
/**
*
* @param carMessage
*/
public void eventProducer(CarMessage carMessage){
// 根据对象车辆vin获取事件集合从redis中获取
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
// 执行事件
if(!eventList.isEmpty()){
for (String event : eventList) {
EventService eventService = SpringUtils.getBean(event);
eventService.execute(carMessage);
}
}
}
/**
* A B
* @param message
*/
public void addToQueueA(String message) {
if (!isQueueABlocked.get()) {
queueA.offer(message);
} else {
queueB.offer(message);
}
}
/**
* B A
* @param message
*/
public void addToQueueB(String message) {
if (!isQueueBBlocked.get()) {
queueB.offer(message);
} else {
queueA.offer(message);
}
}
/**
*
*/
public void consumeMessages() {
// 尝试从queueA中取出5条消息然后分别异步处理这5条消息
while (true) {
String message = queueA.poll();
if (message != null) {
CompletableFuture.runAsync(() -> processFromQueueA(message));
}else {
checkQueueStatus();
break;
}
}
// 尝试从queueB中取出5条消息然后分别异步处理这5条消息
while (true) {
String message = queueB.poll();
if (message != null) {
CompletableFuture.runAsync(() -> processFromQueueB(message));
}else {
checkQueueStatus();
break;
}
}
}
@PostConstruct
public void startScheduledTask() {
scheduler.scheduleAtFixedRate(this::consumeMessages, 0, 5, TimeUnit.SECONDS);
}
@PreDestroy
public void stopScheduledTask() {
scheduler.shutdown();
}
public void checkQueueStatus() {
if (queueA.isEmpty()) {
isQueueABlocked.set(true);
}
if (queueB.isEmpty()) {
isQueueBBlocked.set(true);
}
}
}

View File

@ -0,0 +1,14 @@
package com.god.data.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.god.data.common.domain.CarMessage;
/**
* @description:
* @Author fst
* @date 2023/12/1 20:52
*/
public interface CarMsgService extends IService<CarMessage> {
public void insert(CarMessage carMessage);
}

View File

@ -3,21 +3,18 @@ package com.god.data.service;
import com.god.common.core.utils.SpringUtils;
import com.god.common.redis.service.RedisService;
import com.god.data.common.domain.CarMessage;
import com.god.data.service.impl.ElectronicFenceEvent;
import com.god.data.queue.MessageProcessor;
import com.god.data.utils.AnalyzeUtils;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -29,12 +26,14 @@ import java.util.List;
@Log4j2
@Component
public class ParseDataService {
@Autowired
private RedisService redisService;
@Autowired
private KafkaConsumer<String, String> consumer;
@Autowired
private MessageProcessor messageProcessor;
@PostConstruct
@ -49,19 +48,19 @@ public class ParseDataService {
//死循环拉取kafka数据
records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
//把报文解析成对象
CarMessage carMessage = AnalyzeUtils.parseVehicleData(record.value());
//根据对象车辆vin获取事件集合从redis中获取
List<String> eventList = redisService.getCacheList("event" + carMessage.getVin());
//执行事件
if(eventList!= null && !eventList.isEmpty()){
for (String event : eventList) {
EventService eventService = SpringUtils.getBean(event);
eventService.execute(carMessage);
}
String value = record.value();
log.info("{}为从kafak拉取到的10进制报文",value);
//判断往那条队列发送消息
if (!messageProcessor.getIsQueueABlocked().get()) {
log.info("将消息放入队列A");
messageProcessor.addToQueueA(value);
} else if (!messageProcessor.getIsQueueBBlocked().get()) {
log.info("将消息放入队列B");
messageProcessor.addToQueueB(value);
} else {
// 两个队列都被阻塞,根据自定义的策略进行处理,这里可以根据具体情况进行处理
log.info("队列A和队列B都被阻塞根据自定义的策略进行处理");
}
// System.out.println("======="+carMessage+"====================");
System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value());
}

View File

@ -0,0 +1,33 @@
package com.god.data.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.god.common.core.exception.ServiceException;
import com.god.data.common.domain.CarMessage;
import com.god.data.mapper.CarMsgMapper;
import com.god.data.service.CarMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
* @Author fst
* @date 2023/12/1 20:52
*/
@Service
public class CarMsgServiceImpl extends ServiceImpl<CarMsgMapper, CarMessage> implements CarMsgService {
@Autowired
private CarMsgMapper carMsgMapper;
/**
*
* @param carMessage
*/
@Override
public void insert(CarMessage carMessage) {
int insert = carMsgMapper.insert(carMessage);
if (insert < 1) {
throw new ServiceException("保存报文失败");
}
}
}

View File

@ -82,6 +82,11 @@ public class ElectronicFenceEvent implements EventService {
});
}
}
log.info("电子围栏事件结束测试");
rabbitTemplate.convertAndSend("FENCE_STATUS", "电子围栏事件结束测试", message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
}
}

View File

@ -83,7 +83,4 @@ public class AnalyzeUtils {
}