Python Concurrency Models: A Comprehensive Comparison of Threads, Processes, and Async IO
In modern software development, concurrency has become a key technique for improving application performance and responsiveness. Python offers several concurrency models, including multithreading, multiprocessing, and asynchronous IO, and each has its own strengths and ideal use cases. In this article I take a close look at these models to help you pick the one that best fits your project.
The challenge for Python concurrency: the GIL
Before discussing Python's concurrency models, we need to understand the Global Interpreter Lock (GIL). The GIL is a mechanism in the CPython interpreter that ensures only one thread executes Python bytecode at any given moment, which means Python threads cannot achieve true parallel computation on multi-core processors.
```python
import threading
import time

def cpu_bound_task(n):
    if n <= 1:
        return n
    return cpu_bound_task(n-1) + cpu_bound_task(n-2)

def run_in_thread():
    result = cpu_bound_task(35)
    print(f"Result: {result}")

t1 = threading.Thread(target=run_in_thread)
t2 = threading.Thread(target=run_in_thread)

start_time = time.time()
t1.start()
t2.start()
t1.join()
t2.join()

elapsed = time.time() - start_time
print(f"Total time: {elapsed:.2f}s")
```
In the example above, even though we use two threads, the GIL forces them to take turns executing, so the total time can be even longer than running the task twice on a single thread, because thread switching adds overhead.
The GIL does not mean Python cannot do concurrency effectively, however. Next, let's look at how to achieve real concurrency in Python.
Multithreading
Despite the GIL, multithreading is still very effective for IO-bound tasks: a thread releases the GIL while waiting on IO, allowing other threads to run.
Using the threading module
```python
import threading
import requests
import time

def download_site(url):
    print(f"Starting download of {url}")
    response = requests.get(url)
    print(f"Finished {url}, size: {len(response.content)} bytes")

def download_all_sites(sites):
    threads = []
    for url in sites:
        thread = threading.Thread(target=download_site, args=(url,))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5
    start_time = time.time()
    download_all_sites(sites)
    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f}s")
```
Thread safety and synchronization
When multiple threads access shared state, synchronization is needed to avoid race conditions:
```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment_counter(n):
    global counter
    for _ in range(n):
        with counter_lock:
            counter += 1

t1 = threading.Thread(target=increment_counter, args=(100000,))
t2 = threading.Thread(target=increment_counter, args=(100000,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Final count: {counter}")
```
Thread pools
When you have many tasks, a thread pool manages threads more efficiently:
```python
import concurrent.futures
import requests
import time

def download_site(url):
    print(f"Starting download of {url}")
    response = requests.get(url)
    print(f"Finished {url}, size: {len(response.content)} bytes")
    return len(response.content)

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(download_site, url): url for url in sites}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data_size = future.result()
                print(f"{url} done, size: {data_size} bytes")
            except Exception as e:
                print(f"{url} failed: {e}")

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5
    start_time = time.time()
    download_all_sites(sites)
    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f}s")
```
Pros and cons of multithreading
Pros:
- Lightweight; cheap to create and switch between
- Shared memory makes inter-thread communication simple
- Well suited to IO-bound tasks
Cons:
- Constrained by the GIL; no true parallel computation
- Thread-safety issues are subtle
- Hard to debug
- Prone to deadlocks, race conditions, and other concurrency bugs
Multiprocessing
Multiprocessing sidesteps the GIL and achieves true parallelism, which makes it particularly suitable for CPU-bound tasks.
Using the multiprocessing module
```python
import multiprocessing
import time

def cpu_bound_task(n):
    if n <= 1:
        return n
    return cpu_bound_task(n-1) + cpu_bound_task(n-2)

def run_in_process():
    result = cpu_bound_task(35)
    print(f"Result: {result}")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=run_in_process)
    p2 = multiprocessing.Process(target=run_in_process)

    start_time = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f}s")
```
Process pools
As with thread pools, a process pool manages processes more efficiently:
```python
import concurrent.futures
import time

def cpu_bound_task(n):
    if n <= 1:
        return n
    return cpu_bound_task(n-1) + cpu_bound_task(n-2)

def main():
    numbers = [34, 35, 36, 37]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(cpu_bound_task, numbers)
        for number, result in zip(numbers, results):
            print(f"fibonacci({number}) = {result}")

if __name__ == "__main__":
    start_time = time.time()
    main()
    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f}s")
```
Inter-process communication
Because processes do not share memory, communication between them needs dedicated mechanisms:
```python
import multiprocessing
import time

def producer(queue):
    print("Producer started")
    for i in range(10):
        item = f"item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    queue.put(None)  # sentinel: tells the consumer to stop
    print("Producer finished")

def consumer(queue):
    print("Consumer started")
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(1)
    print("Consumer finished")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("All processes finished")
```
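Queue is not the only channel: multiprocessing also offers Pipe for a two-way connection between two processes, and Value/Array for shared memory. A minimal sketch of both (the worker function and variable names are illustrative):

```python
import multiprocessing

def worker(conn, shared_total):
    # Receive a list of numbers over the pipe and accumulate into shared memory.
    numbers = conn.recv()
    for n in numbers:
        with shared_total.get_lock():  # a synchronized Value carries its own lock
            shared_total.value += n
    conn.send("done")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    total = multiprocessing.Value('i', 0)  # shared integer, initial value 0
    p = multiprocessing.Process(target=worker, args=(child_conn, total))
    p.start()
    parent_conn.send([1, 2, 3, 4, 5])
    print(parent_conn.recv())          # -> "done"
    p.join()
    print(f"Shared total: {total.value}")  # -> 15
```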
Pros and cons of multiprocessing
Pros:
- Sidesteps the GIL and achieves true parallel computation
- Processes are isolated from each other; one crashing does not take down the rest
- Well suited to CPU-bound tasks
Cons:
- Expensive to create and switch between
- High memory footprint
- Inter-process communication is complex
- Slower startup
Asynchronous IO
Async IO is a single-threaded concurrency model that uses an event loop and coroutines to perform non-blocking operations. It is particularly well suited to IO-bound tasks.
Using the asyncio module
```python
import asyncio
import aiohttp
import time

async def download_site(session, url):
    print(f"Starting download of {url}")
    async with session.get(url) as response:
        content = await response.read()
        print(f"Finished {url}, size: {len(content)} bytes")
        return len(content)

async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.create_task(download_site(session, url))
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return results

if __name__ == "__main__":
    sites = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
    ] * 5
    start_time = time.time()
    results = asyncio.run(download_all_sites(sites))
    elapsed = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {elapsed:.2f}s")
    print(f"Total bytes downloaded: {sum(results)}")
```
Async context managers and async iterators
Python 3.5+ provides syntax support for async context managers and async iterators (async with and async for):
```python
import asyncio
import aiofiles

async def read_large_file():
    async with aiofiles.open('large_file.txt', 'r') as f:
        contents = await f.read()
    return contents

async def process_lines():
    async with aiofiles.open('large_file.txt', 'r') as f:
        async for line in f:
            await process_line(line)

async def process_line(line):
    await asyncio.sleep(0.01)
    return line.upper()

asyncio.run(read_large_file())
asyncio.run(process_lines())
```
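The async with support is not limited to library objects like aiofiles handles; you can define your own async context managers too. A minimal sketch using contextlib.asynccontextmanager (the timed_section helper is made up for illustration):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def timed_section(label):
    # Measure how long the body of the async with block takes.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        print(f"{label} took {loop.time() - start:.3f}s")

async def main():
    async with timed_section("sleep demo"):
        await asyncio.sleep(0.2)

asyncio.run(main())
```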
Combining asyncio with other concurrency models
Sometimes we need to combine async IO with other concurrency models, for example to keep the event loop free while blocking work runs in an executor:
```python
import asyncio
import concurrent.futures
import time

def blocking_task(n):
    # Stands in for a blocking call (e.g. a legacy library). Note that
    # time.sleep is not CPU-bound; truly CPU-bound work should go to a
    # ProcessPoolExecutor instead, to escape the GIL.
    time.sleep(n)
    return n * n

async def io_bound_task(n):
    await asyncio.sleep(n)
    return n * 10

async def main():
    with concurrent.futures.ThreadPoolExecutor() as pool:
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(pool, blocking_task, i)
            for i in range(1, 6)
        ]
        io_tasks = [io_bound_task(i) for i in range(1, 6)]
        all_results = await asyncio.gather(*tasks, *io_tasks)
        print(f"All results: {all_results}")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    elapsed = time.time() - start_time
    print(f"Total time: {elapsed:.2f}s")
```
Pros and cons of async IO
Pros:
- Single-threaded model avoids most of the complexity of multithreading
- Handles large numbers of concurrent IO operations efficiently
- Clear code structure thanks to async/await syntax
- Low memory footprint
Cons:
- Unsuitable for CPU-bound tasks
- Requires async-capable libraries
- Debugging can be tricky
- All asynchronous code must be written with async/await syntax
Comparing the three concurrency models
Let's compare the performance of the three models on a concrete example:
```python
import time
import asyncio
import aiohttp
import requests
import concurrent.futures

URLS = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.wikipedia.org",
] * 5

def download_serial():
    start_time = time.time()
    for url in URLS:
        response = requests.get(url)
        print(f"Serial: {url}, size: {len(response.content)} bytes")
    elapsed = time.time() - start_time
    print(f"Serial download time: {elapsed:.2f}s")

def download_threaded():
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(requests.get, url): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                response = future.result()
                print(f"Threaded: {url}, size: {len(response.content)} bytes")
            except Exception as e:
                print(f"Threaded: {url} failed: {e}")
    elapsed = time.time() - start_time
    print(f"Threaded download time: {elapsed:.2f}s")

def download_url(url):
    response = requests.get(url)
    return url, len(response.content)

def download_multiprocess():
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        results = executor.map(download_url, URLS)
        for url, size in results:
            print(f"Multiprocess: {url}, size: {size} bytes")
    elapsed = time.time() - start_time
    print(f"Multiprocess download time: {elapsed:.2f}s")

async def fetch(session, url):
    async with session.get(url) as response:
        content = await response.read()
        print(f"Async IO: {url}, size: {len(content)} bytes")
        return len(content)

async def download_async():
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]
        await asyncio.gather(*tasks)
    elapsed = time.time() - start_time
    print(f"Async IO download time: {elapsed:.2f}s")

if __name__ == "__main__":
    print("1. Serial download test...")
    download_serial()
    print("\n2. Threaded download test...")
    download_threaded()
    print("\n3. Multiprocess download test...")
    download_multiprocess()
    print("\n4. Async IO download test...")
    asyncio.run(download_async())
```
Analyzing the performance results
For IO-bound tasks (such as network requests):
- Serial execution: slowest, because each request blocks the next
- Multithreading: far faster than serial, since threads waiting on IO yield to others
- Multiprocessing: may be slower than multithreading, because creating processes and communicating between them is expensive
- Async IO: usually the fastest, because it avoids thread-switching overhead
For CPU-bound tasks:
- Serial execution: simple but slow
- Multithreading: constrained by the GIL, and can be slower than serial because of switching overhead
- Multiprocessing: usually the fastest, because it can use multiple cores
- Async IO: unsuitable; performance can be very poor
Choosing the right concurrency model
Choose a model based on the nature of the task:
IO-bound tasks (network requests, file operations, etc.):
- First choice: async IO (asyncio)
- Second choice: multithreading (threading)
CPU-bound tasks (computation, data processing, etc.):
- First choice: multiprocessing (multiprocessing)
- Second choice: implement the computation-heavy parts in a C extension or another language
Mixed workloads:
- Consider combining several concurrency models
- For example: handle the IO with asyncio and the CPU-heavy computation with a ProcessPoolExecutor, as in the sketch below
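A minimal sketch of that combination, assuming fib stands in for real CPU-bound work and fetch for real IO:

```python
import asyncio
import concurrent.futures

def fib(n):
    # CPU-bound work; runs in a separate process, outside the GIL.
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def fetch(i):
    await asyncio.sleep(0.5)  # stand-in for a real IO operation
    return f"response-{i}"

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        cpu_jobs = [loop.run_in_executor(pool, fib, 30) for _ in range(4)]
        io_jobs = [fetch(i) for i in range(4)]
        results = await asyncio.gather(*cpu_jobs, *io_jobs)
    print(results)

if __name__ == "__main__":  # required: worker processes re-import this module
    asyncio.run(main())
```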
Real-world use cases
Case 1: a web crawler
A web crawler is a classic IO-bound workload, well suited to async IO:
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_html(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(f"Failed to fetch {url}: {e}")
        return None

async def parse_and_extract_links(html, base_url):
    # Parsing is synchronous CPU work; kept async only so crawl() can
    # await it uniformly.
    if not html:
        return []
    soup = BeautifulSoup(html, 'html.parser')
    links = []
    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        if href.startswith('http'):
            links.append(href)
        elif href.startswith('/'):
            links.append(f"{base_url}{href}")
    return links

async def crawl(start_url, max_depth=2, max_urls=100):
    visited_urls = set()
    urls_to_visit = [(start_url, 0)]
    base_url = '/'.join(start_url.split('/')[:3])
    async with aiohttp.ClientSession() as session:
        while urls_to_visit and len(visited_urls) < max_urls:
            url, depth = urls_to_visit.pop(0)
            if url in visited_urls or depth > max_depth:
                continue
            print(f"Crawling (depth {depth}): {url}")
            visited_urls.add(url)
            html = await fetch_html(session, url)
            if not html:
                continue
            if depth < max_depth:
                links = await parse_and_extract_links(html, base_url)
                for link in links:
                    if link not in visited_urls:
                        urls_to_visit.append((link, depth + 1))
    return visited_urls

async def main():
    start_time = time.time()
    urls = await crawl("https://www.python.org", max_depth=2, max_urls=50)
    elapsed = time.time() - start_time
    print(f"\nCrawl complete! Visited {len(urls)} URLs")
    print(f"Total time: {elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
```
Case 2: image processing
Image processing is a classic CPU-bound workload, well suited to multiprocessing:
```python
import os
import time
from PIL import Image, ImageFilter
import concurrent.futures

def process_image(image_path, output_dir, blur_radius=2):
    try:
        img = Image.open(image_path)
        blurred_img = img.filter(ImageFilter.GaussianBlur(blur_radius))
        filename = os.path.basename(image_path)
        output_path = os.path.join(output_dir, f"blurred_{filename}")
        blurred_img.save(output_path)
        return output_path
    except Exception as e:
        print(f"Failed to process {image_path}: {e}")
        return None

def process_images_parallel(image_dir, output_dir, max_workers=None):
    os.makedirs(output_dir, exist_ok=True)
    image_files = [
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))
    ]
    start_time = time.time()
    processed_files = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_path = {
            executor.submit(process_image, path, output_dir): path
            for path in image_files
        }
        for future in concurrent.futures.as_completed(future_to_path):
            path = future_to_path[future]
            try:
                output_path = future.result()
                if output_path:
                    processed_files.append(output_path)
                    print(f"Processed: {path} -> {output_path}")
            except Exception as e:
                print(f"Error while processing {path}: {e}")
    elapsed = time.time() - start_time
    print(f"\nDone! Processed {len(processed_files)} images")
    print(f"Total time: {elapsed:.2f}s")
    return processed_files

if __name__ == "__main__":
    process_images_parallel("./images", "./processed_images")
```
Case 3: a web server
A web server must handle many concurrent requests, which makes async IO a good fit:
```python
from aiohttp import web
import asyncio
import aiofiles
import time

async def fetch_from_db(user_id):
    await asyncio.sleep(0.1)  # simulated database query
    return {"id": user_id, "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"}

async def call_external_api(user_id):
    await asyncio.sleep(0.2)  # simulated external API call
    return {"status": "active", "last_login": "2023-01-01"}

async def log_request(request_info):
    async with aiofiles.open("server.log", "a") as f:
        log_line = f"{time.time()},{request_info['ip']},{request_info['path']}\n"
        await f.write(log_line)

async def handle_user(request):
    user_id = request.match_info.get('user_id', '1')
    await log_request({"ip": request.remote, "path": request.path})
    # Run the DB query and the external API call concurrently.
    user_data, user_status = await asyncio.gather(
        fetch_from_db(user_id),
        call_external_api(user_id)
    )
    result = {**user_data, **user_status}
    return web.json_response(result)

async def handle_index(request):
    return web.Response(text="Welcome to the Async Web Server!")

async def create_app():
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/users/{user_id}', handle_user)
    return app

if __name__ == '__main__':
    web.run_app(create_app())
```
Advanced concurrency patterns
The producer-consumer pattern
Producer-consumer is a common concurrency pattern and can be implemented in several ways:
Using queues and threads
```python
import threading
import queue
import time
import random

task_queue = queue.Queue(maxsize=10)
result_queue = queue.Queue()

def producer(num_tasks):
    for i in range(num_tasks):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"Producer: added {task}")
        time.sleep(random.uniform(0.1, 0.3))
    for _ in range(3):  # one sentinel per consumer
        task_queue.put(None)
    print("Producer: all tasks submitted")

def consumer(consumer_id):
    while True:
        task = task_queue.get()
        if task is None:
            print(f"Consumer-{consumer_id}: got stop signal")
            task_queue.task_done()
            break
        print(f"Consumer-{consumer_id}: processing {task}")
        time.sleep(random.uniform(0.5, 1.0))
        result = f"Result of {task} by Consumer-{consumer_id}"
        result_queue.put(result)
        task_queue.task_done()
    print(f"Consumer-{consumer_id}: exiting")

def result_processor():
    results = []
    while True:
        try:
            result = result_queue.get(timeout=5)
            results.append(result)
            result_queue.task_done()
            print(f"Result processor: got {result}")
        except queue.Empty:
            print("Result processor: timed out, exiting")
            break
    print(f"Result processor: handled {len(results)} results")
    return results

def main():
    num_tasks = 20
    producer_thread = threading.Thread(target=producer, args=(num_tasks,))
    consumer_threads = [
        threading.Thread(target=consumer, args=(i,)) for i in range(3)
    ]
    result_thread = threading.Thread(target=result_processor)

    producer_thread.start()
    for t in consumer_threads:
        t.start()
    result_thread.start()

    producer_thread.join()
    for t in consumer_threads:
        t.join()
    result_thread.join()
    print("All threads finished")

if __name__ == "__main__":
    main()
```
Using asyncio
```python
import asyncio
import random

async def producer(queue, num_tasks):
    for i in range(num_tasks):
        task = f"Task-{i}"
        await queue.put(task)
        print(f"Producer: added {task}")
        await asyncio.sleep(random.uniform(0.1, 0.3))
    for _ in range(3):  # one sentinel per consumer
        await queue.put(None)
    print("Producer: all tasks submitted")

async def consumer(consumer_id, task_queue, result_queue):
    while True:
        task = await task_queue.get()
        if task is None:
            print(f"Consumer-{consumer_id}: got stop signal")
            task_queue.task_done()
            break
        print(f"Consumer-{consumer_id}: processing {task}")
        await asyncio.sleep(random.uniform(0.5, 1.0))
        result = f"Result of {task} by Consumer-{consumer_id}"
        await result_queue.put(result)
        task_queue.task_done()
    print(f"Consumer-{consumer_id}: exiting")

async def result_processor(queue):
    results = []
    while True:
        try:
            result = await asyncio.wait_for(queue.get(), timeout=5)
            results.append(result)
            queue.task_done()
            print(f"Result processor: got {result}")
        except asyncio.TimeoutError:
            print("Result processor: timed out, exiting")
            break
    print(f"Result processor: handled {len(results)} results")
    return results

async def main():
    num_tasks = 20
    task_queue = asyncio.Queue()
    result_queue = asyncio.Queue()

    producer_task = asyncio.create_task(producer(task_queue, num_tasks))
    consumer_tasks = [
        asyncio.create_task(consumer(i, task_queue, result_queue))
        for i in range(3)
    ]
    result_task = asyncio.create_task(result_processor(result_queue))

    await producer_task
    await asyncio.gather(*consumer_tasks)
    results = await result_task
    print("All tasks finished")

if __name__ == "__main__":
    asyncio.run(main())
```
Advanced thread pool and process pool usage
Worker initializers and thread-local state
```python
import concurrent.futures
import threading
import time

thread_local = threading.local()

def initialize_worker():
    thread_local.worker_id = threading.get_ident()
    thread_local.start_time = time.time()
    print(f"Initialized worker thread {thread_local.worker_id}")

def process_task(task):
    worker_id = thread_local.worker_id
    elapsed = time.time() - thread_local.start_time
    print(f"Worker {worker_id} handling task {task}, alive for {elapsed:.2f}s")
    time.sleep(0.5)
    return f"Result of task {task}"

def main():
    tasks = list(range(10))
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
        initializer=initialize_worker
    ) as executor:
        results = list(executor.map(process_task, tasks))
    print("All tasks done:", results)

if __name__ == "__main__":
    main()
```
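Thread-local state does not carry over to asyncio, where many tasks share one thread; there, the standard-library contextvars module plays the analogous role. A minimal sketch (the request_id variable and handler are illustrative):

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default="-")

async def handler(rid):
    request_id.set(rid)  # visible only within this task's copied context
    await asyncio.sleep(0.1)
    print(f"task sees request_id={request_id.get()}")

async def main():
    # Each task gets its own context, so the three sets do not clobber each other.
    await asyncio.gather(handler("a"), handler("b"), handler("c"))

asyncio.run(main())
```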
A custom thread pool
```python
import threading
import queue
import time

class CustomThreadPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.workers = []
        self.results = {}
        self.result_lock = threading.Lock()
        self.task_counter = 0
        self.shutdown_flag = False
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        # Poll with a timeout so the shutdown flag is re-checked regularly.
        while not self.shutdown_flag:
            try:
                task_id, func, args, kwargs = self.task_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            try:
                result = func(*args, **kwargs)
                with self.result_lock:
                    self.results[task_id] = (True, result)
            except Exception as e:
                with self.result_lock:
                    self.results[task_id] = (False, e)
            finally:
                self.task_queue.task_done()

    def submit(self, func, *args, **kwargs):
        if self.shutdown_flag:
            raise RuntimeError("Thread pool is shut down")
        task_id = self.task_counter
        self.task_counter += 1
        self.task_queue.put((task_id, func, args, kwargs))
        return task_id

    def get_result(self, task_id, timeout=None):
        end_time = None if timeout is None else time.time() + timeout
        while True:
            with self.result_lock:
                if task_id in self.results:
                    success, result = self.results.pop(task_id)
                    if success:
                        return result
                    raise result  # re-raise the exception from the worker
            if end_time is not None and time.time() > end_time:
                raise TimeoutError(f"Timed out waiting for task {task_id}")
            time.sleep(0.01)

    def shutdown(self, wait=True):
        self.shutdown_flag = True
        if wait:
            for worker in self.workers:
                worker.join()

def example_task(n):
    time.sleep(0.5)
    return n * n

def main():
    pool = CustomThreadPool(num_workers=3)
    task_ids = [pool.submit(example_task, i) for i in range(10)]
    results = [pool.get_result(task_id) for task_id in task_ids]
    print("Results:", results)
    pool.shutdown()

if __name__ == "__main__":
    main()
```
Async iterators and async generators
```python
import asyncio
import aiohttp

class AsyncURLFetcher:
    def __init__(self, urls):
        self.urls = urls
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.urls):
            raise StopAsyncIteration
        url = self.urls[self.index]
        self.index += 1
        # Note: opening a session per request is wasteful; kept short for clarity.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return url, await response.text()

async def fetch_urls():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com"
    ]
    fetcher = AsyncURLFetcher(urls)
    async for url, html in fetcher:
        print(f"Fetched {url}, content length: {len(html)} characters")

async def async_range(start, stop):
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def use_async_generator():
    async for i in async_range(1, 5):
        print(f"Generated: {i}")

async def main():
    await fetch_urls()
    await use_async_generator()

if __name__ == "__main__":
    asyncio.run(main())
```
Common pitfalls and best practices
Common pitfalls
Deadlock: two or more threads each wait for a lock the other holds
```python
import threading
import time

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_function():
    with lock1:
        time.sleep(0.1)
        with lock2:  # waits for lock2 while holding lock1
            print("Thread 1 acquired both locks")

def thread2_function():
    with lock2:
        time.sleep(0.1)
        with lock1:  # waits for lock1 while holding lock2 -> deadlock
            print("Thread 2 acquired both locks")
```
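The standard remedy is to impose a global ordering on locks so that every thread acquires them in the same sequence; a sketch of the fixed version:

```python
import threading
import time

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_function():
    with lock1:            # both threads take lock1 first, then lock2,
        time.sleep(0.1)    # so neither can hold one while waiting on the other
        with lock2:
            print("Thread 1 acquired both locks")

def thread2_function():
    with lock1:            # same acquisition order as thread1_function
        time.sleep(0.1)
        with lock2:
            print("Thread 2 acquired both locks")
```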
Race condition: multiple threads access shared state concurrently, leaving it inconsistent

```python
import time

counter = 0

def increment():
    global counter
    local_copy = counter   # read
    local_copy += 1        # modify
    time.sleep(0.001)      # another thread can interleave here
    counter = local_copy   # write back, possibly clobbering other updates
```
GIL bottleneck: using multithreading for CPU-bound work

```python
import threading

def cpu_intensive():
    for _ in range(10000000):
        _ = 1 + 1

# The GIL prevents these four threads from running the loop in parallel.
threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
```
Resource leaks: files, connections, and other resources are never closed

```python
def process_file():
    f = open("large_file.txt", "r")
    data = f.read()
    return data  # the file handle is never closed
```
Blocking operations in async code: calling a blocking function inside a coroutine

```python
import time

async def bad_async_function():
    time.sleep(1)  # blocks the entire event loop for a full second
    return "done"
```
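The fix is the asynchronous equivalent where one exists, or an executor for blocking calls that have none; a minimal sketch (asyncio.to_thread requires Python 3.9+):

```python
import asyncio
import time

async def good_async_function():
    await asyncio.sleep(1)  # yields control to the event loop while waiting
    return "done"

async def wrap_blocking_call():
    # Hand an unavoidable blocking call to a worker thread.
    await asyncio.to_thread(time.sleep, 1)
    return "done"
```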
Best practices
Use context managers: acquire and release resources automatically

```python
import concurrent.futures

with open("file.txt", "r") as f:
    data = f.read()

class ThreadPoolManager:
    def __init__(self, max_workers):
        self.max_workers = max_workers

    def __enter__(self):
        self.pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers)
        return self.pool

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pool.shutdown()

with ThreadPoolManager(max_workers=5) as pool:
    # process_item and items are defined elsewhere
    results = list(pool.map(process_item, items))
```
Avoid shared mutable state: prefer immutable data or message passing

```python
def producer(queue):
    for i in range(10):
        queue.put(i)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        process_item(item)  # defined elsewhere
```
Use the right synchronization primitive: choose the appropriate lock, semaphore, and so on (a semaphore sketch follows this example)

```python
import threading

lock = threading.RLock()  # re-entrant: the same thread may acquire it again

def recursive_function():
    with lock:
        another_function()

def another_function():
    with lock:
        print("Safe use of a re-entrant lock")
```
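A semaphore caps concurrency rather than serializing it completely; a minimal sketch that lets at most three threads into a section at once:

```python
import threading
import time

semaphore = threading.Semaphore(3)  # at most 3 threads inside at a time

def limited_worker(i):
    with semaphore:
        print(f"worker {i} entered")
        time.sleep(0.5)

threads = [threading.Thread(target=limited_worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```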
Set sensible timeouts: avoid waiting forever

```python
import asyncio
from queue import Empty

# Blocking get with a timeout (task_queue is a queue.Queue instance)
try:
    result = task_queue.get(timeout=5)
except Empty:
    print("Timed out waiting for an item")

# Awaitable with a timeout (inside a coroutine)
try:
    result = await asyncio.wait_for(async_function(), timeout=5)
except asyncio.TimeoutError:
    print("Async operation timed out")
```
Use thread/process pools: avoid creating threads or processes without bound

```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_item, items))
```
Handle exceptions properly: make sure an exception cannot silently kill a worker

```python
def worker():
    try:
        process_data()  # defined elsewhere
    except Exception as e:
        print(f"Caught exception: {e}")
```
Prefer thread-safe building blocks: structures like queue.Queue synchronize internally, avoiding scenarios that need explicit locks

```python
import time
from queue import Queue, Empty

task_queue = Queue()  # synchronizes internally; no explicit lock required

def producer():
    for i in range(10):
        task_queue.put(i)
        time.sleep(0.1)

def consumer():
    while True:
        try:
            item = task_queue.get(timeout=1)
            print(f"Processing item: {item}")
            task_queue.task_done()
        except Empty:  # Empty lives in the queue module, not on the Queue class
            break
```
Conclusion
Python offers several concurrency models, each with its own strengths and use cases:
- Multithreading: suits IO-bound tasks, but is constrained by the GIL
- Multiprocessing: suits CPU-bound tasks and sidesteps the GIL, at a higher resource cost
- Async IO: suits IO-bound tasks; its single-threaded model avoids much of the complexity of threads
Choose based on the nature of the task, the performance requirements, and the code complexity you can accept. In practice, combining several models is sometimes the best option.
Whichever model you choose, watch out for the common concurrency traps: deadlocks, race conditions, resource leaks, and the like. Following the best practices above, such as using context managers, avoiding shared mutable state, and setting sensible timeouts, will help you write more robust and efficient concurrent code.
As Python evolves, and in particular as its async IO support matures, its concurrency capabilities keep improving. Mastering these models and techniques will let you take full advantage of modern hardware and build efficient, scalable Python applications.
Do you have questions about Python concurrency, or experience to share? Join the discussion in the comments!