From 84a91d275c85b76afa3a7ac4e70e2f3681eb3c1f Mon Sep 17 00:00:00 2001 From: 20300 <643145201@qq.com> Date: Fri, 14 Jun 2024 19:01:37 +0800 Subject: [PATCH] =?UTF-8?q?feat()=E8=A7=A3=E6=9E=90=E5=B9=B6=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=88=B0iotDB=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 26 ++ src/main/java/com/hyc/config/KafkaConfig.java | 36 +++ src/main/java/com/hyc/config/RedisConfig.java | 33 +++ .../hyc/controller/SummarizeController.java | 12 +- .../com/hyc/domain/CarIncidentHandle.java | 33 +++ src/main/java/com/hyc/domain/TimeSortCar.java | 2 +- .../iotdbdemo/config/IotDBSessionConfig.java | 186 +++++++++++++ .../iotdbdemo/controller/IotDbController.java | 63 +++++ .../hyc/iotdbdemo/model/param/IotDbParam.java | 25 ++ .../iotdbdemo/model/result/IotDbResult.java | 33 +++ .../iotdbdemo/response/ErrorResponseData.java | 81 ++++++ .../hyc/iotdbdemo/response/ResponseData.java | 216 +++++++++++++++ .../response/SuccessResponseData.java | 28 ++ .../com/hyc/iotdbdemo/server/IotDbServer.java | 26 ++ .../server/impl/IotDbServerImpl.java | 251 ++++++++++++++++++ .../kafka/demo/consumer/KafkaConsumer.java | 69 ++++- .../kafka/demo/consumer/KafkaConsumer1.java | 30 +++ .../demo/controller/KafkaController.java | 3 +- .../hyc/kafka/demo/event/EventPosting.java | 21 ++ .../com/hyc/kafka/demo/strategy/Strategy.java | 37 +++ src/main/resources/application.yml | 8 +- 21 files changed, 1206 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/hyc/config/KafkaConfig.java create mode 100644 src/main/java/com/hyc/config/RedisConfig.java create mode 100644 src/main/java/com/hyc/domain/CarIncidentHandle.java create mode 100644 src/main/java/com/hyc/iotdbdemo/config/IotDBSessionConfig.java create mode 100644 src/main/java/com/hyc/iotdbdemo/controller/IotDbController.java create mode 100644 src/main/java/com/hyc/iotdbdemo/model/param/IotDbParam.java create mode 100644 src/main/java/com/hyc/iotdbdemo/model/result/IotDbResult.java create mode 100644 src/main/java/com/hyc/iotdbdemo/response/ErrorResponseData.java create mode 100644 src/main/java/com/hyc/iotdbdemo/response/ResponseData.java create mode 100644 src/main/java/com/hyc/iotdbdemo/response/SuccessResponseData.java create mode 100644 src/main/java/com/hyc/iotdbdemo/server/IotDbServer.java create mode 100644 src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java create mode 100644 src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java create mode 100644 src/main/java/com/hyc/kafka/demo/event/EventPosting.java create mode 100644 src/main/java/com/hyc/kafka/demo/strategy/Strategy.java diff --git a/pom.xml b/pom.xml index 51b91d4..31d957b 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,32 @@ 2.6.13 + + + org.apache.iotdb + iotdb-session + 0.14.0-preview1 + + + + org.springframework.boot + spring-boot-starter-data-redis + + + io.lettuce + lettuce-core + + + + + redis.clients + jedis + + + org.redisson + redisson + 3.16.0 + org.bouncycastle bcpkix-jdk15on diff --git a/src/main/java/com/hyc/config/KafkaConfig.java b/src/main/java/com/hyc/config/KafkaConfig.java new file mode 100644 index 0000000..c7d52ee --- /dev/null +++ b/src/main/java/com/hyc/config/KafkaConfig.java @@ -0,0 +1,36 @@ +//package com.hyc.config; +// +///** +// * kafka初始化配置类 +// * +// * @author YouChe·He +// * @ClassName: KafkaConfig +// * @Description: kafka初始化配置类 +// * @CreateTime: 2024/6/10 10:30 +// */ +//import org.apache.kafka.clients.admin.AdminClient; +//import org.apache.kafka.clients.admin.AdminClientConfig; +//import org.apache.kafka.clients.admin.NewTopic; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +//import java.util.Properties; +// +//@Configuration +//public class KafkaConfig { +// +// +// +// @Bean +// public AdminClient adminClient() { +// Properties properties = new Properties(); +// properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "47.103.75.98:9092"); +// return AdminClient.create(properties); +// } +// +// @Bean +// public NewTopic createTopic() { +// return new NewTopic("topichyc", 8, (short) 1); +// } +//} +// diff --git a/src/main/java/com/hyc/config/RedisConfig.java b/src/main/java/com/hyc/config/RedisConfig.java new file mode 100644 index 0000000..d4a03e6 --- /dev/null +++ b/src/main/java/com/hyc/config/RedisConfig.java @@ -0,0 +1,33 @@ +package com.hyc.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * redsi配置类 + * + * @author YouChe·He + * @ClassName: RedisConfoig + * @Description: redsi配置类 + * @CreateTime: 2024/5/27 14:16 + */ +@Configuration +public class RedisConfig { + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){ + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(redisConnectionFactory); + + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); + + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashValueSerializer(new StringRedisSerializer()); + return redisTemplate; + + } +} diff --git a/src/main/java/com/hyc/controller/SummarizeController.java b/src/main/java/com/hyc/controller/SummarizeController.java index 48b50b7..f5768f3 100644 --- a/src/main/java/com/hyc/controller/SummarizeController.java +++ b/src/main/java/com/hyc/controller/SummarizeController.java @@ -124,8 +124,16 @@ public class SummarizeController { HashMap stringStringHashMap = new HashMap<>(); for (int i = 0; i < 47; i++) { String substring = realString.substring(count, count + intArr[i]); - linkedHashMap.put(strArr[i],substring); + + if (strArr[i]=="vin" || strArr[i] == "gear"){ + linkedHashMap.put(strArr[i],substring); + }else if (strArr[i] == "startTime") { + linkedHashMap.put(strArr[i],Long.valueOf(substring)); + } else { + linkedHashMap.put(strArr[i],Double.valueOf(substring)); + } count = count + intArr[i]; + } log.warn("hashMap:{}",linkedHashMap); ObjectMapper objectMapper = new ObjectMapper(); @@ -134,7 +142,7 @@ public class SummarizeController { log.error("json格式:{}",jsonData); transactionTemplate.execute(status -> { try { - kafkaTemplate.send("topic1", UUID.randomUUID().toString(), jsonData); + kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), jsonData); status.flush(); }catch (Exception e){ log.error("kafka生产异常: {}", e.getMessage()); diff --git a/src/main/java/com/hyc/domain/CarIncidentHandle.java b/src/main/java/com/hyc/domain/CarIncidentHandle.java new file mode 100644 index 0000000..b194f55 --- /dev/null +++ b/src/main/java/com/hyc/domain/CarIncidentHandle.java @@ -0,0 +1,33 @@ +package com.hyc.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * 模拟小车绑定事件 + * + * @author YouChe·He + * @ClassName: CarIncidentHandle + * @Description: 模拟小车绑定事件 + * @CreateTime: 2024/6/10 10:25 + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class CarIncidentHandle { + /** + * id + */ + private Double id; + /** + * 车辆VIN + */ + private String vin; + /** + * 绑定事件 1:车辆录入2:实时数据 + */ + private String carIncident; +} diff --git a/src/main/java/com/hyc/domain/TimeSortCar.java b/src/main/java/com/hyc/domain/TimeSortCar.java index 400ddc5..fedbac9 100644 --- a/src/main/java/com/hyc/domain/TimeSortCar.java +++ b/src/main/java/com/hyc/domain/TimeSortCar.java @@ -44,6 +44,6 @@ public class TimeSortCar implements Comparable { @Override public int compareTo(TimeSortCar o) { - return Integer.compare((int) o.connectDuration,(int) this.connectDuration); + return Double.compare((int) o.connectDuration,(int) this.connectDuration); } } diff --git a/src/main/java/com/hyc/iotdbdemo/config/IotDBSessionConfig.java b/src/main/java/com/hyc/iotdbdemo/config/IotDBSessionConfig.java new file mode 100644 index 0000000..a92695e --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/config/IotDBSessionConfig.java @@ -0,0 +1,186 @@ +package com.hyc.iotdbdemo.config; + +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.session.util.Version; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.List; + +/** + * description: iotdb 配置工具类(常用部分,如需要可以自行扩展) + * 注意:可以不需要创建分组,插入时默认前两个节点名称为分组名称 比如: root.a1eaKSRpRty.CA3013A303A25467 或者 + * root.a1eaKSRpRty.CA3013A303A25467.heart 他们的分组都为 root.a1eaKSRpRty + * author: zhouhong + */ +@Log4j2 +@Component +@Configuration +public class IotDBSessionConfig { + + private static Session session; + private static final String LOCAL_HOST = "47.103.75.98"; + @Bean + public Session getSession() throws IoTDBConnectionException, StatementExecutionException { + if (session == null) { + log.info("正在连接iotdb......."); + session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build(); + session.open(false); + session.setFetchSize(100); + log.info("iotdb连接成功~"); + // 设置时区 + session.setTimeZone("+08:00"); + } + return session; + } + + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * time:时间戳 + * measurementsList:物理量 即:属性 + * type:数据类型: BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6); + * valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecordType(String deviceId, Long time,List measurementsList, TSDataType type,List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() != valuesList.size()) { + throw new ServerException("measurementsList 与 valuesList 值不对应"); + } + List types = new ArrayList<>(); + measurementsList.forEach(item -> { + types.add(type); + }); + session.insertRecord(deviceId, time, measurementsList, types, valuesList); + } + /** + * description: 带有数据类型的添加操作 - insertRecord没有指定类型 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param measurementsList:物理量 即:属性 + * @param valuesList:属性值 --- 属性必须与属性值一一对应 + * @return + */ + public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecord(deviceId, time, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + /** + * description: 批量插入 + * author: zhouhong + */ + public void insertRecords(List deviceIdList, List timeList, List> measurementsList, List> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { + if (measurementsList.size() == valuesList.size()) { + session.insertRecords(deviceIdList, timeList, measurementsList, valuesList); + } else { + log.error("measurementsList 与 valuesList 值不对应"); + } + } + + /** + * description: 插入操作 + * author: zhouhong + * @param deviceId:节点路径如:root.a1eaKSRpRty.CA3013A303A25467 + * @param time:时间戳 + * @param schemaList: 属性值 + 数据类型 例子: List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64)); + * @param maxRowNumber: + * @return + */ + public void insertTablet(String deviceId, Long time,List schemaList, List valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException { + + Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber); + // 向iotdb里面添加数据 + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, time); + for (int i = 0; i < valueList.size(); i++) { + tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i)); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet, true); + tablet.reset(); + } + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + /** + * description: 根据SQL查询 + * author: zhouhong + */ + public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException { + return session.executeQueryStatement(sql); + } + + /** + * description: 删除分组 如 root.a1eaKSRpRty + * author: zhouhong + * @param groupName:分组名称 + * @return + */ + public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroup(groupName); + } + + /** + * description: 根据Timeseries删除 如:root.a1eaKSRpRty.CA3013A303A25467.breath (个人理解:为具体的物理量) + * author: zhouhong + */ + public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseries); + } + /** + * description: 根据Timeseries批量删除 + * author: zhouhong + */ + public void deleteTimeserieList(List timeseriesList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteTimeseries(timeseriesList); + } + + /** + * description: 根据分组批量删除 + * author: zhouhong + */ + public void deleteStorageGroupList(List storageGroupList) throws StatementExecutionException, IoTDBConnectionException { + session.deleteStorageGroups(storageGroupList); + } + + /** + * description: 根据路径和结束时间删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(path, endTime); + } + /** + * description: 根据路径集合和结束时间批量删除 结束时间之前的所有数据 + * author: zhouhong + */ + public void deleteDataByPathListAndEndTime(List pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, endTime); + } + /** + * description: 根据路径集合和时间段批量删除 + * author: zhouhong + */ + public void deleteDataByPathListAndTime(List pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException { + session.deleteData(pathList, startTime, endTime); + } + +} diff --git a/src/main/java/com/hyc/iotdbdemo/controller/IotDbController.java b/src/main/java/com/hyc/iotdbdemo/controller/IotDbController.java new file mode 100644 index 0000000..0dc663d --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/controller/IotDbController.java @@ -0,0 +1,63 @@ +package com.hyc.iotdbdemo.controller; + +import com.hyc.domain.VehicleData; +import com.hyc.iotdbdemo.config.IotDBSessionConfig; +import com.hyc.iotdbdemo.model.param.IotDbParam; +import com.hyc.iotdbdemo.response.ResponseData; +import com.hyc.iotdbdemo.server.IotDbServer; +import lombok.extern.log4j.Log4j2; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.rmi.ServerException; + +/** + * description: iotdb 控制层 + * date: 2022/8/15 21:50 + * author: zhouhong + */ +@Log4j2 +@RestController +public class IotDbController { + + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + /** + * 插入数据 + * @param iotDbParam + */ + @PostMapping("/api/device/insert") + public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException { + //iotDbServer.insertData(iotDbParam); + return ResponseData.success(); + } + + /** + * 查询数据 + * @param iotDbParam + */ + @PostMapping("/api/device/queryData") + public ResponseData queryDataFromIotDb(@RequestBody IotDbParam vehicleData) throws Exception { + return ResponseData.success(iotDbServer.queryDataFromIotDb(vehicleData)); + } + + /** + * 删除分组 + * @return + */ + @PostMapping("/api/device/deleteGroup") + public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException { + iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty"); + iotDBSessionConfig.deleteStorageGroup("root.smartretirement"); + return ResponseData.success(); + } + +} diff --git a/src/main/java/com/hyc/iotdbdemo/model/param/IotDbParam.java b/src/main/java/com/hyc/iotdbdemo/model/param/IotDbParam.java new file mode 100644 index 0000000..115711d --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/model/param/IotDbParam.java @@ -0,0 +1,25 @@ +package com.hyc.iotdbdemo.model.param; + +import lombok.Data; +/** + * description: 入参 + * date: 2022/8/15 21:53 + * author: zhouhong + */ +@Data +public class IotDbParam { + + /*** + * 设备号 + */ + private String vin; + /*** + * 查询开始时间 + */ + private String startTime; + /*** + * 查询结束时间 + */ + private String endTime; + +} diff --git a/src/main/java/com/hyc/iotdbdemo/model/result/IotDbResult.java b/src/main/java/com/hyc/iotdbdemo/model/result/IotDbResult.java new file mode 100644 index 0000000..b2a3eab --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/model/result/IotDbResult.java @@ -0,0 +1,33 @@ +package com.hyc.iotdbdemo.model.result; + +import lombok.Data; + +/** + * description: 返回结果 + * date: 2022/8/15 21:56 + * author: zhouhong + */ +@Data +public class IotDbResult { + /*** + * 时间 + */ + private String time; + /*** + * 产品PK + */ + private String pk; + /*** + * 设备号 + */ + private String sn; + /*** + * 实时呼吸 + */ + private String breath; + /*** + * 实时心率 + */ + private String heart; + +} diff --git a/src/main/java/com/hyc/iotdbdemo/response/ErrorResponseData.java b/src/main/java/com/hyc/iotdbdemo/response/ErrorResponseData.java new file mode 100644 index 0000000..129cb3e --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/response/ErrorResponseData.java @@ -0,0 +1,81 @@ +package com.hyc.iotdbdemo.response; + +/** + * description: 错误返回封装 + * date: 2022/8/15 21:30 + * author: zhouhong + */ +public class ErrorResponseData extends ResponseData { + private String exceptionClazz; + + ErrorResponseData(String message) { + super(false, DEFAULT_ERROR_CODE, message, message, (Object)null); + } + + public ErrorResponseData(Integer code, String message) { + super(false, code, message, message, (Object)null); + } + + ErrorResponseData(Integer code, String message, Object object) { + super(false, code, message, object); + } + + ErrorResponseData(Integer code, String message, String localizedMsg, Object object) { + super(false, code, message, localizedMsg, object); + } + + @Override + public boolean equals(final Object o) { + if (o == this) { + return true; + } else if (!(o instanceof ErrorResponseData)) { + return false; + } else { + ErrorResponseData other = (ErrorResponseData)o; + if (!other.canEqual(this)) { + return false; + } else if (!super.equals(o)) { + return false; + } else { + Object this$exceptionClazz = this.getExceptionClazz(); + Object other$exceptionClazz = other.getExceptionClazz(); + if (this$exceptionClazz == null) { + if (other$exceptionClazz != null) { + return false; + } + } else if (!this$exceptionClazz.equals(other$exceptionClazz)) { + return false; + } + + return true; + } + } + } + + @Override + protected boolean canEqual(final Object other) { + return other instanceof ErrorResponseData; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + Object $exceptionClazz = this.getExceptionClazz(); + result = result * 59 + ($exceptionClazz == null ? 43 : $exceptionClazz.hashCode()); + return result; + } + + public String getExceptionClazz() { + return this.exceptionClazz; + } + + public void setExceptionClazz(final String exceptionClazz) { + this.exceptionClazz = exceptionClazz; + } + + @Override + public String toString() { + return "ErrorResponseData(exceptionClazz=" + this.getExceptionClazz() + ")"; + } +} + diff --git a/src/main/java/com/hyc/iotdbdemo/response/ResponseData.java b/src/main/java/com/hyc/iotdbdemo/response/ResponseData.java new file mode 100644 index 0000000..4cc46b4 --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/response/ResponseData.java @@ -0,0 +1,216 @@ +package com.hyc.iotdbdemo.response; + +/** + * description: 返回结果封装 + * date: 2022/8/15 21:32 + * author: zhouhong + */ +public class ResponseData { + public static final String DEFAULT_SUCCESS_MESSAGE = "请求成功"; + public static final String DEFAULT_ERROR_MESSAGE = "网络异常"; + public static final Integer DEFAULT_SUCCESS_CODE = 200; + public static final Integer DEFAULT_ERROR_CODE = 500; + private Boolean success; + private Integer code; + private String message; + private String localizedMsg; + private Object data; + + public ResponseData() { + } + + public ResponseData(Boolean success, Integer code, String message, Object data) { + this.success = success; + this.code = code; + this.message = message; + this.data = data; + } + + public ResponseData(Boolean success, Integer code, String message, String localizedMsg, Object data) { + this.success = success; + this.code = code; + this.message = message; + this.localizedMsg = localizedMsg; + this.data = data; + } + + public ResponseData(Boolean success, Integer code, String message) { + this.success = success; + this.code = code; + this.message = message; + } + + public static SuccessResponseData success() { + return new SuccessResponseData(); + } + + public static SuccessResponseData success(Object object) { + return new SuccessResponseData(object); + } + + public static SuccessResponseData success(Integer code, String message, Object object) { + return new SuccessResponseData(code, message, object); + } + + public static SuccessResponseData success(Integer code, String message) { + return new SuccessResponseData(code, message); + } + + public static SuccessResponseData success(Integer code, String message, String localizedMsg, Object object) { + return new SuccessResponseData(code, message, localizedMsg, object); + } + + public static ErrorResponseData error(String message) { + return new ErrorResponseData(message); + } + + public static ErrorResponseData error(Integer code, String message) { + return new ErrorResponseData(code, message); + } + + public static ErrorResponseData error(Integer code, String message, Object object) { + return new ErrorResponseData(code, message, object); + } + + public static ErrorResponseData error(Integer code, String message, String localizedMsg, Object object) { + return new ErrorResponseData(code, message, localizedMsg, object); + } + + public Boolean getSuccess() { + return this.success; + } + + public Integer getCode() { + return this.code; + } + + public String getMessage() { + return this.message; + } + + public String getLocalizedMsg() { + return this.localizedMsg; + } + + public Object getData() { + return this.data; + } + + public void setSuccess(final Boolean success) { + this.success = success; + } + + public void setCode(final Integer code) { + this.code = code; + } + + public void setMessage(final String message) { + this.message = message; + } + + public void setLocalizedMsg(final String localizedMsg) { + this.localizedMsg = localizedMsg; + } + + public void setData(final Object data) { + this.data = data; + } + + @Override + public boolean equals(final Object o) { + if (o == this) { + return true; + } else if (!(o instanceof ResponseData)) { + return false; + } else { + ResponseData other = (ResponseData)o; + if (!other.canEqual(this)) { + return false; + } else { + label71: { + Object this$success = this.getSuccess(); + Object other$success = other.getSuccess(); + if (this$success == null) { + if (other$success == null) { + break label71; + } + } else if (this$success.equals(other$success)) { + break label71; + } + + return false; + } + + Object this$code = this.getCode(); + Object other$code = other.getCode(); + if (this$code == null) { + if (other$code != null) { + return false; + } + } else if (!this$code.equals(other$code)) { + return false; + } + + label57: { + Object this$message = this.getMessage(); + Object other$message = other.getMessage(); + if (this$message == null) { + if (other$message == null) { + break label57; + } + } else if (this$message.equals(other$message)) { + break label57; + } + + return false; + } + + Object this$localizedMsg = this.getLocalizedMsg(); + Object other$localizedMsg = other.getLocalizedMsg(); + if (this$localizedMsg == null) { + if (other$localizedMsg != null) { + return false; + } + } else if (!this$localizedMsg.equals(other$localizedMsg)) { + return false; + } + + Object this$data = this.getData(); + Object other$data = other.getData(); + if (this$data == null) { + if (other$data == null) { + return true; + } + } else if (this$data.equals(other$data)) { + return true; + } + + return false; + } + } + } + + protected boolean canEqual(final Object other) { + return other instanceof ResponseData; + } + + @Override + public int hashCode() { + int result1 = 1; + Object $success = this.getSuccess(); + int result = result1 * 59 + ($success == null ? 43 : $success.hashCode()); + Object $code = this.getCode(); + result = result * 59 + ($code == null ? 43 : $code.hashCode()); + Object $message = this.getMessage(); + result = result * 59 + ($message == null ? 43 : $message.hashCode()); + Object $localizedMsg = this.getLocalizedMsg(); + result = result * 59 + ($localizedMsg == null ? 43 : $localizedMsg.hashCode()); + Object $data = this.getData(); + result = result * 59 + ($data == null ? 43 : $data.hashCode()); + return result; + } + @Override + public String toString() { + return "ResponseData(success=" + this.getSuccess() + ", code=" + this.getCode() + ", message=" + this.getMessage() + ", localizedMsg=" + this.getLocalizedMsg() + ", data=" + this.getData() + ")"; + } +} diff --git a/src/main/java/com/hyc/iotdbdemo/response/SuccessResponseData.java b/src/main/java/com/hyc/iotdbdemo/response/SuccessResponseData.java new file mode 100644 index 0000000..4f12c18 --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/response/SuccessResponseData.java @@ -0,0 +1,28 @@ +package com.hyc.iotdbdemo.response; +/** + * description: 正确返回结果封装 + * date: 2022/8/15 21:40 + * author: zhouhong + */ + +public class SuccessResponseData extends ResponseData { + public SuccessResponseData() { + super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", (Object)null); + } + + public SuccessResponseData(Object object) { + super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", object); + } + + public SuccessResponseData(Integer code, String message, Object object) { + super(true, code, message, message, object); + } + + public SuccessResponseData(Integer code, String message, String localizedMsg, Object object) { + super(true, code, message, localizedMsg, object); + } + + public SuccessResponseData(Integer code, String message) { + super(true, code, message); + } +} diff --git a/src/main/java/com/hyc/iotdbdemo/server/IotDbServer.java b/src/main/java/com/hyc/iotdbdemo/server/IotDbServer.java new file mode 100644 index 0000000..2c71016 --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/server/IotDbServer.java @@ -0,0 +1,26 @@ +package com.hyc.iotdbdemo.server; + +import com.hyc.domain.VehicleData; +import com.hyc.iotdbdemo.model.param.IotDbParam; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import java.rmi.ServerException; + +/** + * description: iot服务类 + * date: 2022/8/15 21:41 + * author: zhouhong + */ + +public interface IotDbServer { + /** + * 添加数据 + */ + void insertData(VehicleData vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException; + + /** + * 查询数据 + */ + Object queryDataFromIotDb(IotDbParam vehicleData) throws Exception; +} diff --git a/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java b/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java new file mode 100644 index 0000000..49cd689 --- /dev/null +++ b/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java @@ -0,0 +1,251 @@ +package com.hyc.iotdbdemo.server.impl; + +import com.hyc.domain.VehicleData; +import com.hyc.iotdbdemo.config.IotDBSessionConfig; +import com.hyc.iotdbdemo.model.param.IotDbParam; +import com.hyc.iotdbdemo.model.result.IotDbResult; +import com.hyc.iotdbdemo.server.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * description: iot服务实现类 + * date: 2022/8/15 9:43 + * author: zhouhong + */ + +@Log4j2 +@Service +public class IotDbServerImpl implements IotDbServer { + + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + @Override + public void insertData(VehicleData vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException { + // iotDbParam: 模拟设备上报消息 + // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码 + String deviceId = "root.bizkey."+vehicleData.getVin(); + // 将设备上报的数据存入数据库(时序数据库) + List measurementsList = getInsertList(); + List valuesList = getInsertValueList(vehicleData); + iotDBSessionConfig.insertRecord(deviceId, vehicleData.getStartTime(), measurementsList, valuesList); + } + + + + @Override + public List queryDataFromIotDb(IotDbParam vehicleData) throws Exception { + List iotDbResultList = new ArrayList<>(); + log.error("查询数据的参数是:{}",vehicleData); + if (null != vehicleData.getVin()) { + String sql = "select * from root.bizkey."+ vehicleData.getVin() + " where time >= " + + vehicleData.getStartTime() + " and time < " + vehicleData.getEndTime(); + SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql); + List columnNames = sessionDataSet.getColumnNames(); + List titleList = new ArrayList<>(); + // 排除Time字段 -- 方便后面后面拼装数据 + for (int i = 1; i < columnNames.size(); i++) { + String[] temp = columnNames.get(i).split("\\."); + titleList.add(temp[temp.length - 1]); + } + // 封装处理数据 + packagingData(vehicleData, iotDbResultList, sessionDataSet, titleList); + } else { + log.info("PK或者SN不能为空!!"); + } + return iotDbResultList; + } + /** + * 封装处理数据 + * @param iotDbParam + * @param iotDbResultList + * @param sessionDataSet + * @param titleList + * @throws StatementExecutionException + * @throws IoTDBConnectionException + */ + private void packagingData(IotDbParam iotDbParam, List iotDbResultList, SessionDataSet sessionDataSet, List titleList) + throws StatementExecutionException, IoTDBConnectionException { + int fetchSize = sessionDataSet.getFetchSize(); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + VehicleData iotDbResult = new VehicleData(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + long timestamp = next.getTimestamp(); +// new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + iotDbResult.setStartTime(timestamp); + Map map = new HashMap<>(); + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + // 这里的需要按照类型获取 + map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString()); + } + iotDbResult.setStartTime(timestamp); + + iotDbResult.setVin(map.get("vin")); + iotDbResult.setLongitude(Double.valueOf(map.get("longitude"))); + iotDbResult.setLatitude(Double.valueOf(map.get("latitude"))); + iotDbResult.setSpeed(Double.valueOf(map.get("speed"))); + iotDbResult.setMileage(Double.valueOf(map.get("mileage"))); + iotDbResult.setVoltage(Double.valueOf(map.get("voltage"))); + iotDbResult.setCurrent(Double.valueOf(map.get("current"))); + iotDbResult.setResistance(Double.valueOf(map.get("resistance"))); + iotDbResult.setGear(map.get("gear")); + iotDbResult.setAccelerationPedal(Double.valueOf(map.get("accelerationPedal"))); + iotDbResult.setBrakePedal(Double.valueOf(map.get("brakePedal"))); + iotDbResult.setFuelConsumptionRate(Double.valueOf(map.get("fuelConsumptionRate"))); + iotDbResult.setMotorControllerTemperature(Double.valueOf(map.get("motorControllerTemperature"))); + iotDbResult.setMotorSpeed(Double.valueOf(map.get("motorSpeed"))); + iotDbResult.setMotorTorque(Double.valueOf(map.get("motorTorque"))); + iotDbResult.setMotorTemperature(Double.valueOf(map.get("motorTemperature"))); + iotDbResult.setMotorVoltage(Double.valueOf(map.get("motorVoltage"))); + iotDbResult.setMotorCurrent(Double.valueOf(map.get("motorCurrent"))); + iotDbResult.setRemainingBattery(Double.valueOf(map.get("remainingBattery"))); + iotDbResult.setMaximumFeedbackPower(Double.valueOf(map.get("maximumFeedbackPower"))); + iotDbResult.setMaximumDischargePower(Double.valueOf(map.get("maximumDischargePower"))); + iotDbResult.setSelfCheckCounter(Double.valueOf(map.get("selfCheckCounter"))); + iotDbResult.setTotalBatteryCurrent(Double.valueOf(map.get("totalBatteryCurrent"))); + iotDbResult.setTotalBatteryVoltage(Double.valueOf(map.get("totalBatteryVoltage"))); + iotDbResult.setSingleBatteryMaxVoltage(Double.valueOf(map.get("singleBatteryMaxVoltage"))); + iotDbResult.setSingleBatteryMinVoltage(Double.valueOf(map.get("singleBatteryMinVoltage"))); + iotDbResult.setSingleBatteryMaxTemperature(Double.valueOf(map.get("singleBatteryMaxTemperature"))); + iotDbResult.setSingleBatteryMinTemperature(Double.valueOf(map.get("singleBatteryMinTemperature"))); + iotDbResult.setAvailableBatteryCapacity(Double.valueOf(map.get("availableBatteryCapacity"))); + iotDbResult.setVehicleStatus(Double.valueOf(map.get("vehicleStatus"))); + iotDbResult.setChargingStatus(Double.valueOf(map.get("chargingStatus"))); + iotDbResult.setOperatingStatus(Double.valueOf(map.get("operatingStatus"))); + iotDbResult.setSocStatus(Double.valueOf(map.get("socStatus"))); + iotDbResult.setChargingEnergyStorageStatus(Double.valueOf(map.get("chargingEnergyStorageStatus"))); + iotDbResult.setDriveMotorStatus(Double.valueOf(map.get("driveMotorStatus"))); + iotDbResult.setPositionStatus(Double.valueOf(map.get("positionStatus"))); + iotDbResult.setEasStatus(Double.valueOf(map.get("easStatus"))); + iotDbResult.setPtcStatus(Double.valueOf(map.get("ptcStatus"))); + iotDbResult.setEpsStatus(Double.valueOf(map.get("epsStatus"))); + iotDbResult.setAbsStatus(Double.valueOf(map.get("absStatus"))); + iotDbResult.setMcuStatus(Double.valueOf(map.get("mcuStatus"))); + iotDbResult.setHeatingStatus(Double.valueOf(map.get("heatingStatus"))); + iotDbResult.setBatteryStatus(Double.valueOf(map.get("batteryStatus"))); + iotDbResult.setBatteryInsulationStatus(Double.valueOf(map.get("batteryInsulationStatus"))); + iotDbResult.setDcdcStatus(Double.valueOf(map.get("dcdcStatus"))); + iotDbResult.setChgStatus(Double.valueOf(map.get("chgStatus"))); + iotDbResultList.add(iotDbResult); + } + } + } + public List getInsertList(){ + List measurementsList = new ArrayList<>(); + measurementsList.add("startTime"); + measurementsList.add("longitude"); + measurementsList.add("latitude"); + measurementsList.add("speed"); + measurementsList.add("mileage"); + measurementsList.add("voltage"); + measurementsList.add("current"); + measurementsList.add("resistance"); + measurementsList.add("gear"); + measurementsList.add("accelerationPedal"); + measurementsList.add("brakePedal"); + measurementsList.add("fuelConsumptionRate"); + measurementsList.add("motorControllerTemperature"); + measurementsList.add("motorSpeed"); + measurementsList.add("motorTorque"); + measurementsList.add("motorTemperature"); + measurementsList.add("motorVoltage"); + measurementsList.add("motorCurrent"); + measurementsList.add("remainingBattery"); + measurementsList.add("maximumFeedbackPower"); + measurementsList.add("maximumDischargePower"); + measurementsList.add("selfCheckCounter"); + measurementsList.add("totalBatteryCurrent"); + measurementsList.add("totalBatteryVoltage"); + measurementsList.add("singleBatteryMaxVoltage"); + measurementsList.add("singleBatteryMinVoltage"); + measurementsList.add("singleBatteryMaxTemperature"); + measurementsList.add("singleBatteryMinTemperature"); + measurementsList.add("availableBatteryCapacity"); + measurementsList.add("vehicleStatus"); + measurementsList.add("chargingStatus"); + measurementsList.add("operatingStatus"); + measurementsList.add("socStatus"); + measurementsList.add("chargingEnergyStorageStatus"); + measurementsList.add("driveMotorStatus"); + measurementsList.add("positionStatus"); + measurementsList.add("easStatus"); + measurementsList.add("ptcStatus"); + measurementsList.add("epsStatus"); + measurementsList.add("absStatus"); + measurementsList.add("mcuStatus"); + measurementsList.add("heatingStatus"); + measurementsList.add("batteryStatus"); + measurementsList.add("batteryInsulationStatus"); + measurementsList.add("dcdcStatus"); + measurementsList.add("chgStatus"); + return measurementsList; + } + public List getInsertValueList(VehicleData vehicleData){ + List valuesList = new ArrayList<>(); + valuesList.add(String.valueOf(vehicleData.getStartTime())); + valuesList.add(String.valueOf(vehicleData.getLongitude())); + valuesList.add(String.valueOf(vehicleData.getLatitude())); + valuesList.add(String.valueOf(vehicleData.getSpeed())); + valuesList.add(String.valueOf(vehicleData.getMileage())); + valuesList.add(String.valueOf(vehicleData.getVoltage())); + valuesList.add(String.valueOf(vehicleData.getCurrent())); + valuesList.add(String.valueOf(vehicleData.getResistance())); + valuesList.add(String.valueOf(vehicleData.getGear())); + valuesList.add(String.valueOf(vehicleData.getAccelerationPedal())); + valuesList.add(String.valueOf(vehicleData.getBrakePedal())); + valuesList.add(String.valueOf(vehicleData.getFuelConsumptionRate())); + valuesList.add(String.valueOf(vehicleData.getMotorControllerTemperature())); + valuesList.add(String.valueOf(vehicleData.getMotorSpeed())); + valuesList.add(String.valueOf(vehicleData.getMotorTorque())); + valuesList.add(String.valueOf(vehicleData.getMotorTemperature())); + valuesList.add(String.valueOf(vehicleData.getMotorVoltage())); + valuesList.add(String.valueOf(vehicleData.getMotorCurrent())); + valuesList.add(String.valueOf(vehicleData.getRemainingBattery())); + valuesList.add(String.valueOf(vehicleData.getMaximumFeedbackPower())); + valuesList.add(String.valueOf(vehicleData.getMaximumDischargePower())); + valuesList.add(String.valueOf(vehicleData.getSelfCheckCounter())); + valuesList.add(String.valueOf(vehicleData.getTotalBatteryCurrent())); + valuesList.add(String.valueOf(vehicleData.getTotalBatteryVoltage())); + valuesList.add(String.valueOf(vehicleData.getSingleBatteryMaxVoltage())); + valuesList.add(String.valueOf(vehicleData.getSingleBatteryMinVoltage())); + valuesList.add(String.valueOf(vehicleData.getSingleBatteryMaxTemperature())); + valuesList.add(String.valueOf(vehicleData.getSingleBatteryMinTemperature())); + valuesList.add(String.valueOf(vehicleData.getAvailableBatteryCapacity())); + valuesList.add(String.valueOf(vehicleData.getVehicleStatus())); + valuesList.add(String.valueOf(vehicleData.getChargingStatus())); + valuesList.add(String.valueOf(vehicleData.getOperatingStatus())); + valuesList.add(String.valueOf(vehicleData.getSocStatus())); + valuesList.add(String.valueOf(vehicleData.getChargingEnergyStorageStatus())); + valuesList.add(String.valueOf(vehicleData.getDriveMotorStatus())); + valuesList.add(String.valueOf(vehicleData.getPositionStatus())); + valuesList.add(String.valueOf(vehicleData.getEasStatus())); + valuesList.add(String.valueOf(vehicleData.getPtcStatus())); + valuesList.add(String.valueOf(vehicleData.getEpsStatus())); + valuesList.add(String.valueOf(vehicleData.getAbsStatus())); + valuesList.add(String.valueOf(vehicleData.getMcuStatus())); + valuesList.add(String.valueOf(vehicleData.getHeatingStatus())); + valuesList.add(String.valueOf(vehicleData.getBatteryStatus())); + valuesList.add(String.valueOf(vehicleData.getBatteryInsulationStatus())); + valuesList.add(String.valueOf(vehicleData.getDcdcStatus())); + valuesList.add(String.valueOf(vehicleData.getChgStatus())); + return valuesList; + } + +} diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java index 3a316ae..781701a 100644 --- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java @@ -1,11 +1,28 @@ package com.hyc.kafka.demo.consumer; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.VehicleData; +import com.hyc.iotdbdemo.config.IotDBSessionConfig; +import com.hyc.iotdbdemo.server.IotDbServer; +import com.hyc.kafka.demo.strategy.Strategy; import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Random; + /** * kafka消费者 * @@ -17,15 +34,59 @@ import org.springframework.stereotype.Service; @Slf4j @Service public class KafkaConsumer { - @KafkaListener(topics = "topic1", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", + + @Autowired + private RedisTemplate redisTemplate; + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", errorHandler = "myKafkaListenerErrorHandler") public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { try { - Object value = consumerRecord.value(); - log.error("消费者得到的数据:{},所在分区:{}",value,consumerRecord.partition()); - }finally { + //策略map集合 + LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); + stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); + stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); + + //解析得到VIN + String value = (String) consumerRecord.value(); + + VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); + log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); + + iotDbServer.insertData(vehicleData); +// String dataMessage = value.toString(); +// JSONObject jsonObject = JSON.parseObject(dataMessage); +// //根据VIN得到该小车拥有的事件 +// String vin = jsonObject.get("vin").toString(); +// List eventList = getEvent(vin); +// //循环事件集合,并执行响应的事件 +// for (String event : eventList) { +// stringStrategyLinkedHashMap.get(event).exe(redisTemplate,dataMessage); +// } + + } catch (ServerException e) { + log.error("添加iotDb异常"); + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } finally { acknowledgment.acknowledge(); } + } + public List getEvent(String vin){ + ArrayList strings = new ArrayList<>(); + + strings.add("存储数据"); + int nextInt = new Random().nextInt(100); + if (nextInt % 2 ==0){ + strings.add("实时数据"); + } + return strings; } } diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java new file mode 100644 index 0000000..9775cfc --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java @@ -0,0 +1,30 @@ +package com.hyc.kafka.demo.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; + +/** + * kafka消费者 + * + * @author YouChe·He + * @ClassName: KafkaConsumer + * @Description: kafka消费者 + * @CreateTime: 2024/6/6 15:33 + */ +@Slf4j +@Service +public class KafkaConsumer1 { + @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", + errorHandler = "myKafkaListenerErrorHandler") + public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { + try { + Object value = consumerRecord.value(); + log.error("消费者1得到的数据:{},所在分区:{}",value,consumerRecord.partition()); + }finally { + acknowledgment.acknowledge(); + } + } +} diff --git a/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java b/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java index c5c5273..9b1e669 100644 --- a/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java +++ b/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java @@ -33,8 +33,7 @@ public class KafkaController { @Transactional public void sendMessage(String message) { System.out.println("呼呼呼"); - - kafkaTemplate.send("topic1", UUID.randomUUID().toString(), message); + kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), message); } } diff --git a/src/main/java/com/hyc/kafka/demo/event/EventPosting.java b/src/main/java/com/hyc/kafka/demo/event/EventPosting.java new file mode 100644 index 0000000..3328dff --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/event/EventPosting.java @@ -0,0 +1,21 @@ +package com.hyc.kafka.demo.event; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * 事件配置中心 + * + * @author YouChe·He + * @ClassName: EventPosting + * @Description: 事件配置中心 + * @CreateTime: 2024/6/11 08:44 + */ +@Component +public class EventPosting { + @Autowired + private RedisTemplate redisTemplate; + + +} diff --git a/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java b/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java new file mode 100644 index 0000000..e06bce9 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java @@ -0,0 +1,37 @@ +package com.hyc.kafka.demo.strategy; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; + +/** + * 策略枚举类 + * + * @author YouChe·He + * @ClassName: StrategyA + * @Description: 策略枚举A + * @CreateTime: 2024/6/4 14:13 + */ +@Slf4j +public enum Strategy { + REAL_TIME_DATA{ + @Override + public void exe(RedisTemplate redisTemplate, String realData){ + + JSONObject jsonObject = JSON.parseObject(realData); + String vin = jsonObject.get("vin").toString(); + log.error("vin是:{}",vin); + if (redisTemplate.hasKey(vin)){ + redisTemplate.opsForList().rightPush(vin,realData); + } + } + }, + STORE_DATA{ + @Override + public void exe(RedisTemplate redisTemplate, String realData){ + System.out.println("执行具体策略B"); + } + }; + public abstract void exe(RedisTemplate redisTemplate, String realData); +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0accf78..48a66a3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,7 +5,7 @@ server: spring: rabbitmq: - host: 115.159.47.91 + host: 47.103.75.98 port: 5672 username: guest password: guest @@ -51,7 +51,7 @@ spring: kafka: producer: # Kafka服务器 - bootstrap-servers: 115.159.211.196:9092 + bootstrap-servers: 121.43.127.44:9092 # 开启事务,必须在开启了事务的方法中发送,否则报错 transaction-id-prefix: kafkaTx- # 发生错误后,消息重发的次数,开启事务必须设置大于0。 @@ -72,10 +72,10 @@ spring: consumer: # Kafka服务器 - bootstrap-servers: 115.159.211.196:9092 + bootstrap-servers: 121.43.127.44:9092 group-id: firstGroup # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D - #auto-commit-interval: 2s + auto-commit-interval: 2s # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)