commit 492d7c0453c87ef05ed05d9cb39ea1961768c6a9 Author: Saisai Liu <1374434128@qq.com> Date: Wed Jun 5 11:24:37 2024 +0800 feat():协议接收解析模块 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..63574ec --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..c3f3b0a --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..f4cd3ff --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..96b39a5 --- /dev/null +++ b/pom.xml @@ -0,0 +1,93 @@ + + + 4.0.0 + + com.mobai + mq-active + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + org.springframework.boot + spring-boot-starter-parent + 2.7.15 + + + + + org.springframework.boot + spring-boot-dependencies + 2.7.15 + pom + import + + + + + + + org.projectlombok + lombok + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.apache.commons + commons-lang3 + + + + org.springframework.boot + spring-boot-starter-test + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.46 + + + + com.alibaba + fastjson + 1.2.83 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + + + org.apache.poi + poi-ooxml + 4.1.2 + + + + + diff --git a/src/main/java/com/mobai/MqttApplication.java b/src/main/java/com/mobai/MqttApplication.java new file mode 100644 index 0000000..5cf4718 --- /dev/null +++ b/src/main/java/com/mobai/MqttApplication.java @@ -0,0 +1,19 @@ +package com.mobai; + +import lombok.extern.java.Log; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @ClassName MqttApplication + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/31 14:33 + */ +@Log +@SpringBootApplication +public class MqttApplication { + public static void main(String[] args) { + SpringApplication.run(MqttApplication.class,args); + } +} diff --git a/src/main/java/com/mobai/api/MqttPublishSample.java b/src/main/java/com/mobai/api/MqttPublishSample.java new file mode 100644 index 0000000..3d31731 --- /dev/null +++ b/src/main/java/com/mobai/api/MqttPublishSample.java @@ -0,0 +1,44 @@ +package com.mobai.api; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class MqttPublishSample { + + public static void main(String[] args) { + + String topic = "MQTT Examples"; + String content = "Message from MqttPublishSample"; + int qos = 2; + String broker = "tcp://iot.eclipse.org:1883"; + String clientId = "JavaSample"; + MemoryPersistence persistence = new MemoryPersistence(); + + try { + MqttClient sampleClient = new MqttClient(broker, clientId, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + System.out.println("Connecting to broker: "+broker); + sampleClient.connect(connOpts); + System.out.println("Connected"); + System.out.println("Publishing message: "+content); + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + sampleClient.publish(topic, message); + System.out.println("Message published"); + sampleClient.disconnect(); + System.out.println("Disconnected"); + System.exit(0); + } catch(MqttException me) { + System.out.println("reason "+me.getReasonCode()); + System.out.println("msg "+me.getMessage()); + System.out.println("loc "+me.getLocalizedMessage()); + System.out.println("cause "+me.getCause()); + System.out.println("excep "+me); + me.printStackTrace(); + } + } +} diff --git a/src/main/java/com/mobai/api/SubscribeSample.java b/src/main/java/com/mobai/api/SubscribeSample.java new file mode 100644 index 0000000..985162b --- /dev/null +++ b/src/main/java/com/mobai/api/SubscribeSample.java @@ -0,0 +1,33 @@ +package com.mobai.api; + +import com.mobai.domian.GetOptions; +import com.mobai.domian.MqttCallBackServiceImpl; +import com.mobai.cofig.MqttFactory; +import com.mobai.cofig.MqttProperties; +import org.eclipse.paho.client.mqttv3.*; + +public class SubscribeSample { + public static void main(String[] args) { + MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0"); +// MqttProperties mqttProperties = new MqttProperties(); +// mqttProperties.setBroker("tcp://39.98.69.92:1883"); +// mqttProperties.setTopic("mqtt/test"); + mqttProperties.setUsername("emqx"); + mqttProperties.setPassword("public"); +// mqttProperties.setClientid("subscribe_client"); + + int qos = 0; + try { + MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties); + // 连接参数 + MqttConnectOptions options = GetOptions.getMqttOptions(mqttProperties); + // 设置回调 + client.setCallback(new MqttCallBackServiceImpl()); + // 进行连接 + client.connect(options); + client.subscribe(mqttProperties.getTopic(), qos); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/mobai/cofig/MqttFactory.java b/src/main/java/com/mobai/cofig/MqttFactory.java new file mode 100644 index 0000000..afe8bba --- /dev/null +++ b/src/main/java/com/mobai/cofig/MqttFactory.java @@ -0,0 +1,44 @@ +package com.mobai.cofig; + +import com.mobai.domian.GetOptions; +import com.mobai.domian.MqttCallBackServiceImpl; +import lombok.AllArgsConstructor; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +/** + * @ClassName MqttFactory + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/31 11:35 + */ + +@Service +@AllArgsConstructor +public class MqttFactory { + + private final MqttCallBackServiceImpl mqttCallBackService; + + public MqttClient buildOptions( MqttProperties mqttProperties) { + MqttClient client = null; + try { + client = new MqttClient( + mqttProperties.getBroker(), + mqttProperties.getClientid(), + new MemoryPersistence()); + MqttConnectOptions optionas = GetOptions.getMqttOptions(mqttProperties); + client.connect(optionas); + client.setCallback(mqttCallBackService); + + client.subscribe(mqttProperties.getTopic(), 0); + + } catch (MqttException e) { + throw new RuntimeException(e); + } + + return client; + } +} diff --git a/src/main/java/com/mobai/cofig/MqttProperties.java b/src/main/java/com/mobai/cofig/MqttProperties.java new file mode 100644 index 0000000..2ea9031 --- /dev/null +++ b/src/main/java/com/mobai/cofig/MqttProperties.java @@ -0,0 +1,35 @@ +package com.mobai.cofig; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.UUID; + +/** + * @ClassName MqttProperties + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/30 20:05 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties { + private String broker ; + private String topic ; + private String username; + private String password; + private String clientid; + + public static MqttProperties configBuild(String ip,String topic){ +// "tcp://39.98.69.92:1883" + return MqttProperties.builder() + .broker("tcp://"+ip+":1883") + .topic(topic) + .clientid("mobai-mq") + .build(); + } +} diff --git a/src/main/java/com/mobai/cofig/RabbitConfig.java b/src/main/java/com/mobai/cofig/RabbitConfig.java new file mode 100644 index 0000000..5d61ead --- /dev/null +++ b/src/main/java/com/mobai/cofig/RabbitConfig.java @@ -0,0 +1,43 @@ +package com.mobai.cofig; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @ClassName RabbitConfig + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/31 21:47 + */ +@Configuration +public class RabbitConfig { + + @Bean + public Queue initQueue(){ + return new Queue("create.topic",true); + } + + /** + * 创建主题队列 + * @return + */ + @Bean + public DirectExchange direct() { + return new DirectExchange("mobai-mq"); + } + + /** + * 绑定交换机 + * @param direct + * @param initQueue + * @return + */ + @Bean + public Binding binding1a(DirectExchange direct, + Queue initQueue) { + return BindingBuilder.bind(initQueue) + .to(direct) + .with("mobai-mq"); + } +} diff --git a/src/main/java/com/mobai/domian/GetOptions.java b/src/main/java/com/mobai/domian/GetOptions.java new file mode 100644 index 0000000..f92ecb1 --- /dev/null +++ b/src/main/java/com/mobai/domian/GetOptions.java @@ -0,0 +1,29 @@ +package com.mobai.domian; + +import com.mobai.cofig.MqttProperties; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +/** + * 获取Mqtt连接参数 + * @ClassName GetOptions + * @Description 获取Mqtt连接参数 + * @Author SaiSai.Liu + * @Date 2024/5/31 14:26 + */ + +public class GetOptions { + + public static MqttConnectOptions getMqttOptions(MqttProperties mqttProperties){ + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + if (!StringUtils.isAllBlank(mqttProperties.getUsername(), + mqttProperties.getPassword())){ + options.setUserName(mqttProperties.getUsername()); + options.setPassword(mqttProperties.getPassword().toCharArray()); + } + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + return options; + } +} diff --git a/src/main/java/com/mobai/domian/MqttCallBackServiceImpl.java b/src/main/java/com/mobai/domian/MqttCallBackServiceImpl.java new file mode 100644 index 0000000..680f6a4 --- /dev/null +++ b/src/main/java/com/mobai/domian/MqttCallBackServiceImpl.java @@ -0,0 +1,34 @@ +package com.mobai.domian; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +/** + * 回执消息类 + * @ClassName MqttCallBackServiceImpl + * @Description 回执消息类 + * @Author SaiSai.Liu + * @Date 2024/5/30 20:02 + */ +@Service +public class MqttCallBackServiceImpl implements MqttCallback { + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + +} diff --git a/src/main/java/com/mobai/rabbitMq/MessageHandler.java b/src/main/java/com/mobai/rabbitMq/MessageHandler.java new file mode 100644 index 0000000..669b1d4 --- /dev/null +++ b/src/main/java/com/mobai/rabbitMq/MessageHandler.java @@ -0,0 +1,38 @@ +package com.mobai.rabbitMq; + +import com.mobai.cofig.MqttFactory; +import com.mobai.cofig.MqttProperties; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 消费者:消息处理器 + * @ClassName MessageHandler + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/31 14:37 + */ +@Log4j2 +@Component +public class MessageHandler { + + @Autowired + private MqttFactory mqttFactory; + + @RabbitListener(queues = {"create.topic" }) + private void message(String msg){ + log.info("消息内容:{}",msg); + MqttProperties topic0 = MqttProperties.configBuild( + "39.98.69.92", + "topic0"); + log.info("接收到消息:{}",topic0); + MqttClient client = mqttFactory.buildOptions(topic0); + log.info("client创建:{}",client); + log.info("clientID创建:{}",client.getClientId()); + } + + +} diff --git a/src/main/java/com/mobai/util/RedisService.java b/src/main/java/com/mobai/util/RedisService.java new file mode 100644 index 0000000..05bfb61 --- /dev/null +++ b/src/main/java/com/mobai/util/RedisService.java @@ -0,0 +1,261 @@ +package com.mobai.util; + +import org.apache.poi.ss.formula.functions.T; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.core.BoundSetOperations; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ValueOperations; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * spring redis 工具类 + * + * @author muyu + **/ +@SuppressWarnings(value = {"unchecked", "rawtypes"}) +@Component +@Lazy +public class RedisService { + @Autowired + public RedisTemplate redisTemplate; + + /** + * 缓存基本的对象,Integer、String、实体类等 + * + * @param key 缓存的键值 + * @param value 缓存的值 + */ + public void setCacheObject(final String key, final T value) { + redisTemplate.opsForValue().set(key, value); + } + + /** + * 缓存基本的对象,Integer、String、实体类等 + * + * @param key 缓存的键值 + * @param value 缓存的值 + * @param timeout 时间 + * @param timeUnit 时间颗粒度 + */ + public void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit) { + redisTemplate.opsForValue().set(key, value, timeout, timeUnit); + } + + /** + * 设置有效时间 + * + * @param key Redis键 + * @param timeout 超时时间 + * @return true=设置成功;false=设置失败 + */ + public boolean expire(final String key, final long timeout) { + return expire(key, timeout, TimeUnit.SECONDS); + } + + /** + * 设置有效时间 + * + * @param key Redis键 + * @param timeout 超时时间 + * @param unit 时间单位 + * @return true=设置成功;false=设置失败 + */ + public boolean expire(final String key, final long timeout, final TimeUnit unit) { + return redisTemplate.expire(key, timeout, unit); + } + + /** + * 获取有效时间 + * + * @param key Redis键 + * @return 有效时间 + */ + public long getExpire(final String key) { + return redisTemplate.getExpire(key); + } + + /** + * 判断 key是否存在 + * + * @param key 键 + * @return true 存在 false不存在 + */ + public Boolean hasKey(String key) { + return redisTemplate.hasKey(key); + } + + /** + * 获得缓存的基本对象。 + * + * @param key 缓存键值 + * @return 缓存键值对应的数据 + */ + public T getCacheObject(final String key) { + ValueOperations operation = redisTemplate.opsForValue(); + return operation.get(key); + } + + /** + * 删除单个对象 + * + * @param key + */ + public boolean deleteObject(final String key) { + return redisTemplate.delete(key); + } + + /** + * 删除集合对象 + * + * @param collection 多个对象 + * @return + */ + public boolean deleteObject(final Collection collection) { + return redisTemplate.delete(collection) > 0; + } + + /** + * 缓存List数据 + * + * @param key 缓存的键值 + * @param dataList 待缓存的List数据 + * @return 缓存的对象 + */ + public long setCacheList(final String key, final List dataList) { + Long count = redisTemplate.opsForList().rightPushAll(key, dataList); + return count == null ? 0 : count; + } + + /** + * 获得缓存的list对象 + * + * @param key 缓存的键值 + * @return 缓存键值对应的数据 + */ + public List getCacheList(final String key) { + return redisTemplate.opsForList().range(key, 0, -1); + } + + /** + * 缓存Set + * + * @param key 缓存键值 + * @param dataSet 缓存的数据 + * @return 缓存数据的对象 + */ + public BoundSetOperations setCacheSet(final String key, final Set dataSet) { + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + Iterator it = dataSet.iterator(); + while (it.hasNext()) { + setOperation.add(it.next()); + } + return setOperation; + } + + /** + * 获得缓存的set + * + * @param key + * @return + */ + public Set getCacheSet(final String key) { + return redisTemplate.opsForSet().members(key); + } + + /** + * 缓存Map + * + * @param key + * @param dataMap + */ + public void setCacheMap(final String key, final Map dataMap) { + if (dataMap != null) { + redisTemplate.opsForHash().putAll(key, dataMap); + } + } + + /** + * 获得缓存的Map + * + * @param key + * @return + */ + public Map getCacheMap(final String key) { + return redisTemplate.opsForHash().entries(key); + } + + /** + * 往Hash中存入数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @param value 值 + */ + public void setCacheMapValue(final String key, final String hKey, final T value) { + redisTemplate.opsForHash().put(key, hKey, value); + } + + /** + * 获取Hash中的数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @return Hash中的对象 + */ + public T getCacheMapValue(final String key, final String hKey) { + HashOperations opsForHash = redisTemplate.opsForHash(); + return opsForHash.get(key, hKey); + } + + /** + * 获取多个Hash中的数据 + * + * @param key Redis键 + * @param hKeys Hash键集合 + * @return Hash对象集合 + */ + public List getMultiCacheMapValue(final String key, final Collection hKeys) { + return redisTemplate.opsForHash().multiGet(key, hKeys); + } + + /** + * 删除Hash中的某条数据 + * + * @param key Redis键 + * @param hKey Hash键 + * @return 是否成功 + */ + public boolean deleteCacheMapValue(final String key, final String hKey) { + return redisTemplate.opsForHash().delete(key, hKey) > 0; + } + + /** + * 获得缓存的基本对象列表 + * + * @param pattern 字符串前缀 + * @return 对象列表 + */ + public Collection keys(final String pattern) { + return redisTemplate.keys(pattern); + } + + /** + * 存入一个集合 + * @param key + * @param t + * @param + */ + public > void setCacheList(String key, T t) { + redisTemplate.opsForList().leftPush(String.valueOf(key),t); + } + + public long setCacheList(final String key, final T dataList) { + Long count = redisTemplate.opsForList().rightPushAll(key, dataList); + return count == null ? 0 : count; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..182a86c --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,7 @@ +server: + port: 8082 +spring: + application: + name: mobai-mq + rabbitmq: + host: 43.142.100.73