From c6661899fd4b06f1590613c5c42503c46e9b909c Mon Sep 17 00:00:00 2001 From: "yihua.huang" Date: Fri, 25 Apr 2014 17:33:48 +0800 Subject: [PATCH] new thread pool #110 --- .../java/us/codecraft/webmagic/Spider.java | 27 +++---- .../example/GithubRepoPageProcessor.java | 3 +- .../webmagic/selector/thread/ThreadPool.java | 73 +++++++++++++++++++ .../codecraft/webmagic/utils/ThreadUtils.java | 1 - .../webmagic/monitor/SpiderMonitor.java | 2 +- 5 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 6560a1b..0d7d5be 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -1,11 +1,9 @@ package us.codecraft.webmagic; import com.google.common.collect.Lists; - import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import us.codecraft.webmagic.downloader.Downloader; import us.codecraft.webmagic.downloader.HttpClientDownloader; import us.codecraft.webmagic.pipeline.CollectorPipeline; @@ -15,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline; import us.codecraft.webmagic.processor.PageProcessor; import us.codecraft.webmagic.scheduler.QueueScheduler; import us.codecraft.webmagic.scheduler.Scheduler; -import us.codecraft.webmagic.utils.ThreadUtils; +import us.codecraft.webmagic.selector.thread.ThreadPool; import us.codecraft.webmagic.utils.UrlUtils; import java.io.Closeable; @@ -79,7 +77,7 @@ public class Spider implements Runnable, Task { protected Logger logger = LoggerFactory.getLogger(getClass()); - protected ExecutorService executorService; + protected ThreadPool threadPool; protected int threadNum = 1; @@ -101,8 +99,6 @@ public class Spider implements Runnable, Task { private Condition newUrlCondition = newUrlLock.newCondition(); - private final AtomicInteger threadAlive = new AtomicInteger(0); - private List spiderListeners; private final AtomicLong pageCount = new AtomicLong(0); @@ -283,8 +279,8 @@ public class Spider implements Runnable, Task { pipelines.add(new ConsolePipeline()); } downloader.setThread(threadNum); - if (executorService == null || executorService.isShutdown()) { - executorService = ThreadUtils.newFixedThreadPool(threadNum); + if (threadPool == null || threadPool.isShutdown()) { + threadPool = new ThreadPool(threadNum); } if (startRequests != null) { for (Request request : startRequests) { @@ -292,7 +288,6 @@ public class Spider implements Runnable, Task { } startRequests.clear(); } - threadAlive.set(0); } @Override @@ -303,15 +298,14 @@ public class Spider implements Runnable, Task { while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { Request request = scheduler.poll(this); if (request == null) { - if (threadAlive.get() == 0 && exitWhenComplete) { + if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); } else { final Request requestFinal = request; - threadAlive.incrementAndGet(); - executorService.execute(new Runnable() { + threadPool.execute(new Runnable() { @Override public void run() { try { @@ -321,7 +315,6 @@ public class Spider implements Runnable, Task { onError(requestFinal); logger.error("process request " + requestFinal + " error", e); } finally { - threadAlive.decrementAndGet(); pageCount.incrementAndGet(); signalNewUrl(); } @@ -370,7 +363,7 @@ public class Spider implements Runnable, Task { for (Pipeline pipeline : pipelines) { destroyEach(pipeline); } - executorService.shutdown(); + threadPool.shutdown(); } private void destroyEach(Object object) { @@ -522,7 +515,7 @@ public class Spider implements Runnable, Task { newUrlLock.lock(); try { //double check - if (threadAlive.get() == 0 && exitWhenComplete) { + if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { return; } newUrlCondition.await(); @@ -644,7 +637,7 @@ public class Spider implements Runnable, Task { * @since 0.4.1 */ public int getThreadAlive() { - return threadAlive.get(); + return threadPool.getThreadAlive(); } /** @@ -674,7 +667,7 @@ public class Spider implements Runnable, Task { } public Spider setExecutorService(ExecutorService executorService) { - this.executorService = executorService; + this.threadPool.setExecutorService(executorService); return this; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/processor/example/GithubRepoPageProcessor.java b/webmagic-core/src/main/java/us/codecraft/webmagic/processor/example/GithubRepoPageProcessor.java index c512265..f4ae058 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/processor/example/GithubRepoPageProcessor.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/processor/example/GithubRepoPageProcessor.java @@ -11,11 +11,12 @@ import us.codecraft.webmagic.processor.PageProcessor; */ public class GithubRepoPageProcessor implements PageProcessor { - private Site site = Site.me().setRetryTimes(3).setSleepTime(1000); + private Site site = Site.me().setRetryTimes(3).setSleepTime(0); @Override public void process(Page page) { page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+/\\w+)").all()); + page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+)").all()); page.putField("author", page.getUrl().regex("https://github\\.com/(\\w+)/.*").toString()); page.putField("name", page.getHtml().xpath("//h1[@class='entry-title public']/strong/a/text()").toString()); if (page.getResultItems().get("name")==null){ diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java b/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java new file mode 100644 index 0000000..0548919 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/selector/thread/ThreadPool.java @@ -0,0 +1,73 @@ +package us.codecraft.webmagic.selector.thread; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public class ThreadPool { + + private int threadNum; + + private int threadAlive; + + private ReentrantLock reentrantLock = new ReentrantLock(); + + private Condition condition = reentrantLock.newCondition(); + + public ThreadPool(int threadNum) { + this.threadNum = threadNum; + this.executorService = Executors.newFixedThreadPool(threadNum); + } + + public ThreadPool(int threadNum, ExecutorService executorService) { + this.threadNum = threadNum; + this.executorService = executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public int getThreadAlive() { + return threadAlive; + } + + public int getThreadNum() { + return threadNum; + } + + private ExecutorService executorService; + + public void execute(Runnable runnable) { + try { + reentrantLock.lock(); + while (threadAlive >= threadNum) { + try { + condition.await(); + } catch (InterruptedException e) { + } + } + threadAlive++; + executorService.execute(runnable); + } finally { + condition.notify(); + threadAlive--; + reentrantLock.unlock(); + } + } + + public boolean isShutdown() { + return executorService.isShutdown(); + } + + public void shutdown() { + executorService.shutdown(); + } + + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java index cdfe6d0..d8d2122 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java @@ -19,7 +19,6 @@ public class ThreadUtils { } if (threadSize == 1) { return MoreExecutors.sameThreadExecutor(); - } return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java index 193ff94..0783b7e 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java @@ -240,7 +240,7 @@ public class SpiderMonitor { //Others will be registered spiderMonitor.server().jmxStart(); oschinaSpider.start(); - githubSpider.start(); + githubSpider.thread(10).start(); }