From c146e2c7b45bfdfffe020c9024b805150cccb62b Mon Sep 17 00:00:00 2001
From: zwf
Date: Mon, 19 May 2014 15:33:11 +0800
Subject: [PATCH] add proxy pool

---
Review note: the index hashes below are carried over from the original
submission and do not describe the revised file contents.

 .../java/us/codecraft/webmagic/Request.java   |   2 +
 .../main/java/us/codecraft/webmagic/Site.java |  33 ++
 .../java/us/codecraft/webmagic/Spider.java    |   9 +
 .../downloader/HttpClientDownloader.java      |  16 +-
 .../us/codecraft/webmagic/proxy/Proxy.java    | 188 +++++++++
 .../codecraft/webmagic/proxy/ProxyPool.java   | 366 ++++++++++++++++++
 .../codecraft/webmagic/proxy/ProxyUtil.java   | 118 ++++++
 7 files changed, 728 insertions(+), 4 deletions(-)
 create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java
 create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPool.java
 create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyUtil.java

diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Request.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Request.java
index 1f8a194..9a0321e 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/Request.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Request.java
@@ -18,6 +18,8 @@ public class Request implements Serializable {
     private static final long serialVersionUID = 2062192774891352043L;
 
     public static final String CYCLE_TRIED_TIMES = "_cycle_tried_times";
+    public static final String STATUS_CODE = "statusCode";
+    public static final String PROXY = "proxy";
 
     private String url;
 
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Site.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Site.java
index a7c7bf8..01a4c75 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/Site.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Site.java
@@ -3,6 +3,8 @@ package us.codecraft.webmagic;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import org.apache.http.HttpHost;
+
+import us.codecraft.webmagic.proxy.ProxyPool;
 import us.codecraft.webmagic.utils.UrlUtils;
 
 import java.util.*;
@@ -47,6 +49,8 @@ public class Site {
 
     private HttpHost httpProxy;
 
+    private ProxyPool httpProxyPool=new ProxyPool();
+
     private boolean useGzip = true;
 
     /**
@@ -438,4 +442,33 @@ public class Site {
                 ", headers=" + headers +
                 '}';
     }
+
+    /**
+     * Set httpProxyPool, String[0]:ip, String[1]:port
+     *
+     * @param httpProxyList proxies to pool, each element being {ip, port}
+     * @return this
+     */
+    public Site setHttpProxyPool(List<String[]> httpProxyList) {
+        this.httpProxyPool=new ProxyPool(httpProxyList);
+        return this;
+    }
+
+    public ProxyPool getHttpProxyPool() {
+        return httpProxyPool;
+    }
+
+    public HttpHost getHttpProxyFromPool() {
+        return httpProxyPool.getProxy();
+    }
+
+    public void returnHttpProxyToPool(HttpHost proxy,int statusCode) {
+        httpProxyPool.returnProxy(proxy,statusCode);
+    }
+
+    public Site setProxyReuseInterval(int reuseInterval) {
+        this.httpProxyPool.setReuseInterval(reuseInterval);
+        return this;
+    }
+
 }
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
index 81cf179..b6ec472 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java
@@ -2,6 +2,7 @@ package us.codecraft.webmagic;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.http.HttpHost;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import us.codecraft.webmagic.downloader.Downloader;
@@ -324,6 +325,14 @@ public class Spider implements Runnable, Task {
                             onError(requestFinal);
                             logger.error("process request " + requestFinal + " error", e);
                         } finally {
+                            if (site.getHttpProxyPool().isEnable()) {
+                                HttpHost proxy = (HttpHost) requestFinal.getExtra(Request.PROXY);
+                                Integer statusCode = (Integer) requestFinal.getExtra(Request.STATUS_CODE);
+                                // extras are missing when the request failed before it was downloaded
+                                if (proxy != null) {
+                                    site.returnHttpProxyToPool(proxy, statusCode == null ? 0 : statusCode);
+                                }
+                            }
                             pageCount.incrementAndGet();
                             signalNewUrl();
                         }
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java
index 5d2af73..6941087 100644
--- a/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/downloader/HttpClientDownloader.java
@@ -3,6 +3,7 @@ package us.codecraft.webmagic.downloader;
 import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.annotation.ThreadSafe;
@@ -84,10 +85,12 @@ public class HttpClientDownloader extends AbstractDownloader {
         }
         logger.info("downloading page {}", request.getUrl());
         CloseableHttpResponse httpResponse = null;
+        int statusCode=0;
         try {
             HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers);
             httpResponse = getHttpClient(site).execute(httpUriRequest);
-            int statusCode = httpResponse.getStatusLine().getStatusCode();
+            statusCode = httpResponse.getStatusLine().getStatusCode();
+            request.putExtra(Request.STATUS_CODE, statusCode);
             if (statusAccept(acceptStatCode, statusCode)) {
                 //charset
                 if (charset == null) {
@@ -109,6 +112,7 @@ public class HttpClientDownloader extends AbstractDownloader {
             onError(request);
             return null;
         } finally {
+            request.putExtra(Request.STATUS_CODE, statusCode);
             try {
                 if (httpResponse != null) {
                     //ensure the connection is released back to pool
@@ -173,9 +177,13 @@ public class HttpClientDownloader extends AbstractDownloader {
                 .setSocketTimeout(site.getTimeOut())
                 .setConnectTimeout(site.getTimeOut())
                 .setCookieSpec(CookieSpecs.BEST_MATCH);
-        if (site != null && site.getHttpProxy() != null) {
-            requestConfigBuilder.setProxy(site.getHttpProxy());
-        }
+        if (site.getHttpProxyPool().isEnable()) {
+            HttpHost host = site.getHttpProxyFromPool();
+            requestConfigBuilder.setProxy(host);
+            request.putExtra(Request.PROXY, host);
+        } else if (site.getHttpProxy() != null) {
+            requestConfigBuilder.setProxy(site.getHttpProxy());
+        }
         requestBuilder.setConfig(requestConfigBuilder.build());
         return requestBuilder.build();
     }
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java
new file mode 100644
index 0000000..27e6b52
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/Proxy.java
@@ -0,0 +1,188 @@
+package us.codecraft.webmagic.proxy;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpHost;
+
+/**
+ * A pooled HTTP proxy together with its usage statistics.
+ * <p>
+ * Proxies are ordered by the moment they may be handed out again, so a
+ * {@link java.util.concurrent.DelayQueue} releases them only after their
+ * reuse interval has elapsed.
+ *
+ * <pre>
+ * Proxy status
+ *  +----------+     +-----+
+ *  | last use |     | new |
+ *  +-----+----+     +---+-+
+ *        |  +------+    |
+ *        +->| init |<---+
+ *           +--+---+
+ *              |
+ *              v
+ *          +--------+
+ *     +--->| borrow |
+ *     |    +---+----+
+ *     |        |+------------------+
+ *     |        v
+ *     |    +--------+
+ *     |    | in use |  response time
+ *     |    +---+----+
+ *     |        |+------------------+
+ *     |        v
+ *     |    +--------+
+ *     |    | return |
+ *     |    +---+----+
+ *     |        |+-------------------+
+ *     |        v
+ *     |    +-------+   reuse interval
+ *     |    | delay |   (delay time)
+ *     |    +---+---+
+ *     |        |+-------------------+
+ *     |        v
+ *     |    +------+
+ *     |    | idle |    idle time
+ *     |    +---+--+
+ *     |        |+-------------------+
+ *     +--------+
+ * </pre>
+ */
+public class Proxy implements Delayed, Serializable {
+
+    private static final long serialVersionUID = 228939737383625551L;
+    public static final int ERROR_403 = 403;
+    public static final int ERROR_404 = 404;
+    public static final int ERROR_BANNED = 10000;
+    public static final int ERROR_Proxy = 10001;
+    public static final int SUCCESS = 200;
+
+    private final HttpHost httpHost;
+
+    private int reuseTimeInterval = 1500;// ms
+    // deadline on the System.nanoTime() clock, only valid inside the JVM that set it
+    private Long canReuseTime = 0L;
+    private Long lastBorrowTime = System.currentTimeMillis();
+    private Long responseTime = 0L;
+    private Long idleTime = 0L;
+
+    private int failedNum = 0;
+    private int successNum = 0;
+    private int borrowNum = 0;
+
+    private List<Integer> failedErrorType = new ArrayList<Integer>();
+
+    Proxy(HttpHost httpHost) {
+        this.httpHost = httpHost;
+        this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseTimeInterval, TimeUnit.MILLISECONDS);
+    }
+
+    Proxy(HttpHost httpHost, int reuseInterval) {
+        this.httpHost = httpHost;
+        // keep the field in sync so getReuseTimeInterval() reports the real value
+        this.reuseTimeInterval = reuseInterval;
+        this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseInterval, TimeUnit.MILLISECONDS);
+    }
+
+    public int getSuccessNum() {
+        return successNum;
+    }
+
+    public void successNumIncrement(int increment) {
+        this.successNum += increment;
+    }
+
+    public Long getLastUseTime() {
+        return lastBorrowTime;
+    }
+
+    public void setLastBorrowTime(Long lastBorrowTime) {
+        this.lastBorrowTime = lastBorrowTime;
+    }
+
+    /**
+     * Folds the time since the last borrow into the running response-time average.
+     */
+    public void recordResponse() {
+        this.responseTime = (System.currentTimeMillis() - lastBorrowTime + responseTime) / 2;
+        this.lastBorrowTime = System.currentTimeMillis();
+    }
+
+    public List<Integer> getFailedErrorType() {
+        return failedErrorType;
+    }
+
+    public void setFailedErrorType(List<Integer> failedErrorType) {
+        this.failedErrorType = failedErrorType;
+    }
+
+    public void fail(int failedErrorType) {
+        this.failedNum++;
+        this.failedErrorType.add(failedErrorType);
+    }
+
+    public void setFailedNum(int failedNum) {
+        this.failedNum = failedNum;
+    }
+
+    public int getFailedNum() {
+        return failedNum;
+    }
+
+    public String getFailedType() {
+        StringBuilder re = new StringBuilder();
+        for (Integer i : this.failedErrorType) {
+            re.append(i).append(" . ");
+        }
+        return re.toString();
+    }
+
+    public HttpHost getHttpHost() {
+        return httpHost;
+    }
+
+    public int getReuseTimeInterval() {
+        return reuseTimeInterval;
+    }
+
+    /**
+     * Sets the reuse interval (ms) and restarts the delay from now.
+     */
+    public void setReuseTimeInterval(int reuseTimeInterval) {
+        this.reuseTimeInterval = reuseTimeInterval;
+        this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseTimeInterval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(canReuseTime - System.nanoTime(), TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        Proxy that = (Proxy) o;
+        // nanoTime values may wrap, so compare their difference rather than the raw values
+        long diff = canReuseTime - that.canReuseTime;
+        return diff > 0 ? 1 : (diff < 0 ? -1 : 0);
+    }
+
+    @Override
+    public String toString() {
+        // avoid printing NaN for a proxy that has never been borrowed
+        double successRate = borrowNum == 0 ? 0.0 : successNum * 100.0 / borrowNum;
+        return String.format("host: %15s >> %5dms >> success: %-3.2f%% >> borrow: %d", httpHost.getAddress().getHostAddress(), responseTime,
+                successRate, borrowNum);
+    }
+
+    public void borrowNumIncrement(int increment) {
+        this.borrowNum += increment;
+    }
+
+    public int getBorrowNum() {
+        return borrowNum;
+    }
+}
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPool.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPool.java
new file mode 100644
index 0000000..aa43b25
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPool.java
@@ -0,0 +1,366 @@
+package us.codecraft.webmagic.proxy;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool of HTTP proxies that hands each proxy out at most once per reuse
+ * interval and backs off from proxies that keep failing.
+ * <p>
+ * Idle proxies wait in a {@link DelayQueue}: {@link #getProxy()} blocks until
+ * one is available and {@link #returnProxy(HttpHost, int)} puts it back with a
+ * delay chosen from the status code of the request it served.
+ *
+ * @author ch
+ * @version Ver 1.0
+ * @Date 2014-2-14 01:10:04 PM
+ */
+public class ProxyPool {
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    private BlockingQueue<Proxy> proxyQueue = new DelayQueue<Proxy>();
+    // every known proxy, keyed by the value of keyOf(host)
+    private Map<String, Proxy> allProxy = new ConcurrentHashMap<String, Proxy>();
+
+    private int reuseInterval = 1500;// ms
+    private int reviveTime = 2 * 60 * 60 * 1000;// ms
+
+    private boolean isEnable = false;
+    private boolean validateWhenInit = false;
+    private String proxyFile = "data/lastUse.proxy";
+
+    // only created for pools that hold proxies, so an unused pool starts no thread
+    private Timer timer;
+
+    public ProxyPool() {
+
+    }
+
+    /**
+     * Creates an enabled pool from the saved proxy file plus the given proxies.
+     *
+     * @param httpProxyList proxies to add, each element being {host, port}
+     */
+    public ProxyPool(List<String[]> httpProxyList) {
+        readProxyList();
+        addProxy(httpProxyList.toArray(new String[httpProxyList.size()][]));
+        timer = new Timer(true);
+        timer.schedule(new TimerTask() {
+
+            @Override
+            public void run() {
+                saveProxyList();
+                logger.info(allProxyStutus());
+            }
+        }, 10 * 60 * 1000L, 10 * 60 * 1000);
+    }
+
+    private static String keyOf(HttpHost host) {
+        // pooled hosts are built from an InetAddress; fall back to the name otherwise
+        return host.getAddress() != null ? host.getAddress().getHostAddress() : host.getHostName();
+    }
+
+    private static void closeQuietly(Closeable resource) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (IOException ignored) {
+                // nothing useful can be done when close() fails
+            }
+        }
+    }
+
+    private void saveProxyList() {
+        if (allProxy.size() == 0) {
+            return;
+        }
+        File parent = new File(proxyFile).getParentFile();
+        if (parent != null && !parent.exists() && !parent.mkdirs()) {
+            logger.error("cannot create directory for proxy file " + proxyFile);
+            return;
+        }
+        FileOutputStream file = null;
+        try {
+            file = new FileOutputStream(proxyFile);
+            ObjectOutputStream os = new ObjectOutputStream(file);
+            os.writeObject(prepareForSaving());
+            os.flush();
+            logger.info("save proxy");
+        } catch (IOException e) {
+            logger.error("save proxy file error", e);
+        } finally {
+            closeQuietly(file);
+        }
+    }
+
+    private Map<String, Proxy> prepareForSaving() {
+        // failure counters are reset when the file is loaded, not here, so a
+        // periodic save never wipes the statistics of proxies that are in use
+        return new HashMap<String, Proxy>(allProxy);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void readProxyList() {
+        File file = new File(proxyFile);
+        if (!file.exists()) {
+            // nothing has been saved yet, e.g. on the first run
+            logger.info("proxy file not found: " + proxyFile);
+            return;
+        }
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(file);
+            addProxy((Map<String, Proxy>) new ObjectInputStream(in).readObject());
+        } catch (IOException e) {
+            logger.error("read proxy file error", e);
+        } catch (ClassNotFoundException e) {
+            logger.error("read proxy file error", e);
+        } finally {
+            closeQuietly(in);
+        }
+    }
+
+    private void addProxy(Map<String, Proxy> httpProxyMap) {
+        isEnable = true;
+        for (Entry<String, Proxy> entry : httpProxyMap.entrySet()) {
+            Proxy proxy = entry.getValue();
+            String key = keyOf(proxy.getHttpHost());
+            if (allProxy.containsKey(key)) {
+                continue;
+            }
+            if (!validateWhenInit || ProxyUtil.validateProxy(proxy.getHttpHost())) {
+                proxy.setFailedNum(0);
+                proxy.setReuseTimeInterval(reuseInterval);
+                proxyQueue.add(proxy);
+                allProxy.put(key, proxy);
+            }
+        }
+        logger.info("proxy pool size>>>>" + allProxy.size());
+    }
+
+    /**
+     * Adds proxies to the pool and enables it.
+     *
+     * @param httpProxyList proxies to add, each being {host, port}; malformed
+     *                      entries are logged and skipped
+     */
+    public void addProxy(String[]... httpProxyList) {
+        isEnable = true;
+        for (String[] s : httpProxyList) {
+            if (s == null || s.length < 2) {
+                logger.error("HttpHost init error: expected {host, port}");
+                continue;
+            }
+            try {
+                HttpHost item = new HttpHost(InetAddress.getByName(s[0]), Integer.valueOf(s[1]));
+                // key by resolved address: that is what returnProxy looks up
+                String key = keyOf(item);
+                if (allProxy.containsKey(key)) {
+                    continue;
+                }
+                if (!validateWhenInit || ProxyUtil.validateProxy(item)) {
+                    Proxy p = new Proxy(item, reuseInterval);
+                    proxyQueue.add(p);
+                    allProxy.put(key, p);
+                }
+            } catch (NumberFormatException e) {
+                logger.error("HttpHost init error:", e);
+            } catch (UnknownHostException e) {
+                logger.error("HttpHost init error:", e);
+            }
+        }
+        logger.info("proxy pool size>>>>" + allProxy.size());
+    }
+
+    /**
+     * Takes the next reusable proxy, waiting until one is available.
+     *
+     * @return the host of the borrowed proxy
+     * @throws NoSuchElementException if the wait is interrupted
+     */
+    public HttpHost getProxy() {
+        Proxy proxy = null;
+        try {
+            long start = System.currentTimeMillis();
+            proxy = proxyQueue.take();
+            // compare like with like: both values are milliseconds
+            long costTime = System.currentTimeMillis() - start;
+            if (costTime > reuseInterval) {
+                logger.info("get proxy time >>>> " + costTime / 1000.0);
+            }
+            proxy.setLastBorrowTime(System.currentTimeMillis());
+            proxy.borrowNumIncrement(1);
+        } catch (InterruptedException e) {
+            // restore the flag so the caller can see the interruption
+            Thread.currentThread().interrupt();
+            logger.error("get proxy error", e);
+        }
+        if (proxy == null) {
+            throw new NoSuchElementException();
+        }
+        return proxy.getHttpHost();
+    }
+
+    /**
+     * Gives a borrowed proxy back and schedules its next use.
+     *
+     * @param host       the proxy returned by {@link #getProxy()}; null is ignored
+     * @param statusCode HTTP status of the request it served, or a Proxy.ERROR_* code
+     */
+    public void returnProxy(HttpHost host, int statusCode) {
+        if (host == null) {
+            return;
+        }
+        Proxy p = allProxy.get(keyOf(host));
+        if (p == null) {
+            return;
+        }
+        switch (statusCode) {
+        case Proxy.SUCCESS:
+            p.setReuseTimeInterval(reuseInterval);
+            p.setFailedNum(0);
+            p.setFailedErrorType(new ArrayList<Integer>());
+            p.recordResponse();
+            p.successNumIncrement(1);
+            break;
+        case Proxy.ERROR_403:
+            // banned,try larger interval
+            p.fail(Proxy.ERROR_403);
+            p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
+            logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
+            break;
+        case Proxy.ERROR_BANNED:
+            p.fail(Proxy.ERROR_BANNED);
+            p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
+            logger.warn("this proxy is banned >>>> " + p.getHttpHost());
+            logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
+            break;
+        case Proxy.ERROR_404:
+            // a missing page says nothing about the proxy, but the delay must restart
+            p.setReuseTimeInterval(reuseInterval);
+            break;
+        default:
+            p.fail(statusCode);
+            p.setReuseTimeInterval(reuseInterval);
+            break;
+        }
+        // failedNum is 0 after a success, so only re-validate after real failures
+        boolean suspect = p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0;
+        if (p.getFailedNum() > 20 || (suspect && !ProxyUtil.validateProxy(host))) {
+            // park the proxy for reviveTime; it is re-queued below so it can come back
+            p.setReuseTimeInterval(reviveTime);
+            logger.error("suspend proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
+        }
+        try {
+            proxyQueue.put(p);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("proxyQueue return proxy error", e);
+        }
+    }
+
+    public String allProxyStutus() {
+        StringBuilder re = new StringBuilder("all proxy info >>>> \n");
+        for (Entry<String, Proxy> entry : allProxy.entrySet()) {
+            re.append(entry.getValue().toString()).append("\n");
+        }
+        return re.toString();
+    }
+
+    public int getIdleNum() {
+        return proxyQueue.size();
+    }
+
+    public int getReuseInterval() {
+        return reuseInterval;
+    }
+
+    public void setReuseInterval(int reuseInterval) {
+        this.reuseInterval = reuseInterval;
+    }
+
+    /**
+     * Reads "host:port" lines from proxy.txt in the working directory.
+     *
+     * @return one {host, port} pair per well-formed line, empty if the file is unreadable
+     */
+    public static List<String[]> getProxyList() {
+        List<String[]> proxyList = new ArrayList<String[]>();
+        BufferedReader br = null;
+        try {
+            br = new BufferedReader(new FileReader(new File("proxy.txt")));
+            String line;
+            while ((line = br.readLine()) != null) {
+                String[] hostAndPort = line.trim().split(":");
+                // skip blank or malformed lines instead of failing on them
+                if (hostAndPort.length >= 2) {
+                    proxyList.add(new String[] { hostAndPort[0], hostAndPort[1] });
+                }
+            }
+        } catch (IOException e) {
+            LoggerFactory.getLogger(ProxyPool.class).error("read proxy.txt error", e);
+        } finally {
+            closeQuietly(br);
+        }
+        return proxyList;
+    }
+
+    public static void main(String[] args) throws IOException {
+        ProxyPool proxyPool = new ProxyPool(getProxyList());
+        proxyPool.setReuseInterval(10000);
+
+        while (true) {
+            List<HttpHost> httphostList = new ArrayList<HttpHost>();
+            System.in.read();
+            int i = 0;
+            while (proxyPool.getIdleNum() > 2) {
+                HttpHost httphost = proxyPool.getProxy();
+                httphostList.add(httphost);
+                proxyPool.logger.info("borrow object>>>>" + i + ">>>>" + httphostList.get(i).toString());
+                i++;
+            }
+            System.out.println(proxyPool.allProxyStutus());
+            System.in.read();
+            for (i = 0; i < httphostList.size(); i++) {
+                proxyPool.returnProxy(httphostList.get(i), 200);
+                proxyPool.logger.info("return object>>>>" + i + ">>>>" + httphostList.get(i).toString());
+            }
+            System.out.println(proxyPool.allProxyStutus());
+            System.in.read();
+        }
+
+    }
+
+    public void enable(boolean isEnable) {
+        this.isEnable = isEnable;
+    }
+
+    public boolean isEnable() {
+        return isEnable;
+    }
+}
diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyUtil.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyUtil.java
new file mode 100644
index 0000000..f045e0d
--- /dev/null
+++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyUtil.java
@@ -0,0 +1,118 @@
+package us.codecraft.webmagic.proxy;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for checking whether a proxy can be reached from this machine.
+ *
+ * @author ch
+ * @version Ver 1.0
+ * @Date 2014-2-16 04:20:07 PM
+ */
+public class ProxyUtil {
+    // TODO make this a singleton
+    private static final String PREFERRED_INTERFACE = "eth7";
+    private static InetAddress localAddr;
+    private static final Logger logger = LoggerFactory.getLogger(ProxyUtil.class);
+    static {
+        init();
+    }
+
+    private static void init() {
+        try {
+            NetworkInterface ni = NetworkInterface.getByName(PREFERRED_INTERFACE);
+            if (ni != null) {
+                localAddr = firstUsableAddress(ni);
+            } else {
+                // the preferred interface exists on few machines: take any usable one
+                logger.warn("choose NetworkInterface\n" + getNetworkInterface());
+                Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();
+                while (localAddr == null && all != null && all.hasMoreElements()) {
+                    localAddr = firstUsableAddress(all.nextElement());
+                }
+            }
+            if (localAddr != null) {
+                logger.info("local IP:" + localAddr.getHostAddress());
+            }
+        } catch (Exception e) {
+            logger.error("Failure when init ProxyUtil", e);
+            logger.error("choose NetworkInterface\n" + getNetworkInterface());
+        }
+    }
+
+    private static InetAddress firstUsableAddress(NetworkInterface ni) {
+        Enumeration<InetAddress> localAddrs = ni.getInetAddresses();
+        while (localAddrs.hasMoreElements()) {
+            InetAddress tmp = localAddrs.nextElement();
+            if (!tmp.isLoopbackAddress() && !tmp.isLinkLocalAddress() && !(tmp instanceof Inet6Address)) {
+                return tmp;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Checks that a TCP connection to the proxy can be opened within 3 seconds.
+     *
+     * @param p proxy to test
+     * @return true if the connection was established
+     */
+    public static boolean validateProxy(HttpHost p) {
+        boolean isReachable = false;
+        Socket socket = null;
+        String local = localAddr == null ? "default" : localAddr.getHostAddress();
+        try {
+            socket = new Socket();
+            // with no known local address the OS chooses the outgoing interface
+            if (localAddr != null) {
+                socket.bind(new InetSocketAddress(localAddr, 0));
+            }
+            InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort());
+            socket.connect(endpointSocketAddr, 3000);
+            logger.debug("SUCCESS - connection established! Local: " + local + " remote: " + p);
+            isReachable = true;
+        } catch (IOException e) {
+            logger.warn("FAILRE - CAN not connect! Local: " + local + " remote: " + p);
+        } finally {
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (IOException e) {
+                    logger.warn("Error occurred while closing socket of validating proxy", e);
+                }
+            }
+        }
+        return isReachable;
+    }
+
+    private static String getNetworkInterface() {
+        StringBuilder names = new StringBuilder();
+        Enumeration<NetworkInterface> enumeration = null;
+        try {
+            enumeration = NetworkInterface.getNetworkInterfaces();
+        } catch (SocketException e1) {
+            logger.error("cannot list network interfaces", e1);
+        }
+        // getNetworkInterfaces() can return null, so guard before iterating
+        while (enumeration != null && enumeration.hasMoreElements()) {
+            NetworkInterface networkInterface = enumeration.nextElement();
+            names.append(networkInterface.toString()).append('\n');
+            Enumeration<InetAddress> addr = networkInterface.getInetAddresses();
+            while (addr.hasMoreElements()) {
+                names.append("\tip:").append(addr.nextElement().getHostAddress()).append("\n");
+            }
+        }
+        return names.toString();
+    }
+}