From fba330872b7ebc211318c66003392e39deb46f80 Mon Sep 17 00:00:00 2001 From: "yihua.huang" Date: Sun, 22 Sep 2013 23:57:15 +0800 Subject: [PATCH] fix a thread pool exception --- .../java/us/codecraft/webmagic/Spider.java | 697 +++++++++--------- .../codecraft/webmagic/utils/ThreadUtils.java | 27 +- .../us/codecraft/webmagic/SpiderTest.java | 5 +- 3 files changed, 366 insertions(+), 363 deletions(-) diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 45766f3..829546b 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -21,23 +21,28 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Entrance of a crawler.
- * A spider contains four modules: Downloader, Scheduler, PageProcessor and Pipeline.
- * Every module is a field of Spider.
- * The modules are defined in interface.
- * You can customize a spider with various implementations of them.
- * Examples:
+ * A spider contains four modules: Downloader, Scheduler, PageProcessor and + * Pipeline.
+ * Every module is a field of Spider.
+ * The modules are defined as interfaces.
+ * You can customize a spider with various implementations of them.
+ * Examples:
*
- * A simple crawler:
- * Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")).run();
+ * A simple crawler:
+ * Spider.create(new SimplePageProcessor("http://my.oschina.net/", + * "http://my.oschina.net/*blog/*")).run();
*
- * Store results to files by FilePipeline:
- * Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*"))
- * .pipeline(new FilePipeline("/data/temp/webmagic/")).run();
+ * Store results to files by FilePipeline:
+ * Spider.create(new SimplePageProcessor("http://my.oschina.net/", + * "http://my.oschina.net/*blog/*"))
+ * .pipeline(new FilePipeline("/data/temp/webmagic/")).run();
*
- * Use FileCacheQueueScheduler to store urls and cursor in files, so that a Spider can resume the status when shutdown.
- * Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*"))
- * .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run();
- * + * Use FileCacheQueueScheduler to store urls and cursor in files, so that a + * Spider can resume its state after a shutdown.
+ * Spider.create(new SimplePageProcessor("http://my.oschina.net/", + * "http://my.oschina.net/*blog/*"))
+ * .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run();
+ * * @author code4crafter@gmail.com
* @see Downloader * @see Scheduler @@ -47,373 +52,381 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class Spider implements Runnable, Task { - protected Downloader downloader; + protected Downloader downloader; - protected List pipelines = new ArrayList(); + protected List pipelines = new ArrayList(); - protected PageProcessor pageProcessor; + protected PageProcessor pageProcessor; - protected List startUrls; + protected List startUrls; - protected Site site; + protected Site site; - protected String uuid; + protected String uuid; - protected Scheduler scheduler = new QueueScheduler(); + protected Scheduler scheduler = new QueueScheduler(); - protected Logger logger = Logger.getLogger(getClass()); + protected Logger logger = Logger.getLogger(getClass()); - protected ExecutorService executorService; + protected ExecutorService executorService; - protected int threadNum = 1; + protected int threadNum = 1; - protected AtomicInteger stat = new AtomicInteger(STAT_INIT); + protected AtomicInteger stat = new AtomicInteger(STAT_INIT); - protected final static int STAT_INIT = 0; + protected final static int STAT_INIT = 0; - protected final static int STAT_RUNNING = 1; + protected final static int STAT_RUNNING = 1; - protected final static int STAT_STOPPED = 2; + protected final static int STAT_STOPPED = 2; - /** - * create a spider with pageProcessor. - * - * @param pageProcessor - * @return new spider - * @see PageProcessor - */ - public static Spider create(PageProcessor pageProcessor) { - return new Spider(pageProcessor); - } + /** + * create a spider with pageProcessor. + * + * @param pageProcessor + * @return new spider + * @see PageProcessor + */ + public static Spider create(PageProcessor pageProcessor) { + return new Spider(pageProcessor); + } - /** - * create a spider with pageProcessor. 
- * - * @param pageProcessor - */ - public Spider(PageProcessor pageProcessor) { - this.pageProcessor = pageProcessor; - this.site = pageProcessor.getSite(); - this.startUrls = pageProcessor.getSite().getStartUrls(); - } + /** + * create a spider with pageProcessor. + * + * @param pageProcessor + */ + public Spider(PageProcessor pageProcessor) { + this.pageProcessor = pageProcessor; + this.site = pageProcessor.getSite(); + this.startUrls = pageProcessor.getSite().getStartUrls(); + } - /** - * Set startUrls of Spider.
- * Prior to startUrls of Site. - * - * @param startUrls - * @return this - */ - public Spider startUrls(List startUrls) { - checkIfRunning(); - this.startUrls = startUrls; - return this; - } + /** + * Set startUrls of Spider.
+ * Prior to startUrls of Site. + * + * @param startUrls + * @return this + */ + public Spider startUrls(List startUrls) { + checkIfRunning(); + this.startUrls = startUrls; + return this; + } - /** - * Set an uuid for spider.
- * Default uuid is domain of site.
- * - * @param uuid - * @return this - */ - public Spider setUUID(String uuid) { - this.uuid = uuid; - return this; - } + /** + * Set an uuid for spider.
+ * The default uuid is the domain of the site.
+ * + * @param uuid + * @return this + */ + public Spider setUUID(String uuid) { + this.uuid = uuid; + return this; + } - /** - * set scheduler for Spider - * - * @param scheduler - * @return this - * @Deprecated - * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) - */ - public Spider scheduler(Scheduler scheduler) { - return setScheduler(scheduler); - } + /** + * set scheduler for Spider + * + * @param scheduler + * @return this + * @Deprecated + * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) + */ + public Spider scheduler(Scheduler scheduler) { + return setScheduler(scheduler); + } - /** - * set scheduler for Spider - * - * @param scheduler - * @return this - * @see Scheduler - * @since 0.2.1 - */ - public Spider setScheduler(Scheduler scheduler) { - checkIfRunning(); - this.scheduler = scheduler; - return this; - } + /** + * set scheduler for Spider + * + * @param scheduler + * @return this + * @see Scheduler + * @since 0.2.1 + */ + public Spider setScheduler(Scheduler scheduler) { + checkIfRunning(); + this.scheduler = scheduler; + return this; + } - /** - * add a pipeline for Spider - * - * @param pipeline - * @return this - * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) - * @deprecated - */ - public Spider pipeline(Pipeline pipeline) { - return addPipeline(pipeline); - } + /** + * add a pipeline for Spider + * + * @param pipeline + * @return this + * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) + * @deprecated + */ + public Spider pipeline(Pipeline pipeline) { + return addPipeline(pipeline); + } - /** - * add a pipeline for Spider - * - * @param pipeline - * @return this - * @see Pipeline - * @since 0.2.1 - */ - public Spider addPipeline(Pipeline pipeline) { - checkIfRunning(); - this.pipelines.add(pipeline); - return this; - } + /** + * add a pipeline for Spider + * + * @param pipeline + * @return this + * @see Pipeline + * @since 0.2.1 + */ + public Spider addPipeline(Pipeline pipeline) { + 
checkIfRunning(); + this.pipelines.add(pipeline); + return this; + } - /** - * clear the pipelines set - * - * @return this - */ - public Spider clearPipeline() { - pipelines = new ArrayList(); - return this; - } + /** + * clear the pipelines set + * + * @return this + */ + public Spider clearPipeline() { + pipelines = new ArrayList(); + return this; + } - /** - * set the downloader of spider - * - * @param downloader - * @return this - * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) - * @deprecated - */ - public Spider downloader(Downloader downloader) { - return setDownloader(downloader); - } + /** + * set the downloader of spider + * + * @param downloader + * @return this + * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) + * @deprecated + */ + public Spider downloader(Downloader downloader) { + return setDownloader(downloader); + } - /** - * set the downloader of spider - * - * @param downloader - * @return this - * @see Downloader - */ - public Spider setDownloader(Downloader downloader) { - checkIfRunning(); - this.downloader = downloader; - return this; - } + /** + * set the downloader of spider + * + * @param downloader + * @return this + * @see Downloader + */ + public Spider setDownloader(Downloader downloader) { + checkIfRunning(); + this.downloader = downloader; + return this; + } - protected void checkComponent() { - if (downloader == null) { - this.downloader = new HttpClientDownloader(); - } - if (pipelines.isEmpty()) { - pipelines.add(new ConsolePipeline()); - } - downloader.setThread(threadNum); - } + protected void checkComponent() { + if (downloader == null) { + this.downloader = new HttpClientDownloader(); + } + if (pipelines.isEmpty()) { + pipelines.add(new ConsolePipeline()); + } + downloader.setThread(threadNum); + } - @Override - public void run() { - if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) - && !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) { - throw new IllegalStateException("Spider is 
already running!"); - } - checkComponent(); - if (startUrls != null) { - for (String startUrl : startUrls) { - scheduler.push(new Request(startUrl), this); - } - startUrls.clear(); - } - Request request = scheduler.poll(this); - //single thread - if (threadNum <= 1) { - while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { - processRequest(request); - request = scheduler.poll(this); - } - } else { - synchronized (this) { - this.executorService = ThreadUtils.newFixedThreadPool(threadNum); - } - //multi thread - final AtomicInteger threadAlive = new AtomicInteger(0); - while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { - if (request == null) { - //when no request found but some thread is alive, sleep a while. - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - } else { - final Request requestFinal = request; - threadAlive.incrementAndGet(); - executorService.execute(new Runnable() { - @Override - public void run() { - processRequest(requestFinal); - threadAlive.decrementAndGet(); - } - }); - } - request = scheduler.poll(this); - if (threadAlive.get() == 0) { - request = scheduler.poll(this); - if (request == null) { - break; - } - } - } - executorService.shutdown(); - } - stat.compareAndSet(STAT_RUNNING, STAT_STOPPED); - //release some resources - destroy(); - } + @Override + public void run() { + if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) && !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) { + throw new IllegalStateException("Spider is already running!"); + } + checkComponent(); + if (startUrls != null) { + for (String startUrl : startUrls) { + scheduler.push(new Request(startUrl), this); + } + startUrls.clear(); + } + Request request = scheduler.poll(this); + logger.info("Spider " + getUUID() + " started!"); + // single thread + if (threadNum <= 1) { + while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { + processRequest(request); + request = scheduler.poll(this); + } + } else { + 
synchronized (this) { + this.executorService = ThreadUtils.newFixedThreadPool(threadNum); + } + // multi thread + final AtomicInteger threadAlive = new AtomicInteger(0); + while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { + if (request == null) { + // when no request found but some thread is alive, sleep a + // while. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } else { + final Request requestFinal = request; + threadAlive.incrementAndGet(); + executorService.execute(new Runnable() { + @Override + public void run() { + processRequest(requestFinal); + threadAlive.decrementAndGet(); + } + }); + } + request = scheduler.poll(this); + if (threadAlive.get() == 0) { + request = scheduler.poll(this); + if (request == null) { + break; + } + } + } + executorService.shutdown(); + } + stat.compareAndSet(STAT_RUNNING, STAT_STOPPED); + // release some resources + destroy(); + } - protected void destroy() { - destroyEach(downloader); - destroyEach(pageProcessor); - for (Pipeline pipeline : pipelines) { - destroyEach(pipeline); - } - } + protected void destroy() { + destroyEach(downloader); + destroyEach(pageProcessor); + for (Pipeline pipeline : pipelines) { + destroyEach(pipeline); + } + } - private void destroyEach(Object object) { - if (object instanceof Closeable) { - try { - ((Closeable) object).close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } + private void destroyEach(Object object) { + if (object instanceof Closeable) { + try { + ((Closeable) object).close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } - /** - * Process specific urls without url discovering. - * - * @param urls urls to process - */ - public void test(String... urls) { - checkComponent(); - if (urls.length > 0) { - for (String url : urls) { - processRequest(new Request(url)); - } - } - } + /** + * Process specific urls without url discovering. + * + * @param urls + * urls to process + */ + public void test(String... 
urls) { + checkComponent(); + if (urls.length > 0) { + for (String url : urls) { + processRequest(new Request(url)); + } + } + } - protected void processRequest(Request request) { - Page page = downloader.download(request, this); - if (page == null) { - sleep(site.getSleepTime()); - return; - } - //for cycle retry - if (page.getHtml() == null) { - addRequest(page); - sleep(site.getSleepTime()); - return; - } - pageProcessor.process(page); - addRequest(page); - if (!page.getResultItems().isSkip()) { - for (Pipeline pipeline : pipelines) { - pipeline.process(page.getResultItems(), this); - } - } - sleep(site.getSleepTime()); - } + protected void processRequest(Request request) { + Page page = downloader.download(request, this); + if (page == null) { + sleep(site.getSleepTime()); + return; + } + // for cycle retry + if (page.getHtml() == null) { + addRequest(page); + sleep(site.getSleepTime()); + return; + } + pageProcessor.process(page); + addRequest(page); + if (!page.getResultItems().isSkip()) { + for (Pipeline pipeline : pipelines) { + pipeline.process(page.getResultItems(), this); + } + } + sleep(site.getSleepTime()); + } - protected void sleep(int time) { - try { - Thread.sleep(time); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + protected void sleep(int time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - protected void addRequest(Page page) { - if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { - for (Request request : page.getTargetRequests()) { - scheduler.push(request, this); - } - } - } + protected void addRequest(Page page) { + if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { + for (Request request : page.getTargetRequests()) { + scheduler.push(request, this); + } + } + } - protected void checkIfRunning() { - if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) { - throw new IllegalStateException("Spider is 
already running!"); - } - } + protected void checkIfRunning() { + if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) { + throw new IllegalStateException("Spider is already running!"); + } + } - public void runAsync() { - Thread thread = new Thread(this); - thread.setDaemon(false); - thread.start(); - } + public void runAsync() { + Thread thread = new Thread(this); + thread.setDaemon(false); + thread.start(); + } - public void start() { - runAsync(); - } + public void start() { + runAsync(); + } - public void stop() { - stat.compareAndSet(STAT_RUNNING, STAT_STOPPED); - executorService.shutdown(); - } + public void stop() { + if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) { + if (executorService != null) { + executorService.shutdown(); + } + logger.info("Spider " + getUUID() + " stop success!"); + } else { + logger.info("Spider " + getUUID() + " stop fail!"); + } + } - public void stopAndDestroy() { - stop(); - destroy(); - } + public void stopAndDestroy() { + stop(); + destroy(); + } - /** - * start with more than one threads - * - * @param threadNum - * @return this - */ - public Spider thread(int threadNum) { - checkIfRunning(); - this.threadNum = threadNum; - if (threadNum <= 0) { - throw new IllegalArgumentException("threadNum should be more than one!"); - } - if (threadNum == 1) { - return this; - } - return this; - } + /** + * start with more than one threads + * + * @param threadNum + * @return this + */ + public Spider thread(int threadNum) { + checkIfRunning(); + this.threadNum = threadNum; + if (threadNum <= 0) { + throw new IllegalArgumentException("threadNum should be more than one!"); + } + if (threadNum == 1) { + return this; + } + return this; + } - /** - * switch off xsoup - * - * @return - */ - public static void xsoupOff() { - EnvironmentUtil.setUseXsoup(false); - } + /** + * switch off xsoup + * + * @return + */ + public static void xsoupOff() { + EnvironmentUtil.setUseXsoup(false); + } - 
@Override - public String getUUID() { - if (uuid != null) { - return uuid; - } - if (site != null) { - return site.getDomain(); - } - return null; - } + @Override + public String getUUID() { + if (uuid != null) { + return uuid; + } + if (site != null) { + return site.getDomain(); + } + return null; + } - @Override - public Site getSite() { - return site; - } + @Override + public Site getSite() { + return site; + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java index 0d5666c..ba9774d 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/utils/ThreadUtils.java @@ -1,7 +1,7 @@ package us.codecraft.webmagic.utils; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -11,22 +11,11 @@ import java.util.concurrent.TimeUnit; */ public class ThreadUtils { - public static ExecutorService newFixedThreadPool(int threadSize) { - return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(1) { - - private static final long serialVersionUID = -9028058603126367678L; - - @Override - public boolean offer(Runnable e) { - try { - put(e); - return true; - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - return false; - } - }); - } + public static ExecutorService newFixedThreadPool(int threadSize) { + if (threadSize <= 1) { + throw new IllegalArgumentException("ThreadSize must be greater than 1!"); + } + return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); + } } diff --git 
a/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java b/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java index b3249ce..75c1ba1 100644 --- a/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java +++ b/webmagic-core/src/test/java/us/codecraft/webmagic/SpiderTest.java @@ -18,11 +18,12 @@ public class SpiderTest { public void process(ResultItems resultItems, Task task) { System.out.println(1); } - }); + }).thread(2); spider.start(); Thread.sleep(10000); spider.stop(); -// spider.run(); + Thread.sleep(10000); + spider.start(); Thread.sleep(10000); } }