refactor multi thread code in Spider

master
yihua.huang 2013-10-31 21:52:43 +08:00
parent dbfb6b5803
commit a3f9ad198f
5 changed files with 360 additions and 344 deletions

View File

@ -63,6 +63,11 @@
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>
<version>4.2.4</version> <version>4.2.4</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency> <dependency>
<groupId>us.codecraft</groupId> <groupId>us.codecraft</groupId>
<artifactId>xsoup</artifactId> <artifactId>xsoup</artifactId>

View File

@ -20,6 +20,12 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>

View File

@ -52,381 +52,380 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class Spider implements Runnable, Task { public class Spider implements Runnable, Task {
protected Downloader downloader; protected Downloader downloader;
protected List<Pipeline> pipelines = new ArrayList<Pipeline>(); protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
protected PageProcessor pageProcessor; protected PageProcessor pageProcessor;
protected List<String> startUrls; protected List<String> startUrls;
protected Site site; protected Site site;
protected String uuid; protected String uuid;
protected Scheduler scheduler = new QueueScheduler(); protected Scheduler scheduler = new QueueScheduler();
protected Logger logger = Logger.getLogger(getClass()); protected Logger logger = Logger.getLogger(getClass());
protected ExecutorService executorService; protected ExecutorService executorService;
protected int threadNum = 1; protected int threadNum = 1;
protected AtomicInteger stat = new AtomicInteger(STAT_INIT); protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
protected final static int STAT_INIT = 0; protected final static int STAT_INIT = 0;
protected final static int STAT_RUNNING = 1; protected final static int STAT_RUNNING = 1;
protected final static int STAT_STOPPED = 2; protected final static int STAT_STOPPED = 2;
/** /**
* create a spider with pageProcessor. * create a spider with pageProcessor.
* *
* @param pageProcessor * @param pageProcessor
* @return new spider * @return new spider
* @see PageProcessor * @see PageProcessor
*/ */
public static Spider create(PageProcessor pageProcessor) { public static Spider create(PageProcessor pageProcessor) {
return new Spider(pageProcessor); return new Spider(pageProcessor);
} }
/** /**
* create a spider with pageProcessor. * create a spider with pageProcessor.
* *
* @param pageProcessor * @param pageProcessor
*/ */
public Spider(PageProcessor pageProcessor) { public Spider(PageProcessor pageProcessor) {
this.pageProcessor = pageProcessor; this.pageProcessor = pageProcessor;
this.site = pageProcessor.getSite(); this.site = pageProcessor.getSite();
this.startUrls = pageProcessor.getSite().getStartUrls(); this.startUrls = pageProcessor.getSite().getStartUrls();
} }
/** /**
* Set startUrls of Spider.<br> * Set startUrls of Spider.<br>
* Prior to startUrls of Site. * Prior to startUrls of Site.
* *
* @param startUrls * @param startUrls
* @return this * @return this
*/ */
public Spider startUrls(List<String> startUrls) { public Spider startUrls(List<String> startUrls) {
checkIfRunning(); checkIfRunning();
this.startUrls = startUrls; this.startUrls = startUrls;
return this; return this;
} }
/** /**
* Set an uuid for spider.<br> * Set an uuid for spider.<br>
* Default uuid is domain of site.<br> * Default uuid is domain of site.<br>
* *
* @param uuid * @param uuid
* @return this * @return this
*/ */
public Spider setUUID(String uuid) { public Spider setUUID(String uuid) {
this.uuid = uuid; this.uuid = uuid;
return this; return this;
} }
/** /**
* set scheduler for Spider * set scheduler for Spider
* *
* @param scheduler * @param scheduler
* @return this * @return this
* @Deprecated * @Deprecated
* @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)
*/ */
public Spider scheduler(Scheduler scheduler) { public Spider scheduler(Scheduler scheduler) {
return setScheduler(scheduler); return setScheduler(scheduler);
} }
/** /**
* set scheduler for Spider * set scheduler for Spider
* *
* @param scheduler * @param scheduler
* @return this * @return this
* @see Scheduler * @see Scheduler
* @since 0.2.1 * @since 0.2.1
*/ */
public Spider setScheduler(Scheduler scheduler) { public Spider setScheduler(Scheduler scheduler) {
checkIfRunning(); checkIfRunning();
this.scheduler = scheduler; this.scheduler = scheduler;
return this; return this;
} }
/** /**
* add a pipeline for Spider * add a pipeline for Spider
* *
* @param pipeline * @param pipeline
* @return this * @return this
* @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline)
* @deprecated * @deprecated
*/ */
public Spider pipeline(Pipeline pipeline) { public Spider pipeline(Pipeline pipeline) {
return addPipeline(pipeline); return addPipeline(pipeline);
} }
/** /**
* add a pipeline for Spider * add a pipeline for Spider
* *
* @param pipeline * @param pipeline
* @return this * @return this
* @see Pipeline * @see Pipeline
* @since 0.2.1 * @since 0.2.1
*/ */
public Spider addPipeline(Pipeline pipeline) { public Spider addPipeline(Pipeline pipeline) {
checkIfRunning(); checkIfRunning();
this.pipelines.add(pipeline); this.pipelines.add(pipeline);
return this; return this;
} }
/** /**
* clear the pipelines set * clear the pipelines set
* *
* @return this * @return this
*/ */
public Spider clearPipeline() { public Spider clearPipeline() {
pipelines = new ArrayList<Pipeline>(); pipelines = new ArrayList<Pipeline>();
return this; return this;
} }
/** /**
* set the downloader of spider * set the downloader of spider
* *
* @param downloader * @param downloader
* @return this * @return this
* @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader)
* @deprecated * @deprecated
*/ */
public Spider downloader(Downloader downloader) { public Spider downloader(Downloader downloader) {
return setDownloader(downloader); return setDownloader(downloader);
} }
/** /**
* set the downloader of spider * set the downloader of spider
* *
* @param downloader * @param downloader
* @return this * @return this
* @see Downloader * @see Downloader
*/ */
public Spider setDownloader(Downloader downloader) { public Spider setDownloader(Downloader downloader) {
checkIfRunning(); checkIfRunning();
this.downloader = downloader; this.downloader = downloader;
return this; return this;
} }
protected void checkComponent() { protected void initComponent() {
if (downloader == null) { if (downloader == null) {
this.downloader = new HttpClientDownloader(); this.downloader = new HttpClientDownloader();
} }
if (pipelines.isEmpty()) { if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline()); pipelines.add(new ConsolePipeline());
} }
downloader.setThread(threadNum); downloader.setThread(threadNum);
} executorService = ThreadUtils.newFixedThreadPool(threadNum);
if (startUrls != null) {
for (String startUrl : startUrls) {
scheduler.push(new Request(startUrl), this);
}
startUrls.clear();
}
}
@Override @Override
public void run() { public void run() {
if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) && !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) { checkRunningStat();
throw new IllegalStateException("Spider is already running!"); initComponent();
}
checkComponent();
if (startUrls != null) {
for (String startUrl : startUrls) {
scheduler.push(new Request(startUrl), this);
}
startUrls.clear();
}
Request request = scheduler.poll(this);
logger.info("Spider " + getUUID() + " started!"); logger.info("Spider " + getUUID() + " started!");
// single thread final AtomicInteger threadAlive = new AtomicInteger(0);
if (threadNum <= 1) { while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { Request request = scheduler.poll(this);
processRequest(request); if (request == null) {
request = scheduler.poll(this); if (threadAlive.get() == 0) {
} break;
} else { }
synchronized (this) { // when no request found but some thread is alive, sleep a
this.executorService = ThreadUtils.newFixedThreadPool(threadNum); // while.
} try {
// multi thread Thread.sleep(100);
final AtomicInteger threadAlive = new AtomicInteger(0); } catch (InterruptedException e) {
while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { }
if (request == null) { } else {
// when no request found but some thread is alive, sleep a final Request requestFinal = request;
// while. threadAlive.incrementAndGet();
try { executorService.execute(new Runnable() {
Thread.sleep(100); @Override
} catch (InterruptedException e) { public void run() {
} try {
} else { processRequest(requestFinal);
final Request requestFinal = request; } catch (Exception e) {
threadAlive.incrementAndGet(); logger.error("download "+requestFinal+" error",e);
executorService.execute(new Runnable() { } finally {
@Override threadAlive.decrementAndGet();
public void run() { }
processRequest(requestFinal); }
threadAlive.decrementAndGet(); });
} }
}); }
} executorService.shutdown();
request = scheduler.poll(this); stat.set(STAT_STOPPED);
if (threadAlive.get() == 0) { // release some resources
request = scheduler.poll(this); destroy();
if (request == null) { }
break;
}
}
}
executorService.shutdown();
}
stat.compareAndSet(STAT_RUNNING, STAT_STOPPED);
// release some resources
destroy();
}
protected void destroy() { private void checkRunningStat() {
destroyEach(downloader); while (true) {
destroyEach(pageProcessor); int statNow = stat.get();
for (Pipeline pipeline : pipelines) { if (statNow == STAT_RUNNING) {
destroyEach(pipeline); throw new IllegalStateException("Spider is already running!");
} }
} if (stat.compareAndSet(statNow, STAT_RUNNING)) {
break;
}
}
}
private void destroyEach(Object object) { protected void destroy() {
if (object instanceof Closeable) { destroyEach(downloader);
try { destroyEach(pageProcessor);
((Closeable) object).close(); for (Pipeline pipeline : pipelines) {
} catch (IOException e) { destroyEach(pipeline);
e.printStackTrace(); }
} }
}
}
/** private void destroyEach(Object object) {
* Process specific urls without url discovering. if (object instanceof Closeable) {
* try {
* @param urls ((Closeable) object).close();
* urls to process } catch (IOException e) {
*/ e.printStackTrace();
public void test(String... urls) { }
checkComponent(); }
if (urls.length > 0) { }
for (String url : urls) {
processRequest(new Request(url));
}
}
}
protected void processRequest(Request request) { /**
Page page = downloader.download(request, this); * Process specific urls without url discovering.
if (page == null) { *
sleep(site.getSleepTime()); * @param urls urls to process
return; */
} public void test(String... urls) {
// for cycle retry initComponent();
if (page.getHtml() == null) { if (urls.length > 0) {
addRequest(page); for (String url : urls) {
sleep(site.getSleepTime()); processRequest(new Request(url));
return; }
} }
pageProcessor.process(page); }
addRequest(page);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
pipeline.process(page.getResultItems(), this);
}
}
sleep(site.getSleepTime());
}
protected void sleep(int time) { protected void processRequest(Request request) {
try { Page page = downloader.download(request, this);
Thread.sleep(time); if (page == null) {
} catch (InterruptedException e) { sleep(site.getSleepTime());
e.printStackTrace(); return;
} }
} // for cycle retry
if (page.getHtml() == null) {
addRequest(page);
sleep(site.getSleepTime());
return;
}
pageProcessor.process(page);
addRequest(page);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
pipeline.process(page.getResultItems(), this);
}
}
sleep(site.getSleepTime());
}
protected void addRequest(Page page) { protected void sleep(int time) {
if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { try {
for (Request request : page.getTargetRequests()) { Thread.sleep(time);
scheduler.push(request, this); } catch (InterruptedException e) {
} e.printStackTrace();
} }
} }
protected void checkIfRunning() { protected void addRequest(Page page) {
if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) { if (CollectionUtils.isNotEmpty(page.getTargetRequests())) {
throw new IllegalStateException("Spider is already running!"); for (Request request : page.getTargetRequests()) {
} scheduler.push(request, this);
} }
}
}
public void runAsync() { protected void checkIfRunning() {
Thread thread = new Thread(this); if (stat.get() == STAT_RUNNING) {
thread.setDaemon(false); throw new IllegalStateException("Spider is already running!");
thread.start(); }
} }
public void start() { public void runAsync() {
runAsync(); Thread thread = new Thread(this);
} thread.setDaemon(false);
thread.start();
}
public void stop() { public void start() {
if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) { runAsync();
if (executorService != null) { }
executorService.shutdown();
}
logger.info("Spider " + getUUID() + " stop success!");
} else {
logger.info("Spider " + getUUID() + " stop fail!");
}
}
public void stopAndDestroy() { public void stop() {
stop(); if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
destroy(); if (executorService != null) {
} executorService.shutdown();
}
logger.info("Spider " + getUUID() + " stop success!");
} else {
logger.info("Spider " + getUUID() + " stop fail!");
}
}
/** public void stopAndDestroy() {
* start with more than one threads stop();
* destroy();
* @param threadNum }
* @return this
*/
public Spider thread(int threadNum) {
checkIfRunning();
this.threadNum = threadNum;
if (threadNum <= 0) {
throw new IllegalArgumentException("threadNum should be more than one!");
}
if (threadNum == 1) {
return this;
}
return this;
}
/** /**
* switch off xsoup * start with more than one threads
* *
* @return * @param threadNum
*/ * @return this
public static void xsoupOff() { */
EnvironmentUtil.setUseXsoup(false); public Spider thread(int threadNum) {
} checkIfRunning();
this.threadNum = threadNum;
if (threadNum <= 0) {
throw new IllegalArgumentException("threadNum should be more than one!");
}
if (threadNum == 1) {
return this;
}
return this;
}
@Override /**
public String getUUID() { * switch off xsoup
if (uuid != null) { *
return uuid; * @return
} */
if (site != null) { public static void xsoupOff() {
return site.getDomain(); EnvironmentUtil.setUseXsoup(false);
} }
return null;
}
@Override @Override
public Site getSite() { public String getUUID() {
return site; if (uuid != null) {
} return uuid;
}
if (site != null) {
return site.getDomain();
}
return null;
}
@Override
public Site getSite() {
return site;
}
} }

View File

@ -1,5 +1,7 @@
package us.codecraft.webmagic.utils; package us.codecraft.webmagic.utils;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -11,11 +13,15 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ThreadUtils { public class ThreadUtils {
public static ExecutorService newFixedThreadPool(int threadSize) { public static ExecutorService newFixedThreadPool(int threadSize) {
if (threadSize <= 1) { if (threadSize <= 0) {
throw new IllegalArgumentException("ThreadSize must be greater than 1!"); throw new IllegalArgumentException("ThreadSize must be greater than 0!");
} }
return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, if (threadSize == 1) {
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); return MoreExecutors.sameThreadExecutor();
}
}
return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
}
} }

View File

@ -18,7 +18,7 @@ public class SpiderTest {
public void process(ResultItems resultItems, Task task) { public void process(ResultItems resultItems, Task task) {
System.out.println(1); System.out.println(1);
} }
}).thread(2); }).thread(1);
spider.start(); spider.start();
Thread.sleep(10000); Thread.sleep(10000);
spider.stop(); spider.stop();