A Practical Guide to Async Programming in Python: From Beginner to Expert

Asynchronous programming is a key skill in modern Python development, especially for I/O-bound applications. Python's asyncio library provides a powerful set of tools for writing concurrent code without threads or multiple processes. In this article I will take you from the basics of asynchronous Python through to advanced usage, sharing practical tips and best practices along the way.

Asynchronous Programming Basics

What is asynchronous programming?

Asynchronous programming is a paradigm that lets a program carry on with other work while it waits for certain operations (such as I/O) to complete, rather than blocking. In Python this is achieved primarily through the asyncio library and the async/await syntax.

Synchronous vs. asynchronous: a simple example

Let's use a simple example to see the difference between synchronous and asynchronous code. First the synchronous version:
```python
import time

def fetch_data(url):
    print(f"Start downloading {url}")
    time.sleep(2)  # blocking: nothing else can run during these 2 seconds
    print(f"Finished downloading {url}")
    return f"Data from {url}"

def main():
    start = time.time()
    data1 = fetch_data("url1")
    data2 = fetch_data("url2")
    data3 = fetch_data("url3")
    print(f"Total time: {time.time() - start:.2f}s")

main()
```
And the asynchronous version:

```python
import asyncio
import time

async def fetch_data(url):
    print(f"Start downloading {url}")
    await asyncio.sleep(2)  # non-blocking: yields control to the event loop
    print(f"Finished downloading {url}")
    return f"Data from {url}"

async def main():
    start = time.time()
    results = await asyncio.gather(
        fetch_data("url1"),
        fetch_data("url2"),
        fetch_data("url3"),
    )
    print(f"Total time: {time.time() - start:.2f}s")
    print(results)

asyncio.run(main())
```
In the synchronous version each download blocks the program until it completes. In the asynchronous version, whenever one task is waiting on I/O the program switches to another, so the three downloads run concurrently and the total time drops from about six seconds to about two.

Core Concepts

Coroutines

Coroutines are the foundation of asynchronous programming in Python. A function defined with async def is not an ordinary function; calling it returns a coroutine object instead of running the body:
```python
import asyncio

async def hello():
    return "Hello, World!"

coro = hello()   # nothing has run yet
print(coro)      # <coroutine object hello at 0x...>
coro.close()     # dispose of the unused coroutine to avoid a warning

result = asyncio.run(hello())   # actually drives the coroutine
print(result)    # Hello, World!
```
The await expression

The await keyword suspends the current coroutine until the awaited object completes:
```python
import asyncio

async def main():
    print("Start")
    await asyncio.sleep(1)
    print("One second later")

asyncio.run(main())
```
await can only be used inside a function defined with async def.
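This restriction is enforced at compile time, not at runtime. As a small illustration (a sketch added here, not from the original article), we can compile offending source by hand and watch it fail before a single line executes:

```python
# A minimal sketch: 'await' outside an async def is rejected by the
# compiler, before any code runs.
bad_src = "import asyncio\nawait asyncio.sleep(1)\n"
try:
    compile(bad_src, "<example>", "exec")
    print("compiled")
except SyntaxError as e:
    print(f"SyntaxError: {e.msg}")  # e.g. 'await' outside function
```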
Tasks and Futures

A Task wraps a coroutine and schedules it to run on the event loop; it represents an operation in progress. A Future is a lower-level object representing the eventual result of an asynchronous operation (Task is in fact a subclass of Future):
```python
import asyncio

async def main():
    # create_task schedules the coroutine to start running immediately
    task = asyncio.create_task(asyncio.sleep(1))
    await task

    # gather wraps each awaitable in a Task under the hood
    await asyncio.gather(
        asyncio.sleep(1),
        asyncio.sleep(2),
    )

asyncio.run(main())
```
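The low-level Future API rarely appears in application code, but a short sketch (added here for illustration, not from the original article) shows how a Future is created and resolved by hand:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()           # an empty Future, not yet resolved
    # Arrange for a callback to resolve it shortly afterwards.
    loop.call_later(0.01, fut.set_result, "done")
    result = await fut                   # suspends until set_result runs
    print(result)                        # prints "done"

asyncio.run(main())
```

This is exactly the mechanism Tasks build on: awaiting a Future parks the coroutine until someone calls set_result (or set_exception) on it.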
Practical Tips

Tip 1: asynchronous HTTP requests with aiohttp

aiohttp is an asynchronous HTTP client/server library:
```python
import asyncio
import time

import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://api.github.com/events",
        "https://api.github.com/emojis",
        "https://api.github.com/users/python",
    ]
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch(session, url) for url in urls]
        )
    print(f"All requests finished in {time.time() - start:.2f}s")
    print(f"Received {len(results)} responses")

asyncio.run(main())
```
Tip 2: handle tasks as they finish with asyncio.as_completed

Sometimes we want to process results in the order tasks complete, rather than waiting for all of them:
```python
import asyncio
import random

async def random_sleep(id):
    sleep_time = random.uniform(0.5, 3)
    await asyncio.sleep(sleep_time)
    return f"Task {id} finished after {sleep_time:.2f}s"

async def main():
    tasks = [random_sleep(i) for i in range(10)]
    # as_completed yields awaitables in completion order, not submission order
    for future in asyncio.as_completed(tasks):
        result = await future
        print(result)

asyncio.run(main())
```
Tip 3: timeouts

Keep a task from running too long:
```python
import asyncio

async def long_operation():
    await asyncio.sleep(10)
    return "Operation finished"

async def main():
    try:
        result = await asyncio.wait_for(long_operation(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```
Tip 4: protect tasks from cancellation with asyncio.shield

Sometimes a task should run to completion even if the code awaiting it gets cancelled:
```python
import asyncio

async def protected_operation():
    try:
        await asyncio.sleep(5)
        return "Protected operation finished"
    except asyncio.CancelledError:
        # Only runs if the inner task itself is cancelled
        print("Cancellation requested; finishing cleanup first")
        await asyncio.sleep(2)
        print("Cleanup finished")
        raise

async def main():
    inner = asyncio.create_task(protected_operation())
    # shield() wraps the task: cancelling the wrapper does not
    # cancel the task inside it
    outer = asyncio.shield(inner)
    await asyncio.sleep(1)
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        print("The shielded await was cancelled")
    # The inner task is unaffected and runs to completion
    print(await inner)

asyncio.run(main())
```
Tip 5: the producer-consumer pattern with asyncio.Queue

asyncio.Queue is an asynchronous queue, well suited to the producer-consumer pattern:
```python
import asyncio
import random

async def producer(queue, id):
    for i in range(5):
        item = f"Producer {id} - Item {i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, id):
    while True:
        item = await queue.get()
        if item is None:          # sentinel: no more work
            queue.task_done()
            break
        print(f"Consumer {id} got: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    await asyncio.gather(*(producer(queue, i) for i in range(2)))
    # One sentinel per consumer, and only after all producers are done;
    # putting the sentinel inside producer() would let a consumer exit
    # while other producers are still adding items.
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*consumers)
    await queue.join()

asyncio.run(main())
```
Advanced Tips

Tip 6: limit concurrency with asyncio.Semaphore

When the number of concurrent tasks needs a cap, use asyncio.Semaphore:
```python
import asyncio

import aiohttp

async def fetch_with_semaphore(semaphore, session, url):
    async with semaphore:   # at most 3 coroutines enter this block at once
        print(f"Start downloading: {url}")
        async with session.get(url) as response:
            await asyncio.sleep(1)   # simulate extra processing time
            print(f"Finished downloading: {url}")
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://example.com/{i}" for i in range(10)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_semaphore(semaphore, session, url)
                 for url in urls]
        await asyncio.gather(*tasks)

asyncio.run(main())
```
Tip 7: mutual exclusion with asyncio.Lock

When several coroutines access a shared resource, asyncio.Lock prevents interleaved updates:
```python
import asyncio

class SharedResource:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def update(self, worker_id):
        async with self.lock:
            print(f"Worker {worker_id} acquired the lock")
            current = self.value
            await asyncio.sleep(0.1)  # a context switch happens here
            self.value = current + 1
            print(f"Worker {worker_id} released the lock, value is now {self.value}")

async def worker(resource, id):
    for _ in range(3):
        await resource.update(id)
        await asyncio.sleep(0.2)

async def main():
    resource = SharedResource()
    workers = [worker(resource, i) for i in range(5)]
    await asyncio.gather(*workers)
    print(f"Final value: {resource.value}")

asyncio.run(main())
```
Tip 8: coordinate tasks with asyncio.Event

asyncio.Event can coordinate the execution of multiple coroutines:
```python
import asyncio
import random

async def waiter(event, id):
    print(f"Waiter {id} waiting for the event")
    await event.wait()
    print(f"Waiter {id} received the event")
    await asyncio.sleep(random.uniform(0.5, 2))
    print(f"Waiter {id} finished")

async def trigger(event):
    print("Trigger will set the event in 5 seconds")
    await asyncio.sleep(5)
    print("Trigger sets the event")
    event.set()   # releases every coroutine blocked in event.wait()

async def main():
    event = asyncio.Event()
    waiters = [waiter(event, i) for i in range(5)]
    await asyncio.gather(trigger(event), *waiters)

asyncio.run(main())
```
Tip 9: asyncio.gather versus asyncio.wait

Both asyncio.gather and asyncio.wait can wait for multiple coroutines, but they serve different purposes: gather returns results in the order the awaitables were passed and propagates exceptions, while wait returns (done, pending) sets of Tasks and can return early via return_when:
```python
import asyncio

async def work(id, seconds):
    print(f"Task {id} started")
    await asyncio.sleep(seconds)
    print(f"Task {id} finished")
    return f"Result {id}"

async def demo_gather():
    print("Using asyncio.gather:")
    results = await asyncio.gather(
        work(1, 3),
        work(2, 1),
        work(3, 2),
    )
    print(f"Results: {results}")

async def demo_wait():
    print("Using asyncio.wait:")
    tasks = [
        asyncio.create_task(work(1, 3)),
        asyncio.create_task(work(2, 1)),
        asyncio.create_task(work(3, 2)),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    print(f"Done tasks: {len(done)}")
    print(f"Pending tasks: {len(pending)}")
    for t in pending:
        t.cancel()
    for t in done:
        print(f"Result: {t.result()}")

async def main():
    await demo_gather()
    print("\n" + "-" * 50 + "\n")
    await demo_wait()

asyncio.run(main())
```
Tip 10: resource management with async context managers

An asynchronous context manager ensures resources are released correctly:
```python
import asyncio

class AsyncResource:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"Acquiring resource: {self.name}")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Releasing resource: {self.name}")
        await asyncio.sleep(0.5)

    async def use(self):
        print(f"Using resource: {self.name}")
        await asyncio.sleep(2)

async def main():
    async with AsyncResource("database") as resource:
        await resource.use()
    print("Resource released")

asyncio.run(main())
```
Real-World Examples

Example 1: an asynchronous web crawler
```python
import asyncio
import time

import aiohttp
from bs4 import BeautifulSoup

async def fetch_html(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(f"Failed to fetch {url}: {e}")
        return None

def parse_links(html, base_url):
    # Parsing is CPU-bound, so a plain function is enough here
    if not html:
        return []
    soup = BeautifulSoup(html, 'html.parser')
    links = []
    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        if href.startswith('http'):
            links.append(href)
        elif href.startswith('/'):
            links.append(f"{base_url}{href}")
    return links

async def crawl(url, max_depth=2, current_depth=0, visited=None):
    if visited is None:
        visited = set()
    if current_depth > max_depth or url in visited:
        return
    visited.add(url)
    print(f"Crawling: {url} (depth: {current_depth})")
    async with aiohttp.ClientSession() as session:
        html = await fetch_html(session, url)
        links = parse_links(html, url)
        tasks = []
        for link in links[:5]:   # follow at most 5 links per page
            if link not in visited:
                tasks.append(asyncio.create_task(
                    crawl(link, max_depth, current_depth + 1, visited)
                ))
        if tasks:
            await asyncio.gather(*tasks)

async def main():
    start_time = time.time()
    await crawl("https://python.org", max_depth=1)
    print(f"Crawl finished in {time.time() - start_time:.2f}s")

asyncio.run(main())
```
Example 2: an asynchronous API server

Using aiohttp.web to build an asynchronous API server:
```python
import asyncio

from aiohttp import web

class AsyncDatabase:
    """In-memory store with artificial latency to mimic real I/O."""

    def __init__(self):
        self.users = {}
        self.next_id = 1

    async def get_user(self, user_id):
        await asyncio.sleep(0.1)
        return self.users.get(user_id)

    async def get_all_users(self):
        await asyncio.sleep(0.2)
        return list(self.users.values())

    async def create_user(self, name, email):
        await asyncio.sleep(0.3)
        user_id = str(self.next_id)
        self.next_id += 1
        user = {"id": user_id, "name": name, "email": email}
        self.users[user_id] = user
        return user

    async def update_user(self, user_id, name=None, email=None):
        await asyncio.sleep(0.2)
        user = self.users.get(user_id)
        if not user:
            return None
        if name:
            user["name"] = name
        if email:
            user["email"] = email
        return user

    async def delete_user(self, user_id):
        await asyncio.sleep(0.1)
        if user_id in self.users:
            del self.users[user_id]
            return True
        return False

db = AsyncDatabase()

async def get_users(request):
    users = await db.get_all_users()
    return web.json_response(users)

async def get_user(request):
    user_id = request.match_info['id']
    user = await db.get_user(user_id)
    if user:
        return web.json_response(user)
    return web.json_response({"error": "User not found"}, status=404)

async def create_user(request):
    data = await request.json()
    name = data.get('name')
    email = data.get('email')
    if not name or not email:
        return web.json_response(
            {"error": "Name and email are required"}, status=400
        )
    user = await db.create_user(name, email)
    return web.json_response(user, status=201)

async def update_user(request):
    user_id = request.match_info['id']
    data = await request.json()
    user = await db.update_user(
        user_id, name=data.get('name'), email=data.get('email')
    )
    if user:
        return web.json_response(user)
    return web.json_response({"error": "User not found"}, status=404)

async def delete_user(request):
    user_id = request.match_info['id']
    success = await db.delete_user(user_id)
    if success:
        return web.json_response({"status": "deleted"})
    return web.json_response({"error": "User not found"}, status=404)

app = web.Application()
app.add_routes([
    web.get('/users', get_users),
    web.get('/users/{id}', get_user),
    web.post('/users', create_user),
    web.put('/users/{id}', update_user),
    web.delete('/users/{id}', delete_user),
])

if __name__ == '__main__':
    web.run_app(app)
```
Common Pitfalls and Solutions

Pitfall 1: blocking the event loop

Running CPU-bound work or blocking I/O inside async code stalls the entire event loop:
```python
import asyncio
import time

async def good_practice():
    await asyncio.sleep(1)  # yields control to the event loop
    return "Good"

async def bad_practice():
    time.sleep(1)  # blocks the entire event loop
    return "Bad"

async def main():
    start = time.time()
    # Even under gather, these run back to back: time.sleep blocks
    # the loop, so no concurrency happens. Takes ~2 seconds.
    await asyncio.gather(bad_practice(), bad_practice())
    print(f"Bad practice: {time.time() - start:.2f}s")

    start = time.time()
    # asyncio.sleep yields, so these overlap. Takes ~1 second.
    await asyncio.gather(good_practice(), good_practice())
    print(f"Good practice: {time.time() - start:.2f}s")

asyncio.run(main())
```
Solution: hand CPU-bound work to a concurrent.futures.ProcessPoolExecutor, and blocking I/O to a thread via asyncio.to_thread:
```python
import asyncio
import concurrent.futures
import time

def cpu_bound_task(n):
    result = 0
    for i in range(n):
        result += i * i
    return result

def blocking_io():
    time.sleep(1)
    return "I/O operation finished"

async def main():
    loop = asyncio.get_running_loop()

    # CPU-bound work: hand it to a process pool
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 10_000_000)
    print(f"Computation result: {result}")

    # Blocking I/O: a worker thread is enough (Python 3.9+)
    result = await asyncio.to_thread(blocking_io)
    print(result)

asyncio.run(main())
```
Pitfall 2: forgetting to await a coroutine

Forgetting to await a coroutine is a common mistake; the coroutine object is created but its body never runs:
```python
import asyncio

async def say_hello():
    print("Hello, World!")

async def main():
    say_hello()        # WRONG: creates a coroutine but never runs it
                       # (Python warns: "coroutine ... was never awaited")
    await say_hello()  # correct: run it and wait for the result

    task = asyncio.create_task(say_hello())  # correct: schedule as a Task
    await task

asyncio.run(main())
```
Solution: always run coroutines with await, or schedule them with asyncio.create_task().

Pitfall 3: nested event loops

Starting a new event loop inside one that is already running causes problems:
```python
import asyncio

async def inner():
    await asyncio.sleep(1)

async def main():
    # WRONG: asyncio.run(inner()) here would raise
    # "RuntimeError: asyncio.run() cannot be called from a running event loop"
    await inner()  # correct: just await inside async code

asyncio.run(main())
```
Solution: inside asynchronous code, always await coroutines instead of starting a new event loop.
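To make the failure concrete, here is a small sketch (an illustration added here, not from the original article) of what happens when asyncio.run() is attempted inside a running loop:

```python
import asyncio

async def main():
    coro = asyncio.sleep(0)
    try:
        asyncio.run(coro)  # nested run: a loop is already running
    except RuntimeError as e:
        print(f"RuntimeError: {e}")
    finally:
        coro.close()  # avoid a "never awaited" warning

asyncio.run(main())
```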
Conclusion

Python's asynchronous programming model offers a powerful way to handle concurrent operations, especially I/O-bound workloads. With the asyncio library and the techniques covered in this article, you can write efficient, maintainable asynchronous code.

Remember, asynchronous programming is not a silver bullet: it shines for I/O-bound tasks, not CPU-bound ones. For the latter, multiprocessing remains the better choice.

Finally, as Python's asynchronous ecosystem keeps maturing, more and more libraries support asynchronous operation, making fully asynchronous applications ever easier to build. Whether you are writing a web server, a database access layer, or a crawler, asynchronous programming can help you build more efficient applications.

Do you have questions about or experience with asynchronous Python? Share them in the comments!