fix kafka的消费者

master
rouchen 2024-06-17 22:31:07 +08:00
parent 1b324da647
commit daffa478a2
8 changed files with 210 additions and 0 deletions

Binary file not shown.

View File

@ -0,0 +1,24 @@
package com.muyu.event.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* CarEvent
*
* @author Yangle
* Date 2024/6/17 19:55
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class CarEvent {
private Integer id;
private String vin;
private String event;
}

View File

@ -0,0 +1,29 @@
package com.muyu.event.controller;
import com.muyu.demos.model.Result;
import com.muyu.event.common.CarEvent;
import com.muyu.event.service.EventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* EventController
*
* @author Yangle
* Date 2024/6/17 19:53
*/
@RestController("/event")
public class EventController {
@Autowired
private EventService eventService;
@PostMapping("/bindingEvent")
public Result<List<CarEvent>> bindingEvent() {
return Result.success(eventService.getCarEventList());
}
}

View File

@ -0,0 +1,19 @@
package com.muyu.event.mapper;
import com.muyu.event.common.CarEvent;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* EventMapper
*
* @author Yangle
* Date 2024/6/17 20:00
*/
@Mapper
public interface EventMapper {
List<CarEvent> getCarEventList();
}

View File

@ -0,0 +1,16 @@
package com.muyu.event.service;
import com.muyu.event.common.CarEvent;
import java.util.List;
/**
* EventService
*
* @author Yangle
* Date 2024/6/17 19:57
*/
public interface EventService {
List<CarEvent> getCarEventList();
}

View File

@ -0,0 +1,25 @@
package com.muyu.event.service.impl;
import com.muyu.event.common.CarEvent;
import com.muyu.event.mapper.EventMapper;
import com.muyu.event.service.EventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* EventServiceImpl
*
* @author Yangle
* Date 2024/6/17 19:57
*/
@Service
public class EventServiceImpl implements EventService {
@Autowired
private EventMapper eventMapper;
@Override
public List<CarEvent> getCarEventList() {
return eventMapper.getCarEventList();
}
}

View File

@ -0,0 +1,88 @@
//package com.muyu.kafka;
//
//import com.muyu.iotDB.service.IotDbServer;
//import com.muyu.mqtt.dao.MessageData;
//import lombok.extern.log4j.Log4j2;
//import org.apache.iotdb.rpc.IoTDBConnectionException;
//import org.apache.iotdb.rpc.StatementExecutionException;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.apache.kafka.common.TopicPartition;
//import org.apache.kafka.common.serialization.StringDeserializer;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.rmi.ServerException;
//import java.time.Duration;
//import java.util.ArrayList;
//import java.util.Collections;
//import java.util.List;
//import java.util.Properties;
//
///**
// * 定时器 SimpleKafkaConsumer
// *
// * @author Yangle
// * Date 2024/6/16 22:18
// */
//@Component
//@Log4j2
//public class SimpleKafkaConsumer1 {
//
// @Autowired
// private RedisTemplate<String,String> redisTemplate;
//
//
// public void consumer1() {
// System.out.println(123);
// }
//
//// public void consumer() {
//// List<MessageData> dataArrayList = new ArrayList<>();
//// log.info("添加到reids定时器开启");
//// // 配置Kafka消费者属性
//// Properties props = new Properties();
//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "Partitions");
//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
////
//// // 创建Kafka消费者实例
//// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
////
//// // 订阅主题
//// TopicPartition topicPartition = new TopicPartition("test1", 0);
//// consumer.assign(Collections.singletonList(topicPartition));
////
//// log.info("定时器结束");
//// // 持续消费消息
//// while (true) {
//// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
//// for (ConsumerRecord<String, String> record : records) {
//// System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//// String value = record.value();
//// log.info("value:{}", value);
//// MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
//// log.info("messageData1:{}", messageData1);
//// dataArrayList.add(messageData1);
//// }
//// if (dataArrayList.size() >= 10) {
//// for (MessageData messageData : dataArrayList) {
//// // 将数据添加到Redis中这里以messageData的id作为keymessageData对象序列化为JSON字符串作为value
//// redisTemplate.opsForList().rightPush(messageData.getVin(), messageData.toString());
//// }
//// dataArrayList.clear();
//// }
////
//// }
//
//
//
//}
//

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.muyu.event.mapper.EventMapper">
<select id="getCarEventList" resultType="com.muyu.event.common.CarEvent">
select * from car_event
</select>
</mapper>