feat: 新增根据vin将实时数据存入redis

master
yaoxin 2024-06-18 20:49:56 +08:00
parent 9459bb2576
commit cb1abfa949
8 changed files with 443 additions and 9 deletions

View File

@ -0,0 +1,134 @@
package com.muyu.eventdriven.constants;
/**
*
*
* @author muyu
*/
public class Constants {
/**
* UTF-8
*/
public static final String UTF8 = "UTF-8";
/**
* GBK
*/
public static final String GBK = "GBK";
/**
* www
*/
public static final String WWW = "www.";
/**
* RMI
*/
public static final String LOOKUP_RMI = "rmi:";
/**
* LDAP
*/
public static final String LOOKUP_LDAP = "ldap:";
/**
* LDAPS
*/
public static final String LOOKUP_LDAPS = "ldaps:";
/**
* http
*/
public static final String HTTP = "http://";
/**
* https
*/
public static final String HTTPS = "https://";
/**
*
*/
public static final Integer SUCCESS = 200;
/**
*
*/
public static final Integer FAIL = 500;
/**
*
*/
public static final String LOGIN_SUCCESS_STATUS = "0";
/**
*
*/
public static final String LOGIN_FAIL_STATUS = "1";
/**
*
*/
public static final String LOGIN_SUCCESS = "Success";
/**
*
*/
public static final String LOGOUT = "Logout";
/**
*
*/
public static final String REGISTER = "Register";
/**
*
*/
public static final String LOGIN_FAIL = "Error";
/**
*
*/
public static final String PAGE_NUM = "pageNum";
/**
*
*/
public static final String PAGE_SIZE = "pageSize";
/**
*
*/
public static final String ORDER_BY_COLUMN = "orderByColumn";
/**
* "desc" "asc".
*/
public static final String IS_ASC = "isAsc";
/**
*
*/
public static final long CAPTCHA_EXPIRATION = 2;
/**
*
*/
public static final String RESOURCE_PREFIX = "/profile";
/**
* json
*/
public static final String[] JSON_WHITELIST_STR = {"org.springframework", "com.muyu"};
/**
* 访
*/
public static final String[] JOB_WHITELIST_STR = {"com.muyu"};
/**
*
*/
public static final String[] JOB_ERROR_STR = {"java.net.URL", "javax.naming.InitialContext", "org.yaml.snakeyaml",
"org.springframework", "org.apache", "com.muyu.common.core.utils.file"};
}

View File

@ -0,0 +1,93 @@
package com.muyu.eventdriven.constants;
/**
*
*
* @author muyu
*/
public class HttpStatus {
/**
*
*/
public static final int SUCCESS = 200;
/**
*
*/
public static final int CREATED = 201;
/**
*
*/
public static final int ACCEPTED = 202;
/**
*
*/
public static final int NO_CONTENT = 204;
/**
*
*/
public static final int MOVED_PERM = 301;
/**
*
*/
public static final int SEE_OTHER = 303;
/**
*
*/
public static final int NOT_MODIFIED = 304;
/**
*
*/
public static final int BAD_REQUEST = 400;
/**
*
*/
public static final int UNAUTHORIZED = 401;
/**
* 访
*/
public static final int FORBIDDEN = 403;
/**
*
*/
public static final int NOT_FOUND = 404;
/**
* http
*/
public static final int BAD_METHOD = 405;
/**
*
*/
public static final int CONFLICT = 409;
/**
*
*/
public static final int UNSUPPORTED_TYPE = 415;
/**
*
*/
public static final int ERROR = 500;
/**
*
*/
public static final int NOT_IMPLEMENTED = 501;
/**
*
*/
public static final int WARN = 601;
}

View File

@ -1,14 +1,11 @@
package com.muyu.eventdriven.consumer;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.domain.VehicleKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@ -26,14 +23,11 @@ public class KafkaConsumers {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private RedisTemplate<String,String> redisTemplate;
public KafkaConsumer kafkaConsumer(String vin){
Object o = redisTemplate.opsForHash().get("vehicleKafka", vin);
VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
public KafkaConsumer kafkaConsumer(String vin,VehicleKafka vehicleKafka){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@ -47,6 +41,8 @@ public class KafkaConsumers {
List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions()));
consumer.assign(topicPartitions);
// 设置每个分区的位移为latest即只消费启动后发布的消息
consumer.seekToEnd(topicPartitions);
return consumer;
}

View File

@ -1,6 +1,11 @@
package com.muyu.eventdriven.controller;
import com.muyu.eventdriven.domain.rest.Result;
import com.muyu.eventdriven.server.EventInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@ -12,5 +17,16 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/eventInfo")
public class EventInfoController {
@Autowired
private EventInfoService eventInfoService;
@GetMapping("/CreatKafkaConsumer")
public void creatKafkaConsumer(@RequestParam("vin") String vin) {
eventInfoService.creatKafkaConsumer(vin);
}
@GetMapping("/CloseKafkaConsumer")
public Result closeKafkaConsumer(@RequestParam("vin") String vin) {
return eventInfoService.closeKafkaConsumer(vin);
}
}

View File

@ -0,0 +1,112 @@
package com.muyu.eventdriven.domain.rest;
import com.muyu.eventdriven.constants.Constants;
import com.muyu.eventdriven.constants.HttpStatus;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
*
*
* @author muyu
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Result<T> implements Serializable {
/**
*
*/
public static final int SUCCESS = Constants.SUCCESS;
/**
*
*/
public static final int FAIL = Constants.FAIL;
/**
*
*/
public static final int WARN = HttpStatus.WARN;
private static final long serialVersionUID = 1L;
private int code;
private String msg;
private T data;
public static <T> Result<T> success () {
return restResult(null, SUCCESS, null);
}
public static <T> Result<T> success (T data) {
return restResult(data, SUCCESS, null);
}
public static <T> Result<T> success (T data, String msg) {
return restResult(data, SUCCESS, msg);
}
public static <T> Result<T> error () {
return restResult(null, FAIL, null);
}
public static <T> Result<T> error (String msg) {
return restResult(null, FAIL, msg);
}
public static <T> Result<T> error (T data) {
return restResult(data, FAIL, null);
}
public static <T> Result<T> error (T data, String msg) {
return restResult(data, FAIL, msg);
}
public static <T> Result<T> error (int code, String msg) {
return restResult(null, code, msg);
}
public static <T> Result<T> warn () {
return restResult(null, WARN, null);
}
public static <T> Result<T> warn (String msg) {
return restResult(null, WARN, msg);
}
public static <T> Result<T> warn (T data) {
return restResult(data, WARN, null);
}
public static <T> Result<T> warn (T data, String msg) {
return restResult(data, WARN, msg);
}
public static <T> Result<T> warn (int code, String msg) {
return restResult(null, code, msg);
}
private static <T> Result<T> restResult (T data, int code, String msg) {
return Result.<T>builder()
.code(code)
.data(data)
.msg(msg)
.build();
}
public static <T> Boolean isError (Result<T> ret) {
return !isSuccess(ret);
}
public static <T> Boolean isSuccess (Result<T> ret) {
return Result.SUCCESS == ret.getCode();
}
}

View File

@ -0,0 +1,15 @@
package com.muyu.eventdriven.server;
import com.muyu.eventdriven.domain.rest.Result;
/**
* @ClassName EventInfo
* @Description
* @Author Xin.Yao
* @Date 2024/6/18 9:34
*/
public interface EventInfoService {
void creatKafkaConsumer(String vin);
Result closeKafkaConsumer(String vin);
}

View File

@ -0,0 +1,68 @@
package com.muyu.eventdriven.server.impl;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.consumer.KafkaConsumers;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.domain.VehicleKafka;
import com.muyu.eventdriven.domain.rest.Result;
import com.muyu.eventdriven.mapper.EventDrivenMapper;
import com.muyu.eventdriven.server.EventInfoService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @ClassName EventInfoServiceImpl
* @Description
* @Author Xin.Yao
* @Date 2024/6/18 9:35
*/
@Service
@Log4j2
public class EventInfoServiceImpl implements EventInfoService {
private static Map<String,KafkaConsumer> kafkaConsumerMap = new HashMap<>();
@Autowired
private KafkaConsumers kafkaConsumers;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private EventDrivenMapper eventDrivenMapper;
@Override
public void creatKafkaConsumer(String vin) {
Object o = redisTemplate.opsForHash().get("vehicleKafka", vin);
VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
redisTemplate.opsForList().rightPush(vin,"");
redisTemplate.expire(vin, 10, TimeUnit.HOURS);
if (!kafkaConsumerMap.containsKey(vehicleKafka.getConsumerName() + "-" + vehicleKafka.getPartitions())) {
KafkaConsumer kafkaConsumer = kafkaConsumers.kafkaConsumer(vin,vehicleKafka);
kafkaConsumerMap.put(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions(),kafkaConsumer);
while (kafkaConsumerMap.containsKey(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions())){
// 拉取消息
ConsumerRecords<String, String> msg = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : msg) {
if (redisTemplate.hasKey(vin)){
log.info("{}监听到的消息内容: {}", vin,consumerRecord.value());
redisTemplate.opsForList().rightPush(vin,consumerRecord.value());
}
}
}
}
}
@Override
public Result closeKafkaConsumer(String vin) {
redisTemplate.delete(vin);
return Result.success("释放消费者");
}
}

View File

@ -14,7 +14,7 @@ spring:
#这个可以和config/consumer.properties里的group.id不同
group-id: test-consumer-group
redis:
host: 127.0.0.1
host: 101.34.248.9
port: 6379
password:
datasource: