kafka实现消息接收,直接编译成对象

vehicle-yinyuyang
黄大举 2024-04-02 23:08:57 +08:00
parent deebcff5bb
commit 64823b766e
8 changed files with 885 additions and 5 deletions

View File

@ -0,0 +1,123 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-analyze</artifactId>
<properties>
<maven.compiler.source>20</maven.compiler.source>
<maven.compiler.target>20</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Swagger UI -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.fox.version}</version>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- RuoYi Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-datasource</artifactId>
</dependency>
<!-- RuoYi Common DataScope -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-datascope</artifactId>
</dependency>
<!-- RuoYi Common Log -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-log</artifactId>
</dependency>
<!-- RuoYi Common Swagger -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-file-remote</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,36 @@
package com.muyu.analyze;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableRyFeignClients;
import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
*
*
* @author muyu
*/
@EnableCustomConfig
@EnableCustomSwagger2
@EnableRyFeignClients
@SpringBootApplication
@EnableScheduling
public class MuYuAnalyzeApplication
{
public static void main(String[] args)
{
SpringApplication.run(MuYuAnalyzeApplication.class, args);
System.out.println("(♥◠‿◠)ノ゙ 系统模块启动成功 ლ(´ڡ`ლ)゙ \n" +
" .-------. ____ __ \n" +
" | _ _ \\ \\ \\ / / \n" +
" | ( ' ) | \\ _. / ' \n" +
" |(_ o _) / _( )_ .' \n" +
" | (_,_).' __ ___(_ o _)' \n" +
" | |\\ \\ | || |(_,_)' \n" +
" | | \\ `' /| `-' / \n" +
" | | \\ / \\ / \n" +
" ''-' `'-' `-..-' ");
}
}

View File

@ -0,0 +1,48 @@
package com.muyu.analyze.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Configuration
public class KafkaConfig {
// Kafka Bootstrap Servers
public static final String BOOTSTRAP_SERVERS = "10.10.26.5:9092";
// 默认主题
public static final String DEFAULT_TOPIC = "test";
public static final String DEFAULT_KEY = "10001";
public static Properties properties1(){
// 1构建 Properties 对象 存放 kafka 生产者配置信息
Properties properties1 = new Properties();
// 设置 kafka 连接地址(从配置文件/环境变量获取)
properties1.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
// 设置 键值 序列化
properties1.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer .class.getName());
properties1.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties1;
}
public static Properties properties2(){
Properties properties2 = new Properties();
properties2.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.26.5:9092");
properties2.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties2.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties2.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
return properties2;
}
}

View File

@ -0,0 +1,34 @@
package com.muyu.analyze.consumer;
import com.muyu.analyze.domian.VehicleData;
import com.muyu.analyze.utils.AnalyzeUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
*
* @author zhuwenqiang
* @date 2023/3/1
*/
@Component
public class Consumer {
@KafkaListener(topics="test", groupId = "group",properties = {"bootstrap.servers=10.10.26.5:9092"})
public void getMessage(String message) {
VehicleData analyze = AnalyzeUtils.analyze(message);
System.out.println("kafka 消费者监听,接收到消息:" + analyze);
}
}

View File

@ -0,0 +1,281 @@
package com.muyu.analyze.domian;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
/**
* @author
* @Classname VehicleData
* @Description
* @Date 2021/8/5
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleData {
/**
* VIN
*/
private String vin;
/**
*
*/
private String dateTime;
/**
* 线
*/
private String drivingRoute;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
/**
*
*/
private String current;
/**
*
*/
private String resistance;
/**
*
*/
private String gear = "P";
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
private String motorSpeed;
/**
*
*/
private String motorTorque;
/**
*
*/
private String motorTemperature;
/**
*
*/
private String motorVoltage;
/**
*
*/
private String motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private BigDecimal batteryLevel;
/**
*
*/
private String maximumFeedbackPower;
/**
*
*/
private String maximumDischargePower;
/**
* BMS
*/
private String selfCheckCounter;
/**
*
*/
private String totalBatteryCurrent;
/**
* V3
*/
private String totalBatteryVoltage;
/**
*
*/
private String singleBatteryMaxVoltage;
/**
*
*/
private String singleBatteryMinVoltage;
/**
*
*/
private String singleBatteryMaxTemperature;
/**
*
*/
private String singleBatteryMinTemperature;
/**
*
*/
private String availableBatteryCapacity;
/**
*
*/
private int vehicleStatus = 1;
/**
*
*/
private int chargingStatus = 1;
/**
*
*/
private int operatingStatus = 1;
/**
* SOC
*/
private int socStatus = 1;
/**
*
*/
private int chargingEnergyStorageStatus = 1;
/**
*
*/
private int driveMotorStatus = 1;
/**
*
*/
private int positionStatus = 1;
/**
* EAS()
*/
private int easStatus = 1;
/**
* PTC()
*/
private int ptcStatus = 0;
/**
* EPS()
*/
private int epsStatus = 1;
/**
* ABS()
*/
private int absStatus = 1;
/**
* MCU(/)
*/
private int mcuStatus = 1;
/**
*
*/
private int heatingStatus = 1;
/**
*
*/
private int batteryStatus = 1;
/**
*
*/
private int batteryInsulationStatus = 1;
/**
* DCDC()
*/
private int dcdcStatus = 1;
/**
* CHG()
*/
private int chgStatus = 1;
/**
*
*/
private String vehicleStatusMsg;
/**
*
*/
private String smartHardwareMsg;
/**
*
*/
private String batteryMsg;
}

View File

@ -0,0 +1,318 @@
package com.muyu.analyze.utils;
import com.muyu.analyze.domian.VehicleData;
import java.math.BigDecimal;
import java.util.Stack;
/**
* @ProjectName: cloud-vehicles
* @PackageName: com.muyu.analyze.config
* @Description TODO
* @Author HuangDaJu
* @Date 2024/4/2 21:33
* @Version 1.0
*/
public class AnalyzeUtils {
public static VehicleData analyze(String aa) {
VehicleData vehicleData = new VehicleData();
/**
* VIN private String vin;
*/
String vin = aa.substring(0, 17);
vehicleData.setVin(vin);
/**
* private String dateTime;
*/
String time = aa.substring(17, 30);
vehicleData.setDateTime(time);
/**
* 线 private String drivingRoute;
*/
/**
* private String longitude;
*/
String longitude = aa.substring(30, 41);
vehicleData.setLongitude(longitude);
/**
* private String latitude;
*/
String latitude = aa.substring(41, 51);
vehicleData.setLatitude(latitude);
/**
* private String speed;
*/
String speed = aa.substring(51, 57);
vehicleData.setSpeed(speed);
/**
* private BigDecimal mileage;
*/
String mileage = aa.substring(57, 68);
vehicleData.setMileage(new BigDecimal(mileage));
/**
* private String voltage;
*/
String voltage = aa.substring(68, 74);
vehicleData.setVoltage(voltage);
/**
* private String current;
*/
String current = aa.substring(74, 79);
vehicleData.setCurrent(current);
/**
* private String resistance;
*/
String resistance = aa.substring(79, 88);
vehicleData.setResistance(resistance);
/**
* private String gear = "P";
*/
String gear = aa.substring(88, 89);
vehicleData.setGear(gear);
/**
* private String accelerationPedal;
*/
String accelerationPedal = aa.substring(89, 91);
vehicleData.setAccelerationPedal(accelerationPedal);
/**
* private String brakePedal;
*/
String brakePedal =aa.substring(91,93);
vehicleData.setBrakePedal(brakePedal);
/**
* private String fuelConsumptionRate;
*/
String fuelConsumptionRate =aa.substring(93,98);
vehicleData.setFuelConsumptionRate(fuelConsumptionRate);
/**
* private String motorControllerTemperature;
*/
String motorControllerTemperature =aa.substring(98,104);
vehicleData.setMotorControllerTemperature(motorControllerTemperature);
/**
* private String motorSpeed;
*/
String motorSpeed =aa.substring(104,109);
vehicleData.setMotorSpeed(motorSpeed);
/**
* private String motorTorque;
*/
String motorTorque = aa.substring(109, 113);
vehicleData.setMotorTorque(motorTorque);
/**
* private String motorTemperature;
*/
String motorTemperature = aa.substring(113, 119);
vehicleData.setMotorTemperature(motorTemperature);
/**
* private String motorVoltage;
*/
String motorVoltage = aa.substring(119, 124);
vehicleData.setMotorVoltage(motorVoltage);
/**
* private String motorCurrent;
*/
// 电机电流: 10446000
String motorCurrent = aa.substring(124, 132);
vehicleData.setMotorCurrent(motorCurrent);
// 动力电池剩余电量 SOC: 44103.
/**
* SOC private BigDecimal remainingBattery;
*/
String remainingBattery = aa.substring(132, 138);
vehicleData.setRemainingBattery(new BigDecimal(remainingBattery));
// 当前状态允许的最大反馈功率: 400000
/**
* private String maximumFeedbackPower;
*/
String maximumFeedbackPower = aa.substring(138, 144);
vehicleData.setMaximumFeedbackPower(maximumFeedbackPower);
// 当前状态允许最大放电功率: 130000
/**
* private String maximumDischargePower;
*/
String maximumDischargePower = aa.substring(144, 150);
vehicleData.setMaximumDischargePower(maximumDischargePower);
// BMS 自检计数器: 20
/**
* BMSprivate String selfCheckCounter;
*/
String selfCheckCounter = aa.substring(150, 152);
vehicleData.setSelfCheckCounter(selfCheckCounter);
// 动力电池充放电电流: 00000
/**
* private String totalBatteryCurrent;
*/
String totalBatteryCurrent = aa.substring(152, 157);
vehicleData.setTotalBatteryCurrent(totalBatteryCurrent);
// 动力电池负载端总电压 V3: 605000
/**
* V3 private String totalBatteryVoltage;
*/
String totalBatteryVoltage = aa.substring(157, 163);
vehicleData.setTotalBatteryVoltage(totalBatteryVoltage);
// 单次最大电压: 4000
/**
* private String singleBatteryMaxVoltage;
*/
String singleBatteryMaxVoltage = aa.substring(163, 167);
vehicleData.setSingleBatteryMaxVoltage(singleBatteryMaxVoltage);
// 单体电池最低电压: 3000
/**
* private String singleBatteryMinVoltage;
*/
String singleBatteryMinVoltage = aa.substring(167, 171);
vehicleData.setSingleBatteryMinVoltage(singleBatteryMinVoltage);
// 单体电池最高温度: 650000
/**
* private String singleBatteryMaxTemperature;
*/
String singleBatteryMaxTemperature = aa.substring(171, 177);
vehicleData.setSingleBatteryMaxTemperature(singleBatteryMaxTemperature);
// 单体电池最低温度: 600000
/**
* private String singleBatteryMinTemperature;
*/
String singleBatteryMinTemperature = aa.substring(177, 183);
vehicleData.setSingleBatteryMinTemperature(singleBatteryMinTemperature);
// 动力电池可用容量: 530000
/**
* private String availableBatteryCapacity;
*/
String availableBatteryCapacity = aa.substring(183, 189);
vehicleData.setAvailableBatteryCapacity(availableBatteryCapacity);
// 车辆状态: 0
String vehicleStatus = aa.substring(189, 190);
vehicleData.setVehicleStatus(Integer.parseInt(vehicleStatus));
// 充电状态: 1
String chargingStatus = aa.substring(190, 191);
vehicleData.setChargingStatus(Integer.parseInt(chargingStatus));
// 运行状态: 1
String operatingStatus = aa.substring(191, 192);
vehicleData.setOperatingStatus(Integer.parseInt(operatingStatus));
// SOC: 1
String socStatus = aa.substring(192, 193);
vehicleData.setSocStatus(Integer.parseInt(socStatus));
// 可充电储能装置工作状态: 0
String chargingEnergyStorageStatus = aa.substring(193, 194);
vehicleData.setChargingEnergyStorageStatus(Integer.parseInt(chargingEnergyStorageStatus));
// 驱动电机状态: 1
String driveMotorStatus = aa.substring(194, 195);
vehicleData.setDriveMotorStatus(Integer.parseInt(driveMotorStatus));
// 定位是否有效: 1
String positionStatus = aa.substring(195, 196);
vehicleData.setPositionStatus(Integer.parseInt(positionStatus));
// EAS: 1
String easStatus = aa.substring(196, 197);
vehicleData.setEasStatus(Integer.parseInt(easStatus));
// PTC: 1
String ptcStatus = aa.substring(197, 198);
vehicleData.setPtcStatus(Integer.parseInt(ptcStatus));
// EPS: 1
String epsStatus = aa.substring(198, 199);
vehicleData.setEpsStatus(Integer.parseInt(epsStatus));
// ABS: 1
String absStatus = aa.substring(199, 200);
vehicleData.setAbsStatus(Integer.parseInt(absStatus));
// MCU: 1
String mcuStatus = aa.substring(200, 201);
vehicleData.setMcuStatus(Integer.parseInt(mcuStatus));
// 动力电池加热状态: 1
String heatingStatus = aa.substring(201, 202);
vehicleData.setHeatingStatus(Integer.parseInt(heatingStatus));
// 动力电池当前状态: 1
String batteryStatus = aa.substring(202, 203);
vehicleData.setBatteryStatus(Integer.parseInt(batteryStatus));
// 动力电池保温状态: 1
String batteryInsulationStatus = aa.substring(203, 204);
vehicleData.setBatteryInsulationStatus(Integer.parseInt(batteryInsulationStatus));
// DCDC: 1
String dcdcStatus = aa.substring(204, 205);
vehicleData.setDcdcStatus(Integer.parseInt(dcdcStatus));
// CHG: 1
String chgStatus = aa.substring(205, 206);
vehicleData.setChgStatus(Integer.parseInt(chgStatus));
System.out.println(vehicleData);
return vehicleData;
}
}

View File

@ -0,0 +1,36 @@
# Tomcat
server:
port: 9008
# Spring
spring:
application:
# 应用名称
name: muyu-analyze
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: 10.10.26.1:8848
config:
# 配置中心地址
server-addr: 10.10.26.1:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# Kafka
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers: 10.10.26.5:9092
consumer:
group-id: group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#kafka:
# topic: test

14
pom.xml
View File

@ -47,6 +47,7 @@
<muyu-business-common.version>3.6.3</muyu-business-common.version>
<muyu-business-remote.version>3.6.3</muyu-business-remote.version>
<muyu-business-server.version>3.6.3</muyu-business-server.version>
<muyu-analyze.version>3.6.3</muyu-analyze.version>
</properties>
@ -276,11 +277,11 @@
<!-- <artifactId>muyu-search-remote</artifactId>-->
<!-- <version>${muyu.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.muyu</groupId>-->
<!-- <artifactId>muyu-search-server</artifactId>-->
<!-- <version>${muyu.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-analyze</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -290,6 +291,9 @@
<module>muyu-visual</module>
<module>muyu-modules</module>
<module>muyu-common</module>
<module>muyu-analyze</module>
</modules>
<packaging>pom</packaging>