feat: modify Redis

master
rouchen 2024-06-18 15:55:19 +08:00
parent daffa478a2
commit 669acc5537
11 changed files with 47351 additions and 43014 deletions

File diff suppressed because it is too large

View File

@@ -16,7 +16,7 @@ import lombok.experimental.SuperBuilder;
 @NoArgsConstructor
 @SuperBuilder
 public class CarEvent {
+    private Integer id;
     private String vin;
     private String event;

View File

@@ -3,8 +3,11 @@ package com.muyu.event.controller;
 import com.muyu.demos.model.Result;
 import com.muyu.event.common.CarEvent;
 import com.muyu.event.service.EventService;
+import com.muyu.mqtt.dao.MessageData;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
@@ -23,7 +26,13 @@ public class EventController {
     private EventService eventService;
 
     @PostMapping("/bindingEvent")
-    public Result<List<CarEvent>> bindingEvent() {
-        return Result.success(eventService.getCarEventList());
+    public Result<List<CarEvent>> bindingEvent(@RequestBody CarEvent carEvent) {
+        return Result.success(eventService.getCarEventList(carEvent));
+    }
+
+    @PostMapping("/selectEvent")
+    public Result<MessageData> selectEvent(@RequestBody CarEvent carEvent) {
+        return eventService.selectEvent(carEvent);
+    }
 }
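
For reference, a minimal sketch of calling the two endpoints after this change. The base URL, vin, and event values are assumptions for illustration, not part of the commit; Result and CarEvent come from the project itself:

    import org.springframework.web.client.RestTemplate;

    import com.muyu.demos.model.Result;
    import com.muyu.event.common.CarEvent;

    public class EventEndpointSketch {
        public static void main(String[] args) {
            RestTemplate rest = new RestTemplate();
            String base = "http://localhost:8080"; // hypothetical host and port

            // CarEvent is the request body for both endpoints; vin drives the lookup
            CarEvent query = CarEvent.builder().vin("VIN10001").event("2").build();

            // bindingEvent now filters car_event rows by the posted vin
            Result events = rest.postForObject(base + "/bindingEvent", query, Result.class);

            // selectEvent dispatches on the event string; "2" returns the latest Redis coordinate
            Result latest = rest.postForObject(base + "/selectEvent", query, Result.class);

            System.out.println(events + " / " + latest);
        }
    }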

View File

@@ -13,7 +13,7 @@ import java.util.List;
  */
 @Mapper
 public interface EventMapper {
-    List<CarEvent> getCarEventList();
+    List<CarEvent> getCarEventList(CarEvent carEvent);
 }

View File

@@ -1,6 +1,8 @@
 package com.muyu.event.service;
 
+import com.muyu.demos.model.Result;
 import com.muyu.event.common.CarEvent;
+import com.muyu.mqtt.dao.MessageData;
 
 import java.util.List;
@@ -11,6 +13,7 @@ import java.util.List;
  * Date 2024/6/17 19:57
  */
 public interface EventService {
-    List<CarEvent> getCarEventList();
+    List<CarEvent> getCarEventList(CarEvent carEvent);
+
+    Result<MessageData> selectEvent(CarEvent carEvent);
 }

View File

@@ -1,12 +1,20 @@
 package com.muyu.event.service.impl;
 
+import com.alibaba.fastjson.JSON;
+import com.muyu.demos.model.Result;
 import com.muyu.event.common.CarEvent;
 import com.muyu.event.mapper.EventMapper;
 import com.muyu.event.service.EventService;
+import com.muyu.mqtt.dao.MessageData;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Set;
@@ -15,11 +23,49 @@ import java.util.List;
  * Date 2024/6/17 19:57
  */
 @Service
+@Log4j2
 public class EventServiceImpl implements EventService {
 
     @Autowired
     private EventMapper eventMapper;
 
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
     @Override
-    public List<CarEvent> getCarEventList() {
-        return eventMapper.getCarEventList();
+    public List<CarEvent> getCarEventList(CarEvent carEvent) {
+        return eventMapper.getCarEventList(carEvent);
     }
+
+    @Override
+    public Result<MessageData> selectEvent(CarEvent carEvent) {
+        if (carEvent.getEvent() != null) {
+            if (carEvent.getEvent().contains("1")) {
+                // event type 1: placeholder, not implemented yet
+                System.out.println("1");
+            }
+            if (carEvent.getEvent().contains("2")) {
+                // event type 2: return the latest coordinate message from Redis
+                String maximumCoordinate = getMaximumCoordinate(carEvent.getVin());
+                MessageData messageData = JSON.parseObject(maximumCoordinate, MessageData.class);
+                return Result.success(messageData);
+            }
+            if (carEvent.getEvent().contains("3")) {
+                // event type 3: placeholder, not implemented yet
+            }
+        }
+        return Result.error("the vehicle has no bound event");
+    }
+
+    /**
+     * Read the most recently pushed message for a vin from its Redis list.
+     *
+     * @param vin the vehicle identification number, used as the Redis key
+     * @return the JSON string of the latest MessageData, or null if the list is empty
+     */
+    public String getMaximumCoordinate(String vin) {
+        return redisTemplate.opsForList().index(vin, -1);
+    }
+
+//        // create the topic with 8 partitions
+//        NewTopic newTopic = new NewTopic(topic, 8, (short) 1);
+//        kafkaAdmin.createOrModifyTopics(newTopic);
+//        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, main.getVin(), JSON.toJSONString(main));
+//        // send the message
+//        kafkaTemplate.send(producerRecord);
 }

View File

@@ -1,5 +1,6 @@
 package com.muyu.kafka;
 
+import com.alibaba.fastjson.JSON;
 import com.muyu.iotDB.service.IotDbServer;
 import com.muyu.mqtt.dao.MessageData;
 import lombok.extern.log4j.Log4j2;
@@ -11,16 +12,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.rmi.ServerException;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * SimpleKafkaConsumer
@@ -35,44 +35,23 @@ public class SimpleKafkaConsumer {
     @Resource
     private IotDbServer iotDbServer;
 
-//    @KafkaListener(topics = "test1", groupId = "Partitions")
-//    public void consumer() throws ServerException, IoTDBConnectionException, StatementExecutionException {
-//        log.info("timer started");
-////        List<MessageData> messageDataList = new ArrayList<>();
-////        // configure the Kafka consumer properties
-//        Properties props = new Properties();
-//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-//        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Partitions");
-//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-//
-////        // create the Kafka consumer instance
-//        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-//        // consume messages continuously
-//        while (true) {
-//            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-//            for (ConsumerRecord<String, String> record : records) {
-//                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
-//                String value = record.value();
-//                log.info("value:{}", value);
-//                MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
-//                log.info("messageData1:{}", messageData1);
-//                iotDbServer.add(messageData1);
-//            }
-//        }
-//    }
-
-    @KafkaListener(topics = "test1", groupId = "Partitions")
-    public void consumer(ConsumerRecord<String, String> record) throws ServerException, IoTDBConnectionException, StatementExecutionException {
-        log.info("Received message");
-        String value = record.value();
-        log.info("value:{}", value);
-        MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
-        log.info("messageData1:{}", messageData1);
-        iotDbServer.add(messageData1);
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    @KafkaListener(topics = "test1", groupId = "Topics")
+    public void consume(ConsumerRecord<String, String> record) {
+        log.info("start consuming");
+        try {
+            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+            String value = record.value();
+            log.info("value:{}", value);
+            MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
+            log.info("messageData1:{}", messageData1);
            iotDbServer.add(messageData1);
+            redisTemplate.opsForList().rightPush(messageData1.getVin(), JSON.toJSONString(messageData1));
+        } catch (Exception e) {
+            log.error("Error consuming Kafka message", e);
+            // handle the exception; a retry or other recovery logic may be needed
+        }
     }
 }
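
One caveat with the new rightPush: each vin's list grows without bound as messages arrive. If that becomes a problem, an LTRIM after the push would cap it; a sketch under the assumption that only the newest 1000 entries matter (the cap is not in this commit):

    import org.springframework.data.redis.core.StringRedisTemplate;

    public class BoundedVinListSketch {
        // drop-in replacement for the bare rightPush in consume()
        static void pushBounded(StringRedisTemplate redis, String vin, String json) {
            redis.opsForList().rightPush(vin, json);
            // trim(key, -1000, -1) keeps the last 1000 elements and discards older ones
            redis.opsForList().trim(vin, -1000, -1);
        }
    }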

View File

@@ -13,6 +13,7 @@
 //import org.apache.kafka.common.serialization.StringDeserializer;
 //import org.springframework.beans.factory.annotation.Autowired;
 //import org.springframework.data.redis.core.RedisTemplate;
+//import org.springframework.kafka.annotation.KafkaListener;
 //import org.springframework.scheduling.annotation.Scheduled;
 //import org.springframework.stereotype.Component;
 //
@@ -23,6 +24,7 @@
 //import java.util.Collections;
 //import java.util.List;
 //import java.util.Properties;
+//import java.util.concurrent.ConcurrentLinkedQueue;
 //
 ///**
 // * timer SimpleKafkaConsumer
@@ -34,55 +36,32 @@
 //@Log4j2
 //public class SimpleKafkaConsumer1 {
 //
-//
 //    @Autowired
 //    private RedisTemplate<String,String> redisTemplate;
 //
-//
 //    public void consumer1() {
 //        System.out.println(123);
 //    }
 //
-////    public void consumer() {
-////        List<MessageData> dataArrayList = new ArrayList<>();
-////        log.info("timer for adding to Redis started");
-////        // configure the Kafka consumer properties
-////        Properties props = new Properties();
-////        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-////        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Partitions");
-////        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-////        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-////        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-////
-////        // create the Kafka consumer instance
-////        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-////
-////        // subscribe to the topic
-////        TopicPartition topicPartition = new TopicPartition("test1", 0);
-////        consumer.assign(Collections.singletonList(topicPartition));
-////
-////        log.info("timer finished");
-////        // consume messages continuously
-////        while (true) {
-////            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
-////            for (ConsumerRecord<String, String> record : records) {
-////                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
-////                String value = record.value();
-////                log.info("value:{}", value);
-////                MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
-////                log.info("messageData1:{}", messageData1);
-////                dataArrayList.add(messageData1);
-////            }
-////            if (dataArrayList.size() >= 10) {
-////                for (MessageData messageData : dataArrayList) {
-////                    // write the data to Redis: the vin is the key, the serialized messageData object is the value
-////                    redisTemplate.opsForList().rightPush(messageData.getVin(), messageData.toString());
-////                }
-////                dataArrayList.clear();
-////            }
-////
-////        }
 //
+//    @KafkaListener(topics = "test1", groupId = "Topics")
+//    public void consume(ConsumerRecord<String, String> record) {
+//        log.info("adding to Redis");
+//        try {
+//            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+//            String value = record.value();
+//            log.info("value:{}", value);
+//            MessageData messageData = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
+//            log.info("messageData:{}", messageData);
+//
+//        } catch (Exception e) {
+//            log.error("Error consuming Kafka message", e);
+//            // handle the exception; a retry or other recovery logic may be needed
+//        }
+//    }
+//
 //
 //}
 //

View File

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
 import com.muyu.mqtt.dao.MessageData;
 import com.muyu.utils.ConversionUtil;
 import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -15,6 +16,10 @@ import org.springframework.kafka.core.KafkaAdmin;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * MessageCallbackService
  *
@@ -35,6 +40,7 @@ public class MessageCallbackService implements MqttCallback {
         System.out.println("connectionLost:" + cause.getMessage());
     }
 
+//    AtomicInteger partitionCounter = new AtomicInteger(-1);
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) {
@@ -43,49 +49,17 @@ public class MessageCallbackService implements MqttCallback {
         log.info("message content:{}", new String(mqttMessage.getPayload()));
         String s = new String(mqttMessage.getPayload());
         MessageData main = ConversionUtil.main(s);
-//        try {
-//            List<NewTopic> newTopicList = createTopic(topic,main);
-//            for (NewTopic newTopic : newTopicList) {
-////                int partitionIndex = getPartitionIndexWithWeight(newTopic.getPartitions());
-//                kafkaAdmin.createOrModifyTopics(newTopic);
-//                ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(topic, main.getVin(), main.toString());
-//                kafkaTemplate.send(stringObjectProducerRecord);
-//            }
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//        }
-        ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(topic, main.getVin(), JSON.toJSONString(main));
-        kafkaTemplate.send(stringObjectProducerRecord);
+//        int numPartitions = 8;
+//        NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
+//        // use a round-robin counter to pick the partition, keeping it within the partition count
+//        int partition = partitionCounter.getAndUpdate(prev -> (prev + 1) % numPartitions);
+
+        // create the ProducerRecord; the partition is left null, so the vin key decides the partition
+        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, null, main.getVin(), JSON.toJSONString(main));
+        // send the message
+        kafkaTemplate.send(producerRecord);
     }
 
-    private int currentPartitionIndex = 0;
-
-//    public List<NewTopic> createTopic(String topic, MessageData messageData) {
-//        // create a Kafka producer instance
-//        List<NewTopic> newTopics = new ArrayList<>();
-//        newTopics.add(new NewTopic(topic, 8, (short) 1));
-////        ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(topic, messageData.getVin(), messageData.toString());
-////        kafkaTemplate.send(stringObjectProducerRecord);
-//        return newTopics;
-//    }
-
-//    public static int getPartitionIndexWithWeight(List<PartitionInfo> partitions) {
-//        int totalWeight = 0;
-//        for (PartitionInfo partition : partitions) {
-//            totalWeight += partition.getReplicas().size();
-//        }
-//
-//        int randomNum = new Random().nextInt(totalWeight);
-//        int currentWeight = 0;
-//        for (int i = 0; i < partitions.size(); i++) {
-//            currentWeight += partitions.get(i).getReplicas().size();
-//            if (randomNum < currentWeight) {
-//                return i;
-//            }
-//        }
-//
-//        return -1; // should never reach here
-//    }
 
     @Override
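
The commented-out partitionCounter sketches round-robin partitioning, but the active code passes null, so Kafka's default partitioner hashes the vin key. If explicit round-robin were ever enabled, something like the following would be needed; note updateAndGet rather than the commented getAndUpdate, which would yield -1 on the first call (numPartitions = 8 comes from the commented code, the rest is an assumption):

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RoundRobinPartitionSketch {
        private static final int NUM_PARTITIONS = 8;
        private final AtomicInteger partitionCounter = new AtomicInteger(-1);

        ProducerRecord<String, String> next(String topic, String vin, String json) {
            // updateAndGet returns the new value, so the sequence is 0, 1, ..., 7, 0, ...
            int partition = partitionCounter.updateAndGet(prev -> (prev + 1) % NUM_PARTITIONS);
            // this ProducerRecord overload accepts an explicit partition index
            return new ProducerRecord<>(topic, partition, vin, json);
        }
    }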

View File

@@ -56,6 +56,11 @@ forest:
   log-enabled: false
 kafka:
   bootstrap-servers: 127.0.0.1:9092
+  consumer:
+    group-id: Topics
+    auto-offset-reset: earliest
+    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   producer:
     acks: all
     retries: 0
@@ -64,3 +69,4 @@ kafka:
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
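
The new consumer block is roughly the declarative form of the properties the removed hand-rolled consumer built by hand; a sketch of the correspondence (Spring Boot derives these from spring.kafka.*, so this code is illustrative only):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ConsumerYamlEquivalentSketch {
        static Properties fromYaml() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "Topics"); // matches the @KafkaListener groupId
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            return props;
        }
    }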

View File

@@ -4,6 +4,6 @@
 <mapper namespace="com.muyu.event.mapper.EventMapper">
     <select id="getCarEventList" resultType="com.muyu.event.common.CarEvent">
-        select * from car_event
+        select * from car_event where vin = #{vin}
     </select>
 </mapper>
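
Since #{vin} now binds from the CarEvent parameter, the same query could also be expressed as an annotation on the mapper interface; an alternative sketch, not what the commit does (CarEvent is the project's class):

    import java.util.List;

    import org.apache.ibatis.annotations.Mapper;
    import org.apache.ibatis.annotations.Select;

    import com.muyu.event.common.CarEvent;

    @Mapper
    public interface EventMapperAnnotationSketch {
        // #{vin} resolves against the vin property of the CarEvent argument
        @Select("select * from car_event where vin = #{vin}")
        List<CarEvent> getCarEventList(CarEvent carEvent);
    }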