diff --git a/cloud-modules/cloud-modules-protocolparsing/pom.xml b/cloud-modules/cloud-modules-protocolparsing/pom.xml index 13b771d..8b93a35 100644 --- a/cloud-modules/cloud-modules-protocolparsing/pom.xml +++ b/cloud-modules/cloud-modules-protocolparsing/pom.xml @@ -92,6 +92,12 @@ cloud-common-kafka 3.6.3 + + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + 17 diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/MyCache.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/MyCache.java new file mode 100644 index 0000000..6d51879 --- /dev/null +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/MyCache.java @@ -0,0 +1,29 @@ +package com.muyu.cache.bean; + +import lombok.Data; + +@Data +public class MyCache { + + /** + * 缓存key --键 + */ + private String key; + + /** + * 缓存value --值 + */ + private Object value; + + /** + * 缓存过期时间 --单位秒 + */ + private Long expireTime; + + + + + + + +} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java new file mode 100644 index 0000000..da479ba --- /dev/null +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java @@ -0,0 +1,73 @@ +package com.muyu.cache.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class oneMse { + + + /** + * 测试类 + */ + private Long numther; + /** + * 测试类 + */ + private Integer numthonre; + /** + * 测试类 + */ + private Integer numtweh; + /** + * 测试类 + */ + private String numtrwh; + /** + * 测试类 + */ + private Integer numrrth; + /** + * 测试类 + */ + private Integer numereth; + /** + * 测试类 + */ + private String numth; + /** + * 测试类 + */ + private Integer numttruh; + /** + * 测试类 + */ + private Integer numtrert; + /** + * 测试类 + */ + private Integer erg; + /** + * 测试类 + */ + private Integer numtgreh; + /** + * 测试类 + */ + private Integer rtetg; + /** + * 测试类 + */ + private Integer geewr; + /** + * 测试类 + */ + private Integer heertherh; + + +} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/utill/CacheUtill.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/utill/CacheUtill.java new file mode 100644 index 0000000..ffe051e --- /dev/null +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/utill/CacheUtill.java @@ -0,0 +1,93 @@ +package com.muyu.cache.utill; + +import com.muyu.cache.bean.MyCache; + +import java.time.Duration; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + + +/** + * 自定义本地缓存工具类 + */ + + public class CacheUtill { + + /** + * 缓存数据Map + */ + private static final Map CACHE_MAP = new ConcurrentHashMap<>(); + + /** + * 定时器线程池 用于清理过期缓存 + */ + private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + + static { + // 注册一个定时小鹌鹑线程任务,当服务启动了1秒之后,每隔500毫秒执行一次定时任务 + //定时清理过期缓存 + executorService.scheduleAtFixedRate(CacheUtill::clearCache,1000,500,TimeUnit.MILLISECONDS); + + } + + /** + * 添加缓存 + * @param key 缓存建 + * @param value + * @param expire + */ + public static void put(String key ,Object value,long expire){ + MyCache myCache = new MyCache(); + myCache.setKey(key); + myCache.setValue(value); + if (expire > 0 ){ + long expireTime = System.currentTimeMillis() + Duration.ofSeconds(expire).toMillis(); + myCache.setExpireTime(expireTime); + } + CACHE_MAP.put(key ,myCache); + } + + + /** + * 获取缓存 + * @param key 缓存键 + * @return 缓存的数据 + */ + public static Object get(String key){ + if(CACHE_MAP.containsKey(key)){ + return CACHE_MAP.get(key).getValue(); + } + return null; + } + + /** + * 移除缓存 + * @param key + */ + public static void remove(String key){ + CACHE_MAP.remove(key); + } + + + /** + * 清理过期的缓存数据 + */ + private static void clearCache(){ + if(CACHE_MAP.size() <= 0){ + return; + } + + //判断是否过期,过期的话从缓存MAP中删除这个元素 + CACHE_MAP.entrySet().removeIf(entry -> entry.getValue().getExpireTime() != null && entry.getValue().getExpireTime() > System.currentTimeMillis()); + + } + + + +} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/domain/CarMessageType.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/domain/CarMessageType.java index 7bc7b48..2798ec2 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/domain/CarMessageType.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/domain/CarMessageType.java @@ -9,11 +9,6 @@ import lombok.NoArgsConstructor; /** * 车辆报文所属类型 - * @Author:蓬叁 - * @Package:com.muyu.warn.domain.car - * @Project:cloud-server-8 - * @name:CarMessage - * @Date:2024/9/22 下午3:07 */ @Data @Builder diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java deleted file mode 100644 index fa5f209..0000000 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/Demo.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.muyu.server.mqtt; - -import com.alibaba.fastjson.JSONObject; - -import com.muyu.domain.CarMessage; -import com.muyu.server.service.CarMessageService; -import jakarta.annotation.PostConstruct; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - - -@Component -public class Demo { - @Resource - private CarMessageService service; - @Resource - private KafkaProducer kafkaProducer; - @PostConstruct - public void test() { - - String topic = "vehicle"; - String content = "Message from MqttPublishSample"; - int qos = 2; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; - - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: "+broker); - sampleClient.connect(connOpts); - sampleClient.subscribe(topic,0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - - } - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - - List list= service.selectCarMessageList(1,2); - String str = new String( mqttMessage.getPayload() ); - System.out.println(str); - String[] test = str.split(" "); - String[] results = new String[list.size()]; - List> futures = new ArrayList<>(); - for (CarMessage carmsg : list) { - futures.add(CompletableFuture.supplyAsync(() -> { - int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1; - int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex())); - StringBuilder hexBuilder = new StringBuilder(); - for (int j = startIndex; j < endIndex; j++) { - hexBuilder.append(test[j]); - } - // 创建16进制的对象 - String hex = hexBuilder.toString(); - // 转橙字符数组 - char[] result = new char[hex.length() / 2]; - for (int x = 0; x < hex.length(); x += 2) { - // 先转十进制 - int high = Character.digit(hex.charAt(x), 16); - // 转二进制 - int low = Character.digit(hex.charAt(x + 1), 16); - // 转字符 - result[x / 2] = (char) ((high << 4) + low); - } - return new String(result); - })); - } - for (int i = 0; i < futures.size(); i++) { - results[i] = futures.get(i).get(); - } - String jsonString = JSONObject.toJSONString( results ); - ProducerRecord producerRecord = new ProducerRecord<>( "carJsons", jsonString); - kafkaProducer.send(producerRecord); - } - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - }); - } catch(MqttException me) { - System.out.println("reason "+me.getReasonCode()); - System.out.println("msg "+me.getMessage()); - System.out.println("loc "+me.getLocalizedMessage()); - System.out.println("cause "+me.getCause()); - System.out.println("excep "+me); - me.printStackTrace(); - } - } - -} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java index 0f0f5de..01ab058 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java @@ -22,8 +22,6 @@ import java.util.List; * mqtt * * @ClassName MqttTest - * @Description - * @Date 2024/9/28 23:49 */ @Slf4j @Component @@ -91,6 +89,7 @@ public class MqttTest { } String jsonString = JSONObject.toJSONString(kafKaDataList); ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + kafkaProducer.send(producerRecord); log.info("kafka投产:{}", jsonString); // HashMap stringStringHashMap = new HashMap<>(); diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/service/impl/CarMessageServiceImpl.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/service/impl/CarMessageServiceImpl.java index a854c67..56e8505 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/service/impl/CarMessageServiceImpl.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/service/impl/CarMessageServiceImpl.java @@ -1,5 +1,6 @@ package com.muyu.service.impl; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -13,10 +14,12 @@ import com.muyu.mapper.CarMessageMapper; import com.muyu.mapper.CarMessagePlusMapper; import com.muyu.service.CarMessageService; import lombok.extern.log4j.Log4j2; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; +import java.util.Objects; /** * 报文模板展示列表业务实现层 @@ -62,8 +65,8 @@ public class CarMessageServiceImpl //报文处理 // @Resource -// private RedisTemplate redisTemplate; -// +// private RedisTemplate redisTemplate; + // /** // * 报文解析 // * @param testString