# Python Web Scraping: From Beginner to Pro

Web scraping is one of the most popular uses of Python: it lets us fetch data from websites automatically. Whether you are doing data analysis, machine learning, or task automation, scraping is an extremely useful skill. In this article I will take you from the basics to advanced techniques so you can master web scraping with Python.
## Web Scraping Basics

### What Is Web Scraping?

Web scraping is a technique for fetching web page content automatically with a program. It simulates the way a person browses the web: it visits a site, extracts data, and saves that data in a structured format.
### Legality and Ethics

Before you start scraping a website, consider the following points:
- **Check robots.txt**: this file defines which parts of the site crawlers are allowed to access (a minimal robots.txt check is sketched after this list)
- **Follow the site's terms of use**: some sites explicitly forbid scraping
- **Throttle your requests**: overly frequent requests can put a strain on the site
- **Respect copyright**: the data you collect may be protected by copyright
- **Consider an official API**: if the site offers an API, prefer it over scraping
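To illustrate the first point, here is a minimal sketch that uses only the standard library's `urllib.robotparser`; the URL, the user agent string, and the path are placeholders, not a real crawler configuration:

```python
from urllib.robotparser import RobotFileParser

# Placeholder site used purely for illustration
parser = RobotFileParser()
parser.set_url("https://www.example.com/robots.txt")
parser.read()  # download and parse robots.txt

# Ask whether our (hypothetical) crawler may fetch a given path
user_agent = "MyCrawler"
path = "https://www.example.com/products"
if parser.can_fetch(user_agent, path):
    print(f"{path} may be crawled")
else:
    print(f"{path} is disallowed by robots.txt")
```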
### The Basic Scraping Workflow
1. Send an HTTP request to fetch the page content
2. Parse the HTML or XML and extract the data you need
3. Process and store the data
4. Continue crawling other pages as needed

The short sketch below strings these four steps together.
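This is a minimal, hedged version of that workflow; it assumes `requests` and `beautifulsoup4` are installed, and the URL, the `h2.title` selector, and the output filename are placeholders:

```python
import csv

import requests
from bs4 import BeautifulSoup

# 1. Send an HTTP request (placeholder URL)
response = requests.get("https://www.example.com/articles")
response.raise_for_status()

# 2. Parse the HTML and extract the data (placeholder selector)
soup = BeautifulSoup(response.text, "html.parser")
titles = [h2.get_text(strip=True) for h2 in soup.select("h2.title")]

# 3. Process and store the data
with open("titles.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["title"])
    writer.writerows([t] for t in titles)

# 4. Continue with other pages as needed (not shown here)
print(f"Saved {len(titles)} titles")
```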
## Basic Scraping Tools

### The Requests Library

`requests` is Python's most popular HTTP client library, and it makes sending HTTP requests simple:
```python
import requests

# Simple GET request
response = requests.get('https://www.example.com')
print(f"Status code: {response.status_code}")
print(response.text[:100])

# GET request with query parameters
params = {'q': 'python', 'page': 1}
response = requests.get('https://www.example.com/search', params=params)
print(response.url)

# POST request with form data
data = {'username': 'user', 'password': 'pass'}
response = requests.post('https://www.example.com/login', data=data)

# Working with a JSON API
response = requests.get('https://api.github.com/users/python')
user_data = response.json()
print(f"GitHub username: {user_data['login']}")
print(f"Public repos: {user_data['public_repos']}")
```
### The BeautifulSoup Library

BeautifulSoup is a powerful HTML and XML parsing library that helps us extract data from web pages:
```python
import requests
from bs4 import BeautifulSoup

response = requests.get('https://www.example.com')
html_content = response.text

soup = BeautifulSoup(html_content, 'html.parser')

# Page title
title = soup.title
print(f"Page title: {title.string}")

# All links
links = soup.find_all('a')
for link in links[:5]:
    print(f"Link text: {link.text}, URL: {link.get('href')}")

# CSS selectors
main_content = soup.select('div.main-content')
headings = soup.select('h1, h2, h3')

# Extracting an article
article = soup.find('article')
if article:
    article_title = article.find('h1').text
    article_paragraphs = article.find_all('p')
    article_text = '\n'.join([p.text for p in article_paragraphs])
    print(f"Article title: {article_title}")
    print(f"Article text: {article_text[:200]}...")
```
### The lxml Library

lxml is a high-performance HTML and XML parsing library. It is faster than BeautifulSoup, but its API is less friendly:
```python
import requests
from lxml import etree

response = requests.get('https://www.example.com')
html_content = response.text

html = etree.HTML(html_content)

# Page title via XPath
title = html.xpath('//title/text()')
print(f"Page title: {title[0] if title else 'No title'}")

# All link URLs
links = html.xpath('//a/@href')
for link in links[:5]:
    print(f"Link: {link}")

# Extracting articles
articles = html.xpath('//article')
for article in articles:
    article_title = article.xpath('.//h1/text()')
    article_content = article.xpath('.//p/text()')
    print(f"Article title: {article_title[0] if article_title else 'No title'}")
    print(f"Article text: {''.join(article_content)[:200]}...")
```
## Intermediate Scraping Techniques

### Handling Forms and Logins

Many websites require a login before their content can be accessed. Here is how to handle a login with requests:
```python
import requests
from bs4 import BeautifulSoup

# Use a session so cookies persist across requests
session = requests.Session()

login_url = 'https://www.example.com/login'
response = session.get(login_url)

# Extract the CSRF token from the login form
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': 'csrf_token'})['value']

login_data = {
    'username': 'your_username',
    'password': 'your_password',
    'csrf_token': csrf_token
}

response = session.post(login_url, data=login_data)

if 'Welcome' in response.text or response.url != login_url:
    print("Login successful!")

    # Access a page that requires authentication
    protected_url = 'https://www.example.com/dashboard'
    response = session.get(protected_url)
    print(f"Dashboard page title: {BeautifulSoup(response.text, 'html.parser').title.string}")
else:
    print("Login failed!")
```
### Handling Cookies

Cookies are important for maintaining session state:
```python
import requests

# Send cookies with a request
cookies = {'session_id': '12345', 'user_id': '67890'}
response = requests.get('https://www.example.com', cookies=cookies)

# Read cookies set by the server
response = requests.get('https://www.example.com')
print(response.cookies['session_id'])

# A Session object keeps cookies between requests automatically
session = requests.Session()
session.get('https://www.example.com')
response = session.get('https://www.example.com/profile')
```
### Handling JavaScript-Rendered Pages

Many modern websites load content dynamically with JavaScript, which is a challenge for traditional scrapers.
#### Using Selenium

Selenium can drive a real browser and execute JavaScript:
```python
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Run Chrome in headless mode
chrome_options = Options()
chrome_options.add_argument("--headless")

driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()),
    options=chrome_options
)

driver.get('https://www.example.com')
time.sleep(2)

# Wait explicitly for dynamically loaded content
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, "dynamic-content"))
)

# The fully rendered HTML
html_content = driver.page_source

# Find elements directly with Selenium
elements = driver.find_elements(By.CSS_SELECTOR, '.item')
for element in elements:
    print(element.text)

# Interact with the page: fill in and submit a search form
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python')
search_box.submit()

driver.quit()
```
#### Using Requests-HTML

requests-html extends requests with support for JavaScript rendering:
```python
from requests_html import HTMLSession

session = HTMLSession()
response = session.get('https://www.example.com')

# Render the JavaScript on the page
response.html.render()

elements = response.html.find('.item')
for element in elements:
    print(element.text)
```
### Handling Pagination

Many websites split their content across multiple pages, so we need to walk through them:
```python
import requests
from bs4 import BeautifulSoup

base_url = 'https://www.example.com/products?page='
all_products = []

for page_num in range(1, 6):
    url = base_url + str(page_num)
    print(f"Scraping page: {url}")

    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    products = soup.select('.product-item')
    for product in products:
        product_name = product.select_one('.product-name').text.strip()
        product_price = product.select_one('.product-price').text.strip()
        all_products.append({
            'name': product_name,
            'price': product_price
        })

    # Stop if there is no "next" button or it is disabled
    next_button = soup.select_one('.pagination .next')
    if not next_button or 'disabled' in next_button.get('class', []):
        print("Reached the last page")
        break

print(f"Scraped {len(all_products)} products in total")
```
## Advanced Scraping Techniques

### Using the Scrapy Framework

Scrapy is a powerful scraping framework that is well suited to large projects:
```bash
pip install scrapy

# Create a new project
scrapy startproject bookstore
cd bookstore

# Generate a spider
scrapy genspider books example.com
```
Edit the spider file bookstore/spiders/books.py:
```python
import scrapy


class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['books.toscrape.com']
    start_urls = ['http://books.toscrape.com/']

    def parse(self, response):
        books = response.css('article.product_pod')
        for book in books:
            yield {
                'title': book.css('h3 a::attr(title)').get(),
                'price': book.css('p.price_color::text').get(),
                'rating': book.css('p.star-rating::attr(class)').get().split()[-1]
            }

        # Follow the pagination link, if any
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)
```
Run the spider and save the results:
```bash
scrapy crawl books -o books.json
```
### Dealing with Anti-Scraping Measures

Websites often deploy anti-scraping measures. Here are some strategies for coping with them:
#### 1. Set request headers

```python
import random

import requests

user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]

headers = {
    'User-Agent': random.choice(user_agents),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Referer': 'https://www.google.com/',
    'DNT': '1',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1'
}

response = requests.get('https://www.example.com', headers=headers)
```
#### 2. Throttle the request rate

```python
import random
import time

import requests

urls = ['https://www.example.com/page1', 'https://www.example.com/page2', '...']

for url in urls:
    response = requests.get(url)
    print(f"Scraped {url}, status code: {response.status_code}")

    # Wait a random 1-5 seconds between requests
    delay = random.uniform(1, 5)
    print(f"Waiting {delay:.2f} seconds...")
    time.sleep(delay)
```
#### 3. Use proxies

```python
import random

import requests

# A single proxy for both HTTP and HTTPS traffic
proxies = {
    'http': 'http://10.10.10.10:8000',
    'https': 'http://10.10.10.10:8000',
}
response = requests.get('https://www.example.com', proxies=proxies)

# A simple proxy pool
proxy_pool = [
    {'http': 'http://proxy1.example.com:8000'},
    {'http': 'http://proxy2.example.com:8000'},
    {'http': 'http://proxy3.example.com:8000'}
]
response = requests.get('https://www.example.com', proxies=random.choice(proxy_pool))
```
#### 4. Handle CAPTCHAs

For CAPTCHAs you can use an OCR library or a CAPTCHA-solving service:
```python
from io import BytesIO

import pytesseract
import requests
from PIL import Image

# Download the CAPTCHA image
response = requests.get('https://www.example.com/captcha.php')
img = Image.open(BytesIO(response.content))

# Recognize the text with Tesseract OCR
captcha_text = pytesseract.image_to_string(img)
print(f"Recognized CAPTCHA: {captcha_text}")

# Submit the form together with the recognized CAPTCHA
form_data = {
    'username': 'user',
    'password': 'pass',
    'captcha': captcha_text
}
response = requests.post('https://www.example.com/login', data=form_data)
```
## Data Storage

Scraped data needs to be stored properly:
### 1. Save as CSV

```python
import csv

data = [
    {'name': 'Product 1', 'price': '$19.99', 'rating': '4.5'},
    {'name': 'Product 2', 'price': '$29.99', 'rating': '3.8'},
    {'name': 'Product 3', 'price': '$15.49', 'rating': '4.2'}
]

with open('products.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['name', 'price', 'rating']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for item in data:
        writer.writerow(item)
```
### 2. Save as JSON

```python
import json

# 'data' is the list of dicts from the CSV example above
with open('products.json', 'w', encoding='utf-8') as jsonfile:
    json.dump(data, jsonfile, ensure_ascii=False, indent=4)
```
### 3. Save to a database

```python
import sqlite3

conn = sqlite3.connect('products.db')
cursor = conn.cursor()

# Create the table if it does not exist yet
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    price TEXT NOT NULL,
    rating TEXT NOT NULL
)
''')

# Insert the scraped items ('data' from the CSV example above)
for item in data:
    cursor.execute(
        'INSERT INTO products (name, price, rating) VALUES (?, ?, ?)',
        (item['name'], item['price'], item['rating'])
    )

conn.commit()
conn.close()
```
## Practical Scraping Examples

### Example 1: Scraping a News Site

```python
import csv
from datetime import datetime

import requests
from bs4 import BeautifulSoup


def scrape_news():
    url = 'https://news.example.com'
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    articles = soup.select('article.news-item')
    news_data = []

    for article in articles:
        title = article.select_one('h2.title').text.strip()

        link = article.select_one('a')['href']
        if not link.startswith('http'):
            link = url + link

        summary = article.select_one('p.summary')
        summary = summary.text.strip() if summary else "No summary"

        date = article.select_one('span.date')
        date = date.text.strip() if date else "Unknown date"

        category = article.select_one('span.category')
        category = category.text.strip() if category else "Uncategorized"

        news_data.append({
            'title': title,
            'link': link,
            'summary': summary,
            'date': date,
            'category': category,
            'scraped_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })

    # Save the results to CSV
    with open('news.csv', 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['title', 'link', 'summary', 'date', 'category', 'scraped_at']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for item in news_data:
            writer.writerow(item)

    print(f"Scraped {len(news_data)} news items and saved them to news.csv")


if __name__ == "__main__":
    scrape_news()
```
### Example 2: Scraping Product Data from an E-Commerce Site

```python
import json
import random
import time

import requests
from bs4 import BeautifulSoup


class EcommerceSpider:
    def __init__(self):
        self.base_url = 'https://www.example-shop.com/products'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.products = []

    def get_page(self, url):
        """Fetch a page and return a parsed BeautifulSoup object."""
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return BeautifulSoup(response.text, 'html.parser')
        else:
            print(f"Request failed: {response.status_code}")
            return None

    def parse_product_list(self, soup):
        """Parse a product listing page and return the product detail links."""
        product_cards = soup.select('.product-card')
        product_links = []

        for card in product_cards:
            link = card.select_one('a.product-link')['href']
            if not link.startswith('http'):
                link = 'https://www.example-shop.com' + link
            product_links.append(link)

        return product_links

    def parse_product_detail(self, url):
        """Parse a product detail page."""
        soup = self.get_page(url)
        if not soup:
            return None

        try:
            product = {
                'url': url,
                'name': soup.select_one('h1.product-name').text.strip(),
                'price': soup.select_one('span.price').text.strip(),
                'description': soup.select_one('div.description').text.strip(),
                'rating': soup.select_one('div.rating').text.strip() if soup.select_one('div.rating') else 'No rating',
                'reviews_count': soup.select_one('span.reviews-count').text.strip() if soup.select_one('span.reviews-count') else '0',
                'availability': soup.select_one('div.availability').text.strip() if soup.select_one('div.availability') else 'Unknown',
                'images': [img['src'] for img in soup.select('div.product-images img')],
                'specifications': {}
            }

            # Parse the specifications table, if present
            specs_table = soup.select_one('table.specifications')
            if specs_table:
                rows = specs_table.select('tr')
                for row in rows:
                    cols = row.select('td')
                    if len(cols) >= 2:
                        key = cols[0].text.strip()
                        value = cols[1].text.strip()
                        product['specifications'][key] = value

            return product
        except Exception as e:
            print(f"Error while parsing product details: {e}")
            return None

    def scrape(self, pages=3):
        """Scrape the given number of listing pages."""
        for page in range(1, pages + 1):
            page_url = f"{self.base_url}?page={page}"
            print(f"Scraping page: {page_url}")

            soup = self.get_page(page_url)
            if not soup:
                continue

            product_links = self.parse_product_list(soup)
            print(f"Found {len(product_links)} product links")

            for link in product_links:
                print(f"Scraping product: {link}")
                product = self.parse_product_detail(link)
                if product:
                    self.products.append(product)

        # Save everything to JSON
        with open('products.json', 'w', encoding='utf-8') as f:
            json.dump(self.products, f, ensure_ascii=False, indent=4)

        print(f"Scraped {len(self.products)} products and saved them to products.json")


if __name__ == "__main__":
    spider = EcommerceSpider()
    spider.scrape(pages=3)
```
### Example 3: Scraping GitHub Repository Information with Scrapy

First create the Scrapy project:
```bash
scrapy startproject github_scraper
cd github_scraper
scrapy genspider repos github.com
```
Edit the spider file github_scraper/spiders/repos.py:
```python
import scrapy
from scrapy import FormRequest


class ReposSpider(scrapy.Spider):
    name = 'repos'
    allowed_domains = ['github.com']
    start_urls = ['https://github.com/login']

    def parse(self, response):
        # Extract the CSRF token and submit the login form
        token = response.css('input[name="authenticity_token"]::attr(value)').get()
        return FormRequest.from_response(
            response,
            formdata={
                'login': 'your_username',
                'password': 'your_password',
                'authenticity_token': token
            },
            callback=self.after_login
        )

    def after_login(self, response):
        if 'Sign out' in response.text:
            self.log("Login successful!")
            return scrapy.Request('https://github.com/python', callback=self.parse_org)
        else:
            self.log("Login failed!")

    def parse_org(self, response):
        repos_url = response.css('a[data-tab-item="repositories"]::attr(href)').get()
        return response.follow(repos_url, callback=self.parse_repos)

    def parse_repos(self, response):
        for repo in response.css('li.Box-row'):
            yield {
                'name': repo.css('a[itemprop="name codeRepository"]::text').get().strip(),
                'description': repo.css('p[itemprop="description"]::text').get('').strip(),
                'language': repo.css('span[itemprop="programmingLanguage"]::text').get('').strip(),
                'stars': repo.css('a.Link--muted[href$="/stargazers"]::text').get('').strip(),
                'forks': repo.css('a.Link--muted[href$="/forks"]::text').get('').strip(),
                'updated': repo.css('relative-time::attr(datetime)').get('')
            }

        next_page = response.css('a.next_page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse_repos)
```
Adjust the project settings file github_scraper/settings.py:
```python
# Whether to respect robots.txt
ROBOTSTXT_OBEY = False
# Wait 2 seconds between requests
DOWNLOAD_DELAY = 2
COOKIES_ENABLED = True
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
```
Run the spider:
```bash
scrapy crawl repos -o github_repos.json
```
## Advanced Tips and Tricks

### 1. Use asynchronous scraping for higher throughput

```python
import asyncio

import aiohttp
from bs4 import BeautifulSoup


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    title = soup.title.string if soup.title else "No title"
    return title


async def scrape(url):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        title = await parse(html)
        print(f"{url} - {title}")


async def main():
    urls = [
        'https://www.example.com',
        'https://www.example.org',
        'https://www.example.net',
        'https://www.example.edu',
        'https://www.example.io'
    ]
    # Run all scraping tasks concurrently
    tasks = [scrape(url) for url in urls]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
```
### 2. Use an IP proxy pool

```python
import random
import time

import requests
from bs4 import BeautifulSoup


class ProxyManager:
    def __init__(self):
        self.proxies = []
        self.current_proxy = None
        self.max_failures = 3
        self.failure_count = 0

    def get_proxy_list(self):
        """Fetch a list of proxies from a free proxy site."""
        try:
            response = requests.get('https://www.free-proxy-list.net/')
            soup = BeautifulSoup(response.text, 'html.parser')
            table = soup.find('table', {'id': 'proxylisttable'})

            for row in table.tbody.find_all('tr'):
                cols = row.find_all('td')
                if cols[6].text.strip() == 'yes':  # keep HTTPS-capable proxies only
                    proxy = {
                        'ip': cols[0].text.strip(),
                        'port': cols[1].text.strip(),
                        'https': 'https://' + cols[0].text.strip() + ':' + cols[1].text.strip()
                    }
                    self.proxies.append(proxy)

            print(f"Fetched {len(self.proxies)} proxies")
        except Exception as e:
            print(f"Failed to fetch the proxy list: {e}")

    def get_random_proxy(self):
        """Return a random proxy."""
        if not self.proxies:
            self.get_proxy_list()

        if self.proxies:
            self.current_proxy = random.choice(self.proxies)
            return {
                'https': self.current_proxy['https']
            }
        return None

    def handle_request_error(self):
        """Count failures and drop the current proxy once it fails too often."""
        self.failure_count += 1
        if self.failure_count >= self.max_failures:
            if self.current_proxy in self.proxies:
                self.proxies.remove(self.current_proxy)
            self.current_proxy = None
            self.failure_count = 0


proxy_manager = ProxyManager()


def scrape_with_proxy(url):
    """Scrape a page through a proxy, retrying with other proxies on failure."""
    max_retries = 5
    retries = 0

    while retries < max_retries:
        proxy = proxy_manager.get_random_proxy()
        if not proxy:
            print("No proxies available")
            break

        try:
            print(f"Using proxy: {proxy['https']}")
            response = requests.get(url, proxies=proxy, timeout=10)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            print(f"Request failed: {e}")
            proxy_manager.handle_request_error()

        retries += 1
        time.sleep(2)

    print("All retries failed")
    return None
```
### 3. Use a User-Agent pool

```python
import random

import requests


class UserAgentManager:
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59'
        ]

    def get_random_user_agent(self):
        """Return a random User-Agent string."""
        return random.choice(self.user_agents)


ua_manager = UserAgentManager()


def scrape_with_random_ua(url):
    """Scrape a page using a random User-Agent."""
    headers = {
        'User-Agent': ua_manager.get_random_user_agent()
    }
    try:
        response = requests.get(url, headers=headers)
        return response.text
    except Exception as e:
        print(f"Request failed: {e}")
        return None
```
### 4. Use a crawl scheduler

```python
import queue
import threading
import time

import requests
from bs4 import BeautifulSoup


class Scheduler:
    def __init__(self, num_threads=5, delay=2):
        self.queue = queue.Queue()
        self.results = []
        self.num_threads = num_threads
        self.delay = delay
        self.lock = threading.Lock()

    def add_task(self, url):
        """Add a URL to the crawl queue."""
        self.queue.put(url)

    def worker(self):
        """Worker thread: fetch URLs from the queue until a None sentinel arrives."""
        while True:
            url = self.queue.get()
            if url is None:
                break

            try:
                print(f"Scraping: {url}")
                response = requests.get(url)
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    title = soup.title.string if soup.title else "No title"

                    with self.lock:
                        self.results.append({
                            'url': url,
                            'title': title,
                            'status': response.status_code
                        })
            except Exception as e:
                print(f"Failed to scrape {url}: {e}")

            self.queue.task_done()
            time.sleep(self.delay)

    def run(self):
        """Start the worker threads and wait for all tasks to finish."""
        threads = []
        for _ in range(self.num_threads):
            thread = threading.Thread(target=self.worker)
            thread.start()
            threads.append(thread)

        self.queue.join()

        # Send a sentinel to each worker so it exits
        for _ in range(self.num_threads):
            self.queue.put(None)

        for thread in threads:
            thread.join()

        return self.results


scheduler = Scheduler(num_threads=3, delay=2)

urls = [
    'https://www.example.com',
    'https://www.example.org',
    'https://www.example.net',
    'https://www.example.edu',
    'https://www.example.io',
    'https://www.example.dev',
    'https://www.example.app'
]

for url in urls:
    scheduler.add_task(url)

results = scheduler.run()

print(f"Scraped {len(results)} pages")
for result in results:
    print(f"{result['url']} - {result['title']} (status code: {result['status']})")
```
## Legal and Ethical Issues

### Legal Considerations
- **Follow the site's terms of service**: many websites explicitly forbid scraping in their terms
- **Respect robots.txt**: this is the standard way a site tells crawlers which pages may be crawled
- **Copyright law**: scraped content may be protected by copyright
- **Data protection regulations**: if you scrape personal data, you must comply with regulations such as the GDPR
- **Computer misuse laws**: excessive scraping may be treated as an attack on the server
### Ethical Considerations
- **Don't overload the site**: throttle your request rate
- **Identify your crawler**: state who you are in the User-Agent header (see the sketch after this list)
- **Cache data**: avoid requesting the same content repeatedly
- **Respect privacy**: do not scrape or store sensitive personal information
- **Consider using an API**: if the site offers an API, prefer it over scraping
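The second and third points can be combined in a small, hedged sketch. The crawler name and contact URL are hypothetical; it sends a self-identifying User-Agent and keeps a naive on-disk cache so the same page is never fetched twice:

```python
import hashlib
from pathlib import Path

import requests

# Hypothetical crawler name and contact address, for illustration only
HEADERS = {"User-Agent": "MyResearchBot/1.0 (+https://example.com/bot-info)"}
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)


def fetch_cached(url):
    """Return the page body, reading from the local cache when possible."""
    cache_file = CACHE_DIR / (hashlib.sha256(url.encode()).hexdigest() + ".html")
    if cache_file.exists():
        return cache_file.read_text(encoding="utf-8")

    response = requests.get(url, headers=HEADERS, timeout=10)
    response.raise_for_status()
    cache_file.write_text(response.text, encoding="utf-8")
    return response.text


html = fetch_cached("https://www.example.com")
print(f"Fetched {len(html)} characters")
```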
### Best Practices
- **Read the site's terms of service and robots.txt**
- **Control your crawl rate**: use delays and rate limits
- **Cache results**: avoid repeated requests
- **Handle errors**: deal with exceptions gracefully (a retry sketch follows this list)
- **Monitor your crawler**: make sure it behaves as expected
- **Update your crawler regularly**: site structure can change
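For the error-handling point, here is a minimal, hedged sketch of a retry helper with exponential backoff; the retry counts and delays are arbitrary illustrative choices, and the URL is a placeholder:

```python
import time

import requests


def get_with_retries(url, max_retries=3, backoff=2.0):
    """GET a URL, retrying on network errors and HTTP 5xx responses."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, timeout=10)
            if response.status_code < 500:
                return response  # success, or a client error we will not retry
            last_error = f"server error {response.status_code}"
        except requests.RequestException as e:
            last_error = str(e)

        print(f"Attempt {attempt}/{max_retries} failed: {last_error}")
        time.sleep(backoff ** attempt)  # exponential backoff: 2s, 4s, 8s, ...

    raise RuntimeError(f"Giving up on {url} after {max_retries} attempts ({last_error})")


# Usage (placeholder URL)
page = get_with_retries("https://www.example.com")
print(page.status_code)
```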
## Conclusion

Web scraping is a powerful tool that lets us collect data from the web automatically. Python offers a rich set of libraries and frameworks that make scraper development relatively straightforward: from the basic requests and BeautifulSoup, through the advanced Scrapy framework, to Selenium for JavaScript-rendered pages, the Python ecosystem has a solution for every scraping need.
However, when we use scrapers we must remember our legal and ethical responsibilities. Respecting a site's terms of service, honoring robots.txt, and throttling the crawl rate are all part of responsible scraping.
I hope this article helps you understand both the fundamentals and the advanced techniques of web scraping, and that you apply them responsibly in your own projects.
Do you have questions about web scraping with Python, or experience to share? Feel free to join the discussion in the comments!