Python Web Scraping: From Beginner to Expert
Orion K

Web scraping is one of Python's most popular application areas: it lets us collect data from websites automatically. Whether for data analysis, machine learning, or task automation, scraping is an extremely useful skill. In this article I will take you from the basics to advanced techniques so you can master web scraping with Python.

Web Scraping Basics

What Is Web Scraping?

Web scraping is a technique for retrieving web page content automatically with a program. A scraper mimics the way a person browses the web: it visits a site, extracts data, and saves it in a structured format.

Legal and Ethical Considerations

Before you start scraping a site, keep the following points in mind:

  1. Check robots.txt: this file declares which parts of a site crawlers are allowed to access (see the sketch after this list)
  2. Follow the site's terms of service: some sites explicitly forbid scraping
  3. Throttle your requests: overly frequent requests can put a strain on the site
  4. Respect copyright: the data you collect may be protected
  5. Prefer an official API: if the site offers one, use the API instead of scraping
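
As a quick way to honor the first point, Python's standard library ships urllib.robotparser, which can tell you whether a given URL may be fetched. A minimal sketch (the target URLs and the crawler name are placeholders):

from urllib.robotparser import RobotFileParser

# Download and parse the site's robots.txt
rp = RobotFileParser()
rp.set_url('https://www.example.com/robots.txt')
rp.read()

# Ask whether our crawler may fetch a specific path
user_agent = 'MyCrawler'  # placeholder name for your crawler
url = 'https://www.example.com/some/page'
if rp.can_fetch(user_agent, url):
    print(f"Allowed to fetch {url}")
else:
    print(f"robots.txt disallows fetching {url}")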

The Basic Scraping Workflow

  1. Send an HTTP request to fetch the page content
  2. Parse the HTML or XML and extract the data you need
  3. Process and store the data
  4. Move on to additional pages as needed (a minimal end-to-end sketch follows below)
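
To make the four steps concrete, here is a minimal sketch that fetches a page, parses it with BeautifulSoup, and writes the extracted links to a CSV file. The target URL is a placeholder; adjust the selectors to the real page structure:

import csv

import requests
from bs4 import BeautifulSoup

# Step 1: send the HTTP request
response = requests.get('https://www.example.com')
response.raise_for_status()

# Step 2: parse the HTML and extract data
soup = BeautifulSoup(response.text, 'html.parser')
rows = [{'text': a.get_text(strip=True), 'href': a.get('href')} for a in soup.find_all('a')]

# Step 3: store the data
with open('links.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['text', 'href'])
    writer.writeheader()
    writer.writerows(rows)

# Step 4 would repeat the process for any further pages you discover
print(f"Saved {len(rows)} links to links.csv")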

Basic Scraping Tools

The Requests Library

requests is the most popular HTTP client library for Python and makes sending HTTP requests straightforward:

import requests

# Send a GET request
response = requests.get('https://www.example.com')

# Check the status code
print(f"Status code: {response.status_code}")

# Inspect the response body
print(response.text[:100])  # print the first 100 characters

# Send a GET request with query parameters
params = {'q': 'python', 'page': 1}
response = requests.get('https://www.example.com/search', params=params)
print(response.url)  # print the full URL

# Send a POST request
data = {'username': 'user', 'password': 'pass'}
response = requests.post('https://www.example.com/login', data=data)

# Handle a JSON response
response = requests.get('https://api.github.com/users/python')
user_data = response.json()
print(f"GitHub login: {user_data['login']}")
print(f"Public repositories: {user_data['public_repos']}")

The BeautifulSoup Library

BeautifulSoup is a powerful HTML and XML parsing library that makes it easy to pull data out of a page:

from bs4 import BeautifulSoup
import requests

# Fetch the page
response = requests.get('https://www.example.com')
html_content = response.text

# Create a BeautifulSoup object
soup = BeautifulSoup(html_content, 'html.parser')

# Find a single element
title = soup.title
print(f"Page title: {title.string}")

# Find all links
links = soup.find_all('a')
for link in links[:5]:  # print the first 5 links
    print(f"Link text: {link.text}, URL: {link.get('href')}")

# Use CSS selectors
main_content = soup.select('div.main-content')
headings = soup.select('h1, h2, h3')

# Extract a specific element
article = soup.find('article')
if article:
    article_title = article.find('h1').text
    article_paragraphs = article.find_all('p')
    article_text = '\n'.join([p.text for p in article_paragraphs])
    print(f"Article title: {article_title}")
    print(f"Article text: {article_text[:200]}...")  # print the first 200 characters

The lxml Library

lxml is a high-performance HTML and XML parser. It is faster than BeautifulSoup, though its API is less beginner-friendly:

from lxml import etree
import requests

# Fetch the page
response = requests.get('https://www.example.com')
html_content = response.text

# Parse the HTML
html = etree.HTML(html_content)

# Extract data with XPath
title = html.xpath('//title/text()')
print(f"Page title: {title[0] if title else 'No title'}")

# Extract all links
links = html.xpath('//a/@href')
for link in links[:5]:  # print the first 5 links
    print(f"Link: {link}")

# Extract specific elements
articles = html.xpath('//article')
for article in articles:
    article_title = article.xpath('.//h1/text()')
    article_content = article.xpath('.//p/text()')
    print(f"Article title: {article_title[0] if article_title else 'No title'}")
    print(f"Article text: {''.join(article_content)[:200]}...")  # print the first 200 characters

Intermediate Scraping Techniques

Handling Forms and Logins

Many sites require you to log in before you can access their content. Here is how to handle a login with requests:

import requests
from bs4 import BeautifulSoup

# Create a session object
session = requests.Session()

# Fetch the login page (it may contain a CSRF token)
login_url = 'https://www.example.com/login'
response = session.get(login_url)

# Suppose we need to extract a CSRF token from the page
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': 'csrf_token'})['value']

# Prepare the login data
login_data = {
    'username': 'your_username',
    'password': 'your_password',
    'csrf_token': csrf_token
}

# Submit the login request
response = session.post(login_url, data=login_data)

# Check whether the login succeeded
if 'Welcome' in response.text or response.url != login_url:
    print("Login successful!")

    # Access a page that requires authentication
    protected_url = 'https://www.example.com/dashboard'
    response = session.get(protected_url)
    print(f"Dashboard page title: {BeautifulSoup(response.text, 'html.parser').title.string}")
else:
    print("Login failed!")

Working with Cookies

Cookies matter for maintaining session state:

import requests

# Set cookies manually
cookies = {'session_id': '12345', 'user_id': '67890'}
response = requests.get('https://www.example.com', cookies=cookies)

# Read cookies from the response
response = requests.get('https://www.example.com')
print(response.cookies['session_id'])

# Let a session handle cookies automatically
session = requests.Session()
session.get('https://www.example.com')  # this sets the cookies
response = session.get('https://www.example.com/profile')  # reuses the cookies set earlier

Scraping JavaScript-Rendered Pages

Many modern sites load their content dynamically with JavaScript, which poses a challenge for traditional scrapers.

Using Selenium

Selenium automates a real browser and executes JavaScript:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time

# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")  # headless mode, no browser window

# Initialize the WebDriver
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

# Open the page
driver.get('https://www.example.com')

# Wait for the JavaScript to run
time.sleep(2)  # simple fixed wait
# ... or use an explicit wait
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, "dynamic-content"))
)

# Get the rendered page source
html_content = driver.page_source

# Extract data
elements = driver.find_elements(By.CSS_SELECTOR, '.item')
for element in elements:
    print(element.text)

# Interact with the page
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python')
search_box.submit()

# Close the browser
driver.quit()

Using Requests-HTML

requests-html is an extension of requests that supports JavaScript rendering:

from requests_html import HTMLSession

# Create a session
session = HTMLSession()

# Fetch the page
response = session.get('https://www.example.com')

# Render the JavaScript
response.html.render()

# Extract data
elements = response.html.find('.item')
for element in elements:
    print(element.text)

Handling Pagination

Many sites split their content across multiple pages, so we need to walk through them:

import requests
from bs4 import BeautifulSoup

base_url = 'https://www.example.com/products?page='
all_products = []

# Walk through the first 5 pages
for page_num in range(1, 6):
    url = base_url + str(page_num)
    print(f"Scraping page: {url}")

    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extract the product information
    products = soup.select('.product-item')
    for product in products:
        product_name = product.select_one('.product-name').text.strip()
        product_price = product.select_one('.product-price').text.strip()
        all_products.append({
            'name': product_name,
            'price': product_price
        })

    # Optional: check whether there is a next page
    next_button = soup.select_one('.pagination .next')
    if not next_button or 'disabled' in next_button.get('class', []):
        print("Reached the last page")
        break

print(f"Scraped {len(all_products)} products in total")

Advanced Scraping Techniques

Using the Scrapy Framework

Scrapy is a powerful scraping framework that is well suited to large-scale projects:

# Install Scrapy
pip install scrapy

# Create a new project
scrapy startproject bookstore
cd bookstore

# Generate a spider
scrapy genspider books books.toscrape.com

Edit the spider file bookstore/spiders/books.py:

import scrapy

class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['books.toscrape.com']
    start_urls = ['http://books.toscrape.com/']

    def parse(self, response):
        # Extract every book on the page
        books = response.css('article.product_pod')
        for book in books:
            yield {
                'title': book.css('h3 a::attr(title)').get(),
                'price': book.css('p.price_color::text').get(),
                'rating': book.css('p.star-rating::attr(class)').get().split()[-1]
            }

        # Handle pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

Run the spider and save the results:

scrapy crawl books -o books.json

Dealing with Anti-Scraping Measures

Websites often deploy anti-scraping measures. Here are some strategies for dealing with them:

1. Set request headers

import requests
import random

# A list of common User-Agent strings
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]

# Pick a random User-Agent
headers = {
    'User-Agent': random.choice(user_agents),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Referer': 'https://www.google.com/',
    'DNT': '1',  # Do Not Track
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1'
}

response = requests.get('https://www.example.com', headers=headers)

2. Throttle the request rate

import requests
import time
import random

urls = ['https://www.example.com/page1', 'https://www.example.com/page2', '...']

for url in urls:
    response = requests.get(url)
    print(f"Scraped {url}, status code: {response.status_code}")

    # Random delay of 1-5 seconds
    delay = random.uniform(1, 5)
    print(f"Waiting {delay:.2f} seconds...")
    time.sleep(delay)

3. Use proxies

import requests
import random

proxies = {
    'http': 'http://10.10.10.10:8000',
    'https': 'http://10.10.10.10:8000',
}

response = requests.get('https://www.example.com', proxies=proxies)

# Use a proxy pool
proxy_pool = [
    {'http': 'http://proxy1.example.com:8000'},
    {'http': 'http://proxy2.example.com:8000'},
    {'http': 'http://proxy3.example.com:8000'}
]

response = requests.get('https://www.example.com', proxies=random.choice(proxy_pool))

4. Handle CAPTCHAs

For CAPTCHAs, you can use an OCR library or a CAPTCHA-solving service:

from PIL import Image
import pytesseract
import requests
from io import BytesIO

# Download the CAPTCHA image
response = requests.get('https://www.example.com/captcha.php')
img = Image.open(BytesIO(response.content))

# Recognize the CAPTCHA with pytesseract
captcha_text = pytesseract.image_to_string(img)
print(f"Recognized CAPTCHA: {captcha_text}")

# Include the CAPTCHA when submitting the form
form_data = {
    'username': 'user',
    'password': 'pass',
    'captcha': captcha_text
}
response = requests.post('https://www.example.com/login', data=form_data)

Storing the Data

Scraped data needs to be stored properly:

1. Save to CSV

import csv

# Prepare the data
data = [
    {'name': 'Product 1', 'price': '$19.99', 'rating': '4.5'},
    {'name': 'Product 2', 'price': '$29.99', 'rating': '3.8'},
    {'name': 'Product 3', 'price': '$15.49', 'rating': '4.2'}
]

# Save to CSV
with open('products.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['name', 'price', 'rating']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for item in data:
        writer.writerow(item)

2. Save to JSON

import json

# Save to JSON (reusing the data list from the CSV example)
with open('products.json', 'w', encoding='utf-8') as jsonfile:
    json.dump(data, jsonfile, ensure_ascii=False, indent=4)

3. Save to a database

import sqlite3

# Connect to a SQLite database
conn = sqlite3.connect('products.db')
cursor = conn.cursor()

# Create the table
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    price TEXT NOT NULL,
    rating TEXT NOT NULL
)
''')

# Insert the data
for item in data:
    cursor.execute(
        'INSERT INTO products (name, price, rating) VALUES (?, ?, ?)',
        (item['name'], item['price'], item['rating'])
    )

# Commit the changes and close the connection
conn.commit()
conn.close()

Real-World Scraping Examples

Example 1: Scraping a News Site

import requests
from bs4 import BeautifulSoup
import csv
from datetime import datetime

def scrape_news():
    # Target URL
    url = 'https://news.example.com'

    # Send the request
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extract the news articles
    articles = soup.select('article.news-item')
    news_data = []

    for article in articles:
        # Extract the title
        title = article.select_one('h2.title').text.strip()

        # Extract the link
        link = article.select_one('a')['href']
        if not link.startswith('http'):
            link = url + link

        # Extract the summary
        summary = article.select_one('p.summary')
        summary = summary.text.strip() if summary else "No summary"

        # Extract the publication date
        date = article.select_one('span.date')
        date = date.text.strip() if date else "Unknown date"

        # Extract the category
        category = article.select_one('span.category')
        category = category.text.strip() if category else "Uncategorized"

        # Add the record to the list
        news_data.append({
            'title': title,
            'link': link,
            'summary': summary,
            'date': date,
            'category': category,
            'scraped_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })

    # Save the data
    with open('news.csv', 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['title', 'link', 'summary', 'date', 'category', 'scraped_at']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for item in news_data:
            writer.writerow(item)

    print(f"Scraped {len(news_data)} news items and saved them to news.csv")

if __name__ == "__main__":
    scrape_news()

Example 2: Scraping Product Information from an E-Commerce Site

import requests
from bs4 import BeautifulSoup
import json
import time
import random

class EcommerceSpider:
    def __init__(self):
        self.base_url = 'https://www.example-shop.com/products'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.products = []

    def get_page(self, url):
        """Fetch a page and return a parsed BeautifulSoup object."""
        time.sleep(random.uniform(1, 3))  # random delay
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return BeautifulSoup(response.text, 'html.parser')
        else:
            print(f"Request failed: {response.status_code}")
            return None

    def parse_product_list(self, soup):
        """Parse a product listing page and collect the product links."""
        product_cards = soup.select('.product-card')
        product_links = []

        for card in product_cards:
            link = card.select_one('a.product-link')['href']
            if not link.startswith('http'):
                link = 'https://www.example-shop.com' + link
            product_links.append(link)

        return product_links

    def parse_product_detail(self, url):
        """Parse a product detail page."""
        soup = self.get_page(url)
        if not soup:
            return None

        try:
            product = {
                'url': url,
                'name': soup.select_one('h1.product-name').text.strip(),
                'price': soup.select_one('span.price').text.strip(),
                'description': soup.select_one('div.description').text.strip(),
                'rating': soup.select_one('div.rating').text.strip() if soup.select_one('div.rating') else 'No rating',
                'reviews_count': soup.select_one('span.reviews-count').text.strip() if soup.select_one('span.reviews-count') else '0',
                'availability': soup.select_one('div.availability').text.strip() if soup.select_one('div.availability') else 'Unknown',
                'images': [img['src'] for img in soup.select('div.product-images img')],
                'specifications': {}
            }

            # Extract the specifications
            specs_table = soup.select_one('table.specifications')
            if specs_table:
                rows = specs_table.select('tr')
                for row in rows:
                    cols = row.select('td')
                    if len(cols) >= 2:
                        key = cols[0].text.strip()
                        value = cols[1].text.strip()
                        product['specifications'][key] = value

            return product
        except Exception as e:
            print(f"Error while parsing product detail: {e}")
            return None

    def scrape(self, pages=3):
        """Scrape the given number of listing pages."""
        for page in range(1, pages + 1):
            page_url = f"{self.base_url}?page={page}"
            print(f"Scraping page: {page_url}")

            soup = self.get_page(page_url)
            if not soup:
                continue

            product_links = self.parse_product_list(soup)
            print(f"Found {len(product_links)} product links")

            for link in product_links:
                print(f"Scraping product: {link}")
                product = self.parse_product_detail(link)
                if product:
                    self.products.append(product)

        # Save the results
        with open('products.json', 'w', encoding='utf-8') as f:
            json.dump(self.products, f, ensure_ascii=False, indent=4)

        print(f"Scraped {len(self.products)} products and saved them to products.json")

if __name__ == "__main__":
    spider = EcommerceSpider()
    spider.scrape(pages=3)

Example 3: Scraping GitHub Repository Information with Scrapy

First create the Scrapy project:

scrapy startproject github_scraper
cd github_scraper
scrapy genspider repos github.com

Edit the spider file github_scraper/spiders/repos.py:

import scrapy
from scrapy import FormRequest

class ReposSpider(scrapy.Spider):
    name = 'repos'
    allowed_domains = ['github.com']
    start_urls = ['https://github.com/login']

    def parse(self, response):
        # Extract the CSRF token
        token = response.css('input[name="authenticity_token"]::attr(value)').get()

        # Submit the login form
        return FormRequest.from_response(
            response,
            formdata={
                'login': 'your_username',
                'password': 'your_password',
                'authenticity_token': token
            },
            callback=self.after_login
        )

    def after_login(self, response):
        # Check whether the login succeeded
        if 'Sign out' in response.text:
            self.log("Login successful!")
            # Visit the Python organization's page
            return scrapy.Request('https://github.com/python', callback=self.parse_org)
        else:
            self.log("Login failed!")

    def parse_org(self, response):
        # Follow the repositories tab
        repos_url = response.css('a[data-tab-item="repositories"]::attr(href)').get()
        return response.follow(repos_url, callback=self.parse_repos)

    def parse_repos(self, response):
        # Extract repository information
        for repo in response.css('li.Box-row'):
            yield {
                'name': repo.css('a[itemprop="name codeRepository"]::text').get().strip(),
                'description': repo.css('p[itemprop="description"]::text').get('').strip(),
                'language': repo.css('span[itemprop="programmingLanguage"]::text').get('').strip(),
                'stars': repo.css('a.Link--muted[href$="/stargazers"]::text').get('').strip(),
                'forks': repo.css('a.Link--muted[href$="/forks"]::text').get('').strip(),
                'updated': repo.css('relative-time::attr(datetime)').get('')
            }

        # Handle pagination
        next_page = response.css('a.next_page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse_repos)

Add the following to the project settings file github_scraper/settings.py:

# Add the following settings
ROBOTSTXT_OBEY = False
DOWNLOAD_DELAY = 2
COOKIES_ENABLED = True

# Set the User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'

Run the spider:

scrapy crawl repos -o github_repos.json

Advanced Scraping Tips

1. Use asynchronous scraping for better throughput

import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def fetch(session, url):
    # Download the page body asynchronously
    async with session.get(url) as response:
        return await response.text()

async def parse(html):
    # Parse the HTML and return the page title
    soup = BeautifulSoup(html, 'html.parser')
    title = soup.title.string if soup.title else "No title"
    return title

async def scrape(url):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        title = await parse(html)
        print(f"{url} - {title}")

async def main():
    urls = [
        'https://www.example.com',
        'https://www.example.org',
        'https://www.example.net',
        'https://www.example.edu',
        'https://www.example.io'
    ]

    # Run all scraping tasks concurrently
    tasks = [scrape(url) for url in urls]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

2. Use an IP proxy pool

import requests
from bs4 import BeautifulSoup
import random
import time

class ProxyManager:
    def __init__(self):
        self.proxies = []
        self.current_proxy = None
        self.max_failures = 3
        self.failure_count = 0

    def get_proxy_list(self):
        """Fetch a proxy list from a public proxy site."""
        try:
            response = requests.get('https://www.free-proxy-list.net/')
            soup = BeautifulSoup(response.text, 'html.parser')
            table = soup.find('table', {'id': 'proxylisttable'})

            for row in table.tbody.find_all('tr'):
                cols = row.find_all('td')
                if cols[6].text.strip() == 'yes':  # HTTPS proxies only
                    proxy = {
                        'ip': cols[0].text.strip(),
                        'port': cols[1].text.strip(),
                        'https': 'https://' + cols[0].text.strip() + ':' + cols[1].text.strip()
                    }
                    self.proxies.append(proxy)

            print(f"Collected {len(self.proxies)} proxies")
        except Exception as e:
            print(f"Failed to fetch the proxy list: {e}")

    def get_random_proxy(self):
        """Return a random proxy."""
        if not self.proxies:
            self.get_proxy_list()

        if self.proxies:
            self.current_proxy = random.choice(self.proxies)
            return {
                'https': self.current_proxy['https']
            }
        return None

    def handle_request_error(self):
        """Handle a failed request."""
        self.failure_count += 1
        if self.failure_count >= self.max_failures:
            if self.current_proxy in self.proxies:
                self.proxies.remove(self.current_proxy)
            self.current_proxy = None
            self.failure_count = 0

# Use the proxy manager
proxy_manager = ProxyManager()

def scrape_with_proxy(url):
    """Scrape a page through a proxy."""
    max_retries = 5
    retries = 0

    while retries < max_retries:
        proxy = proxy_manager.get_random_proxy()
        if not proxy:
            print("No proxies available")
            break

        try:
            print(f"Using proxy: {proxy['https']}")
            response = requests.get(url, proxies=proxy, timeout=10)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            print(f"Request failed: {e}")
            proxy_manager.handle_request_error()

        retries += 1
        time.sleep(2)

    print("All retries failed")
    return None

3. Use a User-Agent pool

import requests
import random

class UserAgentManager:
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59'
        ]

    def get_random_user_agent(self):
        """Return a random User-Agent string."""
        return random.choice(self.user_agents)

# Use the User-Agent manager
ua_manager = UserAgentManager()

def scrape_with_random_ua(url):
    """Scrape a page with a random User-Agent."""
    headers = {
        'User-Agent': ua_manager.get_random_user_agent()
    }

    try:
        response = requests.get(url, headers=headers)
        return response.text
    except Exception as e:
        print(f"Request failed: {e}")
        return None

4. Use a crawl scheduler

import time
import threading
import queue
import requests
from bs4 import BeautifulSoup

class Scheduler:
    def __init__(self, num_threads=5, delay=2):
        self.queue = queue.Queue()
        self.results = []
        self.num_threads = num_threads
        self.delay = delay
        self.lock = threading.Lock()

    def add_task(self, url):
        """Add a URL to the crawl queue."""
        self.queue.put(url)

    def worker(self):
        """Worker thread: take URLs off the queue and scrape them."""
        while True:
            url = self.queue.get()
            if url is None:
                break

            try:
                print(f"Scraping: {url}")
                response = requests.get(url)
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    title = soup.title.string if soup.title else "No title"

                    with self.lock:
                        self.results.append({
                            'url': url,
                            'title': title,
                            'status': response.status_code
                        })
            except Exception as e:
                print(f"Failed to scrape {url}: {e}")

            self.queue.task_done()
            time.sleep(self.delay)

    def run(self):
        """Run the scheduler."""
        threads = []

        # Start the worker threads
        for _ in range(self.num_threads):
            thread = threading.Thread(target=self.worker)
            thread.start()
            threads.append(thread)

        # Wait until the queue has been processed
        self.queue.join()

        # Stop the worker threads
        for _ in range(self.num_threads):
            self.queue.put(None)

        for thread in threads:
            thread.join()

        return self.results

# Use the scheduler
scheduler = Scheduler(num_threads=3, delay=2)

# Add the tasks
urls = [
    'https://www.example.com',
    'https://www.example.org',
    'https://www.example.net',
    'https://www.example.edu',
    'https://www.example.io',
    'https://www.example.dev',
    'https://www.example.app'
]

for url in urls:
    scheduler.add_task(url)

# Run the scheduler
results = scheduler.run()
print(f"Scraped {len(results)} pages")
for result in results:
    print(f"{result['url']} - {result['title']} (status code: {result['status']})")

Legal and Ethical Issues in Web Scraping

Legal Considerations

  1. Honor the site's terms of service: many sites explicitly forbid scraping in their terms
  2. Respect robots.txt: this is the standard way a site tells crawlers which pages may be crawled
  3. Copyright law: scraped content may be protected by copyright
  4. Data protection regulations: if you collect personal data, you must comply with rules such as the GDPR
  5. Computer misuse laws: excessive scraping can be treated as an attack on the server

Ethical Considerations

  1. Do not overload the site: throttle your request rate
  2. Identify your crawler: state who you are in the User-Agent header (see the sketch below)
  3. Cache data: avoid re-requesting the same content
  4. Respect privacy: do not collect or store sensitive personal information
  5. Prefer an official API: if the site offers one, use the API instead of scraping
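
As a small illustration of point 2, a crawler can announce itself with a descriptive User-Agent that names the bot and gives the operator's contact details. The bot name and URLs below are placeholders:

import requests

# A descriptive User-Agent identifying the crawler and how to reach its operator
headers = {
    'User-Agent': 'MyResearchBot/1.0 (+https://example.com/bot-info; contact: bot-admin@example.com)'
}

response = requests.get('https://www.example.com', headers=headers)
print(response.status_code)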

Best Practices

  1. Read the site's terms of service and robots.txt
  2. Control your crawl rate: use delays and rate limits
  3. Cache results: avoid repeat requests (a sketch combining caching and retries follows this list)
  4. Handle errors: deal with exceptions and failures gracefully
  5. Monitor your scraper: make sure it keeps working as expected
  6. Update your scraper regularly: site structures change over time
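
Points 3 and 4 can be combined in a few lines. The sketch below keeps an on-disk cache of previously fetched pages and retries failed requests with exponential backoff; the cache directory and retry count are arbitrary choices:

import hashlib
import time
from pathlib import Path

import requests

CACHE_DIR = Path('cache')
CACHE_DIR.mkdir(exist_ok=True)

def fetch_with_cache_and_retries(url, max_retries=3):
    """Return the page body, reusing a cached copy when one exists."""
    cache_file = CACHE_DIR / (hashlib.md5(url.encode('utf-8')).hexdigest() + '.html')

    # Serve from the cache to avoid repeat requests
    if cache_file.exists():
        return cache_file.read_text(encoding='utf-8')

    # Retry with exponential backoff: 1s, 2s, 4s, ...
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            cache_file.write_text(response.text, encoding='utf-8')
            return response.text
        except requests.RequestException as e:
            wait = 2 ** attempt
            print(f"Attempt {attempt + 1} failed ({e}), retrying in {wait}s...")
            time.sleep(wait)

    print(f"Giving up on {url}")
    return None

html = fetch_with_cache_and_retries('https://www.example.com')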

Conclusion

Web scraping is a powerful tool for collecting data from the web automatically, and Python's rich set of libraries and frameworks keeps scraper development relatively simple. From basic requests and BeautifulSoup, through the full-featured Scrapy framework, to Selenium for JavaScript-rendered pages, the Python ecosystem offers a solution for almost every scraping need.

That said, scraping comes with legal and ethical responsibilities. Respecting a site's terms of service, honoring robots.txt, and throttling your crawl rate are all part of scraping responsibly.

I hope this article helps you understand both the fundamentals and the advanced techniques of web scraping, and that you apply them responsibly in your own projects.

Do you have questions about web scraping with Python, or experience of your own to share? Feel free to join the discussion in the comments!
