新增本地缓存工具类和缓存实体类,优化报文解析服务

- 新增 CacheUtill 工具类,实现本地缓存功能,包括添加、获取、移除缓存和定时清理过期缓存
- 新增 MyCache 实体类,用于缓存键值对数据及过期时间
- 新增 oneMse测试类(具体用途待明确)- 在 CarMessageServiceImpl 中注释掉未使用的 RedisTemplate 相关代码
- 在 pom.xml 中添加 caffeine 依赖,用于本地缓存
- 移除 Demo 类中关于 MQTT 和 Kafka 的测试代码- 在 MqttTest 类中简化 Kafka 生产者发送消息的逻辑
dev.carData
Aaaaaaaa 2024-10-06 16:31:26 +08:00
parent f3c90ada2a
commit 00d1eb18d1
8 changed files with 207 additions and 113 deletions

View File

@ -92,6 +92,12 @@
<artifactId>cloud-common-kafka</artifactId>
<version>3.6.3</version>
</dependency>
<!--caffeine-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>

View File

@ -0,0 +1,29 @@
package com.muyu.cache.bean;
import lombok.Data;
@Data
public class MyCache {
/**
* key --
*/
private String key;
/**
* value --
*/
private Object value;
/**
* --
*/
private Long expireTime;
}

View File

@ -0,0 +1,73 @@
package com.muyu.cache.bean;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class oneMse {
/**
*
*/
private Long numther;
/**
*
*/
private Integer numthonre;
/**
*
*/
private Integer numtweh;
/**
*
*/
private String numtrwh;
/**
*
*/
private Integer numrrth;
/**
*
*/
private Integer numereth;
/**
*
*/
private String numth;
/**
*
*/
private Integer numttruh;
/**
*
*/
private Integer numtrert;
/**
*
*/
private Integer erg;
/**
*
*/
private Integer numtgreh;
/**
*
*/
private Integer rtetg;
/**
*
*/
private Integer geewr;
/**
*
*/
private Integer heertherh;
}

View File

@ -0,0 +1,93 @@
package com.muyu.cache.utill;
import com.muyu.cache.bean.MyCache;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class CacheUtill {
/**
* Map
*/
private static final Map<String , MyCache> CACHE_MAP = new ConcurrentHashMap<>();
/**
* 线
*/
private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
static {
// 注册一个定时小鹌鹑线程任务,当服务启动了1秒之后,每隔500毫秒执行一次定时任务
//定时清理过期缓存
executorService.scheduleAtFixedRate(CacheUtill::clearCache,1000,500,TimeUnit.MILLISECONDS);
}
/**
*
* @param key
* @param value
* @param expire
*/
public static void put(String key ,Object value,long expire){
MyCache myCache = new MyCache();
myCache.setKey(key);
myCache.setValue(value);
if (expire > 0 ){
long expireTime = System.currentTimeMillis() + Duration.ofSeconds(expire).toMillis();
myCache.setExpireTime(expireTime);
}
CACHE_MAP.put(key ,myCache);
}
/**
*
* @param key
* @return
*/
public static Object get(String key){
if(CACHE_MAP.containsKey(key)){
return CACHE_MAP.get(key).getValue();
}
return null;
}
/**
*
* @param key
*/
public static void remove(String key){
CACHE_MAP.remove(key);
}
/**
*
*/
private static void clearCache(){
if(CACHE_MAP.size() <= 0){
return;
}
//判断是否过期,过期的话从缓存MAP中删除这个元素
CACHE_MAP.entrySet().removeIf(entry -> entry.getValue().getExpireTime() != null && entry.getValue().getExpireTime() > System.currentTimeMillis());
}
}

View File

@ -9,11 +9,6 @@ import lombok.NoArgsConstructor;
/**
*
* @Author
* @Packagecom.muyu.warn.domain.car
* @Projectcloud-server-8
* @nameCarMessage
* @Date2024/9/22 3:07
*/
@Data
@Builder

View File

@ -1,104 +0,0 @@
package com.muyu.server.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.muyu.domain.CarMessage;
import com.muyu.server.service.CarMessageService;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Component
public class Demo {
@Resource
private CarMessageService service;
@Resource
private KafkaProducer<String, String> kafkaProducer;
@PostConstruct
public void test() {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
List<CarMessage> list= service.selectCarMessageList(1,2);
String str = new String( mqttMessage.getPayload() );
System.out.println(str);
String[] test = str.split(" ");
String[] results = new String[list.size()];
List<CompletableFuture<String>> futures = new ArrayList<>();
for (CarMessage carmsg : list) {
futures.add(CompletableFuture.supplyAsync(() -> {
int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1;
int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex()));
StringBuilder hexBuilder = new StringBuilder();
for (int j = startIndex; j < endIndex; j++) {
hexBuilder.append(test[j]);
}
// 创建16进制的对象
String hex = hexBuilder.toString();
// 转橙字符数组
char[] result = new char[hex.length() / 2];
for (int x = 0; x < hex.length(); x += 2) {
// 先转十进制
int high = Character.digit(hex.charAt(x), 16);
// 转二进制
int low = Character.digit(hex.charAt(x + 1), 16);
// 转字符
result[x / 2] = (char) ((high << 4) + low);
}
return new String(result);
}));
}
for (int i = 0; i < futures.size(); i++) {
results[i] = futures.get(i).get();
}
String jsonString = JSONObject.toJSONString( results );
ProducerRecord<String, String> producerRecord = new ProducerRecord<>( "carJsons", jsonString);
kafkaProducer.send(producerRecord);
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -22,8 +22,6 @@ import java.util.List;
* mqtt
*
* @ClassName MqttTest
* @Description
* @Date 2024/9/28 23:49
*/
@Slf4j
@Component
@ -91,6 +89,7 @@ public class MqttTest {
}
String jsonString = JSONObject.toJSONString(kafKaDataList);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
kafkaProducer.send(producerRecord);
log.info("kafka投产{}", jsonString);
// HashMap<String, String> stringStringHashMap = new HashMap<>();

View File

@ -1,5 +1,6 @@
package com.muyu.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@ -13,10 +14,12 @@ import com.muyu.mapper.CarMessageMapper;
import com.muyu.mapper.CarMessagePlusMapper;
import com.muyu.service.CarMessageService;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
*
@ -62,8 +65,8 @@ public class CarMessageServiceImpl
//报文处理
// @Resource
// private RedisTemplate<String ,Objects > redisTemplate;
//
// private RedisTemplate<String , Objects> redisTemplate;
// /**
// * 报文解析
// * @param testString