From f68795d7dd1ad3202a59ab9d49030065992001b1 Mon Sep 17 00:00:00 2001
From: yao
Date: Tue, 29 Dec 2020 16:54:38 +0800
Subject: [PATCH] =?UTF-8?q?=20bug=E4=BF=AE=E6=94=B9=EF=BC=8C=E5=AF=B9?=
 =?UTF-8?q?=E7=BB=93=E6=9E=9C=E6=8F=90=E4=BE=9B=E7=BC=93=E5=AD=98=E8=83=BD?=
 =?UTF-8?q?=E5=8A=9B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../webmagic/pipeline/CachePipeline.java      |  19 +++
 .../pipeline/CloseableCachePipeline.java      | 136 +++++++++++++++++++
 .../AbstractRefreshableProxyProvider.java     |  22 +--
 .../webmagic/proxy/ExpirableProxy.java        |   2 +
 4 files changed, 162 insertions(+), 17 deletions(-)
 create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java
 create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java

diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java
new file mode 100644
index 0000000..e0acd90
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java
@@ -0,0 +1,19 @@
+package us.codecraft.webmagic.pipeline;
+
+import java.util.Collection;
+
+/**
+ * Adds caching to a {@link Pipeline}: results are buffered and a batch
+ * job is run over them at some later point.
+ *
+ * @author yaoqiang
+ */
+public interface CachePipeline extends Pipeline {
+
+    /**
+     * Processes a batch of cached results.
+     *
+     * @param collection the cached results to process as one batch
+     */
+    void process(Collection collection);
+}
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java
new file mode 100644
index 0000000..4ba433e
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java
@@ -0,0 +1,136 @@
+package us.codecraft.webmagic.pipeline;
+
+import lombok.extern.slf4j.Slf4j;
+import us.codecraft.webmagic.ResultItems;
+import us.codecraft.webmagic.Task;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A {@link CachePipeline} that buffers incoming {@link ResultItems} and passes them to
+ * {@link #process(Collection, Task)} in batches: each time the buffer fills up, and once
+ * more for whatever is left when {@link #close()} is called.
+ * <p>
+ * This class never creates an {@link ExecutorService}. To process batches asynchronously,
+ * pass one in; managing its lifecycle stays the caller's responsibility.
+ * <p>
+ * {@link CachePipeline#process(Collection)} is not implemented here; it is left to
+ * concrete subclasses.
+ *
+ * @author yaoqiang
+ * @see ExecutorService
+ */
+@Slf4j
+public abstract class CloseableCachePipeline implements CachePipeline, Closeable {
+
+    /** Bounded buffer; its capacity is the batch size. */
+    private final BlockingQueue<ResultItems> cache;
+
+    /** Executor for batch processing, or {@code null} to use the calling thread. */
+    private final ExecutorService executorService;
+
+    /** Task received with the latest item; reused for the final batch on close. */
+    private volatile Task lastTask;
+
+    /**
+     * @param max             batch size: how many results are buffered before a batch is
+     *                        processed; {@link ArrayBlockingQueue} rejects values below 1
+     * @param executorService executor that runs batch processing, or {@code null} to
+     *                        process each batch on the thread that filled the buffer
+     */
+    public CloseableCachePipeline(int max, ExecutorService executorService) {
+        this.cache = new ArrayBlockingQueue<>(max, false);
+        this.executorService = executorService;
+    }
+
+    /**
+     * Creates a pipeline that processes every batch on the calling thread.
+     *
+     * @param max batch size, at least 1
+     */
+    public CloseableCachePipeline(int max) {
+        this(max, null);
+    }
+
+    /**
+     * Buffers one result and, if that filled the buffer, processes the buffered batch.
+     *
+     * @param resultItems the received result
+     * @param task        the task being executed
+     */
+    @Override
+    public final void process(ResultItems resultItems, Task task) {
+        // Record the task before the item becomes visible, so close() never drains an
+        // item without having a task to pass along.
+        lastTask = task;
+        try {
+            cache.put(resultItems);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while buffering a result; it was not cached", e);
+            return;
+        }
+        if (cache.remainingCapacity() == 0) {
+            // Draining happens under the queue's own lock, so when several threads see a
+            // full buffer only one of them receives the items and the rest get an empty
+            // batch. Nothing is processed twice and nothing is wiped by a late clear().
+            List<ResultItems> batch = drain();
+            if (!batch.isEmpty()) {
+                dispatch(batch, task);
+            }
+        }
+    }
+
+    /**
+     * Processes one batch of buffered results. The collection is a private copy that the
+     * implementation may modify freely. This may run on an executor thread.
+     *
+     * @param resultItems the batch to process; never empty
+     * @param task        the task received with the item that completed the batch, or with
+     *                    the latest item when called from {@link #close()}
+     */
+    protected abstract void process(Collection<ResultItems> resultItems, Task task);
+
+    /** Moves everything currently buffered into a new list. */
+    private List<ResultItems> drain() {
+        List<ResultItems> batch = new ArrayList<>(cache.size());
+        cache.drainTo(batch);
+        return batch;
+    }
+
+    /** Runs a batch on the executor when one is usable, otherwise on the calling thread. */
+    private void dispatch(List<ResultItems> batch, Task task) {
+        if (executorService != null && !executorService.isShutdown()) {
+            try {
+                executorService.execute(() -> process(batch, task));
+                return;
+            } catch (RejectedExecutionException e) {
+                // The executor shut down after the check above. The batch has already
+                // left the buffer, so process it here instead of losing it.
+                log.warn("Executor rejected the batch; processing it on the calling thread", e);
+            }
+        }
+        process(batch, task);
+    }
+
+    /**
+     * Processes whatever is still buffered, on the calling thread.
+     *
+     * @throws IOException declared by {@link Closeable}; this implementation does not throw it
+     */
+    @Override
+    public final void close() throws IOException {
+        List<ResultItems> remaining = drain();
+        if (!remaining.isEmpty()) {
+            process(remaining, lastTask);
+        }
+    }
+}
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java
index 8e7cb08..781553c 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java
+++ 
b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java @@ -3,12 +3,9 @@ package us.codecraft.webmagic.proxy; import lombok.extern.slf4j.Slf4j; import us.codecraft.webmagic.Task; -import java.math.BigDecimal; -import java.math.RoundingMode; import java.util.Comparator; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.LongAdder; /** * @author yaoqiang @@ -17,26 +14,18 @@ import java.util.concurrent.atomic.LongAdder; @Slf4j public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider { - private final LongAdder totalGet = new LongAdder(); - - private final LongAdder canUse = new LongAdder(); private final AtomicReference> usedProxyCache = new AtomicReference<>(); private final PriorityBlockingQueue ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime)); - private final int maxHostNum; - - public AbstractRefreshableProxyProvider(int maxHostNum) { - this.maxHostNum = maxHostNum; - } protected void doPut(ExpirableProxy expirableProxy) { - synchronized (ipQueue) { - if (ipQueue.size() <= maxHostNum) { - ipQueue.put(expirableProxy); - } - } + ipQueue.put(expirableProxy); + } + + protected int hostSize() { + return ipQueue.size(); } @Override @@ -127,7 +116,6 @@ public abstract class AbstractRefreshableProxyProvider implements RefreshablePro do { proxy = ipQueue.take(); } while (proxy.isExpire()); - log.info("切换到proxy:ip:{},port:{},ip可用率:{}", proxy.getHost(), proxy.getPort(), BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue()); return proxy; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java index f23caaf..d22b7dc 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java +++ 
b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java @@ -1,5 +1,6 @@ package us.codecraft.webmagic.proxy; +import lombok.Getter; import org.apache.http.annotation.Contract; import org.apache.http.annotation.ThreadingBehavior; @@ -13,6 +14,7 @@ import java.time.temporal.ChronoUnit; */ @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) public class ExpirableProxy extends Proxy { + @Getter private final int ttl; private final LocalDateTime expireTime;