bug修改,对结果提供缓存能力
parent
33906e36f4
commit
f68795d7dd
|
@ -0,0 +1,18 @@
|
||||||
|
package us.codecraft.webmagic.pipeline;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaoqiang
|
||||||
|
*
|
||||||
|
* 为pipeline提供缓存能力
|
||||||
|
* 在某个时机执行批处理任务
|
||||||
|
*/
|
||||||
|
public interface CachePipeline<T> extends Pipeline{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param collection 缓存批处理
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
void process(Collection<T> collection);
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
package us.codecraft.webmagic.pipeline;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import us.codecraft.webmagic.ResultItems;
|
||||||
|
import us.codecraft.webmagic.Task;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaoqiang
|
||||||
|
* 提供关闭时刷新能力
|
||||||
|
* <p>
|
||||||
|
* <p>
|
||||||
|
* 不负责创建 {@link ExecutorService},如果需要异步执行,那么需要从外界传入,由外界自己管理 {@link ExecutorService}生命周期
|
||||||
|
* @see ExecutorService
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public abstract class CloseableCachePipeline implements CachePipeline<ResultItems>, Closeable {
|
||||||
|
|
||||||
|
|
||||||
|
private final BlockingQueue<ResultItems> cache;
|
||||||
|
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
public CloseableCachePipeline(int max, ExecutorService executorService) {
|
||||||
|
this.cache = new ArrayBlockingQueue<>(max, false);
|
||||||
|
this.executorService = executorService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CloseableCachePipeline(int max) {
|
||||||
|
this(max, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param resultItems 接收到的信息
|
||||||
|
* @param task 执行的任务
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public final void process(ResultItems resultItems, Task task) {
|
||||||
|
try {
|
||||||
|
cache.put(resultItems);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
log.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
if (cache.remainingCapacity() == 0) {
|
||||||
|
// set 中的resultItem 使用权依然传递了出去,cache的使用全保留,考虑到后面也用不上 resultItem,所以发布出去问题也不大
|
||||||
|
// temp的修改权限被发布,理由同上,想添加,删除都可以,反正以后也用不上了;
|
||||||
|
Set<ResultItems> temp = new HashSet<>(cache);
|
||||||
|
if (executorService != null && !executorService.isShutdown()) {
|
||||||
|
executorService.execute(() -> process(temp, task));
|
||||||
|
} else {
|
||||||
|
process(temp, task);
|
||||||
|
}
|
||||||
|
cache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void process(Collection<ResultItems> resultItems, Task task);
|
||||||
|
|
||||||
|
private synchronized void flush(Collection<ResultItems> resultItems) {
|
||||||
|
process(resultItems, null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 结合源码,实现关闭时处理剩余的缓存,直接交由主线程处理
|
||||||
|
*
|
||||||
|
* @throws IOException 关闭可能出现异常,由上层处理
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public final void close() throws IOException {
|
||||||
|
if (!cache.isEmpty()) {
|
||||||
|
flush(cache);
|
||||||
|
cache.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,12 +3,9 @@ package us.codecraft.webmagic.proxy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import us.codecraft.webmagic.Task;
|
import us.codecraft.webmagic.Task;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.math.RoundingMode;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author yaoqiang
|
* @author yaoqiang
|
||||||
|
@ -17,26 +14,18 @@ import java.util.concurrent.atomic.LongAdder;
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider {
|
public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider {
|
||||||
|
|
||||||
private final LongAdder totalGet = new LongAdder();
|
|
||||||
|
|
||||||
private final LongAdder canUse = new LongAdder();
|
|
||||||
|
|
||||||
private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>();
|
private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>();
|
||||||
|
|
||||||
private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime));
|
private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime));
|
||||||
|
|
||||||
private final int maxHostNum;
|
|
||||||
|
|
||||||
public AbstractRefreshableProxyProvider(int maxHostNum) {
|
|
||||||
this.maxHostNum = maxHostNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void doPut(ExpirableProxy expirableProxy) {
|
protected void doPut(ExpirableProxy expirableProxy) {
|
||||||
synchronized (ipQueue) {
|
ipQueue.put(expirableProxy);
|
||||||
if (ipQueue.size() <= maxHostNum) {
|
}
|
||||||
ipQueue.put(expirableProxy);
|
|
||||||
}
|
protected int hostSize() {
|
||||||
}
|
return ipQueue.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -127,7 +116,6 @@ public abstract class AbstractRefreshableProxyProvider implements RefreshablePro
|
||||||
do {
|
do {
|
||||||
proxy = ipQueue.take();
|
proxy = ipQueue.take();
|
||||||
} while (proxy.isExpire());
|
} while (proxy.isExpire());
|
||||||
log.info("切换到proxy:ip:{},port:{},ip可用率:{}", proxy.getHost(), proxy.getPort(), BigDecimal.valueOf(canUse.sum()).divide(BigDecimal.valueOf(totalGet.sum()), 2, RoundingMode.HALF_DOWN).doubleValue());
|
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package us.codecraft.webmagic.proxy;
|
package us.codecraft.webmagic.proxy;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import org.apache.http.annotation.Contract;
|
import org.apache.http.annotation.Contract;
|
||||||
import org.apache.http.annotation.ThreadingBehavior;
|
import org.apache.http.annotation.ThreadingBehavior;
|
||||||
|
|
||||||
|
@ -13,6 +14,7 @@ import java.time.temporal.ChronoUnit;
|
||||||
*/
|
*/
|
||||||
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
|
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
|
||||||
public class ExpirableProxy extends Proxy {
|
public class ExpirableProxy extends Proxy {
|
||||||
|
@Getter
|
||||||
private final int ttl;
|
private final int ttl;
|
||||||
private final LocalDateTime expireTime;
|
private final LocalDateTime expireTime;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue