diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/CarType.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/CarType.java index 52ea7a0..ccaa7ca 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/CarType.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/CarType.java @@ -6,6 +6,14 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +/** + * 车辆类型对象 + * @Author:liuxinyue + * @Package:com.sheep.message.domain + * @Project:cloud-server-c + * @name:MessageTemplateType + * @Date:2024/9/18 21:01 + */ @Data @AllArgsConstructor @NoArgsConstructor diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/DataType.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/DataType.java index b70a583..7fb9ea2 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/DataType.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/DataType.java @@ -10,6 +10,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import java.io.Serializable; /** + * 数据类型对象 * @Author:liuxinyue * @Package:com.sheep.message.domain * @Project:cloud-server-c diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplate.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplate.java index b14c894..8f21d62 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplate.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplate.java @@ -9,6 +9,7 @@ import lombok.experimental.SuperBuilder; import java.sql.Date; /** + * 新能源车模版 * @Author:liuxinyue * @Package:com.template.domain * @Project:cloud-server diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplateType.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplateType.java index 6d84684..3cdd2a5 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplateType.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/MessageTemplateType.java @@ -11,6 +11,7 @@ import lombok.experimental.SuperBuilder; import java.io.Serializable; /** + * 模版对应的配置 * @Author:liuxinyue * @Package:com.sheep.message.domain * @Project:cloud-server-c @@ -25,6 +26,7 @@ import java.io.Serializable; @TableName(value = "message_template_type",autoResultMap = true) public class MessageTemplateType implements Serializable { + /** * 主键 */ diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java index fd544eb..85d7e28 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java @@ -9,6 +9,14 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +/** + * 车辆对象 + * @Author:liuxinyue + * @Package:com.sheep.message.domain + * @Project:cloud-server-c + * @name:MessageTemplateType + * @Date:2024/9/18 21:01 + */ @Data @AllArgsConstructor @NoArgsConstructor diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCarLog.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCarLog.java index 4d13240..1f1833d 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCarLog.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCarLog.java @@ -10,6 +10,14 @@ import lombok.NoArgsConstructor; import java.util.Date; +/** + * 车辆记录对象 + * @Author:liuxinyue + * @Package:com.sheep.message.domain + * @Project:cloud-server-c + * @name:MessageTemplateType + * @Date:2024/9/18 21:01 + */ @Data @AllArgsConstructor @NoArgsConstructor diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/Template.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/Template.java index 58f9c2d..70dbf2c 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/Template.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/Template.java @@ -12,6 +12,7 @@ import lombok.experimental.SuperBuilder; import java.util.Date; /** + * 报文模版对象 * @Author:liuxinyue * @Package:com.template.domain * @Project:cloud-server-c diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaCommonProperties.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaCommonProperties.java new file mode 100644 index 0000000..a713ae1 --- /dev/null +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaCommonProperties.java @@ -0,0 +1,239 @@ +package com.muyu.common.util; + +import java.util.Collection; +import java.util.Collections; + +/** + * kafka通用配置 + * @author liuxinyue + * @Package:com.muyu.common.util + * @name:KafkaCommonProperties + * @Date:2024/9/29 12:26 + */ +public class KafkaCommonProperties{ + + /** + * Kafka主机 + */ + private String kafkaHost = "47.101.53.251:9092"; + + /** + * 生产者:要求leader请求结束前收到的确认次数,来控制发送数据的持久化 + * 消息确认: + * 0:生产者不等待服务器确认,此时retry参数不生效 + * 1:leader写入记录到log,不会等待follower的确认即向生产者发送通知 + * all:leader等待所有副本通知,然后向生产者发送通知,保证所有数据落盘到所有副本,功能同设置为-1 + */ + private String ack = "all"; + + /** + * 生产者重试次数 + */ + private Integer retryTimes = 1; + + /** + * 生产者:向同一分区发送打包发送的数据量,单位:bytes,默认16384bytes=16K + */ + private Integer batchSize = 16384; + + /** + * 生产者:批量发送消息的间隔时间(延迟时间),单位:毫秒 + */ + private Integer lingerMs = 1; + + /** + * 生产者:可以使用的最大缓存空间,单位:bytes,默认33554432bytes=32M. + */ + private Integer bufferMemory = 33554432; + + /** + * 生产者:键编码器 + */ + private String keyEncoder = "org.apache.kafka.common.serialization.StringSerializer"; + + /** + * 生产者:值编码器 + */ + private String valueEncoder = "org.apache.kafka.common.serialization.StringSerializer"; + + /** + * 消费者:消费topic的组ID + */ + private String groupId = "my-group-id"; + + /** + * 消费者:后台定期提交offset + */ + private String autoCommit = "true"; + + /** + * 消费者提交offset的时间间隔:单位:毫秒,当enable.auto.commit为true时生效 + */ + private String autoCommitIntervalMs = "1000"; + + /** + * 消费者:键解码器 + */ + private String keyDecoder = "org.apache.kafka.common.serialization.StringDeserializer"; + + /** + * 消费者:值解码器 + */ + private String valueDecoder = "org.apache.kafka.common.serialization.StringDeserializer"; + + /** + * 消费者:重启后配置offset + * earliest:消费者恢复到当前topic最早的offset + * latest:消费者从最新的offset开始消费 + * none:如果消费者组没找到之前的offset抛出异常 + * 其他任何值都会抛出异常 + */ + private String autoOffsetReset = "latest"; + + /** + * TOPIC + */ + private Collection topic = Collections.singleton("my-topic"); + + public KafkaCommonProperties() { + + } + + public KafkaCommonProperties(String kafkaHost, String ack, Integer retryTimes, Integer batchSize, Integer lingerMs, Integer bufferMemory, String keyEncoder, String valueEncoder, String groupId, String autoCommit, String autoCommitIntervalMs, String keyDecoder, String valueDecoder, String autoOffsetReset, Collection topic) { + this.kafkaHost = kafkaHost; + this.ack = ack; + this.retryTimes = retryTimes; + this.batchSize = batchSize; + this.lingerMs = lingerMs; + this.bufferMemory = bufferMemory; + this.keyEncoder = keyEncoder; + this.valueEncoder = valueEncoder; + this.groupId = groupId; + this.autoCommit = autoCommit; + this.autoCommitIntervalMs = autoCommitIntervalMs; + this.keyDecoder = keyDecoder; + this.valueDecoder = valueDecoder; + this.autoOffsetReset = autoOffsetReset; + this.topic = topic; + } + + public String getKafkaHost() { + return kafkaHost; + } + + public void setKafkaHost(String kafkaHost) { + this.kafkaHost = kafkaHost; + } + + public String getAck() { + return ack; + } + + public void setAck(String ack) { + this.ack = ack; + } + + public Integer getRetryTimes() { + return retryTimes; + } + + public void setRetryTimes(Integer retryTimes) { + this.retryTimes = retryTimes; + } + + public Integer getBatchSize() { + return batchSize; + } + + public void setBatchSize(Integer batchSize) { + this.batchSize = batchSize; + } + + public Integer getLingerMs() { + return lingerMs; + } + + public void setLingerMs(Integer lingerMs) { + this.lingerMs = lingerMs; + } + + public Integer getBufferMemory() { + return bufferMemory; + } + + public void setBufferMemory(Integer bufferMemory) { + this.bufferMemory = bufferMemory; + } + + public String getKeyEncoder() { + return keyEncoder; + } + + public void setKeyEncoder(String keyEncoder) { + this.keyEncoder = keyEncoder; + } + + public String getValueEncoder() { + return valueEncoder; + } + + public void setValueEncoder(String valueEncoder) { + this.valueEncoder = valueEncoder; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getAutoCommit() { + return autoCommit; + } + + public void setAutoCommit(String autoCommit) { + this.autoCommit = autoCommit; + } + + public String getAutoCommitIntervalMs() { + return autoCommitIntervalMs; + } + + public void setAutoCommitIntervalMs(String autoCommitIntervalMs) { + this.autoCommitIntervalMs = autoCommitIntervalMs; + } + + public String getKeyDecoder() { + return keyDecoder; + } + + public void setKeyDecoder(String keyDecoder) { + this.keyDecoder = keyDecoder; + } + + public String getValueDecoder() { + return valueDecoder; + } + + public void setValueDecoder(String valueDecoder) { + this.valueDecoder = valueDecoder; + } + + public String getAutoOffsetReset() { + return autoOffsetReset; + } + + public void setAutoOffsetReset(String autoOffsetReset) { + this.autoOffsetReset = autoOffsetReset; + } + + public Collection getTopic() { + return topic; + } + + public void setTopic(Collection topic) { + this.topic = topic; + } +} diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaProducerTest.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaProducerTest.java new file mode 100644 index 0000000..004dff9 --- /dev/null +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/util/KafkaProducerTest.java @@ -0,0 +1,64 @@ +package com.muyu.common.util; + +import org.apache.kafka.clients.producer.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.Properties; +/** + * kafka生产 + * @author liuxinyue + * @Package:com.muyu.common.util + * @name:KafkaProducerTest + * @Date:2024/9/29 12:27 + */ +public class KafkaProducerTest { + private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class); + + public static KafkaProducer getDefaultKafkaProducer(KafkaCommonProperties kafkaCommonProperties) { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost()); + properties.put(ProducerConfig.ACKS_CONFIG, kafkaCommonProperties.getAck()); + properties.put(ProducerConfig.RETRIES_CONFIG, kafkaCommonProperties.getRetryTimes()); + properties.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaCommonProperties.getBatchSize()); + properties.put(ProducerConfig.LINGER_MS_CONFIG, kafkaCommonProperties.getLingerMs()); + properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaCommonProperties.getBufferMemory()); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyEncoder()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueEncoder()); + return new KafkaProducer<>(properties); + } + + static class MyProducerCallback implements Callback { + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (Objects.nonNull(exception)) { + logger.error(">>>>>>>>>>Producer生产消息异常:", exception); + } + if (Objects.nonNull(metadata)) { + logger.info(">>>>>>>>>>Producer生产消息:metadata:{},partition:{}, offset:{}", metadata, metadata.partition(), metadata.offset()); + } + } + } + + public static void main(String[] args) { + KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties(); + KafkaProducer producer = getDefaultKafkaProducer(kafkaCommonProperties); + String message = "hello world "; + try { + for (int i = 0; i < 10; i++) { + // 异步写入数据 + String topic = kafkaCommonProperties.getTopic().toArray()[0].toString(); + ProducerRecord producerRecord = new ProducerRecord<>(topic, message + i); + producer.send(producerRecord, new MyProducerCallback()); + } + } catch (Exception ex) { + logger.error(">>>>>>>>生产数据异常:", ex); + throw new RuntimeException(ex); + } finally { + producer.close(); + } + } + +}