Improve Spider.run to avoid a rare concurrency issue; change Spider.emptySleepTime to type long
parent
c5a037a807
commit
ab5d81a6b6
|
@ -106,7 +106,7 @@ public class Spider implements Runnable, Task {
|
|||
|
||||
private Date startTime;
|
||||
|
||||
private int emptySleepTime = 30000;
|
||||
private long emptySleepTime = 30000;
|
||||
|
||||
/**
|
||||
* create a spider with pageProcessor.
|
||||
|
@ -305,32 +305,52 @@ public class Spider implements Runnable, Task {
|
|||
public void run() {
|
||||
checkRunningStat();
|
||||
initComponent();
|
||||
logger.info("Spider {} started!",getUUID());
|
||||
logger.info("Spider {} started!", getUUID());
|
||||
// interrupt won't be necessarily detected
|
||||
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
|
||||
final Request request = scheduler.poll(this);
|
||||
if (request == null) {
|
||||
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
|
||||
break;
|
||||
}
|
||||
// wait until new url added
|
||||
waitNewUrl();
|
||||
} else {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
processRequest(request);
|
||||
onSuccess(request);
|
||||
} catch (Exception e) {
|
||||
onError(request, e);
|
||||
logger.error("process request " + request + " error", e);
|
||||
} finally {
|
||||
pageCount.incrementAndGet();
|
||||
signalNewUrl();
|
||||
Request poll = scheduler.poll(this);
|
||||
if (poll == null) {
|
||||
if (threadPool.getThreadAlive() == 0) {
|
||||
//no alive thread anymore , try again
|
||||
poll = scheduler.poll(this);
|
||||
if(poll==null) {
|
||||
if (exitWhenComplete) {
|
||||
break;
|
||||
}else{
|
||||
// wait
|
||||
try {
|
||||
Thread.sleep(emptySleepTime);
|
||||
continue;
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}else {
|
||||
// wait until new url added,
|
||||
if(waitNewUrl())
|
||||
//if interrupted
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
final Request request = poll;
|
||||
//this may swallow the interruption
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
processRequest(request);
|
||||
onSuccess(request);
|
||||
} catch (Exception e) {
|
||||
onError(request,e);
|
||||
logger.error("process request " + request + " error", e);
|
||||
} finally {
|
||||
pageCount.incrementAndGet();
|
||||
signalNewUrl();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
stat.set(STAT_STOPPED);
|
||||
// release some resources
|
||||
|
@ -565,16 +585,24 @@ public class Spider implements Runnable, Task {
|
|||
return this;
|
||||
}
|
||||
|
||||
private void waitNewUrl() {
|
||||
/**
|
||||
*
|
||||
* @return isInterrupted
|
||||
*/
|
||||
private boolean waitNewUrl() {
|
||||
// now there may not be any thread live
|
||||
newUrlLock.lock();
|
||||
try {
|
||||
//double check
|
||||
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
|
||||
return;
|
||||
//double check,unnecessary, unless very fast concurrent
|
||||
if (threadPool.getThreadAlive() == 0) {
|
||||
return false;
|
||||
}
|
||||
//wait for amount of time
|
||||
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("waitNewUrl - interrupted, error {}", e);
|
||||
// logger.warn("waitNewUrl - interrupted, error {}", e);
|
||||
return true;
|
||||
} finally {
|
||||
newUrlLock.unlock();
|
||||
}
|
||||
|
@ -772,7 +800,10 @@ public class Spider implements Runnable, Task {
|
|||
*
|
||||
* @param emptySleepTime In MILLISECONDS.
|
||||
*/
|
||||
public void setEmptySleepTime(int emptySleepTime) {
|
||||
public void setEmptySleepTime(long emptySleepTime) {
|
||||
if(emptySleepTime<=0){
|
||||
throw new IllegalArgumentException("emptySleepTime should be more than zero!");
|
||||
}
|
||||
this.emptySleepTime = emptySleepTime;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue