Python Concurrency Models: Threads, Processes, and Async IO Compared
Orion K

In modern software development, concurrency has become a key technique for improving application performance and responsiveness. Python offers several concurrency models: multithreading, multiprocessing, and asynchronous IO. Each has its own strengths and sweet spots. In this article, I take a close look at these models to help you choose the right one for your project.

The Challenge of Python Concurrency: the GIL

Before discussing Python's concurrency models, we need to understand the Global Interpreter Lock (GIL). The GIL is a mechanism in the CPython interpreter that ensures only one thread executes Python bytecode at any given moment. As a result, Python threads cannot achieve true parallel computation even on a multi-core processor.

import threading
import time

def cpu_bound_task(n):
    # CPU-bound task: naively compute the nth Fibonacci number
    if n <= 1:
        return n
    return cpu_bound_task(n - 1) + cpu_bound_task(n - 2)

def run_in_thread():
    result = cpu_bound_task(35)
    print(f"Result: {result}")

# Create two threads
t1 = threading.Thread(target=run_in_thread)
t2 = threading.Thread(target=run_in_thread)

# Record the start time
start_time = time.time()

# Start the threads
t1.start()
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()

# Report the total elapsed time
elapsed = time.time() - start_time
print(f"Total time: {elapsed:.2f} seconds")

In the example above, even though we use two threads, the GIL forces them to take turns, so the total time can be even longer than running the task twice on a single thread (thread switching has overhead).
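For comparison, here is a serial baseline with the same total workload, reusing cpu_bound_task from above; it often finishes in about the same time or slightly faster, since there is no switching overhead:

# Serial baseline: the same work, one thread
start_time = time.time()
cpu_bound_task(35)
cpu_bound_task(35)
elapsed = time.time() - start_time
print(f"Serial total time: {elapsed:.2f} seconds")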

The GIL does not mean Python cannot do concurrency well, however. Next, let's look at how to achieve real concurrency in Python.

Multithreading

Despite the GIL, multithreading is still very effective for IO-bound tasks, because a thread releases the GIL while it waits on IO, letting other threads run.

Using the threading module

import threading
import requests
import time

def download_site(url):
    print(f"Starting download: {url}")
    response = requests.get(url)
    print(f"Finished download: {url}, size: {len(response.content)} bytes")

def download_all_sites(sites):
    threads = []
    for url in sites:
        thread = threading.Thread(target=download_site, args=(url,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5  # repeated 5 times, 25 URLs in total

    start_time = time.time()
    download_all_sites(sites)
    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f} seconds")

Thread safety and synchronization

When several threads access a shared resource, you need synchronization to avoid race conditions:

import threading

# Shared resource
counter = 0
counter_lock = threading.Lock()

def increment_counter(n):
    global counter
    for _ in range(n):
        # Hold the lock for each update
        with counter_lock:
            counter += 1

# Create two threads
t1 = threading.Thread(target=increment_counter, args=(100000,))
t2 = threading.Thread(target=increment_counter, args=(100000,))

# Start the threads
t1.start()
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()

print(f"Final count: {counter}")  # should be 200000

Thread pools

For large numbers of tasks, a thread pool manages threads more efficiently:

import concurrent.futures
import requests
import time

def download_site(url):
    print(f"Starting download: {url}")
    response = requests.get(url)
    print(f"Finished download: {url}, size: {len(response.content)} bytes")
    return len(response.content)

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit every task and map each Future back to its URL
        future_to_url = {executor.submit(download_site, url): url for url in sites}

        # Handle tasks as they complete
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data_size = future.result()
                print(f"{url} downloaded, size: {data_size} bytes")
            except Exception as e:
                print(f"{url} failed: {e}")

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5

    start_time = time.time()
    download_all_sites(sites)
    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f} seconds")

Pros and cons of multithreading

Pros

  • Lightweight: threads are cheap to create and switch between
  • Shared memory makes inter-thread communication simple
  • Well suited to IO-bound tasks

Cons

  • The GIL prevents true parallel computation
  • Thread safety is hard to get right
  • Debugging is difficult
  • Prone to deadlocks, race conditions, and other concurrency bugs

Multiprocessing

Multiprocessing sidesteps the GIL and achieves true parallelism, which makes it especially good for CPU-bound tasks.

Using the multiprocessing module

import multiprocessing
import time

def cpu_bound_task(n):
    # CPU-bound task: naively compute the nth Fibonacci number
    if n <= 1:
        return n
    return cpu_bound_task(n - 1) + cpu_bound_task(n - 2)

def run_in_process():
    result = cpu_bound_task(35)
    print(f"Result: {result}")

if __name__ == "__main__":
    # Create two processes
    p1 = multiprocessing.Process(target=run_in_process)
    p2 = multiprocessing.Process(target=run_in_process)

    # Record the start time
    start_time = time.time()

    # Start the processes
    p1.start()
    p2.start()

    # Wait for both processes to finish
    p1.join()
    p2.join()

    # Report the total elapsed time
    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f} seconds")

Process pools

Like thread pools, process pools manage processes more efficiently:

import concurrent.futures
import time

def cpu_bound_task(n):
    # CPU-bound task: naively compute the nth Fibonacci number
    if n <= 1:
        return n
    return cpu_bound_task(n - 1) + cpu_bound_task(n - 2)

def main():
    numbers = [34, 35, 36, 37]

    # Use a process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(cpu_bound_task, numbers)

        for number, result in zip(numbers, results):
            print(f"fibonacci({number}) = {result}")

if __name__ == "__main__":
    start_time = time.time()
    main()
    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f} seconds")
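A small design note: each argument shipped to a worker process is pickled and sent over a pipe, so with many cheap tasks the per-item overhead dominates. ProcessPoolExecutor.map accepts a chunksize parameter that batches items per round trip; a sketch reusing cpu_bound_task and numbers from the example above (the value 2 is purely illustrative):

# Batch two numbers per inter-process round trip
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(cpu_bound_task, numbers, chunksize=2)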

Inter-process communication

Because processes do not share memory, they need dedicated mechanisms to communicate:

import multiprocessing
import time

def producer(queue):
    print("Producer started")
    for i in range(10):
        item = f"item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    # Send the shutdown sentinel
    queue.put(None)
    print("Producer finished")

def consumer(queue):
    print("Consumer started")
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(1)
    print("Consumer finished")

if __name__ == "__main__":
    # Queue for inter-process communication
    queue = multiprocessing.Queue()

    # Create the producer and consumer processes
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    # Start the processes
    p1.start()
    p2.start()

    # Wait for both processes to finish
    p1.join()
    p2.join()

    print("All processes finished")
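Queue is the most common channel, but multiprocessing also provides Pipe for two-way, point-to-point communication between exactly two processes. A minimal sketch:

import multiprocessing

def worker(conn):
    # Receive one request, send one reply, then close this end
    msg = conn.recv()
    conn.send(f"processed: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # processed: hello
    p.join()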

Pros and cons of multiprocessing

Pros

  • Sidesteps the GIL and achieves true parallelism
  • Processes are isolated: one crashing does not take down the others
  • Well suited to CPU-bound tasks

Cons

  • Processes are expensive to create and switch between
  • Higher memory footprint
  • Inter-process communication is more complex
  • Longer startup time

Asynchronous IO

Async IO is a single-threaded concurrency model. It achieves non-blocking behavior through an event loop and coroutines, which makes it especially good for IO-bound tasks.
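Here is the model in miniature, using nothing beyond the standard library: two coroutines that each "wait" one second finish in about one second total, because await asyncio.sleep hands control back to the event loop:

import asyncio
import time

async def wait_and_report(name, seconds):
    # await yields to the event loop while this coroutine "waits"
    await asyncio.sleep(seconds)
    print(f"{name} done after {seconds}s")

async def main():
    start = time.time()
    # Both coroutines make progress concurrently on a single thread
    await asyncio.gather(wait_and_report("A", 1), wait_and_report("B", 1))
    print(f"Total: {time.time() - start:.2f}s")  # about 1s, not 2s

asyncio.run(main())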

Using the asyncio module

import asyncio
import aiohttp
import time

async def download_site(session, url):
    print(f"Starting download: {url}")
    async with session.get(url) as response:
        content = await response.read()
        print(f"Finished download: {url}, size: {len(content)} bytes")
        return len(content)

async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.create_task(download_site(session, url))
            tasks.append(task)

        # Wait for every task to finish
        results = await asyncio.gather(*tasks)
        return results

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5

    start_time = time.time()

    # Run the async workload
    results = asyncio.run(download_all_sites(sites))

    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f} seconds")
    print(f"Total bytes downloaded: {sum(results)}")

Async context managers and async iterators

The async with and async for syntax has been part of the language since Python 3.5 (PEP 492):

import asyncio
import aiofiles

async def read_large_file():
    async with aiofiles.open('large_file.txt', 'r') as f:
        # Read the whole file asynchronously
        contents = await f.read()
        return contents

async def process_lines():
    async with aiofiles.open('large_file.txt', 'r') as f:
        # Iterate over the file line by line, asynchronously
        async for line in f:
            await process_line(line)

async def process_line(line):
    # Process a single line
    await asyncio.sleep(0.01)  # simulate async work
    return line.upper()

# Run the async tasks
asyncio.run(read_large_file())
asyncio.run(process_lines())
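aiofiles implements this protocol for you; writing your own async context manager only requires __aenter__ and __aexit__ coroutines. A minimal sketch with a hypothetical AsyncTimer helper:

import asyncio
import time

class AsyncTimer:
    # Hypothetical helper: times the body of an `async with` block
    async def __aenter__(self):
        self.start = time.time()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Block took {time.time() - self.start:.2f}s")
        return False  # do not suppress exceptions

async def main():
    async with AsyncTimer():
        await asyncio.sleep(0.5)

asyncio.run(main())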

Combining asyncio with other concurrency models

Sometimes we need to combine async IO with the other models:

import asyncio
import concurrent.futures
import time

def cpu_bound_task(n):
    # Stand-in for heavy blocking work (time.sleep releases the GIL,
    # which is why a thread pool helps here; real CPU work would need
    # a ProcessPoolExecutor instead)
    time.sleep(n)
    return n * n

async def io_bound_task(n):
    # IO-bound task
    await asyncio.sleep(n)  # simulate an IO operation
    return n * 10

async def main():
    # Create a thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Run the blocking tasks in the thread pool
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(pool, cpu_bound_task, i)
            for i in range(1, 6)
        ]

        # Run the IO-bound tasks at the same time
        io_tasks = [io_bound_task(i) for i in range(1, 6)]

        # Wait for everything to finish
        all_results = await asyncio.gather(*tasks, *io_tasks)
        print(f"All results: {all_results}")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f} seconds")

Pros and cons of async IO

Pros

  • Single-threaded model avoids most multithreading complexity
  • Handles very large numbers of concurrent IO operations efficiently
  • Clear code structure with async/await syntax
  • Low memory footprint

Cons

  • Not suitable for CPU-bound tasks
  • Requires async-aware libraries (but see the workaround sketched below)
  • Debugging can be tricky
  • Everything on the call path has to use async/await
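One practical escape hatch for the "needs async libraries" limitation: on Python 3.9+, asyncio.to_thread runs a blocking call in a worker thread without stalling the event loop. A sketch using the blocking requests library:

import asyncio
import requests

async def fetch(url):
    # requests.get blocks, so run it in a worker thread
    response = await asyncio.to_thread(requests.get, url)
    return len(response.content)

async def main():
    sizes = await asyncio.gather(
        fetch("https://www.example.com"),
        fetch("https://www.python.org"),
    )
    print(sizes)

asyncio.run(main())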

Comparing the Three Concurrency Models

Let's compare the performance of the three models on a concrete example:

import time
import asyncio
import aiohttp
import requests
import concurrent.futures

# URLs to test against
URLS = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.wikipedia.org",
] * 5  # 25 URLs in total

# 1. Serial download
def download_serial():
    start_time = time.time()

    for url in URLS:
        response = requests.get(url)
        print(f"Serial: downloaded {url}, size: {len(response.content)} bytes")

    elapsed = time.time() - start_time
    print(f"Serial download time: {elapsed:.2f} seconds")

# 2. Multithreaded download
def download_threaded():
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(requests.get, url): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                response = future.result()
                print(f"Threads: downloaded {url}, size: {len(response.content)} bytes")
            except Exception as e:
                print(f"Threads: download of {url} failed: {e}")

    elapsed = time.time() - start_time
    print(f"Multithreaded download time: {elapsed:.2f} seconds")

# 3. Multiprocess download
def download_url(url):
    response = requests.get(url)
    return url, len(response.content)

def download_multiprocess():
    start_time = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        results = executor.map(download_url, URLS)
        for url, size in results:
            print(f"Processes: downloaded {url}, size: {size} bytes")

    elapsed = time.time() - start_time
    print(f"Multiprocess download time: {elapsed:.2f} seconds")

# 4. Async IO download
async def fetch(session, url):
    async with session.get(url) as response:
        content = await response.read()
        print(f"Async IO: downloaded {url}, size: {len(content)} bytes")
        return len(content)

async def download_async():
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]
        await asyncio.gather(*tasks)

    elapsed = time.time() - start_time
    print(f"Async IO download time: {elapsed:.2f} seconds")

# Run all four benchmarks
if __name__ == "__main__":
    print("1. Serial download test...")
    download_serial()

    print("\n2. Multithreaded download test...")
    download_threaded()

    print("\n3. Multiprocess download test...")
    download_multiprocess()

    print("\n4. Async IO download test...")
    asyncio.run(download_async())

Analyzing the results

For IO-bound work (such as network requests):

  • Serial: slowest, since each request blocks the next
  • Multithreading: much faster than serial, because a thread waiting on IO yields to the others
  • Multiprocessing: often slower than threads here, because process creation and communication cost more
  • Async IO: usually the fastest, since it avoids thread-switching overhead entirely

For CPU-bound work:

  • Serial: simple but slow
  • Multithreading: limited by the GIL, and can be slower than serial because of switching overhead
  • Multiprocessing: usually the fastest, since it can use every core
  • Async IO: a poor fit; performance can be very bad

Choosing the Right Concurrency Model

Pick the model that matches the nature of the task:

  1. IO-bound tasks (network requests, file operations, ...):

    • First choice: async IO (asyncio)
    • Second choice: multithreading (threading)
  2. CPU-bound tasks (computation, data processing, ...):

    • First choice: multiprocessing (multiprocessing)
    • Second choice: move the hot computation into a C extension or another language
  3. Mixed workloads:

    • Consider combining several concurrency models
    • For example: asyncio for the IO plus a ProcessPoolExecutor for the CPU-heavy parts, as in the sketch below
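A minimal sketch of that hybrid, reusing the recursive cpu_bound_task from earlier: the event loop stays free for IO-style work while the Fibonacci calls run in worker processes:

import asyncio
import concurrent.futures

def cpu_bound_task(n):
    # Same recursive Fibonacci as earlier in the article
    if n <= 1:
        return n
    return cpu_bound_task(n - 1) + cpu_bound_task(n - 2)

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # CPU work runs in worker processes, bypassing the GIL
        cpu_jobs = [loop.run_in_executor(pool, cpu_bound_task, n) for n in (32, 33)]
        # Meanwhile the event loop keeps serving IO
        io_job = asyncio.sleep(1, result="io done")
        results = await asyncio.gather(*cpu_jobs, io_job)
    print(results)

if __name__ == "__main__":
    asyncio.run(main())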

Real-World Examples

Example 1: a web crawler

A web crawler is a classic IO-bound workload, a good fit for async IO:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_html(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(f"Failed to fetch {url}: {e}")
        return None

async def parse_and_extract_links(html, base_url):
    # Declared async for a uniform call style; the BeautifulSoup
    # parsing itself is synchronous CPU work
    if not html:
        return []

    soup = BeautifulSoup(html, 'html.parser')
    links = []

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        if href.startswith('http'):
            links.append(href)
        elif href.startswith('/'):
            links.append(f"{base_url}{href}")

    return links

async def crawl(start_url, max_depth=2, max_urls=100):
    visited_urls = set()
    urls_to_visit = [(start_url, 0)]  # (url, depth)
    base_url = '/'.join(start_url.split('/')[:3])  # scheme://host

    async with aiohttp.ClientSession() as session:
        while urls_to_visit and len(visited_urls) < max_urls:
            url, depth = urls_to_visit.pop(0)

            if url in visited_urls or depth > max_depth:
                continue

            print(f"Crawling (depth {depth}): {url}")
            visited_urls.add(url)

            html = await fetch_html(session, url)
            if not html:
                continue

            if depth < max_depth:
                links = await parse_and_extract_links(html, base_url)
                for link in links:
                    if link not in visited_urls:
                        urls_to_visit.append((link, depth + 1))

    return visited_urls

async def main():
    start_time = time.time()
    urls = await crawl("https://www.python.org", max_depth=2, max_urls=50)
    elapsed = time.time() - start_time

    print(f"\nCrawl finished! Visited {len(urls)} URLs")
    print(f"Total time: {elapsed:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Example 2: image processing

Image processing is a classic CPU-bound workload, a good fit for multiprocessing:

import os
import time
from PIL import Image, ImageFilter
import concurrent.futures

def process_image(image_path, output_dir, blur_radius=2):
    try:
        # Open the image
        img = Image.open(image_path)

        # Apply a Gaussian blur
        blurred_img = img.filter(ImageFilter.GaussianBlur(blur_radius))

        # Build the output path
        filename = os.path.basename(image_path)
        output_path = os.path.join(output_dir, f"blurred_{filename}")

        # Save the processed image
        blurred_img.save(output_path)

        return output_path
    except Exception as e:
        print(f"Failed to process image {image_path}: {e}")
        return None

def process_images_parallel(image_dir, output_dir, max_workers=None):
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Collect all image files
    image_files = [
        os.path.join(image_dir, f) for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))
    ]

    start_time = time.time()
    processed_files = []

    # Process the images with a process pool
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_path = {
            executor.submit(process_image, path, output_dir): path
            for path in image_files
        }

        for future in concurrent.futures.as_completed(future_to_path):
            path = future_to_path[future]
            try:
                output_path = future.result()
                if output_path:
                    processed_files.append(output_path)
                    print(f"Processed: {path} -> {output_path}")
            except Exception as e:
                print(f"Error while processing {path}: {e}")

    elapsed = time.time() - start_time
    print(f"\nDone! Processed {len(processed_files)} images")
    print(f"Total time: {elapsed:.2f} seconds")

    return processed_files

if __name__ == "__main__":
    # Replace with your own input and output directories
    process_images_parallel("./images", "./processed_images")

Example 3: a web server

A web server has to handle many concurrent requests, a good fit for async IO:

from aiohttp import web
import asyncio
import aiofiles
import time

# Simulated database lookup
async def fetch_from_db(user_id):
    await asyncio.sleep(0.1)  # simulate query latency
    return {"id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}

# Simulated external API call
async def call_external_api(user_id):
    await asyncio.sleep(0.2)  # simulate API latency
    return {"status": "active", "last_login": "2023-01-01"}

# Asynchronous request logging
async def log_request(request_info):
    async with aiofiles.open("server.log", "a") as f:
        log_line = f"{time.time()},{request_info['ip']},{request_info['path']}\n"
        await f.write(log_line)

# Route handler for user requests
async def handle_user(request):
    user_id = request.match_info.get('user_id', '1')

    # Log the request
    await log_request({
        "ip": request.remote,
        "path": request.path
    })

    # Fetch user data and status in parallel
    user_data, user_status = await asyncio.gather(
        fetch_from_db(user_id),
        call_external_api(user_id)
    )

    # Merge the results
    result = {**user_data, **user_status}

    return web.json_response(result)

# Index route handler
async def handle_index(request):
    return web.Response(text="Welcome to the Async Web Server!")

# Build the application
async def create_app():
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/users/{user_id}', handle_user)
    return app

if __name__ == '__main__':
    web.run_app(create_app())
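By default aiohttp serves on port 8080, so curl http://localhost:8080/users/42 should return the merged JSON after roughly 0.2 seconds: the two simulated backends run in parallel under asyncio.gather instead of adding up to 0.3 seconds.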

Advanced Concurrency Patterns

The producer-consumer pattern

Producer-consumer is a common concurrency pattern that can be implemented in several ways:

With a queue and threads

import threading
import queue
import time
import random

# Shared queues
task_queue = queue.Queue(maxsize=10)
result_queue = queue.Queue()

# Producer
def producer(num_tasks):
    for i in range(num_tasks):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"Producer: added {task}")
        time.sleep(random.uniform(0.1, 0.3))  # random delay

    # One shutdown sentinel per consumer
    for _ in range(3):  # assuming 3 consumers
        task_queue.put(None)

    print("Producer: all tasks queued")

# Consumer
def consumer(consumer_id):
    while True:
        task = task_queue.get()
        if task is None:
            print(f"Consumer-{consumer_id}: received shutdown signal")
            task_queue.task_done()
            break

        print(f"Consumer-{consumer_id}: handling {task}")
        # Simulate processing
        time.sleep(random.uniform(0.5, 1.0))

        # Push the result to the result queue
        result = f"Result of {task} by Consumer-{consumer_id}"
        result_queue.put(result)

        task_queue.task_done()

    print(f"Consumer-{consumer_id}: exiting")

# Result collector
def result_processor():
    results = []
    while True:
        try:
            result = result_queue.get(timeout=5)
            results.append(result)
            result_queue.task_done()
            print(f"Result processor: received {result}")
        except queue.Empty:
            print("Result processor: timed out, exiting")
            break

    print(f"Result processor: handled {len(results)} results")
    return results

# Main entry point
def main():
    num_tasks = 20

    # Create and start the threads
    producer_thread = threading.Thread(target=producer, args=(num_tasks,))
    consumer_threads = [
        threading.Thread(target=consumer, args=(i,))
        for i in range(3)
    ]
    result_thread = threading.Thread(target=result_processor)

    producer_thread.start()
    for t in consumer_threads:
        t.start()
    result_thread.start()

    # Wait for all threads to finish
    producer_thread.join()
    for t in consumer_threads:
        t.join()
    result_thread.join()

    print("All threads finished")

if __name__ == "__main__":
    main()

With asyncio

import asyncio
import random

async def producer(queue, num_tasks):
    for i in range(num_tasks):
        task = f"Task-{i}"
        await queue.put(task)
        print(f"Producer: added {task}")
        await asyncio.sleep(random.uniform(0.1, 0.3))  # random delay

    # One shutdown sentinel per consumer
    for _ in range(3):  # assuming 3 consumers
        await queue.put(None)

    print("Producer: all tasks queued")

async def consumer(consumer_id, task_queue, result_queue):
    while True:
        task = await task_queue.get()
        if task is None:
            print(f"Consumer-{consumer_id}: received shutdown signal")
            task_queue.task_done()
            break

        print(f"Consumer-{consumer_id}: handling {task}")
        # Simulate processing
        await asyncio.sleep(random.uniform(0.5, 1.0))

        # Push the result to the result queue
        result = f"Result of {task} by Consumer-{consumer_id}"
        await result_queue.put(result)

        task_queue.task_done()

    print(f"Consumer-{consumer_id}: exiting")

async def result_processor(queue):
    results = []
    while True:
        try:
            result = await asyncio.wait_for(queue.get(), timeout=5)
            results.append(result)
            queue.task_done()
            print(f"Result processor: received {result}")
        except asyncio.TimeoutError:
            print("Result processor: timed out, exiting")
            break

    print(f"Result processor: handled {len(results)} results")
    return results

async def main():
    num_tasks = 20
    task_queue = asyncio.Queue()
    result_queue = asyncio.Queue()

    # Create the tasks
    producer_task = asyncio.create_task(producer(task_queue, num_tasks))
    consumer_tasks = [
        asyncio.create_task(consumer(i, task_queue, result_queue))
        for i in range(3)
    ]
    result_task = asyncio.create_task(result_processor(result_queue))

    # Wait for everything to finish
    await producer_task
    await asyncio.gather(*consumer_tasks)
    results = await result_task

    print("All tasks finished")

if __name__ == "__main__":
    asyncio.run(main())

Advanced Uses of Thread and Process Pools

Worker initializers and thread-local data

import concurrent.futures
import threading
import time

# Thread-local storage: each worker thread gets its own attributes
thread_local = threading.local()

def initialize_worker():
    # Runs once in each worker thread when the pool starts it
    thread_local.worker_id = threading.get_ident()
    thread_local.start_time = time.time()
    print(f"Initialized worker thread {thread_local.worker_id}")

def process_task(task):
    # Read back this thread's own data
    worker_id = thread_local.worker_id
    elapsed = time.time() - thread_local.start_time

    print(f"Worker {worker_id} handling task {task}, alive for {elapsed:.2f} seconds")
    time.sleep(0.5)  # simulate work
    return f"result of task {task}"

def main():
    tasks = list(range(10))

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
        initializer=initialize_worker
    ) as executor:
        results = list(executor.map(process_task, tasks))

    print("All tasks finished:", results)

if __name__ == "__main__":
    main()
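threading.local follows threads, but asyncio tasks all share one thread, so it cannot tell them apart. For per-task state, the standard contextvars module (Python 3.7+) gives each task its own view of a variable; a minimal sketch:

import asyncio
import contextvars

# Each asyncio task sees its own value of this variable
request_id = contextvars.ContextVar("request_id", default="-")

async def handle(rid):
    request_id.set(rid)
    await asyncio.sleep(0.1)  # other tasks run in the meantime
    # The value set above survives the context switches
    print(f"task {rid}: request_id = {request_id.get()}")

async def main():
    await asyncio.gather(handle("A"), handle("B"))

asyncio.run(main())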

A custom thread pool

import threading
import queue
import time

class CustomThreadPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.workers = []
        self.results = {}
        self.result_lock = threading.Lock()
        self.task_counter = 0
        self.shutdown_flag = False

        # Create the worker threads
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        while not self.shutdown_flag:
            try:
                task_id, func, args, kwargs = self.task_queue.get(timeout=0.5)
                try:
                    result = func(*args, **kwargs)
                    with self.result_lock:
                        self.results[task_id] = (True, result)
                except Exception as e:
                    with self.result_lock:
                        self.results[task_id] = (False, e)
                finally:
                    self.task_queue.task_done()
            except queue.Empty:
                continue

    def submit(self, func, *args, **kwargs):
        if self.shutdown_flag:
            raise RuntimeError("Thread pool is shut down")

        task_id = self.task_counter
        self.task_counter += 1
        self.task_queue.put((task_id, func, args, kwargs))
        return task_id

    def get_result(self, task_id, timeout=None):
        end_time = None if timeout is None else time.time() + timeout

        # Simple polling loop; a Condition would avoid the busy wait
        while True:
            with self.result_lock:
                if task_id in self.results:
                    success, result = self.results.pop(task_id)
                    if success:
                        return result
                    else:
                        raise result

            if end_time is not None and time.time() > end_time:
                raise TimeoutError(f"Timed out waiting for task {task_id}")

            time.sleep(0.01)

    def shutdown(self, wait=True):
        self.shutdown_flag = True
        if wait:
            for worker in self.workers:
                worker.join()

# Using the custom thread pool
def example_task(n):
    time.sleep(0.5)
    return n * n

def main():
    pool = CustomThreadPool(num_workers=3)

    # Submit tasks
    task_ids = [pool.submit(example_task, i) for i in range(10)]

    # Collect the results
    results = [pool.get_result(task_id) for task_id in task_ids]
    print("Results:", results)

    # Shut the pool down
    pool.shutdown()

if __name__ == "__main__":
    main()

Async iterators and async generators

import asyncio
import aiohttp

class AsyncURLFetcher:
    def __init__(self, urls):
        self.urls = urls
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.urls):
            raise StopAsyncIteration

        url = self.urls[self.index]
        self.index += 1

        # One session per request keeps the example simple;
        # a shared session would be more efficient
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return url, await response.text()

async def fetch_urls():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com"
    ]

    fetcher = AsyncURLFetcher(urls)
    async for url, html in fetcher:
        print(f"Fetched {url}, length: {len(html)} characters")

# An async generator
async def async_range(start, stop):
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def use_async_generator():
    async for i in async_range(1, 5):
        print(f"Yielded: {i}")

async def main():
    await fetch_urls()
    await use_async_generator()

if __name__ == "__main__":
    asyncio.run(main())

Common Pitfalls and Best Practices

Common pitfalls

  1. Deadlock: two or more threads wait forever for each other to release a lock

    # Code that can deadlock: locks acquired in opposite orders
    lock1 = threading.Lock()
    lock2 = threading.Lock()

    def thread1_function():
        with lock1:
            time.sleep(0.1)  # widen the window for deadlock
            with lock2:
                print("Thread 1 holds both locks")

    def thread2_function():
        with lock2:
            time.sleep(0.1)  # widen the window for deadlock
            with lock1:
                print("Thread 2 holds both locks")
  2. Race conditions: concurrent access to shared state leaves it inconsistent

    # Race condition example: read-modify-write without a lock
    counter = 0

    def increment():
        global counter
        local_copy = counter
        local_copy += 1
        time.sleep(0.001)  # widen the window for lost updates
        counter = local_copy
  3. GIL limitation: using multithreading for CPU-bound work

    # Multithreaded code limited by the GIL
    def cpu_intensive():
        for _ in range(10000000):
            _ = 1 + 1

    threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
  4. Resource leaks: files, connections, and other resources never closed

    # Resource leak example
    def process_file():
        f = open("large_file.txt", "r")
        data = f.read()
        # the file is never closed
        return data
  5. Blocking operations in async code: calling a blocking function inside a coroutine (the fix is sketched after this list)

    # Broken async code
    async def bad_async_function():
        # This blocks the entire event loop
        time.sleep(1)
        return "done"
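The fix for that last pitfall is to await a non-blocking equivalent, or to push a genuinely blocking call off the event loop; a minimal sketch (asyncio.to_thread requires Python 3.9+):

import asyncio
import time

# Correct: yields control to the event loop instead of freezing it
async def good_async_function():
    await asyncio.sleep(1)
    return "done"

# For a call that is unavoidably blocking, hand it to a worker thread
async def wraps_blocking_call():
    return await asyncio.to_thread(time.sleep, 1)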

Best practices

  1. Use context managers: acquire and release resources automatically

    # Use a context manager
    with open("file.txt", "r") as f:
        data = f.read()

    # A custom context manager
    class ThreadPoolManager:
        def __init__(self, max_workers):
            self.max_workers = max_workers

        def __enter__(self):
            self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
            return self.pool

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.pool.shutdown()

    # Using the custom context manager
    with ThreadPoolManager(max_workers=5) as pool:
        results = list(pool.map(process_item, items))
  2. Avoid shared mutable state: prefer immutable data or message passing

    # Use a queue instead of shared variables
    def producer(queue):
        for i in range(10):
            queue.put(i)

    def consumer(queue):
        while True:
            item = queue.get()
            if item is None:
                break
            process_item(item)
  3. Use the right synchronization primitive: pick the appropriate lock, semaphore, and so on

    # Use an RLock to avoid deadlocking on your own lock
    lock = threading.RLock()

    def recursive_function():
        with lock:
            # The same thread may re-acquire the lock
            another_function()

    def another_function():
        with lock:
            # No deadlock here
            print("Safe re-entrant locking")
  4. Set sensible timeouts: never wait forever

    # Set a timeout
    try:
        result = queue.get(timeout=5)
    except queue.Empty:
        print("Timed out waiting for an item")

    # Async timeout
    try:
        result = await asyncio.wait_for(async_function(), timeout=5)
    except asyncio.TimeoutError:
        print("Async operation timed out")
  5. Use thread/process pools: don't create threads or processes without bound

    # Use a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(process_item, items))
  6. Handle exceptions properly: don't let an exception silently kill a worker

    def worker():
        try:
            # code that may raise
            process_data()
        except Exception as e:
            print(f"Caught exception: {e}")
  7. Prefer thread-safe building blocks: let structures like Queue do the locking so you don't have to

    import time
    from queue import Queue, Empty

    # Use a Queue rather than a manually synchronized list
    task_queue = Queue()

    def producer():
        for i in range(10):
            task_queue.put(i)
            time.sleep(0.1)

    def consumer():
        while True:
            try:
                item = task_queue.get(timeout=1)
                print(f"Processing item: {item}")
                task_queue.task_done()
            except Empty:  # the exception lives in the queue module, not on Queue
                break

Conclusion

Python offers several concurrency models, each with its own strengths and sweet spots:

  1. Multithreading: good for IO-bound tasks, but constrained by the GIL
  2. Multiprocessing: good for CPU-bound tasks and sidesteps the GIL, at the cost of higher resource overhead
  3. Async IO: good for IO-bound tasks; the single-threaded model avoids most multithreading complexity

Choose based on the nature of the task, the performance requirements, and the complexity budget of the code. In practice, mixing models is sometimes the best answer.

Whichever model you pick, watch out for the classic concurrency traps: deadlocks, race conditions, and resource leaks. Following the best practices above, such as using context managers, avoiding shared mutable state, and setting sensible timeouts, will help you write more robust and efficient concurrent code.

As Python evolves, and especially as its async IO story matures, the language's concurrency capabilities keep improving. Mastering these models and techniques will let you exploit modern hardware and build efficient, scalable Python applications.

Do you have questions about Python concurrency, or experience to share? Join the discussion in the comments!
