commit
1d536cf705
|
@ -1,5 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>us.codecraft</groupId>
|
||||
<artifactId>webmagic-parent</artifactId>
|
||||
|
@ -24,6 +25,12 @@
|
|||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.10</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>us.codecraft</groupId>
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package us.codecraft.webmagic;
|
||||
|
||||
import com.sun.org.apache.regexp.internal.RE;
|
||||
import us.codecraft.webmagic.utils.HttpConstant;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -35,8 +36,12 @@ public class Site {
|
|||
|
||||
private static final Set<Integer> DEFAULT_STATUS_CODE_SET = new HashSet<Integer>();
|
||||
|
||||
private static final Set<Integer> DEFAULT_REFRESH_CODE_SET = new HashSet<>();
|
||||
|
||||
private Set<Integer> refreshCode = DEFAULT_REFRESH_CODE_SET;
|
||||
private Set<Integer> acceptStatCode = DEFAULT_STATUS_CODE_SET;
|
||||
|
||||
|
||||
private Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
private boolean useGzip = true;
|
||||
|
@ -44,6 +49,7 @@ public class Site {
|
|||
private boolean disableCookieManagement = false;
|
||||
|
||||
static {
|
||||
DEFAULT_REFRESH_CODE_SET.add(HttpConstant.StatusCode.FORBIDDEN);
|
||||
DEFAULT_STATUS_CODE_SET.add(HttpConstant.StatusCode.CODE_200);
|
||||
}
|
||||
|
||||
|
@ -192,6 +198,15 @@ public class Site {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Site setRefreshCode(Set<Integer> refreshCode){
|
||||
this.refreshCode = refreshCode;
|
||||
return this;
|
||||
}
|
||||
public Set<Integer> getRefreshCode(){
|
||||
return refreshCode;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* get acceptStatCode
|
||||
*
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -102,7 +103,7 @@ public class Spider implements Runnable, Task {
|
|||
|
||||
private List<SpiderListener> spiderListeners;
|
||||
|
||||
private final AtomicLong pageCount = new AtomicLong(0);
|
||||
private final LongAdder pageCount = new LongAdder();
|
||||
|
||||
private Date startTime;
|
||||
|
||||
|
@ -323,7 +324,7 @@ public class Spider implements Runnable, Task {
|
|||
onError(request);
|
||||
logger.error("process request " + request + " error", e);
|
||||
} finally {
|
||||
pageCount.incrementAndGet();
|
||||
pageCount.increment();
|
||||
signalNewUrl();
|
||||
}
|
||||
}
|
||||
|
@ -335,7 +336,7 @@ public class Spider implements Runnable, Task {
|
|||
if (destroyWhenExit) {
|
||||
close();
|
||||
}
|
||||
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
|
||||
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.sumThenReset());
|
||||
}
|
||||
|
||||
protected void onError(Request request) {
|
||||
|
@ -423,7 +424,11 @@ public class Spider implements Runnable, Task {
|
|||
pipeline.process(page.getResultItems(), this);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if(site.getRefreshCode().contains(page.getStatusCode())) {
|
||||
logger.info("page status code error, page {} , code: {}, start refresh downloader", request.getUrl(), page.getStatusCode());
|
||||
downloader.refreshComponent(this);
|
||||
failHandler(request);
|
||||
}else {
|
||||
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
|
||||
}
|
||||
sleep(site.getSleepTime());
|
||||
|
@ -431,6 +436,10 @@ public class Spider implements Runnable, Task {
|
|||
}
|
||||
|
||||
private void onDownloaderFail(Request request) {
|
||||
failHandler(request);
|
||||
}
|
||||
|
||||
private void failHandler(Request request){
|
||||
if (site.getCycleRetryTimes() == 0) {
|
||||
sleep(site.getSleepTime());
|
||||
} else {
|
||||
|
@ -650,7 +659,7 @@ public class Spider implements Runnable, Task {
|
|||
* @since 0.4.1
|
||||
*/
|
||||
public long getPageCount() {
|
||||
return pageCount.get();
|
||||
return pageCount.sum();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -3,6 +3,7 @@ package us.codecraft.webmagic.downloader;
|
|||
import us.codecraft.webmagic.Page;
|
||||
import us.codecraft.webmagic.Request;
|
||||
import us.codecraft.webmagic.Site;
|
||||
import us.codecraft.webmagic.proxy.ProxyProvider;
|
||||
import us.codecraft.webmagic.selector.Html;
|
||||
|
||||
/**
|
||||
|
@ -38,7 +39,7 @@ public abstract class AbstractDownloader implements Downloader {
|
|||
protected void onSuccess(Request request) {
|
||||
}
|
||||
|
||||
protected void onError(Request request) {
|
||||
protected void onError(Request request, Throwable throwable, ProxyProvider proxyProvider) {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,11 +21,15 @@ public interface Downloader {
|
|||
* @param task task
|
||||
* @return page
|
||||
*/
|
||||
public Page download(Request request, Task task);
|
||||
Page download(Request request, Task task);
|
||||
|
||||
/**
|
||||
* Tell the downloader how many threads the spider used.
|
||||
*
|
||||
* @param threadNum number of threads
|
||||
*/
|
||||
public void setThread(int threadNum);
|
||||
void setThread(int threadNum);
|
||||
|
||||
|
||||
void refreshComponent(Task task);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@ import us.codecraft.webmagic.Site;
|
|||
import us.codecraft.webmagic.Task;
|
||||
import us.codecraft.webmagic.proxy.Proxy;
|
||||
import us.codecraft.webmagic.proxy.ProxyProvider;
|
||||
import us.codecraft.webmagic.proxy.RefreshableProxyProvider;
|
||||
import us.codecraft.webmagic.proxy.ReturnableProxyProvider;
|
||||
import us.codecraft.webmagic.selector.PlainText;
|
||||
import us.codecraft.webmagic.utils.CharsetUtils;
|
||||
import us.codecraft.webmagic.utils.HttpClientUtils;
|
||||
|
@ -21,6 +23,8 @@ import java.io.IOException;
|
|||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -31,17 +35,29 @@ import java.util.Map;
|
|||
*/
|
||||
public class HttpClientDownloader extends AbstractDownloader {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();
|
||||
|
||||
private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();
|
||||
private final Map<String, CloseableHttpClient> httpClients = new ConcurrentHashMap<>();
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private final HttpClientGenerator httpClientGenerator = new HttpClientGenerator();
|
||||
|
||||
private HttpUriRequestConverter httpUriRequestConverter = new HttpUriRequestConverter();
|
||||
|
||||
private ProxyProvider proxyProvider;
|
||||
|
||||
private boolean responseHeader = true;
|
||||
private final boolean responseHeader = true;
|
||||
|
||||
|
||||
private Predicate<Throwable> refreshProxyOnError = t -> false;
|
||||
|
||||
|
||||
private Predicate<Throwable> refreshClientOnError = t -> false;
|
||||
|
||||
|
||||
public void setRefreshClientOnError(Predicate<Throwable> clientOnError){
|
||||
this.refreshClientOnError = clientOnError;
|
||||
}
|
||||
public void setRefreshProxyOnError(Predicate<Throwable> proxyOnError) {
|
||||
this.refreshProxyOnError = proxyOnError;
|
||||
}
|
||||
|
||||
public void setHttpUriRequestConverter(HttpUriRequestConverter httpUriRequestConverter) {
|
||||
this.httpUriRequestConverter = httpUriRequestConverter;
|
||||
|
@ -56,17 +72,8 @@ public class HttpClientDownloader extends AbstractDownloader {
|
|||
return httpClientGenerator.getClient(null);
|
||||
}
|
||||
String domain = site.getDomain();
|
||||
CloseableHttpClient httpClient = httpClients.get(domain);
|
||||
if (httpClient == null) {
|
||||
synchronized (this) {
|
||||
httpClient = httpClients.get(domain);
|
||||
if (httpClient == null) {
|
||||
httpClient = httpClientGenerator.getClient(site);
|
||||
httpClients.put(domain, httpClient);
|
||||
}
|
||||
}
|
||||
}
|
||||
return httpClient;
|
||||
return httpClients.computeIfAbsent(domain,k->httpClientGenerator.getClient(site));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -87,19 +94,37 @@ public class HttpClientDownloader extends AbstractDownloader {
|
|||
return page;
|
||||
} catch (IOException e) {
|
||||
logger.warn("download page {} error", request.getUrl(), e);
|
||||
onError(request);
|
||||
onError(request, e, proxyProvider);
|
||||
if (proxyProvider != null && proxy != null && proxyProvider instanceof RefreshableProxyProvider && refreshProxyOnError.test(e)) {
|
||||
((RefreshableProxyProvider)proxyProvider).refreshProxy(task,proxy);
|
||||
}
|
||||
if(refreshClientOnError.test(e)) {
|
||||
httpClients.remove(task.getSite().getDomain());
|
||||
}
|
||||
return page;
|
||||
} finally {
|
||||
if (httpResponse != null) {
|
||||
//ensure the connection is released back to pool
|
||||
EntityUtils.consumeQuietly(httpResponse.getEntity());
|
||||
}
|
||||
if (proxyProvider != null && proxy != null) {
|
||||
proxyProvider.returnProxy(proxy, page, task);
|
||||
if (proxyProvider != null && proxy != null && proxyProvider instanceof ReturnableProxyProvider) {
|
||||
((ReturnableProxyProvider) proxyProvider).returnProxy(proxy, page, task);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
if (proxyProvider != null && proxyProvider instanceof RefreshableProxyProvider) {
|
||||
((RefreshableProxyProvider) proxyProvider).refreshProxy(task, ((RefreshableProxyProvider) proxyProvider).getCurrentProxy(task));
|
||||
}
|
||||
|
||||
httpClients.remove(task.getSite().getDomain());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setThread(int thread) {
|
||||
httpClientGenerator.setPoolSize(thread);
|
||||
|
@ -110,7 +135,7 @@ public class HttpClientDownloader extends AbstractDownloader {
|
|||
String contentType = httpResponse.getEntity().getContentType() == null ? "" : httpResponse.getEntity().getContentType().getValue();
|
||||
Page page = new Page();
|
||||
page.setBytes(bytes);
|
||||
if (!request.isBinaryContent()){
|
||||
if (!request.isBinaryContent()) {
|
||||
if (charset == null) {
|
||||
charset = getHtmlCharset(contentType, bytes);
|
||||
}
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
package us.codecraft.webmagic.downloader;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLContextSpi;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
|
||||
|
@ -24,6 +28,7 @@ import org.apache.http.conn.socket.ConnectionSocketFactory;
|
|||
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
|
||||
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
|
||||
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
|
||||
import org.apache.http.impl.client.BasicCookieStore;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
|
||||
|
@ -32,6 +37,7 @@ import org.apache.http.impl.client.HttpClients;
|
|||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||
import org.apache.http.impl.cookie.BasicClientCookie;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.ssl.SSLContexts;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -69,7 +75,7 @@ public class HttpClientGenerator {
|
|||
return new SSLConnectionSocketFactory(sslContext, supportedProtocols,
|
||||
null,
|
||||
new DefaultHostnameVerifier()); // 优先绕过安全证书
|
||||
} catch (KeyManagementException e) {
|
||||
} catch (KeyManagementException | CertificateException | KeyStoreException | IOException e) {
|
||||
logger.error("ssl connection fail", e);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
logger.error("ssl connection fail", e);
|
||||
|
@ -77,8 +83,8 @@ public class HttpClientGenerator {
|
|||
return SSLConnectionSocketFactory.getSocketFactory();
|
||||
}
|
||||
|
||||
private SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException {
|
||||
// 实现一个X509TrustManager接口,用于绕过验证,不用修改里面的方法
|
||||
private SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException, CertificateException, KeyStoreException, IOException {
|
||||
// 实现一个X509TrustManager接口,用于绕过验证,不用修改里面的方法
|
||||
X509TrustManager trustManager = new X509TrustManager() {
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +102,7 @@ public class HttpClientGenerator {
|
|||
|
||||
};
|
||||
|
||||
SSLContext sc = SSLContext.getInstance("TLS");
|
||||
SSLContext sc = SSLContext.getInstance("SSLv3");
|
||||
sc.init(null, new TrustManager[] { trustManager }, null);
|
||||
return sc;
|
||||
}
|
||||
|
@ -137,6 +143,7 @@ public class HttpClientGenerator {
|
|||
SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
|
||||
socketConfigBuilder.setSoKeepAlive(true).setTcpNoDelay(true);
|
||||
socketConfigBuilder.setSoTimeout(site.getTimeOut());
|
||||
|
||||
SocketConfig socketConfig = socketConfigBuilder.build();
|
||||
httpClientBuilder.setDefaultSocketConfig(socketConfig);
|
||||
connectionManager.setDefaultSocketConfig(socketConfig);
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
package us.codecraft.webmagic.pipeline;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
*
|
||||
* 为pipeline提供缓存能力
|
||||
* 在某个时机执行批处理任务
|
||||
*/
|
||||
public interface CachePipeline<T> extends Pipeline{
|
||||
|
||||
/**
|
||||
* @param collection 缓存批处理
|
||||
*
|
||||
*/
|
||||
void process(Collection<T> collection);
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package us.codecraft.webmagic.pipeline;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import us.codecraft.webmagic.ResultItems;
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
* 提供关闭时刷新能力
|
||||
* <p>
|
||||
* <p>
|
||||
* 不负责创建 {@link ExecutorService},如果需要异步执行,那么需要从外界传入,由外界自己管理 {@link ExecutorService}生命周期
|
||||
* @see ExecutorService
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class CloseableCachePipeline implements CachePipeline<ResultItems>, Closeable {
|
||||
|
||||
|
||||
private final BlockingQueue<ResultItems> cache;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public CloseableCachePipeline(int max, ExecutorService executorService) {
|
||||
this.cache = new ArrayBlockingQueue<>(max, false);
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
public CloseableCachePipeline(int max) {
|
||||
this(max, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param resultItems 接收到的信息
|
||||
* @param task 执行的任务
|
||||
*/
|
||||
@Override
|
||||
public final void process(ResultItems resultItems, Task task) {
|
||||
try {
|
||||
cache.put(resultItems);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
if (cache.remainingCapacity() == 0) {
|
||||
// set 中的resultItem 使用权依然传递了出去,cache的使用全保留,考虑到后面也用不上 resultItem,所以发布出去问题也不大
|
||||
// temp的修改权限被发布,理由同上,想添加,删除都可以,反正以后也用不上了;
|
||||
Set<ResultItems> temp = new HashSet<>(cache);
|
||||
if (executorService != null && !executorService.isShutdown()) {
|
||||
executorService.execute(() -> process(temp, task));
|
||||
} else {
|
||||
process(temp, task);
|
||||
}
|
||||
cache.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected abstract void process(Collection<ResultItems> resultItems, Task task);
|
||||
|
||||
private synchronized void flush(Collection<ResultItems> resultItems) {
|
||||
process(resultItems, null);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 结合源码,实现关闭时处理剩余的缓存,直接交由主线程处理
|
||||
*
|
||||
* @throws IOException 关闭可能出现异常,由上层处理
|
||||
*/
|
||||
@Override
|
||||
public final void close() throws IOException {
|
||||
if (!cache.isEmpty()) {
|
||||
flush(cache);
|
||||
cache.clear();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
* 可刷新的代理提供商抽象实现
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractRefreshableProxyProvider implements RefreshableProxyProvider {
|
||||
|
||||
|
||||
private final AtomicReference<FutureTask<Proxy>> usedProxyCache = new AtomicReference<>();
|
||||
|
||||
private final PriorityBlockingQueue<ExpirableProxy> ipQueue = new PriorityBlockingQueue<>(1000, Comparator.comparing(ExpirableProxy::getExpireTime));
|
||||
|
||||
|
||||
protected void doPut(ExpirableProxy expirableProxy) {
|
||||
ipQueue.put(expirableProxy);
|
||||
}
|
||||
|
||||
protected int hostSize() {
|
||||
return ipQueue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshProxy(Task task, Proxy proxy) {
|
||||
if (proxy != null) {
|
||||
FutureTask<Proxy> proxyFutureTask = usedProxyCache.get();
|
||||
Proxy currentProxy = getCurrentProxy(task);
|
||||
// 如果在出错到这里的过程中,usedProxyCache被更新过,proxy 就不可能相等,如果依然相等,说明没有更新过
|
||||
// 可能没有使用代理的情况
|
||||
if (proxy.equals(currentProxy)) {
|
||||
// 如果此时依然没有更新过,就设置为空
|
||||
usedProxyCache.compareAndSet(proxyFutureTask, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Proxy getCurrentProxy(Task task) {
|
||||
FutureTask<Proxy> cache = usedProxyCache.get();
|
||||
Proxy currentProxy = null;
|
||||
try {
|
||||
if (cache != null)
|
||||
currentProxy = cache.get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getCause().getMessage(), e);
|
||||
} catch (TimeoutException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
return currentProxy;
|
||||
}
|
||||
|
||||
|
||||
private FutureTask<Proxy> buildCacheTask() {
|
||||
return new FutureTask<>(this::doGet);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 特别注意,防止活锁,集cache中总是抛出异常,那么将无限循环,无限报错
|
||||
*
|
||||
* @param task 下载任务
|
||||
* @return 返回代理
|
||||
*/
|
||||
@Override
|
||||
public Proxy getProxy(Task task) {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
FutureTask<Proxy> cache = usedProxyCache.get();
|
||||
if (cache == null) {
|
||||
FutureTask<Proxy> futureTask = buildCacheTask();
|
||||
if (usedProxyCache.compareAndSet(null, futureTask)) {
|
||||
cache = futureTask;
|
||||
futureTask.run();
|
||||
} else {
|
||||
// 交换失败,需要更新到最新数据
|
||||
cache = usedProxyCache.get();
|
||||
}
|
||||
}
|
||||
try {
|
||||
if (cache != null) {
|
||||
|
||||
ExpirableProxy proxy = (ExpirableProxy) cache.get(5, TimeUnit.SECONDS);
|
||||
if (!proxy.isExpire())
|
||||
return proxy;
|
||||
}
|
||||
usedProxyCache.compareAndSet(cache, null);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error(e.getMessage(), e);
|
||||
usedProxyCache.compareAndSet(cache, null);
|
||||
} catch (ExecutionException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
usedProxyCache.compareAndSet(cache, null);
|
||||
} catch (TimeoutException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Proxy doGet() throws InterruptedException {
|
||||
ExpirableProxy proxy;
|
||||
do {
|
||||
proxy = ipQueue.take();
|
||||
} while (proxy.isExpire());
|
||||
return proxy;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.http.annotation.Contract;
|
||||
import org.apache.http.annotation.ThreadingBehavior;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
*
|
||||
* 可以过期的代理
|
||||
*/
|
||||
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
|
||||
public class ExpirableProxy extends Proxy {
|
||||
@Getter
|
||||
private final int ttl;
|
||||
private final LocalDateTime expireTime;
|
||||
|
||||
|
||||
public ExpirableProxy(String host, int port, int ttl, ChronoUnit chronoUnit) {
|
||||
super(host, port);
|
||||
this.ttl = ttl;
|
||||
this.expireTime = LocalDateTime.now().plus(ttl, chronoUnit);
|
||||
|
||||
}
|
||||
|
||||
public boolean isExpire() {
|
||||
return LocalDateTime.now().isAfter(expireTime);
|
||||
}
|
||||
public LocalDateTime getExpireTime(){
|
||||
return expireTime;
|
||||
}
|
||||
|
||||
}
|
|
@ -6,33 +6,30 @@ import java.net.URISyntaxException;
|
|||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import jdk.nashorn.internal.ir.annotations.Immutable;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.annotation.Contract;
|
||||
import org.apache.http.annotation.ThreadingBehavior;
|
||||
|
||||
@Contract(threading = ThreadingBehavior.IMMUTABLE)
|
||||
public class Proxy {
|
||||
|
||||
private String scheme;
|
||||
private final String scheme;
|
||||
|
||||
private String host;
|
||||
private final String host;
|
||||
|
||||
private int port;
|
||||
private final int port;
|
||||
|
||||
private String username;
|
||||
private final String username;
|
||||
|
||||
private String password;
|
||||
private final String password;
|
||||
|
||||
public static Proxy create(final URI uri) {
|
||||
Proxy proxy = new Proxy(uri.getHost(), uri.getPort(), uri.getScheme());
|
||||
String userInfo = uri.getUserInfo();
|
||||
if (userInfo != null) {
|
||||
String[] up = userInfo.split(":");
|
||||
if (up.length == 1) {
|
||||
proxy.username = up[0].isEmpty() ? null : up[0];
|
||||
} else {
|
||||
proxy.username = up[0].isEmpty() ? null : up[0];
|
||||
proxy.password = up[1].isEmpty() ? null : up[1];
|
||||
}
|
||||
}
|
||||
return proxy;
|
||||
public Proxy(String host, int port, String scheme, String username, String password) {
|
||||
this.scheme = scheme;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public Proxy(String host, int port) {
|
||||
|
@ -40,24 +37,27 @@ public class Proxy {
|
|||
}
|
||||
|
||||
public Proxy(String host, int port, String scheme) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.scheme = scheme;
|
||||
this(host, port, scheme, null, null);
|
||||
}
|
||||
|
||||
public Proxy(String host, int port, String username, String password) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
this(host, port, null, username, password);
|
||||
}
|
||||
|
||||
public String getScheme() {
|
||||
return scheme;
|
||||
public static Proxy create(final URI uri) {
|
||||
String userInfo = uri.getUserInfo();
|
||||
String username = null;
|
||||
String password = null;
|
||||
if (userInfo != null) {
|
||||
String[] up = userInfo.split(":");
|
||||
if (up.length == 1) {
|
||||
username = up[0].isEmpty() ? null : up[0];
|
||||
} else {
|
||||
username = up[0].isEmpty() ? null : up[0];
|
||||
password = up[1].isEmpty() ? null : up[1];
|
||||
}
|
||||
|
||||
public void setScheme(String scheme) {
|
||||
this.scheme = scheme;
|
||||
}
|
||||
return new Proxy(uri.getHost(), uri.getPort(), uri.getScheme(), username, password);
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
|
@ -68,6 +68,8 @@ public class Proxy {
|
|||
return port;
|
||||
}
|
||||
|
||||
public String getScheme(){return scheme;}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import us.codecraft.webmagic.Page;
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
/**
|
||||
|
@ -10,14 +9,6 @@ import us.codecraft.webmagic.Task;
|
|||
*/
|
||||
public interface ProxyProvider {
|
||||
|
||||
/**
|
||||
*
|
||||
* Return proxy to Provider when complete a download.
|
||||
* @param proxy the proxy config contains host,port and identify info
|
||||
* @param page the download result
|
||||
* @param task the download task
|
||||
*/
|
||||
void returnProxy(Proxy proxy, Page page, Task task);
|
||||
|
||||
/**
|
||||
* Get a proxy for task by some strategy.
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
*
|
||||
* 可以手动刷新的代理供应商
|
||||
*/
|
||||
public interface RefreshableProxyProvider extends ProxyProvider{
|
||||
|
||||
/**
|
||||
* 代理IP是珍贵资源,有可能代理提供者内部代理没有过期,就一直提供某个IP,但这个IP又不可以使用,所以提供一种方式通知提供者,这个代理该刷新了
|
||||
*
|
||||
* @param task 爬虫任务
|
||||
* @param proxy 需要对代理进行验证,如果确实持有的时错误代理,则刷新,否则,继续执行
|
||||
*/
|
||||
void refreshProxy(Task task,Proxy proxy);
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* 获取当前正在提供的代理
|
||||
*
|
||||
* @param task 工作中的爬虫任务
|
||||
* @return 获取当前正在使用的代理
|
||||
*/
|
||||
Proxy getCurrentProxy(Task task);
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import us.codecraft.webmagic.Page;
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
/**
|
||||
* @author yaoqiang
|
||||
*
|
||||
* 可归还的代理提供商,代理被取出后,实用完成,可以归还给代理提供商
|
||||
*/
|
||||
public interface ReturnableProxyProvider {
|
||||
|
||||
/**
|
||||
*
|
||||
* Return proxy to Provider when complete a download.
|
||||
* @param proxy the proxy config contains host,port and identify info
|
||||
* @param page the download result
|
||||
* @param task the download task
|
||||
*/
|
||||
void returnProxy(Proxy proxy, Page page, Task task);
|
||||
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
package us.codecraft.webmagic.proxy;
|
||||
|
||||
import us.codecraft.webmagic.Page;
|
||||
import us.codecraft.webmagic.Task;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -30,6 +29,7 @@ public class SimpleProxyProvider implements ProxyProvider {
|
|||
this.pointer = pointer;
|
||||
}
|
||||
|
||||
|
||||
public static SimpleProxyProvider from(Proxy... proxies) {
|
||||
List<Proxy> proxiesTemp = new ArrayList<Proxy>(proxies.length);
|
||||
for (Proxy proxy : proxies) {
|
||||
|
@ -38,11 +38,6 @@ public class SimpleProxyProvider implements ProxyProvider {
|
|||
return new SimpleProxyProvider(Collections.unmodifiableList(proxiesTemp));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnProxy(Proxy proxy, Page page, Task task) {
|
||||
//Donothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public Proxy getProxy(Task task) {
|
||||
return proxies.get(incrForLoop());
|
||||
|
|
|
@ -28,6 +28,7 @@ public abstract class HttpConstant {
|
|||
public static abstract class StatusCode {
|
||||
|
||||
public static final int CODE_200 = 200;
|
||||
public static final int FORBIDDEN = 403;
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -57,6 +57,11 @@ public class SpiderTest {
|
|||
return Site.me().setSleepTime(0);
|
||||
}
|
||||
}).setDownloader(new Downloader() {
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page download(Request request, Task task) {
|
||||
return new Page().setRawText("");
|
||||
|
|
|
@ -28,6 +28,11 @@ public class MockGithubDownloader implements Downloader {
|
|||
return page;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setThread(int threadNum) {
|
||||
}
|
||||
|
|
|
@ -43,6 +43,11 @@ public class PhantomJSDownloader extends AbstractDownloader {
|
|||
PhantomJSDownloader.phantomJsCommand = phantomJsCommand;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 新增构造函数,支持crawl.js路径自定义,因为当其他项目依赖此jar包时,runtime.exec()执行phantomjs命令时无使用法jar包中的crawl.js
|
||||
* <pre>
|
||||
|
|
|
@ -9,6 +9,10 @@ import us.codecraft.webmagic.selector.PlainText;
|
|||
* @author code4crafter@gmail.com
|
||||
*/
|
||||
public class MockGithubDownloader implements Downloader{
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
|
||||
}
|
||||
|
||||
private String html = "\n" +
|
||||
"\n" +
|
||||
|
|
|
@ -59,6 +59,11 @@ public class SeleniumDownloader implements Downloader, Closeable {
|
|||
// "/Users/Bingo/Downloads/phantomjs-1.9.7-macosx/bin/phantomjs");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshComponent(Task task) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* set sleep time to wait until load success
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue