fix kafka的消费者

master
rouchen 2024-06-17 22:30:52 +08:00
parent 2e5af8e220
commit 1b324da647
10 changed files with 43166 additions and 283 deletions

File diff suppressed because it is too large Load Diff

View File

@ -42,8 +42,11 @@
<groupId>org.freemarker</groupId> <groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId> <artifactId>freemarker</artifactId>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.iotdb</groupId> <groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId> <artifactId>iotdb-session</artifactId>

View File

@ -5,6 +5,7 @@ import com.muyu.iotDB.config.IotDBSessionConfig;
import com.muyu.iotDB.data.IotDbParam; import com.muyu.iotDB.data.IotDbParam;
import com.muyu.iotDB.data.ResponseData; import com.muyu.iotDB.data.ResponseData;
import com.muyu.iotDB.service.IotDbServer; import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
@ -38,6 +39,21 @@ public class IotDbController {
return ResponseData.success(); return ResponseData.success();
} }
@PostMapping("/api/device/messageData")
public ResponseData messageDataAdd(@RequestBody MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException {
iotDbServer.add(messageData);
return ResponseData.success();
}
/**
*
*
*/
@PostMapping("/api/device/selectQueryData")
public ResponseData selectQueryData(@RequestParam String vin) throws Exception {
return ResponseData.success(iotDbServer.selectQueryData(vin));
}
/** /**
* *
* @param iotDbParam * @param iotDbParam

View File

@ -1,6 +1,7 @@
package com.muyu.iotDB.service; package com.muyu.iotDB.service;
import com.muyu.iotDB.data.IotDbParam; import com.muyu.iotDB.data.IotDbParam;
import com.muyu.mqtt.dao.MessageData;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
@ -17,4 +18,7 @@ public interface IotDbServer {
Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception; Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
void add(MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException;
Object selectQueryData(String vin) throws IoTDBConnectionException, StatementExecutionException;
} }

View File

@ -5,6 +5,7 @@ import com.muyu.iotDB.data.IotDbParam;
import com.muyu.iotDB.data.IotDbResult; import com.muyu.iotDB.data.IotDbResult;
import com.muyu.iotDB.service.IotDbServer; import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
@ -70,6 +71,140 @@ public class IotDbServerImpl implements IotDbServer {
} }
return iotDbResultList; return iotDbResultList;
} }
@Override
public void add(MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.bizkey."+ messageData.getVin();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("vin");
measurementsList.add("getTimestamp");
measurementsList.add("longitude");
measurementsList.add("latitude");
measurementsList.add("speed");
measurementsList.add("mileage");
measurementsList.add("dischargeVoltage");
measurementsList.add("dischargeCurrent");
measurementsList.add("insulationResistance");
measurementsList.add("gear");
measurementsList.add("accelerationPedal");
measurementsList.add("brakePedal");
measurementsList.add("fuelConsumption");
measurementsList.add("motorControllerTemperature");
measurementsList.add("motorSpeed");
measurementsList.add("motorTorque");
measurementsList.add("motorTemperature");
measurementsList.add("motorVoltage");
measurementsList.add("motorCurrent");
measurementsList.add("powerBattery");
measurementsList.add("maxFeedbackPower");
measurementsList.add("maxDischargePower");
measurementsList.add("bmsSelfCheck");
measurementsList.add("powerBatteryCurrent");
measurementsList.add("powerBatteryV3");
measurementsList.add("maxVoltage");
measurementsList.add("minVoltage");
measurementsList.add("maxTemperature");
measurementsList.add("minTemperature");
measurementsList.add("availableCapacity");
measurementsList.add("vehicleStatus");
measurementsList.add("chargeStatus");
measurementsList.add("runStatus");
measurementsList.add("soc");
measurementsList.add("chargeWorkStatus");
measurementsList.add("driveMotorStatus");
measurementsList.add("location");
measurementsList.add("eas");
measurementsList.add("ptc");
measurementsList.add("eps");
measurementsList.add("abs");
measurementsList.add("mcu");
measurementsList.add("powerBatteryHeating");
measurementsList.add("powerBatteryCurrentStatus");
measurementsList.add("powerBatteryHeat");
measurementsList.add("dcdc");
measurementsList.add("chg");
List<String> valuesList = new ArrayList<>();
valuesList.add(messageData.getVin());
valuesList.add(messageData.getTimestamp());
valuesList.add(messageData.getLongitude());
valuesList.add(messageData.getLatitude());
valuesList.add(messageData.getSpeed());
valuesList.add(messageData.getMileage());
valuesList.add(messageData.getDischargeVoltage());
valuesList.add(messageData.getDischargeCurrent());
valuesList.add(messageData.getInsulationResistance());
valuesList.add(messageData.getGear());
valuesList.add(messageData.getAccelerationPedal());
valuesList.add(messageData.getBrakePedal());
valuesList.add(messageData.getFuelConsumption());
valuesList.add(messageData.getMotorControllerTemperature());
valuesList.add(messageData.getMotorSpeed());
valuesList.add(messageData.getMotorTorque());
valuesList.add(messageData.getMotorTemperature());
valuesList.add(messageData.getMotorVoltage());
valuesList.add(messageData.getMotorCurrent());
valuesList.add(messageData.getPowerBattery());
valuesList.add(messageData.getMaxFeedbackPower());
valuesList.add(messageData.getMaxDischargePower());
valuesList.add(messageData.getBmsSelfCheck());
valuesList.add(messageData.getPowerBatteryCurrent());
valuesList.add(messageData.getPowerBatteryV3());
valuesList.add(messageData.getMaxVoltage());
valuesList.add(messageData.getMinVoltage());
valuesList.add(messageData.getMaxTemperature());
valuesList.add(messageData.getMinTemperature());
valuesList.add(messageData.getAvailableCapacity());
valuesList.add(messageData.getVehicleStatus());
valuesList.add(messageData.getChargeStatus());
valuesList.add(messageData.getRunStatus());
valuesList.add(messageData.getSoc());
valuesList.add(messageData.getChargeWorkStatus());
valuesList.add(messageData.getDriveMotorStatus());
valuesList.add(messageData.getLocation());
valuesList.add(messageData.getEas());
valuesList.add(messageData.getPtc());
valuesList.add(messageData.getEps());
valuesList.add(messageData.getAbs());
valuesList.add(messageData.getMcu());
valuesList.add(messageData.getPowerBatteryHeating());
valuesList.add(messageData.getPowerBatteryCurrentStatus());
valuesList.add(messageData.getPowerBatteryHeat());
valuesList.add(messageData.getDcdc());
valuesList.add(messageData.getChg());
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(messageData.getTimestamp()), measurementsList, valuesList);
}
/**
* vin
* @param vin
* @return
*/
@Override
public List<MessageData> selectQueryData(String vin) throws IoTDBConnectionException, StatementExecutionException {
// List<IotDbResult> iotDbResultList = new ArrayList<>();
//
// if (null !=vin) {
// String sql = "select * from root.bizkey."+ vin ;
// SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
// List<String> columnNames = sessionDataSet.getColumnNames();
// List<String> titleList = new ArrayList<>();
// // 排除Time字段 -- 方便后面后面拼装数据
// for (int i = 1; i < columnNames.size(); i++) {
// String[] temp = columnNames.get(i).split("\\.");
// titleList.add(temp[temp.length - 1]);
// }
// // 封装处理数据
// packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
// } else {
// log.info("PK或者SN不能为空");
// }
// return iotDbResultList;
// } }
return null;
}
/** /**
* *
* @param iotDbParam * @param iotDbParam

View File

@ -1,20 +1,21 @@
package com.muyu.kafka; package com.muyu.kafka;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.muyu.iotDB.service.IotDbServer; import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData; import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.rmi.ServerException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -28,41 +29,52 @@ import java.util.Properties;
* Date 2024/6/16 22:18 * Date 2024/6/16 22:18
*/ */
@Component @Component
@Log4j2
public class SimpleKafkaConsumer { public class SimpleKafkaConsumer {
@Resource @Resource
private IotDbServer iotDbServer; private IotDbServer iotDbServer;
@Scheduled(cron = "0 0/5 * * * ?") // @KafkaListener(topics = "test1", groupId = "Partitions")
public void consumer() { // public void consumer() throws ServerException, IoTDBConnectionException, StatementExecutionException {
List<MessageData> messageDataList = new ArrayList<>(); // log.info("定时器开启");
// 配置Kafka消费者属性 //// List<MessageData> messageDataList = new ArrayList<>();
Properties props = new Properties(); //// // 配置Kafka消费者属性
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1"); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "Partitions");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//
//// // 创建Kafka消费者实例
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// // 持续消费消息
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// for (ConsumerRecord<String, String> record : records) {
// System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// String value = record.value();
// log.info("value:{}",value);
// MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
// log.info("messageData1:{}",messageData1);
// iotDbServer.add(messageData1);
// }
//
// }
// }
// 创建Kafka消费者实例 @KafkaListener(topics = "test1", groupId = "Partitions")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); public void consumer(ConsumerRecord<String, String> record) throws ServerException, IoTDBConnectionException, StatementExecutionException {
log.info("Received message");
// 订阅主题
consumer.subscribe(Collections.singletonList("test1"));
// 持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
String value = record.value(); String value = record.value();
MessageData messageData = JSONObject.parseObject(value, MessageData.class); log.info("value:{}", value);
messageDataList.add(messageData); MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
} log.info("messageData1:{}", messageData1);
} iotDbServer.add(messageData1);
} }
} }

View File

@ -2,6 +2,7 @@ package com.muyu.mqtt;
import com.alibaba.fastjson.JSON;
import com.muyu.mqtt.dao.MessageData; import com.muyu.mqtt.dao.MessageData;
import com.muyu.utils.ConversionUtil; import com.muyu.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -53,7 +54,7 @@ public class MessageCallbackService implements MqttCallback {
// } catch (Exception e) { // } catch (Exception e) {
// e.printStackTrace(); // e.printStackTrace();
// } // }
ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(topic, main.getVin(), main.toString()); ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(topic, main.getVin(), JSON.toJSONString(main));
kafkaTemplate.send(stringObjectProducerRecord); kafkaTemplate.send(stringObjectProducerRecord);
} }
private int currentPartitionIndex = 0; private int currentPartitionIndex = 0;

View File

@ -16,8 +16,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor @NoArgsConstructor
@SuperBuilder @SuperBuilder
public class MessageData { public class MessageData {
/** /**
*vin *vin
*/ */
@ -208,8 +206,4 @@ public class MessageData {
private String chg; private String chg;
} }

View File

@ -35,13 +35,13 @@ public class ConversionUtil {
// String hexStr = "7E 48 58 49 54 48 58 55 51 52 32 54 55 43 30 58 33 55 31 37 31 37 35 38 39 32 35 37 35 35 31 31 31 36 2e 37 36 34 36 35 30 30 33 39 2e 35 37 34 38 38 30 30 32 37 30 2e 30 30 32 30 2e 33 35 30 30 30 30 30 30 32 37 36 30 30 30 38 30 30 30 30 32 32 30 38 30 30 30 30 30 44 31 30 30 30 39 2e 30 30 30 33 32 30 30 30 30 34 34 37 34 39 37 38 36 30 31 30 39 30 30 30 32 32 32 30 30 35 32 38 34 30 30 30 30 37 35 38 38 34 2e 33 32 30 30 30 30 31 33 30 30 30 30 31 31 34 30 30 30 30 36 39 39 30 30 30 34 30 30 30 33 30 30 30 35 30 30 30 30 30 35 30 30 30 30 30 32 33 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 11 7E"; // String hexStr = "7E 48 58 49 54 48 58 55 51 52 32 54 55 43 30 58 33 55 31 37 31 37 35 38 39 32 35 37 35 35 31 31 31 36 2e 37 36 34 36 35 30 30 33 39 2e 35 37 34 38 38 30 30 32 37 30 2e 30 30 32 30 2e 33 35 30 30 30 30 30 30 32 37 36 30 30 30 38 30 30 30 30 32 32 30 38 30 30 30 30 30 44 31 30 30 30 39 2e 30 30 30 33 32 30 30 30 30 34 34 37 34 39 37 38 36 30 31 30 39 30 30 30 32 32 32 30 30 35 32 38 34 30 30 30 30 37 35 38 38 34 2e 33 32 30 30 30 30 31 33 30 30 30 30 31 31 34 30 30 30 30 36 39 39 30 30 30 34 30 30 30 33 30 30 30 35 30 30 30 30 30 35 30 30 30 30 30 32 33 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 11 7E";
String hexStringToString = hexStringToString(args); String hexStringToString = hexStringToString(args);
System.out.println(args); // System.out.println(args);
System.out.println(args.length()); // System.out.println(args.length());
System.out.println(hexStringToString); // System.out.println(hexStringToString);
System.out.println(hexStringToString.length()); // System.out.println(hexStringToString.length());
//截取第一位和最后两位 //截取第一位和最后两位
String substring = hexStringToString.substring(1, hexStringToString.length() - 2); String substring = hexStringToString.substring(1, hexStringToString.length() - 2);
log.error("substring:{}",substring.length()); // log.error("substring:{}",substring.length());
//vin //vin
String vin = substring.substring(0,17); String vin = substring.substring(0,17);
// log.error("length:{}",vin.length()); // log.error("length:{}",vin.length());

View File

@ -11,43 +11,21 @@ spring:
host: 101.34.243.166 host: 101.34.243.166
port: 5672 port: 5672
datasource: datasource:
username: muyu driver-class-name: com.mysql.cj.jdbc.Driver
password: 123456 url: jdbc:mysql://101.34.243.166:3306/zncar?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
# 如果需要数据本地化,则改成 file 方式 username: root
# jdbc:h2:mem:testDB;DB_CLOSE_DELAY=-1 password: wan@123
url: jdbc:h2:file:./db/vehicleSimulationDataBaseFile;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1 application:
driver-class-name: org.h2.Driver name: shop-server
# secondary:
# driver-class-name: com.mysql.cj.jdbc.Driver
# url: jdbc:mysql://101.34.243.166:3306/zncar?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false
# username: root
# password: yl@123
mybatis:
# mybatis-plus 配置
mybatis-plus:
mapper-locations: classpath*:/com.muyu.mapper/**/*.xml
#实体扫描多个package用逗号或者分号分隔
typeAliasesPackage: com.dmo.entity
global-config:
#数据库相关配置
db-config:
#主键类型 AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
id-type: AUTO
#字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断"
field-strategy: NOT_NULL
#驼峰下划线转换
column-underline: true
logic-delete-value: -1
logic-not-delete-value: 0
#原生配置
configuration: configuration:
# 打印sql
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: true map-underscore-to-camel-case: true
cache-enabled: false log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
call-setters-on-nulls: true mapper-locations: classpath*:mapper/*Mapper.xml
jdbc-type-for-null: 'null' global-config:
db-config:
id-type: auto
# 日志输出配置 # 日志输出配置
logging: logging: