diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java deleted file mode 100644 index e0acd90..0000000 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CachePipeline.java +++ /dev/null @@ -1,18 +0,0 @@ -package us.codecraft.webmagic.pipeline; - -import java.util.Collection; - -/** - * @author yaoqiang - * - * 为pipeline提供缓存能力 - * 在某个时机执行批处理任务 - */ -public interface CachePipeline extends Pipeline{ - - /** - * @param collection 缓存批处理 - * - */ - void process(Collection collection); -} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java b/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java deleted file mode 100644 index 4ba433e..0000000 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/pipeline/CloseableCachePipeline.java +++ /dev/null @@ -1,87 +0,0 @@ -package us.codecraft.webmagic.pipeline; - -import lombok.extern.slf4j.Slf4j; -import us.codecraft.webmagic.ResultItems; -import us.codecraft.webmagic.Task; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; - -/** - * @author yaoqiang - * 提供关闭时刷新能力 - *

- *

- * 不负责创建 {@link ExecutorService},如果需要异步执行,那么需要从外界传入,由外界自己管理 {@link ExecutorService}生命周期 - * @see ExecutorService - */ -@Slf4j -public abstract class CloseableCachePipeline implements CachePipeline, Closeable { - - - private final BlockingQueue cache; - - private final ExecutorService executorService; - - public CloseableCachePipeline(int max, ExecutorService executorService) { - this.cache = new ArrayBlockingQueue<>(max, false); - this.executorService = executorService; - } - - public CloseableCachePipeline(int max) { - this(max, null); - } - - /** - * @param resultItems 接收到的信息 - * @param task 执行的任务 - */ - @Override - public final void process(ResultItems resultItems, Task task) { - try { - cache.put(resultItems); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - log.error(e.getMessage(), e); - } - if (cache.remainingCapacity() == 0) { - // set 中的resultItem 使用权依然传递了出去,cache的使用全保留,考虑到后面也用不上 resultItem,所以发布出去问题也不大 - // temp的修改权限被发布,理由同上,想添加,删除都可以,反正以后也用不上了; - Set temp = new HashSet<>(cache); - if (executorService != null && !executorService.isShutdown()) { - executorService.execute(() -> process(temp, task)); - } else { - process(temp, task); - } - cache.clear(); - } - - } - - protected abstract void process(Collection resultItems, Task task); - - private synchronized void flush(Collection resultItems) { - process(resultItems, null); - - } - - /** - * 结合源码,实现关闭时处理剩余的缓存,直接交由主线程处理 - * - * @throws IOException 关闭可能出现异常,由上层处理 - */ - @Override - public final void close() throws IOException { - if (!cache.isEmpty()) { - flush(cache); - cache.clear(); - } - } -} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java index 781553c..8e7cb08 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java +++ 
b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/AbstractRefreshableProxyProvider.java @@ -3,9 +3,12 @@ package us.codecraft.webmagic.proxy; import lombok.extern.slf4j.Slf4j; import us.codecraft.webmagic.Task; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.Comparator; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; /** * @author yaoqiang @@ -14,18 +17,26 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider { + private final LongAdder totalGet = new LongAdder(); + + private final LongAdder canUse = new LongAdder(); private final AtomicReference> usedProxyCache = new AtomicReference<>(); private final PriorityBlockingQueue ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime)); + private final int maxHostNum; - protected void doPut(ExpirableProxy expirableProxy) { - ipQueue.put(expirableProxy); + public AbstractRefreshableProxyProvider(int maxHostNum) { + this.maxHostNum = maxHostNum; } - protected int hostSize() { - return ipQueue.size(); + protected void doPut(ExpirableProxy expirableProxy) { + synchronized (ipQueue) { + if (ipQueue.size() < maxHostNum) { /* strict bound: the queue never holds more than maxHostNum proxies */ + ipQueue.put(expirableProxy); + } + } } @Override @@ -116,6 +127,7 @@ public abstract class AbstractRefreshableProxyProvider implements RefreshablePro do { proxy = ipQueue.take(); } while (proxy.isExpire()); + /* No samples recorded yet: report 0 rather than letting BigDecimal.divide throw on a zero divisor. */ log.info("切换到proxy:ip:{},port:{},ip可用率:{}", proxy.getHost(), proxy.getPort(), totalGet.sum() == 0 ? 0D : BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue()); return proxy; } diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java index d22b7dc..f23caaf 100644 --- 
a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ExpirableProxy.java @@ -1,6 +1,5 @@ package us.codecraft.webmagic.proxy; -import lombok.Getter; import org.apache.http.annotation.Contract; import org.apache.http.annotation.ThreadingBehavior; @@ -14,7 +13,6 @@ import java.time.temporal.ChronoUnit; */ @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) public class ExpirableProxy extends Proxy { - @Getter private final int ttl; private final LocalDateTime expireTime;