diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excel.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excel.java index 36180e5..5168b9d 100644 --- a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excel.java +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excel.java @@ -21,132 +21,132 @@ public @interface Excel { /** * 导出时在excel中排序 */ - public int sort () default Integer.MAX_VALUE; + public int sort() default Integer.MAX_VALUE; /** * 导出到Excel中的名字. */ - public String name () default ""; + public String name() default ""; /** * 日期格式, 如: yyyy-MM-dd */ - public String dateFormat () default ""; + public String dateFormat() default ""; /** * 读取内容转表达式 (如: 0=男,1=女,2=未知) */ - public String readConverterExp () default ""; + public String readConverterExp() default ""; /** * 分隔符,读取字符串组内容 */ - public String separator () default ","; + public String separator() default ","; /** * BigDecimal 精度 默认:-1(默认不开启BigDecimal格式化) */ - public int scale () default -1; + public int scale() default -1; /** * BigDecimal 舍入规则 默认:BigDecimal.ROUND_HALF_EVEN */ - public int roundingMode () default BigDecimal.ROUND_HALF_EVEN; + public int roundingMode() default BigDecimal.ROUND_HALF_EVEN; /** * 导出时在excel中每个列的高度 */ - public double height () default 14; + public double height() default 14; /** * 导出时在excel中每个列的宽度 */ - public double width () default 16; + public double width() default 16; /** * 文字后缀,如% 90 变成90% */ - public String suffix () default ""; + public String suffix() default ""; /** * 当值为空时,字段的默认值 */ - public String defaultValue () default ""; + public String defaultValue() default ""; /** * 提示信息 */ - public String prompt () default ""; + public String prompt() default ""; /** * 设置只能选择不能输入的列内容. */ - public String[] combo () default {}; + public String[] combo() default {}; /** * 是否需要纵向合并单元格,应对需求:含有list集合单元格) */ - public boolean needMerge () default false; + public boolean needMerge() default false; /** * 是否导出数据,应对需求:有时我们需要导出一份模板,这是标题需要但内容需要用户手工填写. */ - public boolean isExport () default true; + public boolean isExport() default true; /** * 另一个类中的属性名称,支持多级获取,以小数点隔开 */ - public String targetAttr () default ""; + public String targetAttr() default ""; /** * 是否自动统计数据,在最后追加一行统计数据总和 */ - public boolean isStatistics () default false; + public boolean isStatistics() default false; /** * 导出类型(0数字 1字符串) */ - public ColumnType cellType () default ColumnType.STRING; + public ColumnType cellType() default ColumnType.STRING; /** * 导出列头背景颜色 */ - public IndexedColors headerBackgroundColor () default IndexedColors.GREY_50_PERCENT; + public IndexedColors headerBackgroundColor() default IndexedColors.GREY_50_PERCENT; /** * 导出列头字体颜色 */ - public IndexedColors headerColor () default IndexedColors.WHITE; + public IndexedColors headerColor() default IndexedColors.WHITE; /** * 导出单元格背景颜色 */ - public IndexedColors backgroundColor () default IndexedColors.WHITE; + public IndexedColors backgroundColor() default IndexedColors.WHITE; /** * 导出单元格字体颜色 */ - public IndexedColors color () default IndexedColors.BLACK; + public IndexedColors color() default IndexedColors.BLACK; /** * 导出字段对齐方式 */ - public HorizontalAlignment align () default HorizontalAlignment.CENTER; + public HorizontalAlignment align() default HorizontalAlignment.CENTER; /** * 自定义数据处理器 */ - public Class handler () default ExcelHandlerAdapter.class; + public Class handler() default ExcelHandlerAdapter.class; /** * 自定义数据处理器参数 */ - public String[] args () default {}; + public String[] args() default {}; /** * 字段类型(0:导出导入;1:仅导出;2:仅导入) */ - Type type () default Type.ALL; + Type type() default Type.ALL; public enum Type { ALL(0), EXPORT(1), IMPORT(2); diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excels.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excels.java index f8fc165..ea8dd80 100644 --- a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excels.java +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/annotation/Excels.java @@ -13,5 +13,5 @@ import java.lang.annotation.Target; @Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface Excels { - Excel[] value (); + Excel[] value(); } diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/utils/SpringUtils.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/utils/SpringUtils.java index c37a65c..6f50b56 100644 --- a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/utils/SpringUtils.java +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/utils/SpringUtils.java @@ -26,7 +26,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor { * * @return Object 一个以所给名字注册的bean的实例 * - * @throws org.springframework.beans.BeansException + * @throws BeansException */ @SuppressWarnings("unchecked") public static T getBean (String name) throws BeansException { @@ -40,7 +40,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor { * * @return * - * @throws org.springframework.beans.BeansException + * @throws BeansException */ public static T getBean (Class clz) throws BeansException { T result = (T) beanFactory.getBean(clz); @@ -65,7 +65,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor { * * @return boolean * - * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + * @throws NoSuchBeanDefinitionException */ public static boolean isSingleton (String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); @@ -76,7 +76,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor { * * @return Class 注册对象的类型 * - * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + * @throws NoSuchBeanDefinitionException */ public static Class getType (String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); @@ -89,7 +89,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor { * * @return * - * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + * @throws NoSuchBeanDefinitionException */ public static String[] getAliases (String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/xss/Xss.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/xss/Xss.java index fa31755..9b53042 100644 --- a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/xss/Xss.java +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/xss/Xss.java @@ -17,11 +17,11 @@ import java.lang.annotation.Target; @Target(value = {ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR, ElementType.PARAMETER}) @Constraint(validatedBy = {XssValidator.class}) public @interface Xss { - String message () + String message() default "不允许任何脚本运行"; - Class[] groups () default {}; + Class[] groups() default {}; - Class[] payload () default {}; + Class[] payload() default {}; } diff --git a/cloud-common/cloud-common-datascope/src/main/java/com/muyu/common/datascope/annotation/DataScope.java b/cloud-common/cloud-common-datascope/src/main/java/com/muyu/common/datascope/annotation/DataScope.java index 498f06b..5277b20 100644 --- a/cloud-common/cloud-common-datascope/src/main/java/com/muyu/common/datascope/annotation/DataScope.java +++ b/cloud-common/cloud-common-datascope/src/main/java/com/muyu/common/datascope/annotation/DataScope.java @@ -14,15 +14,15 @@ public @interface DataScope { /** * 部门表的别名 */ - public String deptAlias () default ""; + public String deptAlias() default ""; /** * 用户表的别名 */ - public String userAlias () default ""; + public String userAlias() default ""; /** * 权限字符(用于多个角色匹配符合要求的权限)默认根据权限注解@RequiresPermissions获取,多个权限用逗号分隔开来 */ - public String permission () default ""; + public String permission() default ""; } diff --git a/cloud-common/cloud-common-log/src/main/java/com/muyu/common/log/annotation/Log.java b/cloud-common/cloud-common-log/src/main/java/com/muyu/common/log/annotation/Log.java index ac6394e..a8b6fea 100644 --- a/cloud-common/cloud-common-log/src/main/java/com/muyu/common/log/annotation/Log.java +++ b/cloud-common/cloud-common-log/src/main/java/com/muyu/common/log/annotation/Log.java @@ -17,30 +17,30 @@ public @interface Log { /** * 模块 */ - public String title () default ""; + public String title() default ""; /** * 功能 */ - public BusinessType businessType () default BusinessType.OTHER; + public BusinessType businessType() default BusinessType.OTHER; /** * 操作人类别 */ - public OperatorType operatorType () default OperatorType.MANAGE; + public OperatorType operatorType() default OperatorType.MANAGE; /** * 是否保存请求的参数 */ - public boolean isSaveRequestData () default true; + public boolean isSaveRequestData() default true; /** * 是否保存响应的参数 */ - public boolean isSaveResponseData () default true; + public boolean isSaveResponseData() default true; /** * 排除指定的请求参数 */ - public String[] excludeParamNames () default {}; + public String[] excludeParamNames() default {}; } diff --git a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/EnableMyFeignClients.java b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/EnableMyFeignClients.java index 7a59fa4..1a4443b 100644 --- a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/EnableMyFeignClients.java +++ b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/EnableMyFeignClients.java @@ -15,13 +15,13 @@ import java.lang.annotation.*; @Documented @EnableFeignClients public @interface EnableMyFeignClients { - String[] value () default {}; + String[] value() default {}; - String[] basePackages () default {"com.muyu"}; + String[] basePackages() default {"com.muyu"}; - Class[] basePackageClasses () default {}; + Class[] basePackageClasses() default {}; - Class[] defaultConfiguration () default {}; + Class[] defaultConfiguration() default {}; - Class[] clients () default {}; + Class[] clients() default {}; } diff --git a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/InnerAuth.java b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/InnerAuth.java index 092a573..69b0912 100644 --- a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/InnerAuth.java +++ b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/InnerAuth.java @@ -14,5 +14,5 @@ public @interface InnerAuth { /** * 是否校验用户信息 */ - boolean isUser () default false; + boolean isUser() default false; } diff --git a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresPermissions.java b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresPermissions.java index 8d95bb4..c0d9e08 100644 --- a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresPermissions.java +++ b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresPermissions.java @@ -16,10 +16,10 @@ public @interface RequiresPermissions { /** * 需要校验的权限码 */ - String[] value () default {}; + String[] value() default {}; /** * 验证模式:AND | OR,默认AND */ - Logical logical () default Logical.AND; + Logical logical() default Logical.AND; } diff --git a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresRoles.java b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresRoles.java index 78911cc..df6c145 100644 --- a/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresRoles.java +++ b/cloud-common/cloud-common-security/src/main/java/com/muyu/common/security/annotation/RequiresRoles.java @@ -16,10 +16,10 @@ public @interface RequiresRoles { /** * 需要校验的角色标识 */ - String[] value () default {}; + String[] value() default {}; /** * 验证逻辑:AND | OR,默认AND */ - Logical logical () default Logical.AND; + Logical logical() default Logical.AND; } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java index e96fac4..f83fa50 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java @@ -57,7 +57,7 @@ public class ECSTool { // 打印诊断推荐链接 System.out.println(teaError.getData().get("Recommend")); // 断言错误信息 - com.aliyun.teautil.Common.assertAsString(teaError.getMessage()); + Common.assertAsString(teaError.getMessage()); } else { // 处理其他类型的异常 System.out.println(error.getMessage()); @@ -83,7 +83,7 @@ public class ECSTool { System.out.println(error.getMessage()); // 诊断地址 System.out.println(error.getData().get("Recommend")); - com.aliyun.teautil.Common.assertAsString(error.message); + Common.assertAsString(error.message); } catch (Exception _error) { TeaException error = new TeaException(_error.getMessage(), _error); // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 @@ -91,7 +91,7 @@ public class ECSTool { System.out.println(error.getMessage()); // 诊断地址 System.out.println(error.getData().get("Recommend")); - com.aliyun.teautil.Common.assertAsString(error.message); + Common.assertAsString(error.message); } } diff --git a/cloud-modules/cloud-modules-data-processing/pom.xml b/cloud-modules/cloud-modules-data-processing/pom.xml index 572d4d5..2810991 100644 --- a/cloud-modules/cloud-modules-data-processing/pom.xml +++ b/cloud-modules/cloud-modules-data-processing/pom.xml @@ -8,20 +8,18 @@ cloud-modules 3.6.3 - cloud-modules-data-processing - cloud-data-processing 数据处理模块 + cloud-modules-data-processing 数据处理模块 - 17 17 UTF-8 - + com.muyu cloud-common-kafka @@ -90,8 +88,47 @@ com.muyu cloud-common-datasource - + + + com.muyu + cloud-common-api-doc + + + + + com.muyu + cloud-common-xxl + + + + com.muyu + cloud-common-rabbit + + + + org.bouncycastle + bcpkix-jdk15on + 1.70 + + + + com.muyu + cloud-common-kafka + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + + + + com.muyu + cloud-modules-car-gateway + 3.6.3 + + ${project.artifactId} @@ -108,5 +145,4 @@ - diff --git a/cloud-modules/cloud-modules-openbusiness/cloud-modules-openbusiness-remote/src/main/java/com/muyu/openbusiness/remote/SysCarRemoteService.java b/cloud-modules/cloud-modules-openbusiness/cloud-modules-openbusiness-remote/src/main/java/com/muyu/openbusiness/remote/SysCarRemoteService.java index 654d4c7..f52e472 100644 --- a/cloud-modules/cloud-modules-openbusiness/cloud-modules-openbusiness-remote/src/main/java/com/muyu/openbusiness/remote/SysCarRemoteService.java +++ b/cloud-modules/cloud-modules-openbusiness/cloud-modules-openbusiness-remote/src/main/java/com/muyu/openbusiness/remote/SysCarRemoteService.java @@ -4,9 +4,11 @@ import com.muyu.common.core.constant.ServiceNameConstants; import com.muyu.common.core.domain.Result; import com.muyu.openbusiness.domain.SysCar; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.GetMapping; @FeignClient(contextId = "sysCarRemoteService", value = ServiceNameConstants.CAR_SERVICE) public interface SysCarRemoteService { + @GetMapping("/car/findByVin") public Result findByVin(String vin); } diff --git a/cloud-modules/cloud-modules-parsing/pom.xml b/cloud-modules/cloud-modules-parsing/pom.xml index 7f2fd25..0969c45 100644 --- a/cloud-modules/cloud-modules-parsing/pom.xml +++ b/cloud-modules/cloud-modules-parsing/pom.xml @@ -13,12 +13,36 @@ cloud-modules-parsing 协议解析模块 + 17 17 UTF-8 + + com.muyu + cloud-common-kafka + 3.6.3 + + + + com.muyu + cloud-common-caffeine + 3.6.3 + + + + com.muyu + cloud-common-rabbit + + + + com.muyu + cloud-common-iotdb + 3.6.3 + + com.alibaba.cloud @@ -43,51 +67,26 @@ spring-boot-starter-actuator + + org.springframework.boot + spring-boot-starter-tomcat + + com.mysql mysql-connector-j - - - com.muyu - cloud-common-datasource - - com.muyu cloud-common-datascope - com.muyu - cloud-common-log - - - - - com.muyu - cloud-common-api-doc - - - - - com.muyu - cloud-common-xxl - - - - com.muyu - cloud-common-rabbit - - - - org.bouncycastle - bcpkix-jdk15on - 1.70 + cloud-common-datasource diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java new file mode 100644 index 0000000..a188c25 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java @@ -0,0 +1,131 @@ +package com.muyu.parsing.consumer; + +import com.muyu.cargateway.domain.properties.MqttProperties; +import com.muyu.common.kafka.constants.KafkaConstants; +import com.muyu.parsing.domain.KafKaData; +import com.muyu.parsing.domain.SysCarMessage; +import com.muyu.parsing.manager.TaskManager; +import com.muyu.parsing.service.impl.SysCarMessageServiceImpl; +import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @ClassName FormMessageConsumer + * @Description 描述 + * @Author Chen + * @Date 2024/10/9 15:55 + */ +@Component +@Slf4j +public class FormMessageConsumer { + @Resource + private RedisTemplate redisTemplate; + + @Resource + private KafkaProducer kafkaProducer; + @Resource + private SysCarMessageServiceImpl sysCarMessageService; + private final static String FORM_QUEUE = "queue_inform_sms"; + + @RabbitListener(queuesToDeclare = {@Queue(FORM_QUEUE)}) + public void downline(MqttProperties mqttProperties, Message message, Channel channel) { + log.info("基础信息 {} 。。。", mqttProperties); + try { + // 第三个参数为空,默认持久化策略 + MqttClient sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + //用户名 + connOpts.setUserName(mqttProperties.getUserName()); + //密码 + connOpts.setPassword(mqttProperties.getPassword().toCharArray()); + connOpts.setCleanSession(true); + System.out.println("Connecting to broker: " + mqttProperties.getBroker()); + sampleClient.connect(connOpts); + sampleClient.subscribe(mqttProperties.getTopic(), 0); + sampleClient.setCallback(new MqttCallback() { + // 连接丢失 + @Override + public void connectionLost(Throwable throwable) { + log.error("连接丢失:{}", throwable.getMessage()); + } + + // 连接成功 + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + TaskManager manager = new TaskManager(10); + manager.execute(() -> processMessage(mqttMessage)); + } + + // 接收信息 + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("接收消息:{}", iMqttDeliveryToken); + } + }); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (MqttException | IOException me) { +// System.out.println("reason " + me.getReasonCode()); + System.out.println("msg " + me.getMessage()); + System.out.println("loc " + me.getLocalizedMessage()); + System.out.println("cause " + me.getCause()); + System.out.println("excep " + me); + me.printStackTrace(); + throw new RuntimeException(me); + } + } + + + private void processMessage(MqttMessage mqttMessage) { + String string = new String(mqttMessage.getPayload()); + log.info(string); + List list = sysCarMessageService.selectSysCarMessageLists(1); + List kafKaDataList = new ArrayList<>(); + String[] test = string.split(" "); + for (SysCarMessage carMessage : list) { + int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; + int end = Integer.parseInt(carMessage.getMessageEndIndex()); + StringBuilder hexBuilder = new StringBuilder(); + for (int i = start; i < end; i++) { + hexBuilder.append(test[i]); + } + String hex = hexBuilder.toString(); + char[] result = new char[hex.length() / 2]; + for (int x = 0; x < hex.length(); x += 2) { + int high = Character.digit(hex.charAt(x), 16); + int low = Character.digit(hex.charAt(x + 1), 16); + result[x / 2] = (char) ((high << 4) + low); + } + String value = new String(result); + kafKaDataList.add(KafKaData.builder() + .key(carMessage.getMessageTypeCode()) + .label(carMessage.getMessageTypeCode()) + .value(value) + .type(carMessage.getMessageType()) + .build()); + } + kafKaDataList.add(KafKaData.builder() + .key("firmCode") + .label("企业编码") + .value("firm01") + .type("String") + .build()); + + String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(kafKaDataList); + ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + kafkaProducer.send(producerRecord); + log.info("kafka投产:{}", jsonString); + } +} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java new file mode 100644 index 0000000..b895ce6 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java @@ -0,0 +1,24 @@ +package com.muyu.parsing.manager; + +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; + +/** + * 线程配置 + */ +public final class TaskManager { + private final ExecutorService executorService; + + public TaskManager(int threadPoolSize) { + this.executorService = Executors.newFixedThreadPool(threadPoolSize); + } + + public void execute(Runnable task) { + executorService.submit(task); + } + + // 关闭线程池 + public void shutdown() { + executorService.shutdown(); + } +} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java new file mode 100644 index 0000000..08fb605 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java @@ -0,0 +1,189 @@ +//package com.muyu.parsing.manager; +// +///** +// * @Author: 胡杨 +// * @Name: TaskManager +// * @Description: 任务管理器 +// * @CreatedDate: 2024/9/4 下午7:44 +// * @FilePath: com.muyu.quest.manager +// */ +// +// +//import lombok.extern.slf4j.Slf4j; +// +//import java.util.Collections; +//import java.util.LinkedList; +//import java.util.List; +// +//import static java.lang.Thread.sleep; +// +///** +// * 基础线程配置 +// */ +//@Slf4j +//public final class TaskManagers { +// // 线程池中默认线程的个数为5 +// private static int workerNum = 8; +// // 工作线程 +//// private final WorkThread[] workThrads; +// // 未处理的任务 +// private static volatile int finishedTask = 0; +//// +//// // 任务队列 +//// private final List taskQueue = new LinkedList(); +//// private static TaskManager taskManager; +//// +//// private long startTime; +//// private long endTime; +//// +//// // 创建具有默认线程个数的线程池 +//// private TaskManager() { +//// this(8); +//// } +//// +//// // 创建线程池,workerNum为线程池中工作线程的个数 +//// private TaskManager(int workerNum) { +//// taskManager.workerNum = workerNum; +//// workThrads = new WorkThread[workerNum]; +//// for (int i = 0; i < workerNum; i++) { +//// workThrads[i] = new WorkThread(); +//// workThrads[i].start();// 开启线程池中的线程 +//// } +//// startTime = System.currentTimeMillis(); +//// } +//// +//// // 单态模式,获得一个默认线程个数的线程池 +//// public static TaskManager getTaskManager() { +//// return getTaskManager(TaskManager.workerNum); +//// } +//// +//// // 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数 +//// // workerNum<=0创建默认的工作线程个数 +//// public static TaskManager getTaskManager(int workerNum1) { +//// if (workerNum1 <= 0) { +//// workerNum1 = TaskManager.workerNum; +//// } +//// if (taskManager == null) { +//// taskManager = new TaskManager(workerNum1); +//// } +//// return taskManager; +//// } +//// +//// // 把任务加入任务队列 +//// public void execute(List task) { +//// execute(task.toArray(new Runnable[0])); +//// } +//// +//// // 把任务加入任务队列 +//// public void execute(Runnable... task) { +//// synchronized (taskQueue) { +//// Collections.addAll(taskQueue, task); +//// taskQueue.notify(); +//// } +//// } +//// +//// +//// // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁 +//// public void destroy() { +//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧 +//// try { +//// sleep(500); +//// } catch (InterruptedException e) { +//// e.printStackTrace(); +//// } +//// } +//// // 工作线程停止工作,且置为null +//// for (int i = 0; i < workerNum; i++) { +//// workThrads[i].stopWorker(); +//// workThrads[i] = null; +//// } +//// taskManager = null; +//// taskQueue.clear();// 清空任务队列 +//// } +//// +//// // 任务完成后,统计线程池的运行情况 +//// public void closed() { +//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧 +//// try { +//// sleep(500); +//// } catch (InterruptedException e) { +//// e.printStackTrace(); +//// } +//// } +//// log.info("任务完成,线程状态: {}", this.toString()); +//// taskManager = null; +//// } +//// +//// // 返回工作线程的个数 +//// public int getWorkThreadNumber() { +//// return workerNum; +//// } +//// +//// // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成 +//// public int getFinishedTasknumber() { +//// return finishedTask; +//// } +//// +//// // 返回任务队列的长度,即还没处理的任务个数 +//// public int getWaitTasknumber() { +//// return taskQueue.size(); +//// } +//// +//// /** +//// * 返回线程池当前状态 +//// * +//// * @return 如果还在工作返回false,否则返回true +//// */ +//// public Boolean getIsRunning() { +//// return taskQueue.isEmpty() || (taskManager.getWaitTasknumber() == 0 && taskManager.getWorkThreadNumber() == 0); +//// } +//// +//// // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数 +//// @Override +//// public String toString() { +//// endTime = System.currentTimeMillis(); +//// return "工作任务数:" + workerNum + ",已完成任务数:" +//// + finishedTask + ",等待任务数:" + getWaitTasknumber() +//// + "线程池作时长:" + (endTime - startTime) + "ms"; +//// } +//// +//// /** +//// * 内部类,工作线程 +//// */ +//// private class WorkThread extends Thread { +//// // 该工作线程是否有效,用于结束该工作线程 +//// private boolean isRunning = true; +//// +//// /* +//// * 关键所在,如果任务队列不空,则取出任务执行,若任务队列空,则等待 +//// */ +//// @Override +//// public void run() { +//// Runnable r = null; +//// while (isRunning) {// 若线程无效则自然结束run方法,该线程就没用了 +//// synchronized (taskQueue) { +//// while (isRunning && taskQueue.isEmpty()) {// 队列为空 +//// try { +//// taskQueue.wait(50); +//// } catch (InterruptedException e) { +//// e.printStackTrace(); +//// } +//// } +//// if (!taskQueue.isEmpty()) { +//// r = taskQueue.remove(0);// 取出任务 +//// } +//// } +//// if (r != null) { +//// r.run();// 执行任务 +//// } +//// finishedTask++; +//// r = null; +//// } +//// } +//// +//// // 停止工作,让该线程自然执行完run方法,自然结束 +//// public void stopWorker() { +//// isRunning = false; +//// } +//// } +//} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/test/MqttTest.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/test/MqttTest.java new file mode 100644 index 0000000..92325f5 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/test/MqttTest.java @@ -0,0 +1,71 @@ +//package com.muyu.parsing.test; +// +//import com.alibaba.fastjson.JSONObject; +//import com.muyu.common.kafka.constants.KafkaConstants; +//import com.muyu.parsing.domain.KafKaData; +//import com.muyu.parsing.domain.SysCarMessage; +//import com.muyu.parsing.manager.TaskManager; +//import com.muyu.parsing.service.impl.SysCarMessageServiceImpl; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerRecord; +//import org.eclipse.paho.client.mqttv3.*; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.stereotype.Component; +// +//import javax.annotation.PostConstruct; +//import javax.annotation.Resource; +//import java.util.ArrayList; +//import java.util.List; +// +//@Component +//public class MqttTest { +// private static final Logger log = LoggerFactory.getLogger(MqttTest.class); +// +// private static final Integer ID = 1; +// private static final String TOPIC = "vehicle"; +// private static final String BROKER = "tcp://106.15.136.7:1883"; +// +// @Resource +// private KafkaProducer kafkaProducer; +// @Resource +// private SysCarMessageServiceImpl sysCarMessageService; +// @Resource +// private TaskManager taskManager; // 注入 TaskManager +// +// @PostConstruct +// public void init() { +// String clientId = "JavaSample"; +// String userName = ""; +// String password = ""; +// +// try { +// MqttClient sampleClient = new MqttClient(BROKER, clientId); +// MqttConnectOptions connOpts = new MqttConnectOptions(); +// connOpts.setCleanSession(true); +// System.out.println("Connecting to broker: " + BROKER); +// sampleClient.connect(connOpts); +// sampleClient.subscribe(TOPIC, 0); +// sampleClient.setCallback(new MqttCallback() { +// @Override +// public void connectionLost(Throwable throwable) { +// // 处理连接丢失 +// } +// +// @Override +// public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { +// taskManager.execute(() -> processMessage(mqttMessage)); +// } +// +// @Override +// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { +// // 处理消息发送完成 +// } +// }); +// } catch (MqttException me) { +// me.printStackTrace(); +// } +// } +// +// +//} diff --git a/cloud-modules/cloud-modules-system/src/main/java/com/muyu/system/rabbit/RabbitTest.java b/cloud-modules/cloud-modules-system/src/main/java/com/muyu/system/rabbit/RabbitTest.java index 21ae678..fda7ea0 100644 --- a/cloud-modules/cloud-modules-system/src/main/java/com/muyu/system/rabbit/RabbitTest.java +++ b/cloud-modules/cloud-modules-system/src/main/java/com/muyu/system/rabbit/RabbitTest.java @@ -1,4 +1,4 @@ -//package com.muyu.system.rabbit; +package com.muyu.system.rabbit;//package com.muyu.system.rabbit; // //import com.alibaba.fastjson2.JSONObject; //import com.muyu.system.domain.SysConfig;