diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index 6d436bd..c53afcb 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import us.codecraft.webmagic.downloader.Downloader; import us.codecraft.webmagic.downloader.HttpClientDownloader; +import us.codecraft.webmagic.monitor.SpiderListener; import us.codecraft.webmagic.pipeline.CollectorPipeline; import us.codecraft.webmagic.pipeline.ConsolePipeline; import us.codecraft.webmagic.pipeline.Pipeline; @@ -101,6 +102,8 @@ public class Spider implements Runnable, Task { private final AtomicInteger threadAlive = new AtomicInteger(0); + private List spiderListeners; + private final AtomicLong pageCount = new AtomicLong(0); /** @@ -312,7 +315,9 @@ public class Spider implements Runnable, Task { public void run() { try { processRequest(requestFinal); + onSuccess(requestFinal); } catch (Exception e) { + onError(requestFinal); logger.error("download " + requestFinal + " error", e); } finally { threadAlive.decrementAndGet(); @@ -330,6 +335,22 @@ public class Spider implements Runnable, Task { } } + protected void onError(Request request) { + if (CollectionUtils.isNotEmpty(spiderListeners)){ + for (SpiderListener spiderListener : spiderListeners) { + spiderListener.onError(request); + } + } + } + + protected void onSuccess(Request request) { + if (CollectionUtils.isNotEmpty(spiderListeners)){ + for (SpiderListener spiderListener : spiderListeners) { + spiderListener.onSuccess(request); + } + } + } + private void checkRunningStat() { while (true) { int statNow = stat.get(); @@ -378,6 +399,7 @@ public class Spider implements Runnable, Task { protected void processRequest(Request request) { Page page = downloader.download(request, this); if (page == null) { + onError(request); sleep(site.getSleepTime()); return; } @@ -659,4 +681,17 @@ public class Spider implements Runnable, Task { public Site getSite() { return site; } + + public List getSpiderListeners() { + return spiderListeners; + } + + public Spider setSpiderListeners(List spiderListeners) { + this.spiderListeners = spiderListeners; + return this; + } + + public Scheduler getScheduler() { + return scheduler; + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/AbstractDownloader.java b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/AbstractDownloader.java index 2336856..5940c2f 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/AbstractDownloader.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/AbstractDownloader.java @@ -34,6 +34,12 @@ public abstract class AbstractDownloader implements Downloader { return (Html) page.getHtml(); } + protected void onSuccess(Request request) { + } + + protected void onError(Request request) { + } + protected Page addToCycleRetry(Request request, Site site) { Page page = new Page(); Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES); diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java index f0f53c6..13e220f 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java @@ -87,7 +87,9 @@ public class HttpClientDownloader extends AbstractDownloader { String value = httpResponse.getEntity().getContentType().getValue(); charset = UrlUtils.getCharset(value); } - return handleResponse(request, charset, httpResponse, task); + Page page = handleResponse(request, charset, httpResponse, task); + onSuccess(request); + return page; } else { logger.warn("code error " + statusCode + "\t" + request.getUrl()); return null; @@ -97,6 +99,7 @@ public class HttpClientDownloader extends AbstractDownloader { if (site.getCycleRetryTimes() > 0) { return addToCycleRetry(request, site); } + onError(request); return null; } finally { try { diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/MonitorableScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/MonitorableScheduler.java new file mode 100644 index 0000000..11889ac --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/MonitorableScheduler.java @@ -0,0 +1,18 @@ +package us.codecraft.webmagic.monitor; + +import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.scheduler.Scheduler; + +/** + * The scheduler whose requests can be counted for monitor. + * + * @author code4crafter@gmail.com + * @since 0.5.0 + */ +public interface MonitorableScheduler extends Scheduler { + + public int getLeftRequestsCount(Task task); + + public int getTotalRequestsCount(Task task); + +} \ No newline at end of file diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderListener.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderListener.java new file mode 100644 index 0000000..7a6c687 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderListener.java @@ -0,0 +1,14 @@ +package us.codecraft.webmagic.monitor; + +import us.codecraft.webmagic.Request; + +/** + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public interface SpiderListener { + + public void onSuccess(Request request); + + public void onError(Request request); +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java new file mode 100644 index 0000000..ccf498d --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitor.java @@ -0,0 +1,122 @@ +package us.codecraft.webmagic.monitor; + +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Spider; +import us.codecraft.webmagic.processor.example.GithubRepoPageProcessor; +import us.codecraft.webmagic.processor.example.OschinaBlogPageProcessor; + +import javax.management.*; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import java.io.IOException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public class SpiderMonitor implements SpiderMonitorMBean { + + private List spiderStatuses = new ArrayList(); + + @Override + public List getSpiders() { + return spiderStatuses; + } + + @Override + public SpiderStatus getSpider() { + return spiderStatuses.get(0); + } + + public void register(Spider spider) { + MonitorSpiderListener monitorSpiderListener = new MonitorSpiderListener(); + if (spider.getSpiderListeners() == null) { + List spiderListeners = new ArrayList(); + spiderListeners.add(monitorSpiderListener); + spider.setSpiderListeners(spiderListeners); + } else { + spider.getSpiderListeners().add(monitorSpiderListener); + } + spiderStatuses.add(new SpiderStatus(spider, monitorSpiderListener)); + + } + + public class MonitorSpiderListener implements SpiderListener { + + private final AtomicInteger successCount = new AtomicInteger(0); + + private final AtomicInteger errorCount = new AtomicInteger(0); + + private List errorUrls = Collections.synchronizedList(new ArrayList()); + + @Override + public void onSuccess(Request request) { + successCount.incrementAndGet(); + } + + @Override + public void onError(Request request) { + errorUrls.add(request.getUrl()); + errorCount.incrementAndGet(); + } + + public AtomicInteger getSuccessCount() { + return successCount; + } + + public AtomicInteger getErrorCount() { + return errorCount; + } + + public List getErrorUrls() { + return errorUrls; + } + } + + + public static void main(String[] args) throws MalformedObjectNameException, + NullPointerException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, IOException { + + int rmiPort = 1099; + SpiderMonitor spiderMonitor = new SpiderMonitor(); + String jmxServerName = "TestJMXServer"; + + Spider oschinaSpider = Spider.create(new OschinaBlogPageProcessor()).addUrl("http://my.oschina.net/flashsword/blog").thread(2); + + spiderMonitor.register(oschinaSpider); + + Spider githubSpider = Spider.create(new GithubRepoPageProcessor()).addUrl("https://github.com/code4craft"); + + spiderMonitor.register(githubSpider); + + // jdkfolder/bin/rmiregistry.exe 9999 + Registry registry = LocateRegistry.createRegistry(rmiPort); + + MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName); + //MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objName = new ObjectName(jmxServerName + ":name=" + "HelloWorld"); + mbs.registerMBean(spiderMonitor, objName); + + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + rmiPort + "/" + jmxServerName); + System.out.println("JMXServiceURL: " + url.toString()); + JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs); + jmxConnServer.start(); + + for (SpiderStatus spiderStatuse : spiderMonitor.spiderStatuses) { + objName = new ObjectName(jmxServerName + ":name=" + spiderStatuse.getName()); + mbs.registerMBean(spiderStatuse, objName); + } + + + } + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitorMBean.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitorMBean.java new file mode 100644 index 0000000..8b77b33 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderMonitorMBean.java @@ -0,0 +1,14 @@ +package us.codecraft.webmagic.monitor; + +import java.util.List; + +/** + * @author code4crafer@gmail.com + */ +public interface SpiderMonitorMBean { + + public List getSpiders(); + + public SpiderStatus getSpider(); + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatus.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatus.java new file mode 100644 index 0000000..84d8603 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatus.java @@ -0,0 +1,52 @@ +package us.codecraft.webmagic.monitor; + +import us.codecraft.webmagic.Spider; + +import java.util.List; + +/** + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public class SpiderStatus implements SpiderStatusMBean{ + + private final Spider spider; + + private final SpiderMonitor.MonitorSpiderListener monitorSpiderListener; + + public SpiderStatus(Spider spider, SpiderMonitor.MonitorSpiderListener monitorSpiderListener) { + this.spider = spider; + this.monitorSpiderListener = monitorSpiderListener; + } + + public String getName() { + return spider.getUUID(); + } + + public int getLeftPages() { + if (spider.getScheduler() instanceof MonitorableScheduler) { + return ((MonitorableScheduler) spider.getScheduler()).getLeftRequestsCount(spider); + } + return -1; + } + + public int getTotalPages() { + if (spider.getScheduler() instanceof MonitorableScheduler) { + return ((MonitorableScheduler) spider.getScheduler()).getTotalRequestsCount(spider); + } + return -1; + } + + public List getErrorPages() { + return monitorSpiderListener.getErrorUrls(); + } + + public void start() { + spider.start(); + } + + public void stop() { + spider.stop(); + } + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatusMBean.java b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatusMBean.java new file mode 100644 index 0000000..cd884a5 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/monitor/SpiderStatusMBean.java @@ -0,0 +1,22 @@ +package us.codecraft.webmagic.monitor; + +import java.util.List; + +/** + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public interface SpiderStatusMBean { + + public String getName(); + + public int getLeftPages(); + + public int getTotalPages(); + public List getErrorPages(); + + public void start(); + + public void stop(); + +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java index 449c3f6..2807e0f 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.monitor.MonitorableScheduler; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -15,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; * @author code4crafter@gmail.com * @since 0.5.0 */ -public abstract class LocalDuplicatedRemovedScheduler implements Scheduler { +public abstract class LocalDuplicatedRemovedScheduler implements MonitorableScheduler { protected Logger logger = LoggerFactory.getLogger(getClass()); @@ -34,5 +35,10 @@ public abstract class LocalDuplicatedRemovedScheduler implements Scheduler { return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; } + @Override + public int getTotalRequestsCount(Task task) { + return urls.size(); + } + protected abstract void pushWhenNoDuplicate(Request request, Task task); } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java index 04917ad..a57a6fb 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java @@ -60,4 +60,9 @@ public class PriorityScheduler extends LocalDuplicatedRemovedScheduler { } return priorityQueueMinus.poll(); } + + @Override + public int getLeftRequestsCount(Task task) { + return noPriorityQueue.size(); + } } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java index ab288df..e2a6e75 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java @@ -29,4 +29,9 @@ public class QueueScheduler extends LocalDuplicatedRemovedScheduler { public synchronized Request poll(Task task) { return queue.poll(); } + + @Override + public int getLeftRequestsCount(Task task) { + return queue.size(); + } } diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java index 79f3b8b..9d7668d 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java @@ -161,4 +161,9 @@ public class FileCacheQueueScheduler extends LocalDuplicatedRemovedScheduler { fileCursorWriter.println(cursor.incrementAndGet()); return queue.poll(); } + + @Override + public int getLeftRequestsCount(Task task) { + return queue.size(); + } } diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java index cd90625..16f9147 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java @@ -7,6 +7,7 @@ import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.monitor.MonitorableScheduler; /** * Use Redis as url scheduler for distributed crawlers.
@@ -14,7 +15,7 @@ import us.codecraft.webmagic.Task; * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class RedisScheduler implements Scheduler { +public class RedisScheduler implements MonitorableScheduler { private JedisPool pool; @@ -39,10 +40,10 @@ public class RedisScheduler implements Scheduler { // if cycleRetriedTimes is set, allow duplicated. Object cycleRetriedTimes = request.getExtra(Request.CYCLE_TRIED_TIMES); // use set to remove duplicate url - if (cycleRetriedTimes != null || !jedis.sismember(SET_PREFIX + task.getUUID(), request.getUrl())) { + if (cycleRetriedTimes != null || !jedis.sismember(getSetKey(task), request.getUrl())) { // use list to store queue - jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl()); - jedis.sadd(SET_PREFIX + task.getUUID(), request.getUrl()); + jedis.rpush(getQueueKey(task), request.getUrl()); + jedis.sadd(getSetKey(task), request.getUrl()); if (request.getExtras() != null) { String field = DigestUtils.shaHex(request.getUrl()); String value = JSON.toJSONString(request); @@ -58,7 +59,7 @@ public class RedisScheduler implements Scheduler { public synchronized Request poll(Task task) { Jedis jedis = pool.getResource(); try { - String url = jedis.lpop(QUEUE_PREFIX + task.getUUID()); + String url = jedis.lpop(getQueueKey(task)); if (url == null) { return null; } @@ -75,4 +76,34 @@ public class RedisScheduler implements Scheduler { pool.returnResource(jedis); } } + + protected String getSetKey(Task task) { + return SET_PREFIX + task.getUUID(); + } + + protected String getQueueKey(Task task) { + return QUEUE_PREFIX + task.getUUID(); + } + + @Override + public int getLeftRequestsCount(Task task) { + Jedis jedis = pool.getResource(); + try { + Long size = jedis.llen(getQueueKey(task)); + return size.intValue(); + } finally { + pool.returnResource(jedis); + } + } + + @Override + public int getTotalRequestsCount(Task task) { + Jedis jedis = pool.getResource(); + try { + Long size = jedis.scard(getQueueKey(task)); + return size.intValue(); + } finally { + pool.returnResource(jedis); + } + } }