Mqtt的发送和监听

dev.template
Number7 2024-09-27 22:43:38 +08:00
parent 61e8d5903a
commit ef2f41ed87
14 changed files with 588 additions and 219 deletions

View File

@ -19,6 +19,14 @@
<dependencies> <dependencies>
<!--mqtt依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.5</version>
</dependency>
<!-- SpringCloud Alibaba Nacos --> <!-- SpringCloud Alibaba Nacos -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>

View File

@ -0,0 +1,30 @@
package com.template.controller;
import com.template.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @Authorliuxinyue
* @Packagecom.template.controller
* @Projectcloud-server
* @nameMqttController
* @Date2024/9/26 15:44
*/
@Log4j2
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttService mqttService;
@PostMapping("/connectionMqtt")
public void connectionMqtt(@RequestParam("message") String message) throws MqttException {
mqttService.connectionMqtt(message);
}
}

View File

@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RestController;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
/** /**
* @Authorliuxinyue * @Authorliuxinyue
@ -45,7 +46,7 @@ public class TemplateController {
* @return * @return
*/ */
@PostMapping("/messageParsing") @PostMapping("/messageParsing")
public Result messageParsing(@RequestParam("templateMessage") String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException { public Result messageParsing(@RequestParam("templateMessage") String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException, ExecutionException, InterruptedException {
templateService.messageParsing(templateMessage); templateService.messageParsing(templateMessage);
return Result.success(); return Result.success();
} }

View File

@ -0,0 +1,16 @@
package com.template.service;
import org.eclipse.paho.client.mqttv3.MqttException;
/**
* @Authorliuxinyue
* @Packagecom.template.service
* @Projectcloud-server
* @nameMqttService
* @Date2024/9/26 15:57
*/
public interface MqttService {
void connectionMqtt(String message) throws MqttException;
}

View File

@ -7,6 +7,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
/** /**
* @Authorliuxinyue * @Authorliuxinyue
@ -19,7 +20,7 @@ public interface TemplateService {
List<Template> templateList(); List<Template> templateList();
void messageParsing(String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException; void messageParsing(String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException, ExecutionException, InterruptedException;
List<MessageTemplateType> findTemplateById(Integer templateId); List<MessageTemplateType> findTemplateById(Integer templateId);

View File

@ -0,0 +1,50 @@
package com.template.service.impl;
import com.template.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Authorliuxinyue
* @Packagecom.template.service.impl
* @Projectcloud-server
* @nameMqttServiceImpl
* @Date2024/9/26 15:58
*/
@Log4j2
@Service
public class MqttServiceImpl implements MqttService{
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://47.101.53.251:1883";
String clientId = "javaLxy";
@Override
public void connectionMqtt(String message) throws MqttException {
MqttClient mqttClient = new MqttClient(broker, clientId);
MqttConnectOptions connectOptions = new MqttConnectOptions();
//清理缓存
connectOptions.setCleanSession(true);
//连接
mqttClient.connect(connectOptions);
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttClient.publish(topic, mqttMessage);
log.info("发送成功");
}
}

View File

@ -1,25 +1,20 @@
package com.template.service.impl; package com.template.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.template.domain.*; import com.template.domain.*;
import com.template.mapper.TemplateMapper; import com.template.mapper.TemplateMapper;
import com.template.service.CarService; import com.template.service.CarService;
import com.template.service.MessageTemplateTypeService; import com.template.service.MessageTemplateTypeService;
import com.template.service.TemplateService; import com.template.service.TemplateService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/** /**
* @Authorliuxinyue * @Authorliuxinyue
* @Packagecom.template.service.impl * @Packagecom.template.service.impl
@ -32,7 +27,7 @@ import java.util.List;
public class TemplateServiceImpl implements TemplateService{ public class TemplateServiceImpl implements TemplateService{
@Autowired @Autowired
private TemplateMapper templateMapper; private static TemplateMapper templateMapper;
@Autowired @Autowired
private CarService carService; private CarService carService;
@ -49,52 +44,45 @@ public class TemplateServiceImpl implements TemplateService{
} }
@Override @Override
public void messageParsing(String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException { public void messageParsing(String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException, ExecutionException, InterruptedException {
//给一个JSON对象
JSONObject jsonObject = new JSONObject();
//先截取出VIN码 然后根据VIN码查询这个车属于什么类型
if(templateMessage.length()<18){
throw new RuntimeException("The vehicle message is incorrect");
}
//将报文进行切割
String[] hexArray = templateMessage.split(" ");
StringBuilder result = new StringBuilder();
for (String hex : hexArray) {
int decimal = Integer.parseInt(hex, 16);
result.append((char) decimal);
}
//取出VIN码
String carVin = result.substring(0, 18-1);
log.info("carVin码为:"+carVin);
//根据VIN码获取车辆信息
SysCar carByVin = carService.findCarByVin(carVin);
if(carByVin==null){
throw new RuntimeException("Check this car!!");
}else{
//根据车辆类型ID获取车辆类型名称
CarType carTypeById = carService.findCarTypeById(carByVin.getCarTypeId());
//查询报文模版
Template templateDate=templateMapper.findTemplateByName(carTypeById.getTypeName());
log.info("取出Redis中对象报文模版信息");
List range = redisTemplate.opsForList().range("VehicleType", 0, -1);
range.forEach(o -> {
}); List<MessageTemplateType> templateList = templateMapper.findTemplateById(1);
//根据报文模版的ID查询对应的模版 String[] split = templateMessage.split(" ");
List<MessageTemplateType> messageByTemplateName = messageTemplateTypeService.findMessageByTemplateName(templateDate.getTemplateId()); String[] strings = new String[split.length];
//将模版里面有的配置进行循环
for (MessageTemplateType messageTemplateType : messageByTemplateName) { List<CompletableFuture<String>> futures = new ArrayList<>();
//开始位置 for (MessageTemplateType templateType : templateList) {
Integer startIndex = messageTemplateType.getStartIndex();
//结束位置 futures.add(CompletableFuture.supplyAsync(() -> {
Integer endIndex = messageTemplateType.getEndIndex(); int startIndex = Integer.parseInt(String.valueOf(templateType.getStartIndex())) - 1;
//将每个解析后的字段都存入到JSON对象中 int endIndex = Integer.parseInt(String.valueOf(templateType.getEndIndex()));
jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex-1)); StringBuilder hexBuilder = new StringBuilder();
for (int j = startIndex; j < endIndex; j++) {
hexBuilder.append(split[j]);
}
// 创建16进制的对象
String hex = hexBuilder.toString();
// 转橙字符数组
char[] result = new char[hex.length() / 2];
for (int x = 0; x < hex.length(); x += 2) {
// 先转十进制
int high = Character.digit(hex.charAt(x), 16);
// 转二进制
int low = Character.digit(hex.charAt(x + 1), 16);
// 转字符
result[x / 2] = (char) ((high << 4) + low);
}
return new String(result);
}));
} }
log.info("解析后的报文是:"+jsonObject); for (int i = 0; i < futures.size(); i++) {
strings[i] = futures.get(i).get();
} }
System.out.println("哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈");
log.info("结果是:"+strings);
} }
@Override @Override
@ -103,58 +91,4 @@ public class TemplateServiceImpl implements TemplateService{
} }
public void insertIoTDB(JSONObject jsonObject) throws SQLException, ClassNotFoundException, IoTDBConnectionException, StatementExecutionException {
System.out.println("Hello IoTDB Java Example");
//初始化与连接
Session session = new Session.Builder()
.host("47.116.173.119")
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_12)
.build();
//开启Session RPC不压缩
session.open(false);
session.open(false);
log.info("写入数据");
//写入数据
List<Object> values = new ArrayList<>();
values.add(jsonObject);
insertRecord(session, (List<Object>) jsonObject);
//添加sql语句
String sql="INSERT INTO template (vinCode, ' or timeStamp' or longItude, latitude, speedVehicle, totalMileage, totalVoltage) VALUES(0, 0, 0, 0, 0, 0, 0)";
}
private static void insertRecord(Session session,List<Object> values) throws SQLException, ClassNotFoundException, IoTDBConnectionException, StatementExecutionException {
List<String> strings = new ArrayList<>();
List<TSDataType> objects = new ArrayList<>();
strings.add("status");
objects.add(TSDataType.INT32);
session.insertRecord("root.test.test",System.currentTimeMillis(),strings,objects,values);
System.out.println("----------------写入数据成功----------------");
}
private static void queryRecord(Session session) throws IoTDBConnectionException, StatementExecutionException {
System.out.println("----------------查询数据开始----------------");
try(SessionDataSet sessionDataSet = session.executeLastDataQuery(Collections.singletonList("select status from root.test.test"))){
System.out.println(sessionDataSet.getColumnNames());
sessionDataSet.setFetchSize(1024);
while(sessionDataSet.hasNext()){
System.out.println(sessionDataSet.hasNext());
}
}
System.out.println("----------------查询数据结束----------------");
}
} }

View File

@ -0,0 +1,51 @@
package com.template.util;
import com.muyu.common.core.domain.Result;
import com.muyu.common.security.annotation.RequiresPermissions;
import com.template.domain.MessageTemplateType;
import com.template.service.TemplateService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameCallback
* @Date2024/9/26 15:23
*/
@Log4j2
public class Callback implements MqttCallback{
@Resource
private static TemplateService templateService;
@Override
public void connectionLost(Throwable throwable) {
log.error(throwable.getMessage(),throwable);
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
log.info("收到了来自:"+s+"的消息:{}",new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
}

View File

@ -0,0 +1,179 @@
package com.template.util;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttReceivedMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameMqttClient
* @Date2024/9/26 16:05
*/
@Log4j2
public class MyMqttClient {
public static MqttClient mqttClient=null;
private static MemoryPersistence memoryPersistence=null;
private static MqttConnectOptions mqttConnectOptions=null;
private static String ClientName="";
private static String IP="";
public static void main(String[] args) {
start(ClientName);
}
public static void start(String clientId){
//初始化连接设置对象
mqttConnectOptions = new MqttConnectOptions();
//设置是否清空session
mqttConnectOptions.setCleanSession(true);
//设置连接超时时间
mqttConnectOptions.setConnectionTimeout(10);
//设置持久化方式
memoryPersistence = new MemoryPersistence();
if(null!=clientId){
try {
mqttClient = new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
} catch (MqttException e) {
log.error(e);
}
}
log.info("连接状态:"+mqttClient.isConnected());
//设置连接和回调
if(null!=mqttClient){
if(!mqttClient.isConnected()){
//创建回调函数对象
Callback mqttReceiveCallBack = new Callback();
//客户端添加回调函数
mqttClient.setCallback(mqttReceiveCallBack);
try {
log.info("连接");
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
e.printStackTrace();
}
}
}else{
log.info("mqttClient为空");
}
log.info("连接状态:"+mqttClient.isConnected());
}
//关闭连接
public void closeConnect(){
//关闭存储方式
if(null!=memoryPersistence){
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
e.printStackTrace();
}
}else{
log.info("memoryPersistence为空");
}
//关闭连接
if(null!=mqttClient){
if(mqttClient.isConnected()){
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
e.printStackTrace();
}
}else{
log.info("mqttClient未连接");
}
}else{
log.info("mqttClient为空");
}
}
//发布消息
public static void publish(String topic, String message,int ops){
if(null!=mqttClient && mqttClient.isConnected()){
log.info("发布消息:"+mqttClient.isConnected());
log.info("id:"+mqttClient.getClientId());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(ops);
mqttMessage.setPayload(message.getBytes());
MqttTopic topic1 = mqttClient.getTopic(topic);
if(null!=topic1){
try {
MqttDeliveryToken publish = topic1.publish(mqttMessage);
if(!publish.isComplete()){
log.info("消息发布成功");
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}else{
reConnect();
}
}
// 重新连接
public static void reConnect() {
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
if(null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
e.printStackTrace();
}
}else {
log.info("mqttConnectOptions是空的");
}
}else {
log.info("mqttClient没有连接");
}
}else {
start(ClientName);
}
}
//订阅主题
public static void subTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try {
mqttClient.subscribe(topic,1);
} catch (MqttException e) {
e.printStackTrace();
}
}else{
log.info("mqttClient是错误的");
}
}
//清空主题
public void cleanTopic(String topic){
if(null!=mqttClient && mqttClient.isConnected()){
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}else{
log.info("mqttClient是错误的");
}
}
}

View File

@ -0,0 +1,52 @@
package com.template.util;
import org.eclipse.paho.client.mqttv3.*;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameReceive
* @Date2024/9/27 19:14
*/
public class Receive {
public static void main(String[] args) {
String topic = "vehicle";
String broker = "tcp://47.101.53.251:1883";
String clientId = "lxy";
try (MqttClient sampleClient = new MqttClient(broker, clientId)) {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this example
}
});
sampleClient.subscribe(topic);
System.out.println("Subscribed to topic \"" + topic + "\"");
} catch (MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -1,53 +0,0 @@
package com.template.util;
import lombok.extern.log4j.Log4j2;
import java.nio.charset.StandardCharsets;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameStringCutterUtils
* @Date2024/9/25 21:18
*/
@Log4j2
public class StringCutterUtils {
public static String hexadecimalCharacter(String s){
StringBuilder stringBuilder = new StringBuilder();
int len = s.length();
for (int i = 0; i < len; i++) {
char c = s.charAt(i);
String hexString = Integer.toHexString(c);
stringBuilder.append(hexString+" ");
}
return stringBuilder.toString();
}
public static void main(String[] args) {
String s = hexadecimalCharacter("3C3F786D6C2076657273696F6E3D22312E30223F3E0D0A3C6D6F6E69746F72526F6F7420747970653D22706172616D223E3C73796E6368726F6E697A65537970746F6D206576656E743D22302220696E697469616C3D2274727565223E3C416374696F6E5F4543473E3C52687974686D3E53696E75733C2F52687974686D3E3C48523E38303C2F48523E3C454D443E4E6F204368616E67653C2F454D443E3C436F6E647563743E303C2F436F6E647563743E3C2F416374696F6E5F4543473E3C416374696F6E5F4F7361742076616C75653D2239342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F425020697352656C617469766550657263656E743D2266616C7365223E3C536872696E6B2076616C75653D22313230222F3E3C537472657463682076616C75653D223830222F3E3C2F416374696F6E5F42503E3C416374696F6E5F5265737020627265617468547970653D224E6F726D616C222076616C75653D2231342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F6574434F322076616C75653D2233342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F54656D70657261747572652076616C75653D2233352E32222F3E3C416374696F6E5F4356502076616C75653D22362E30222F3E3C416374696F6E5F5041504469612076616C75653D223130222F3E3C416374696F6E5F5041505379732076616C75653D223235222F3E3C416374696F6E5F57502076616C75653D2239222F3E3C2F73796E6368726F6E697A65537970746F6D3E3C2F6D6F6E69746F72526F6F743E0D0A");
String string = toString(s);
log.info(string);
}
public static String toString(String s){
if(s==null || s.equals("")){
return null;
}
log.info("将字符串中的空格去除");
s = s.replace(" ", "");
byte[] bytes = new byte[s.length() / 2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) (0xff & Integer.parseInt(s.substring(i*2,i*2+2), 16));
}
s=new String(bytes, StandardCharsets.UTF_8);
return s;
}
}

View File

@ -1,60 +1,61 @@
package com.template.util; //package com.template.util;
import com.template.domain.MessageTemplateType; //import com.template.domain.MessageTemplateType;
import com.template.domain.SysCar; //import com.template.domain.SysCar;
import com.template.domain.Template; //import com.template.domain.Template;
import com.template.domain.resp.CarTypeResp; //import com.template.domain.resp.CarTypeResp;
import com.template.service.CarService; //import com.template.service.CarService;
import com.template.service.TemplateService; //import com.template.service.TemplateService;
import lombok.extern.log4j.Log4j2; //import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations; //import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; //import javax.annotation.PostConstruct;
import javax.annotation.Resource; //import javax.annotation.Resource;
import java.util.List; //import java.util.List;
/** ///**
* @Authorliuxinyue // * @Authorliuxinyue
* @Packagecom.template.util // * @Packagecom.template.util
* @Projectcloud-server // * @Projectcloud-server
* @nameSynchronizingTemplate // * @nameSynchronizingTemplate
* @Date2024/9/25 20:03 // * @Date2024/9/25 20:03 项目一启动 查询数据库 将报文模版存入到Redis
*/ // */
@Component //@Component
@Log4j2 //@Log4j2
public class SynchronizingTemplate { //public class SynchronizingTemplate {
//
// //调用报文模版列表接口
//调用报文模版列表接口 // @Resource
@Resource // private TemplateService templateService;
private TemplateService templateService; //
// //redis
//redis // @Resource
@Resource // private RedisTemplate redisTemplate;
private RedisTemplate redisTemplate; //
// @Autowired
@Autowired // private CarService carService;
private CarService carService; //
// @PostConstruct
// public void synchronizeTemplate() {
@PostConstruct // //获取所有报文模版的ID
public void synchronizeTemplate() { // log.info("获取所有报文模版的ID");
//获取所有报文模版的ID // List<Template> templates = templateService.templateList();
log.info("获取所有报文模版的ID"); // templates.forEach(template -> {
List<Template> templates = templateService.templateList(); // Integer templateId = template.getTemplateId(); //报文模版ID
templates.forEach(template -> { // List<MessageTemplateType> list=templateService.findTemplateById(templateId); //根据报文模版ID查询所有的报文模版
Integer templateId = template.getTemplateId(); //报文模版ID // ListOperations<String,Object> listOperations = redisTemplate.opsForList(); //将报文信息存储到redis中
List<MessageTemplateType> list=templateService.findTemplateById(templateId); //根据报文模版ID查询所有的报文模版 // redisTemplate.delete(template.getTemplateName());//因为每一次添加缓存的时候不会覆盖之前的数据 所有将数据先清空
ListOperations<String,Object> listOperations = redisTemplate.opsForList(); //将报文信息存储到redis中 // List<CarTypeResp> allCars = carService.findAllCars();//查询所有车辆 里面有模版名称
redisTemplate.delete(template.getTemplateName());//因为每一次添加缓存的时候不会覆盖之前的数据 所有将数据先清空 // redisTemplate.opsForList().leftPushAll("VehicleType", allCars);//将车辆类型放入列表
List<CarTypeResp> allCars = carService.findAllCars();//查询所有车辆 里面有模版名称 // listOperations.leftPushAll(template.getTemplateName(), list); //将报文信息存储到redis中
redisTemplate.opsForList().leftPushAll("VehicleType", allCars);//将车辆类型放入列表 // List range = redisTemplate.opsForList().range("VehicleType", 0, -1);
listOperations.leftPushAll(template.getTemplateName(), list); //将报文信息存储到redis中 // range.forEach(o -> {
List range = redisTemplate.opsForList().range("VehicleType", 0, -1); // log.info("数据为:"+o);
range.forEach(o -> { // });
log.info("数据为:"+o); //
}); // listOperations.leftPushAll("CarType", allCars);
// listOperations.leftPushAll(template.getTemplateName(), range);
}); //
} // });
} // }
//}

View File

@ -0,0 +1,59 @@
package com.template.util;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameTest
* @Date2024/9/27 11:55
*/
@Log4j2
public class Test {
public static void main(String[] args) {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://47.101.53.251:1883";
String clientId = "Lxy";
try {
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
//是否清空session
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
//连接
sampleClient.connect(connOpts);
sampleClient.subscribe("vehicle",qos);
sampleClient.setCallback(new MqttCallback() {
//连接丢失(报错)
@Override
public void connectionLost(Throwable throwable) {
log.error("error:"+throwable.getMessage());
}
//消息已经接收到
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
}
//交付完成
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -0,0 +1,40 @@
package com.template.util;
import com.template.service.TemplateService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
/**
* @Authorliuxinyue
* @Packagecom.template.util
* @Projectcloud-server
* @nameTest1
* @Date2024/9/27 22:39
*/
@Log4j2
public class Test1 implements MqttCallback{
@Resource
private static TemplateService templateService;
@Override
public void connectionLost(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
log.info("收到了来自:"+mqttMessage.toString()+"的消息:{}",new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
}