fix():添加rabbitmq工具类,添加数据处理基础类,添加缓存

dev.gateway
王鑫 2024-09-30 16:28:08 +08:00 committed by ruyaxie
parent e9e32c2ad0
commit d6167ebfed
45 changed files with 1033 additions and 109 deletions

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# Spring
spring:
application:

View File

@ -27,6 +27,10 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-redis</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>enterpise-common</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,21 @@
package com.muyu.common.domain;
import com.muyu.domain.SysCar;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @Author WangXin
* @Data 2024/9/30
* @Description
* @Version 1.0.0
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class InformationData {
private SysCar sysCar;
}

View File

@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.muyu.common.iotdb.constant.IotdbConstant.ROOT_DATA_DATAJSON;
import static com.muyu.common.iotdb.constant.IotdbConstant.SELECT_ROOT_DATA_DATAJSON_DATASOURCE;
/**
* @Author WangXin
* @Data 2024/9/30
@ -64,18 +67,23 @@ public class IotDBSessionConfig {
* @param deviceId
* @param time
* @param measurements
* @param values
* @param value
*/
public void insertRecord(SessionPool sessionPool,String deviceId, long time, List<String> measurements, List<String> values) {
public void insertRecord(SessionPool sessionPool,String deviceId,
long time, List<String> measurements,List<TSDataType> dataTypeList, JSONObject value) {
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
sessionPool.insertRecord(deviceId, time, measurements, values);
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, value);
sessionPool.insertRecord(deviceId, time, measurements,dataTypeList,new Object[]{value.toJSONString()});
} catch (Exception e) {
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
deviceId, time, measurements, values, e.getMessage());
deviceId, time, measurements, value, e.getMessage());
}
}
public void JSONObject(JSONObject value){
}
public SessionDataSet selectRecord(SessionPool sessionPool,String sql) {
log.info("iotdb数据查询sql:[{}]",sql);
SessionDataSetWrapper sessionDataSetWrapper = null;
@ -90,11 +98,29 @@ public class IotDBSessionConfig {
}
public JSONObject getJsonFindByTime(SessionPool sessionPool,String fieldName,String deviceId,Long timestamp) {
String sql = String.format("SELECT %s FROM %s WHERE time = %d", fieldName, deviceId, timestamp);
SessionDataSet sessionDataSet = selectRecord(sessionPool, sql);
try {
while (sessionDataSet.hasNext()){
RowRecord next = sessionDataSet.next();
for (Field field : next.getFields()) {
String stringValue = field.getStringValue();
}
}
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
return null;
}
public static void main(String[] args) {
SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 10);
String ROOT_DATA_DATAJSON = "root.car.data.datajson";
String SELECT_ROOT_DATA_DATAJSON_DATASOURCE = "select * from root.car.data.datajson";
String jsonValue = """
{
@ -110,14 +136,13 @@ public class IotDBSessionConfig {
}
}
""";
JSONObject value = JSONObject.parseObject(jsonValue);
IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig();
List<String> values = new ArrayList<>();
values.add(jsonValue);
ArrayList<String> objects = new ArrayList<>();
objects.add("datasource");
iotDBSessionConfig.insertRecord(sessionPool,ROOT_DATA_DATAJSON,System.currentTimeMillis(),objects,values);
List<String> measurements = List.of("datasource");
List<TSDataType> datatypeList = List.of(TSDataType.TEXT);
iotDBSessionConfig.insertRecord(sessionPool,ROOT_DATA_DATAJSON,System.currentTimeMillis(),measurements,datatypeList,value);
SessionDataSet sessionDataSet = iotDBSessionConfig.selectRecord(sessionPool,SELECT_ROOT_DATA_DATAJSON_DATASOURCE);
@ -140,7 +165,6 @@ public class IotDBSessionConfig {
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
log.info("数据为:{}", JSONObject.toJSONString(longMapHashMap));
}

View File

@ -0,0 +1,13 @@
package com.muyu.common.iotdb.constant;
/**
* @Author WangXin
* @Data 2024/9/30
* @Description iotdb
* @Version 1.0.0
*/
public interface IotdbConstant {
String ROOT_DATA_DATAJSON = "root.car.data.datajson";
String SELECT_ROOT_DATA_DATAJSON_DATASOURCE = "select * from root.car.data.datajson";
}

View File

@ -0,0 +1,75 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: WangXin
* @Time: 2024/4/22 11:55
* @Description:
*/
@Configuration
public class TopicConfig {
/**
*
* @return exchange
*/
@Bean(name = "topicExchange")
public Exchange getTopicExchange(){
return ExchangeBuilder
.topicExchange("exchange_topic")
.build();
}
/**
* 01
* @return queue
*/
@Bean(name = "topicQueue01")
public Queue getTopicQueue01(){
return QueueBuilder
.durable("queue_topic_01")
.build();
}
/**
* 02
* @return queue
*/
@Bean(name = "topicQueue02")
public Queue getTopicQueue02(){
return QueueBuilder
.durable("queue_topic_02")
.build();
}
/**
* 01
* @return binding
*/
@Bean
public Binding getTopicBinding01(){
return BindingBuilder
.bind(getTopicQueue01())
.to(getTopicExchange())
//路由键 队列1接收debug级别的消息
.with("front.#")
.noargs();
}
/**
* 02
* @return binding
*/
@Bean
public Binding getTopicBinding02(){
return BindingBuilder
.bind(getTopicQueue02())
.to(getTopicExchange())
// 路由键 队列2接收info级别的消息
.with("back.order.*")
.noargs();
}
}

View File

@ -6,17 +6,29 @@ package com.muyu.common.rabbit.constants;
* @Description: rabbitmq
* @Version 1.0.0
*/
public class RabbitmqConstants {
public interface RabbitmqConstants {
//普通队列
public static final String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME";
String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME";
public static final String lOG_QUEUE_NAME = "LOG_QUEUE_NAME";
String lOG_QUEUE_NAME = "LOG_QUEUE_NAME";
//延迟队列
//队列名称
public static final String DELAYED_QUEUE_NAME = "delayed_queue";
String DELAYED_QUEUE_NAME = "delayed_queue";
//交换机名称
public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
//交换机
public static final String DELAYED_ROUTING_KEY = "delayed";
String DELAYED_ROUTING_KEY = "delayed";
/**
* 线
*/
String TOP_BOTTOM_STITCHING = "top_bottom_stitching";
/**
* 线
*/
String TOP_RULE = "car.top.data";
/**
* 线
*/
String BOTTOM_RULE = "car.bottom.data";
}

View File

@ -77,5 +77,64 @@ public class RabbitMQConsumerUtil {
}
/**
*
* @param data
* @param message
* @param channel
*/
public void carUpConsumer(String data,Message message , Channel channel) {
log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message);
try {
// 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) {
return;
}
/**
* ---------------------------------------------------------------
*/
log.info("[ 根据vin拿到缓存 ] vin为 --》 {}",data);
log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("xxx消费者接收到消息消息内容{},消费成功...", message);
} catch (Exception e) {
log.error("xxx消费者接收到消息消息内容{},消费消息异常,异常信息:{}", message, e);
// 消息回退 拒绝消费消息
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean requeue 是否回到原来的队列
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("xxx消费者接收到消息消息内容{},回退消息异常,异常信息:{}", message, ex);
}
}finally {
try {
channel.close();
} catch (Exception e) {
log.error("xxx消费者关闭Channel异常消息内容{},异常信息:{}", message, e);
}
}
}
}

View File

@ -133,9 +133,9 @@ public class RabbitMQProducerUtil {
* @param msg
* @return
*/
public Result<?> topicSendMessage(String exchange, String rule, Object obj, String msg) {
public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# Spring
spring:

View File

@ -1,25 +0,0 @@
package com.muyu.data.basics;
/**
* @Author WangXin
* @Data 2024/9/29
* @Description
* @Version 1.0.0
*/
public class EventHandler {
private static final ThreadLocal<EventQueueConfig> EVENT_THREAD = new ThreadLocal<>();
public static void set(final EventQueueConfig handler) {
EVENT_THREAD.set(handler);
}
public static EventQueueConfig get() {
return EVENT_THREAD.get();
}
public static void remove(){
EVENT_THREAD.remove();
}
}

View File

@ -1,35 +0,0 @@
package com.muyu.data.basics;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @Author WangXin
* @Data 2024/9/29
* @Description
* @Version 1.0.0
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventQueueConfig {
private LinkedBlockingDeque<EventProcessBasics> taskNodeQueue = new LinkedBlockingDeque<>();
public void addEvent(EventProcessBasics obj){
this.taskNodeQueue.add(obj);
}
public boolean hashEventNext(){
return !taskNodeQueue.isEmpty();
}
private EventProcessBasics nextTaskNode(){
return taskNodeQueue.poll();
}
}

View File

@ -1,5 +1,6 @@
package com.muyu.data.domain;
import com.alibaba.fastjson2.JSONObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
@ -29,5 +30,5 @@ public class DataJSON {
* JSON
*/
@Schema(name = "车辆JSON数据")
private String datasource;
private JSONObject datasource;
}

View File

@ -1,5 +1,6 @@
package com.muyu.data.domain;
import com.alibaba.fastjson2.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -21,7 +22,7 @@ public class EventActuate {
/**
* json
*/
private String jsonData;
private JSONObject jsonData;
/**
* key
*/

View File

@ -0,0 +1,90 @@
package com.muyu.data.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Date;
/**
* @author Bai
* @date 2024/9/20 10:29
* @description Information:
* @version: 1.0
*/
/**
*
*/
@Data
@SuperBuilder
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "car_information",autoResultMap = true)
public class Information {
//自增主键
@TableId(value = "id",type = IdType.AUTO)
private Long id;
//车辆VIn马
@Schema(name = "车辆VIn马",type = "String")
private String carVin;
//车辆品牌
@Schema(name = "车辆品牌",type = "String")
private String carBrand;
//车辆类型外键
@Schema(name = "车辆类型外键",type = "Integer")
private Integer typeId;
//车辆类型名称
@Schema(type = "String",description = "车辆类型名称")
private String typeName;
//电子围栏外键
@Schema(name = "电子围栏外键",type = "String")
private String groupId;
//车辆电机厂商
@Schema(name = "车辆电机厂商",type = "String")
private String carMotorManufacturer;
//电机型号
@Schema(name = "电机型号",type = "String")
private String carMotorModel;
//车辆电池厂商
@Schema(name = "车辆电池厂商",type = "String")
private String carBatteryManufacturer;
//电池型号
@Schema(name = "电池型号",type = "String")
private String carBatteryModel;
//围栏组编码
@Schema(name = "围栏组编码",type = "String")
private String groupCode;
//启用状态(1.在线 2.离线 3.已断开 4.待连接 5.维修中
@Schema(name = "启用状态(1.在线 2.离线 3.已断开 4.待连接 5.维修中",type = "String")
private String state;
//创建人
@Schema(name = "创建人",type = "Integer")
private String createBy;
//创建时间
@Schema(name = "创建时间",type = "Date")
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date createTime;
//更新人
@Schema(name = "更新人",type = "Integer")
private String updateBy;
//更新时间
@Schema(name = "更新时间",type = "Date")
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date updateTime;
//备注
@Schema(name = "备注",type = "String")
private String remark;
//策略id
@Schema(name = "策略id",type = "Integer")
private Integer strategyId;
}

View File

@ -1,6 +1,9 @@
package com.muyu.data.event;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.data.basics.StartEvent;
import com.muyu.data.domain.EventActuate;
import com.muyu.data.event.tactics.IotdbStoreEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@ -16,7 +19,9 @@ public class AutoStartupEventListener implements ApplicationListener<StartEvent>
@Override
public void onApplicationEvent(StartEvent event) {
EventActuate eventActuate = event.getEventActuate();
JSONObject jsonData = eventActuate.getJsonData();
new IotdbStoreEvent().execute(jsonData);
}

View File

@ -0,0 +1,21 @@
package com.muyu.data.event;
import com.alibaba.fastjson2.JSONObject;
import java.util.List;
/**
* @Author WangXin
* @Data 2024/9/29
* @Description
* @Version 1.0.0
*/
public interface EventStrategy {
/**
*
* @param jsonObject
*/
void execute(JSONObject jsonObject);
}

View File

@ -0,0 +1,30 @@
package com.muyu.data.event;
import com.alibaba.fastjson2.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* @Author WangXin
* @Data 2024/9/29
* @Description
* @Version 1.0.0
*/
@Data
@AllArgsConstructor
public class EventStrategyContext {
/**
*
*/
private EventStrategy eventStrategy;
/**
*
* @param jsonObject json
*/
public void handleEvent(JSONObject jsonObject) {
if (jsonObject != null) {
eventStrategy.execute(jsonObject);
}
}
}

View File

@ -0,0 +1,18 @@
package com.muyu.data.event.tactics;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.data.event.EventStrategy;
/**
* @Author WangXin
* @Data 2024/9/30
* @Description
* @Version 1.0.0
*/
public class FenceStrategyEvent implements EventStrategy {
@Override
public void execute(JSONObject jsonObject) {
}
}

View File

@ -0,0 +1,28 @@
package com.muyu.data.event.tactics;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.data.event.EventStrategy;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author WangXin
* @Data 2024/9/29
* @Description Iotdb
* @Version 1.0.0
*/
@Log4j2
@Component
public class IotdbStoreEvent implements EventStrategy {
/**
*
* @param jsonObject json
*/
@Override
public void execute(JSONObject jsonObject) {
log.info("[存储事件] ---》 json对象{}", jsonObject);
}
}

View File

@ -22,6 +22,33 @@
</description>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>enterprise-cache</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-data-process-common</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-iotdb</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<!-- kafka 公共包 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
<!-- rabbit 公共包 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>

View File

@ -0,0 +1,24 @@
package com.muyu.data.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEvent;
/**
*
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-29 20:03
**/
public class EventCustom extends ApplicationEvent {
private JSONObject data;
public EventCustom(Object source, JSONObject data) {
super(source);
this.data = data;
}
public JSONObject getData() {
return data;
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.data.basic;
import org.springframework.context.ApplicationListener;
/**
*
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-29 20:05
**/
public interface EventListener extends ApplicationListener<EventCustom> {
void onEvent(EventCustom event);
}

View File

@ -0,0 +1,32 @@
package com.muyu.data.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
*
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-29 17:43
**/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publishEvent(JSONObject message) {
EventCustom event = new EventCustom(this, message);
publisher.publishEvent(event);
}
}

View File

@ -0,0 +1,38 @@
package com.muyu.data.consumer;
import com.muyu.data.util.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 线
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-30 10:39
**/
@Log4j2
@Component
public class GoOfflineConsumer {
@Autowired
private CacheUtil cacheUtil;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "GO_OFFLINE", durable = "true"),
exchange = @Exchange(value = "OFFLINE_EXCHANGE", type = "fanout")))
public void online(String vin) {
log.info("车辆vin: {},车辆开始消费", vin);
cacheUtil.remove(vin);
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.data.consumer;
import com.muyu.data.domain.Information;
import com.muyu.data.util.CacheUtil;
import com.muyu.domain.CarInformation;
import com.muyu.enterprise.cache.VehicleCacheService;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 线
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-29 16:37
**/
@Log4j2
@Component
public class GoOnlineConsumer {
@Autowired
private CacheUtil cacheUtil;
@Autowired
private VehicleCacheService vehicleCacheService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "GO_ONLINE", durable = "true"),
exchange = @Exchange(value = "ONLINE_EXCHANGE", type = "fanout")))
public void online(String vin) {
log.info("车辆vin: {},车辆开始消费", vin);
CarInformation carInformation = vehicleCacheService.get(vin);
cacheUtil.put(vin,carInformation);
}
}

View File

@ -0,0 +1,72 @@
package com.muyu.data.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.data.basic.EventPublisher;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-28 14:55
**/
@Log4j2
@Component
public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
@Autowired
private KafkaConsumer consumer;
@Autowired
private EventPublisher eventPublisher;
private final String topic = "vehicle";
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Async
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("开始监听kafka-topic{}", topic);
List<String> topics = Collections.singletonList(topic);
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
executorService.submit(() -> handleRecord(record));
});
}
}
private void handleRecord(ConsumerRecord<String, String> record) {
String value = record.value();
JSONObject jsonObject = JSONObject.parseObject(value);
log.info("value: {}", value);
eventPublisher.publishEvent(jsonObject);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭kafka和线程");
consumer.close();
executorService.shutdown();
}
}

View File

@ -0,0 +1,40 @@
package com.muyu.data.controller;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.core.domain.Result;
import com.muyu.common.iotdb.config.IotDBSessionConfig;
import jakarta.annotation.Resource;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import static com.muyu.common.iotdb.constant.IotdbConstant.ROOT_DATA_DATAJSON;
/**
* iotdb
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-28 19:09
**/
@RestController
@RequestMapping("iotdb")
public class IotdbController {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@PostMapping("/add")
public Result<?> addJSON(@RequestBody JSONObject jsonObject) {
iotDBSessionConfig.insertRecord(iotDBSessionConfig.getSessionPool(),ROOT_DATA_DATAJSON,System.currentTimeMillis(), List.of("datasource"),List.of(TSDataType.TEXT),jsonObject);
return Result.success();
}
@GetMapping("/findByDataTime/{time}")
public Result<?> findByDataTime(@PathVariable("time") Long time){
return null;
}
}

View File

@ -0,0 +1,32 @@
package com.muyu.data.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* rabbit
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-30 00:04
**/
@RestController
@RequestMapping("rabbit")
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send() {
rabbitTemplate.convertAndSend("ONLINE_EXCHANGE", "", "vin123456");
}
}

View File

@ -0,0 +1,40 @@
package com.muyu.data.controller;
import cn.hutool.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-28 14:55
**/
@RestController
@RequestMapping
public class TestController {
@Autowired
private KafkaProducer kafkaProducer;
private static final String topic = "vehicle";
@GetMapping("send")
public String sendKafka(){
JSONObject entries = new JSONObject();
entries.set("car_vin","vin123123");
entries.set("car_name","奥迪");
String entriesString = entries.toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,entriesString);
kafkaProducer.send(producerRecord);
return "success";
}
}

View File

@ -0,0 +1,42 @@
package com.muyu.data.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.data.basic.EventCustom;
import com.muyu.data.basic.EventListener;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-29 17:34
**/
@Configuration
public class AddDatabaseListener implements EventListener {
@Override
public void onEvent(EventCustom event) {
JSONObject jsonObject = event.getData();
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
jsonObject.forEach((key, value) -> {
keys.add(key);
values.add((String) value);
});
long time = System.currentTimeMillis();
}
@Override
public void onApplicationEvent(EventCustom event) {
onEvent(event);
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.data.monitor;
import com.muyu.common.core.domain.Result;
import com.muyu.common.rabbit.constants.RabbitmqConstants;
import com.muyu.common.rabbit.consumer.RabbitMQConsumerUtil;
import com.muyu.common.rabbit.producer.RabbitMQProducerUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author WangXin
* @Data 2024/9/30
* @Description 线
* @Version 1.0.0
*/
@Component
public class CatRabbitMonitor {
@Resource
private RabbitMQConsumerUtil rabbitMQConsumerUtil;
@Resource
private RabbitMQProducerUtil rabbitMQProducerUtil;
@RabbitListener(queues = {"queue_topic_01"})
public void topicConsumer01(String msg){
System.out.println("消费者 -01- 接收消息:" + msg);
}
/**
* 线
* @param carVin
*/
public void topicConsumer02(String carVin){
Result<?> result = rabbitMQProducerUtil.topicSendMessage(
RabbitmqConstants.TOP_BOTTOM_STITCHING,
RabbitmqConstants.TOP_RULE,
carVin
);
if (result.getCode() != Result.SUCCESS){
throw new RuntimeException(result.getMsg());
}
}
}

View File

@ -0,0 +1,38 @@
package com.muyu.data.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.data.domain.Information;
import org.springframework.stereotype.Component;
/**
*
*
* @program: cloud-server
* @author: WangXin
* @create: 2024-09-30 10:08
**/
@Component
public class CacheUtil<T> {
private final Cache<String, T> cache;
public CacheUtil() {
this.cache = Caffeine.newBuilder()
.maximumSize(500L)
.build();
}
public T get(String key) {
return cache.getIfPresent(key);
}
public void put(String key, T value) {
cache.put(key, value);
}
public void remove(String key) {
cache.invalidate(key);
}
}

View File

@ -11,6 +11,12 @@ nacos:
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
iotdb:
ip: 127.0.0.1
username: root
port: 6667
password: root
maxSize: 10
amqp:
deserialization:
trust:

View File

@ -41,5 +41,10 @@
<artifactId>swagger-annotations-jakarta</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-system</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,27 @@
package com.muyu.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @Author WangXin
* @Data 2024/9/30
* @Description
* @Version 1.0.0
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class CarFirmMiddle {
/**
* ID
*/
private Long CarId;
/**
* ID
*/
private Long firmId;
}

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
@ -19,7 +19,7 @@ spring:
allow-bean-definition-overriding: true
application:
# 应用名称
name: cloud-saas
name: cloud-warn
profiles:
# 环境配置
active: dev

View File

@ -18,19 +18,13 @@
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-cache</artifactId>
<version>${muyu.version}</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>enterpise-common</artifactId>
<version>${muyu.version}</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-cache</artifactId>
</dependency>
</dependencies>

View File

@ -0,0 +1,28 @@
package com.muyu.enterprise.cache;
import com.muyu.common.cache.CacheAbsBasic;
import com.muyu.domain.CarInformation;
/**
*
* @className: VehicleCacheService
* @author: Yang 🦅
* @date: 2024/9/30 00:36
* @Version: 1.0
* @description:
*/
public class VehicleCacheService extends CacheAbsBasic<String, CarInformation> {
@Override
public String keyPre() {
return "vehicle:info:";
}
@Override
public String decode(String key){
return key.replace("vehicle:info:","");
}
}

View File

@ -1,2 +1,3 @@
com.muyu.enterpise.cache.MessageValueCacheService
com.muyu.enterpise.cache.SysCarCacheService
com.muyu.enterprise.cache.VehicleCacheService

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# Spring
spring:

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:

View File

@ -4,10 +4,10 @@ server:
# nacos线上地址
nacos:
addr: 123.57.152.124:8848
addr: 127.0.0.1:8848
user-name: nacos
password: nacos
namespace: xyr
namespace: wx
# Spring
spring:

30
pom.xml
View File

@ -307,6 +307,36 @@
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- iotdb 公共包-->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-iotdb</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-data-process-common</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- 缓存 公共包 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-cache</artifactId>
<version>${muyu.version}</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>enterprise-cache</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies>
</dependencyManagement>