scrapy笔记通用配置

python

scrapy">1,调试scrapy

在工作中经常会用到调试功能, 下面是一种scrapy提供的方法, 代码如下:

from scrapy.crawler import CrawlerProcess

from scrapy.utils.project import get_project_settings

...

if __main__ == "__main__":

process = CrawlerProcess(get_project_settings())

process.crawl("demo") # 你需要将此处的spider_name替换为你自己的爬虫名称

process.start()

2,读取数据库,获取起始链接(或构造起始链接)

有时候, scrapy start_urls 需要从数据库获取

def start_requests(self):

db = pymysql.connect(host=settings.MYSQL_HOST,

user=settings.MYSQL_USER,

password=settings.MYSQL_PASSWORD,

database=settings.MYSQL_DATABASE,

port=settings.MYSQL_PORT

)

with db.cursor() as cursor:

cursor.execute("select url from url_table")

result = cursor.fetchall()

db.close()

for url in result:

# yield scrapy.Request(f"http://www.baidu.com/s?wd={url[0]}") # 自己通过数据库返回的字段进行构造链接

yield scrapy.Request(url[0])

3,动态添加待爬url

同2, 将mysql部分去掉, 修改自己的就可以

4,常用设置, 自定义设置

有时候需要覆盖settings文件中的设置, 就在spider文件中写custom_settings, 然后在里面配置一些信息就可以了

class MySpider(scrapy.Spider):

...

custom_settings = {

"DEPTH_LIMIT": 5, # 爬取层速

"JOBDIR": "路径" # 在指定路径保存当前的进度,当中途退出爬虫后,再次运行时可以从这个文件中读取之前的进度,继续爬取.

"HTTPERROR_ALLOWED_CODES" : [302] # 302为暂时性的链接重置,当出现302时,应该继续深入爬取内容,所以我们不忽略302的报错,一般会设置为[301,302],301为永久性的链接重置.

"AUTOTHROTTLE_ENABLED" : True # 自动限速,可以参考网址: https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/autothrottle.html

}

更多配置请参考主页另一篇文章

5,常用pipeline

  • mysql pipeline

    • 存在更新, 不存在插入

      class MysqlUpdatePipeline:

      def __init__(self):

      self.database = settings.MYSQL_DATABASE

      self.port = settings.MYSQL_PORT

      self.password = settings.MYSQL_PASSWORD

      self.user = settings.MYSQL_USER

      self.host = settings.MYSQL_HOST

      self.cursor = None

      self.db = None

      def open_spider(self, spider):

      self.db = pymysql.connect(host=self.host,

      user=self.user,

      password=self.password,

      database=self.database,

      port=self.port

      )

      self.cursor = self.db.cursor()

      def process_item(self, item, spider):

      data = dict(item)

      keys = ", ".join(data.keys())

      values = ", ".join(["% s"] * len(data))

      update = ", ".join(

      [f"{tuple(item.keys())[i]} = % s" for i in range(len(data))])

      sql = "insert into % s (% s) values (% s) on duplicate key update % s" % (

      item.table, keys, values, update)

      try:

      # 检查连接是否断开,如果断开就进行重连

      self.db.ping(reconnect=True)

      print("mysql超时重连")

      # 使用 execute() 执行sql

      self.cursor.execute(sql, tuple(data.values()) * 2)

      # 提交事务

      self.db.commit()

      except Exception as e:

      print("操作出现错误:{}".format(e))

      # 回滚所有更改

      self.db.rollback()

      return item

      def close_spider(self, spider):

      self.db.close()

    • 直接插入, 操作跟更新差不多, 就将process_item中代码覆盖或者修改即可

      class MysqlPipeline:

      ...

      def process_item(self, item, spider):

      data = dict(item)

      keys = ", ".join(data.keys())

      values = ", ".join(["% s"] * len(data))

      sql = "insert into % s (% s) values (% s) on duplicate key update % s" % (

      item.table, keys, values)

      try:

      # 检查连接是否断开,如果断开就进行重连

      self.db.ping(reconnect=True)

      self.cursor.execute(sql, data.values())

      self.db.commit()

      except Exception as e:

      print("操作出现错误:{}".format(e))

      self.db.rollback()

      return item

  • mongo pipeline

    • 插入

      class MongoPipeline:

      def __init__(self):

      self.client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT)

      self.client.admin.authenticate(settings.MONGO_USER, settings.MONGO_PWD)

      self.db = self.client[settings.MONGO_DB]

      def process_item(self, item, spider):

      self.db[item.table].insert(dict(item))

      # self.db[item.table].update_one({"id": item["id"]}, {"$set": item}, upsert=True)

      return item

      def close_spider(self, spider):

      self.client.close()

    • 存在则更新, 不存在则插入, 其他代码同上

      class MongoPipeline:

      def process_item(self, item, spider):

      # 根据item中唯一id进行判断, 实际运用中, 需要自己更改这个判断字段

      self.db[item.table].update_one({"id": item["id"]}, {"$set": item}, upsert=True)

      return item

  • scrapy redis 主从分布式-master_pipeline

    • class MasterRedisPipeline:

      def __init__(self):

      self.redis_table = settings.REDIS_KEY # 选择表

      self.redis_db = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,

      db=settings.REDIS_DB, password=settings.REDIS_PASSWORD) # redis数据库连接信息

      def process_item(self, item, spider):

      # 存储字段自己可以根据自己需要单独设置, 这里设置字段名为url

      res = self.redis_db.lpush(self.redis_table, item["url"])

      return item

      def close_spider(self, spider):

      self.redis_db.close()

  • 图片下载中间件

    • 需要继承ImagesPipeline

    • 需要在settings中指明存储路径, IMAGES_STORE = "./images"

      class ImagePipeline(ImagesPipeline):

      def file_path(self, request, response=None, info=None):

      url = request.url

      file_name = url.split("/")[-1]

      return file_name

      def item_completed(self, results, item, info):

      image_paths = [x["path"] for ok, x in results if ok]

      if not image_paths:

      raise DropItem("Image Downloaded Failed")

      return item

      def get_media_requests(self, item, info):

      # 根据自己需求设置url字段

      yield Request(item["url"])

以上是 scrapy笔记通用配置 的全部内容, 来源链接: utcz.com/z/531132.html

回到顶部