老狗啃爬虫-增量爬取之Scheduler
摘要:
计算机程序在运行的时候,并不是完全如期望的一样顺风顺水,很多因素都可能会导致程序中断,爬虫程序亦是如此。在爬取数据的过程中,如果意外中断,并且我们还没有做相应的预案,那可能就是一场灾难,由于我们不知道爬虫程序的具体的爬取工作进度,不知道哪儿爬过了哪儿没爬过,吭哧吭哧费劲巴哈爬取的巨量数据,也只能前功尽弃,反复徒劳
我们知道,计算机程序在运行的时候,并不是完全如期望的一样顺风顺水,很多因素都可能会导致程序中断,爬虫程序亦是如此。在爬取数据的过程中,如果意外中断,并且我们还没有做相应的预案,那可能就是一场灾难,由于我们不知道爬虫程序的具体的爬取工作进度,不知道哪儿爬过了哪儿没爬过,吭哧吭哧费劲巴哈爬取的巨量数据,也只能前功尽弃,反复徒劳。
在爬虫程序的正式使用场景中,必须要做爬取进度数据持久化,也就是要考虑将抓取工作过程中的URL及其抓取状态,保存下来。只有做好了这些工作,我们才可以不惧程序中断,因为重启程序的时候,我们可以将保存的已抓取URL、待抓取URL加载进来,接着之前的工作进度继续即可。
WebMagic的实现
非常庆幸,WebMagic框架在这方面考虑的非常周全,已经帮我们做了方案。
WebMagic中一个名为FileCacheQueueScheduler类,就是在程序关闭时,将抓取过程中的已抓取URL,已文件的形式保存起来,然后在程序下次启动的时候,就可以接着之前抓取到的URL继续工作。
这个类在webmagic的拓展包里,即webmagic-extension-0.7.4.jar中,我们看看它在哪里:

这里强烈建议看看FileCacheQueueScheduler类的源码:
package us.codecraft.webmagic.scheduler;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
/**
* 将抓取到的url和光标存储在文件中
* 这样Spider可以在关闭时保存进度,重启可继续抓取进程
*/
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
private String filePath = System.getProperty("java.io.tmpdir");
private String fileUrlAllName = ".urls.txt";
private Task task;
private String fileCursor = ".cursor.txt";
private PrintWriter fileUrlWriter;
private PrintWriter fileCursorWriter;
private AtomicInteger cursor = new AtomicInteger();
private AtomicBoolean inited = new AtomicBoolean(false);
private BlockingQueue queue;
private Set urls;
private ScheduledExecutorService flushThreadPool;
public FileCacheQueueScheduler(String filePath) {
if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
filePath += "/";
}
this.filePath = filePath;
initDuplicateRemover();
}
private void flush() {
fileUrlWriter.flush();
fileCursorWriter.flush();
}
private void init(Task task) {
this.task = task;
File file = new File(filePath);
if (!file.exists()) {
file.mkdirs();
}
readFile();
initWriter();
initFlushThread();
inited.set(true);
logger.info("init cache scheduler success");
}
private void initDuplicateRemover() {
setDuplicateRemover(
new DuplicateRemover() {
@Override
public boolean isDuplicate(Request request, Task task) {
if (!inited.get()) {
init(task);
}
return !urls.add(request.getUrl());
}
@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}
@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
});
}
private void initFlushThread() {
flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
flush();
}
}, 10, 10, TimeUnit.SECONDS);
}
private void initWriter() {
try {
fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
} catch (IOException e) {
throw new RuntimeException("init cache scheduler error", e);
}
}
private void readFile() {
try {
queue = new LinkedBlockingQueue();
urls = new LinkedHashSet();
readCursorFile();
readUrlFile();
// initDuplicateRemover();
} catch (FileNotFoundException e) {
//init
logger.info("init cache file " + getFileName(fileUrlAllName));
} catch (IOException e) {
logger.error("init file error", e);
}
}
private void readUrlFile() throws IOException {
String line;
BufferedReader fileUrlReader = null;
try {
fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
int lineReaded = 0;
while ((line = fileUrlReader.readLine()) != null) {
urls.add(line.trim());
lineReaded++;
if (lineReaded > cursor.get()) {
queue.add(deserializeRequest(line));
}
}
} finally {
if (fileUrlReader != null) {
IOUtils.closeQuietly(fileUrlReader);
}
}
}
private void readCursorFile() throws IOException {
BufferedReader fileCursorReader = null;
try {
fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
String line;
//read the last number
while ((line = fileCursorReader.readLine()) != null) {
cursor = new AtomicInteger(NumberUtils.toInt(line));
}
} finally {
if (fileCursorReader != null) {
IOUtils.closeQuietly(fileCursorReader);
}
}
}
public void close() throws IOException {
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}
private String getFileName(String filename) {
return filePath + task.getUUID() + filename;
}
@Override
protected void pushWhenNoDuplicate(Request request, Task task) {
if (!inited.get()) {
init(task);
}
queue.add(request);
fileUrlWriter.println(serializeRequest(request));
}
@Override
public synchronized Request poll(Task task) {
if (!inited.get()) {
init(task);
}
fileCursorWriter.println(cursor.incrementAndGet());
return queue.poll();
}
@Override
public int getLeftRequestsCount(Task task) {
return queue.size();
}
@Override
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
protected String serializeRequest(Request request) {
return request.getUrl();
}
protected Request deserializeRequest(String line) {
return new Request(line);
}
}
我们可以看到,FileCacheQueueScheduler同样继承了DuplicateRemovedScheduler类,也就是说已经对URL去重做了处理;只要我们在使用的时候,指定一个文件路径,它就在程序关闭时,创建.urls.txt和.cursor.txt两个文件,分别存储已经抓取到的URL和抓取进度游标,这样即使程序中断,爬取工作进度也会被完整保存,程序再次启动时,加载这些数据,就可以继续之前的工作了。
一般来说,URL抓取进度的这些数据,用文件保存的形式,已经可以满足绝大部分场景使用了,但有时候我们在做多点爬虫分布式方案的时候,还需要结合数据库啊、Redis啊等等来实现的,毕竟文件形式的数据还是不太便于统筹管理和数据分析,接下来我们就尝试通过数据库,来保存爬取进度数据。
基于数据库的增量爬取
第一步:建表备用
首先,根据URL在数据库中的一些必要特点,设计对应的数据库表:
-- ----------------------------
-- Table structure for spider_grab_urls
-- ----------------------------
DROP TABLE IF EXISTS `spider_grab_urls`;
CREATE TABLE `spider_grab_urls` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`url` varchar(512) DEFAULT NULL,
`grab_status` varchar(16) DEFAULT NULL,
`utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
表spider_grab_urls即是我们程序用来存储读取URL的数据库表,其中url是我们要记录的url字符串;grab_status是我们要记录的的URL记录抓取状态,目前我们在测试阶段,只考虑记录初始和抓取成功两种情况;再就是记一个记录更新时间,用于程序再次启动时的一些操作判断标识。
第二步:Java 对象准备
接着,创建与之对应的Java 对象(POJO):
package cn.veiking.interior.model;
import java.time.LocalDateTime;
import cn.veiking.base.common.enums.GrabStatus;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author :Veiking
* @version :2020年12月16日
* 说明 :抓取链接信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SpiderGrabUrls {
private String id;
private String url;
private GrabStatus grabStatus;
private LocalDateTime utime;
public SpiderGrabUrls(String url) {
this.url = url;
this.grabStatus = GrabStatus.INITIAL;
}
}
这里,我们为了和具体爬虫页面数据区分,重新设立了一个interior包。咱们的Java 对象就在这个包下,同样,属性就是与库表对应的几个字段,我们使用lombok帮我们处理琐碎,此外再添加一个附带参数的构造函数,用以协助插入记录。
第三步:DAO接口实现
然后我们创建与之对应的DAO文件:
package cn.veiking.interior.dao;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import cn.veiking.base.common.enums.GrabStatus;
import cn.veiking.interior.model.SpiderGrabUrls;
/**
* @author :Veiking
* @version :2020年12月16日
* 说明 :抓取链接信息数据持久化接口
*/
@Mapper
public interface SpiderGrabUrlsDao {
/************** BASE **************/
// 批插入
@Insert(value=" ")
public boolean adds(@Param("modelList") List modelList);
// 单插入
@Insert(value=" INSERT INTO spider_grab_urls (`url`, `grab_status`) VALUES (#{model.url}, #{model.grabStatus}) ")
public boolean add(@Param("model") SpiderGrabUrls model);
// 据IDS批删
@Delete(value=" ")
public boolean deletes(@Param("ids") List ids);
// 据ID单删
@Delete(value=" DELETE FROM spider_grab_urls WHERE id = #{id} ")
public boolean delete(@Param("id") Integer id);
// 据IDS状态批改
@Update(value=" ")
public boolean updateStatus(@Param("grabStatus") GrabStatus grabStatus, @Param("ids") List ids);
// 据URL状态单改
@Update(value=" UPDATE spider_grab_urls SET `grab_status`=#{grabStatus} WHERE url=#{url} ")
public boolean updateStatusByUrl(@Param("grabStatus") GrabStatus grabStatus, @Param("url") String url);
// 据ID单查
@Select(value=" SELECT * FROM spider_grab_urls sgu WHERE sgu.id = #{id} ")
public SpiderGrabUrls queryById(@Param("id") Integer id);
// 按状态批查
@Select(value=" SELECT * from spider_grab_urls sgu WHERE sgu.grab_status = #{grabStatus} ORDER BY utime DESC ")
public List queryByStatus(@Param("grabStatus") GrabStatus grabStatus);
}
这里我们实现了一些增删改查方法,等候业务的调用,有一些接口暂时没用,不用关注。
第四步:VduplicateRemover完缮
VduplicateRemover我们添加针对属性urls的set、get方法,提供修改urls的入口,以供我们加载数据库的数据:
package cn.veiking.scheduler.component;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
/**
* @author :Veiking
* @version :2020年12月12日
* 说明 :处理URL重复问题
*/
public class VDuplicateRemover implements DuplicateRemover {
private Set urls = Collections.newSetFromMap(new ConcurrentHashMap());;
@Override
public boolean isDuplicate(Request request, Task task) {
String url = request.getUrl();
// 设置规则,我们假设这个编码的人物信息页面已经获取,故在抓取的时候,当作重复
// 1000058-刘昶
String celebrityCode = "1000058";
if(url.contains(celebrityCode)) {
return true;
}
// 如原Set没有则非重复
if(urls.add(url)) {
return false;
}
return true;
}
// get
public Set getUrls() {
return urls;
}
// set
public void setUrls(Set urls) {
this.urls = urls;
}
@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}
@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
}
第五步:创建DBQueueScheduler
结合我们之前实现过的Scheduler,参考FileCacheQueueScheduler的实现逻辑,我们创建DBQueueScheduler:
package cn.veiking.scheduler;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import cn.veiking.base.common.enums.GrabStatus;
import cn.veiking.base.common.logs.SimLogger;
import cn.veiking.interior.dao.SpiderGrabUrlsDao;
import cn.veiking.interior.model.SpiderGrabUrls;
import cn.veiking.scheduler.component.VDuplicateRemover;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
/**
* @author :Veiking
* @version :2020年12月18日
* 说明 :通过数据库处理待抓取URL,管理URL队列
*/
@Service
public class DBQueueScheduler extends VQueueScheduler{
SimLogger logger = new SimLogger(this.getClass());
@Autowired
SpiderGrabUrlsDao spiderGrabUrlsDao;
private Boolean inited = false;
private Set grabbedUrlsSet = new LinkedHashSet(); // 已抓取URL集合
private BlockingQueue queue = new LinkedBlockingQueue(); // 待抓取URL队列
private VDuplicateRemover duplicatedRemover = new VDuplicateRemover(); // 去重插件
@Override
public void push(Request request, Task task) {
if (!inited) {
initUrlsFromDB();
}
String url = serializeRequest(request);
logger.info("VQueueScheduler get [url:{}] ", url);
boolean isDuplicate = duplicatedRemover.isDuplicate(request, task); // 重复 //
boolean shouldReserved = this.shouldReserved(request); // 应保留的
boolean noNeedToRemoveDuplicate = this.noNeedToRemoveDuplicate(request); // 不需要删除的
if (shouldReserved || noNeedToRemoveDuplicate || !isDuplicate) {
logger.info("VQueueScheduler push to queue [url:{}]", url);
SpiderGrabUrls grabUrl = new SpiderGrabUrls(url);
spiderGrabUrlsDao.add(grabUrl); // 进入队列前,先将url入库
queue.add(request);
}
}
@Override
public synchronized Request poll(Task task) {
if (!inited) {
initUrlsFromDB();
}
return queue.poll();
}
/**
* 程序加载时,要初始化待抓取URL队列和已抓取URL集合
* 数据库URL信息载入程序
*/
private void initUrlsFromDB() {
// 已爬取URL集合
List grabbedUrls = spiderGrabUrlsDao.queryByStatus(GrabStatus.SUCCESS);
// 未爬取URL集合
List unGrabbedUrls = spiderGrabUrlsDao.queryByStatus(GrabStatus.INITIAL);
// 如没有未爬取URL,则取一已爬取URL为种子URL,继续
if(unGrabbedUrls.isEmpty() && !grabbedUrls.isEmpty()) {
SpiderGrabUrls url = grabbedUrls.remove(0); // 更新日期倒序,取首位即最新
unGrabbedUrls.add(url);
}
// 已抓取URL读入程序
for(SpiderGrabUrls grabbedUrl : grabbedUrls) {
grabbedUrlsSet.add(grabbedUrl.getUrl());
}
// 未抓取URL读入程序
for(SpiderGrabUrls unGrabbedUrl : unGrabbedUrls) {
queue.add(deserializeRequest(unGrabbedUrl.getUrl()));
grabbedUrlsSet.add(unGrabbedUrl.getUrl()); // 正常逻辑:进入待爬取队列的URL,已经添加至用于判断是否已爬取的集合
}
duplicatedRemover.setUrls(grabbedUrlsSet); // 加载已爬取链接
inited = true; //初始状态更新
logger.info("VQueueScheduler initUrlsFromDB to Scheduler success");
}
// 序列化,即webMagic的Request类转URL串
private String serializeRequest(Request request) {
return request.getUrl();
}
// 反序列化,即URL串转webMagic的Request类
private Request deserializeRequest(String line) {
return new Request(line);
}
}
这里,我们的DBQueueScheduler会在程序启动的时候,初始方法initUrlsFromDB()会从数据库读取URL信息,分别加载已抓取URL至grabbedUrlsSet,提供给去重插件VduplicateRemover判断使用;加载待抓取URL队列给queue,继续程序中断、停止之前的抓取工作。
这里考虑到一个特殊情况,就是待抓取数据没有成功保存,即从最新的一条已抓取记录,作为继续的种子URL,开始后续的抓取工作。
最后,为了每条URL数据处理完成后,实时更新URL的状态,我们需要稍微的修改下入库操作的Pipeline类:
package cn.veiking.spider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import cn.veiking.base.common.enums.GrabStatus;
import cn.veiking.base.common.logs.SimLogger;
import cn.veiking.biz.dao.AigufengCelebrityDao;
import cn.veiking.biz.model.AigufengCelebrity;
import cn.veiking.interior.dao.SpiderGrabUrlsDao;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;
/**
* @author :Veiking
* @version :2020年12月6日6
* 说明 :爱古风-人物信息数据持久化管道
*/
@Service
public class AigufengCelebrityPipeline implements Pipeline{
SimLogger logger = new SimLogger(this.getClass());
@Autowired
private AigufengCelebrityDao aigufengCelebrityDao;
@Autowired
SpiderGrabUrlsDao spiderGrabUrlsDao;
@Override
public void process(ResultItems resultItems, Task task) {
AigufengCelebrity model = resultItems.get("aigufengCelebrity");
if(model != null) {
this.aigufengCelebrityDao.add(model);
logger.info("AigufengCelebrityPipeline process insert into DB ... ");
}
//更新数据,即数据处理完毕需更新待抓取URL记录状态
Request request = resultItems.getRequest();
if(null != request) {
spiderGrabUrlsDao.updateStatusByUrl(GrabStatus.SUCCESS, request.getUrl()); // 去除队列前,更新url数据状态
}
}
}
这里,当页面的数据成功入库后,即更新对应URL的抓取状态。
第六步:程序测试
完成了以上操作,我们就可以进行测试,这里还需要在测试启动入口添加一行代码,添加对interior的扫描:
package cn.veiking;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
/**
* @author :Veiking
* @version :2020年11月30日
* 说明 :测试启动入口
*/
@SpringBootApplication
//开启通用注解扫描
@MapperScan(value = {"cn.veiking.biz.dao"})
@MapperScan(value = {"cn.veiking.interior.dao"}) // interior
@ComponentScan(value = {"cn.veiking.processor"})
@ComponentScan(value = {"cn.veiking.scheduler"})
@ComponentScan(value = {"cn.veiking.spider.*"})
public class StartTest {
public static void main(String[] args) {
SpringApplication.run(StartTest.class, args);
}
}
然后还要将测试入口的Scheduler替换为我们的DBQueueScheduler:
package cn.veiking.aigufeng;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.veiking.StartTest;
import cn.veiking.base.common.logs.SimLogger;
import cn.veiking.scheduler.DBQueueScheduler;
import cn.veiking.spider.AigufengCelebrityImgPipeline;
import cn.veiking.spider.AigufengCelebrityPipeline;
import cn.veiking.spider.AigufengCelebrityProcessor;
import us.codecraft.webmagic.Spider;
/**
* @author :Veiking
* @version :2020年12月8日
* 说明 :测试入口
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartTest.class)
public class AigufengCelebrityTest {
SimLogger logger = new SimLogger(this.getClass());
@Autowired
private AigufengCelebrityPipeline aigufengCelebrityPipeline;
@Autowired
private AigufengCelebrityImgPipeline aigufengCelebrityImgPipeline;
@Autowired
private DBQueueScheduler dbQueueScheduler;
@Autowired
private AigufengCelebrityProcessor aigufengCelebrityProcessor;
private static final String StartUrl = "http://www.aigufeng.com/celebrity/article-1000056/page.html";
@Test
public void testSpider() {
long startTime, endTime;
logger.info("AigufengCelebrityTest testSpider [start={}] ", "开始爬取数据");
startTime = System.currentTimeMillis();
Spider.create(aigufengCelebrityProcessor)
.addUrl(StartUrl)
.addPipeline(aigufengCelebrityPipeline)
.addPipeline(aigufengCelebrityImgPipeline) // 添加图片下载管道
.setScheduler(dbQueueScheduler) // 设置数据库存储URL实现增量爬取的Scheduler
.thread(1)
.run();
endTime = System.currentTimeMillis();
logger.info("AigufengCelebrityTest testSpider [end={}] ", "爬取结束,耗时约" + ((endTime - startTime) / 1000) + "秒");
}
}
好了,一切就绪,开始测试,这次我们要运行两次程序:
根据之前的逻辑,第一次会插入11条数据,在执行完第十条记录的时候,程序会被中断;运行完毕,我们可以看到库里的数据:

我们看到,第一次运行完毕,ID编号为11的记录确实未执行抓取。程序没问题的话,我们进行第二次的运行,应该会接着第一次的记录继续。
我们一边运行程序,一边刷数据库,看到插入的记录:

果不其然,如料所愿,我们的程序是接着第11条记录继续抓取的。
至此,我们通过借助数据库进行数据管理、实现增量爬取的功能已经完美实现。
总结
通过前面这几篇关于爬虫框架WebMagic的开发,我们完成了对WebMagic核心组件PageProcessor、Pipeline、Scheduler的研究学习,实现了对目标网页的数据爬取,并成功实现对应数据的入库、图片下载,接着又实现了规则自己定义的去重,并实现了基于数据库存储的增量爬取。现在我们已经可以设计开发我们自己的爬虫程序了,当然,具体的需求场景,还是有很多细节问题需要去考虑的,任重道远路且长。
关于爬虫,还有一些技术之外道德层面的东西,我们在做爬取操作的时候,不能违背目标网站的意愿,如果对方不欢迎甚至拒绝我们爬取,我们尽量不要去冒犯;同时也要充分考虑目标网站的承受能力,不要因我们的爬取操作导致对方服务器承受不住而宕机。
后面我们在使用WebMagic的开发过程中,如遇到一些比较典型的学习场景,我们就拿出来讲一讲。