Merge remote-tracking branch 'origin/dev.processing' into dev

dev.redis
晨哀 2024-10-07 10:16:28 +08:00
commit edb3e74824
21 changed files with 696 additions and 5 deletions

View File

@ -107,6 +107,18 @@
<version>3.6.3</version> <version>3.6.3</version>
</dependency> </dependency>
<!-- MQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 引入Caffeine缓存库-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.0.5</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -2,6 +2,7 @@ package com.muyu.processing.abstraction;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.muyu.processing.interfaces.EventInterface; import com.muyu.processing.interfaces.EventInterface;
import lombok.extern.log4j.Log4j2;
/** /**
* *
@ -11,6 +12,7 @@ import com.muyu.processing.interfaces.EventInterface;
* @nameEventProcessor * @nameEventProcessor
* @Date2024/9/28 20:58 * @Date2024/9/28 20:58
*/ */
@Log4j2
public abstract class EventProcessor implements EventInterface { public abstract class EventProcessor implements EventInterface {
private EventProcessor eventProcessor; private EventProcessor eventProcessor;

View File

@ -0,0 +1,24 @@
package com.muyu.processing.basic;
import cn.hutool.json.JSONObject;
import org.springframework.context.ApplicationEvent;
/**
*
* @Author
* @Packagecom.muyu.processing.basic
* @Projectcar-cloud-server
* @nameEventCustom
* @Date2024/9/29 21:18
*/
public class EventCustom extends ApplicationEvent{
private JSONObject data;
public EventCustom(Object source, JSONObject data) {
super(source);
this.data = data;
}
public JSONObject getData(){
return data;
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.processing.basic;
import org.springframework.context.ApplicationListener;
/**
*
* @Author
* @Packagecom.muyu.processing.basic
* @Projectcar-cloud-server
* @nameEventListener
* @Date2024/9/29 22:29
*/
public interface EventListener extends ApplicationListener<EventCustom> {
void onEvent(EventCustom event);
}

View File

@ -0,0 +1,23 @@
package com.muyu.processing.basic;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
/**
*
* @Author
* @Packagecom.muyu.processing.basic
* @Projectcar-cloud-server
* @nameEventPublisher
* @Date2024/9/29 22:31
*/
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher){
this.publisher = applicationEventPublisher;
}
}

View File

@ -0,0 +1,23 @@
package com.muyu.processing.config;
import com.muyu.processing.listener.AddDatabaseListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
* @Author
* @Packagecom.muyu.processing.config
* @Projectcar-cloud-server
* @nameAppConfig
* @Date2024/9/29 22:23
*/
@Configuration
public class AppConfig {
@Bean
public AddDatabaseListener addDatabaseListener(){
return new AddDatabaseListener();
}
}

View File

@ -0,0 +1,42 @@
package com.muyu.processing.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* bean
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送到 broker 成功");
} else {
System.out.println("消息发送到 broker 失败,失败的原因:" + cause);
}
}
}

View File

@ -0,0 +1,107 @@
package com.muyu.processing.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* iotdb
* @Author
* @Packagecom.muyu.processing.config
* @Projectcar-cloud-server
* @nameIotDBConfig
* @Date2024/9/30 15:13
*/
@Log4j2
@Component
@Configuration
public class IotDBConfig {
/**
* session
*/
@Bean
public static Session session(){
Session session = null;
try {
session = new Session(
"47.101.49.53",
6667,
"root",
"root"
);
session.open(false);
session.setFetchSize(100);
} catch (Exception e) {
throw new RuntimeException(e);
}
return session;
}
/**
*
*/
public void execute(String deviceId, Long time, List<String> measurement, List<String> values){
if (!CollectionUtils.isEmpty(measurement) && !CollectionUtils.isEmpty(values)){
try {
session().insertAlignedRecord(deviceId,time,measurement,values);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
}
}
}
/**
*
*/
public List<HashMap<String, Object>> executeQuery(String sql){
log.info("sql:{}",sql);
ArrayList<HashMap<String, Object>> list = new ArrayList<>();
try {
SessionDataSet sessionDataSet = session().executeQueryStatement(sql);
int fetchSize = sessionDataSet.getFetchSize();
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> columnTypes = sessionDataSet.getColumnTypes();
System.out.println(columnNames);
System.out.println(columnTypes);
if (fetchSize > 0){
while (sessionDataSet.hasNext()){
HashMap<String, Object> map = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
// 查询结果第一个为时间戳
long timestamp = next.getTimestamp();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
String key = field.getStringValue();
// 这里的需要按照类型获取
Object value = field.getObjectValue(field.getDataType());
map.put(key, value);
}
list.add(map);
}
}
sessionDataSet.closeOperationHandle();
} catch (Exception e) {
throw new RuntimeException(e);
}
return list;
}
}

View File

@ -0,0 +1,50 @@
package com.muyu.processing.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdmin
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* rabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,18 @@
package com.muyu.processing.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
*/
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,39 @@
package com.muyu.processing.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* bean
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setReturnsCallback(this);
}
/**
* queue
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() +
"被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// TODO 回退了所有的信息,可做补偿机制
}
}

View File

@ -6,7 +6,12 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy;
import com.muyu.processing.interfaces.EventInterface; import com.muyu.processing.interfaces.EventInterface;
import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -17,8 +22,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.Duration; import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
/** /**
* kafka
* @Author * @Author
* @Packagecom.muyu.processing.consumer * @Packagecom.muyu.processing.consumer
* @Projectcar-cloud-server * @Projectcar-cloud-server
@ -32,6 +39,9 @@ public class KafkaConsumerService implements InitializingBean {
@Resource @Resource
private KafkaConsumer kafkaConsumer; private KafkaConsumer kafkaConsumer;
@Resource
private CacheUtil cacheUtil;
// @Resource // @Resource
// private EventInterface eventInterface; // private EventInterface eventInterface;
@ -53,8 +63,16 @@ public class KafkaConsumerService implements InitializingBean {
JSONObject jsonObject = JSON.parseObject(originalMsg); JSONObject jsonObject = JSON.parseObject(originalMsg);
log.info("消费数据转换为JSON对象: " + jsonObject); log.info("消费数据转换为JSON对象: " + jsonObject);
log.info("消费数据转换为JSON对象: " + jsonObject.toString()); log.info("消费数据转换为JSON对象: " + jsonObject.toString());
// eventInterface.handle(jsonObject);
String value = jsonObject.toString();
String vin = value.substring(0, 11);
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
WarnRule warnRule = (WarnRule) map.get("warnRule");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
Vehicle vehicle = (Vehicle) map.get("vehicle");
Object breakdown = map.get("breakdown");
Fence fence = (Fence) map.get("fence");
// eventInterface.handle(jsonObject);
} }
} }
}); });

View File

@ -0,0 +1,45 @@
package com.muyu.processing.consumer;
import com.muyu.enterprise.cache.FaultCacheService;
import com.muyu.enterprise.cache.FenceCahceService;
import com.muyu.enterprise.cache.VehicleCacheService;
import com.muyu.enterprise.cache.WarnRuleCacheService;
import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 线
* @Author
* @Packagecom.muyu.processing.consumer
* @Projectcloud-vehicle
* @nameOfflineMonitoringConsumer
* @Date2024/10/4 14:48
*/
@Log4j2
@Component
public class OfflineMonitoringConsumer {
@Resource
private CacheUtil cacheUtil;
/**
*
* @param vin vin
*/
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring"))
public void receive(String vin){
log.info("清除缓存中的数据,车辆vin: {}", vin);
// 清除缓存
cacheUtil.remove(vin);
}
}

View File

@ -0,0 +1,81 @@
package com.muyu.processing.consumer;
import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy;
import com.muyu.enterprise.cache.*;
import com.muyu.processing.utils.CacheUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* 线
* @Author
* @Packagecom.muyu.processing.consumer
* @Projectcar-cloud-server
* @nameMQconsumer
* @Date2024/9/29 17:19
*/
@Log4j2
@Component
public class OnLineMonitoringConsumer {
@Resource
private CacheUtil cacheUtil;
@Resource
private VehicleCacheService vehicleCacheService;
@Resource
private FaultCacheService faultCacheService;
@Resource
private FenceCahceService fenceCahceService;
@Resource
private WarnRuleCacheService warnRuleCacheService;
@Resource
private WarnStrategyCacheService warnStrategyCacheService;
/**
* 线线
*/
@RabbitListener(queuesToDeclare = @Queue("long_time_no_see"))
public void receive(String vin, Message message, Channel channel){
try {
log.info("添加本地缓存,车辆vin: {}", vin);
WarnRule warnRule = warnRuleCacheService.get(vin);
WarnStrategy warnStrategy = warnStrategyCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin);
Object breakdown = faultCacheService.get(vin);
Fence fence = fenceCahceService.get(vin);
HashMap<String, Object> map = new HashMap<>();
map.put("warnRule",warnRule);
map.put("warnStrategy",warnStrategy);
map.put("vehicle",vehicle);
map.put("breakdown",breakdown);
map.put("fence",fence);
cacheUtil.put(vin,map);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
}

View File

@ -0,0 +1,59 @@
package com.muyu.processing.controller;
import com.muyu.processing.config.IotDBConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* iotdb
* @Author
* @Packagecom.muyu.processing.controller
* @Projectcar-cloud-server
* @nameIotdbController
* @Date2024/10/2 9:39
*/
@RestController
@RequestMapping("iotdb")
public class IotDbController {
@Resource
private IotDBConfig iotDBConfig;
private String json = "{\n" + " \"carVin\": \"VIN123456\",\n" +
" \"carName\": \"宝马\",\n" + "}";
/**
*
*/
@GetMapping("add")
public void add(){
// Map map = JSON.parseObject(json, Map.class);
// Set set = map.keySet();
ArrayList<String> key = new ArrayList<>();
ArrayList<String> value = new ArrayList<>();
key.add("car_vin");
key.add("car_name");
value.add("VIN123456");
value.add("宝马");
System.out.println(key);
System.out.println(value);
long l = System.currentTimeMillis();
iotDBConfig.execute("root.vehicle", l, key, value);
}
/**
*
*/
@GetMapping("findList")
public void findList(){
String sql = "select * from root.vehicle";
List<HashMap<String, Object>> list = iotDBConfig.executeQuery(sql);
System.out.println(list);
}
}

View File

@ -1,12 +1,14 @@
package com.muyu.processing.controller; package com.muyu.processing.controller;
import com.alibaba.fastjson.JSONObject; import cn.hutool.json.JSONObject;
import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.kafka.config.KafkaProducerConfig; import com.muyu.common.kafka.config.KafkaProducerConfig;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Field;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -14,6 +16,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
/** /**
* -
* @Author * @Author
* @Packagecom.muyu.processing.controller * @Packagecom.muyu.processing.controller
* @Projectcar-cloud-server * @Projectcar-cloud-server
@ -29,10 +32,44 @@ public class TestKafka {
@Resource @Resource
private KafkaProducer<String, String> kafkaProducer; private KafkaProducer<String, String> kafkaProducer;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* Kafka
* @return String
*/
@GetMapping("/send") @GetMapping("/send")
public void sendMsg(){ public String sendMsg(){
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", "你好啊"); JSONObject entries = new JSONObject();
entries.set("vin","vin123468");
entries.set("name","宝马");
String entriesString = entries.toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", entriesString);
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
return "OK";
} }
/**
* MQ
* @return String
*/
@GetMapping("/sendMQ")
public String sendMQ(){
rabbitTemplate.convertAndSend("long_time_no_see","晨哀,好久不见",message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
return "OK";
}
/**
* MQ
* @return String
*/
@GetMapping("/sendDui")
public String sedDui() {
rabbitTemplate.convertAndSend("myExchange","Im.fine","");
return "OK";
}
} }

View File

@ -1,6 +1,8 @@
package com.muyu.processing.interfaces; package com.muyu.processing.interfaces;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.checkerframework.checker.units.qual.C;
import org.springframework.stereotype.Component;
/** /**
* *

View File

@ -0,0 +1,34 @@
package com.muyu.processing.listener;
import cn.hutool.json.JSONObject;
import com.muyu.processing.basic.EventCustom;
import com.muyu.processing.basic.EventListener;
import java.util.ArrayList;
/**
*
* @Author
* @Packagecom.muyu.processing.listener
* @Projectcar-cloud-server
* @nameAddDatabaseListener
* @Date2024/9/29 22:25
*/
public class AddDatabaseListener implements EventListener {
@Override
public void onEvent(EventCustom event) {
JSONObject jsonObject = event.getData();
ArrayList<Object> keys = new ArrayList<>();
ArrayList<Object> values = new ArrayList<>();
jsonObject.forEach((key, value) ->{
keys.add(key);
values.add(value);
});
// 添加数据库
}
@Override
public void onApplicationEvent(EventCustom event) {
onEvent(event);
}
}

View File

@ -0,0 +1,58 @@
package com.muyu.processing.utils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Component;
/**
*
* @Author
* @Packagecom.muyu.processing.utils
* @Projectcloud-vehicle
* @nameCacheUtil
* @Date2024/10/4 15:14
*/
@Component
public class CacheUtil<T> {
/**
*
*/
private final Cache<String, T> cache;
/**
*
*/
public CacheUtil(){
this.cache = Caffeine.newBuilder()
.maximumSize(500L)
.build();
}
/**
*
* @param key
* @return
*/
public T get(String key){
return cache.getIfPresent(key);
}
/**
*
* @param key
* @param value
*/
public void put(String key, T value){
cache.put(key, value);
}
/**
*
* @param key
*/
public void remove(String key){
cache.invalidate(key);
}
}

View File

@ -13,6 +13,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* IotDb
* @Author * @Author
* @Packagecom.muyu.processing.utils * @Packagecom.muyu.processing.utils
* @Projectcar-cloud-server * @Projectcar-cloud-server

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848 addr: 47.101.49.53:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: seven namespace: dev
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring: