import re
import logging
from scrapy.spiders import Spider
from scrapy.http import Request, XmlResponse
from scrapy.utils.sitemap import Sitemap, sitemap_urls_from_robots
from scrapy.utils.gz import gunzip, gzip_magic_number
logger = logging.getLogger(__name__)
[文档]class SitemapSpider(Spider):
sitemap_urls = ()
sitemap_rules = [('', 'parse')]
sitemap_follow = ['']
sitemap_alternate_links = False
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._cbs = []
for r, c in self.sitemap_rules:
if isinstance(c, str):
c = getattr(self, c)
self._cbs.append((regex(r), c))
self._follow = [regex(x) for x in self.sitemap_follow]
def start_requests(self):
for url in self.sitemap_urls:
yield Request(url, self._parse_sitemap)
[文档] def sitemap_filter(self, entries):
"""This method can be used to filter sitemap entries by their
attributes, for example, you can filter locs with lastmod greater
than a given date (see docs).
"""
for entry in entries:
yield entry
def _parse_sitemap(self, response):
if response.url.endswith('/robots.txt'):
for url in sitemap_urls_from_robots(response.text, base_url=response.url):
yield Request(url, callback=self._parse_sitemap)
else:
body = self._get_sitemap_body(response)
if body is None:
logger.warning("Ignoring invalid sitemap: %(response)s",
{'response': response}, extra={'spider': self})
return
s = Sitemap(body)
it = self.sitemap_filter(s)
if s.type == 'sitemapindex':
for loc in iterloc(it, self.sitemap_alternate_links):
if any(x.search(loc) for x in self._follow):
yield Request(loc, callback=self._parse_sitemap)
elif s.type == 'urlset':
for loc in iterloc(it, self.sitemap_alternate_links):
for r, c in self._cbs:
if r.search(loc):
yield Request(loc, callback=c)
break
def _get_sitemap_body(self, response):
"""Return the sitemap body contained in the given response,
or None if the response is not a sitemap.
"""
if isinstance(response, XmlResponse):
return response.body
elif gzip_magic_number(response):
return gunzip(response.body)
# actual gzipped sitemap files are decompressed above ;
# if we are here (response body is not gzipped)
# and have a response for .xml.gz,
# it usually means that it was already gunzipped
# by HttpCompression middleware,
# the HTTP response being sent with "Content-Encoding: gzip"
# without actually being a .xml.gz file in the first place,
# merely XML gzip-compressed on the fly,
# in other word, here, we have plain XML
elif response.url.endswith('.xml') or response.url.endswith('.xml.gz'):
return response.body
def regex(x):
if isinstance(x, str):
return re.compile(x)
return x
def iterloc(it, alt=False):
for d in it:
yield d['loc']
# Also consider alternate URLs (xhtml:link rel="alternate")
if alt and 'alternate' in d:
yield from d['alternate']