Python异步编程实战指南:从入门到精通
Orion K Lv6

Python异步编程实战指南:从入门到精通

异步编程是现代Python开发中的一项关键技能,特别是在处理I/O密集型应用时。Python的asyncio库提供了一套强大的工具来编写并发代码,而无需使用线程或多进程。在这篇文章中,我将带你从入门到精通Python的异步编程,分享实用技巧和最佳实践。

异步编程基础

什么是异步编程?

异步编程是一种编程范式,允许程序在等待某些操作完成(如I/O操作)时继续执行其他任务,而不是被阻塞。在Python中,这主要通过asyncio库和async/await语法实现。

同步vs异步:一个简单的例子

让我们通过一个简单的例子来理解同步和异步的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 同步版本
import time

def fetch_data(url):
print(f"开始下载 {url}")
# 模拟网络请求
time.sleep(2)
print(f"完成下载 {url}")
return f"数据来自 {url}"

def main():
start = time.time()
data1 = fetch_data("url1")
data2 = fetch_data("url2")
data3 = fetch_data("url3")
print(f"总耗时: {time.time() - start:.2f}秒")

main() # 大约需要6秒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 异步版本
import asyncio

async def fetch_data(url):
print(f"开始下载 {url}")
# 模拟网络请求
await asyncio.sleep(2)
print(f"完成下载 {url}")
return f"数据来自 {url}"

async def main():
start = time.time()
# 并发执行三个任务
results = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3")
)
print(f"总耗时: {time.time() - start:.2f}秒")
print(results)

asyncio.run(main()) # 大约需要2秒

在同步版本中,每个下载操作都会阻塞程序,直到完成。而在异步版本中,当一个任务在等待I/O操作时,程序可以切换到其他任务,从而实现并发执行。

核心概念

协程(Coroutines)

协程是Python异步编程的基础。使用async def定义的函数不是普通函数,而是返回一个协程对象:

1
2
3
4
5
6
7
8
9
10
11
async def hello():
return "Hello, World!"

# 这不会执行函数,而是创建一个协程对象
coro = hello()
print(coro) # <coroutine object hello at 0x...>

# 要运行协程,可以使用asyncio.run()
import asyncio
result = asyncio.run(hello())
print(result) # Hello, World!

await表达式

await关键字用于暂停协程的执行,直到等待的对象完成:

1
2
3
4
5
6
async def main():
print("开始")
await asyncio.sleep(1) # 暂停协程1秒
print("1秒后")

asyncio.run(main())

await只能在async def定义的函数内部使用。

Tasks和Futures

Task是对协程的封装,表示一个正在执行的操作。Future是一个低级对象,表示异步操作的最终结果:

1
2
3
4
5
6
7
8
9
10
11
12
async def main():
# 创建任务
task = asyncio.create_task(asyncio.sleep(1))

# 等待任务完成
await task

# 或者使用gather同时等待多个任务
await asyncio.gather(
asyncio.sleep(1),
asyncio.sleep(2)
)

实用技巧

技巧1:使用aiohttp进行异步HTTP请求

aiohttp是一个异步HTTP客户端/服务器库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp
import time

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
urls = [
"https://api.github.com/events",
"https://api.github.com/emojis",
"https://api.github.com/users/python"
]

start = time.time()

async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch(session, url) for url in urls]
)

print(f"完成所有请求,耗时: {time.time() - start:.2f}秒")
print(f"获取了 {len(results)} 个响应")

asyncio.run(main())

技巧2:使用asyncio.as_completed处理先完成的任务

有时我们希望按照任务完成的顺序处理结果,而不是等待所有任务完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import random

async def random_sleep(id):
sleep_time = random.uniform(0.5, 3)
await asyncio.sleep(sleep_time)
return f"任务 {id} 完成,耗时 {sleep_time:.2f}秒"

async def main():
tasks = [random_sleep(i) for i in range(10)]

for future in asyncio.as_completed(tasks):
result = await future
print(result)

asyncio.run(main())

技巧3:使用超时控制

防止任务执行时间过长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def long_operation():
await asyncio.sleep(10)
return "操作完成"

async def main():
try:
# 设置3秒超时
result = await asyncio.wait_for(long_operation(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("操作超时")

asyncio.run(main())

技巧4:使用asyncio.shield保护任务不被取消

有时我们希望某些任务即使在外部被取消也能继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio

async def protected_operation():
try:
await asyncio.sleep(5)
return "受保护的操作完成"
except asyncio.CancelledError:
# 即使被取消也会继续执行清理工作
print("尝试取消操作,但我们会完成清理工作")
await asyncio.sleep(2)
print("清理工作完成")
raise # 重新抛出异常

async def main():
# 创建受保护的任务
shielded = asyncio.shield(protected_operation())

# 等待1秒后取消任务
task = asyncio.create_task(shielded)
await asyncio.sleep(1)
task.cancel()

try:
await task
except asyncio.CancelledError:
print("主任务被取消")

# asyncio.run(main())

技巧5:使用asyncio.Queue实现生产者-消费者模式

asyncio.Queue是一个异步队列,适合实现生产者-消费者模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import random

async def producer(queue, id):
for i in range(5):
item = f"Producer {id} - Item {i}"
await queue.put(item)
print(f"Produced: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))

# 发送结束信号
await queue.put(None)

async def consumer(queue, id):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break

print(f"Consumer {id} got: {item}")
await asyncio.sleep(random.uniform(0.2, 0.6))
queue.task_done()

async def main():
queue = asyncio.Queue()

# 创建生产者和消费者
producers = [producer(queue, i) for i in range(2)]
consumers = [consumer(queue, i) for i in range(3)]

# 启动所有任务
await asyncio.gather(*producers)

# 等待所有生产者完成并发送结束信号
for _ in range(len(consumers)):
await queue.put(None)

# 等待所有消费者完成
await asyncio.gather(*consumers)

# 等待队列处理完成
await queue.join()

asyncio.run(main())

高级技巧

技巧6:使用asyncio.Semaphore限制并发

当我们需要限制并发任务的数量时,可以使用asyncio.Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import aiohttp

async def fetch_with_semaphore(semaphore, session, url):
async with semaphore:
print(f"开始下载: {url}")
async with session.get(url) as response:
await asyncio.sleep(1) # 模拟下载时间
print(f"完成下载: {url}")
return await response.text()

async def main():
# 限制最多3个并发请求
semaphore = asyncio.Semaphore(3)

urls = [f"https://example.com/{i}" for i in range(10)]

async with aiohttp.ClientSession() as session:
tasks = [fetch_with_semaphore(semaphore, session, url) for url in urls]
await asyncio.gather(*tasks)

asyncio.run(main())

技巧7:使用asyncio.Lock实现互斥访问

当多个协程需要访问共享资源时,可以使用asyncio.Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio

class SharedResource:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()

async def update(self, worker_id):
# 获取锁
async with self.lock:
print(f"Worker {worker_id} 获取锁")
current = self.value
await asyncio.sleep(0.1) # 模拟一些处理时间
self.value = current + 1
print(f"Worker {worker_id} 释放锁,值更新为 {self.value}")

async def worker(resource, id):
for _ in range(3):
await resource.update(id)
await asyncio.sleep(0.2)

async def main():
resource = SharedResource()
workers = [worker(resource, i) for i in range(5)]
await asyncio.gather(*workers)
print(f"最终值: {resource.value}")

asyncio.run(main())

技巧8:使用asyncio.Event进行任务协调

asyncio.Event可以用于协调多个协程的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import random

async def waiter(event, id):
print(f"Waiter {id} 等待事件")
await event.wait()
print(f"Waiter {id} 收到事件通知")
await asyncio.sleep(random.uniform(0.5, 2))
print(f"Waiter {id} 完成处理")

async def trigger(event):
print("Trigger 等待5秒后设置事件")
await asyncio.sleep(5)
print("Trigger 设置事件")
event.set()

async def main():
event = asyncio.Event()

# 创建等待者和触发者
waiters = [waiter(event, i) for i in range(5)]
trigger_task = trigger(event)

# 启动所有任务
await asyncio.gather(trigger_task, *waiters)

asyncio.run(main())

技巧9:使用asyncio.gather和asyncio.wait的区别

asyncio.gatherasyncio.wait都可以等待多个协程完成,但它们有不同的用途:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio

async def task(id, seconds):
print(f"Task {id} 开始")
await asyncio.sleep(seconds)
print(f"Task {id} 完成")
return f"Result {id}"

async def demo_gather():
print("使用 asyncio.gather:")
# gather 返回结果列表,按照任务提交顺序
results = await asyncio.gather(
task(1, 3),
task(2, 1),
task(3, 2)
)
print(f"结果: {results}")

async def demo_wait():
print("使用 asyncio.wait:")
# wait 返回已完成和未完成的任务集合
tasks = [
asyncio.create_task(task(1, 3)),
asyncio.create_task(task(2, 1)),
asyncio.create_task(task(3, 2))
]

# 等待所有任务完成或第一个任务完成
done, pending = await asyncio.wait(
tasks,
# return_when=asyncio.ALL_COMPLETED # 默认
return_when=asyncio.FIRST_COMPLETED
)

print(f"完成的任务: {len(done)}")
print(f"未完成的任务: {len(pending)}")

# 取消未完成的任务
for task in pending:
task.cancel()

# 获取已完成任务的结果
for task in done:
print(f"结果: {task.result()}")

async def main():
await demo_gather()
print("\n" + "-" * 50 + "\n")
await demo_wait()

asyncio.run(main())

技巧10:使用上下文管理器进行资源管理

异步上下文管理器可以确保资源正确释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

class AsyncResource:
def __init__(self, name):
self.name = name

async def __aenter__(self):
print(f"获取资源: {self.name}")
await asyncio.sleep(1) # 模拟资源获取
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"释放资源: {self.name}")
await asyncio.sleep(0.5) # 模拟资源释放

async def use(self):
print(f"使用资源: {self.name}")
await asyncio.sleep(2)

async def main():
# 使用异步上下文管理器
async with AsyncResource("database") as resource:
await resource.use()

print("资源已释放")

asyncio.run(main())

实际应用案例

案例1:异步Web爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_html(session, url):
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None

async def parse_links(html, base_url):
if not html:
return []

soup = BeautifulSoup(html, 'html.parser')
links = []

for a_tag in soup.find_all('a', href=True):
href = a_tag['href']
if href.startswith('http'):
links.append(href)
elif href.startswith('/'):
links.append(f"{base_url}{href}")

return links

async def crawl(url, max_depth=2, current_depth=0, visited=None):
if visited is None:
visited = set()

if current_depth > max_depth or url in visited:
return

visited.add(url)
print(f"爬取: {url} (深度: {current_depth})")

async with aiohttp.ClientSession() as session:
html = await fetch_html(session, url)
links = await parse_links(html, url)

tasks = []
for link in links[:5]: # 限制每页最多爬取5个链接
if link not in visited:
task = asyncio.create_task(
crawl(link, max_depth, current_depth + 1, visited)
)
tasks.append(task)

if tasks:
await asyncio.gather(*tasks)

async def main():
start_time = time.time()
await crawl("https://python.org", max_depth=1)
print(f"爬虫完成,耗时: {time.time() - start_time:.2f}秒")

# asyncio.run(main())

案例2:异步API服务器

使用aiohttp.web创建异步API服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from aiohttp import web
import asyncio
import json

# 模拟数据库
class AsyncDatabase:
def __init__(self):
self.users = {}
self.next_id = 1

async def get_user(self, user_id):
await asyncio.sleep(0.1) # 模拟数据库查询延迟
return self.users.get(user_id)

async def get_all_users(self):
await asyncio.sleep(0.2) # 模拟数据库查询延迟
return list(self.users.values())

async def create_user(self, name, email):
await asyncio.sleep(0.3) # 模拟数据库写入延迟
user_id = str(self.next_id)
self.next_id += 1
user = {"id": user_id, "name": name, "email": email}
self.users[user_id] = user
return user

async def update_user(self, user_id, name=None, email=None):
await asyncio.sleep(0.2) # 模拟数据库更新延迟
user = self.users.get(user_id)
if not user:
return None

if name:
user["name"] = name
if email:
user["email"] = email

return user

async def delete_user(self, user_id):
await asyncio.sleep(0.1) # 模拟数据库删除延迟
if user_id in self.users:
del self.users[user_id]
return True
return False

# 创建数据库实例
db = AsyncDatabase()

# 路由处理函数
async def get_users(request):
users = await db.get_all_users()
return web.json_response(users)

async def get_user(request):
user_id = request.match_info['id']
user = await db.get_user(user_id)
if user:
return web.json_response(user)
return web.json_response({"error": "User not found"}, status=404)

async def create_user(request):
data = await request.json()
name = data.get('name')
email = data.get('email')

if not name or not email:
return web.json_response(
{"error": "Name and email are required"},
status=400
)

user = await db.create_user(name, email)
return web.json_response(user, status=201)

async def update_user(request):
user_id = request.match_info['id']
data = await request.json()

user = await db.update_user(
user_id,
name=data.get('name'),
email=data.get('email')
)

if user:
return web.json_response(user)
return web.json_response({"error": "User not found"}, status=404)

async def delete_user(request):
user_id = request.match_info['id']
success = await db.delete_user(user_id)

if success:
return web.json_response({"status": "deleted"})
return web.json_response({"error": "User not found"}, status=404)

# 创建应用和路由
app = web.Application()
app.add_routes([
web.get('/users', get_users),
web.get('/users/{id}', get_user),
web.post('/users', create_user),
web.put('/users/{id}', update_user),
web.delete('/users/{id}', delete_user),
])

# 启动服务器
# web.run_app(app, port=8080)

常见陷阱和解决方案

陷阱1:阻塞事件循环

在异步代码中执行CPU密集型操作或阻塞I/O会阻塞整个事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time

async def good_practice():
# 使用asyncio.sleep进行非阻塞等待
await asyncio.sleep(1)
return "Good"

async def bad_practice():
# 阻塞事件循环
time.sleep(1)
return "Bad"

async def main():
# 这将顺序执行,总共需要2秒
start = time.time()
await bad_practice()
await bad_practice()
print(f"Bad practice: {time.time() - start:.2f}秒")

# 这将并发执行,总共需要1秒
start = time.time()
await asyncio.gather(good_practice(), good_practice())
print(f"Good practice: {time.time() - start:.2f}秒")

asyncio.run(main())

解决方案:对于CPU密集型任务,使用concurrent.futures.ProcessPoolExecutorasyncio.to_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import time
import concurrent.futures

def cpu_bound_task(n):
# 模拟CPU密集型任务
result = 0
for i in range(n):
result += i * i
return result

async def main():
# 使用进程池执行CPU密集型任务
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound_task, 10000000
)
print(f"计算结果: {result}")

# Python 3.9+: 使用asyncio.to_thread执行阻塞I/O
def blocking_io():
time.sleep(1)
return "完成I/O操作"

result = await asyncio.to_thread(blocking_io)
print(result)

asyncio.run(main())

陷阱2:忘记await协程

忘记await协程是一个常见错误,会导致协程不执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def say_hello():
print("Hello, World!")

async def main():
# 错误: 协程未被等待
say_hello() # 这只会创建协程对象,不会执行

# 正确: 等待协程
await say_hello()

# 或者创建任务
task = asyncio.create_task(say_hello())
await task

asyncio.run(main())

解决方案:始终使用awaitasyncio.create_task()来执行协程。

陷阱3:嵌套事件循环

在已有事件循环内创建新的事件循环会导致问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def inner():
# 错误: 在协程中创建新的事件循环
# loop = asyncio.new_event_loop()
# loop.run_until_complete(asyncio.sleep(1))

# 正确: 使用现有事件循环
await asyncio.sleep(1)

async def main():
await inner()

asyncio.run(main())

解决方案:在异步代码中,始终使用await而不是创建新的事件循环。

结论

Python的异步编程模型提供了一种强大的方式来处理并发操作,特别是I/O密集型任务。通过掌握asyncio库和本文介绍的技巧,你可以编写高效、可维护的异步代码。

记住,异步编程不是万能的——它主要适用于I/O密集型任务,而不是CPU密集型任务。对于后者,多进程仍然是更好的选择。

最后,随着Python异步生态系统的不断发展,越来越多的库开始支持异步操作,使得构建完全异步的应用变得更加容易。无论是Web服务器、数据库访问还是网络爬虫,异步编程都能帮助你构建更高效的应用。

你有什么关于Python异步编程的问题或经验吗?欢迎在评论中分享!

本站由 提供部署服务