fix(): 修复合并异常与业务模块无法启动问题

dev
面包骑士 2024-10-10 10:10:55 +08:00
commit 15b1b6c639
19 changed files with 547 additions and 95 deletions

View File

@ -21,132 +21,132 @@ public @interface Excel {
/**
* excel
*/
public int sort () default Integer.MAX_VALUE;
public int sort() default Integer.MAX_VALUE;
/**
* Excel.
*/
public String name () default "";
public String name() default "";
/**
* , : yyyy-MM-dd
*/
public String dateFormat () default "";
public String dateFormat() default "";
/**
* (: 0=,1=,2=)
*/
public String readConverterExp () default "";
public String readConverterExp() default "";
/**
*
*/
public String separator () default ",";
public String separator() default ",";
/**
* BigDecimal :-1(BigDecimal)
*/
public int scale () default -1;
public int scale() default -1;
/**
* BigDecimal :BigDecimal.ROUND_HALF_EVEN
*/
public int roundingMode () default BigDecimal.ROUND_HALF_EVEN;
public int roundingMode() default BigDecimal.ROUND_HALF_EVEN;
/**
* excel
*/
public double height () default 14;
public double height() default 14;
/**
* excel
*/
public double width () default 16;
public double width() default 16;
/**
* ,% 90 90%
*/
public String suffix () default "";
public String suffix() default "";
/**
* ,
*/
public String defaultValue () default "";
public String defaultValue() default "";
/**
*
*/
public String prompt () default "";
public String prompt() default "";
/**
* .
*/
public String[] combo () default {};
public String[] combo() default {};
/**
* ,:list)
*/
public boolean needMerge () default false;
public boolean needMerge() default false;
/**
* ,:,.
*/
public boolean isExport () default true;
public boolean isExport() default true;
/**
* ,,
*/
public String targetAttr () default "";
public String targetAttr() default "";
/**
* ,
*/
public boolean isStatistics () default false;
public boolean isStatistics() default false;
/**
* 0 1
*/
public ColumnType cellType () default ColumnType.STRING;
public ColumnType cellType() default ColumnType.STRING;
/**
*
*/
public IndexedColors headerBackgroundColor () default IndexedColors.GREY_50_PERCENT;
public IndexedColors headerBackgroundColor() default IndexedColors.GREY_50_PERCENT;
/**
*
*/
public IndexedColors headerColor () default IndexedColors.WHITE;
public IndexedColors headerColor() default IndexedColors.WHITE;
/**
*
*/
public IndexedColors backgroundColor () default IndexedColors.WHITE;
public IndexedColors backgroundColor() default IndexedColors.WHITE;
/**
*
*/
public IndexedColors color () default IndexedColors.BLACK;
public IndexedColors color() default IndexedColors.BLACK;
/**
*
*/
public HorizontalAlignment align () default HorizontalAlignment.CENTER;
public HorizontalAlignment align() default HorizontalAlignment.CENTER;
/**
*
*/
public Class<?> handler () default ExcelHandlerAdapter.class;
public Class<?> handler() default ExcelHandlerAdapter.class;
/**
*
*/
public String[] args () default {};
public String[] args() default {};
/**
* 012
*/
Type type () default Type.ALL;
Type type() default Type.ALL;
public enum Type {
ALL(0), EXPORT(1), IMPORT(2);

View File

@ -13,5 +13,5 @@ import java.lang.annotation.Target;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Excels {
Excel[] value ();
Excel[] value();
}

View File

@ -26,7 +26,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor {
*
* @return Object bean
*
* @throws org.springframework.beans.BeansException
* @throws BeansException
*/
@SuppressWarnings("unchecked")
public static <T> T getBean (String name) throws BeansException {
@ -40,7 +40,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor {
*
* @return
*
* @throws org.springframework.beans.BeansException
* @throws BeansException
*/
public static <T> T getBean (Class<T> clz) throws BeansException {
T result = (T) beanFactory.getBean(clz);
@ -65,7 +65,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor {
*
* @return boolean
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
* @throws NoSuchBeanDefinitionException
*/
public static boolean isSingleton (String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
@ -76,7 +76,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor {
*
* @return Class
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
* @throws NoSuchBeanDefinitionException
*/
public static Class<?> getType (String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
@ -89,7 +89,7 @@ public final class SpringUtils implements BeanFactoryPostProcessor {
*
* @return
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
* @throws NoSuchBeanDefinitionException
*/
public static String[] getAliases (String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);

View File

@ -17,11 +17,11 @@ import java.lang.annotation.Target;
@Target(value = {ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR, ElementType.PARAMETER})
@Constraint(validatedBy = {XssValidator.class})
public @interface Xss {
String message ()
String message()
default "不允许任何脚本运行";
Class<?>[] groups () default {};
Class<?>[] groups() default {};
Class<? extends Payload>[] payload () default {};
Class<? extends Payload>[] payload() default {};
}

View File

@ -14,15 +14,15 @@ public @interface DataScope {
/**
*
*/
public String deptAlias () default "";
public String deptAlias() default "";
/**
*
*/
public String userAlias () default "";
public String userAlias() default "";
/**
* @RequiresPermissions
*/
public String permission () default "";
public String permission() default "";
}

View File

@ -17,30 +17,30 @@ public @interface Log {
/**
*
*/
public String title () default "";
public String title() default "";
/**
*
*/
public BusinessType businessType () default BusinessType.OTHER;
public BusinessType businessType() default BusinessType.OTHER;
/**
*
*/
public OperatorType operatorType () default OperatorType.MANAGE;
public OperatorType operatorType() default OperatorType.MANAGE;
/**
*
*/
public boolean isSaveRequestData () default true;
public boolean isSaveRequestData() default true;
/**
*
*/
public boolean isSaveResponseData () default true;
public boolean isSaveResponseData() default true;
/**
*
*/
public String[] excludeParamNames () default {};
public String[] excludeParamNames() default {};
}

View File

@ -15,13 +15,13 @@ import java.lang.annotation.*;
@Documented
@EnableFeignClients
public @interface EnableMyFeignClients {
String[] value () default {};
String[] value() default {};
String[] basePackages () default {"com.muyu"};
String[] basePackages() default {"com.muyu"};
Class<?>[] basePackageClasses () default {};
Class<?>[] basePackageClasses() default {};
Class<?>[] defaultConfiguration () default {};
Class<?>[] defaultConfiguration() default {};
Class<?>[] clients () default {};
Class<?>[] clients() default {};
}

View File

@ -14,5 +14,5 @@ public @interface InnerAuth {
/**
*
*/
boolean isUser () default false;
boolean isUser() default false;
}

View File

@ -16,10 +16,10 @@ public @interface RequiresPermissions {
/**
*
*/
String[] value () default {};
String[] value() default {};
/**
* AND | ORAND
*/
Logical logical () default Logical.AND;
Logical logical() default Logical.AND;
}

View File

@ -16,10 +16,10 @@ public @interface RequiresRoles {
/**
*
*/
String[] value () default {};
String[] value() default {};
/**
* AND | ORAND
*/
Logical logical () default Logical.AND;
Logical logical() default Logical.AND;
}

View File

@ -57,7 +57,7 @@ public class ECSTool {
// 打印诊断推荐链接
System.out.println(teaError.getData().get("Recommend"));
// 断言错误信息
com.aliyun.teautil.Common.assertAsString(teaError.getMessage());
Common.assertAsString(teaError.getMessage());
} else {
// 处理其他类型的异常
System.out.println(error.getMessage());
@ -83,7 +83,7 @@ public class ECSTool {
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
Common.assertAsString(error.message);
} catch (Exception _error) {
TeaException error = new TeaException(_error.getMessage(), _error);
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
@ -91,7 +91,7 @@ public class ECSTool {
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
Common.assertAsString(error.message);
}
}

View File

@ -8,20 +8,18 @@
<artifactId>cloud-modules</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>cloud-modules-data-processing</artifactId>
<description>
cloud-data-processing 数据处理模块
cloud-modules-data-processing 数据处理模块
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
@ -90,8 +88,47 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-datasource</artifactId>
</dependency>
</dependencies>
<!-- 接口模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-api-doc</artifactId>
</dependency>
<!-- XllJob定时任务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-xxl</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<!-- kafka-->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
<!-- mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-car-gateway</artifactId>
<version>3.6.3</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
@ -108,5 +145,4 @@
</plugin>
</plugins>
</build>
</project>

View File

@ -4,9 +4,11 @@ import com.muyu.common.core.constant.ServiceNameConstants;
import com.muyu.common.core.domain.Result;
import com.muyu.openbusiness.domain.SysCar;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
@FeignClient(contextId = "sysCarRemoteService", value = ServiceNameConstants.CAR_SERVICE)
public interface SysCarRemoteService {
@GetMapping("/car/findByVin")
public Result<SysCar> findByVin(String vin);
}

View File

@ -13,12 +13,36 @@
<description>
cloud-modules-parsing 协议解析模块
</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-caffeine</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-iotdb</artifactId>
<version>3.6.3</version>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
@ -43,51 +67,26 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- MuYu Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-datasource</artifactId>
</dependency>
<!-- MuYu Common DataScope -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-datascope</artifactId>
</dependency>
<!-- MuYu Common Log -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-log</artifactId>
</dependency>
<!-- 接口模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-api-doc</artifactId>
</dependency>
<!-- XllJob定时任务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-xxl</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
<artifactId>cloud-common-datasource</artifactId>
</dependency>
<!-- kafka-->
<dependency>

View File

@ -0,0 +1,131 @@
package com.muyu.parsing.consumer;
import com.muyu.cargateway.domain.properties.MqttProperties;
import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.parsing.domain.KafKaData;
import com.muyu.parsing.domain.SysCarMessage;
import com.muyu.parsing.manager.TaskManager;
import com.muyu.parsing.service.impl.SysCarMessageServiceImpl;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName FormMessageConsumer
* @Description
* @Author Chen
* @Date 2024/10/9 15:55
*/
@Component
@Slf4j
public class FormMessageConsumer {
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private KafkaProducer<String, String> kafkaProducer;
@Resource
private SysCarMessageServiceImpl sysCarMessageService;
private final static String FORM_QUEUE = "queue_inform_sms";
@RabbitListener(queuesToDeclare = {@Queue(FORM_QUEUE)})
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
log.info("基础信息 {} 。。。", mqttProperties);
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions();
//用户名
connOpts.setUserName(mqttProperties.getUserName());
//密码
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + mqttProperties.getBroker());
sampleClient.connect(connOpts);
sampleClient.subscribe(mqttProperties.getTopic(), 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
log.error("连接丢失:{}", throwable.getMessage());
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
TaskManager manager = new TaskManager(10);
manager.execute(() -> processMessage(mqttMessage));
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("接收消息:{}", iMqttDeliveryToken);
}
});
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (MqttException | IOException me) {
// System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
throw new RuntimeException(me);
}
}
private void processMessage(MqttMessage mqttMessage) {
String string = new String(mqttMessage.getPayload());
log.info(string);
List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageLists(1);
List<KafKaData> kafKaDataList = new ArrayList<>();
String[] test = string.split(" ");
for (SysCarMessage carMessage : list) {
int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
int end = Integer.parseInt(carMessage.getMessageEndIndex());
StringBuilder hexBuilder = new StringBuilder();
for (int i = start; i < end; i++) {
hexBuilder.append(test[i]);
}
String hex = hexBuilder.toString();
char[] result = new char[hex.length() / 2];
for (int x = 0; x < hex.length(); x += 2) {
int high = Character.digit(hex.charAt(x), 16);
int low = Character.digit(hex.charAt(x + 1), 16);
result[x / 2] = (char) ((high << 4) + low);
}
String value = new String(result);
kafKaDataList.add(KafKaData.builder()
.key(carMessage.getMessageTypeCode())
.label(carMessage.getMessageTypeCode())
.value(value)
.type(carMessage.getMessageType())
.build());
}
kafKaDataList.add(KafKaData.builder()
.key("firmCode")
.label("企业编码")
.value("firm01")
.type("String")
.build());
String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(kafKaDataList);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
kafkaProducer.send(producerRecord);
log.info("kafka投产{}", jsonString);
}
}

View File

@ -0,0 +1,24 @@
package com.muyu.parsing.manager;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
/**
* 线
*/
public final class TaskManager {
private final ExecutorService executorService;
public TaskManager(int threadPoolSize) {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}
public void execute(Runnable task) {
executorService.submit(task);
}
// 关闭线程池
public void shutdown() {
executorService.shutdown();
}
}

View File

@ -0,0 +1,189 @@
//package com.muyu.parsing.manager;
//
///**
// * @Author: 胡杨
// * @Name: TaskManager
// * @Description: 任务管理器
// * @CreatedDate: 2024/9/4 下午7:44
// * @FilePath: com.muyu.quest.manager
// */
//
//
//import lombok.extern.slf4j.Slf4j;
//
//import java.util.Collections;
//import java.util.LinkedList;
//import java.util.List;
//
//import static java.lang.Thread.sleep;
//
///**
// * 基础线程配置
// */
//@Slf4j
//public final class TaskManagers {
// // 线程池中默认线程的个数为5
// private static int workerNum = 8;
// // 工作线程
//// private final WorkThread[] workThrads;
// // 未处理的任务
// private static volatile int finishedTask = 0;
////
//// // 任务队列
//// private final List<Runnable> taskQueue = new LinkedList<Runnable>();
//// private static TaskManager taskManager;
////
//// private long startTime;
//// private long endTime;
////
//// // 创建具有默认线程个数的线程池
//// private TaskManager() {
//// this(8);
//// }
////
//// // 创建线程池,workerNum为线程池中工作线程的个数
//// private TaskManager(int workerNum) {
//// taskManager.workerNum = workerNum;
//// workThrads = new WorkThread[workerNum];
//// for (int i = 0; i < workerNum; i++) {
//// workThrads[i] = new WorkThread();
//// workThrads[i].start();// 开启线程池中的线程
//// }
//// startTime = System.currentTimeMillis();
//// }
////
//// // 单态模式,获得一个默认线程个数的线程池
//// public static TaskManager getTaskManager() {
//// return getTaskManager(TaskManager.workerNum);
//// }
////
//// // 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数
//// // workerNum<=0创建默认的工作线程个数
//// public static TaskManager getTaskManager(int workerNum1) {
//// if (workerNum1 <= 0) {
//// workerNum1 = TaskManager.workerNum;
//// }
//// if (taskManager == null) {
//// taskManager = new TaskManager(workerNum1);
//// }
//// return taskManager;
//// }
////
//// // 把任务加入任务队列
//// public void execute(List<Runnable> task) {
//// execute(task.toArray(new Runnable[0]));
//// }
////
//// // 把任务加入任务队列
//// public void execute(Runnable... task) {
//// synchronized (taskQueue) {
//// Collections.addAll(taskQueue, task);
//// taskQueue.notify();
//// }
//// }
////
////
//// // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
//// public void destroy() {
//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧
//// try {
//// sleep(500);
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// }
//// // 工作线程停止工作且置为null
//// for (int i = 0; i < workerNum; i++) {
//// workThrads[i].stopWorker();
//// workThrads[i] = null;
//// }
//// taskManager = null;
//// taskQueue.clear();// 清空任务队列
//// }
////
//// // 任务完成后,统计线程池的运行情况
//// public void closed() {
//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧
//// try {
//// sleep(500);
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// }
//// log.info("任务完成,线程状态: {}", this.toString());
//// taskManager = null;
//// }
////
//// // 返回工作线程的个数
//// public int getWorkThreadNumber() {
//// return workerNum;
//// }
////
//// // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
//// public int getFinishedTasknumber() {
//// return finishedTask;
//// }
////
//// // 返回任务队列的长度,即还没处理的任务个数
//// public int getWaitTasknumber() {
//// return taskQueue.size();
//// }
////
//// /**
//// * 返回线程池当前状态
//// *
//// * @return 如果还在工作返回false否则返回true
//// */
//// public Boolean getIsRunning() {
//// return taskQueue.isEmpty() || (taskManager.getWaitTasknumber() == 0 && taskManager.getWorkThreadNumber() == 0);
//// }
////
//// // 覆盖toString方法返回线程池信息工作线程个数和已完成任务个数
//// @Override
//// public String toString() {
//// endTime = System.currentTimeMillis();
//// return "工作任务数:" + workerNum + ",已完成任务数:"
//// + finishedTask + ",等待任务数:" + getWaitTasknumber()
//// + "线程池作时长:" + (endTime - startTime) + "ms";
//// }
////
//// /**
//// * 内部类,工作线程
//// */
//// private class WorkThread extends Thread {
//// // 该工作线程是否有效,用于结束该工作线程
//// private boolean isRunning = true;
////
//// /*
//// * 关键所在,如果任务队列不空,则取出任务执行,若任务队列空,则等待
//// */
//// @Override
//// public void run() {
//// Runnable r = null;
//// while (isRunning) {// 若线程无效则自然结束run方法该线程就没用了
//// synchronized (taskQueue) {
//// while (isRunning && taskQueue.isEmpty()) {// 队列为空
//// try {
//// taskQueue.wait(50);
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// }
//// if (!taskQueue.isEmpty()) {
//// r = taskQueue.remove(0);// 取出任务
//// }
//// }
//// if (r != null) {
//// r.run();// 执行任务
//// }
//// finishedTask++;
//// r = null;
//// }
//// }
////
//// // 停止工作让该线程自然执行完run方法自然结束
//// public void stopWorker() {
//// isRunning = false;
//// }
//// }
//}

View File

@ -0,0 +1,71 @@
//package com.muyu.parsing.test;
//
//import com.alibaba.fastjson.JSONObject;
//import com.muyu.common.kafka.constants.KafkaConstants;
//import com.muyu.parsing.domain.KafKaData;
//import com.muyu.parsing.domain.SysCarMessage;
//import com.muyu.parsing.manager.TaskManager;
//import com.muyu.parsing.service.impl.SysCarMessageServiceImpl;
//import org.apache.kafka.clients.producer.KafkaProducer;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.eclipse.paho.client.mqttv3.*;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//
//@Component
//public class MqttTest {
// private static final Logger log = LoggerFactory.getLogger(MqttTest.class);
//
// private static final Integer ID = 1;
// private static final String TOPIC = "vehicle";
// private static final String BROKER = "tcp://106.15.136.7:1883";
//
// @Resource
// private KafkaProducer<String, String> kafkaProducer;
// @Resource
// private SysCarMessageServiceImpl sysCarMessageService;
// @Resource
// private TaskManager taskManager; // 注入 TaskManager
//
// @PostConstruct
// public void init() {
// String clientId = "JavaSample";
// String userName = "";
// String password = "";
//
// try {
// MqttClient sampleClient = new MqttClient(BROKER, clientId);
// MqttConnectOptions connOpts = new MqttConnectOptions();
// connOpts.setCleanSession(true);
// System.out.println("Connecting to broker: " + BROKER);
// sampleClient.connect(connOpts);
// sampleClient.subscribe(TOPIC, 0);
// sampleClient.setCallback(new MqttCallback() {
// @Override
// public void connectionLost(Throwable throwable) {
// // 处理连接丢失
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// taskManager.execute(() -> processMessage(mqttMessage));
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// // 处理消息发送完成
// }
// });
// } catch (MqttException me) {
// me.printStackTrace();
// }
// }
//
//
//}

View File

@ -1,4 +1,4 @@
//package com.muyu.system.rabbit;
package com.muyu.system.rabbit;//package com.muyu.system.rabbit;
//
//import com.alibaba.fastjson2.JSONObject;
//import com.muyu.system.domain.SysConfig;