commit 41ef696396bfe4ca38e8cfc782dafb9d16861e6c Author: gukaixuan <1> Date: Fri Sep 15 20:44:44 2023 +0800 car-receive diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..09bdfea --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +###################################################################### +# Build Tools + +.gradle +/build/ +!gradle/wrapper/gradle-wrapper.jar + +target/ +!.mvn/wrapper/maven-wrapper.jar + +###################################################################### +# IDE + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### JRebel ### +rebel.xml +### NetBeans ### +nbproject/private/ +build/* +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ + +###################################################################### +# Others +*.log +*.xml.versionsBackup +*.swp + +!*/build/*.java +!*/build/*.html +!*/build/*.xml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7d64ffd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM openjdk:17-ea-slim +LABEL authors="Car-two <3079188394@qq.com>" + +RUN mkdir /car + +# 暴露端口 +EXPOSE 9700 + +# 创建着陆点 +WORKDIR "/car" + +# 复制新的运行程序 +COPY ./ruoyi-receive-server/target/ruoyi-receive-server.jar /car/app.jar + +# 挂载持续的目录 +VOLUME /car/logs/ruoyi-receive-server + + +# 运行你的jar包 +CMD ["java","-jar","/car/app.jar"] + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..e4897e5 --- /dev/null +++ b/pom.xml @@ -0,0 +1,26 @@ + + + + com.ruoyi + ruoyi + 3.6.3 + + pom + + ruoyi-receive-common + ruoyi-receive-remote + ruoyi-receive-server + + 3.6.3 + 4.0.0 + + ruoyi-modules-receive + + + ruoyi-modules-receive 接收模块 + + + + diff --git a/ruoyi-receive-common/.gitignore b/ruoyi-receive-common/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/ruoyi-receive-common/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/ruoyi-receive-common/pom.xml b/ruoyi-receive-common/pom.xml new file mode 100644 index 0000000..61a16f4 --- /dev/null +++ b/ruoyi-receive-common/pom.xml @@ -0,0 +1,71 @@ + + + 4.0.0 + + com.ruoyi + ruoyi-modules-receive + 3.6.3 + + + com.ruoyi + ruoyi-receive-common + + + 8 + 8 + UTF-8 + + + + + org.projectlombok + lombok + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + io.springfox + springfox-swagger-ui + ${swagger.fox.version} + + + + + + + + + + com.ruoyi + ruoyi-common-core + 3.6.3 + + + + diff --git a/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Car.java b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Car.java new file mode 100644 index 0000000..4cebd96 --- /dev/null +++ b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Car.java @@ -0,0 +1,25 @@ +package com.ruoyi.receive.domain; + +import lombok.Data; + +/** + * @Author: JCC + * @Date: 2023/8/25 22:02 + * @Description: + */ +@Data +public class Car { + + //车辆vin + private String vin; + + //车辆所在经度 + private double longitude; + + //车辆所在纬度 + private double latitude; + + //携带token + private String token; + +} diff --git a/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Response.java b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Response.java new file mode 100644 index 0000000..a8994b8 --- /dev/null +++ b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/Response.java @@ -0,0 +1,135 @@ +package com.ruoyi.receive.domain; + +import com.ruoyi.receive.domain.enums.ResponseEnum; + +/** + * @author 牧鱼 + * @Classname Response + * @Description TODO + * @Date 2021/8/5 + */ +public class Response { + + // 状态 + public int code; + + // 描述 + public String msg; + + // 实体 + public T data; + + /** + * 静态构造 无法直接创建对象 + */ + private Response() { + } + + public Response(int code, String msg, T data) { + this.code = code; + this.msg = msg; + this.data = data; + } + + /** + * 成功操作 + * @return + */ + public static Response success(){ + return new Response(ResponseEnum.SUCCESS.code, ResponseEnum.SUCCESS.msg, null); + } + + + + /** + * 成功操作 自定义返回描述 + * @return + */ + public static Response success(String msg){ + return new Response(ResponseEnum.SUCCESS.code, msg, null); + } + + /** + * 成功操作 自定义返回实体 + * @param data + * @param + * @return + */ + public static Response success(T data){ + return new Response(ResponseEnum.SUCCESS.code, Response.success().msg, data); + } + + /** + * 成功操作 自定义返回描述,返回实体 + * @param msg + * @param data + * @param + * @return + */ + public static Response success(String msg , T data){ + return new Response(ResponseEnum.SUCCESS.code, msg, data); + } + + /** + * 失败操作 + * @return + */ + public static Response error(){ + return new Response(ResponseEnum.ERROR.code, ResponseEnum.SUCCESS.msg, null); + } + + + /** + * 失败操作 自定义返回描述 + * @return + */ + public static Response error(String msg){ + return new Response(ResponseEnum.ERROR.code, msg, null); + } + + /** + * 失败操作 自定义返回实体 + * @param data + * @param + * @return + */ + public static Response error(T data){ + return new Response(ResponseEnum.ERROR.code, Response.success().msg, data); + } + + /** + * 失败操作 自定义返回描述,返回实体 + * @param msg + * @param data + * @param + * @return + */ + public static Response error(String msg , T data){ + return new Response(ResponseEnum.ERROR.code, msg, data); + } + + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } +} diff --git a/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/SystemServer.java b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/SystemServer.java new file mode 100644 index 0000000..900c9ee --- /dev/null +++ b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/SystemServer.java @@ -0,0 +1,27 @@ +package com.ruoyi.receive.domain; + +import lombok.Data; +import org.bouncycastle.asn1.its.Longitude; + +/** + * 服务 + * @Author: JCC + * @Date: 2023/8/25 10:29 + * @Description: + */ +@Data +public class SystemServer { + + //服务器ip + private String ip; + + //服务器经度 + private double longitude; + + //服务器纬度 + private double latitude; + + //距离 + private double distance; + +} diff --git a/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/enums/ResponseEnum.java b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/enums/ResponseEnum.java new file mode 100644 index 0000000..4cf269e --- /dev/null +++ b/ruoyi-receive-common/src/main/java/com/ruoyi/receive/domain/enums/ResponseEnum.java @@ -0,0 +1,22 @@ +package com.ruoyi.receive.domain.enums; + +/** + * @author 牧鱼 + * @Classname ResponseEnum + * @Description TODO + * @Date 2021/8/5 + */ +public enum ResponseEnum { + + SUCCESS(100,"操作成功"), + ERROR(150,"操作异常"); + + public int code; + + public String msg; + + ResponseEnum(int code, String msg) { + this.code = code; + this.msg = msg; + } +} diff --git a/ruoyi-receive-remote/.gitignore b/ruoyi-receive-remote/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/ruoyi-receive-remote/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/ruoyi-receive-remote/pom.xml b/ruoyi-receive-remote/pom.xml new file mode 100644 index 0000000..48cb0b4 --- /dev/null +++ b/ruoyi-receive-remote/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + com.ruoyi + ruoyi-modules-receive + 3.6.3 + + + com.ruoyi + ruoyi-receive-remote + + + 8 + 8 + UTF-8 + + + + + com.ruoyi + ruoyi-receive-common + 3.6.3 + + + + \ No newline at end of file diff --git a/ruoyi-receive-server/.gitignore b/ruoyi-receive-server/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/ruoyi-receive-server/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/ruoyi-receive-server/pom.xml b/ruoyi-receive-server/pom.xml new file mode 100644 index 0000000..432fd30 --- /dev/null +++ b/ruoyi-receive-server/pom.xml @@ -0,0 +1,122 @@ + + + 4.0.0 + + com.ruoyi + ruoyi-modules-receive + 3.6.3 + + + com.ruoyi + ruoyi-receive-server + + + 8 + 8 + UTF-8 + + + + + + + com.ruoyi + ruoyi-receive-common + 3.6.3 + + + + + + + + + + + + + + + + + + + + + + com.ruoyi + ruoyi-common-swagger + + + + com.ruoyi + ruoyi-file-remote + 3.6.3 + + + + + io.netty + netty-all + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + org.springframework.boot + spring-boot-starter-thymeleaf + 2.0.4.RELEASE + + + + org.springframework.kafka + spring-kafka + + + + + org.apache.curator + curator-framework + 4.2.0 + + + + com.gu + ruoyi-business-cache + 3.6.3 + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + + + + diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/RuoYiReceiveApplication.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/RuoYiReceiveApplication.java new file mode 100644 index 0000000..397d5bc --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/RuoYiReceiveApplication.java @@ -0,0 +1,47 @@ +package com.ruoyi.receive; + +import com.ruoyi.common.swagger.annotation.EnableCustomSwagger2; +import com.ruoyi.receive.netty.server.NettyServer; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +/** + * 接收模块 + * + * @author ruoyi + */ +@EnableCustomSwagger2 +@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class }) +public class RuoYiReceiveApplication implements ApplicationRunner +{ + public static void main(String[] args) + { + SpringApplication.run(RuoYiReceiveApplication.class, args); + System.out.println("(♥◠‿◠)ノ゙ 系统模块启动成功 ლ(´ڡ`ლ)゙ \n" + + " .-------. ____ __ \n" + + " | _ _ \\ \\ \\ / / \n" + + " | ( ' ) | \\ _. / ' \n" + + " |(_ o _) / _( )_ .' \n" + + " | (_,_).' __ ___(_ o _)' \n" + + " | |\\ \\ | || |(_,_)' \n" + + " | | \\ `' /| `-' / \n" + + " | | \\ / \\ / \n" + + " ''-' `'-' `-..-' "); + } + + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(()->{ + NettyServer nettyServer = new NettyServer(); + try { + nettyServer.startNetty(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java new file mode 100644 index 0000000..6869b65 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java @@ -0,0 +1,25 @@ +package com.ruoyi.receive.kafka.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + + +/** + * 消费者配置 + */ + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka.consumer") +public class ConsumerCustomConfig { + + public String group; + + private String topic; + + private String partitions; + + + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaConsumerConfig.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..3588c79 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,104 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +/** + * kafka消费者配置 + * @Author JCC + * @date 2023/8/22 10:00 + * kafka配置,也可以写在yml,这个文件会覆盖yml + */ +@SpringBootConfiguration +public class KafkaConsumerConfig { + + @Value("${spring.kafka.consumer.bootstrap-servers}") + private String bootstrapServers; + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + @Value("${spring.kafka.consumer.enable-auto-commit}") + private boolean enableAutoCommit; + @Value("${spring.kafka.properties.session.timeout.ms}") + private String sessionTimeout; + @Value("${spring.kafka.properties.max.poll.interval.ms}") + private String maxPollIntervalTime; + @Value("${spring.kafka.consumer.max-poll-records}") + private String maxPollRecords; + @Value("${spring.kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + @Value("${spring.kafka.listener.concurrency}") + private Integer concurrency; + @Value("${spring.kafka.listener.missing-topics-fatal}") + private boolean missingTopicsFatal; + @Value("${spring.kafka.listener.poll-timeout}") + private long pollTimeout; + + @Bean + public Map consumerConfigs() { + + Map propsMap = new HashMap<>(16); + propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + //自动提交的时间间隔,自动提交开启时生效 + propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); + //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); + //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); + //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); + //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + return propsMap; + } + + @Bean + public ConsumerFactory consumerFactory() { + // 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 + try (JsonDeserializer deserializer = new JsonDeserializer<>()) { + deserializer.trustedPackages("*"); + return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); + } + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 + factory.setConcurrency(concurrency); + //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 + factory.setMissingTopicsFatal(missingTopicsFatal); + // 自动提交关闭,需要设置手动消息确认 + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.getContainerProperties().setPollTimeout(pollTimeout); + // 设置为批量监听,需要用List接收 + // factory.setBatchListener(true); + return factory; + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaProviderConfig.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaProviderConfig.java new file mode 100644 index 0000000..716f9c6 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaProviderConfig.java @@ -0,0 +1,86 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; +/** + * kafka生产者配置 + * @Author JCC + * @date 2023/8/22 10:00 + * kafka配置,也可以写在yml,这个文件会覆盖yml + */ +@SpringBootConfiguration +public class KafkaProviderConfig { + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + @Value("${spring.kafka.producer.transaction-id-prefix}") + private String transactionIdPrefix; + @Value("${spring.kafka.producer.acks}") + private String acks; + @Value("${spring.kafka.producer.retries}") + private String retries; + @Value("${spring.kafka.producer.batch-size}") + private String batchSize; + @Value("${spring.kafka.producer.buffer-memory}") + private String bufferMemory; + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(16); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + //开启事务必须设为all + props.put(ProducerConfig.ACKS_CONFIG, acks); + //发生错误后,消息重发的次数,开启事务必须大于0 + props.put(ProducerConfig.RETRIES_CONFIG, retries); + //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送 + //批次的大小可以通过batch.size 参数设置.默认是16KB + //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 + //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 + //实测batchSize这个参数没有用 + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + //有的时刻消息比较少,过了很久,比如 5min也没有凑够 16KB, 这样延时就很大, 所以需要一个参数. 再设置一个时间, 到了这个时间, + //即使数据没达到16KB,也将这个批次发送出去 + + props.put("allow.auto.create.topics.enable", true); + props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); + // 生产者内存缓冲区的大小 + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + //反序列化,和生产者的序列化方式对应 + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + //自定义分区器 + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ruoyi.receive.kafka.config.MyPartitioner"); + return props; + } + + @Bean + public ProducerFactory producerFactory() { + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); + // 开启事务,会导致 LINGER_MS_CONFIG 配置失效 + factory.setTransactionIdPrefix(transactionIdPrefix); + return factory; + } + + @Bean + public KafkaTransactionManager kafkaTransactionManager(ProducerFactory producerFactory) { + return new KafkaTransactionManager<>(producerFactory); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSendResultHandler.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSendResultHandler.java new file mode 100644 index 0000000..5820deb --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSendResultHandler.java @@ -0,0 +1,26 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.ProducerListener; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Component; + +/** + * kafka消息回调 + * @Author JCC + * @date 2023/8/22 10:00 + * kafka消息发送回调 + */ +@Component +public class KafkaSendResultHandler implements ProducerListener { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + System.out.println("消息发送成功:" + producerRecord.toString()); + } + + @Override + public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { + System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage()); + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSender.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSender.java new file mode 100644 index 0000000..9c61798 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaSender.java @@ -0,0 +1,43 @@ +package com.ruoyi.receive.kafka.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; + +@Component +@Slf4j +public class KafkaSender { + public final static String MSG_TOPIC = "first"; + + @Autowired + private KafkaTemplate kafkaTemplate; + private static KafkaTemplate template; + + @PostConstruct + public void init() { + KafkaSender.template = this.kafkaTemplate; + } + + //发送消息到kafka队列 + public static boolean send(String topic,Integer partition, String key,String message) { + try { + //在发送消息的方法中使用KafkaTemplate.executeInTransaction()方法,将发送消息的操作放在事务范围内。 + template.executeInTransaction(new KafkaOperations.OperationsCallback() { + @Override + public Object doInOperations(KafkaOperations kafkaOperations) { + kafkaOperations.send(topic, partition,key,message); + log.info("消息发送成功:{} , {}", topic, message); + return null; + } + }); + } catch (Exception e) { + log.error("消息发送失败:{} , {}", topic, message, e); + return false; + } + return true; + } + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaTopic.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaTopic.java new file mode 100644 index 0000000..21896ff --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/KafkaTopic.java @@ -0,0 +1,36 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + +/** + * kafka主题 + * @Author JCC + * @Date: 2023/8/18 15:35 + * @Description: + */ +@Configuration +public class KafkaTopic { + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "43.142.96.146:9092"); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic topic1() { + return new NewTopic("qaq", 50, (short) 1); + } + + @Bean + public NewTopic topic2() { + return new NewTopic("test", 3, (short) 1); + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyKafkaListenerErrorHandler.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyKafkaListenerErrorHandler.java new file mode 100644 index 0000000..5c64595 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyKafkaListenerErrorHandler.java @@ -0,0 +1,35 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.lang.NonNull; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * kafka异常处理 + * @Author JCC + * @date 2023/8/22 15:27 + * 异常处理 + */ +@Component +public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { + + @Override + @NonNull + public Object handleError(@NonNull Message message, @NonNull ListenerExecutionFailedException exception) { + return new Object(); + } + + @Override + @NonNull + public Object handleError(@NonNull Message message, @NonNull ListenerExecutionFailedException exception, + Consumer consumer) { + System.out.println("消息详情:" + message); + System.out.println("异常信息::" + exception); + System.out.println("消费者详情::" + consumer.groupMetadata()); + System.out.println("监听主题::" + consumer.listTopics()); + return KafkaListenerErrorHandler.super.handleError(message, exception, consumer); + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyPartitioner.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyPartitioner.java new file mode 100644 index 0000000..b57befb --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/MyPartitioner.java @@ -0,0 +1,34 @@ +package com.ruoyi.receive.kafka.config; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.Map; + +/** + * 自定义分区器 + * @Author: JCC + * @Date: 2023/8/22 22:25 + * @Description: + */ +public class MyPartitioner implements Partitioner { + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + List partitionInfos = cluster.partitionsForTopic(topic); + int size = partitionInfos.size(); + int parId = ((String) value).hashCode() % size; + return parId; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Config.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Config.java new file mode 100644 index 0000000..4d926ee --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Config.java @@ -0,0 +1,109 @@ +package com.ruoyi.receive.netty.config; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +/** + * @Author: JCC + * @Date: 2023/8/17 19:00 + * @Description: + */ +public class Config { + + /** + * 分包符 + */ + public static final String DATA_PACK_SEPARATOR = "#$&*"; + + /** + * 工作组配置 + */ + public static EventLoopGroup workerGroup = new NioEventLoopGroup(); + + /** + * 通道处理上下文 + */ + public static ChannelHandlerContext ctx; + + /** + * 加密方式 + */ + public static final String[] CIPHER_ARRAY = {"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_DHE_DSS_WITH_AES_128_GCM_SHA256"}; + + /** + * 车辆VIN + */ + public static String VIN = ""; + + /** + * 服务器状态 + */ + public static boolean IS_CONNECT = false; + + /** + * 报文起始位 + */ + public static final String MSG_START = "7E "; + + /** + * 报文结束位 + */ + public static final String MSG_END = "7E"; + + /** + * 连接消息VIN + */ + public final static String START_VIN_SUF = "START_VIN:"; + /** + * 连接消息VIN + */ + public final static String START_VIN_SUCCESS_SUF = "SUCCESS_VIN:"; + /** + * 车辆消息报文前缀 + */ + public final static String VEHICLE_MSG_SUF = "VEHICLE_MSG:"; + /** + * 车辆启动报文前缀 + */ + public final static String VEHICLE_START_SUF = "VEHICLE_START:"; + /** + * 车辆关闭报文前缀 + */ + public final static String VEHICLE_STOP_SUF = "VEHICLE_STOP:"; + + /** + * 连接启动信息 + */ + public final static String NETTY_CONNECT = "CONNECT"; + /** + * 连接关闭信息 + */ + public final static String NETTY_WILL_CLOSE = "WILL_CLOSE:"; + /** + * 连接关闭信息 + */ + public final static String NETTY_CLOSE = "CLOSE"; + + /** + * 车辆VIN正则表达式 + */ + public final static String VIN_REGEX = "^(?![0-9]+$)(?![a-zA-Z]+$)[0-9A-Za-z]{17}$"; + + + /** + * 车辆基础故障组 + */ + public final static String VEHICLE_BASE_FAULT = "vehicle_base"; + + /** + * 车辆零配件故障组 + */ + public final static String VEHICLE_PARTS_FAULT = "vehicle_parts"; + + /** + * 车辆电池故障组 + */ + public final static String VEHICLE_BATTERY_FAULT = "vehicle_battery"; + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Constants.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Constants.java new file mode 100644 index 0000000..e48879d --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/config/Constants.java @@ -0,0 +1,15 @@ +package com.ruoyi.receive.netty.config; + +/** + * netty集群参数 + * @Author: JCC + * @Date: 2023/8/20 18:48 + * @Description: + */ +public class Constants { + + public final static int ZK_SESSION_TIMEOUT = 60000; + public final static String ZK_DATA_PATH = "/netty-servers"; + public final static String ZK_REGISTRY_PATH = "/"; + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/handler/NettyServerHandler.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/handler/NettyServerHandler.java new file mode 100644 index 0000000..2a07faa --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/handler/NettyServerHandler.java @@ -0,0 +1,83 @@ +package com.ruoyi.receive.netty.handler; + +import com.alibaba.fastjson.JSON; +import com.ruoyi.receive.netty.config.Config; +import com.ruoyi.receive.kafka.config.KafkaSender; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.annotation.Transactional; + +import java.net.SocketAddress; + +/** + * netty服务端处理器 + * @Author: JCC + * @Date: 2023/8/17 18:58 + * @Description: + */ +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class); + int readTime = 0; + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + IdleStateEvent event = (IdleStateEvent) evt; + String eventType = null; + switch (event.state()){ + case READER_IDLE: + eventType = "读空闲"; + readTime++; + break; + case WRITER_IDLE: + eventType = "写空闲"; + break; + case ALL_IDLE: + eventType = "读写空闲"; + break; + default: + break; + } + log.info(ctx.channel().remoteAddress() + "超时事件:" + eventType); + if (readTime>10){ + log.info("服务端读空闲超过10次,关闭连接,释放更多资源"); + ctx.writeAndFlush("关闭此连接"); + ctx.channel().close(); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + SocketAddress socketAddress = ctx.channel().remoteAddress(); + log.info(socketAddress + " 已连接"); + + // 发送数据 + ctx.writeAndFlush("你好客户端"+ Config.DATA_PACK_SEPARATOR); + } + + @Override + @Transactional(rollbackFor = RuntimeException.class) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("客户端信息:" + msg); + KafkaSender.send("test",2,null, JSON.toJSONString(msg)); + } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.info(ctx.channel().remoteAddress() + " 已断开连接"); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/NettyServer.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/NettyServer.java new file mode 100644 index 0000000..d958320 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/NettyServer.java @@ -0,0 +1,88 @@ +package com.ruoyi.receive.netty.server; + +import com.ruoyi.receive.netty.config.Config; +import com.ruoyi.receive.netty.handler.NettyServerHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.IdleStateHandler; + +/** + * netty服务端 + * @Author: JCC + * @Date: 2023/8/17 18:55 + * @Description: + */ +public class NettyServer { + + public void startNetty() throws InterruptedException{ + //创建两个线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(10); //默认cpu核数*2 + + try{ + // 服务端启动类 + ServerBootstrap bootstrap = new ServerBootstrap(); + // 传入两个线程组 + bootstrap.group(bossGroup, workerGroup) + // 指定Channel 和NIO一样是采用Channel通道的方式通信 所以需要指定服务端通道 + .channel(NioServerSocketChannel.class) + + .option(ChannelOption.SO_BACKLOG, 1024) + //保持长连接 + .childOption(ChannelOption.SO_KEEPALIVE,true) + + //设置数据处理器 + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + //分包器 + channel.pipeline().addLast( + new DelimiterBasedFrameDecoder( + 1024, + Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes() + ) + ) + ); + //编码器 + channel.pipeline().addLast("encoder", new StringEncoder()); + //解码器 + channel.pipeline().addLast("decoder", new StringDecoder()); + // 在管道中 添加数据处理类 + channel.pipeline().addLast(new NettyServerHandler()); + //IdleStateHandler的readerIdleTime参数指定超过10秒还没收到客户端的连接, + //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须 + //实现userEventTriggered方法处理对应事件 + channel.pipeline().addLast("HBeat", new IdleStateHandler( + 10, + 20, 0)); + } + }); + // 同步等待成功 + ChannelFuture future = bootstrap.bind(7000).sync(); + if (future.isSuccess()) { + System.out.println("启动 Netty Server 成功"); + } + ServiceRegistry serviceRegistry = new ServiceRegistry("10.100.27.4:2181"); + if (serviceRegistry != null) { + serviceRegistry.register("10.100.27.4" + ":" + 7000); + } + //等待服务端监听端口关闭 链路关闭后main函数才会结束 + future.channel().closeFuture().sync(); + } finally { + // 优雅的关闭 释放资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/ServiceRegistry.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/ServiceRegistry.java new file mode 100644 index 0000000..780d730 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/ServiceRegistry.java @@ -0,0 +1,78 @@ +package com.ruoyi.receive.netty.server; + +import com.ruoyi.receive.netty.config.Constants; +import org.apache.zookeeper.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * zookeeper注册节点 + * @Author: JCC + * @Date: 2023/8/20 18:44 + * @Description: + */ +public class ServiceRegistry { + + private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class); + private CountDownLatch latch = new CountDownLatch(1); + private String registryAddress; + /** + * 构造函数,传入zookeeper服务器地址 + * @param registryAddress zookeeper服务器地址 + */ + public ServiceRegistry(String registryAddress) { + this.registryAddress = registryAddress; + } + public void register(String data) { + if (data != null) { + // 连接zookeeper服务器 + ZooKeeper zk = connectServer(); + if (zk != null) { + // 创建节点 + createNode(zk, data); + } + } + } + /** + * 连接 zookeeper 服务器 + * @return + */ + private ZooKeeper connectServer() { + ZooKeeper zk = null; + try { + // 创建ZooKeeper对象,连接zookeeper服务器 + zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() { + @Override + public void process(WatchedEvent event) { + // 当连接成功时,释放CountDownLatch + if (event.getState() == Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + latch.await(); + } catch (IOException | InterruptedException e) { + logger.error("", e); + } + return zk; + } + /** + * 创建节点 + * @param zk + * @param data + */ + private void createNode(ZooKeeper zk, String data) { + try { + // 将服务数据转换为字节数组 + byte[] bytes = data.getBytes(); + // 创建临时顺序节点 + String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + logger.debug("创建zookeeper节点 ({} => {})", path, data); + } catch (KeeperException | InterruptedException e) { + logger.error("", e); + } + } + +} diff --git a/ruoyi-receive-server/src/main/resources/banner.txt b/ruoyi-receive-server/src/main/resources/banner.txt new file mode 100644 index 0000000..fbd45f5 --- /dev/null +++ b/ruoyi-receive-server/src/main/resources/banner.txt @@ -0,0 +1,10 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} + _ _ + (_) | | + _ __ _ _ ___ _ _ _ ______ ___ _ _ ___ | |_ ___ _ __ ___ +| '__|| | | | / _ \ | | | || ||______|/ __|| | | |/ __|| __| / _ \| '_ ` _ \ +| | | |_| || (_) || |_| || | \__ \| |_| |\__ \| |_ | __/| | | | | | +|_| \__,_| \___/ \__, ||_| |___/ \__, ||___/ \__| \___||_| |_| |_| + __/ | __/ | + |___/ |___/ \ No newline at end of file diff --git a/ruoyi-receive-server/src/main/resources/bootstrap.yml b/ruoyi-receive-server/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..2433b7f --- /dev/null +++ b/ruoyi-receive-server/src/main/resources/bootstrap.yml @@ -0,0 +1,111 @@ +# Tomcat +server: + port: 3339 + +# Spring +spring: + kafka: + producer: + # Kafka服务器 + bootstrap-servers: 43.142.96.146:9092 + # 开启事务,必须在开启了事务的方法中发送,否则报错 + transaction-id-prefix: kafkaTx- + # 发生错误后,消息重发的次数,开启事务必须设置大于0。 + retries: 3 + # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + # 开启事务时,必须设置为all + acks: all + # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 + batch-size: 16384 + # 生产者内存缓冲区的大小。 + buffer-memory: 1024000 + # 键的序列化方式 + key-serializer: org.springframework.kafka.support.serializer.JsonSerializer + # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + bootstrap.servers: 43.142.96.146:9092 + auto.create.topics: true + + consumer: + # Kafka服务器 + bootstrap-servers: 43.142.96.146:9092 + group-id: firstGroup + # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D + #auto-commit-interval: 2s + # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + # latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + # none: 当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + auto-offset-reset: latest + # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + enable-auto-commit: false + # 键的反序列化方式 + #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 + properties: + bootstrap.servers: 43.142.96.146:9092 + allow.auto.create.topics: true + spring: + json: + trusted: + packages: "*" + # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + max-poll-records: 3 + properties: + # 两次 poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + max: + poll: + interval: + ms: 600000 + # 当 broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + session: + timeout: + ms: 10000 + listener: + # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 + concurrency: 4 + # 自动提交关闭,需要设置手动消息确认 + ack-mode: manual_immediate + # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 + missing-topics-fatal: false + # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + poll-timeout: 600000 + redis: + host: 10.100.27.4 + port: 6379 + thymeleaf: + cache: false + mode: LEGACYHTML5 + prefix: classpath:/templates/ + suffix: .html + application: + # 应用名称 + name: ruoyi-receive + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: 10.100.27.4:8848 + config: + # 配置中心地址 + server-addr: 10.100.27.4:8848 + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + diff --git a/ruoyi-receive-server/src/main/resources/logback.xml b/ruoyi-receive-server/src/main/resources/logback.xml new file mode 100644 index 0000000..4379fe7 --- /dev/null +++ b/ruoyi-receive-server/src/main/resources/logback.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/ruoyi-receive-server/src/main/resources/templates/index.html b/ruoyi-receive-server/src/main/resources/templates/index.html new file mode 100644 index 0000000..f71c1a8 --- /dev/null +++ b/ruoyi-receive-server/src/main/resources/templates/index.html @@ -0,0 +1,41 @@ + + + + + netty集群 + + + + + +

服务端集群列表:

+ + + + + + + + + + + +
ip端口启动时间
+ +

设备列表:

+ + + + + + + + + + + + + +
ip端口clientId连接时间
+ + \ No newline at end of file