feat(): fault events, listen for Redis expired keys

dev
20300 2024-06-18 17:11:26 +08:00
parent f8c5b56b10
commit 04fbe0741a
29 changed files with 506 additions and 715 deletions

18
pom.xml
View File

@ -14,6 +14,12 @@
<spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies>
<!-- JPA-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- ehcache-->
<dependency>
@ -34,6 +40,11 @@
<version>0.14.0-preview1</version>
</dependency>
<!-- redis-->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
@ -133,12 +144,7 @@
<version>8.0.11</version>
<scope>runtime</scope>
</dependency>
<!-- format conversion-->
<!-- <dependency>-->
<!-- <groupId>com.google.code.gson</groupId>-->
<!-- <artifactId>gson</artifactId>-->
<!-- <version>2.8.8</version>-->
<!-- </dependency>-->
<!-- kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>

View File

@ -15,12 +15,22 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
public class RabbitmqConfig {
public static final String FAULT_EXCHANGE = "fault_exchange";
public static final String FAULT_MESSAGE_QUEUE = "fault_message_queue";
public static final String FAULT_MESSAGE_ROUTINGKEY = "fault_message_router";
public static final String INIT_CONNECT = "init_connect";
public static final String DISCONNECT_CONNECT = "disconnect_connect";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
public static final String CREATE_MQTT_CLIENT = "create_mqtt_client";
@Bean(CREATE_MQTT_CLIENT)
public Queue CREATE_MQTT_CLIENT(){
return new Queue(CREATE_MQTT_CLIENT);
}
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
@ -28,12 +38,12 @@ public class RabbitmqConfig {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//Declare the QUEUE_INFORM_EMAIL queue
//Declare the INIT_CONNECT queue
@Bean(INIT_CONNECT)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(INIT_CONNECT);
}
//Declare the QUEUE_INFORM_SMS queue
//Declare the DISCONNECT_CONNECT queue
@Bean(DISCONNECT_CONNECT)
public Queue QUEUE_INFORM_SMS(){
return new Queue(DISCONNECT_CONNECT);
@ -51,4 +61,21 @@ public class RabbitmqConfig {
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
@Bean(FAULT_EXCHANGE)
public Exchange FAULT_EXCHANGE(){
//durable(true): durable, the exchange still exists after an MQ restart
return ExchangeBuilder.topicExchange(FAULT_EXCHANGE).durable(true).build();
}
@Bean(FAULT_MESSAGE_QUEUE)
public Queue FAULT_MESSAGE_QUEUE(){
return new Queue(FAULT_MESSAGE_QUEUE);
}
@Bean
public Binding FAULT_MESSAGE_ROUTINGKEY(@Qualifier(FAULT_MESSAGE_QUEUE) Queue queue,
@Qualifier(FAULT_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(FAULT_MESSAGE_ROUTINGKEY).noargs();
}
}
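For reference, a producer would publish a fault event to the exchange declared above roughly as follows. This is a minimal sketch, not part of this commit; the package and class name FaultEventPublisher are assumed, FaultMessage is the payload type already present in com.hyc.domain.
package com.hyc.producer;
import com.alibaba.fastjson.JSONObject;
import com.hyc.config.RabbitmqConfig;
import com.hyc.domain.FaultMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FaultEventPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void publish(FaultMessage faultMessage) {
        // topic exchange and literal routing key declared in RabbitmqConfig above
        rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,
                RabbitmqConfig.FAULT_MESSAGE_ROUTINGKEY,
                JSONObject.toJSONString(faultMessage));
    }
}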

View File

@ -1,10 +1,8 @@
package com.hyc.runner;
package com.hyc.config;
import com.alibaba.fastjson.JSONObject;
import com.hyc.domain.CacheCarEvent;
import com.hyc.domain.SummarizeResp;
import com.hyc.kafka.demo.service.CarEventService;
import com.hyc.service.SummarizeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@ -16,7 +14,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
* Runner
*
*
* @author YouChe·He
* @ClassName: SyncCacheRunner

View File

@ -0,0 +1,33 @@
package com.hyc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManagerFactory;
/**
*
*
* @author YouChe·He
* @ClassName: TransactionConfig
* @Description:
* @CreateTime: 2024/6/18 16:26
*/
@Configuration
@EnableTransactionManagement
public class TransactionConfig {
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory emf) {
return new JpaTransactionManager(emf);
}
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}
}
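The TransactionTemplate bean is used programmatically elsewhere in this commit (see the Kafka send in CreateMqttClientConsumer); a minimal usage sketch, with an illustrative callback body and class name:
import org.springframework.transaction.support.TransactionTemplate;
public class TransactionTemplateUsage {
    // Illustrative only: run work inside a programmatic transaction
    public void runInTransaction(TransactionTemplate transactionTemplate) {
        transactionTemplate.execute(status -> {
            // transactional work goes here; an exception (or status.setRollbackOnly()) rolls back
            return null;
        });
    }
}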

View File

@ -1,72 +1,62 @@
package com.hyc.controller;
package com.hyc.consumer;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hyc.config.RabbitmqConfig;
import com.hyc.domain.ConnectionParameter;
import com.hyc.domain.SummarizeResp;
import com.hyc.kafka.demo.config.KafkaSendResultHandler;
import com.hyc.result.Result;
import com.hyc.service.SummarizeService;
import com.hyc.util.ConversionUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.*;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.UUID;
/**
* Mqtt
* MQTT
*
* @author YouChe·He
* @ClassName: CreateMqttClient
* @Description: Mqtt
* @CreateTime: 2024/5/31 10:25
* @ClassName: CreateMqttClientConsumer
* @Description: MQTT
* @CreateTime: 2024/6/18 15:45
*/
@Component
@Slf4j
@RestController
@RequestMapping("/create")
public class SummarizeController {
public class CreateMqttClientConsumer {
@Autowired
private SummarizeService summarizeService;
private TransactionTemplate transactionTemplate;
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
private final TransactionTemplate transactionTemplate;
public SummarizeController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler, TransactionTemplate transactionTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.transactionTemplate = transactionTemplate;
this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
}
@PostMapping("/createMqttClient")
public String createMqttClient(@RequestBody List<ConnectionParameter> connectionParameterList){
@RabbitListener(queues = RabbitmqConfig.CREATE_MQTT_CLIENT)
public void createMqttClient(String connectionParameterJson){
log.info("Message is: {}",connectionParameterJson);
String s = connectionParameterJson.replaceAll("\\\\", "");
String substring = s.substring(1, s.length() - 1);
log.error("After trimming: {}",substring);
ConnectionParameter connectionParameter = JSONObject.parseObject(substring, ConnectionParameter.class);
log.error("Entered the listener!");
for (ConnectionParameter connectionParameter : connectionParameterList) {
boolean oneMqttClient = createOneMqttClient(connectionParameter);
if (oneMqttClient==true){
log.warn("MQTT client for broker {} connected successfully",connectionParameter.getBroker());
}
}
return "Success!";
}
@GetMapping("/getAllLoadInfo")
public Result<SummarizeResp> getAllLoadInfo(){
return summarizeService.getAllLoadInfo();
}
@Transactional
public boolean createOneMqttClient(ConnectionParameter connectionParameter){
int qos = 0;
@ -96,9 +86,10 @@ public class SummarizeController {
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
log.info("topic: {}" , topic);
// System.out.println("topic: " + topic);
// System.out.println("Qos: " + message.getQos());
// System.out.println("message content: " + new String(message.getPayload()));
String resultString = ConversionUtil.hexStringToString(new String(message.getPayload()));
log.warn("Parsed string: {}",resultString);
log.warn("Length: {}",resultString.length());
@ -121,11 +112,13 @@ public class SummarizeController {
"positionStatus","easStatus","ptcStatus","epsStatus","absStatus","mcuStatus",
"heatingStatus","batteryStatus","batteryInsulationStatus","dcdcStatus","chgStatus"};
LinkedHashMap<String, Object> linkedHashMap = new LinkedHashMap<>();
HashMap<String, String> stringStringHashMap = new HashMap<>();
String vin = "vin";
for (int i = 0; i < 47; i++) {
String substring = realString.substring(count, count + intArr[i]);
if (strArr[i]=="vin" || strArr[i] == "gear"){
vin = substring;
linkedHashMap.put(strArr[i],substring);
}else if (strArr[i] == "startTime") {
linkedHashMap.put(strArr[i],Long.valueOf(substring));
@ -140,9 +133,10 @@ public class SummarizeController {
try {
String jsonData = objectMapper.writeValueAsString(linkedHashMap);
log.error("JSON payload: {}",jsonData);
String finalVin = vin;
transactionTemplate.execute(status -> {
try {
kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), jsonData);
kafkaTemplate.send("topichyc", finalVin, jsonData);
status.flush();
}catch (Exception e){
log.error("Kafka producer exception: {}", e.getMessage());
@ -172,5 +166,4 @@ public class SummarizeController {
return true;
}
}
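The backslash and quote stripping above suggests the payload arrives as a JSON string that was serialized twice (once by the caller, once by the message converter). The producer side is not shown in this commit; a hedged sketch of what it presumably looks like, with the package and class name assumed:
package com.hyc.producer;
import com.alibaba.fastjson.JSONObject;
import com.hyc.config.RabbitmqConfig;
import com.hyc.domain.ConnectionParameter;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CreateMqttClientPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(ConnectionParameter connectionParameter) {
        // sent to the default exchange with the queue name as the routing key
        rabbitTemplate.convertAndSend(RabbitmqConfig.CREATE_MQTT_CLIENT,
                JSONObject.toJSONString(connectionParameter));
    }
}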

View File

@ -1,19 +0,0 @@
package com.hyc.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
*
*
* @author YouChe·He
* @ClassName: EventUpdateConsumer
* @Description:
* @CreateTime: 2024/6/16 10:28
*/
@Component
@Slf4j
public class EventUpdateConsumer {
}

View File

@ -1,68 +0,0 @@
//package com.hyc.consumer;
//
//import com.hyc.domain.ConnectionParameter;
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.*;
//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//import org.springframework.stereotype.Component;
//
///**
// * MQTT consumer
// *
// * @author YouChe·He
// * @ClassName: MQTTConsumer
// * @Description: MQTT consumer
// * @CreateTime: 2024/5/30 19:12
// */
//@Component
//@Slf4j
//public class MQTTConsumer {
//
// public void parseMessage(ConnectionParameter connectionParameter){
// int qos = 0;
//
// try {
// MqttClient client = null;
//
// client = new MqttClient(connectionParameter.getBroker(), connectionParameter.getClientid(), new MemoryPersistence());
//
//
// // Connection parameters
// MqttConnectOptions options = new MqttConnectOptions();
// if (connectionParameter.getUsername() !=null && !"".equals(connectionParameter.getUsername())){
// options.setUserName(connectionParameter.getUsername());
// options.setPassword(connectionParameter.getPassword().toCharArray());
// }
//
// options.setConnectionTimeout(60);
// options.setKeepAliveInterval(60);
// // Set the callback
// client.setCallback(new MqttCallback() {
//
// @Override
// public void connectionLost(Throwable cause) {
// System.out.println("connectionLost: " + cause.getMessage());
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage message) {
// System.out.println("topic: " + topic);
// System.out.println("Qos: " + message.getQos());
// System.out.println("message content: " + new String(message.getPayload()));
//
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("deliveryComplete---------" + token.isComplete());
// }
//
// });
// client.connect(options);
// client.subscribe(connectionParameter.getTopic(), qos);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
//}

View File

@ -1,51 +0,0 @@
//package com.hyc.consumer;
//
//import com.alibaba.fastjson.JSONObject;
//import com.hyc.config.RabbitmqConfig;
//import com.hyc.domain.ConnectionParameter;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//
//import org.springframework.stereotype.Component;
//
///**
// * Disconnect consumer
// *
// * @author YouChe·He
// * @ClassName: ReceiveHandler
// * @Description: Disconnect consumer
// * @CreateTime: 2024/5/27 17:00
// */
//@Component
//@Slf4j
//public class ReceiveHandler {
//
// @Autowired
// private MQTTConsumer mqttConsumer;
//
// @RabbitListener(queues = {RabbitmqConfig.INIT_CONNECT})
// public void receive_email(Message msg){
// String messageBody = new String(msg.getBody());
// ConnectionParameter connectionParameter = getConnectionParameter(messageBody);
// mqttConsumer.parseMessage(connectionParameter);
// }
//
//
// public ConnectionParameter getConnectionParameter(String messageBody){
//
//
// log.error("Message from the connect event: {}",messageBody);
// JSONObject jsonObject = JSONObject.parseObject(messageBody);
// String carVin = jsonObject.getString("clientId");
//
// String[] split = carVin.split("-");
// String vin = split[0];
// String broker = split[1];
// String topic = split[2];
// log.warn("ConnectionParameter fields broker:{},vin:{},topic:{}",broker,vin,topic);
// return new ConnectionParameter(broker,null,null,vin,topic);
// }
//
//}

View File

@ -48,21 +48,21 @@ public class VehicleData {
private Double singleBatteryMaxTemperature;
private Double singleBatteryMinTemperature;
private Double availableBatteryCapacity;
private Double vehicleStatus;
private Double chargingStatus;
private Double operatingStatus;
private Double socStatus;
private Double chargingEnergyStorageStatus;
private Double driveMotorStatus;
private Double positionStatus;
private Double easStatus;
private Double ptcStatus;
private Double epsStatus;
private Double absStatus;
private Double mcuStatus;
private Double heatingStatus;
private Double batteryStatus;
private Double batteryInsulationStatus;
private Double dcdcStatus;
private Double chgStatus;
private Integer vehicleStatus;
private Integer chargingStatus;
private Integer operatingStatus;
private Integer socStatus;
private Integer chargingEnergyStorageStatus;
private Integer driveMotorStatus;
private Integer positionStatus;
private Integer easStatus;
private Integer ptcStatus;
private Integer epsStatus;
private Integer absStatus;
private Integer mcuStatus;
private Integer heatingStatus;
private Integer batteryStatus;
private Integer batteryInsulationStatus;
private Integer dcdcStatus;
private Integer chgStatus;
}

View File

@ -1,4 +1,4 @@
package com.hyc.result;
package com.hyc.domain.result;
/**
*

View File

@ -1,4 +1,4 @@
package com.hyc.result;
package com.hyc.domain.result;
import lombok.Data;

View File

@ -126,23 +126,23 @@ public class IotDbServerImpl implements IotDbServer {
iotDbResult.setSingleBatteryMaxTemperature(Double.valueOf(map.get("singleBatteryMaxTemperature")));
iotDbResult.setSingleBatteryMinTemperature(Double.valueOf(map.get("singleBatteryMinTemperature")));
iotDbResult.setAvailableBatteryCapacity(Double.valueOf(map.get("availableBatteryCapacity")));
iotDbResult.setVehicleStatus(Double.valueOf(map.get("vehicleStatus")));
iotDbResult.setChargingStatus(Double.valueOf(map.get("chargingStatus")));
iotDbResult.setOperatingStatus(Double.valueOf(map.get("operatingStatus")));
iotDbResult.setSocStatus(Double.valueOf(map.get("socStatus")));
iotDbResult.setChargingEnergyStorageStatus(Double.valueOf(map.get("chargingEnergyStorageStatus")));
iotDbResult.setDriveMotorStatus(Double.valueOf(map.get("driveMotorStatus")));
iotDbResult.setPositionStatus(Double.valueOf(map.get("positionStatus")));
iotDbResult.setEasStatus(Double.valueOf(map.get("easStatus")));
iotDbResult.setPtcStatus(Double.valueOf(map.get("ptcStatus")));
iotDbResult.setEpsStatus(Double.valueOf(map.get("epsStatus")));
iotDbResult.setAbsStatus(Double.valueOf(map.get("absStatus")));
iotDbResult.setMcuStatus(Double.valueOf(map.get("mcuStatus")));
iotDbResult.setHeatingStatus(Double.valueOf(map.get("heatingStatus")));
iotDbResult.setBatteryStatus(Double.valueOf(map.get("batteryStatus")));
iotDbResult.setBatteryInsulationStatus(Double.valueOf(map.get("batteryInsulationStatus")));
iotDbResult.setDcdcStatus(Double.valueOf(map.get("dcdcStatus")));
iotDbResult.setChgStatus(Double.valueOf(map.get("chgStatus")));
iotDbResult.setVehicleStatus(Integer.valueOf(map.get("vehicleStatus")));
iotDbResult.setChargingStatus(Integer.valueOf(map.get("chargingStatus")));
iotDbResult.setOperatingStatus(Integer.valueOf(map.get("operatingStatus")));
iotDbResult.setSocStatus(Integer.valueOf(map.get("socStatus")));
iotDbResult.setChargingEnergyStorageStatus(Integer.valueOf(map.get("chargingEnergyStorageStatus")));
iotDbResult.setDriveMotorStatus(Integer.valueOf(map.get("driveMotorStatus")));
iotDbResult.setPositionStatus(Integer.valueOf(map.get("positionStatus")));
iotDbResult.setEasStatus(Integer.valueOf(map.get("easStatus")));
iotDbResult.setPtcStatus(Integer.valueOf(map.get("ptcStatus")));
iotDbResult.setEpsStatus(Integer.valueOf(map.get("epsStatus")));
iotDbResult.setAbsStatus(Integer.valueOf(map.get("absStatus")));
iotDbResult.setMcuStatus(Integer.valueOf(map.get("mcuStatus")));
iotDbResult.setHeatingStatus(Integer.valueOf(map.get("heatingStatus")));
iotDbResult.setBatteryStatus(Integer.valueOf(map.get("batteryStatus")));
iotDbResult.setBatteryInsulationStatus(Integer.valueOf(map.get("batteryInsulationStatus")));
iotDbResult.setDcdcStatus(Integer.valueOf(map.get("dcdcStatus")));
iotDbResult.setChgStatus(Integer.valueOf(map.get("chgStatus")));
iotDbResultList.add(iotDbResult);
}
}

View File

@ -47,31 +47,16 @@ public class KafkaConsumerConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(16);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//Whether to auto-commit offsets; the default is true. To avoid duplicate and lost data it can be set to false with manual offset commits
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
//Auto-commit interval, effective only when auto-commit is enabled
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
//What the consumer does when reading a partition with no committed offset, or when the offset is invalid:
//earliest: consume from the committed offset; if there is none, consume the partition from the beginning
//latest: consume from the committed offset; if there is none, consume only records produced after the consumer started
//none: consume from committed offsets; if any partition has no committed offset, throw an exception
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//Maximum interval between two polls; the default is 5 minutes. Exceeding it triggers a rebalance
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
//Maximum number of records a single poll may return; the default is 500. If fewer new records exist it returns what it has, never more than the limit.
//That default is too large for some workloads: if the consumer cannot process 500 records within 5 minutes,
//a rebalance is triggered and the batch is handed to another consumer, which also cannot finish, so the batch is never completed.
//Estimate the worst-case processing time per record up front and override max.poll.records accordingly.
//Note: this only matters when BatchListener is enabled; without batch listening the rebalance problem does not occur.
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//How long the broker waits without a consumer heartbeat before triggering a rebalance; the default is 10s
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
//JSON serialization is recommended; it can transfer entity classes without extra configuration
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return propsMap;
@ -79,33 +64,22 @@ public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
// Trusted packages for the consumer's JSON deserialization; required to deserialize entity classes
try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
deserializer.trustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
}
}
/**
* KafkaListenerContainerFactory is Spring Kafka's factory for KafkaListenerContainer instances.
* A KafkaListenerContainer wraps the Kafka consumer API so that @KafkaListener methods are easy to use.
* The factory below configures and creates the containers used by the listeners in this project.
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//Number of threads running in the listener container, usually machines * partitions
factory.setConcurrency(concurrency);
// By default an error is raised when a listened topic does not exist; set to false to ignore it
factory.setMissingTopicsFatal(missingTopicsFatal);
// Auto-commit is disabled, so manual message acknowledgement must be configured
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(pollTimeout);
// Batch listening: listener methods must receive a List
// factory.setBatchListener(true);
factory.setBatchListener(true);
return factory;
}
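With setBatchListener(true) here (and listener.type: batch in application.yml), @KafkaListener methods must accept the whole polled batch as a List, which matches the consumer signature changes later in this commit. A minimal sketch; the class name is illustrative:
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
public class BatchListenerSketch {
    @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory")
    public void consume(List<ConsumerRecord<Object, Object>> records, Acknowledgment ack) {
        // handle every record of the batch, then acknowledge once (AckMode.MANUAL_IMMEDIATE)
        records.forEach(record -> { /* process record.value() */ });
        ack.acknowledge();
    }
}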

View File

@ -1,5 +1,6 @@
package com.hyc.kafka.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@ -12,14 +13,15 @@ import org.springframework.stereotype.Component;
* Kafka send result handler
*/
@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("Message sent successfully: " + producerRecord.toString());
log.info("Message sent successfully: {}" , producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
System.out.println("Message send failed: " + producerRecord.toString() + exception.getMessage());
log.error("Message send failed: {}{}",producerRecord.toString() ,exception.getMessage());
}
}

View File

@ -0,0 +1,37 @@
package com.hyc.kafka.demo.config;
import com.hyc.kafka.demo.listener.KeyExpiredListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* Redis key-expiration listener configuration
*
* @author YouChe·He
* @ClassName: RedisConfiguration
* @Description: Redis listener container and expired-key listener beans
* @CreateTime: 2024/6/20 21:57
*/
@Configuration
public class RedisConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpiredListener keyExpiredListener() {
return new KeyExpiredListener(redisMessageListenerContainer(),rabbitTemplate);
}
}
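The KeyExpiredListener wired up here is not included in this diff. Judging from its constructor arguments (the listener container plus a RabbitTemplate) and the commit title, it most likely extends Spring Data Redis's KeyExpirationEventMessageListener, which registers itself for the __keyevent@*__:expired channel; note the Redis server must have keyspace notifications enabled (notify-keyspace-events including Ex) unless the listener configures them itself. A hedged sketch; forwarding the expired key to the fault exchange is an assumption:
package com.hyc.kafka.demo.listener;
import com.hyc.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
    private final RabbitTemplate rabbitTemplate;
    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer, RabbitTemplate rabbitTemplate) {
        // the superclass subscribes this listener to __keyevent@*__:expired on the given container
        super(listenerContainer);
        this.rabbitTemplate = rabbitTemplate;
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        log.info("Redis key expired: {}", expiredKey);
        // assumption: forward the expired key as a fault event to the exchange declared in RabbitmqConfig
        rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,
                RabbitmqConfig.FAULT_MESSAGE_ROUTINGKEY, expiredKey);
    }
}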

View File

@ -6,12 +6,13 @@ import com.hyc.domain.VehicleData;
import com.hyc.domain.constant.EventConstant;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.server.IotDbServer;
import com.hyc.kafka.demo.strategy.Strategy;
import com.hyc.kafka.demo.service.CarEventService;
import com.hyc.kafka.demo.strategy.EventStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@ -21,6 +22,7 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
@ -52,16 +54,29 @@ public class KafkaConsumer {
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
LinkedHashMap<String, EventStrategy> eventStrategyLinkedHashMap = new LinkedHashMap<>();
@PostConstruct
public void initStrategyMap(){
eventStrategyLinkedHashMap.put("ELECTRONIC_FENCE",EventStrategy.ELECTRONIC_FENCE);
eventStrategyLinkedHashMap.put("FAULT_ALARM",EventStrategy.FAULT_ALARM);
eventStrategyLinkedHashMap.put("INDEX_WARNING",EventStrategy.INDEX_WARNING);
eventStrategyLinkedHashMap.put("REAL_TIME_DATA",EventStrategy.REAL_TIME_DATA);
}
@KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
errorHandler = "myKafkaListenerErrorHandler")
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
public void consume(List<ConsumerRecord<Object,Object>> consumerRecords, Acknowledgment acknowledgment) {
try {
//Strategy map
// LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
// stringStrategyLinkedHashMap.put("Store data", Strategy.STORE_DATA);
// stringStrategyLinkedHashMap.put("Real-time data",Strategy.REAL_TIME_DATA);
//Parse out the VIN
log.info("Number of records pulled: {}",consumerRecords.size());
for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
String value = (String) consumerRecord.value();
VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class);
@ -72,25 +87,14 @@ public class KafkaConsumer {
String carEventByVin = this.getCarEventByVin(vehicleData.getVin());
//Parse the events bound to the vehicle
String[] eventArr = carEventByVin.split(",");
for (String eventNumber : eventArr) {
//Electronic fence event
if (eventNumber.equals("1")){
//Look up the fence group bound to the vehicle in the cache and compare it with the vehicle data
}
//Real-time data event
if (eventNumber.equals("2")) {
if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){
//If the VIN_REAL_TIME_DATA key exists, a user has requested real-time data for this VIN, so store the data in Redis
String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA;
String carData = JSONObject.toJSONString(vehicleData);
this.updateValueWithoutChangingTTL(key, value);
for (String eventString : eventArr) {
//Handle the event
eventStrategyLinkedHashMap.get(eventString).exe(cacheManager,redisTemplate,vehicleData,rabbitTemplate);
}
}
//Fault warning event
if (eventNumber.equals("3")) {
//Parse out the VIN
}
}
//Parse out the VIN
// String dataMessage = value.toString();
@ -162,7 +166,7 @@ public class KafkaConsumer {
Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS);
// Update the key's value
redisTemplate.opsForValue().set(key, newValue);
redisTemplate.opsForList().rightPush( key,newValue);
// Reset the expiration time
if (currentTTL != null && currentTTL > 0) {

View File

@ -7,10 +7,12 @@ import com.hyc.domain.constant.EventConstant;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.server.IotDbServer;
import com.hyc.kafka.demo.service.CarEventService;
import com.hyc.kafka.demo.strategy.EventStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@ -19,9 +21,11 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -43,24 +47,29 @@ public class KafkaConsumer1 {
@Autowired
private CarEventService carEventService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
LinkedHashMap<String, EventStrategy> eventStrategyLinkedHashMap = new LinkedHashMap<>();
@PostConstruct
public void initStrategyMap(){
eventStrategyLinkedHashMap.put("ELECTRONIC_FENCE",EventStrategy.ELECTRONIC_FENCE);
eventStrategyLinkedHashMap.put("FAULT_ALARM",EventStrategy.FAULT_ALARM);
eventStrategyLinkedHashMap.put("INDEX_WARNING",EventStrategy.INDEX_WARNING);
eventStrategyLinkedHashMap.put("REAL_TIME_DATA",EventStrategy.REAL_TIME_DATA);
}
@KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
errorHandler = "myKafkaListenerErrorHandler")
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
public void consume(List<ConsumerRecord<Object,Object>> consumerRecords, Acknowledgment acknowledgment) {
try {
//Strategy map
// LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
// stringStrategyLinkedHashMap.put("Store data", Strategy.STORE_DATA);
// stringStrategyLinkedHashMap.put("Real-time data",Strategy.REAL_TIME_DATA);
//Parse out the VIN
log.info("Number of records pulled: {}",consumerRecords.size());
for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
String value = (String) consumerRecord.value();
VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class);
log.error("Data received by consumer 1: {}, partition: {}",vehicleData.toString(),consumerRecord.partition());
@ -69,25 +78,13 @@ public class KafkaConsumer1 {
String carEventByVin = this.getCarEventByVin(vehicleData.getVin());
//Parse the events bound to the vehicle
String[] eventArr = carEventByVin.split(",");
for (String eventNumber : eventArr) {
//Electronic fence event
if (eventNumber.equals("1")){
//Look up the fence group bound to the vehicle in the cache and compare it with the vehicle data
}
//Real-time data event
if (eventNumber.equals("2")) {
if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){
//If the VIN_REAL_TIME_DATA key exists, a user has requested real-time data for this VIN, so store the data in Redis
String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA;
String carData = JSONObject.toJSONString(vehicleData);
this.updateValueWithoutChangingTTL(key, value);
for (String eventString : eventArr) {
//Handle the event
eventStrategyLinkedHashMap.get(eventString).exe(cacheManager,redisTemplate,vehicleData,rabbitTemplate);
}
}
//Fault warning event
if (eventNumber.equals("3")) {
//Parse out the VIN
}
}
// String dataMessage = value.toString();
@ -159,7 +156,7 @@ public class KafkaConsumer1 {
Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS);
// Update the key's value
redisTemplate.opsForValue().set(key, newValue);
redisTemplate.opsForList().rightPush( key,newValue);
// Reset the expiration time
if (currentTTL != null && currentTTL > 0) {

View File

@ -1,41 +0,0 @@
package com.hyc.kafka.demo.controller;
import com.hyc.kafka.demo.config.KafkaSendResultHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* kafka
*
* @author YouChe·He
* @ClassName: KafkaController
* @Description: kafka
* @CreateTime: 2024/6/6 14:42
*/
@RestController
public class KafkaController {
private KafkaTemplate<Object, Object> kafkaTemplate;
public KafkaController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
}
@GetMapping("/send")
@Transactional
public void sendMessage(String message) {
System.out.println("呼呼呼");
kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), message);
}
}

View File

@ -1,21 +0,0 @@
package com.hyc.kafka.demo.event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
*
*
* @author YouChe·He
* @ClassName: EventPosting
* @Description:
* @CreateTime: 2024/6/11 08:44
*/
@Component
public class EventPosting {
@Autowired
private RedisTemplate<String,String> redisTemplate;
}

View File

@ -0,0 +1,165 @@
package com.hyc.kafka.demo.strategy;
import com.alibaba.fastjson.JSONObject;
import com.hyc.domain.VehicleData;
import com.hyc.domain.constant.EventConstant;
import com.hyc.util.EventHandleUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;
/**
*
*
* @author YouChe·He
* @ClassName: EventStrategy
* @Description: Event handling strategies
* @CreateTime: 2024/6/4 14:13
*/
@Slf4j
public enum EventStrategy {
/**
* Real-time data event
*/
REAL_TIME_DATA {
@Override
public void exe(CacheManager cacheManager,RedisTemplate<String, String> redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) {
log.info("Executing real-time data logic");
if (redisTemplate.hasKey(vehicleData.getVin() + EventConstant.REAL_TIME_DATA)) {
//If the VIN_REAL_TIME_DATA key exists, a user has requested real-time data for this VIN, so store the data in Redis
String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA;
// Get the current expiration time
Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS);
// Update the key's value
redisTemplate.opsForList().rightPush(key, JSONObject.toJSONString(vehicleData));
// Reset the expiration time
if (currentTTL != null && currentTTL > 0) {
redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS);
}
}
}
},
/**
* Fault alarm event
*/
FAULT_ALARM {
@Override
public void exe(CacheManager cacheManager,RedisTemplate<String, String> redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) {
System.out.println("Executing fault alarm logic");
if (vehicleData.getVehicleStatus() == 0) {
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ001", rabbitTemplate);
}
if (vehicleData.getChargingStatus() == 0) {
// Handle chargingStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ002", rabbitTemplate);
}
if (vehicleData.getOperatingStatus() == 0) {
// Handle operatingStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ003", rabbitTemplate);
}
if (vehicleData.getSocStatus() == 0) {
// Handle socStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ004", rabbitTemplate);
}
if (vehicleData.getChargingEnergyStorageStatus() == 0) {
// Handle chargingEnergyStorageStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ005", rabbitTemplate);
}
if (vehicleData.getDriveMotorStatus() == 0) {
// Handle driveMotorStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ006", rabbitTemplate);
}
if (vehicleData.getPositionStatus() == 0) {
// Handle positionStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ007", rabbitTemplate);
}
if (vehicleData.getEasStatus() == 0) {
// Handle easStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ008", rabbitTemplate);
}
if (vehicleData.getPtcStatus() == 0) {
// Handle ptcStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ009", rabbitTemplate);
}
if (vehicleData.getEpsStatus() == 0) {
// Handle epsStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ010", rabbitTemplate);
}
if (vehicleData.getAbsStatus() == 0) {
// Handle absStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ011", rabbitTemplate);
}
if (vehicleData.getMcuStatus() == 0) {
// Handle mcuStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ012", rabbitTemplate);
}
if (vehicleData.getHeatingStatus() == 0) {
// Handle heatingStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ013", rabbitTemplate);
}
if (vehicleData.getBatteryStatus() == 0) {
// Handle batteryStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ014", rabbitTemplate);
}
if (vehicleData.getBatteryInsulationStatus() == 0) {
// Handle batteryInsulationStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ015", rabbitTemplate);
}
if (vehicleData.getDcdcStatus() == 0) {
// Handle dcdcStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ016", rabbitTemplate);
}
if (vehicleData.getChgStatus() == 0) {
// Handle chgStatus == 0
EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ017", rabbitTemplate);
}
}
},
/**
* Electronic fence event
*/
ELECTRONIC_FENCE {
@Override
public void exe(CacheManager cacheManager,RedisTemplate<String, String> redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) {
log.info("Executing electronic fence logic");
}
},
/**
* Metric warning event
*/
INDEX_WARNING {
@Override
public void exe(CacheManager cacheManager,RedisTemplate<String, String> redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) {
log.info("Executing metric warning logic");
}
};
public abstract void exe(CacheManager cacheManager, RedisTemplate<String, String> redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate);
}
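EventHandleUtil.insertToCache is called above but its source is not part of this diff. Based on the call sites, the new 5-second faultMessage ehcache region, and the Redis expired-key listener added in this commit, a plausible hedged sketch is: raise a fault message the first time a fault code appears for a VIN, keep refreshing a short-lived marker while the fault keeps being reported, and let the marker's expiry signal that the fault has cleared. Everything below, including the TTL value, is an assumption:
package com.hyc.util;
import java.util.concurrent.TimeUnit;
import com.hyc.config.RabbitmqConfig;
import com.hyc.domain.VehicleData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.core.RedisTemplate;
public class EventHandleUtil {
    public static void insertToCache(CacheManager cacheManager, RedisTemplate<String, String> redisTemplate,
                                     VehicleData vehicleData, String faultCode, RabbitTemplate rabbitTemplate) {
        String marker = vehicleData.getVin() + ":" + faultCode;
        // "faultMessage" is the 5-second-TTL region added to ehcache.xml in this commit
        Cache faultCache = cacheManager.getCache("faultMessage");
        if (faultCache != null && faultCache.get(marker) == null) {
            // first occurrence of this fault for the VIN: publish a fault event
            rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,
                    RabbitmqConfig.FAULT_MESSAGE_ROUTINGKEY, marker);
        }
        if (faultCache != null) {
            faultCache.put(marker, Boolean.TRUE);
        }
        // refresh a Redis marker with a TTL (value assumed); its expiry is picked up by the expired-key listener
        redisTemplate.opsForValue().set(marker, faultCode, 30, TimeUnit.SECONDS);
    }
}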

View File

@ -1,37 +0,0 @@
package com.hyc.kafka.demo.strategy;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
/**
*
*
* @author YouChe·He
* @ClassName: StrategyA
* @Description: A
* @CreateTime: 2024/6/4 14:13
*/
@Slf4j
public enum Strategy {
REAL_TIME_DATA{
@Override
public void exe(RedisTemplate<String,String> redisTemplate, String realData){
JSONObject jsonObject = JSON.parseObject(realData);
String vin = jsonObject.get("vin").toString();
log.error("vin is: {}",vin);
if (redisTemplate.hasKey(vin)){
redisTemplate.opsForList().rightPush(vin,realData);
}
}
},
STORE_DATA{
@Override
public void exe(RedisTemplate<String,String> redisTemplate, String realData){
System.out.println("Executing concrete strategy B");
}
};
public abstract void exe(RedisTemplate<String,String> redisTemplate, String realData);
}

View File

@ -1,31 +0,0 @@
package com.hyc.mapper;
/**
*
*
* @author YouChe·He
* @ClassName: SummarizeMapper
* @Description:
* @CreateTime: 2024/6/2 16:47
*/
import com.hyc.domain.VinIp;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.web.bind.annotation.Mapping;
import javax.annotation.ManagedBean;
import java.util.List;
/**
*@ClassName SummarizeMapper
*@Description
*@Author ZHIHAO.DAI
*@Date 2024/6/2 16:47
*/
@Mapper
public interface SummarizeMapper {
List<VinIp> getAllLoadInfo();
List<VinIp> getNewConnectVin();
}

View File

@ -1,22 +0,0 @@
package com.hyc.service;
/**
*
*
* @author YouChe·He
* @ClassName: SummarizeService
* @Description:
* @CreateTime: 2024/6/2 16:46
*/
import com.hyc.domain.SummarizeResp;
import com.hyc.result.Result;
/**
*@ClassName SummarizeService
*@Description
*@Author ZHIHAO.DAI
*@Date 2024/6/2 16:46
*/
public interface SummarizeService {
Result<SummarizeResp> getAllLoadInfo();
}

View File

@ -1,191 +0,0 @@
package com.hyc.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.ecs20140526.Client;
import com.aliyun.ecs20140526.models.DescribeInstancesRequest;
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;
import com.hyc.domain.GetWayServerLoad;
import com.hyc.domain.SummarizeResp;
import com.hyc.domain.TimeSortCar;
import com.hyc.domain.VinIp;
import com.hyc.mapper.SummarizeMapper;
import com.hyc.result.Result;
import com.hyc.service.SummarizeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;
/**
*
*
* @author YouChe·He
* @ClassName: SummarizeServiceImpl
* @Description:
* @CreateTime: 2024/6/2 16:46
*/
@Service
@Slf4j
public class SummarizeServiceImpl implements SummarizeService {
@Autowired
private SummarizeMapper summarizeMapper;
@Autowired
private RestTemplate restTemplate;
@Override
public Result<SummarizeResp> getAllLoadInfo() {
ArrayList<GetWayServerLoad> getWayServerLoads = new ArrayList<>();
Integer carSum = 0;
Double allLoad = 0.0;
Integer gateWayServerNum = 0;
Integer dataAnalyze = 22;
ArrayList<String> ipList = new ArrayList<>();
List<VinIp> connectLongVinList = summarizeMapper.getAllLoadInfo();
List<VinIp> newConnectVinList = summarizeMapper.getNewConnectVin();
try {
Client client = this.createClient();
//Set the region and instance type parameters
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.setRegionId("cn-shanghai")
.setInstanceType("ecs.e-c1m2.xlarge");
RuntimeOptions runtime = new RuntimeOptions();
//Get the IPs and add them to the list
DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime);
List<List<String>> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList());
for (List<String> strings : ipListList) {
for (String ip : strings) {
ipList.add(ip);
gateWayServerNum ++;
}
System.out.println("------------------------");
}
//Iterate over the IPs and call the URL
for (String ip : ipList) {
String url = "http://" + ip + ":8080/public/login";
Map<String, Object> request = new HashMap<>();
request.put("username", "fluxmq");
request.put("password", "fluxmq");
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> r = new HttpEntity<>(request, httpHeaders);
String result = restTemplate.postForObject(url, r, String.class);
//http://fluxmq.muyu.icu/public/cluster
int nextInt = new Random().nextInt(1000);
String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt;
HttpHeaders httpHeadersGetInfo = new HttpHeaders();
httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON);
httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
httpHeadersGetInfo.set("Cookie", result);
HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo);
ResponseEntity<String> responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1);
log.info("Response: {}", responseInfo.getBody());
JSONArray jsonArray = JSON.parseArray(responseInfo.getBody());
if (jsonArray.size() > 0) {
JSONObject jsonObject = jsonArray.getJSONObject(0);
Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize"));
carSum = connectSize + carSum;
log.info("Load: {}",connectSize*1.0/100);
allLoad = connectSize*1.0/100;
String cSys = jsonObject.getJSONObject("cpuInfo").getString("idle");
int i = cSys.indexOf("%");
Double cpuInfo = Double.valueOf(cSys.substring(0, i));
log.info("CPU utilization: {}",cSys);
String heapUsed =jsonObject.getJSONObject("jvmInfo").getString("heap-used");
int x = heapUsed.indexOf("M");
String realHeapUsed = heapUsed.substring(0, x);
String noHeapUsed = jsonObject.getJSONObject("jvmInfo").getString("no_heap-used");
int y = noHeapUsed.indexOf("M");
String realNoHeapUsed = noHeapUsed.substring(0, y);
String heapMax = jsonObject.getJSONObject("jvmInfo").getString("heap-max");
int z = heapMax.indexOf("G");
String realHeapMax = heapMax.substring(0, z);
double v = (Double.valueOf(realHeapUsed) + Double.valueOf(realNoHeapUsed))/ (Double.valueOf(realHeapMax) * 1000) * 100;
BigDecimal bd = new BigDecimal(v).setScale(2, BigDecimal.ROUND_HALF_UP);
double roundedNumber = bd.doubleValue();
log.info("heapUsed:{},noHeapUsed:{},heapMax:{},memory usage:{}",heapUsed,noHeapUsed,heapMax,roundedNumber);
getWayServerLoads.add(new GetWayServerLoad(ip,connectSize*1.0/100*100,roundedNumber,cpuInfo));
log.info("Connection count: {}", connectSize);
} else {
log.error("The response data is null");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
ArrayList<TimeSortCar> sortCarArrayList = new ArrayList<>();
for (VinIp vinIp : connectLongVinList) {
Date connectTime = vinIp.getConnectTime();
Date currentTime = new Date();
// Compute the time difference (in milliseconds)
long timeDifference = currentTime.getTime() - connectTime.getTime();
// Convert to minutes
long minutesDifference = timeDifference / (1000 * 60);
sortCarArrayList.add(new TimeSortCar(vinIp.getVin(),vinIp.getIp(),vinIp.getConnectTime(),minutesDifference));
}
Collections.sort(sortCarArrayList);
SummarizeResp summarizeResp = new SummarizeResp(gateWayServerNum,dataAnalyze,allLoad / gateWayServerNum * 100,carSum,sortCarArrayList,getWayServerLoads,newConnectVinList);
return Result.success(summarizeResp);
}
public Client createClient() throws Exception {
// Leaking project code may expose the AccessKey and endanger all resources under the account. The code below is for reference only.
// Using the more secure STS approach is recommended; for more authentication options see https://help.aliyun.com/document_detail/378657.html.
Config config = new Config()
// Required; make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_ID is set in the runtime environment.
.setAccessKeyId("LTAI5tAEQA9AgnqasQ7Y56cJ")
// Required; make sure the environment variable ALIBABA_CLOUD_ACCESS_KEY_SECRET is set in the runtime environment.
.setAccessKeySecret("IsrnZ6dKBgEit5HXv2xyfo0xT8VGkj");
// For the endpoint, see https://api.aliyun.com/product/Ecs
config.endpoint = "ecs.cn-shanghai.aliyuncs.com";
return new Client(config);
}
}

View File

@ -0,0 +1,30 @@
package com.hyc.simulate.faulttest;
import com.hyc.config.RabbitmqConfig;
import com.hyc.domain.FaultMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
/**
*
*
* @author YouChe·He
* @ClassName: FaultTest
* @Description: Fault message test consumer
* @CreateTime: 2024/6/21 09:17
*/
@Slf4j
@Component
public class FaultTest {
@RabbitListener(queues = RabbitmqConfig.FAULT_MESSAGE_QUEUE)
public void receiveMessage(String faultMessage, Channel channel, Message message) throws Exception {
// Process the message
log.error("Message received by the fault consumer: {}",faultMessage);
}
}
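The Channel and Message parameters are only needed if the Rabbit listener container were switched to manual acknowledgement, which this commit does not configure; for reference, that variant would look roughly like this (illustrative class only):
package com.hyc.simulate.faulttest;
import com.hyc.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Illustrative variant only (requires AcknowledgeMode.MANUAL on the listener container)
@Slf4j
@Component
public class FaultTestManualAck {
    @RabbitListener(queues = RabbitmqConfig.FAULT_MESSAGE_QUEUE)
    public void receiveMessage(String faultMessage, Channel channel, Message message) throws Exception {
        log.info("Fault message: {}", faultMessage);
        // confirm the message after it has been handled
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}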

View File

@ -0,0 +1,12 @@
package com.hyc.simulate.faulttest.domain;
/**
*
*
* @author YouChe·He
* @ClassName: FaultRecord
* @Description:
* @CreateTime: 2024/6/21 11:32
*/
public class FaultRecord {
}

View File

@ -3,7 +3,7 @@ package com.hyc.simulate.redistest;
import com.alibaba.fastjson.JSONObject;
import com.hyc.domain.VehicleData;
import com.hyc.domain.constant.EventConstant;
import com.hyc.result.Result;
import com.hyc.domain.result.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
@ -28,15 +28,16 @@ public class RedisCreateKey {
@RequestMapping("/createRealTimeDataKey")
public Result createRealTimeDataKey(String vin) {
redisTemplate.opsForValue().set(vin + EventConstant.REAL_TIME_DATA,"",3, TimeUnit.MINUTES);
redisTemplate.opsForList().rightPush(vin + EventConstant.REAL_TIME_DATA,"");
redisTemplate.expire(vin+EventConstant.REAL_TIME_DATA, 180, TimeUnit.SECONDS); // set the key's expiration to 180 seconds
return Result.success("","Created successfully");
}
@RequestMapping("/getDataByRedis")
public Result<VehicleData> getDataByRedis(String vin){
String vehicleJson = redisTemplate.opsForValue().get(vin + EventConstant.REAL_TIME_DATA);
String vehicleJson = redisTemplate.opsForList().index(vin+EventConstant.REAL_TIME_DATA, -1);
redisTemplate.expire(vin+EventConstant.REAL_TIME_DATA, 180, TimeUnit.SECONDS); // set the key's expiration to 180 seconds
VehicleData vehicleData = JSONObject.parseObject(vehicleJson, VehicleData.class);
redisTemplate.opsForValue().set(vin+EventConstant.REAL_TIME_DATA,vehicleJson,3,TimeUnit.MINUTES);
return Result.success(vehicleData);
}
}

View File

@ -22,9 +22,9 @@ spring:
matching-strategy: ant_path_matcher
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://47.103.75.98:3306/netcar?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false
url: jdbc:mysql://47.121.124.220:3306/netcar?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false
username: root
password: hyc@123
password: hyc2002
druid:
# Additional connection pool settings below, applied to all the data sources above
# Initial size, min, max
@ -52,81 +52,64 @@ spring:
application:
name: shop-server
redis:
database: 0
host: 47.103.75.98
port: 6379
password:
jedis:
pool:
#Maximum number of connections
max-active: 15
#Maximum blocking wait time (a negative value means no limit)
max-wait: -1
#Maximum idle connections
max-idle: 15
#Minimum idle connections
min-idle: 0
#Connection timeout
timeout: 10000
kafka:
producer:
# Kafka servers
bootstrap-servers: 121.43.127.44:9092
# Enable transactions; messages must be sent inside a transactional method, otherwise an error is raised
transaction-id-prefix: kafkaTx-
# Number of retries after an error; must be greater than 0 when transactions are enabled.
retries: 3
# acks=0: the producer does not wait for any response from the server before considering the write successful.
# acks=1: the producer receives a success response as soon as the cluster leader has received the message.
# acks=all: the producer receives a success response only after all replicating nodes have received the message.
# Must be set to all when transactions are enabled
acks: all
# When several messages are sent to the same partition the producer groups them into one batch; this sets the batch size in bytes.
batch-size: 16384
# Size of the producer's memory buffer.
batch-size: 86384
buffer-memory: 1024000
# Key serializer
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Value serializer; JSON is recommended, it can transfer entity classes without extra configuration
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
# Kafka servers
bootstrap-servers: 121.43.127.44:9092
group-id: firstGroup
# Auto-commit interval; in Spring Boot 2.x this is a Duration and must use a format such as 1S, 1M, 2H, 5D
auto-commit-interval: 2s
# What the consumer does when reading a partition with no committed offset, or when the offset is invalid:
# earliest: consume from the committed offset; if there is none, consume the partition from the beginning
# latest: consume from the committed offset; if there is none, consume only records produced after the consumer started
# none: consume from committed offsets; if any partition has no committed offset, throw an exception
auto-commit-interval: 10S
poll-interval: 10000
auto-offset-reset: latest
# Whether to auto-commit offsets; the default is true. To avoid duplicate and lost data it can be set to false with manual offset commits
enable-auto-commit: false
# Key deserializer
#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Value deserializer; JSON is recommended, it can transfer entity classes without extra configuration
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Trusted packages for the consumer's JSON deserialization; required to deserialize entity classes
properties:
spring:
json:
trusted:
packages: "*"
# Maximum number of records a single poll may return; the default is 500. If fewer new records exist it returns what it has, never more than the limit.
# That default is too large for some workloads: if the consumer cannot process 500 records within 5 minutes,
# a rebalance is triggered and the batch is handed to another consumer, which also cannot finish, so the batch is never completed.
# Estimate the worst-case processing time per record up front and override max.poll.records accordingly.
# Note: this only matters when BatchListener is enabled; without batch listening the rebalance problem does not occur.
max-poll-records: 3
max-poll-records: 300
fetch-min-size: 1024
fetch-max-wait: 1000
properties:
# Maximum interval between two polls; the default is 5 minutes. Exceeding it triggers a rebalance
max:
poll:
interval:
ms: 600000
# How long the broker waits without a consumer heartbeat before triggering a rebalance; the default is 10s
session:
timeout:
ms: 10000
listener:
# Number of threads running in the listener container, usually machines * partitions
concurrency: 4
# Auto-commit is disabled, so manual message acknowledgement is required
concurrency: 1
ack-mode: manual_immediate
# By default an error is raised when a listened topic does not exist; set to false to ignore it
missing-topics-fatal: false
# Maximum interval between two polls; the default is 5 minutes. Exceeding it triggers a rebalance
poll-timeout: 600000
type: batch
# mybatis
mybatis:

View File

@ -22,6 +22,16 @@
timeToIdleSeconds="0"
diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LRU"/>
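<!-- faultMessage cache added in this commit: entries expire 5 seconds after creation (timeToLiveSeconds), so a fault that stops being refreshed drops out of the cache quickly -->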
<cache
name="faultMessage"
eternal="false"
maxElementsInMemory="10000"
overflowToDisk="false"
diskPersistent="false"
timeToLiveSeconds="5"
timeToIdleSeconds="0"
diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LRU"/>
<!-- Path used when storing to disk-->
<diskStore path="D:\workspace\parse-message\src\main\resources\log\ehcache" />