From 76f625c02e552ccd2834cd38cb0f46e2f3037db7 Mon Sep 17 00:00:00 2001
From: linweisen
Date: Fri, 9 Apr 2021 17:00:00 +0800
Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E5=8F=AF=E6=81=A2=E5=A4=8D?=
 =?UTF-8?q?=E7=88=AC=E5=8F=96=E5=86=85=E5=AE=B9=E4=BE=8B=E5=AD=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 webmagic-samples/pom.xml                      | 20 +++++
 .../recover/DuplicateStorageRemover.java      | 88 ++++++++++++++++++
 .../webmagic/recover/MmapQueueScheduler.java  | 89 +++++++++++++++++++
 .../webmagic/recover/RecoverSample.java       | 22 +++++
 4 files changed, 219 insertions(+)
 create mode 100644 webmagic-samples/src/main/java/us/codecraft/webmagic/recover/DuplicateStorageRemover.java
 create mode 100644 webmagic-samples/src/main/java/us/codecraft/webmagic/recover/MmapQueueScheduler.java
 create mode 100644 webmagic-samples/src/main/java/us/codecraft/webmagic/recover/RecoverSample.java

diff --git a/webmagic-samples/pom.xml b/webmagic-samples/pom.xml
index 3699fa6..6c0e59b 100644
--- a/webmagic-samples/pom.xml
+++ b/webmagic-samples/pom.xml
@@ -24,6 +24,26 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+            <version>3.0.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.9.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.9.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.5</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/DuplicateStorageRemover.java b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/DuplicateStorageRemover.java
new file mode 100644
index 0000000..5bf249e
--- /dev/null
+++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/DuplicateStorageRemover.java
@@ -0,0 +1,88 @@
+package us.codecraft.webmagic.recover;
+
+import com.google.common.base.Charsets;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.IndexTreeList;
+import org.mapdb.Serializer;
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.Task;
+import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Duplicate remover that survives restarts: every URL accepted for crawling is appended to a
+ * MapDB list kept in a file, and an in-memory Bloom filter rebuilt from that list answers the
+ * duplicate checks. Being a Bloom filter, it may occasionally report a new URL as already seen.
+ *
+ * @author linweisen
+ */
+public class DuplicateStorageRemover implements DuplicateRemover {
+
+    private static final String DATABASE_NAME = "duplicate";
+    /** Smallest number of URLs the Bloom filter is sized for. */
+    private static final int EXPECTED_URLS = 200000;
+    private static final double FALSE_POSITIVE_RATE = 1E-7;
+
+    private final DB db;
+    private final IndexTreeList<String> urlDuplicateQueue;
+    private final AtomicInteger counter;
+    private BloomFilter<CharSequence> bloomFilter;
+
+    /** Opens (or creates) the URL store at {@code path} and loads the URLs it already holds. */
+    public DuplicateStorageRemover(String path) {
+        this.db = DBMaker.fileDB(path)
+                .fileMmapEnableIfSupported()
+                .fileMmapPreclearDisable()
+                .cleanerHackEnable()
+                .closeOnJvmShutdown()
+                .transactionEnable()
+                .concurrencyScale(128)
+                .make();
+        this.urlDuplicateQueue = db.indexTreeList(DATABASE_NAME, Serializer.STRING).createOrOpen();
+        this.counter = new AtomicInteger(urlDuplicateQueue.size());
+        this.bloomFilter = newBloomFilter(urlDuplicateQueue.size());
+        for (String url : urlDuplicateQueue) {
+            bloomFilter.put(url);
+        }
+    }
+
+    /** Returns whether the URL was seen before; a new URL is stored and committed first. */
+    @Override
+    public synchronized boolean isDuplicate(Request request, Task task) {
+        String url = request.getUrl();
+        if (bloomFilter.mightContain(url)) {
+            return true;
+        }
+        bloomFilter.put(url);
+        urlDuplicateQueue.add(url);
+        db.commit();
+        counter.incrementAndGet();
+        return false;
+    }
+
+    /** Forgets every URL, in memory and in the store, and resets the request count to zero. */
+    @Override
+    public synchronized void resetDuplicateCheck(Task task) {
+        bloomFilter = newBloomFilter(0);
+        urlDuplicateQueue.clear();
+        // Transactions are enabled, so the clear only reaches the file once it is committed.
+        db.commit();
+        counter.set(0);
+    }
+
+    /** Number of URLs recorded so far, including those loaded from the store at start-up. */
+    @Override
+    public int getTotalRequestsCount(Task task) {
+        return counter.get();
+    }
+
+    /** Builds an empty filter sized for twice {@code knownUrls}, but never below the minimum. */
+    private static BloomFilter<CharSequence> newBloomFilter(int knownUrls) {
+        int capacity = (int) Math.min(Integer.MAX_VALUE, Math.max(EXPECTED_URLS, 2L * knownUrls));
+        return BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), capacity, FALSE_POSITIVE_RATE);
+    }
+}
diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/MmapQueueScheduler.java
b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/MmapQueueScheduler.java
new file mode 100644
index 0000000..07cfa22
--- /dev/null
+++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/MmapQueueScheduler.java
@@ -0,0 +1,89 @@
+package us.codecraft.webmagic.recover;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.IndexTreeList;
+import org.mapdb.Serializer;
+import us.codecraft.webmagic.Request;
+import us.codecraft.webmagic.Task;
+import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
+import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
+
+import java.io.IOException;
+
+/**
+ * Scheduler whose pending-request queue is a MapDB list kept in a file, so an interrupted crawl
+ * can pick up the requests that were still waiting. Requests are stored as JSON strings.
+ *
+ * @author linweisen
+ */
+public class MmapQueueScheduler extends DuplicateRemovedScheduler {
+
+    private static final String DATABASE_NAME = "queue";
+    /** One mapper for all instances instead of replacing a static field in every constructor. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final DB db;
+    private final IndexTreeList<String> queue;
+
+    /** Opens (or creates) the queue file at {@code path} and installs {@code duplicateRemover}. */
+    public MmapQueueScheduler(DuplicateRemover duplicateRemover, String path) {
+        super.setDuplicateRemover(duplicateRemover);
+        this.db = DBMaker.fileDB(path)
+                .fileMmapEnableIfSupported()
+                .fileMmapPreclearDisable()
+                .cleanerHackEnable()
+                .closeOnJvmShutdown()
+                .transactionEnable()
+                .concurrencyScale(128)
+                .make();
+        this.queue = db.indexTreeList(DATABASE_NAME, Serializer.STRING).createOrOpen();
+    }
+
+    /** Removes and returns the oldest queued request, or {@code null} when the queue is empty. */
+    @Override
+    public synchronized Request poll(Task task) {
+        if (queue.isEmpty()) {
+            return null;
+        }
+        String json = queue.remove(0);
+        db.commit(); // commit the removal now instead of leaving it to the next push
+        return fromJson(json, Request.class);
+    }
+
+    /** Appends the request to the stored queue; a request that cannot be serialized is skipped. */
+    @Override
+    public synchronized void pushWhenNoDuplicate(Request request, Task task) {
+        String json = toJson(request);
+        if (json == null) {
+            return; // toJson has already logged the failure
+        }
+        queue.add(json);
+        db.commit();
+    }
+
+    /** Serializes {@code object} to JSON; logs a warning and returns {@code null} on failure. */
+    public String toJson(Object object) {
+        try {
+            return MAPPER.writeValueAsString(object);
+        } catch (IOException e) {
+            logger.warn("write to json string error:" + object, e);
+            return null;
+        }
+    }
+
+    /** Parses JSON into {@code clazz}; returns {@code null} for empty input or a parse failure. */
+    public <T> T fromJson(String jsonString, Class<T> clazz) {
+        if (StringUtils.isEmpty(jsonString)) {
+            return null;
+        }
+        try {
+            return MAPPER.readValue(jsonString, clazz);
+        } catch (IOException e) {
+            logger.warn("parse json string error:" + jsonString, e);
+            return null;
+        }
+    }
+}
diff --git a/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/RecoverSample.java b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/RecoverSample.java
new file mode 100644
index 0000000..4fb91a0
--- /dev/null
+++ b/webmagic-samples/src/main/java/us/codecraft/webmagic/recover/RecoverSample.java
@@ -0,0 +1,22 @@
+package us.codecraft.webmagic.recover;
+
+import us.codecraft.webmagic.Spider;
+import us.codecraft.webmagic.samples.SinaBlogProcessor;
+import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
+
+/**
+ * Sample crawl that keeps its visited-URL list and its request queue in MapDB files.
+ * @author code4crafter@gmail.com
+ */
+public class RecoverSample {
+
+    public static void main(String[] args) {
+        Spider crawler = new Spider(new SinaBlogProcessor());
+        // Visited URLs are stored under the path "duplicate", pending requests under "queue".
+        DuplicateRemover visitedUrls = new DuplicateStorageRemover("duplicate");
+        MmapQueueScheduler pendingRequests = new MmapQueueScheduler(visitedUrls, "queue");
+        crawler.setScheduler(pendingRequests);
+        crawler.addUrl("http://blog.sina.com.cn/s/articlelist_1487828712_0_1.html");
+        crawler.run();
+    }
+}