diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 5016724..0c5b147 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -13,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline; import us.codecraft.webmagic.processor.PageProcessor; import us.codecraft.webmagic.scheduler.QueueScheduler; import us.codecraft.webmagic.scheduler.Scheduler; -import us.codecraft.webmagic.selector.thread.ThreadPool; +import us.codecraft.webmagic.selector.thread.CountableThreadPool; import us.codecraft.webmagic.utils.UrlUtils; import java.io.Closeable; @@ -74,7 +74,9 @@ public class Spider implements Runnable, Task { protected Logger logger = LoggerFactory.getLogger(getClass()); - protected ThreadPool threadPool; + protected CountableThreadPool threadPool; + + protected ExecutorService executorService; protected int threadNum = 1; @@ -279,7 +281,11 @@ public class Spider implements Runnable, Task { } downloader.setThread(threadNum); if (threadPool == null || threadPool.isShutdown()) { - threadPool = new ThreadPool(threadNum); + if (executorService != null && !executorService.isShutdown()) { + threadPool = new CountableThreadPool(threadNum, executorService); + } else { + threadPool = new CountableThreadPool(threadNum); + } } if (startRequests != null) { for (Request request : startRequests) { @@ -330,7 +336,7 @@ public class Spider implements Runnable, Task { } protected void onError(Request request) { - if (CollectionUtils.isNotEmpty(spiderListeners)){ + if (CollectionUtils.isNotEmpty(spiderListeners)) { for (SpiderListener spiderListener : spiderListeners) { spiderListener.onError(request); } @@ -338,7 +344,7 @@ public class Spider implements Runnable, Task { } protected void onSuccess(Request request) { - if (CollectionUtils.isNotEmpty(spiderListeners)){ + if (CollectionUtils.isNotEmpty(spiderListeners)) { for (SpiderListener spiderListener : spiderListeners) { spiderListener.onSuccess(request); } @@ -521,8 +527,7 @@ public class Spider implements Runnable, Task { newUrlCondition.await(); } catch (InterruptedException e) { logger.warn("waitNewUrl - interrupted, error {}", e); - } - finally { + } finally { newUrlLock.unlock(); } } @@ -563,6 +568,21 @@ public class Spider implements Runnable, Task { return this; } + /** + * start with more than one threads + * + * @param threadNum + * @return this + */ + public Spider thread(ExecutorService executorService, int threadNum) { + checkIfRunning(); + this.threadNum = threadNum; + if (threadNum <= 0) { + throw new IllegalArgumentException("threadNum should be more than one!"); + } + return this; + } + public boolean isExitWhenComplete() { return exitWhenComplete; } @@ -637,6 +657,9 @@ public class Spider implements Runnable, Task { * @since 0.4.1 */ public int getThreadAlive() { + if (threadPool == null) { + return 0; + } return threadPool.getThreadAlive(); } @@ -667,7 +690,8 @@ public class Spider implements Runnable, Task { } public Spider setExecutorService(ExecutorService executorService) { - this.threadPool.setExecutorService(executorService); + checkIfRunning(); + this.executorService = executorService; return this; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java b/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/CountableThreadPool.java similarity index 84% rename from webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java rename to webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/CountableThreadPool.java index 00df89a..0121cf2 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/CountableThreadPool.java @@ -7,10 +7,16 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** + * Thread pool for workers.

+ * Use {@link java.util.concurrent.ExecutorService} as inner implement.

+ * New feature:

+ * 1. Block when thread pool is full to avoid poll many urls but not process.

+ * 2. Count of thread alive for monitor. + * * @author code4crafer@gmail.com * @since 0.5.0 */ -public class ThreadPool { +public class CountableThreadPool { private int threadNum; @@ -20,12 +26,12 @@ public class ThreadPool { private Condition condition = reentrantLock.newCondition(); - public ThreadPool(int threadNum) { + public CountableThreadPool(int threadNum) { this.threadNum = threadNum; this.executorService = Executors.newFixedThreadPool(threadNum); } - public ThreadPool(int threadNum, ExecutorService executorService) { + public CountableThreadPool(int threadNum, ExecutorService executorService) { this.threadNum = threadNum; this.executorService = executorService; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java index d8d2122..5c4d346 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java @@ -13,6 +13,12 @@ import java.util.concurrent.TimeUnit; */ public class ThreadUtils { + /** + * @Deprecated + * @param threadSize + * @return + * @see us.codecraft.webmagic.selector.thread.CountableThreadPool + */ public static ExecutorService newFixedThreadPool(int threadSize) { if (threadSize <= 0) { throw new IllegalArgumentException("ThreadSize must be greater than 0!");