FastAPI错误处理与日志系统:构建健壮的生产级应用
Orion K Lv6

FastAPI错误处理与日志系统:构建健壮的生产级应用

在生产环境中,完善的错误处理和日志系统是确保应用稳定运行的关键。作为一名在多个大型FastAPI项目中负责错误处理和监控的开发者,我想分享一些实战经验,帮助你构建更加健壮和可维护的应用。

异常处理基础

1. 自定义异常类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from fastapi import HTTPException, status
from typing import Any, Dict, Optional
import uuid
from datetime import datetime

class BaseAPIException(Exception):
    """Base class for all API-level exceptions.

    Carries a machine-readable error code, an HTTP status code and optional
    structured details, plus a unique error id and a UTC timestamp so each
    occurrence can be correlated with logs and alerts.
    """

    def __init__(
        self,
        message: str,
        error_code: Optional[str] = None,
        status_code: int = 500,
        details: Optional[Dict[str, Any]] = None
    ):
        self.message = message
        # Fall back to the concrete class name so every subclass gets a
        # sensible default code without extra boilerplate.
        self.error_code = error_code or self.__class__.__name__
        self.status_code = status_code
        self.details = details or {}
        # Unique id for cross-referencing this occurrence in logs/alerts.
        self.error_id = str(uuid.uuid4())
        self.timestamp = datetime.utcnow()
        super().__init__(self.message)

class ValidationException(BaseAPIException):
    """Raised when request data fails application-level validation (HTTP 422)."""

    def __init__(self, message: str, field: str = None, value: Any = None):
        # Only include the offending field/value when they were supplied.
        extra: Dict[str, Any] = {}
        if field:
            extra["field"] = field
        if value is not None:
            extra["value"] = str(value)

        super().__init__(
            message=message,
            error_code="VALIDATION_ERROR",
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            details=extra,
        )

class BusinessLogicException(BaseAPIException):
    """Raised when a request is well-formed but violates a business rule (HTTP 400)."""

    def __init__(self, message: str, business_code: str = None):
        code = business_code if business_code else "BUSINESS_LOGIC_ERROR"
        super().__init__(
            message=message,
            error_code=code,
            status_code=status.HTTP_400_BAD_REQUEST,
        )

class ResourceNotFoundException(BaseAPIException):
    """Raised when a requested resource does not exist (HTTP 404)."""

    def __init__(self, resource_type: str, resource_id: Any):
        super().__init__(
            message="{} with id {} not found".format(resource_type, resource_id),
            error_code="RESOURCE_NOT_FOUND",
            status_code=status.HTTP_404_NOT_FOUND,
            details={
                "resource_type": resource_type,
                "resource_id": str(resource_id),
            },
        )

class AuthenticationException(BaseAPIException):
    """Raised when the caller cannot be authenticated (HTTP 401)."""

    def __init__(self, message: str = "Authentication failed"):
        super().__init__(
            message=message,
            status_code=status.HTTP_401_UNAUTHORIZED,
            error_code="AUTHENTICATION_ERROR",
        )

class AuthorizationException(BaseAPIException):
    """Raised when an authenticated caller lacks permission (HTTP 403)."""

    def __init__(self, message: str = "Access denied", required_permission: str = None):
        # Surface the missing permission when the caller specified one.
        info: Dict[str, Any] = {}
        if required_permission:
            info["required_permission"] = required_permission

        super().__init__(
            message=message,
            error_code="AUTHORIZATION_ERROR",
            status_code=status.HTTP_403_FORBIDDEN,
            details=info,
        )

class ExternalServiceException(BaseAPIException):
    """Raised when an upstream dependency fails (HTTP 502)."""

    def __init__(self, service_name: str, message: str, upstream_error: str = None):
        info = {"service_name": service_name}
        if upstream_error:
            info["upstream_error"] = upstream_error

        super().__init__(
            message="External service error: " + message,
            error_code="EXTERNAL_SERVICE_ERROR",
            status_code=status.HTTP_502_BAD_GATEWAY,
            details=info,
        )

class RateLimitException(BaseAPIException):
    """Raised when a client exceeds the allowed request rate (HTTP 429).

    Args:
        limit: Maximum number of requests permitted in the window.
        window: Window length in seconds.
        retry_after: Seconds the client should wait before retrying;
            included in ``details`` when provided.
    """

    def __init__(self, limit: int, window: int, retry_after: int = None):
        details = {"limit": limit, "window": window}
        # `is not None` so a legitimate retry_after of 0 is still reported
        # (the original truthiness test silently dropped it).
        if retry_after is not None:
            details["retry_after"] = retry_after

        super().__init__(
            message=f"Rate limit exceeded: {limit} requests per {window} seconds",
            error_code="RATE_LIMIT_EXCEEDED",
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            details=details
        )

2. 全局异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
import logging
import traceback
import sys

app = FastAPI()
logger = logging.getLogger(__name__)

@app.exception_handler(BaseAPIException)
async def api_exception_handler(request: Request, exc: BaseAPIException):
    """Translate BaseAPIException into a structured JSON error response.

    Logs the exception with request context, echoes the error id in the
    X-Error-ID header, and includes `details` only in debug mode so
    internals are not leaked in production.
    """

    # Log with enough context to correlate this error with the request.
    logger.error(
        f"API Exception: {exc.error_code}",
        extra={
            "error_id": exc.error_id,
            "error_code": exc.error_code,
            "message": exc.message,
            "status_code": exc.status_code,
            "details": exc.details,
            "request_method": request.method,
            "request_url": str(request.url),
            # NOTE(review): headers may contain credentials (Authorization,
            # Cookie) — consider redacting before logging.
            "request_headers": dict(request.headers),
            "timestamp": exc.timestamp.isoformat()
        }
    )

    # Build the client-facing error payload.
    response_data = {
        "error": {
            "code": exc.error_code,
            "message": exc.message,
            "error_id": exc.error_id,
            "timestamp": exc.timestamp.isoformat()
        }
    }

    # Expose detailed info only in development/debug mode.
    if app.debug:
        response_data["error"]["details"] = exc.details

    return JSONResponse(
        status_code=exc.status_code,
        content=response_data,
        headers={"X-Error-ID": exc.error_id}
    )

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Turn FastAPI/pydantic request-validation failures into a structured
    422 response listing each offending field."""

    error_id = str(uuid.uuid4())

    # Flatten pydantic's error list into field/message/type dicts.
    formatted_errors = []
    for error in exc.errors():
        formatted_errors.append({
            "field": ".".join(str(x) for x in error["loc"]),
            "message": error["msg"],
            "type": error["type"],
            "input": error.get("input")
        })

    logger.warning(
        "Request validation failed",
        extra={
            "error_id": error_id,
            "validation_errors": formatted_errors,
            "request_method": request.method,
            "request_url": str(request.url),
            # NOTE(review): the body stream may already be consumed at this
            # point; get_request_body then returns a placeholder string.
            "request_body": await get_request_body(request)
        }
    )

    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={
            "error": {
                "code": "VALIDATION_ERROR",
                "message": "Request validation failed",
                "error_id": error_id,
                "validation_errors": formatted_errors
            }
        },
        headers={"X-Error-ID": error_id}
    )

@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
    """Render framework HTTP exceptions (404, 405, ...) in the shared JSON
    error envelope, tagged with a generated error id."""

    error_id = str(uuid.uuid4())

    logger.warning(
        f"HTTP Exception: {exc.status_code}",
        extra={
            "error_id": error_id,
            "status_code": exc.status_code,
            "detail": exc.detail,
            "request_method": request.method,
            "request_url": str(request.url),
        },
    )

    payload = {
        "error": {
            "code": f"HTTP_{exc.status_code}",
            "message": exc.detail,
            "error_id": error_id,
        }
    }
    return JSONResponse(
        status_code=exc.status_code,
        content=payload,
        headers={"X-Error-ID": error_id},
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Last-resort handler for uncaught exceptions.

    Logs the full traceback, fires an alert, and returns a generic 500
    response that deliberately hides internal details from the client.
    """

    error_id = str(uuid.uuid4())

    # Capture full diagnostics; the client only ever sees the error id.
    logger.error(
        "Unhandled exception occurred",
        extra={
            "error_id": error_id,
            "exception_type": type(exc).__name__,
            "exception_message": str(exc),
            "traceback": traceback.format_exc(),
            "request_method": request.method,
            "request_url": str(request.url),
            "request_headers": dict(request.headers)
        }
    )

    # Notify alerting channels; failures inside are swallowed by the helper.
    await send_error_alert(error_id, exc, request)

    # Generic response: no internals leak to the caller.
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "An internal server error occurred",
                "error_id": error_id
            }
        },
        headers={"X-Error-ID": error_id}
    )

async def get_request_body(request: Request) -> str:
    """Safely read and decode the request body for logging purposes.

    Returns the decoded body, an empty string when there is no body, or a
    placeholder when the body cannot be read/decoded (e.g. the stream was
    already consumed or the bytes are not valid UTF-8).
    """
    try:
        body = await request.body()
        return body.decode() if body else ""
    except Exception:
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit; this helper is best-effort only.
        return "Unable to read request body"

async def send_error_alert(error_id: str, exc: Exception, request: Request):
    """Emit an alert for an unhandled error.

    Currently only logs at CRITICAL; integrate real channels (Slack, email,
    monitoring) where indicated. Failures here are swallowed so alerting can
    never break the error-handling path itself.
    """
    try:
        # Alert channels can be integrated here.
        alert_data = {
            "error_id": error_id,
            "exception_type": type(exc).__name__,
            "exception_message": str(exc),
            "request_url": str(request.url),
            "request_method": request.method,
            "timestamp": datetime.utcnow().isoformat()
        }

        # Forward to monitoring systems, Slack, email, etc.
        logger.critical(f"Critical error alert: {alert_data}")

    except Exception as alert_exc:
        logger.error(f"Failed to send error alert: {alert_exc}")

结构化日志系统

1. 日志配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import logging
import logging.config
import json
import sys
from datetime import datetime
from typing import Dict, Any
import os

class JSONFormatter(logging.Formatter):
    """Serialize log records as single-line JSON objects.

    Standard record attributes are mapped to fixed keys; any extra
    attributes attached via ``logger.*(..., extra={...})`` are merged into
    the output, keeping the log schema open for structured context.
    """

    # Attributes that belong to LogRecord itself and must not be copied
    # into the JSON payload as custom fields. Extends the original list
    # with 'message' (set by Formatter.format) and 'taskName' (added to
    # LogRecord in Python 3.12).
    _RESERVED = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
        'filename', 'module', 'lineno', 'funcName', 'created',
        'msecs', 'relativeCreated', 'thread', 'threadName',
        'processName', 'process', 'getMessage', 'exc_info',
        'exc_text', 'stack_info', 'message', 'taskName',
    })

    def format(self, record: logging.LogRecord) -> str:
        # Use the record's own creation time (naive UTC, matching the rest
        # of this module) instead of "now": with buffered or async handlers
        # the two can differ, and the log line should reflect event time.
        log_data = {
            "timestamp": datetime.utcfromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Attach the formatted traceback when an exception is being logged.
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Legacy hook: explicit bag of extra fields, if a caller used it.
        if hasattr(record, 'extra_fields'):
            log_data.update(record.extra_fields)

        # Merge custom attributes injected via `extra={...}`.
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                log_data[key] = value

        # default=str keeps serialization robust for datetimes/UUIDs etc.
        return json.dumps(log_data, ensure_ascii=False, default=str)

class ContextFilter(logging.Filter):
    """Logging filter that copies request context (request id / user id)
    from contextvars onto each record.

    Fixes the original implementation, which called ``Context.get`` with a
    *string* key — ``contextvars.Context`` is keyed by ``ContextVar``
    objects, so the string lookup always returned the default and the
    context was never attached to records.
    """

    # Names of the context variables we propagate onto log records
    # (must match the ContextVar names declared by the request middleware).
    _CONTEXT_KEYS = ("request_id", "user_id")

    def filter(self, record: logging.LogRecord) -> bool:
        try:
            from contextvars import copy_context

            # A Context maps ContextVar objects to their current values;
            # match variables by their declared name.
            for var, value in copy_context().items():
                if var.name in self._CONTEXT_KEYS and value is not None:
                    setattr(record, var.name, value)
        except Exception:
            # Logging must never fail because context lookup did.
            pass

        return True

# Logging configuration: console (JSON or plain text, selected via the
# LOG_FORMAT env var), a rotating all-levels file, and a rotating
# error-only file; every handler runs the context filter so request/user
# ids are attached when available.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": JSONFormatter,
        },
        "standard": {
            "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
        }
    },
    "filters": {
        "context_filter": {
            "()": ContextFilter,
        }
    },
    "handlers": {
        "console": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            # JSON on stdout only when explicitly requested.
            "formatter": "json" if os.getenv("LOG_FORMAT") == "json" else "standard",
            "filters": ["context_filter"],
            "stream": sys.stdout
        },
        "file": {
            "level": "DEBUG",
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "json",
            "filters": ["context_filter"],
            "filename": "logs/app.log",
            "maxBytes": 10485760,  # 10MB per file before rotation
            "backupCount": 5
        },
        "error_file": {
            "level": "ERROR",
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "json",
            "filters": ["context_filter"],
            "filename": "logs/error.log",
            "maxBytes": 10485760,
            "backupCount": 10
        }
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["console", "file", "error_file"],
            "level": "INFO",
            "propagate": False
        },
        "uvicorn": {
            "handlers": ["console"],
            "level": "INFO",
            "propagate": False
        },
        "sqlalchemy.engine": {
            "handlers": ["file"],
            "level": "WARNING",
            "propagate": False
        }
    }
}

# Create the log directory BEFORE applying the config: dictConfig opens the
# RotatingFileHandler targets immediately and raises FileNotFoundError if
# logs/ is missing (the original created the directory only afterwards,
# which fails on a fresh checkout).
os.makedirs("logs", exist_ok=True)

# Apply the logging configuration
logging.config.dictConfig(LOGGING_CONFIG)

2. 请求日志中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import time
import uuid
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

# Context variables that expose the current request/user to the logging
# system (see ContextFilter) without threading them through call sites.
request_id_var: ContextVar[str] = ContextVar('request_id')
user_id_var: ContextVar[str] = ContextVar('user_id')

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware that assigns each request a unique id and logs its
    lifecycle (start, completion, failure) with timing information."""

    def __init__(self, app, logger_name: str = "request"):
        super().__init__(app)
        self.logger = logging.getLogger(logger_name)

    async def dispatch(self, request: Request, call_next):
        # Generate a request id and publish it to the logging context.
        request_id = str(uuid.uuid4())
        request_id_var.set(request_id)

        start_time = time.time()

        # Attach the user id when derivable from the Authorization header.
        user_id = await self.extract_user_id(request)
        if user_id:
            user_id_var.set(user_id)

        self.logger.info(
            "Request started",
            extra={
                "event": "request_started",
                "request_id": request_id,
                "method": request.method,
                "url": str(request.url),
                "path": request.url.path,
                "query_params": dict(request.query_params),
                "headers": self.filter_sensitive_headers(dict(request.headers)),
                "client_ip": self.get_client_ip(request),
                "user_agent": request.headers.get("user-agent", ""),
                "user_id": user_id
            }
        )

        try:
            response = await call_next(request)

            process_time = time.time() - start_time

            self.logger.info(
                "Request completed",
                extra={
                    "event": "request_completed",
                    "request_id": request_id,
                    "status_code": response.status_code,
                    "process_time": process_time,
                    "response_headers": dict(response.headers)
                }
            )

            # Expose the id to clients for support/debugging correlation.
            response.headers["X-Request-ID"] = request_id

            return response

        except Exception as exc:
            process_time = time.time() - start_time

            self.logger.error(
                "Request failed",
                extra={
                    "event": "request_failed",
                    "request_id": request_id,
                    "process_time": process_time,
                    "exception_type": type(exc).__name__,
                    "exception_message": str(exc)
                },
                exc_info=True
            )

            # Re-raise so the exception handlers produce the error response.
            raise

    async def extract_user_id(self, request: Request) -> Optional[str]:
        """Best-effort extraction of the user id from a Bearer token.

        JWT decoding is intentionally left as an integration point; until
        wired up this always returns None. (Return annotation fixed from
        `str` — the original always returned None.)
        """
        try:
            auth_header = request.headers.get("authorization", "")
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]
                # TODO: decode the JWT and return its user id claim, e.g.
                # return decode_jwt_token(token).get("user_id")
                pass
        except Exception:
            # Malformed headers must never break request handling
            # (narrowed from a bare `except:`).
            pass
        return None

    def filter_sensitive_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of the headers with credential-bearing values redacted."""
        sensitive_headers = {"authorization", "cookie", "x-api-key"}
        return {
            key: "***REDACTED***" if key.lower() in sensitive_headers else value
            for key, value in headers.items()
        }

    def get_client_ip(self, request: Request) -> str:
        """Resolve the client IP, honouring common reverse-proxy headers."""
        forwarded_for = request.headers.get("x-forwarded-for")
        if forwarded_for:
            # The first entry is the originating client.
            return forwarded_for.split(",")[0].strip()

        real_ip = request.headers.get("x-real-ip")
        if real_ip:
            return real_ip

        # request.client can be None under some test/ASGI setups.
        return request.client.host if request.client else "unknown"

app.add_middleware(RequestLoggingMiddleware)

3. 业务日志记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class BusinessLogger:
    """Structured logger for domain-level events.

    Each method emits a single record tagged with an ``event`` field so
    downstream log pipelines can route and aggregate by event kind.
    """

    def __init__(self, logger_name: str = "business"):
        self.logger = logging.getLogger(logger_name)

    def log_user_action(
        self,
        action: str,
        user_id: str,
        resource_type: str = None,
        resource_id: str = None,
        details: Dict[str, Any] = None
    ):
        """Record a user-initiated action, optionally tied to a resource."""
        payload = {
            "event": "user_action",
            "action": action,
            "user_id": user_id,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "details": details if details is not None else {},
        }
        self.logger.info(f"User action: {action}", extra=payload)

    def log_business_event(
        self,
        event_type: str,
        event_data: Dict[str, Any],
        severity: str = "info"
    ):
        """Record a domain event at the given severity ('info', 'error', ...)."""
        # Resolve the logger method by severity name; fall back to info.
        emit = getattr(self.logger, severity.lower(), self.logger.info)
        emit(
            f"Business event: {event_type}",
            extra={
                "event": "business_event",
                "event_type": event_type,
                "event_data": event_data,
            },
        )

    def log_performance_metric(
        self,
        metric_name: str,
        value: float,
        unit: str = None,
        tags: Dict[str, str] = None
    ):
        """Record a named performance measurement with optional unit/tags."""
        payload = {
            "event": "performance_metric",
            "metric_name": metric_name,
            "value": value,
            "unit": unit,
            "tags": tags if tags is not None else {},
        }
        self.logger.info(f"Performance metric: {metric_name}", extra=payload)

    def log_security_event(
        self,
        event_type: str,
        user_id: str = None,
        ip_address: str = None,
        details: Dict[str, Any] = None
    ):
        """Record a security-relevant event at WARNING level."""
        payload = {
            "event": "security_event",
            "event_type": event_type,
            "user_id": user_id,
            "ip_address": ip_address,
            "details": details if details is not None else {},
        }
        self.logger.warning(f"Security event: {event_type}", extra=payload)

# Module-wide business logger instance
business_logger = BusinessLogger()

# Usage example: log a user action on success, a business event on failure.
@app.post("/users/{user_id}/profile")
async def update_user_profile(
    user_id: int,
    profile_data: dict,
    current_user: dict = Depends(get_current_user)
):
    """Update a user's profile and record the outcome in the business log.

    NOTE(review): `Depends`, `get_current_user` and `update_profile` are not
    defined in this snippet — they must be imported/implemented elsewhere.
    """
    try:
        # Apply the profile change (domain logic lives elsewhere).
        updated_profile = update_profile(user_id, profile_data)

        # Audit: who changed what on which resource.
        business_logger.log_user_action(
            action="update_profile",
            user_id=str(current_user["id"]),
            resource_type="user_profile",
            resource_id=str(user_id),
            details={"updated_fields": list(profile_data.keys())}
        )

        return updated_profile

    except Exception as e:
        # Record the failure as a business event, then let the global
        # exception handlers produce the HTTP response.
        business_logger.log_business_event(
            event_type="profile_update_failed",
            event_data={
                "user_id": user_id,
                "error": str(e)
            },
            severity="error"
        )
        raise

监控和告警

1. 健康检查端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import psutil
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List

class HealthChecker:
    """Registry and runner for named async health checks.

    Each check is an async callable; results are normalized into dicts
    with a ``status`` of healthy / unhealthy / timeout / unknown.
    """

    def __init__(self):
        self.checks = {}
        self.last_check_time = {}
        self.check_results = {}

    def register_check(self, name: str, check_func, timeout: int = 30):
        """Register an async check under `name` with a timeout in seconds."""
        self.checks[name] = {"func": check_func, "timeout": timeout}

    async def run_check(self, name: str) -> Dict[str, Any]:
        """Run one registered check, mapping timeouts/errors to statuses."""
        config = self.checks.get(name)
        if config is None:
            return {"status": "unknown", "error": "Check not found"}

        started = time.time()
        try:
            # Bound the check's runtime so a hung dependency cannot stall
            # the whole health endpoint.
            outcome = await asyncio.wait_for(config["func"](), timeout=config["timeout"])
        except asyncio.TimeoutError:
            return {
                "status": "timeout",
                "duration": config["timeout"],
                "error": "Check timed out",
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "duration": time.time() - started,
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }

        return {
            "status": "healthy",
            "duration": time.time() - started,
            "result": outcome,
            "timestamp": datetime.utcnow().isoformat()
        }

    async def run_all_checks(self) -> Dict[str, Any]:
        """Run every registered check; overall status is healthy only if all are."""
        results = {name: await self.run_check(name) for name in self.checks}
        all_healthy = all(r["status"] == "healthy" for r in results.values())
        return {
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results,
            "timestamp": datetime.utcnow().isoformat()
        }

# Module-wide health checker instance
health_checker = HealthChecker()

# Database connectivity probe
async def check_database():
    """Verify database connectivity (stubbed until a real client is wired in)."""
    try:
        # Replace with a real probe, e.g.: await database.execute("SELECT 1")
        return {"connection": "ok"}
    except Exception as e:
        raise Exception(f"Database connection failed: {e}")

# Redis connectivity probe
async def check_redis():
    """Verify Redis connectivity (stubbed until a real client is wired in)."""
    try:
        # Replace with a real probe, e.g.: await redis_client.ping()
        return {"connection": "ok"}
    except Exception as e:
        raise Exception(f"Redis connection failed: {e}")

# System resource probe
async def check_system_resources():
    """Fail if CPU, memory or disk usage exceeds 90%; otherwise return metrics."""
    cpu_pct = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    disk_pct = (disk.used / disk.total) * 100

    # Treat >90% utilization of any resource as unhealthy.
    if cpu_pct > 90:
        raise Exception(f"High CPU usage: {cpu_pct}%")
    if mem.percent > 90:
        raise Exception(f"High memory usage: {mem.percent}%")
    if disk_pct > 90:
        raise Exception(f"High disk usage: {disk_pct:.1f}%")

    return {
        "cpu_percent": cpu_pct,
        "memory_percent": mem.percent,
        "disk_percent": disk_pct,
    }

# Register the built-in checks with the global health checker.
health_checker.register_check("database", check_database)
health_checker.register_check("redis", check_redis)
health_checker.register_check("system", check_system_resources)

@app.get("/health")
async def health_check():
    """Aggregate health endpoint: runs every registered check."""
    return await health_checker.run_all_checks()

@app.get("/health/{check_name}")
async def specific_health_check(check_name: str):
    """Run a single named health check; 404 if the name is not registered."""
    result = await health_checker.run_check(check_name)

    if result["status"] == "unknown":
        raise HTTPException(status_code=404, detail="Health check not found")

    return result

2. 错误统计和告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from collections import defaultdict, deque
from datetime import datetime, timedelta
import asyncio

class ErrorTracker:
    """Sliding-window error counter with threshold-based alerting.

    Errors are bucketed by type; entries older than ``window_minutes`` are
    evicted. When a type's windowed count reaches its configured threshold
    an alert is emitted, subject to a per-type cooldown that prevents
    alert storms.
    """

    def __init__(self, window_minutes: int = 60):
        self.window_minutes = window_minutes
        self.error_counts = defaultdict(deque)   # error_type -> deque of timestamps
        self.error_details = defaultdict(list)   # error_type -> recent detail dicts
        self.alert_thresholds = {}               # error_type -> {threshold, cooldown_minutes}
        self.alert_cooldowns = {}                # error_type -> last alert time

    def set_alert_threshold(self, error_type: str, threshold: int, cooldown_minutes: int = 30):
        """Configure alerting: fire when `threshold` errors occur per window."""
        self.alert_thresholds[error_type] = {
            "threshold": threshold,
            "cooldown_minutes": cooldown_minutes
        }

    def record_error(self, error_type: str, error_details: Dict[str, Any]):
        """Record one occurrence of `error_type` and evaluate alerting."""
        now = datetime.utcnow()

        self.error_counts[error_type].append(now)
        self.error_details[error_type].append({
            "timestamp": now,
            "details": error_details
        })

        self._cleanup_old_errors(error_type, now)

        # Schedule the (async) alert check. The original called
        # asyncio.create_task unconditionally, which raises RuntimeError
        # when record_error runs outside an event loop (synchronous code,
        # tests); in that case counting still works and alerting is skipped.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            asyncio.create_task(self._check_alert(error_type, now))

    def _cleanup_old_errors(self, error_type: str, current_time: datetime):
        """Evict timestamps/details that fell out of the sliding window."""
        cutoff_time = current_time - timedelta(minutes=self.window_minutes)

        while (self.error_counts[error_type] and
               self.error_counts[error_type][0] < cutoff_time):
            self.error_counts[error_type].popleft()

        self.error_details[error_type] = [
            error for error in self.error_details[error_type]
            if error["timestamp"] > cutoff_time
        ]

    async def _check_alert(self, error_type: str, current_time: datetime):
        """Fire an alert when the windowed count crosses the threshold and
        the per-type cooldown has elapsed."""
        if error_type not in self.alert_thresholds:
            return

        config = self.alert_thresholds[error_type]
        error_count = len(self.error_counts[error_type])

        if error_count >= config["threshold"]:
            last_alert = self.alert_cooldowns.get(error_type)
            if (last_alert is None or
                    current_time - last_alert > timedelta(minutes=config["cooldown_minutes"])):
                await self._send_alert(error_type, error_count, current_time)
                self.alert_cooldowns[error_type] = current_time

    async def _send_alert(self, error_type: str, error_count: int, timestamp: datetime):
        """Log a CRITICAL alert including the most recent error samples;
        external channels (Slack, email) plug in here."""
        try:
            recent_errors = self.error_details[error_type][-5:]  # last 5 samples

            alert_data = {
                "error_type": error_type,
                "error_count": error_count,
                "window_minutes": self.window_minutes,
                "timestamp": timestamp.isoformat(),
                "recent_errors": [
                    {
                        "timestamp": error["timestamp"].isoformat(),
                        "details": error["details"]
                    }
                    for error in recent_errors
                ]
            }

            logger.critical(
                f"Error alert triggered: {error_type}",
                extra={
                    "event": "error_alert",
                    "alert_data": alert_data
                }
            )

            # External alerting integrations:
            # await send_to_slack(alert_data)
            # await send_email_alert(alert_data)

        except Exception as e:
            # Alert delivery must never break error handling.
            logger.error(f"Failed to send alert for {error_type}: {e}")

    def get_error_stats(self) -> Dict[str, Any]:
        """Return per-type counts and first/last occurrence within the window."""
        stats = {}
        for error_type, timestamps in self.error_counts.items():
            stats[error_type] = {
                "count": len(timestamps),
                "window_minutes": self.window_minutes,
                "first_occurrence": timestamps[0].isoformat() if timestamps else None,
                "last_occurrence": timestamps[-1].isoformat() if timestamps else None
            }
        return stats

# Module-wide error tracker (60-minute sliding window)
error_tracker = ErrorTracker(window_minutes=60)

# Alert thresholds per error type
error_tracker.set_alert_threshold("AUTHENTICATION_ERROR", threshold=10, cooldown_minutes=30)
error_tracker.set_alert_threshold("VALIDATION_ERROR", threshold=50, cooldown_minutes=15)
error_tracker.set_alert_threshold("INTERNAL_SERVER_ERROR", threshold=5, cooldown_minutes=10)

# Record errors from within the exception handler.
# NOTE(review): registering a second handler for BaseAPIException replaces
# the earlier `api_exception_handler` registration — only this one runs.
@app.exception_handler(BaseAPIException)
async def api_exception_handler_with_tracking(request: Request, exc: BaseAPIException):
    """Track the error in the sliding-window stats, then delegate to the
    original handler for logging and the JSON response."""
    error_tracker.record_error(exc.error_code, {
        "message": exc.message,
        "details": exc.details,
        "request_url": str(request.url),
        "request_method": request.method
    })

    # Delegate to the original handler logic.
    return await api_exception_handler(request, exc)

@app.get("/admin/error-stats")
async def get_error_statistics():
    """Expose the current sliding-window error statistics (admin endpoint)."""
    return error_tracker.get_error_stats()

调试和开发工具

1. 调试中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import json
import time
from typing import Any, Dict

class DebugMiddleware(BaseHTTPMiddleware):
    """Development-only middleware that logs full request/response/exception
    details at DEBUG level. Enable via the constructor flag."""

    def __init__(self, app, enabled: bool = False):
        super().__init__(app)
        self.enabled = enabled
        self.logger = logging.getLogger("debug")

    async def dispatch(self, request: Request, call_next):
        # Pass straight through when disabled: no overhead in production.
        if not self.enabled:
            return await call_next(request)

        # Capture and log the inbound request.
        request_data = await self._capture_request_data(request)

        self.logger.debug(
            "Debug: Request details",
            extra={
                "event": "debug_request",
                "request_data": request_data
            }
        )

        start_time = time.time()

        try:
            response = await call_next(request)

            # Log the response together with elapsed time.
            duration = time.time() - start_time
            response_data = await self._capture_response_data(response)

            self.logger.debug(
                "Debug: Response details",
                extra={
                    "event": "debug_response",
                    "duration": duration,
                    "response_data": response_data
                }
            )

            return response

        except Exception as exc:
            duration = time.time() - start_time

            self.logger.debug(
                "Debug: Exception details",
                extra={
                    "event": "debug_exception",
                    "duration": duration,
                    "exception_type": type(exc).__name__,
                    "exception_message": str(exc),
                    "exception_traceback": traceback.format_exc()
                }
            )

            # Re-raise so the normal error handlers take over.
            raise

    async def _capture_request_data(self, request: Request) -> Dict[str, Any]:
        """Snapshot request metadata and (for write methods) the body."""
        data = {
            "method": request.method,
            "url": str(request.url),
            "headers": dict(request.headers),
            "query_params": dict(request.query_params),
            "path_params": getattr(request, 'path_params', {}),
            "client": {
                "host": request.client.host,
                "port": request.client.port
            }
        }

        # Capture the body carefully: decode JSON where possible, truncate
        # long plain text.
        # NOTE(review): reading the body here consumes the stream — confirm
        # downstream handlers can still read it in this starlette version.
        if request.method in ["POST", "PUT", "PATCH"]:
            try:
                body = await request.body()
                if body:
                    try:
                        data["body"] = json.loads(body.decode())
                    except:
                        data["body"] = body.decode()[:1000]  # truncate long content
            except:
                data["body"] = "Unable to read request body"

        return data

    async def _capture_response_data(self, response) -> Dict[str, Any]:
        """Snapshot response status, headers and media type (not the body)."""
        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "media_type": getattr(response, 'media_type', None)
        }

# Enable the debug middleware only in development.
if app.debug:
    app.add_middleware(DebugMiddleware, enabled=True)

2. 性能分析工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import cProfile
import pstats
import io
from functools import wraps

class ProfilerMiddleware(BaseHTTPMiddleware):
    """Profiles each request with cProfile and logs the hot spots for any
    request slower than `profile_threshold` seconds.

    NOTE(review): cProfile instruments the whole interpreter — with
    concurrent requests the captured profile includes other tasks' work,
    and overlapping enable() calls can fail. Intended for development use.
    """

    def __init__(self, app, enabled: bool = False, profile_threshold: float = 1.0):
        super().__init__(app)
        self.enabled = enabled
        self.profile_threshold = profile_threshold  # seconds
        self.logger = logging.getLogger("profiler")

    async def dispatch(self, request: Request, call_next):
        if not self.enabled:
            return await call_next(request)

        # Profile the whole request/response cycle.
        profiler = cProfile.Profile()

        start_time = time.time()
        profiler.enable()

        try:
            response = await call_next(request)
            return response
        finally:
            profiler.disable()
            duration = time.time() - start_time

            # Only pay the stats-formatting cost for slow requests.
            if duration > self.profile_threshold:
                self._log_profile_results(profiler, request, duration)

    def _log_profile_results(self, profiler: cProfile.Profile, request: Request, duration: float):
        """Format the top-20 cumulative-time entries and log them at WARNING."""
        stats_stream = io.StringIO()
        stats = pstats.Stats(profiler, stream=stats_stream)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # top 20 most expensive functions

        profile_output = stats_stream.getvalue()

        self.logger.warning(
            f"Slow request profiled: {request.method} {request.url.path}",
            extra={
                "event": "performance_profile",
                "request_method": request.method,
                "request_path": request.url.path,
                "duration": duration,
                "profile_output": profile_output
            }
        )

# 函数级性能分析装饰器
def profile_function(threshold: float = 0.1):
    """Decorator that profiles a function call with cProfile and logs the
    profile when the call takes longer than `threshold` seconds.

    Works for both sync and async callables. The wrapper flavour is chosen
    once at decoration time; the original re-checked
    ``asyncio.iscoroutinefunction`` inside the async wrapper on every call,
    a dead branch since the async wrapper is only ever selected for
    coroutine functions.
    """
    def decorator(func):
        is_coro = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            start_time = time.time()
            profiler.enable()
            try:
                return await func(*args, **kwargs)
            finally:
                profiler.disable()
                duration = time.time() - start_time
                if duration > threshold:
                    _log_function_profile(profiler, func.__name__, duration)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            start_time = time.time()
            profiler.enable()
            try:
                return func(*args, **kwargs)
            finally:
                profiler.disable()
                duration = time.time() - start_time
                if duration > threshold:
                    _log_function_profile(profiler, func.__name__, duration)

        return async_wrapper if is_coro else sync_wrapper

    return decorator

def _log_function_profile(profiler: cProfile.Profile, func_name: str, duration: float):
    """Log the top-10 cumulative-time entries of `profiler` at WARNING
    on the 'profiler' logger, tagged with the function name and duration."""
    buf = io.StringIO()
    # Stats methods return self, so sorting and printing chain cleanly.
    pstats.Stats(profiler, stream=buf).sort_stats('cumulative').print_stats(10)

    logging.getLogger("profiler").warning(
        f"Slow function profiled: {func_name}",
        extra={
            "event": "function_profile",
            "function_name": func_name,
            "duration": duration,
            "profile_output": buf.getvalue(),
        },
    )

# Usage example: profile any call that takes longer than 0.5s.
@profile_function(threshold=0.5)
async def slow_database_query(query: str):
    """Example of a potentially slow database query (simulated with sleep)."""
    # Database query logic would go here.
    await asyncio.sleep(1)  # simulate a slow query
    return "query result"

日志分析和可视化

1. 日志聚合和分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import List, Dict, Any

class LogAnalyzer:
    """Offline analyzer for the JSON-lines application log.

    Expects one JSON object per line (as produced by JSONFormatter with
    naive-UTC ISO timestamps); lines that are not valid JSON or have
    unparsable timestamps are skipped silently.
    """

    def __init__(self, log_file_path: str):
        self.log_file_path = log_file_path

    def _iter_recent_entries(self, hours: int):
        """Yield parsed log entries newer than `hours` ago.

        Shared by both analyses — the original duplicated this
        read/parse/time-filter loop in each method.
        """
        cutoff_time = datetime.utcnow() - timedelta(hours=hours)
        # Logs are written as UTF-8 JSON; be explicit about the encoding.
        with open(self.log_file_path, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                    # Missing/invalid timestamps raise and skip the line.
                    if datetime.fromisoformat(entry.get('timestamp', '')) >= cutoff_time:
                        yield entry
                except (json.JSONDecodeError, ValueError, TypeError):
                    continue

    def analyze_errors(self, hours: int = 24) -> Dict[str, Any]:
        """Aggregate ERROR/CRITICAL entries from the last `hours` hours."""
        error_counts = Counter()
        error_details = defaultdict(list)

        for entry in self._iter_recent_entries(hours):
            if entry.get('level') in ('ERROR', 'CRITICAL'):
                error_type = entry.get('error_code', 'UNKNOWN')
                error_counts[error_type] += 1
                error_details[error_type].append({
                    'timestamp': entry['timestamp'],
                    'message': entry.get('message', ''),
                    'request_url': entry.get('request_url', '')
                })

        return {
            'total_errors': sum(error_counts.values()),
            'error_types': dict(error_counts),
            'error_details': dict(error_details),
            'analysis_period_hours': hours
        }

    def analyze_performance(self, hours: int = 24) -> Dict[str, Any]:
        """Summarize request latency (avg/min/max/p95/p99, slow requests,
        per-endpoint aggregates) from 'request_completed' entries."""
        response_times = []
        slow_requests = []
        endpoint_stats = defaultdict(list)

        for entry in self._iter_recent_entries(hours):
            if entry.get('event') != 'request_completed':
                continue

            process_time = entry.get('process_time', 0)
            response_times.append(process_time)

            endpoint = entry.get('path', 'unknown')
            endpoint_stats[endpoint].append(process_time)

            # Anything over 1 second is flagged as a slow request.
            if process_time > 1.0:
                slow_requests.append({
                    'timestamp': entry['timestamp'],
                    'method': entry.get('method', ''),
                    'path': endpoint,
                    'process_time': process_time,
                    'status_code': entry.get('status_code', 0)
                })

        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)

            # Nearest-rank percentiles over the sorted latencies.
            sorted_times = sorted(response_times)
            p95_index = int(len(sorted_times) * 0.95)
            p99_index = int(len(sorted_times) * 0.99)
            p95_response_time = sorted_times[p95_index] if p95_index < len(sorted_times) else max_response_time
            p99_response_time = sorted_times[p99_index] if p99_index < len(sorted_times) else max_response_time
        else:
            avg_response_time = max_response_time = min_response_time = 0
            p95_response_time = p99_response_time = 0

        endpoint_performance = {
            endpoint: {
                'request_count': len(times),
                'avg_response_time': sum(times) / len(times),
                'max_response_time': max(times),
                'min_response_time': min(times)
            }
            for endpoint, times in endpoint_stats.items()
        }

        return {
            'total_requests': len(response_times),
            'avg_response_time': avg_response_time,
            'max_response_time': max_response_time,
            'min_response_time': min_response_time,
            'p95_response_time': p95_response_time,
            'p99_response_time': p99_response_time,
            'slow_requests_count': len(slow_requests),
            'slow_requests': slow_requests[:10],  # ten slowest-flagged requests
            'endpoint_performance': endpoint_performance,
            'analysis_period_hours': hours
        }

# Log-analysis API endpoints (admin)
@app.get("/admin/logs/analysis/errors")
async def analyze_error_logs(hours: int = 24):
    """Analyze error entries in the application log for the last `hours` hours."""
    analyzer = LogAnalyzer("logs/app.log")
    return analyzer.analyze_errors(hours)

@app.get("/admin/logs/analysis/performance")
async def analyze_performance_logs(hours: int = 24):
    """Analyze request-latency entries in the application log for the last `hours` hours."""
    analyzer = LogAnalyzer("logs/app.log")
    return analyzer.analyze_performance(hours)

总结和最佳实践

通过本文的深入探讨,我们学习了FastAPI错误处理与日志系统的完整解决方案:

核心要点

  1. 异常处理体系:建立分层的异常处理机制,包括自定义异常类和全局异常处理器
  2. 结构化日志:使用JSON格式记录结构化日志,便于分析和监控
  3. 请求追踪:通过请求ID和上下文变量实现请求全链路追踪
  4. 监控告警:建立健康检查和错误统计机制,及时发现问题
  5. 调试工具:提供调试中间件和性能分析工具,帮助开发和优化

最佳实践

  1. 异常设计

    • 创建有意义的异常类层次结构
    • 包含足够的上下文信息
    • 区分业务异常和系统异常
  2. 日志记录

    • 使用结构化日志格式
    • 记录关键业务事件和用户操作
    • 避免记录敏感信息
  3. 错误处理

    • 提供一致的错误响应格式
    • 记录详细的错误信息用于调试
    • 向用户返回友好的错误消息
  4. 监控告警

    • 设置合理的告警阈值
    • 实现告警冷却机制避免告警风暴
    • 建立多层次的健康检查
  5. 性能优化

    • 使用性能分析工具识别瓶颈
    • 监控关键性能指标
    • 建立性能基线和目标

生产环境建议

  1. 日志管理:使用ELK Stack或类似工具进行日志收集和分析
  2. 监控系统:集成Prometheus、Grafana等监控工具
  3. 告警通道:配置多种告警通道(邮件、Slack、短信等)
  4. 错误追踪:使用Sentry等工具进行错误追踪和分析
  5. 性能监控:使用APM工具进行应用性能监控

完善的错误处理和日志系统是构建生产级应用的基础,它不仅能帮助我们快速定位和解决问题,还能为系统优化提供数据支持。

你在错误处理和日志系统方面有什么经验或遇到过什么挑战吗?欢迎在评论中分享讨论!

本站由 提供部署服务