feat():elk监听日志

master
Saisai Liu 2024-05-22 22:44:15 +08:00
parent 85b84ffd2d
commit aff10b4d77
12 changed files with 14228 additions and 9 deletions

File diff suppressed because it is too large Load Diff

View File

@ -70,8 +70,13 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
String databaseName = basicConfigInfo.getDatabaseName();
DruidDataSource druidDataSource = map.get(host + ":" + port + "/" + databaseName);
threadSource.set(druidDataSource);
DruidDataSource init = null;
try {
return druidDataSource.getConnection();
init = druidDataSource;
if (init==null){
init = this.init(basicConfigInfo);
}
return init.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}

View File

@ -25,8 +25,8 @@ public class EtlApplicationRunner implements ApplicationRunner {
@Autowired
private BasicConfigInfoService basicConfigInfoService;
// @Autowired
// private RedisService redisService;
@Autowired
private RedisService redisService;
@Autowired
@ -43,11 +43,11 @@ public class EtlApplicationRunner implements ApplicationRunner {
@Scheduled(zone = "0 0/10 * * * ? *")
public void delay(){
// if (!redisService.hasKey(assetCache.preKey()+"basic")) try {
// throw new ServletException("过期了");
// } catch (ServletException e) {
// throw new RuntimeException(e);
// }
// redisService.expire(assetCache.preKey()+"basic",10, TimeUnit.MINUTES);
if (!redisService.hasKey(assetCache.preKey()+"basic")) try {
throw new ServletException("过期了");
} catch (ServletException e) {
throw new RuntimeException(e);
}
redisService.expire(assetCache.preKey()+"basic",10, TimeUnit.MINUTES);
}
}

View File

@ -230,6 +230,7 @@ public class BasicConfigInfoServiceImpl extends ServiceImpl<BasicConfigInfoMappe
message.getMessageProperties().setConsumerTag(UUID.randomUUID().toString().replaceAll("-", ""));
return message;
});
log.info("放入队列");
} catch (SQLException e) {
log.error(e.getMessage());
basicConfigInfo.setIsTest("0");

View File

@ -102,5 +102,46 @@
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
<!-- 时序数据库iotdb-->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.14.0-preview1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,188 @@
package com.muyu.iotdb;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;
/**
* description: iotdb
* root.a1eaKSRpRty.CA3013A303A25467
* root.a1eaKSRpRty.CA3013A303A25467.heart root.a1eaKSRpRty
* author: Saisai.Liu
*/
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
private static Session session;
private static final String LOCAL_HOST = "43.142.100.73";
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
// 设置时区
session.setTimeZone("+08:00");
}
return session;
}
/**
* description: - insertRecord
* author: Saisai.Liu
* @param * @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* time:
* measurementsList
* type BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6);
* valuesList ---
* @return
*/
public void insertRecordType(String deviceId, Long time,List<String> measurementsList, TSDataType type,List<Object> valuesList)
throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() != valuesList.size()) {
throw new ServerException("measurementsList 与 valuesList 值不对应");
}
List<TSDataType> types = new ArrayList<>();
measurementsList.forEach(item -> {
types.add(type);
});
session.insertRecord(deviceId, time, measurementsList, types, valuesList);
}
/**
* description: - insertRecord
* author: Saisai.Liu
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param measurementsList
* @param valuesList ---
* @return
*/
public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList)
throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: Saisai.Liu
*/
public void insertRecords(List<String> deviceIdList, List<Long> timeList, List<List<String>> measurementsList, List<List<String>> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecords(deviceIdList, timeList, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: Saisai.Liu
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param schemaList: + List<MeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64));
* @param maxRowNumber
* @return
*/
public void insertTablet(String deviceId, Long time,List<MeasurementSchema> schemaList, List<Object> valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException {
Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
// 向iotdb里面添加数据
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, time);
for (int i = 0; i < valueList.size(); i++) {
tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i));
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
}
/**
* description: SQL
* author: Saisai.Liu
*/
public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
return session.executeQueryStatement(sql);
}
/**
* description: root.a1eaKSRpRty
* author: Saisai.Liu
* @param groupName
* @return
*/
public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroup(groupName);
}
/**
* description: Timeseries root.a1eaKSRpRty.CA3013A303A25467.breath
* author: Saisai.Liu
*/
public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseries);
}
/**
* description: Timeseries
* author: Saisai.Liu
*/
public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseriesList);
}
/**
* description:
* author: Saisai.Liu
*/
public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroups(storageGroupList);
}
/**
* description:
* author: Saisai.Liu
*/
public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(path, endTime);
}
/**
* description:
* author: Saisai.Liu
*/
public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, endTime);
}
/**
* description:
* author: Saisai.Liu
*/
public void deleteDataByPathListAndTime(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, startTime, endTime);
}
}

View File

@ -0,0 +1,60 @@
package com.muyu.iotdb;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.rmi.ServerException;
/**
* description: iotdb
* date: 2022/8/15 21:50
* author: zhouhong
*/
@Log4j2
@RestController
public class IotDbController {
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
/**
*
*
* @param iotDbParam
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(iotDbParam);
return ResponseData.success();
}
/**
*
*
* @param iotDbParam
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
}
/**
*
*
* @return
*/
@PostMapping("/api/device/deleteGroup")
public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
return ResponseData.success();
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.iotdb;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* description:
* date: 2022/8/15 21:53
* author: Saisai.Liu
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IotDbParam {
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private Long time;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
/***
*
*/
private String startTime;
/***
*
*/
private String endTime;
}

View File

@ -0,0 +1,37 @@
package com.muyu.iotdb;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* description:
* date: 2022/8/15 21:56
* author: Saisai.Liu
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IotDbResult {
/***
*
*/
private String time;
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
}

View File

@ -0,0 +1,19 @@
package com.muyu.iotdb;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
import java.util.List;
/**
* @ClassName IotDbServer
* @Description
* @Author SaiSai.Liu
* @Date 2024/5/21 16:28
*/
public interface IotDbServer {
void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException;
List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
}

View File

@ -0,0 +1,106 @@
package com.muyu.iotdb;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* description: iot
* date: 2022/8/15 9:43
* author: Saisai.Liu
*/
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.bizkey." + iotDbParam.getPk() + "." + iotDbParam.getSn();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("heart");
measurementsList.add("breath");
List<String> valuesList = new ArrayList<>();
valuesList.add(String.valueOf(iotDbParam.getHeart()));
valuesList.add(String.valueOf(iotDbParam.getBreath()));
iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
}
@Override
public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
List<IotDbResult> iotDbResultList = new ArrayList<>();
if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
String sql = "select * from root.bizkey." + iotDbParam.getPk() + "." + iotDbParam.getSn() + " where time >= "
+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空");
}
return iotDbResultList;
}
/**
*
*
* @param iotDbParam
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
IotDbResult iotDbResult = new IotDbResult();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
iotDbResult.setTime(timeString);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
iotDbResult.setTime(timeString);
iotDbResult.setPk(iotDbParam.getPk());
iotDbResult.setSn(iotDbParam.getSn());
iotDbResult.setHeart(map.get("heart"));
iotDbResult.setBreath(map.get("breath"));
iotDbResultList.add(iotDbResult);
}
}
}
}

View File

@ -0,0 +1,52 @@
package com.muyu.iotdb;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @ClassName ResponseData
* @Description
* @Author SaiSai.Liu
* @Date 2024/5/21 16:29
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResponseData {
private Integer code;
private String msg;
private Object data;
public ResponseData(int code, String msg, Object data) {
this.code = code;
this.msg = msg;
this.data = data;
}
public ResponseData(int code, String msg) {
this.code = code;
this.msg = msg;
this.data = null;
}
public static ResponseData success(String msg, Object data) {
return new ResponseData(200, msg, data);
}
public static ResponseData success(String msg) {
return new ResponseData(200, msg, null);
}
public static ResponseData success() {
return new ResponseData(200, "请求成功", null);
}
public static ResponseData success(Object data) {
return new ResponseData(200, "请求成功", data);
}
}