重构车辆数据处理模块

- 新增 CacheUtil 类用于缓存管理
- 添加车辆上线和下线的 RabbitMQ 消息消费者- 实现基于事件的 IoTDB 数据持久化逻辑
- 重构 Kafka 消费者,支持多线程处理
- 移除多个不再使用的类和接口
dev.carData
张腾 2024-10-06 09:45:37 +08:00
parent 6c2daa0d7e
commit aeec2245c0
21 changed files with 321 additions and 358 deletions

View File

@ -1,54 +0,0 @@
package com.muyu.common.kafka.config;
import com.muyu.common.kafka.constants.KafkaConstants;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* kafka
*/
@Configuration
public class KafkaConsumerConfig {
@Bean
public KafkaConsumer kafkaConsumer() {
Map<String, Object> configs = new HashMap<>();
//kafka服务端的IP和端口,格式:(ip:port)
configs.put("bootstrap.servers", "60.204.221.52:9092");
//开启consumer的偏移量(offset)自动提交到Kafka
configs.put("enable.auto.commit", true);
//consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒
configs.put("auto.commit.interval", 5000);
//在Kafka中没有初始化偏移量或者当前偏移量不存在情况
//earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量
//latest, 在偏移量无效的情况下, 自动重置为最新的偏移量
//none, 在偏移量无效的情况下, 抛出异常.
configs.put("auto.offset.reset", "latest");
//请求阻塞的最大时间(毫秒)
configs.put("fetch.max.wait", 500);
//请求应答的最小字节数
configs.put("fetch.min.size", 1);
//心跳间隔时间(毫秒)
configs.put("heartbeat-interval", 3000);
//一次调用poll返回的最大记录条数
configs.put("max.poll.records", 500);
//指定消费组
configs.put("group.id", KafkaConstants.KafkaGrop);
//指定key使用的反序列化类
Deserializer keyDeserializer = new StringDeserializer();
//指定value使用的反序列化类
Deserializer valueDeserializer = new StringDeserializer();
//创建Kafka消费者
KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer);
return kafkaConsumer;
}
}

View File

@ -1,45 +0,0 @@
package com.muyu.common.kafka.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* kafka
*/
@Configuration
public class KafkaProviderConfig {
@Bean
public KafkaProducer kafkaProducer() {
Map<String, Object> configs = new HashMap<>();
//#kafka服务端的IP和端口,格式:(ip:port)
configs.put("bootstrap.servers", "47.116.173.119:9092");
//客户端发送服务端失败的重试次数
configs.put("retries", 2);
//多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求.
//此设置有助于提高客户端和服务器的性能,配置控制默认批量大小(以字节为单位)
configs.put("batch.size", 16384);
//生产者可用于缓冲等待发送到服务器的记录的总内存字节数(以字节为单位)
configs.put("buffer-memory", 33554432);
//生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
//acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送.在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1.
//acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失.
//acks=all,acks=-1,leader节点将等待所有同步复制副本完成再确认记录,这保证了只要至少有一个同步复制副本存活,记录就不会丢失.
configs.put("acks", "-1");
//指定key使用的序列化类
Serializer keySerializer = new StringSerializer();
//指定value使用的序列化类
Serializer valueSerializer = new StringSerializer();
//创建Kafka生产者
KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer);
return kafkaProducer;
}
}

View File

@ -1,9 +0,0 @@
package com.muyu.common.kafka.constants;
public class KafkaConstants {
public final static String KafkaTopic = "carJsons";
// public final static String KafkaGrop = "kafka_grop";
}

View File

@ -1,9 +1,11 @@
package com.muyu.carData; package com.muyu.carData;
import com.muyu.carData.listener.MyListener; import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients; import com.muyu.common.security.annotation.EnableMyFeignClients;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.scheduling.annotation.EnableAsync;
/** /**
* @Author * @Author
@ -14,11 +16,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
*/ */
@SpringBootApplication @SpringBootApplication
@EnableMyFeignClients @EnableMyFeignClients
@EnableCustomConfig
@EnableAsync
@EnableCaching
public class CarDataApplication { public class CarDataApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication application = new SpringApplication(CarDataApplication.class); SpringApplication.run(CarDataApplication.class,args);
application.addListeners(new MyListener());
application.run(args);
} }
} }

View File

@ -1,15 +1,22 @@
package com.muyu.carData.config.lotdbconfig; package com.muyu.carData.config.lotdbconfig;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session; import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
/** /**
@ -76,6 +83,66 @@ public class IotDBSessionConfig {
} }
/**
*
* @param deviceId
* @param time
* @param measurements
* @param values
*/
public void execute(String deviceId,long time,List<String> measurements,List<String> values){
if (CollectionUtils.isEmpty(measurements) && !CollectionUtils.isEmpty(values)){
try {
iotSession().insertAlignedRecord(deviceId,time,measurements,values);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
}
}
}
/**
*
* @param sql
* @return
*/
public List<HashMap<String,Object>> executeQuery(String sql){
logger.info("sql:{}",sql);
List<HashMap<String,Object>> list = new ArrayList<>();
try {
SessionDataSet sessionDataSet = iotSession().executeQueryStatement(sql);
int fetchSize = sessionDataSet.getFetchSize();
List<String> columnNames = sessionDataSet.getColumnNames();
logger.info("columnNames:{}",columnNames);
List<String> columnTypes = sessionDataSet.getColumnTypes();
logger.info("columnTypes:{}",columnTypes);
if (fetchSize > 0){
while (sessionDataSet.hasNext()){
HashMap<String, Object> map = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
String key = field.getStringValue();
Object value = field.getObjectValue(field.getDataType());
map.put(key,value);
}
list.add(map);
}
sessionDataSet.closeOperationHandle();
}
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
return list;
}

View File

@ -0,0 +1,34 @@
package com.muyu.carData.consumer;
import com.muyu.carData.util.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** 线
* @Author
* @Packagecom.muyu.carData.consumer
* @Projectcloud-server-8
* @nameCarOffConsumer
* @Date2024/10/4 14:30
*/
@Log4j2
@Component
public class CarOffConsumer {
@Autowired
private CacheUtil cacheUtil;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "CAR_OFFLINE",durable = "true"),
exchange = @Exchange(value = "CAR_OFF_EXCHANGE",type = "fanout")
))
public void inline(String vin){
log.info("车辆vin:{},车辆下线成功!开始消费...");
cacheUtil.remove(vin);
}
}

View File

@ -0,0 +1,33 @@
package com.muyu.carData.consumer;
import com.muyu.carData.util.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**线
* @Author
* @Packagecom.muyu.carData.consumer
* @Projectcloud-server-8
* @nameCarOnlineConsumer
* @Date2024/10/4 14:27
*/
@Log4j2
@Component
public class CarOnlineConsumer {
@Autowired
private CacheUtil cacheUtil;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "CAR_ONLINE",durable = "true"),
exchange = @Exchange(value = "ONLINE_EXCHANGE",type = "fanout")
))
public void online(String vin){
log.info("车辆vin:{},已上线,开始消费",vin);
}
}

View File

@ -1,19 +1,23 @@
package com.muyu.carData.consumer; package com.muyu.carData.consumer;
import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSONObject;
import cn.hutool.json.JSONUtil; import com.muyu.carData.event.EventPublisher;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.carData.pojo.Student;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.Collection; import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* @Author * @Author
@ -24,33 +28,43 @@ import java.util.Collection;
*/ */
@Component @Component
@Log4j2 @Log4j2
public class MyKafkaConsumer implements InitializingBean { public class MyKafkaConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
@Autowired @Autowired
private KafkaConsumer kafkaConsumer; private KafkaConsumer kafkaConsumer;
@Autowired
private EventPublisher eventPublisher;
private final String topicName = "carJsons"; private final String topicName = "carJsons";
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override @Override
public void afterPropertiesSet() throws Exception { public void run(ApplicationArguments args) throws Exception {
log.info("启动线程开始监听topic:{}",topicName); log.info("开始监听kafka-topic:{}",topicName);
Thread thread = new Thread(() -> { List<String> topics = Collections.singletonList(topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics); kafkaConsumer.subscribe(topics);
while (true){ while (true){
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { consumerRecords.forEach(record ->{
//从consumerRecord中获取消费数据 executorService.submit(() -> handleRecord(record));
String value = consumerRecord.value(); log.info("数据为:{},消费成功!",record);
log.info("从Kafka中消费的原始数据===============>>{}",value);
}
}
}); });
thread.start(); }
}
log.info("启动线程结束监听topic:{}",topicName);
private void handleRecord(ConsumerRecord<String,String> record) {
String value = record.value();
JSONObject jsonObject = JSONObject.parseObject(value);
eventPublisher.publishEvent(jsonObject);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭kafka和线程");
kafkaConsumer.close();
executorService.shutdown();
} }
} }

View File

@ -1,3 +1,4 @@
/*
package com.muyu.carData.controller; package com.muyu.carData.controller;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
@ -8,13 +9,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
*/
/** /**
* @Author * @Author
* @Packagecom.muyu.carData.controller * @Packagecom.muyu.carData.controller
* @Projectcloud-server-8 * @Projectcloud-server-8
* @nameTestController * @nameTestController
* @Date2024/9/26 23:56 * @Date2024/9/26 23:56
*/ *//*
@RestController @RestController
@RequestMapping("/testCache") @RequestMapping("/testCache")
@Log4j2 @Log4j2
@ -42,3 +45,4 @@ public class CacheController {
} }
} }
*/

View File

@ -1,3 +1,4 @@
/*
package com.muyu.carData.controller; package com.muyu.carData.controller;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -9,13 +10,15 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
*/
/** /**
* @Author * @Author
* @Packagecom.muyu.carData.testcontroller * @Packagecom.muyu.carData.testcontroller
* @Projectcloud-server-8 * @Projectcloud-server-8
* @nameKafkaProducerController * @nameKafkaProducerController
* @Date2024/9/28 15:10 * @Date2024/9/28 15:10
*/ *//*
@RestController @RestController
@RequestMapping("/produce") @RequestMapping("/produce")
@Log4j2 @Log4j2
@ -43,3 +46,4 @@ public class KafkaProducerController {
} }
*/

View File

@ -15,8 +15,12 @@ public class EsSaveEvent extends ApplicationEvent {
private JSONObject data; private JSONObject data;
public EsSaveEvent(JSONObject source) { public EsSaveEvent(Object source,JSONObject data) {
super(source); super(source);
this.data = source; this.data = data;
}
public JSONObject getData(){
return data;
} }
} }

View File

@ -0,0 +1,15 @@
package com.muyu.carData.event;
import org.springframework.context.ApplicationListener;
/**
* @Author
* @Packagecom.muyu.carData.event
* @Projectcloud-server-8
* @nameEventListener
* @Date2024/10/4 10:39
*/
public interface EventListener extends ApplicationListener<EsSaveEvent> {
void onEvent(EsSaveEvent event);
}

View File

@ -0,0 +1,28 @@
package com.muyu.carData.event;
import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.event
* @Projectcloud-server-8
* @nameEventPublisher
* @Date2024/10/4 10:40
*/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publishEvent(JSONObject message){
EsSaveEvent esSaveEvent = new EsSaveEvent(this, message);
publisher.publishEvent(esSaveEvent);
}
}

View File

@ -1,22 +0,0 @@
package com.muyu.carData.listener;
import com.muyu.carData.event.EsSaveEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameCustomEventListener
* @Date2024/9/29 23:49
*/
@Component
public class CustomEventListener {
@EventListener
public void handMyEvent(EsSaveEvent event){
//处理事件详情
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.carData.listener;
import com.alibaba.fastjson.JSONObject;
import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig;
import com.muyu.carData.event.EsSaveEvent;
import com.muyu.carData.event.EventListener;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameInsertIotDBListener
* @Date2024/10/6 9:25
*/
@Log4j2
@Component
public class InsertIotDBListener implements EventListener {
@Override
public void onEvent(EsSaveEvent event) {
JSONObject jsonObject = event.getData();
log.info("持久化:监听到数据:{}", jsonObject);
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
jsonObject.forEach((key,value) -> {
keys.add(key);
values.add((String) value);
});
IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig();
long time = System.currentTimeMillis();
iotDBSessionConfig.execute("root.vehicle",time,keys,values);
log.info("数据写入成功");
}
@Override
public void onApplicationEvent(EsSaveEvent event) {
onEvent(event);
}
}

View File

@ -1,20 +0,0 @@
package com.muyu.carData.listener;
import com.muyu.carData.event.EsSaveEvent;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.ApplicationListener;
/**
* @Author
* @Packagecom.muyu.carData.listener
* @Projectcloud-server-8
* @nameMyListener
* @Date2024/9/29 21:18
*/
@Log4j2
public class MyListener implements ApplicationListener<EsSaveEvent> {
@Override
public void onApplicationEvent(EsSaveEvent event) {
log.info("监听到自定义事件........");
}
}

View File

@ -1,41 +0,0 @@
package com.muyu.carData.pojo;
import com.muyu.carData.config.cacheconfig.ExpiryTime;
import lombok.*;
import lombok.experimental.SuperBuilder;
/**
* @Author
* @Packagecom.muyu.carData.pojo
* @Projectcloud-server-8
* @nameStudent
* @Date2024/9/27 0:40
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class Student extends ExpiryTime{
/**
*
*/
private Integer id;
/**
*
*/
private String name;
/**
*
*/
private String sex;
/**
*
*/
private long time = System.currentTimeMillis();
}

View File

@ -1,29 +0,0 @@
package com.muyu.carData.pulisher;
import com.alibaba.fastjson.JSONObject;
import com.muyu.carData.event.EsSaveEvent;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.pulisher
* @Projectcloud-server-8
* @nameCustomEventPublisher
* @Date2024/9/29 23:51
*/
@Log4j2
@Component
@AllArgsConstructor
public class CustomEventPublisher {
private ApplicationEventPublisher applicationEventPublisher;
public void publish(JSONObject data){
EsSaveEvent esSaveEvent = new EsSaveEvent(data);
applicationEventPublisher.publishEvent(esSaveEvent);
log.info("事件发布成功 - 消息是:{}",data);
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.carData.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.carData.util
* @Projectcloud-server-8
* @nameCacheUtil
* @Date2024/10/4 10:42
*/
@Component
public class CacheUtil<T> {
private final Cache<String,T> cache;
public CacheUtil() {
this.cache = Caffeine.newBuilder()
.maximumSize(500L)
.build();
}
public T get(String key) {
return (T)cache.getIfPresent(key);
}
public void put(String key, T value) {
cache.put(key, value);
}
public void remove(String key) {
cache.invalidate(key);
}
}

View File

@ -1,104 +0,0 @@
package com.muyu.server.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.muyu.domain.CarMessage;
import com.muyu.server.service.CarMessageService;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Component
public class Demo {
@Resource
private CarMessageService service;
@Resource
private KafkaProducer<String, String> kafkaProducer;
@PostConstruct
public void test() {
String topic = "vehicle";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://106.15.136.7:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic,0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
List<CarMessage> list= service.selectCarMessageList(1,2);
String str = new String( mqttMessage.getPayload() );
System.out.println(str);
String[] test = str.split(" ");
String[] results = new String[list.size()];
List<CompletableFuture<String>> futures = new ArrayList<>();
for (CarMessage carmsg : list) {
futures.add(CompletableFuture.supplyAsync(() -> {
int startIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageStartIndex())) - 1;
int endIndex = Integer.parseInt(String.valueOf(carmsg.getCarMessageEndIndex()));
StringBuilder hexBuilder = new StringBuilder();
for (int j = startIndex; j < endIndex; j++) {
hexBuilder.append(test[j]);
}
// 创建16进制的对象
String hex = hexBuilder.toString();
// 转橙字符数组
char[] result = new char[hex.length() / 2];
for (int x = 0; x < hex.length(); x += 2) {
// 先转十进制
int high = Character.digit(hex.charAt(x), 16);
// 转二进制
int low = Character.digit(hex.charAt(x + 1), 16);
// 转字符
result[x / 2] = (char) ((high << 4) + low);
}
return new String(result);
}));
}
for (int i = 0; i < futures.size(); i++) {
results[i] = futures.get(i).get();
}
String jsonString = JSONObject.toJSONString( results );
ProducerRecord<String, String> producerRecord = new ProducerRecord<>( "carJsons", jsonString);
kafkaProducer.send(producerRecord);
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

View File

@ -1,7 +1,6 @@
package com.muyu.mqtt; package com.muyu.mqtt;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.domain.CarMessage; import com.muyu.domain.CarMessage;
import com.muyu.domain.KafKaData; import com.muyu.domain.KafKaData;
@ -90,7 +89,7 @@ public class MqttTest {
.build()); .build());
} }
String jsonString = JSONObject.toJSONString(kafKaDataList); String jsonString = JSONObject.toJSONString(kafKaDataList);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); ProducerRecord<String, String> producerRecord = new ProducerRecord<>("carJsons", jsonString);
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
log.info("kafka投产{}", jsonString); log.info("kafka投产{}", jsonString);
// HashMap<String, String> stringStringHashMap = new HashMap<>(); // HashMap<String, String> stringStringHashMap = new HashMap<>();