修改实体类规范
parent
fd529f0cec
commit
dfacea67f6
|
@ -6,6 +6,14 @@ import lombok.AllArgsConstructor;
|
|||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 车辆类型对象
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.sheep.message.domain
|
||||
* @Project:cloud-server-c
|
||||
* @name:MessageTemplateType
|
||||
* @Date:2024/9/18 21:01
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
|
|
|
@ -10,6 +10,7 @@ import lombok.NoArgsConstructor;
|
|||
import lombok.experimental.SuperBuilder;
|
||||
import java.io.Serializable;
|
||||
/**
|
||||
* 数据类型对象
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.sheep.message.domain
|
||||
* @Project:cloud-server-c
|
||||
|
|
|
@ -9,6 +9,7 @@ import lombok.experimental.SuperBuilder;
|
|||
import java.sql.Date;
|
||||
|
||||
/**
|
||||
* 新能源车模版
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.template.domain
|
||||
* @Project:cloud-server
|
||||
|
|
|
@ -11,6 +11,7 @@ import lombok.experimental.SuperBuilder;
|
|||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 模版对应的配置
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.sheep.message.domain
|
||||
* @Project:cloud-server-c
|
||||
|
@ -25,6 +26,7 @@ import java.io.Serializable;
|
|||
@TableName(value = "message_template_type",autoResultMap = true)
|
||||
public class MessageTemplateType implements Serializable {
|
||||
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
|
|
|
@ -9,6 +9,14 @@ import lombok.Data;
|
|||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 车辆对象
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.sheep.message.domain
|
||||
* @Project:cloud-server-c
|
||||
* @name:MessageTemplateType
|
||||
* @Date:2024/9/18 21:01
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
|
|
|
@ -10,6 +10,14 @@ import lombok.NoArgsConstructor;
|
|||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 车辆记录对象
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.sheep.message.domain
|
||||
* @Project:cloud-server-c
|
||||
* @name:MessageTemplateType
|
||||
* @Date:2024/9/18 21:01
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
|
|
|
@ -12,6 +12,7 @@ import lombok.experimental.SuperBuilder;
|
|||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 报文模版对象
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.template.domain
|
||||
* @Project:cloud-server-c
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
package com.muyu.common.util;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* kafka通用配置
|
||||
* @author liuxinyue
|
||||
* @Package:com.muyu.common.util
|
||||
* @name:KafkaCommonProperties
|
||||
* @Date:2024/9/29 12:26
|
||||
*/
|
||||
/**
 * Common Kafka configuration shared by producer and consumer helpers.
 *
 * <p>All fields carry working defaults; override individual values via the
 * setters or, preferably, via {@link #builder()} — the 15-argument constructor
 * is kept only for backward compatibility.</p>
 *
 * @author liuxinyue
 * @Package:com.muyu.common.util
 * @name:KafkaCommonProperties
 * @Date:2024/9/29 12:26
 */
public class KafkaCommonProperties {

    /**
     * Kafka bootstrap server address (host:port).
     */
    private String kafkaHost = "47.101.53.251:9092";

    /**
     * Producer: number of acknowledgements the leader must receive before a
     * request completes; controls durability of sent data.
     * 0: producer does not wait for broker confirmation (retries are ineffective)
     * 1: leader writes the record to its log and replies without waiting for followers
     * all: leader waits for all in-sync replicas (same as -1) — full durability
     */
    private String ack = "all";

    /**
     * Producer: number of retries on transient send failures.
     */
    private Integer retryTimes = 1;

    /**
     * Producer: batch size for records sent to the same partition, in bytes
     * (default 16384 bytes = 16 KiB).
     */
    private Integer batchSize = 16384;

    /**
     * Producer: delay before sending a batch (linger time), in milliseconds.
     */
    private Integer lingerMs = 1;

    /**
     * Producer: maximum buffer memory, in bytes
     * (default 33554432 bytes = 32 MiB).
     */
    private Integer bufferMemory = 33554432;

    /**
     * Producer: key serializer class name.
     */
    private String keyEncoder = "org.apache.kafka.common.serialization.StringSerializer";

    /**
     * Producer: value serializer class name.
     */
    private String valueEncoder = "org.apache.kafka.common.serialization.StringSerializer";

    /**
     * Consumer: consumer-group ID used when subscribing to topics.
     */
    private String groupId = "my-group-id";

    /**
     * Consumer: whether offsets are committed automatically in the background.
     */
    private String autoCommit = "true";

    /**
     * Consumer: interval between automatic offset commits, in milliseconds;
     * only effective when enable.auto.commit is true.
     */
    private String autoCommitIntervalMs = "1000";

    /**
     * Consumer: key deserializer class name.
     */
    private String keyDecoder = "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Consumer: value deserializer class name.
     */
    private String valueDecoder = "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Consumer: offset reset policy applied when no committed offset exists.
     * earliest: start from the earliest offset of the topic
     * latest: start from the newest offset
     * none: throw if no previous offset is found for the group
     * Any other value raises an exception.
     */
    private String autoOffsetReset = "latest";

    /**
     * Topics to operate on.
     */
    private Collection<String> topic = Collections.singleton("my-topic");

    /**
     * Creates a configuration populated with the defaults documented above.
     */
    public KafkaCommonProperties() {

    }

    /**
     * Legacy all-args constructor; prefer {@link #builder()} for readability.
     */
    public KafkaCommonProperties(String kafkaHost, String ack, Integer retryTimes, Integer batchSize, Integer lingerMs, Integer bufferMemory, String keyEncoder, String valueEncoder, String groupId, String autoCommit, String autoCommitIntervalMs, String keyDecoder, String valueDecoder, String autoOffsetReset, Collection<String> topic) {
        this.kafkaHost = kafkaHost;
        this.ack = ack;
        this.retryTimes = retryTimes;
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
        this.bufferMemory = bufferMemory;
        this.keyEncoder = keyEncoder;
        this.valueEncoder = valueEncoder;
        this.groupId = groupId;
        this.autoCommit = autoCommit;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.keyDecoder = keyDecoder;
        this.valueDecoder = valueDecoder;
        this.autoOffsetReset = autoOffsetReset;
        this.topic = topic;
    }

    /**
     * @return a builder pre-populated with the default values
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link KafkaCommonProperties}; every property is
     * optional and falls back to the class defaults when not set.
     */
    public static final class Builder {

        // Backing instance; starts with the documented defaults.
        private final KafkaCommonProperties properties = new KafkaCommonProperties();

        public Builder kafkaHost(String kafkaHost) {
            properties.setKafkaHost(kafkaHost);
            return this;
        }

        public Builder ack(String ack) {
            properties.setAck(ack);
            return this;
        }

        public Builder retryTimes(Integer retryTimes) {
            properties.setRetryTimes(retryTimes);
            return this;
        }

        public Builder batchSize(Integer batchSize) {
            properties.setBatchSize(batchSize);
            return this;
        }

        public Builder lingerMs(Integer lingerMs) {
            properties.setLingerMs(lingerMs);
            return this;
        }

        public Builder bufferMemory(Integer bufferMemory) {
            properties.setBufferMemory(bufferMemory);
            return this;
        }

        public Builder keyEncoder(String keyEncoder) {
            properties.setKeyEncoder(keyEncoder);
            return this;
        }

        public Builder valueEncoder(String valueEncoder) {
            properties.setValueEncoder(valueEncoder);
            return this;
        }

        public Builder groupId(String groupId) {
            properties.setGroupId(groupId);
            return this;
        }

        public Builder autoCommit(String autoCommit) {
            properties.setAutoCommit(autoCommit);
            return this;
        }

        public Builder autoCommitIntervalMs(String autoCommitIntervalMs) {
            properties.setAutoCommitIntervalMs(autoCommitIntervalMs);
            return this;
        }

        public Builder keyDecoder(String keyDecoder) {
            properties.setKeyDecoder(keyDecoder);
            return this;
        }

        public Builder valueDecoder(String valueDecoder) {
            properties.setValueDecoder(valueDecoder);
            return this;
        }

        public Builder autoOffsetReset(String autoOffsetReset) {
            properties.setAutoOffsetReset(autoOffsetReset);
            return this;
        }

        public Builder topic(Collection<String> topic) {
            properties.setTopic(topic);
            return this;
        }

        public KafkaCommonProperties build() {
            return properties;
        }
    }

    public String getKafkaHost() {
        return kafkaHost;
    }

    public void setKafkaHost(String kafkaHost) {
        this.kafkaHost = kafkaHost;
    }

    public String getAck() {
        return ack;
    }

    public void setAck(String ack) {
        this.ack = ack;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public Integer getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(Integer batchSize) {
        this.batchSize = batchSize;
    }

    public Integer getLingerMs() {
        return lingerMs;
    }

    public void setLingerMs(Integer lingerMs) {
        this.lingerMs = lingerMs;
    }

    public Integer getBufferMemory() {
        return bufferMemory;
    }

    public void setBufferMemory(Integer bufferMemory) {
        this.bufferMemory = bufferMemory;
    }

    public String getKeyEncoder() {
        return keyEncoder;
    }

    public void setKeyEncoder(String keyEncoder) {
        this.keyEncoder = keyEncoder;
    }

    public String getValueEncoder() {
        return valueEncoder;
    }

    public void setValueEncoder(String valueEncoder) {
        this.valueEncoder = valueEncoder;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getAutoCommit() {
        return autoCommit;
    }

    public void setAutoCommit(String autoCommit) {
        this.autoCommit = autoCommit;
    }

    public String getAutoCommitIntervalMs() {
        return autoCommitIntervalMs;
    }

    public void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    public String getKeyDecoder() {
        return keyDecoder;
    }

    public void setKeyDecoder(String keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    public String getValueDecoder() {
        return valueDecoder;
    }

    public void setValueDecoder(String valueDecoder) {
        this.valueDecoder = valueDecoder;
    }

    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }

    public Collection<String> getTopic() {
        return topic;
    }

    public void setTopic(Collection<String> topic) {
        this.topic = topic;
    }
}
|
|
@ -0,0 +1,64 @@
|
|||
package com.muyu.common.util;
|
||||
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
/**
|
||||
* kafka生产
|
||||
* @author liuxinyue
|
||||
* @Package:com.muyu.common.util
|
||||
* @name:KafkaProducerTest
|
||||
* @Date:2024/9/29 12:27
|
||||
*/
|
||||
public class KafkaProducerTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);
|
||||
|
||||
public static KafkaProducer<String, String> getDefaultKafkaProducer(KafkaCommonProperties kafkaCommonProperties) {
|
||||
Properties properties = new Properties();
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost());
|
||||
properties.put(ProducerConfig.ACKS_CONFIG, kafkaCommonProperties.getAck());
|
||||
properties.put(ProducerConfig.RETRIES_CONFIG, kafkaCommonProperties.getRetryTimes());
|
||||
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaCommonProperties.getBatchSize());
|
||||
properties.put(ProducerConfig.LINGER_MS_CONFIG, kafkaCommonProperties.getLingerMs());
|
||||
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaCommonProperties.getBufferMemory());
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyEncoder());
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueEncoder());
|
||||
return new KafkaProducer<>(properties);
|
||||
}
|
||||
|
||||
static class MyProducerCallback implements Callback {
|
||||
|
||||
@Override
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
if (Objects.nonNull(exception)) {
|
||||
logger.error(">>>>>>>>>>Producer生产消息异常:", exception);
|
||||
}
|
||||
if (Objects.nonNull(metadata)) {
|
||||
logger.info(">>>>>>>>>>Producer生产消息:metadata:{},partition:{}, offset:{}", metadata, metadata.partition(), metadata.offset());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties();
|
||||
KafkaProducer<String, String> producer = getDefaultKafkaProducer(kafkaCommonProperties);
|
||||
String message = "hello world ";
|
||||
try {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
// 异步写入数据
|
||||
String topic = kafkaCommonProperties.getTopic().toArray()[0].toString();
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message + i);
|
||||
producer.send(producerRecord, new MyProducerCallback());
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
logger.error(">>>>>>>>生产数据异常:", ex);
|
||||
throw new RuntimeException(ex);
|
||||
} finally {
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue