Python高效错误处理:从基础到最佳实践
Orion K Lv6

Python高效错误处理:从基础到最佳实践

错误处理是编写健壮Python代码的关键部分。良好的错误处理不仅可以防止程序崩溃,还能提供有用的调试信息,改善用户体验。在这篇文章中,我将从基础概念到高级技巧,全面介绍Python中的错误处理最佳实践。

异常处理基础

Python异常层次结构

Python有一个丰富的内置异常层次结构,所有异常都继承自BaseException类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
BaseException
├── SystemExit
├── KeyboardInterrupt
├── GeneratorExit
└── Exception
├── StopIteration
├── ArithmeticError
│ ├── FloatingPointError
│ ├── OverflowError
│ └── ZeroDivisionError
├── AssertionError
├── AttributeError
├── BufferError
├── EOFError
├── ImportError
│ └── ModuleNotFoundError
├── LookupError
│ ├── IndexError
│ └── KeyError
├── MemoryError
├── NameError
│ └── UnboundLocalError
├── OSError
│ ├── BlockingIOError
│ ├── ChildProcessError
│ ├── ConnectionError
│ │ ├── BrokenPipeError
│ │ ├── ConnectionAbortedError
│ │ ├── ConnectionRefusedError
│ │ └── ConnectionResetError
│ ├── FileExistsError
│ ├── FileNotFoundError
│ ├── InterruptedError
│ ├── IsADirectoryError
│ ├── NotADirectoryError
│ ├── PermissionError
│ ├── ProcessLookupError
│ └── TimeoutError
├── ReferenceError
├── RuntimeError
│ ├── NotImplementedError
│ └── RecursionError
├── SyntaxError
│ └── IndentationError
│ └── TabError
├── SystemError
├── TypeError
├── ValueError
│ └── UnicodeError
│ ├── UnicodeDecodeError
│ ├── UnicodeEncodeError
│ └── UnicodeTranslateError
└── Warning
├── DeprecationWarning
├── PendingDeprecationWarning
├── RuntimeWarning
├── SyntaxWarning
├── UserWarning
├── FutureWarning
├── ImportWarning
├── UnicodeWarning
├── BytesWarning
└── ResourceWarning

了解这个层次结构有助于我们选择合适的异常类型进行捕获和抛出。

基本的try-except语句

Python使用try-except语句来捕获和处理异常:

1
2
3
4
5
6
try:
# 可能引发异常的代码
result = 10 / 0
except ZeroDivisionError:
# 处理特定类型的异常
print("除数不能为零!")

捕获多种异常

可以在一个except子句中捕获多种异常,或使用多个except子句:

1
2
3
4
5
6
7
8
9
10
11
12
13
try:
# 可能引发异常的代码
value = int(input("请输入一个数字: "))
result = 10 / value
except ValueError:
# 处理ValueError异常
print("输入必须是一个数字!")
except ZeroDivisionError:
# 处理ZeroDivisionError异常
print("除数不能为零!")
except (TypeError, OverflowError):
# 同时处理多种异常
print("发生了类型错误或溢出错误!")

else和finally子句

try-except语句可以包含elsefinally子句:

1
2
3
4
5
6
7
8
9
10
11
12
13
try:
value = int(input("请输入一个数字: "))
result = 10 / value
except ValueError:
print("输入必须是一个数字!")
except ZeroDivisionError:
print("除数不能为零!")
else:
# 当try块中的代码没有引发异常时执行
print(f"结果是: {result}")
finally:
# 无论是否发生异常都会执行
print("计算完成,清理资源...")

高级异常处理技巧

获取异常信息

可以使用as关键字获取异常对象,以便访问其属性和方法:

1
2
3
4
5
6
7
try:
with open("不存在的文件.txt", "r") as file:
content = file.read()
except FileNotFoundError as e:
print(f"错误类型: {type(e).__name__}")
print(f"错误信息: {e}")
print(f"错误参数: {e.args}")

异常链和上下文

Python 3支持异常链,可以在抛出新异常时保留原始异常的上下文:

1
2
3
4
5
6
7
8
9
try:
try:
1 / 0
except ZeroDivisionError as e:
# 抛出新异常,同时保留原始异常
raise ValueError("计算失败") from e
except ValueError as e:
print(f"当前异常: {e}")
print(f"原始异常: {e.__cause__}")

也可以使用raise不带from子句来抑制异常链:

1
2
3
4
5
6
7
8
9
try:
try:
1 / 0
except ZeroDivisionError:
# 抛出新异常,不保留原始异常
raise ValueError("计算失败")
except ValueError as e:
print(f"当前异常: {e}")
print(f"原始异常: {e.__cause__}") # 输出None

自定义异常类

创建自定义异常类可以使错误处理更加明确和有意义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class CustomError(Exception):
"""自定义异常的基类"""
pass

class ValueTooSmallError(CustomError):
"""当值小于最小允许值时抛出"""
def __init__(self, value, min_value):
self.value = value
self.min_value = min_value
self.message = f"提供的值 {value} 小于最小允许值 {min_value}"
super().__init__(self.message)

class ValueTooLargeError(CustomError):
"""当值大于最大允许值时抛出"""
def __init__(self, value, max_value):
self.value = value
self.max_value = max_value
self.message = f"提供的值 {value} 大于最大允许值 {max_value}"
super().__init__(self.message)

def validate_value(value, min_value=0, max_value=100):
if value < min_value:
raise ValueTooSmallError(value, min_value)
if value > max_value:
raise ValueTooLargeError(value, max_value)
return value

# 使用自定义异常
try:
validate_value(-5)
except ValueTooSmallError as e:
print(e)
except ValueTooLargeError as e:
print(e)

错误处理最佳实践

1. 只捕获预期的异常

不要使用空的except:子句捕获所有异常,这可能会掩盖真正的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 不好的做法
try:
# 一些代码
pass
except:
# 捕获所有异常
pass

# 好的做法
try:
# 一些代码
pass
except (ValueError, TypeError) as e:
# 只捕获预期的异常
print(f"处理特定错误: {e}")

如果确实需要捕获所有异常,至少应该使用Exception而不是空的except:,并记录异常信息:

1
2
3
4
5
6
7
8
9
try:
# 一些代码
pass
except Exception as e:
# 捕获所有Exception子类
print(f"发生错误: {e}")
# 记录异常信息
import traceback
traceback.print_exc()

2. 尽早抛出,延迟捕获

在代码的最低层抛出异常,在更高层次捕获和处理它们:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_user_data(user_id):
if not isinstance(user_id, int):
raise TypeError("user_id必须是整数")
if user_id <= 0:
raise ValueError("user_id必须是正整数")
# 获取用户数据...
return {"id": user_id, "name": "用户" + str(user_id)}

def process_user(user_id):
try:
user_data = get_user_data(user_id)
# 处理用户数据...
return user_data
except (TypeError, ValueError) as e:
print(f"处理用户 {user_id} 时出错: {e}")
return None

3. 使用上下文管理器处理资源

使用with语句和上下文管理器自动处理资源的获取和释放:

1
2
3
4
5
6
7
8
9
10
# 不好的做法
file = open("data.txt", "r")
try:
content = file.read()
finally:
file.close()

# 好的做法
with open("data.txt", "r") as file:
content = file.read()

自定义上下文管理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DatabaseConnection:
def __init__(self, connection_string):
self.connection_string = connection_string
self.connection = None

def __enter__(self):
print("连接数据库...")
self.connection = self._connect()
return self.connection

def __exit__(self, exc_type, exc_val, exc_tb):
print("关闭数据库连接...")
if self.connection:
self.connection.close()
# 如果返回True,则异常被抑制
return False

def _connect(self):
# 实际连接数据库的代码
return {"connected": True}

# 使用自定义上下文管理器
with DatabaseConnection("mysql://localhost/mydb") as conn:
# 使用数据库连接
print("执行数据库操作...")
# 如果这里发生异常,__exit__方法仍然会被调用

4. 使用异常而不是返回错误码

Python鼓励使用异常而不是返回错误码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 不好的做法
def divide(a, b):
if b == 0:
return None, "除数不能为零"
return a / b, None

result, error = divide(10, 0)
if error:
print(f"错误: {error}")
else:
print(f"结果: {result}")

# 好的做法
def divide(a, b):
if b == 0:
raise ZeroDivisionError("除数不能为零")
return a / b

try:
result = divide(10, 0)
print(f"结果: {result}")
except ZeroDivisionError as e:
print(f"错误: {e}")

5. 使用断言进行内部检查

断言用于验证代码的内部假设,而不是处理用户输入或外部条件:

1
2
3
4
5
6
def calculate_average(numbers):
# 断言用于验证内部假设
assert len(numbers) > 0, "列表不能为空"
return sum(numbers) / len(numbers)

# 注意:断言可以通过-O选项禁用,所以不要用它来检查用户输入

6. 结合使用日志和异常

使用日志记录异常信息,而不仅仅是打印它们:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='app.log'
)

def process_data(data):
try:
# 处理数据
result = data['key'] / 0
return result
except KeyError:
# 记录警告并继续
logging.warning("数据中缺少'key'字段")
return None
except ZeroDivisionError:
# 记录错误并重新抛出
logging.error("除零错误", exc_info=True)
raise
except Exception as e:
# 记录未预期的异常
logging.exception("处理数据时发生未预期的错误")
# 或者
logging.error("处理数据时发生错误: %s", str(e), exc_info=True)
raise

实际应用案例

案例1:API错误处理

在Web API中处理错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from flask import Flask, request, jsonify

app = Flask(__name__)

class APIError(Exception):
"""API错误的基类"""
def __init__(self, message, status_code=400, payload=None):
super().__init__(self)
self.message = message
self.status_code = status_code
self.payload = payload

def to_dict(self):
rv = dict(self.payload or ())
rv['message'] = self.message
rv['status'] = 'error'
return rv

class ResourceNotFoundError(APIError):
"""资源未找到错误"""
def __init__(self, message="请求的资源不存在", payload=None):
super().__init__(message, status_code=404, payload=payload)

class ValidationError(APIError):
"""输入验证错误"""
def __init__(self, message="输入数据验证失败", payload=None):
super().__init__(message, status_code=400, payload=payload)

@app.errorhandler(APIError)
def handle_api_error(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
# 模拟用户数据库
users = {1: {"id": 1, "name": "张三"}}

if user_id not in users:
raise ResourceNotFoundError(f"用户ID {user_id} 不存在")

return jsonify({"status": "success", "data": users[user_id]})

@app.route('/users', methods=['POST'])
def create_user():
data = request.json

if not data:
raise ValidationError("没有提供JSON数据")

if 'name' not in data:
raise ValidationError("缺少必要字段: name", payload={"fields": ["name"]})

# 创建用户...
return jsonify({"status": "success", "message": "用户创建成功"})

if __name__ == '__main__':
app.run(debug=True)

案例2:数据处理错误处理

在数据处理管道中处理错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import pandas as pd
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)

class DataProcessingError(Exception):
"""数据处理错误的基类"""
pass

class DataLoadError(DataProcessingError):
"""数据加载错误"""
pass

class DataCleaningError(DataProcessingError):
"""数据清洗错误"""
pass

class DataTransformError(DataProcessingError):
"""数据转换错误"""
pass

def load_data(file_path):
"""加载数据"""
try:
logging.info(f"加载数据: {file_path}")
if file_path.endswith('.csv'):
return pd.read_csv(file_path)
elif file_path.endswith('.xlsx'):
return pd.read_excel(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_path}")
except FileNotFoundError:
raise DataLoadError(f"文件不存在: {file_path}")
except pd.errors.EmptyDataError:
raise DataLoadError(f"文件为空: {file_path}")
except Exception as e:
raise DataLoadError(f"加载数据失败: {str(e)}")

def clean_data(df):
"""清洗数据"""
try:
logging.info("清洗数据")
# 检查必要的列
required_columns = ['id', 'name', 'value']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"缺少必要的列: {missing_columns}")

# 删除空行
df_cleaned = df.dropna(subset=required_columns)

# 检查是否有足够的数据
if len(df_cleaned) == 0:
raise ValueError("清洗后没有剩余数据")

return df_cleaned
except Exception as e:
raise DataCleaningError(f"清洗数据失败: {str(e)}")

def transform_data(df):
"""转换数据"""
try:
logging.info("转换数据")
# 转换数据类型
df['id'] = df['id'].astype(int)
df['value'] = df['value'].astype(float)

# 计算新列
df['squared_value'] = df['value'] ** 2

return df
except Exception as e:
raise DataTransformError(f"转换数据失败: {str(e)}")

def process_data_file(file_path):
"""处理数据文件的主函数"""
try:
# 加载数据
df = load_data(file_path)
logging.info(f"成功加载数据: {len(df)} 行")

# 清洗数据
df_cleaned = clean_data(df)
logging.info(f"清洗后数据: {len(df_cleaned)} 行")

# 转换数据
df_transformed = transform_data(df_cleaned)
logging.info("数据转换完成")

# 保存结果
output_path = file_path.replace('.', '_processed.')
df_transformed.to_csv(output_path, index=False)
logging.info(f"结果已保存到: {output_path}")

return True, output_path
except DataLoadError as e:
logging.error(f"数据加载错误: {e}")
return False, str(e)
except DataCleaningError as e:
logging.error(f"数据清洗错误: {e}")
return False, str(e)
except DataTransformError as e:
logging.error(f"数据转换错误: {e}")
return False, str(e)
except Exception as e:
logging.exception("处理数据时发生未预期的错误")
return False, f"未预期的错误: {str(e)}"

# 使用示例
success, result = process_data_file("data.csv")
if success:
print(f"处理成功,结果保存在: {result}")
else:
print(f"处理失败: {result}")

案例3:重试机制

实现带有重试机制的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import time
import random
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)

class RetryError(Exception):
"""重试失败后抛出的异常"""
def __init__(self, message, last_exception=None):
super().__init__(message)
self.last_exception = last_exception

def retry(func, max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
"""
执行函数,如果失败则重试

参数:
func: 要执行的函数
max_attempts: 最大尝试次数
delay: 初始延迟时间(秒)
backoff: 延迟时间的增长因子
exceptions: 要捕获的异常类型

返回:
函数的返回值

抛出:
RetryError: 如果所有尝试都失败
"""
attempt = 1
last_exception = None

while attempt <= max_attempts:
try:
logging.info(f"尝试 {attempt}/{max_attempts}")
return func()
except exceptions as e:
last_exception = e
if attempt == max_attempts:
break

wait_time = delay * (backoff ** (attempt - 1))
logging.warning(f"尝试 {attempt} 失败: {e}")
logging.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
attempt += 1

raise RetryError(
f"在 {max_attempts} 次尝试后失败",
last_exception=last_exception
)

# 使用示例
def unstable_network_request():
"""模拟不稳定的网络请求"""
# 80%的概率失败
if random.random() < 0.8:
raise ConnectionError("网络连接失败")
return "请求成功"

try:
result = retry(
unstable_network_request,
max_attempts=5,
delay=1,
backoff=2,
exceptions=(ConnectionError,)
)
print(f"结果: {result}")
except RetryError as e:
print(f"所有重试都失败了: {e}")
if e.last_exception:
print(f"最后一次异常: {e.last_exception}")

高级错误处理模式

1. 上下文管理器模式

使用上下文管理器进行错误处理和资源管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import contextlib

@contextlib.contextmanager
def error_handling_context(error_msg="操作失败"):
"""创建一个错误处理上下文"""
try:
yield
except Exception as e:
print(f"{error_msg}: {e}")
# 可以选择重新抛出或抑制异常
# raise
finally:
print("清理资源...")

# 使用上下文管理器
with error_handling_context("文件处理失败"):
with open("不存在的文件.txt", "r") as f:
content = f.read()

2. 装饰器模式

使用装饰器添加错误处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import functools
import logging

def error_handler(func=None, *, reraise=False, log_level=logging.ERROR):
"""
处理函数中的异常的装饰器

参数:
func: 被装饰的函数
reraise: 是否重新抛出异常
log_level: 日志级别
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logging.log(log_level, f"函数 {func.__name__} 执行失败: {e}", exc_info=True)
if reraise:
raise
return None
return wrapper

if func is None:
return decorator
return decorator(func)

# 使用装饰器
@error_handler(reraise=False, log_level=logging.WARNING)
def risky_function(x):
return 10 / x

result = risky_function(0) # 不会抛出异常,返回None
print(f"结果: {result}")

3. 异常过滤器模式

创建一个异常过滤器,根据条件决定是否处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ExceptionFilter:
"""异常过滤器,根据条件决定是否处理异常"""

def __init__(self):
self.handlers = []

def register(self, exception_type, condition=None, handler=None):
"""
注册异常处理器

参数:
exception_type: 异常类型
condition: 一个函数,接受异常对象,返回布尔值
handler: 一个函数,接受异常对象,处理异常
"""
self.handlers.append((exception_type, condition, handler))

def handle(self, exception):
"""
处理异常

参数:
exception: 异常对象

返回:
布尔值,表示异常是否被处理
"""
for exc_type, condition, handler in self.handlers:
if isinstance(exception, exc_type):
if condition is None or condition(exception):
if handler:
handler(exception)
return True
return False

# 使用异常过滤器
filter = ExceptionFilter()

# 注册处理器
filter.register(
ValueError,
condition=lambda e: "invalid" in str(e).lower(),
handler=lambda e: print(f"处理无效值错误: {e}")
)

filter.register(
ZeroDivisionError,
handler=lambda e: print(f"处理除零错误: {e}")
)

# 使用过滤器
try:
value = int("invalid")
except Exception as e:
if not filter.handle(e):
print(f"未处理的异常: {e}")
raise

4. 错误边界模式

创建错误边界,限制错误的传播范围:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class ErrorBoundary:
"""
错误边界,限制错误的传播范围
"""

def __init__(self, fallback=None, on_error=None):
"""
初始化错误边界

参数:
fallback: 发生错误时的回退值
on_error: 错误处理函数,接受异常对象
"""
self.fallback = fallback
self.on_error = on_error

def execute(self, func, *args, **kwargs):
"""
执行函数,捕获异常

参数:
func: 要执行的函数
args, kwargs: 函数参数

返回:
函数返回值或回退值
"""
try:
return func(*args, **kwargs)
except Exception as e:
if self.on_error:
self.on_error(e)
return self.fallback

# 使用错误边界
def process_item(item):
if item == 0:
raise ValueError("项目不能为零")
return 10 / item

# 创建错误边界
boundary = ErrorBoundary(
fallback=None,
on_error=lambda e: print(f"处理项目时出错: {e}")
)

# 处理项目列表
items = [5, 0, 2, "invalid", 8]
results = [boundary.execute(process_item, item) for item in items]
print(f"结果: {results}") # [2.0, None, 5.0, None, 1.25]

结论

良好的错误处理是编写健壮Python代码的关键。通过遵循本文介绍的最佳实践和模式,你可以创建更可靠、更易于维护的应用程序。记住以下几点:

  1. 明确异常类型:捕获特定的异常,而不是笼统地捕获所有异常
  2. 创建有意义的自定义异常:使错误信息更加明确
  3. 使用上下文管理器:自动处理资源的获取和释放
  4. 结合使用日志和异常:记录详细的错误信息
  5. 实现适当的重试机制:处理临时故障
  6. 建立错误边界:限制错误的传播范围

通过这些技术,你可以构建出既能优雅地处理错误,又能提供良好用户体验的Python应用程序。

你有什么关于Python错误处理的问题或经验分享吗?欢迎在评论中讨论!

本站由 提供部署服务