diff --git a/pom.xml b/pom.xml index 5f280e8..377bd69 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,14 @@ 2.6.13 + + + org.springframework.boot + spring-boot-starter-data-jpa + - + + net.sf.ehcache ehcache @@ -34,6 +40,11 @@ 0.14.0-preview1 + + org.springframework.session + spring-session-data-redis + + org.springframework.boot spring-boot-starter-data-redis @@ -133,12 +144,7 @@ 8.0.11 runtime - - - - - - + org.springframework.kafka diff --git a/src/main/java/com/hyc/config/RabbitmqConfig.java b/src/main/java/com/hyc/config/RabbitmqConfig.java index f6a1f5f..64d6bca 100644 --- a/src/main/java/com/hyc/config/RabbitmqConfig.java +++ b/src/main/java/com/hyc/config/RabbitmqConfig.java @@ -15,12 +15,22 @@ import org.springframework.context.annotation.Configuration; */ @Configuration public class RabbitmqConfig { + public static final String FAULT_EXCHANGE = "fault_exchange"; + public static final String FAULT_MESSAGE_QUEUE = "fault_message_queue"; + + public static final String FAULT_MESSAGE_ROUTINGKEY = "fault_message_router"; public static final String INIT_CONNECT = "init_connect"; public static final String DISCONNECT_CONNECT = "disconnect_connect"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; + public static final String CREATE_MQTT_CLIENT = "create_mqtt_client"; + + @Bean(CREATE_MQTT_CLIENT) + public Queue CREATE_MQTT_CLIENT(){ + return new Queue(CREATE_MQTT_CLIENT); + } @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ @@ -28,12 +38,12 @@ public class RabbitmqConfig { return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } - //声明QUEUE_INFORM_EMAIL队列 + //声明INIT_CONNECT队列 @Bean(INIT_CONNECT) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(INIT_CONNECT); } - //声明QUEUE_INFORM_SMS队列 + //声明DISCONNECT_CONNECT队列 @Bean(DISCONNECT_CONNECT) public Queue QUEUE_INFORM_SMS(){ return new Queue(DISCONNECT_CONNECT); @@ -51,4 +61,21 @@ public class RabbitmqConfig { @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } + + @Bean(FAULT_EXCHANGE) + public Exchange FAULT_EXCHANGE(){ + //durable(true) 持久化,mq重启之后交换机还在 + return ExchangeBuilder.topicExchange(FAULT_EXCHANGE).durable(true).build(); + } + + @Bean(FAULT_MESSAGE_QUEUE) + public Queue FAULT_MESSAGE_QUEUE(){ + return new Queue(FAULT_MESSAGE_QUEUE); + } + + @Bean + public Binding FAULT_MESSAGE_ROUTINGKEY(@Qualifier(FAULT_MESSAGE_QUEUE) Queue queue, + @Qualifier(FAULT_EXCHANGE) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(FAULT_MESSAGE_ROUTINGKEY).noargs(); + } } diff --git a/src/main/java/com/hyc/runner/SyncCacheRunner.java b/src/main/java/com/hyc/config/SyncCacheRunner.java similarity index 92% rename from src/main/java/com/hyc/runner/SyncCacheRunner.java rename to src/main/java/com/hyc/config/SyncCacheRunner.java index ea79a03..9f42ee3 100644 --- a/src/main/java/com/hyc/runner/SyncCacheRunner.java +++ b/src/main/java/com/hyc/config/SyncCacheRunner.java @@ -1,10 +1,8 @@ -package com.hyc.runner; +package com.hyc.config; import com.alibaba.fastjson.JSONObject; import com.hyc.domain.CacheCarEvent; -import com.hyc.domain.SummarizeResp; import com.hyc.kafka.demo.service.CarEventService; -import com.hyc.service.SummarizeService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -16,7 +14,7 @@ import org.springframework.stereotype.Component; import java.util.List; /** - * 同步缓存Runner + * 项目启动同步本地缓存 * * @author YouChe·He * @ClassName: SyncCacheRunner diff --git a/src/main/java/com/hyc/config/TransactionConfig.java b/src/main/java/com/hyc/config/TransactionConfig.java new file mode 100644 index 0000000..84a2299 --- /dev/null +++ b/src/main/java/com/hyc/config/TransactionConfig.java @@ -0,0 +1,33 @@ +package com.hyc.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.persistence.EntityManagerFactory; + + +/** + * 手动事务配置 + * + * @author YouChe·He + * @ClassName: TransactionConfig + * @Description: 手动事务配置 + * @CreateTime: 2024/6/18 16:26 + */ +@Configuration +@EnableTransactionManagement +public class TransactionConfig { + @Bean + public PlatformTransactionManager transactionManager(EntityManagerFactory emf) { + return new JpaTransactionManager(emf); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + return new TransactionTemplate(transactionManager); + } +} diff --git a/src/main/java/com/hyc/controller/SummarizeController.java b/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java similarity index 72% rename from src/main/java/com/hyc/controller/SummarizeController.java rename to src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java index f5768f3..018ecea 100644 --- a/src/main/java/com/hyc/controller/SummarizeController.java +++ b/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java @@ -1,72 +1,62 @@ -package com.hyc.controller; +package com.hyc.consumer; +import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.hyc.config.RabbitmqConfig; import com.hyc.domain.ConnectionParameter; -import com.hyc.domain.SummarizeResp; -import com.hyc.kafka.demo.config.KafkaSendResultHandler; -import com.hyc.result.Result; -import com.hyc.service.SummarizeService; import com.hyc.util.ConversionUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.web.bind.annotation.*; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.UUID; /** - * 创建Mqtt客户端 + * 创建MQTT客户端消费者 * * @author YouChe·He - * @ClassName: CreateMqttClient - * @Description: 创建Mqtt客户端 - * @CreateTime: 2024/5/31 10:25 + * @ClassName: CreateMqttClientConsumer + * @Description: 创建MQTT客户端 + * @CreateTime: 2024/6/18 15:45 */ +@Component @Slf4j -@RestController -@RequestMapping("/create") -public class SummarizeController { +public class CreateMqttClientConsumer { @Autowired - private SummarizeService summarizeService; + private TransactionTemplate transactionTemplate; + @Autowired private KafkaTemplate kafkaTemplate; - private final TransactionTemplate transactionTemplate; - public SummarizeController(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler, TransactionTemplate transactionTemplate) { - this.kafkaTemplate = kafkaTemplate; - this.transactionTemplate = transactionTemplate; - this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); - } - @PostMapping("/createMqttClient") - public String createMqttClient(@RequestBody List connectionParameterList){ + @RabbitListener(queues = RabbitmqConfig.CREATE_MQTT_CLIENT) + public void createMqttClient(String connectionParameterJson){ + log.info("消息是:{}",connectionParameterJson); + String s = connectionParameterJson.replaceAll("\\\\", ""); + String substring = s.substring(1, s.length() - 1); + log.error("切割之后:{}",substring); + ConnectionParameter connectionParameter = JSONObject.parseObject(substring, ConnectionParameter.class); - log.error("进来了!!!"); - for (ConnectionParameter connectionParameter : connectionParameterList) { - boolean oneMqttClient = createOneMqttClient(connectionParameter); - if (oneMqttClient==true){ - log.warn("broker为:{}的mqtt客户端连接成功",connectionParameter.getBroker()); - } + boolean oneMqttClient = createOneMqttClient(connectionParameter); + if (oneMqttClient==true){ + log.warn("broker为:{}的mqtt客户端连接成功",connectionParameter.getBroker()); } - - return "成功!"; - } - - @GetMapping("/getAllLoadInfo") - public Result getAllLoadInfo(){ - return summarizeService.getAllLoadInfo(); } - @Transactional + public boolean createOneMqttClient(ConnectionParameter connectionParameter){ int qos = 0; @@ -96,9 +86,10 @@ public class SummarizeController { @Override public void messageArrived(String topic, MqttMessage message) { - System.out.println("topic: " + topic); - System.out.println("Qos: " + message.getQos()); - System.out.println("message content: " + new String(message.getPayload())); + log.info("topic: {}" , topic); +// System.out.println("topic: " + topic); +// System.out.println("Qos: " + message.getQos()); +// System.out.println("message content: " + new String(message.getPayload())); String resultString = ConversionUtil.hexStringToString(new String(message.getPayload())); log.warn("解析后的字符串是:{}",resultString); log.warn("长度为:{}",resultString.length()); @@ -121,11 +112,13 @@ public class SummarizeController { "positionStatus","easStatus","ptcStatus","epsStatus","absStatus","mcuStatus", "heatingStatus","batteryStatus","batteryInsulationStatus","dcdcStatus","chgStatus"}; LinkedHashMap linkedHashMap = new LinkedHashMap<>(); - HashMap stringStringHashMap = new HashMap<>(); + + String vin = "vin"; for (int i = 0; i < 47; i++) { String substring = realString.substring(count, count + intArr[i]); if (strArr[i]=="vin" || strArr[i] == "gear"){ + vin = substring; linkedHashMap.put(strArr[i],substring); }else if (strArr[i] == "startTime") { linkedHashMap.put(strArr[i],Long.valueOf(substring)); @@ -140,9 +133,10 @@ public class SummarizeController { try { String jsonData = objectMapper.writeValueAsString(linkedHashMap); log.error("json格式:{}",jsonData); + String finalVin = vin; transactionTemplate.execute(status -> { try { - kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), jsonData); + kafkaTemplate.send("topichyc", finalVin, jsonData); status.flush(); }catch (Exception e){ log.error("kafka生产异常: {}", e.getMessage()); @@ -172,5 +166,4 @@ public class SummarizeController { return true; } - } diff --git a/src/main/java/com/hyc/consumer/EventUpdateConsumer.java b/src/main/java/com/hyc/consumer/EventUpdateConsumer.java deleted file mode 100644 index c65871e..0000000 --- a/src/main/java/com/hyc/consumer/EventUpdateConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.hyc.consumer; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 事件更新消费者 - * - * @author YouChe·He - * @ClassName: EventUpdateConsumer - * @Description: 事件更新消费者 - * @CreateTime: 2024/6/16 10:28 - */ -@Component -@Slf4j -public class EventUpdateConsumer { - - -} diff --git a/src/main/java/com/hyc/consumer/MQTTConsumer.java b/src/main/java/com/hyc/consumer/MQTTConsumer.java deleted file mode 100644 index 163859c..0000000 --- a/src/main/java/com/hyc/consumer/MQTTConsumer.java +++ /dev/null @@ -1,68 +0,0 @@ -//package com.hyc.consumer; -// -//import com.hyc.domain.ConnectionParameter; -//import lombok.extern.slf4j.Slf4j; -//import org.eclipse.paho.client.mqttv3.*; -//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -//import org.springframework.stereotype.Component; -// -///** -// * MQTT消费者 -// * -// * @author YouChe·He -// * @ClassName: MQTTConsumer -// * @Description: MQTT消费者 -// * @CreateTime: 2024/5/30 19:12 -// */ -//@Component -//@Slf4j -//public class MQTTConsumer { -// -// public void parseMessage(ConnectionParameter connectionParameter){ -// int qos = 0; -// -// try { -// MqttClient client = null; -// -// client = new MqttClient(connectionParameter.getBroker(), connectionParameter.getClientid(), new MemoryPersistence()); -// -// -// // 连接参数 -// MqttConnectOptions options = new MqttConnectOptions(); -// if (connectionParameter.getUsername() !=null && !"".equals(connectionParameter.getUsername())){ -// options.setUserName(connectionParameter.getUsername()); -// options.setPassword(connectionParameter.getPassword().toCharArray()); -// } -// -// options.setConnectionTimeout(60); -// options.setKeepAliveInterval(60); -// // 设置回调 -// client.setCallback(new MqttCallback() { -// -// @Override -// public void connectionLost(Throwable cause) { -// System.out.println("connectionLost: " + cause.getMessage()); -// } -// -// @Override -// public void messageArrived(String topic, MqttMessage message) { -// System.out.println("topic: " + topic); -// System.out.println("Qos: " + message.getQos()); -// System.out.println("message content: " + new String(message.getPayload())); -// -// } -// -// @Override -// public void deliveryComplete(IMqttDeliveryToken token) { -// System.out.println("deliveryComplete---------" + token.isComplete()); -// } -// -// }); -// client.connect(options); -// client.subscribe(connectionParameter.getTopic(), qos); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -//} diff --git a/src/main/java/com/hyc/consumer/ReceiveHandler.java b/src/main/java/com/hyc/consumer/ReceiveHandler.java deleted file mode 100644 index 9ffa28a..0000000 --- a/src/main/java/com/hyc/consumer/ReceiveHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -//package com.hyc.consumer; -// -//import com.alibaba.fastjson.JSONObject; -//import com.hyc.config.RabbitmqConfig; -//import com.hyc.domain.ConnectionParameter; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.amqp.core.Message; -//import org.springframework.amqp.rabbit.annotation.RabbitListener; -//import org.springframework.beans.factory.annotation.Autowired; -// -//import org.springframework.stereotype.Component; -// -///** -// * 断开连接消费者 -// * -// * @author YouChe·He -// * @ClassName: ReceiveHandler -// * @Description: 断开连接消费者 -// * @CreateTime: 2024/5/27 17:00 -// */ -//@Component -//@Slf4j -//public class ReceiveHandler { -// -// @Autowired -// private MQTTConsumer mqttConsumer; -// -// @RabbitListener(queues = {RabbitmqConfig.INIT_CONNECT}) -// public void receive_email(Message msg){ -// String messageBody = new String(msg.getBody()); -// ConnectionParameter connectionParameter = getConnectionParameter(messageBody); -// mqttConsumer.parseMessage(connectionParameter); -// } -// -// -// public ConnectionParameter getConnectionParameter(String messageBody){ -// -// -// log.error("链接事件得到的消息:{}",messageBody); -// JSONObject jsonObject = JSONObject.parseObject(messageBody); -// String carVin = jsonObject.getString("clientId"); -// -// String[] split = carVin.split("-"); -// String vin = split[0]; -// String broker = split[1]; -// String topic = split[2]; -// log.warn("ConnectionParameter对象的属性 broker:{},vin:{},topic:{}",broker,vin,topic); -// return new ConnectionParameter(broker,null,null,vin,topic); -// } -// -//} diff --git a/src/main/java/com/hyc/domain/VehicleData.java b/src/main/java/com/hyc/domain/VehicleData.java index 5f9e48c..4681f96 100644 --- a/src/main/java/com/hyc/domain/VehicleData.java +++ b/src/main/java/com/hyc/domain/VehicleData.java @@ -48,21 +48,21 @@ public class VehicleData { private Double singleBatteryMaxTemperature; private Double singleBatteryMinTemperature; private Double availableBatteryCapacity; - private Double vehicleStatus; - private Double chargingStatus; - private Double operatingStatus; - private Double socStatus; - private Double chargingEnergyStorageStatus; - private Double driveMotorStatus; - private Double positionStatus; - private Double easStatus; - private Double ptcStatus; - private Double epsStatus; - private Double absStatus; - private Double mcuStatus; - private Double heatingStatus; - private Double batteryStatus; - private Double batteryInsulationStatus; - private Double dcdcStatus; - private Double chgStatus; + private Integer vehicleStatus; + private Integer chargingStatus; + private Integer operatingStatus; + private Integer socStatus; + private Integer chargingEnergyStorageStatus; + private Integer driveMotorStatus; + private Integer positionStatus; + private Integer easStatus; + private Integer ptcStatus; + private Integer epsStatus; + private Integer absStatus; + private Integer mcuStatus; + private Integer heatingStatus; + private Integer batteryStatus; + private Integer batteryInsulationStatus; + private Integer dcdcStatus; + private Integer chgStatus; } diff --git a/src/main/java/com/hyc/result/HttpStatus.java b/src/main/java/com/hyc/domain/result/HttpStatus.java similarity index 98% rename from src/main/java/com/hyc/result/HttpStatus.java rename to src/main/java/com/hyc/domain/result/HttpStatus.java index 7f32c7b..fa4e492 100644 --- a/src/main/java/com/hyc/result/HttpStatus.java +++ b/src/main/java/com/hyc/domain/result/HttpStatus.java @@ -1,4 +1,4 @@ -package com.hyc.result; +package com.hyc.domain.result; /** * 返回状态码 diff --git a/src/main/java/com/hyc/result/Result.java b/src/main/java/com/hyc/domain/result/Result.java similarity index 98% rename from src/main/java/com/hyc/result/Result.java rename to src/main/java/com/hyc/domain/result/Result.java index f858c2d..486c31d 100644 --- a/src/main/java/com/hyc/result/Result.java +++ b/src/main/java/com/hyc/domain/result/Result.java @@ -1,4 +1,4 @@ -package com.hyc.result; +package com.hyc.domain.result; import lombok.Data; diff --git a/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java b/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java index 49cd689..a914d72 100644 --- a/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java +++ b/src/main/java/com/hyc/iotdbdemo/server/impl/IotDbServerImpl.java @@ -126,23 +126,23 @@ public class IotDbServerImpl implements IotDbServer { iotDbResult.setSingleBatteryMaxTemperature(Double.valueOf(map.get("singleBatteryMaxTemperature"))); iotDbResult.setSingleBatteryMinTemperature(Double.valueOf(map.get("singleBatteryMinTemperature"))); iotDbResult.setAvailableBatteryCapacity(Double.valueOf(map.get("availableBatteryCapacity"))); - iotDbResult.setVehicleStatus(Double.valueOf(map.get("vehicleStatus"))); - iotDbResult.setChargingStatus(Double.valueOf(map.get("chargingStatus"))); - iotDbResult.setOperatingStatus(Double.valueOf(map.get("operatingStatus"))); - iotDbResult.setSocStatus(Double.valueOf(map.get("socStatus"))); - iotDbResult.setChargingEnergyStorageStatus(Double.valueOf(map.get("chargingEnergyStorageStatus"))); - iotDbResult.setDriveMotorStatus(Double.valueOf(map.get("driveMotorStatus"))); - iotDbResult.setPositionStatus(Double.valueOf(map.get("positionStatus"))); - iotDbResult.setEasStatus(Double.valueOf(map.get("easStatus"))); - iotDbResult.setPtcStatus(Double.valueOf(map.get("ptcStatus"))); - iotDbResult.setEpsStatus(Double.valueOf(map.get("epsStatus"))); - iotDbResult.setAbsStatus(Double.valueOf(map.get("absStatus"))); - iotDbResult.setMcuStatus(Double.valueOf(map.get("mcuStatus"))); - iotDbResult.setHeatingStatus(Double.valueOf(map.get("heatingStatus"))); - iotDbResult.setBatteryStatus(Double.valueOf(map.get("batteryStatus"))); - iotDbResult.setBatteryInsulationStatus(Double.valueOf(map.get("batteryInsulationStatus"))); - iotDbResult.setDcdcStatus(Double.valueOf(map.get("dcdcStatus"))); - iotDbResult.setChgStatus(Double.valueOf(map.get("chgStatus"))); + iotDbResult.setVehicleStatus(Integer.valueOf(map.get("vehicleStatus"))); + iotDbResult.setChargingStatus(Integer.valueOf(map.get("chargingStatus"))); + iotDbResult.setOperatingStatus(Integer.valueOf(map.get("operatingStatus"))); + iotDbResult.setSocStatus(Integer.valueOf(map.get("socStatus"))); + iotDbResult.setChargingEnergyStorageStatus(Integer.valueOf(map.get("chargingEnergyStorageStatus"))); + iotDbResult.setDriveMotorStatus(Integer.valueOf(map.get("driveMotorStatus"))); + iotDbResult.setPositionStatus(Integer.valueOf(map.get("positionStatus"))); + iotDbResult.setEasStatus(Integer.valueOf(map.get("easStatus"))); + iotDbResult.setPtcStatus(Integer.valueOf(map.get("ptcStatus"))); + iotDbResult.setEpsStatus(Integer.valueOf(map.get("epsStatus"))); + iotDbResult.setAbsStatus(Integer.valueOf(map.get("absStatus"))); + iotDbResult.setMcuStatus(Integer.valueOf(map.get("mcuStatus"))); + iotDbResult.setHeatingStatus(Integer.valueOf(map.get("heatingStatus"))); + iotDbResult.setBatteryStatus(Integer.valueOf(map.get("batteryStatus"))); + iotDbResult.setBatteryInsulationStatus(Integer.valueOf(map.get("batteryInsulationStatus"))); + iotDbResult.setDcdcStatus(Integer.valueOf(map.get("dcdcStatus"))); + iotDbResult.setChgStatus(Integer.valueOf(map.get("chgStatus"))); iotDbResultList.add(iotDbResult); } } diff --git a/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java b/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java index b39ee0d..1faaeca 100644 --- a/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java +++ b/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java @@ -47,31 +47,16 @@ public class KafkaConsumerConfig { @Bean public Map consumerConfigs() { - Map propsMap = new HashMap<>(16); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); - //自动提交的时间间隔,自动提交开启时生效 - propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); - //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: - //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 - //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) - //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); - //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); - //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 - //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, - //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, - //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 - //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 - //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); - //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); - //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return propsMap; @@ -79,33 +64,22 @@ public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { - // 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 try (JsonDeserializer deserializer = new JsonDeserializer<>()) { deserializer.trustedPackages("*"); return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); } } - /** - * KafkaListenerContainerFactory是Spring Kafka提供的用于创建KafkaListenerContainer的工厂类。 - * KafkaListenerContainer是一个用于消费Kafka消息的容器,它封装了Kafka的消费者API,提供了更加方便的使用方式。 - * KafkaListenerContainerFactory可以配置KafkaListenerContainer的一些属性,如消费者的个数、批量消费的大小、消费者的超时时间等。 - * 在Spring Kafka中,可以通过配置KafkaListenerContainerFactory来创建KafkaListenerContainer,从而实现对Kafka消息的消费 - * @return - */ + @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); - //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 factory.setConcurrency(concurrency); - // 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 factory.setMissingTopicsFatal(missingTopicsFatal); - // 自动提交关闭,需要设置手动消息确认 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setPollTimeout(pollTimeout); - // 设置为批量监听,需要用List接收 - // factory.setBatchListener(true); + factory.setBatchListener(true); return factory; } diff --git a/src/main/java/com/hyc/kafka/demo/config/KafkaSendResultHandler.java b/src/main/java/com/hyc/kafka/demo/config/KafkaSendResultHandler.java index a7f7af7..1f133e6 100644 --- a/src/main/java/com/hyc/kafka/demo/config/KafkaSendResultHandler.java +++ b/src/main/java/com/hyc/kafka/demo/config/KafkaSendResultHandler.java @@ -1,5 +1,6 @@ package com.hyc.kafka.demo.config; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.support.ProducerListener; @@ -12,14 +13,15 @@ import org.springframework.stereotype.Component; * kafka消息发送回调处理 */ @Component +@Slf4j public class KafkaSendResultHandler implements ProducerListener { @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { - System.out.println("消息发送成功:" + producerRecord.toString()); + log.info("消息发送成功:{}" , producerRecord.toString()); } @Override public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { - System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage()); + log.error("消息发送失败:{}{}",producerRecord.toString() ,exception.getMessage()); } } diff --git a/src/main/java/com/hyc/kafka/demo/config/RedisConfiguration.java b/src/main/java/com/hyc/kafka/demo/config/RedisConfiguration.java new file mode 100644 index 0000000..3381679 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/config/RedisConfiguration.java @@ -0,0 +1,37 @@ +package com.hyc.kafka.demo.config; + +import com.hyc.kafka.demo.listener.KeyExpiredListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +/** + * redis配置类 + * + * @author YouChe·He + * @ClassName: RedisConfiguration + * @Description: redis配置类 + * @CreateTime: 2024/6/20 21:57 + */ +@Configuration +public class RedisConfiguration { + @Autowired + private RedisConnectionFactory redisConnectionFactory; + @Autowired + private RabbitTemplate rabbitTemplate; + + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer() { + RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); + redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); + return redisMessageListenerContainer; + } + + @Bean + public KeyExpiredListener keyExpiredListener() { + return new KeyExpiredListener(redisMessageListenerContainer(),rabbitTemplate); + } +} diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java index 24a384b..05ede93 100644 --- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java @@ -6,12 +6,13 @@ import com.hyc.domain.VehicleData; import com.hyc.domain.constant.EventConstant; import com.hyc.iotdbdemo.config.IotDBSessionConfig; import com.hyc.iotdbdemo.server.IotDbServer; -import com.hyc.kafka.demo.strategy.Strategy; import com.hyc.kafka.demo.service.CarEventService; +import com.hyc.kafka.demo.strategy.EventStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -21,6 +22,7 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.rmi.ServerException; import java.util.ArrayList; @@ -52,45 +54,47 @@ public class KafkaConsumer { private IotDbServer iotDbServer; @Resource private IotDBSessionConfig iotDBSessionConfig; + + @Autowired + private RabbitTemplate rabbitTemplate; + LinkedHashMap eventStrategyLinkedHashMap = new LinkedHashMap<>(); + + @PostConstruct + public void initStrategyMap(){ + eventStrategyLinkedHashMap.put("ELECTRONIC_FENCE",EventStrategy.ELECTRONIC_FENCE); + eventStrategyLinkedHashMap.put("FAULT_ALARM",EventStrategy.FAULT_ALARM); + eventStrategyLinkedHashMap.put("INDEX_WARNING",EventStrategy.INDEX_WARNING); + eventStrategyLinkedHashMap.put("REAL_TIME_DATA",EventStrategy.REAL_TIME_DATA); + } @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", errorHandler = "myKafkaListenerErrorHandler") - public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { + public void consume(List> consumerRecords, Acknowledgment acknowledgment) { try { //策略map集合 // LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); // stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); // stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); - //解析得到VIN - String value = (String) consumerRecord.value(); + log.info("拉取到的记录数是:{}",consumerRecords.size()); + for (ConsumerRecord consumerRecord : consumerRecords) { + String value = (String) consumerRecord.value(); - VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); - log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); - - iotDbServer.insertData(vehicleData); - //得到小车绑定信息 - String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); - //解析绑定的信息 - String[] eventArr = carEventByVin.split(","); - for (String eventNumber : eventArr) { - //电子围栏事件 - if (eventNumber.equals("1")){ - //从缓存中查询车辆绑定的围栏组,与车辆信息进行比较 - } - //实时数据事件 - if (eventNumber.equals("2")) { - if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){ - //有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中 - String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA; - String carData = JSONObject.toJSONString(vehicleData); - this.updateValueWithoutChangingTTL(key, value); - } - } - //故障预警事件 - if (eventNumber.equals("3")) { + VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); + log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); + iotDbServer.insertData(vehicleData); + //得到小车绑定信息 + String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); + //解析绑定的信息 + String[] eventArr = carEventByVin.split(","); + for (String eventString : eventArr) { + //处理事件 + eventStrategyLinkedHashMap.get(eventString).exe(cacheManager,redisTemplate,vehicleData,rabbitTemplate); } } + //解析得到VIN + + //解析得到的VIN // String dataMessage = value.toString(); @@ -162,7 +166,7 @@ public class KafkaConsumer { Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS); // 更新键的值 - redisTemplate.opsForValue().set(key, newValue); + redisTemplate.opsForList().rightPush( key,newValue); // 重新设置过期时间 if (currentTTL != null && currentTTL > 0) { diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java index 603eb95..2bb389d 100644 --- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java @@ -7,10 +7,12 @@ import com.hyc.domain.constant.EventConstant; import com.hyc.iotdbdemo.config.IotDBSessionConfig; import com.hyc.iotdbdemo.server.IotDbServer; import com.hyc.kafka.demo.service.CarEventService; +import com.hyc.kafka.demo.strategy.EventStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -19,9 +21,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.rmi.ServerException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -43,51 +47,44 @@ public class KafkaConsumer1 { @Autowired private CarEventService carEventService; + @Autowired + private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; @Resource private IotDbServer iotDbServer; - @Resource - private IotDBSessionConfig iotDBSessionConfig; + + LinkedHashMap eventStrategyLinkedHashMap = new LinkedHashMap<>(); + + @PostConstruct + public void initStrategyMap(){ + eventStrategyLinkedHashMap.put("ELECTRONIC_FENCE",EventStrategy.ELECTRONIC_FENCE); + eventStrategyLinkedHashMap.put("FAULT_ALARM",EventStrategy.FAULT_ALARM); + eventStrategyLinkedHashMap.put("INDEX_WARNING",EventStrategy.INDEX_WARNING); + eventStrategyLinkedHashMap.put("REAL_TIME_DATA",EventStrategy.REAL_TIME_DATA); + } @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", errorHandler = "myKafkaListenerErrorHandler") - public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { + public void consume(List> consumerRecords, Acknowledgment acknowledgment) { try { - //策略map集合 -// LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); -// stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); -// stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); - - //解析得到VIN - String value = (String) consumerRecord.value(); - - VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); - log.error("消费者1得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); - - iotDbServer.insertData(vehicleData); - //得到小车绑定信息 - String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); - //解析绑定的信息 - String[] eventArr = carEventByVin.split(","); - for (String eventNumber : eventArr) { - //电子围栏事件 - if (eventNumber.equals("1")){ - //从缓存中查询车辆绑定的围栏组,与车辆信息进行比较 - } - //实时数据事件 - if (eventNumber.equals("2")) { - if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){ - //有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中 - String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA; - String carData = JSONObject.toJSONString(vehicleData); - this.updateValueWithoutChangingTTL(key, value); - } - } - //故障预警事件 - if (eventNumber.equals("3")) { + log.info("拉取到的记录数是:{}",consumerRecords.size()); + for (ConsumerRecord consumerRecord : consumerRecords) { + String value = (String) consumerRecord.value(); + VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); + log.error("消费者1得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); + iotDbServer.insertData(vehicleData); + //得到小车绑定信息 + String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); + //解析绑定的信息 + String[] eventArr = carEventByVin.split(","); + for (String eventString : eventArr) { + //处理事件 + eventStrategyLinkedHashMap.get(eventString).exe(cacheManager,redisTemplate,vehicleData,rabbitTemplate); } } + //解析得到VIN + // String dataMessage = value.toString(); @@ -159,7 +156,7 @@ public class KafkaConsumer1 { Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS); // 更新键的值 - redisTemplate.opsForValue().set(key, newValue); + redisTemplate.opsForList().rightPush( key,newValue); // 重新设置过期时间 if (currentTTL != null && currentTTL > 0) { diff --git a/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java b/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java deleted file mode 100644 index 9b1e669..0000000 --- a/src/main/java/com/hyc/kafka/demo/controller/KafkaController.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.hyc.kafka.demo.controller; - -import com.hyc.kafka.demo.config.KafkaSendResultHandler; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; - -import java.util.UUID; - -/** - * 测试kafka控制层 - * - * @author YouChe·He - * @ClassName: KafkaController - * @Description: 测试kafka控制层 - * @CreateTime: 2024/6/6 14:42 - */ -@RestController -public class KafkaController { - - - private KafkaTemplate kafkaTemplate; - - public KafkaController(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) { - this.kafkaTemplate = kafkaTemplate; - this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); - } - - @GetMapping("/send") - @Transactional - public void sendMessage(String message) { - System.out.println("呼呼呼"); - kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), message); - } - -} - - diff --git a/src/main/java/com/hyc/kafka/demo/event/EventPosting.java b/src/main/java/com/hyc/kafka/demo/event/EventPosting.java deleted file mode 100644 index 3328dff..0000000 --- a/src/main/java/com/hyc/kafka/demo/event/EventPosting.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.hyc.kafka.demo.event; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; - -/** - * 事件配置中心 - * - * @author YouChe·He - * @ClassName: EventPosting - * @Description: 事件配置中心 - * @CreateTime: 2024/6/11 08:44 - */ -@Component -public class EventPosting { - @Autowired - private RedisTemplate redisTemplate; - - -} diff --git a/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java b/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java new file mode 100644 index 0000000..b2d3e80 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java @@ -0,0 +1,165 @@ +package com.hyc.kafka.demo.strategy; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.VehicleData; +import com.hyc.domain.constant.EventConstant; +import com.hyc.util.EventHandleUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; + +import java.util.concurrent.TimeUnit; + +/** + * 事件策略枚举类 + * + * @author YouChe·He + * @ClassName: StrategyA + * @Description: 策略枚举A + * @CreateTime: 2024/6/4 14:13 + */ +@Slf4j +public enum EventStrategy { + + + /** + * 实时数据 + */ + REAL_TIME_DATA { + @Override + public void exe(CacheManager cacheManager,RedisTemplate redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) { + log.info("执行实时数据逻辑"); + if (redisTemplate.hasKey(vehicleData.getVin() + EventConstant.REAL_TIME_DATA)) { + //有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中 + String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA; + // 获取当前的过期时间 + Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS); + // 更新键的值 + redisTemplate.opsForList().rightPush(key, JSONObject.toJSONString(vehicleData)); + // 重新设置过期时间 + if (currentTTL != null && currentTTL > 0) { + redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS); + } + + } + } + }, + /** + * 故障报警 + */ + FAULT_ALARM { + @Override + public void exe(CacheManager cacheManager,RedisTemplate redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) { + System.out.println("执行故障报警逻辑"); + + if (vehicleData.getVehicleStatus() == 0) { + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ001", rabbitTemplate); + } + + if (vehicleData.getChargingStatus() == 0) { + // 处理 chargingStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ002", rabbitTemplate); + } + + if (vehicleData.getOperatingStatus() == 0) { + // 处理 operatingStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ003", rabbitTemplate); + } + + if (vehicleData.getSocStatus() == 0) { + // 处理 socStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ004", rabbitTemplate); + } + + if (vehicleData.getChargingEnergyStorageStatus() == 0) { + // 处理 chargingEnergyStorageStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ005", rabbitTemplate); + } + + if (vehicleData.getDriveMotorStatus() == 0) { + // 处理 driveMotorStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ006", rabbitTemplate); + } + + if (vehicleData.getPositionStatus() == 0) { + // 处理 positionStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ007", rabbitTemplate); + } + + if (vehicleData.getEasStatus() == 0) { + // 处理 easStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ008", rabbitTemplate); + } + + if (vehicleData.getPtcStatus() == 0) { + // 处理 ptcStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ009", rabbitTemplate); + } + + if (vehicleData.getEpsStatus() == 0) { + // 处理 epsStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ010", rabbitTemplate); + } + + if (vehicleData.getAbsStatus() == 0) { + // 处理 absStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ011", rabbitTemplate); + } + + if (vehicleData.getMcuStatus() == 0) { + // 处理 mcuStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ012", rabbitTemplate); + } + + if (vehicleData.getHeatingStatus() == 0) { + // 处理 heatingStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ013", rabbitTemplate); + } + + if (vehicleData.getBatteryStatus() == 0) { + // 处理 batteryStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ014", rabbitTemplate); + } + + if (vehicleData.getBatteryInsulationStatus() == 0) { + // 处理 batteryInsulationStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ015", rabbitTemplate); + } + + if (vehicleData.getDcdcStatus() == 0) { + // 处理 dcdcStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ016", rabbitTemplate); + } + + if (vehicleData.getChgStatus() == 0) { + // 处理 chgStatus 为 0 的情况 + EventHandleUtil.insertToCache(cacheManager,redisTemplate,vehicleData,"GZ017", rabbitTemplate); + } + + + } + }, + /** + * 电子围栏 + */ + ELECTRONIC_FENCE { + @Override + public void exe(CacheManager cacheManager,RedisTemplate redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) { + log.info("执行电子围栏逻辑"); + } + }, + /** + * 指标预警 + */ + INDEX_WARNING { + @Override + public void exe(CacheManager cacheManager,RedisTemplate redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate) { + log.info("执行指标预警逻辑"); + } + }; + + + public abstract void exe(CacheManager cacheManager, RedisTemplate redisTemplate, VehicleData vehicleData, RabbitTemplate rabbitTemplate); + +} diff --git a/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java b/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java deleted file mode 100644 index e06bce9..0000000 --- a/src/main/java/com/hyc/kafka/demo/strategy/Strategy.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.hyc.kafka.demo.strategy; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; - -/** - * 策略枚举类 - * - * @author YouChe·He - * @ClassName: StrategyA - * @Description: 策略枚举A - * @CreateTime: 2024/6/4 14:13 - */ -@Slf4j -public enum Strategy { - REAL_TIME_DATA{ - @Override - public void exe(RedisTemplate redisTemplate, String realData){ - - JSONObject jsonObject = JSON.parseObject(realData); - String vin = jsonObject.get("vin").toString(); - log.error("vin是:{}",vin); - if (redisTemplate.hasKey(vin)){ - redisTemplate.opsForList().rightPush(vin,realData); - } - } - }, - STORE_DATA{ - @Override - public void exe(RedisTemplate redisTemplate, String realData){ - System.out.println("执行具体策略B"); - } - }; - public abstract void exe(RedisTemplate redisTemplate, String realData); -} diff --git a/src/main/java/com/hyc/mapper/SummarizeMapper.java b/src/main/java/com/hyc/mapper/SummarizeMapper.java deleted file mode 100644 index 266147f..0000000 --- a/src/main/java/com/hyc/mapper/SummarizeMapper.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.hyc.mapper; -/** - * 持久层 - * - * @author YouChe·He - * @ClassName: SummarizeMapper - * @Description: 持久层 - * @CreateTime: 2024/6/2 16:47 - */ - -import com.hyc.domain.VinIp; -import org.apache.ibatis.annotations.Mapper; -import org.springframework.web.bind.annotation.Mapping; - -import javax.annotation.ManagedBean; -import java.util.List; - -/** - *@ClassName SummarizeMapper - *@Description 描述 - *@Author ZHIHAO.DAI - *@Date 2024/6/2 16:47 - */ -@Mapper -public interface SummarizeMapper { - - List getAllLoadInfo(); - - List getNewConnectVin(); - -} diff --git a/src/main/java/com/hyc/service/SummarizeService.java b/src/main/java/com/hyc/service/SummarizeService.java deleted file mode 100644 index b0d3dc5..0000000 --- a/src/main/java/com/hyc/service/SummarizeService.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.hyc.service; -/** - * 服务层 - * - * @author YouChe·He - * @ClassName: SummarizeService - * @Description: 服务层 - * @CreateTime: 2024/6/2 16:46 - */ - -import com.hyc.domain.SummarizeResp; -import com.hyc.result.Result; - -/** - *@ClassName SummarizeService - *@Description 描述 - *@Author ZHIHAO.DAI - *@Date 2024/6/2 16:46 - */ -public interface SummarizeService { - Result getAllLoadInfo(); -} diff --git a/src/main/java/com/hyc/service/impl/SummarizeServiceImpl.java b/src/main/java/com/hyc/service/impl/SummarizeServiceImpl.java deleted file mode 100644 index 37bed6c..0000000 --- a/src/main/java/com/hyc/service/impl/SummarizeServiceImpl.java +++ /dev/null @@ -1,191 +0,0 @@ -package com.hyc.service.impl; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.aliyun.ecs20140526.Client; -import com.aliyun.ecs20140526.models.DescribeInstancesRequest; -import com.aliyun.ecs20140526.models.DescribeInstancesResponse; -import com.aliyun.teaopenapi.models.Config; -import com.aliyun.teautil.models.RuntimeOptions; -import com.hyc.domain.GetWayServerLoad; -import com.hyc.domain.SummarizeResp; -import com.hyc.domain.TimeSortCar; -import com.hyc.domain.VinIp; -import com.hyc.mapper.SummarizeMapper; -import com.hyc.result.Result; -import com.hyc.service.SummarizeService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.*; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; - -import java.math.BigDecimal; -import java.util.*; -import java.util.stream.Collectors; - -/** - * 实现 - * - * @author YouChe·He - * @ClassName: SummarizeServiceImpl - * @Description: 实现 - * @CreateTime: 2024/6/2 16:46 - */ -@Service -@Slf4j -public class SummarizeServiceImpl implements SummarizeService { - - @Autowired - private SummarizeMapper summarizeMapper; - @Autowired - private RestTemplate restTemplate; - @Override - public Result getAllLoadInfo() { - - ArrayList getWayServerLoads = new ArrayList<>(); - - Integer carSum = 0; - - Double allLoad = 0.0; - - Integer gateWayServerNum = 0; - - Integer dataAnalyze = 22; - - - - ArrayList ipList = new ArrayList<>(); - - List connectLongVinList = summarizeMapper.getAllLoadInfo(); - - List newConnectVinList = summarizeMapper.getNewConnectVin(); - - - try { - Client client = this.createClient(); - //填写地区参数和实例规格参数 - DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() - .setRegionId("cn-shanghai") - .setInstanceType("ecs.e-c1m2.xlarge"); - RuntimeOptions runtime = new RuntimeOptions(); - - //获得ip并添加进入集合 - DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); - List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); - for (List strings : ipListList) { - for (String ip : strings) { - ipList.add(ip); - gateWayServerNum ++; - } - System.out.println("------------------------"); - } - //遍历ip 访问URL - for (String ip : ipList) { - String url = "http://" + ip + ":8080/public/login"; - Map request = new HashMap<>(); - request.put("username", "fluxmq"); - request.put("password", "fluxmq"); - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - HttpEntity> r = new HttpEntity<>(request, httpHeaders); - String result = restTemplate.postForObject(url, r, String.class); - - - //http://fluxmq.muyu.icu/public/cluster - - int nextInt = new Random().nextInt(1000); - String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt; - - HttpHeaders httpHeadersGetInfo = new HttpHeaders(); - httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); - httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); - httpHeadersGetInfo.set("Cookie", result); - HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); - ResponseEntity responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); - - log.info("响应是:{}", responseInfo.getBody()); - - JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); - if (jsonArray.size() > 0) { - - JSONObject jsonObject = jsonArray.getJSONObject(0); - - Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); - carSum = connectSize + carSum; - log.info("负载:{}",connectSize*1.0/100); - - allLoad = connectSize*1.0/100; - - - - String cSys = jsonObject.getJSONObject("cpuInfo").getString("idle"); - int i = cSys.indexOf("%"); - Double cpuInfo = Double.valueOf(cSys.substring(0, i)); - log.info("CPU利用率:{}",cSys); - - String heapUsed =jsonObject.getJSONObject("jvmInfo").getString("heap-used"); - int x = heapUsed.indexOf("M"); - String realHeapUsed = heapUsed.substring(0, x); - - String noHeapUsed = jsonObject.getJSONObject("jvmInfo").getString("no_heap-used"); - int y = noHeapUsed.indexOf("M"); - String realNoHeapUsed = noHeapUsed.substring(0, y); - - String heapMax = jsonObject.getJSONObject("jvmInfo").getString("heap-max"); - int z = heapMax.indexOf("G"); - String realHeapMax = heapMax.substring(0, z); - - double v = (Double.valueOf(realHeapUsed) + Double.valueOf(realNoHeapUsed))/ (Double.valueOf(realHeapMax) * 1000) * 100; - - BigDecimal bd = new BigDecimal(v).setScale(2, BigDecimal.ROUND_HALF_UP); - double roundedNumber = bd.doubleValue(); - log.info("heapUsed:{},noHeapUsed:{},heapMax:{},内存使用率:{}",heapUsed,noHeapUsed,heapMax,roundedNumber); - getWayServerLoads.add(new GetWayServerLoad(ip,connectSize*1.0/100*100,roundedNumber,cpuInfo)); - log.info("链接数量:{}", connectSize); - } else { - log.error("得到的相应数据为null"); - } - } - - } catch (Exception e) { - throw new RuntimeException(e); - } - - ArrayList sortCarArrayList = new ArrayList<>(); - - for (VinIp vinIp : connectLongVinList) { - Date connectTime = vinIp.getConnectTime(); - - Date currentTime = new Date(); - - // 计算时间差(单位:毫秒) - long timeDifference = currentTime.getTime() - connectTime.getTime(); - - // 转换为分钟数 - long minutesDifference = timeDifference / (1000 * 60); - sortCarArrayList.add(new TimeSortCar(vinIp.getVin(),vinIp.getIp(),vinIp.getConnectTime(),minutesDifference)); - - } - - - Collections.sort(sortCarArrayList); - SummarizeResp summarizeResp = new SummarizeResp(gateWayServerNum,dataAnalyze,allLoad / gateWayServerNum * 100,carSum,sortCarArrayList,getWayServerLoads,newConnectVinList); - - return Result.success(summarizeResp); - } - - public Client createClient() throws Exception { - // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 - // 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378657.html。 - Config config = new Config() - // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 - .setAccessKeyId("LTAI5tAEQA9AgnqasQ7Y56cJ") - // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 - .setAccessKeySecret("IsrnZ6dKBgEit5HXv2xyfo0xT8VGkj"); - // Endpoint 请参考 https://api.aliyun.com/product/Ecs - config.endpoint = "ecs.cn-shanghai.aliyuncs.com"; - return new Client(config); - } -} diff --git a/src/main/java/com/hyc/simulate/faulttest/FaultTest.java b/src/main/java/com/hyc/simulate/faulttest/FaultTest.java new file mode 100644 index 0000000..84b0c1d --- /dev/null +++ b/src/main/java/com/hyc/simulate/faulttest/FaultTest.java @@ -0,0 +1,30 @@ +package com.hyc.simulate.faulttest; + +import com.hyc.config.RabbitmqConfig; +import com.hyc.domain.FaultMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; + +/** + * 故障测试监听 + * + * @author YouChe·He + * @ClassName: FaultTestController + * @Description: 故障测试监听 + * @CreateTime: 2024/6/21 09:17 + */ +@Slf4j +@Component +public class FaultTest { + @RabbitListener(queues = RabbitmqConfig.FAULT_MESSAGE_QUEUE) + public void receiveMessage(String faultMessage, Channel channel, Message message) throws Exception { + + // 处理消息 + log.error("故障消费者得到的消息是:{}",faultMessage); + + } +} diff --git a/src/main/java/com/hyc/simulate/faulttest/domain/FaultRecord.java b/src/main/java/com/hyc/simulate/faulttest/domain/FaultRecord.java new file mode 100644 index 0000000..50389ce --- /dev/null +++ b/src/main/java/com/hyc/simulate/faulttest/domain/FaultRecord.java @@ -0,0 +1,12 @@ +package com.hyc.simulate.faulttest.domain; + +/** + * 故障记录类 + * + * @author YouChe·He + * @ClassName: FaultRecord + * @Description: 故障记录类 + * @CreateTime: 2024/6/21 11:32 + */ +public class FaultRecord { +} diff --git a/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java b/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java index d9cb75b..9ad6bf0 100644 --- a/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java +++ b/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java @@ -3,7 +3,7 @@ package com.hyc.simulate.redistest; import com.alibaba.fastjson.JSONObject; import com.hyc.domain.VehicleData; import com.hyc.domain.constant.EventConstant; -import com.hyc.result.Result; +import com.hyc.domain.result.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; @@ -28,15 +28,16 @@ public class RedisCreateKey { @RequestMapping("/createRealTimeDataKey") public Result createRealTimeDataKey(String vin) { - redisTemplate.opsForValue().set(vin + EventConstant.REAL_TIME_DATA,"",3, TimeUnit.MINUTES); + redisTemplate.opsForList().rightPush(vin + EventConstant.REAL_TIME_DATA,""); + redisTemplate.expire(vin+EventConstant.REAL_TIME_DATA, 180, TimeUnit.SECONDS); // 设置键 "key" 的过期时间为180秒 return Result.success("","创建成功"); } @RequestMapping("/getDataByRedis") public Result getDataByRedis(String vin){ - String vehicleJson = redisTemplate.opsForValue().get(vin + EventConstant.REAL_TIME_DATA); + String vehicleJson = redisTemplate.opsForList().index(vin+EventConstant.REAL_TIME_DATA, -1); + redisTemplate.expire(vin+EventConstant.REAL_TIME_DATA, 180, TimeUnit.SECONDS); // 设置键 "your_key" 的过期时间为180秒 VehicleData vehicleData = JSONObject.parseObject(vehicleJson, VehicleData.class); - redisTemplate.opsForValue().set(vin+EventConstant.REAL_TIME_DATA,vehicleJson,3,TimeUnit.MINUTES); return Result.success(vehicleData); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 46d00a5..667b4f6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -22,9 +22,9 @@ spring: matching-strategy: ant_path_matcher datasource: driver-class-name: com.mysql.jdbc.Driver - url: jdbc:mysql://47.103.75.98:3306/netcar?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false + url: jdbc:mysql://47.121.124.220:3306/netcar?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false username: root - password: hyc@123 + password: hyc2002 druid: # 下面为连接池的补充设置,应用到上面所有数据源中 # 初始化大小,最小,最大 @@ -52,81 +52,64 @@ spring: application: name: shop-server redis: + database: 0 host: 47.103.75.98 port: 6379 password: + jedis: + pool: + #最大连接数 + max-active: 15 + #最大阻塞等待时间(负数表示没限制) + max-wait: -1 + #最大空闲 + max-idle: 15 + #最小空闲 + min-idle: 0 + #连接超时时间 + timeout: 10000 kafka: producer: - # Kafka服务器 bootstrap-servers: 121.43.127.44:9092 - # 开启事务,必须在开启了事务的方法中发送,否则报错 transaction-id-prefix: kafkaTx- - # 发生错误后,消息重发的次数,开启事务必须设置大于0。 retries: 3 - # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 - # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 - # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 - # 开启事务时,必须设置为all acks: all - # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 - batch-size: 16384 - # 生产者内存缓冲区的大小。 + batch-size: 86384 buffer-memory: 1024000 - # 键的序列化方式 key-serializer: org.springframework.kafka.support.serializer.JsonSerializer - # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: - # Kafka服务器 bootstrap-servers: 121.43.127.44:9092 group-id: firstGroup - # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D - auto-commit-interval: 2s - # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: - # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 - # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) - # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + auto-commit-interval: 10S + poll-interval: 10000 auto-offset-reset: latest - # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false - # 键的反序列化方式 - #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 properties: spring: json: trusted: packages: "*" - # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 - # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, - # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, - # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 - # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 - # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 - max-poll-records: 3 + max-poll-records: 300 + fetch-min-size: 1024 + fetch-max-wait: 1000 properties: - # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance max: poll: interval: ms: 600000 - # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s session: timeout: ms: 10000 listener: - # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 - concurrency: 4 - # 自动提交关闭,需要设置手动消息确认 + concurrency: 1 ack-mode: manual_immediate - # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 missing-topics-fatal: false - # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance poll-timeout: 600000 + type: batch # mybatis mybatis: diff --git a/src/main/resources/ehcache.xml b/src/main/resources/ehcache.xml index dfccbc2..717782b 100644 --- a/src/main/resources/ehcache.xml +++ b/src/main/resources/ehcache.xml @@ -22,6 +22,16 @@ timeToIdleSeconds="0" diskExpiryThreadIntervalSeconds="120" memoryStoreEvictionPolicy="LRU"/> +