FastAPI Background Task Processing: Async Tasks and Queues in Practice
Orion K


In web application development, we often need to handle time-consuming operations such as sending emails, generating reports, or processing file uploads. Running them synchronously inside the request handler seriously hurts the user experience. FastAPI offers several options for background task processing that let us handle these asynchronous operations gracefully.

FastAPI's Built-in Background Tasks

1. Basic Background Tasks

from fastapi import FastAPI, BackgroundTasks
import asyncio
import logging
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

app = FastAPI()
logger = logging.getLogger(__name__)

def send_email_notification(email: str, subject: str, content: str):
    """Send an email notification (synchronous function)."""
    try:
        # Simulate sending an email
        logger.info(f"Sending email to {email}: {subject}")
        # Real SMTP sending logic would go here
        time.sleep(2)  # simulate a slow operation
        logger.info(f"Email sent successfully to {email}")
    except Exception as e:
        logger.error(f"Failed to send email to {email}: {e}")

async def async_send_email(email: str, subject: str, content: str):
    """Send an email asynchronously."""
    try:
        logger.info(f"Async sending email to {email}: {subject}")
        # Use an async SMTP client here
        await asyncio.sleep(2)  # simulate an async operation
        logger.info(f"Async email sent successfully to {email}")
    except Exception as e:
        logger.error(f"Failed to send async email to {email}: {e}")

@app.post("/register")
async def register_user(
    user_data: dict,
    background_tasks: BackgroundTasks
):
    """User registration endpoint."""
    # Core business logic
    user_id = create_user(user_data)

    # Add a background task
    background_tasks.add_task(
        send_email_notification,
        user_data["email"],
        "Welcome aboard",
        f"Welcome, {user_data['name']}, and thanks for registering!"
    )

    # Multiple background tasks can be queued
    background_tasks.add_task(
        async_send_email,
        "admin@example.com",
        "New user registration",
        f"New user {user_data['name']} has registered"
    )

    # Return the response immediately
    return {"message": "Registration successful", "user_id": user_id}

def create_user(user_data: dict) -> int:
    """Create a user (mock)."""
    logger.info(f"Creating user: {user_data['name']}")
    return 12345
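
To see the behavior end to end, you can run the app with uvicorn main:app and call the endpoint; a minimal client sketch (assuming the listing above is saved as main.py and httpx is installed):

import httpx

resp = httpx.post(
    "http://127.0.0.1:8000/register",
    json={"email": "alice@example.com", "name": "alice"},
)
print(resp.json())  # the response arrives before the email tasks finish running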

2. Background Tasks with Parameters

from typing import Optional, Dict, Any
import json
import time

def process_user_data(
    user_id: int,
    data: Dict[str, Any],
    options: Optional[Dict] = None
):
    """Process user data."""
    try:
        logger.info(f"Processing data for user {user_id}")

        # Validate the data
        if not data.get("email"):
            raise ValueError("Email is required")

        # Data processing logic
        processed_data = {
            "user_id": user_id,
            "email": data["email"].lower(),
            "name": data.get("name", "").title(),
            "processed_at": time.time(),
            "options": options or {}
        }

        # Persist to a database or file
        save_processed_data(processed_data)

        logger.info(f"Data processed successfully for user {user_id}")

    except Exception as e:
        logger.error(f"Failed to process data for user {user_id}: {e}")
        # Error handling such as retries or notifications could go here

def save_processed_data(data: Dict[str, Any]):
    """Save the processed data."""
    # Simulate the save operation
    logger.info(f"Saving data: {json.dumps(data, indent=2)}")

@app.post("/process-data/{user_id}")
async def process_data_endpoint(
    user_id: int,
    data: Dict[str, Any],
    background_tasks: BackgroundTasks,
    priority: str = "normal",
    notify_completion: bool = True
):
    """Data processing endpoint."""

    # Prepare processing options
    options = {
        "priority": priority,
        "notify_completion": notify_completion,
        "requested_at": time.time()
    }

    # Queue the background task
    background_tasks.add_task(
        process_user_data,
        user_id,
        data,
        options
    )

    # If a completion notice was requested, queue a notification task
    if notify_completion:
        background_tasks.add_task(
            send_completion_notification,
            user_id,
            data.get("email")
        )

    return {
        "message": "Data processing started",
        "user_id": user_id,
        "status": "processing"
    }

def send_completion_notification(user_id: int, email: str):
    """Send a completion notification."""
    logger.info(f"Sending completion notification to user {user_id}")
    # Real notification logic would go here
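
BackgroundTasks can also be declared in a dependency, and FastAPI merges all queued tasks onto the same object for the request; a minimal sketch (the audit_log dependency here is a hypothetical example, not part of the listing above):

from fastapi import Depends

async def audit_log(background_tasks: BackgroundTasks) -> None:
    # Tasks queued here run after the response, alongside any queued in the endpoint
    background_tasks.add_task(logger.info, "request audited")

@app.get("/ping", dependencies=[Depends(audit_log)])
async def ping():
    return {"pong": True}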

Distributed Task Processing with Celery

1. Celery Configuration and Setup

from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
import os

# Create the Celery application
celery_app = Celery(
    "fastapi_tasks",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
    include=["tasks"]  # task modules
)

# Celery configuration
celery_app.conf.update(
    # Task serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,

    # Task routing (keys must match the registered task names)
    task_routes={
        "tasks.send_email_task": {"queue": "email"},
        "tasks.process_image_task": {"queue": "image_processing"},
        "tasks.generate_report_task": {"queue": "reports"},
    },

    # Queue definitions
    task_queues=(
        Queue("default", routing_key="default"),
        Queue("email", routing_key="email"),
        Queue("image_processing", routing_key="image_processing"),
        Queue("reports", routing_key="reports"),
    ),

    # Task execution settings
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_reject_on_worker_lost=True,

    # Result expiry
    result_expires=3600,  # 1 hour

    # Retry settings (max_retries is configured per task on the decorator)
    task_default_retry_delay=60,  # retry after 60 seconds
)
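
Workers are typically started per queue, e.g. celery -A celery_config.celery_app worker -Q email -l info (the module name celery_config is an assumption here). Routing can also be overridden when a task is submitted; a hedged sketch:

from tasks import send_email_task  # task defined in the next section

result = send_email_task.apply_async(
    args=["user@example.com", "Hello", "Queue routing demo"],
    queue="email",    # explicit queue, overriding task_routes
    countdown=10,     # delay execution by 10 seconds
)
print(result.id)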

2. Defining Celery Tasks

# tasks.py
import logging
import time
from typing import Dict, List

import requests

from celery_config import celery_app  # the Celery instance defined above (module name assumed)

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, recipient: str, subject: str, content: str, template: str = None):
    """Send an email."""
    try:
        logger.info(f"Task {self.request.id}: Sending email to {recipient}")

        # Render a template if one was given
        if template:
            content = render_email_template(template, {"content": content})

        # Real email sending logic would go here
        success = send_smtp_email(recipient, subject, content)

        if not success:
            raise Exception("SMTP send failed")

        logger.info(f"Task {self.request.id}: Email sent successfully")
        return {"status": "sent", "recipient": recipient}

    except Exception as exc:
        logger.error(f"Task {self.request.id}: Email send failed: {exc}")

        # Retry logic
        if self.request.retries < self.max_retries:
            logger.info(f"Task {self.request.id}: Retrying in 60 seconds...")
            raise self.retry(countdown=60, exc=exc)
        else:
            logger.error(f"Task {self.request.id}: Max retries exceeded")
            return {"status": "failed", "error": str(exc)}

@celery_app.task(bind=True)
def process_image_task(self, image_url: str, operations: List[Dict]):
    """Process an image."""
    task_id = self.request.id
    try:
        logger.info(f"Task {task_id}: Processing image {image_url}")

        # Report task progress
        self.update_state(
            state="PROGRESS",
            meta={"current": 0, "total": len(operations), "status": "Downloading image..."}
        )

        # Download the image
        image_data = download_image(image_url)

        processed_image = image_data
        for i, operation in enumerate(operations):
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": i + 1,
                    "total": len(operations),
                    "status": f"Applying operation: {operation['type']}"
                }
            )

            processed_image = apply_image_operation(processed_image, operation)
            time.sleep(1)  # simulate processing time

        # Save the processed image
        result_url = save_processed_image(processed_image)

        return {
            "status": "completed",
            "original_url": image_url,
            "result_url": result_url,
            "operations_applied": len(operations)
        }

    except Exception as exc:
        logger.error(f"Task {task_id}: Image processing failed: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

@celery_app.task(bind=True)
def generate_report_task(self, report_type: str, filters: Dict, user_id: int):
    """Generate a report."""
    task_id = self.request.id
    try:
        logger.info(f"Task {task_id}: Generating {report_type} report for user {user_id}")

        # Report generation steps
        steps = [
            "Query data",
            "Process data",
            "Generate charts",
            "Create PDF",
            "Upload file"
        ]

        for i, step in enumerate(steps):
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": i + 1,
                    "total": len(steps),
                    "status": step
                }
            )

            # Execute the corresponding step
            if step == "Query data":
                data = query_report_data(report_type, filters)
            elif step == "Process data":
                processed_data = process_report_data(data)
            elif step == "Generate charts":
                charts = generate_charts(processed_data)
            elif step == "Create PDF":
                pdf_path = create_pdf_report(processed_data, charts)
            elif step == "Upload file":
                file_url = upload_report_file(pdf_path)

            time.sleep(2)  # simulate processing time

        # Send a completion notification
        send_report_notification.delay(user_id, file_url, report_type)

        return {
            "status": "completed",
            "report_type": report_type,
            "file_url": file_url,
            "generated_at": time.time()
        }

    except Exception as exc:
        logger.error(f"Task {task_id}: Report generation failed: {exc}")
        raise

@celery_app.task
def send_report_notification(user_id: int, file_url: str, report_type: str):
    """Send a report-completion notification."""
    # Look up the user's email address
    user_email = get_user_email(user_id)

    # Send the notification email
    send_email_task.delay(
        user_email,
        f"Your {report_type} report is ready",
        f"Your report has been generated; download it here: {file_url}"
    )

# Helper functions
def render_email_template(template: str, context: Dict) -> str:
    """Render an email template."""
    # Simple template rendering
    return template.format(**context)

def send_smtp_email(recipient: str, subject: str, content: str) -> bool:
    """Send an email over SMTP."""
    # Real SMTP logic would go here
    logger.info(f"Sending SMTP email to {recipient}")
    return True  # simulate success

def download_image(url: str) -> bytes:
    """Download an image."""
    response = requests.get(url, timeout=30)
    return response.content

def apply_image_operation(image_data: bytes, operation: Dict) -> bytes:
    """Apply an image operation."""
    # Image processing logic
    return image_data

def save_processed_image(image_data: bytes) -> str:
    """Save the processed image."""
    # Save logic; returns a URL
    return "https://example.com/processed_image.jpg"

def query_report_data(report_type: str, filters: Dict) -> List[Dict]:
    """Query report data."""
    # Data query logic
    return [{"id": 1, "value": 100}]

def process_report_data(data: List[Dict]) -> Dict:
    """Process report data."""
    return {"processed": True, "count": len(data)}

def generate_charts(data: Dict) -> List[str]:
    """Generate charts."""
    return ["chart1.png", "chart2.png"]

def create_pdf_report(data: Dict, charts: List[str]) -> str:
    """Create the PDF report."""
    return "/tmp/report.pdf"

def upload_report_file(file_path: str) -> str:
    """Upload the report file."""
    return "https://example.com/reports/report.pdf"

def get_user_email(user_id: int) -> str:
    """Look up a user's email address."""
    return f"user{user_id}@example.com"

3. Integrating FastAPI with Celery

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
from celery.result import AsyncResult

from celery_config import celery_app  # module names assumed, as above
from tasks import send_email_task, process_image_task, generate_report_task

app = FastAPI()

# Request models
class EmailRequest(BaseModel):
    recipient: str
    subject: str
    content: str
    template: Optional[str] = None

class ImageProcessRequest(BaseModel):
    image_url: str
    operations: List[Dict]

class ReportRequest(BaseModel):
    report_type: str
    filters: Dict
    user_id: int

# Task status response model
class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str

@app.post("/tasks/send-email", response_model=TaskResponse)
async def send_email_endpoint(request: EmailRequest):
    """Submit an email-sending task."""
    try:
        # Submit the Celery task
        task = send_email_task.delay(
            request.recipient,
            request.subject,
            request.content,
            request.template
        )

        return TaskResponse(
            task_id=task.id,
            status="submitted",
            message="Email task submitted"
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Task submission failed: {str(e)}")

@app.post("/tasks/process-image", response_model=TaskResponse)
async def process_image_endpoint(request: ImageProcessRequest):
    """Submit an image-processing task."""
    try:
        task = process_image_task.delay(
            request.image_url,
            request.operations
        )

        return TaskResponse(
            task_id=task.id,
            status="submitted",
            message="Image processing task submitted"
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Task submission failed: {str(e)}")

@app.post("/tasks/generate-report", response_model=TaskResponse)
async def generate_report_endpoint(request: ReportRequest):
    """Submit a report-generation task."""
    try:
        task = generate_report_task.delay(
            request.report_type,
            request.filters,
            request.user_id
        )

        return TaskResponse(
            task_id=task.id,
            status="submitted",
            message="Report generation task submitted"
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Task submission failed: {str(e)}")

@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    """Get a task's status."""
    try:
        result = AsyncResult(task_id, app=celery_app)

        if result.state == "PENDING":
            response = {
                "task_id": task_id,
                "state": result.state,
                "status": "Task is waiting..."
            }
        elif result.state == "PROGRESS":
            response = {
                "task_id": task_id,
                "state": result.state,
                "current": result.info.get("current", 0),
                "total": result.info.get("total", 1),
                "status": result.info.get("status", "")
            }
        elif result.state == "SUCCESS":
            response = {
                "task_id": task_id,
                "state": result.state,
                "result": result.result
            }
        else:  # FAILURE
            response = {
                "task_id": task_id,
                "state": result.state,
                "error": str(result.info)
            }

        return response

    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Task not found: {str(e)}")

@app.delete("/tasks/{task_id}")
async def cancel_task(task_id: str):
    """Cancel a task."""
    try:
        celery_app.control.revoke(task_id, terminate=True)
        return {"message": f"Task {task_id} cancelled"}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to cancel task: {str(e)}")

@app.get("/tasks/stats")
async def get_task_stats():
    """Get task statistics."""
    try:
        inspect = celery_app.control.inspect()

        # Currently executing tasks
        active_tasks = inspect.active()

        # Tasks scheduled for later (eta/countdown)
        scheduled_tasks = inspect.scheduled()

        # Tasks prefetched by workers but not yet started
        reserved_tasks = inspect.reserved()

        return {
            "active_tasks": active_tasks,
            "scheduled_tasks": scheduled_tasks,
            "reserved_tasks": reserved_tasks
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch stats: {str(e)}")
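
On the client side, the status endpoint lends itself to simple polling; a hedged helper sketch (the local base URL and httpx dependency are assumptions):

import time
import httpx

def wait_for_task(task_id: str, interval: float = 1.0, timeout: float = 60.0) -> dict:
    """Poll /tasks/{task_id}/status until the task leaves PENDING/PROGRESS."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = httpx.get(f"http://127.0.0.1:8000/tasks/{task_id}/status").json()
        if status["state"] not in ("PENDING", "PROGRESS"):
            return status
        time.sleep(interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")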

Scheduled Task Processing

1. Scheduled Tasks with Celery Beat

import time

from celery.schedules import crontab
from datetime import timedelta

from tasks import send_email_task  # module name assumed, as above

# Scheduled task configuration
celery_app.conf.beat_schedule = {
    # Back up the database every day at 2:00 AM
    "daily-backup": {
        "task": "tasks.backup_database",
        "schedule": crontab(hour=2, minute=0),
    },

    # Clean up temporary files every hour
    "hourly-cleanup": {
        "task": "tasks.cleanup_temp_files",
        "schedule": crontab(minute=0),
    },

    # Check system health every 5 minutes
    "health-check": {
        "task": "tasks.system_health_check",
        "schedule": timedelta(minutes=5),
    },

    # Send the weekly report every Monday at 9:00 AM
    "weekly-report": {
        "task": "tasks.send_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },

    # Calculate monthly statistics on the 1st of each month at 1:00 AM
    "monthly-stats": {
        "task": "tasks.calculate_monthly_stats",
        "schedule": crontab(hour=1, minute=0, day_of_month=1),
    },
}

# Scheduled task implementations
@celery_app.task
def backup_database():
    """Back up the database."""
    try:
        logger.info("Starting database backup...")

        # Run the backup
        backup_file = create_database_backup()

        # Upload to cloud storage
        cloud_url = upload_backup_to_cloud(backup_file)

        # Notify on completion
        send_backup_notification(cloud_url)

        logger.info("Database backup completed successfully")
        return {"status": "success", "backup_url": cloud_url}

    except Exception as e:
        logger.error(f"Database backup failed: {e}")
        send_backup_failure_notification(str(e))
        raise

@celery_app.task
def cleanup_temp_files():
    """Clean up temporary files."""
    try:
        logger.info("Starting temp files cleanup...")

        import os

        # Note: sweeping all of /tmp is aggressive; in practice, restrict this
        # to a directory your application owns.
        temp_dir = "/tmp"
        current_time = time.time()
        cleaned_count = 0

        for filename in os.listdir(temp_dir):
            file_path = os.path.join(temp_dir, filename)
            if os.path.isfile(file_path):
                # Delete files older than 24 hours
                if current_time - os.path.getctime(file_path) > 24 * 3600:
                    os.remove(file_path)
                    cleaned_count += 1

        logger.info(f"Cleanup completed. Removed {cleaned_count} files")
        return {"status": "success", "files_removed": cleaned_count}

    except Exception as e:
        logger.error(f"Temp files cleanup failed: {e}")
        raise

@celery_app.task
def system_health_check():
    """Check system health."""
    try:
        import psutil

        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100

        # Determine health status
        health_status = "healthy"
        alerts = []

        if cpu_percent > 80:
            health_status = "warning"
            alerts.append(f"High CPU usage: {cpu_percent}%")

        if memory_percent > 85:
            health_status = "critical"
            alerts.append(f"High memory usage: {memory_percent}%")

        if disk_percent > 90:
            health_status = "critical"
            alerts.append(f"High disk usage: {disk_percent}%")

        # Record the health snapshot
        health_data = {
            "timestamp": time.time(),
            "status": health_status,
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "disk_percent": disk_percent,
            "alerts": alerts
        }

        # Persist the health data
        save_health_data(health_data)

        # Send a notification if there are alerts
        if alerts:
            send_health_alert(health_data)

        return health_data

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise

def create_database_backup():
    """Create a database backup."""
    # Backup logic
    return "/tmp/backup_20231201.sql"

def upload_backup_to_cloud(backup_file):
    """Upload the backup to cloud storage."""
    # Cloud upload logic
    return "https://cloud.example.com/backups/backup_20231201.sql"

def send_backup_notification(backup_url):
    """Send a backup-completed notification."""
    send_email_task.delay(
        "admin@example.com",
        "Database backup completed",
        f"The database backup has finished. Backup file: {backup_url}"
    )

def send_backup_failure_notification(error):
    """Send a backup-failure notification."""
    send_email_task.delay(
        "admin@example.com",
        "Database backup failed",
        f"The database backup failed. Error: {error}"
    )

def save_health_data(health_data):
    """Persist health data."""
    # Save to a database or monitoring system
    logger.info(f"Health data saved: {health_data}")

def send_health_alert(health_data):
    """Send a health alert."""
    alert_message = "System health alert:\n"
    for alert in health_data["alerts"]:
        alert_message += f"- {alert}\n"

    send_email_task.delay(
        "admin@example.com",
        "System health alert",
        alert_message
    )
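
The schedule is read by a separate beat process, started with something like celery -A celery_config.celery_app beat -l info (module name assumed as before). For reference, a few more crontab patterns using the standard celery.schedules API:

from celery.schedules import crontab

every_15_minutes = crontab(minute="*/15")
weekdays_at_9am = crontab(hour=9, minute=0, day_of_week="mon-fri")
first_week_sunday = crontab(hour=3, minute=0, day_of_week="sunday", day_of_month="1-7")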

2. Dynamic Scheduled Task Management

from celery import current_app
from datetime import timedelta

class DynamicTaskScheduler:
    """Dynamic task scheduler.

    Note: this mutates beat_schedule in the current process only. A beat
    service running in a separate process will not pick up these changes;
    that requires a dynamic scheduler backend such as django-celery-beat
    or RedBeat.
    """

    def __init__(self):
        self.scheduled_tasks = {}

    def add_scheduled_task(
        self,
        task_name: str,
        task_func: str,
        schedule_type: str,
        schedule_value: dict,
        args: list = None,
        kwargs: dict = None
    ):
        """Add a scheduled task."""
        try:
            # Build the schedule object for the requested type
            if schedule_type == "crontab":
                from celery.schedules import crontab
                schedule = crontab(**schedule_value)
            elif schedule_type == "interval":
                schedule = timedelta(**schedule_value)
            else:
                raise ValueError(f"Unsupported schedule type: {schedule_type}")

            # Build the schedule entry
            task_entry = {
                "task": task_func,
                "schedule": schedule,
                "args": args or [],
                "kwargs": kwargs or {},
            }

            # Register it with the Celery Beat schedule
            current_app.conf.beat_schedule[task_name] = task_entry
            self.scheduled_tasks[task_name] = task_entry

            logger.info(f"Scheduled task added: {task_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to add scheduled task {task_name}: {e}")
            return False

    def remove_scheduled_task(self, task_name: str):
        """Remove a scheduled task."""
        try:
            if task_name in current_app.conf.beat_schedule:
                del current_app.conf.beat_schedule[task_name]

            if task_name in self.scheduled_tasks:
                del self.scheduled_tasks[task_name]

            logger.info(f"Scheduled task removed: {task_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to remove scheduled task {task_name}: {e}")
            return False

    def list_scheduled_tasks(self):
        """List all scheduled tasks."""
        return list(self.scheduled_tasks.keys())

    def get_task_info(self, task_name: str):
        """Get information about a task."""
        return self.scheduled_tasks.get(task_name)

# Global scheduler instance
scheduler = DynamicTaskScheduler()

# Dynamic task management endpoints
@app.post("/admin/scheduled-tasks")
async def create_scheduled_task(
    task_name: str,
    task_func: str,
    schedule_type: str,
    schedule_value: dict,
    args: list = None,
    kwargs: dict = None
):
    """Create a scheduled task."""
    success = scheduler.add_scheduled_task(
        task_name, task_func, schedule_type, schedule_value, args, kwargs
    )

    if success:
        return {"message": f"Scheduled task {task_name} created"}
    else:
        raise HTTPException(status_code=400, detail="Failed to create scheduled task")

@app.delete("/admin/scheduled-tasks/{task_name}")
async def delete_scheduled_task(task_name: str):
    """Delete a scheduled task."""
    success = scheduler.remove_scheduled_task(task_name)

    if success:
        return {"message": f"Scheduled task {task_name} deleted"}
    else:
        raise HTTPException(status_code=404, detail="Scheduled task not found")

@app.get("/admin/scheduled-tasks")
async def list_scheduled_tasks():
    """List all scheduled tasks."""
    tasks = scheduler.list_scheduled_tasks()
    task_info = []

    for task_name in tasks:
        info = scheduler.get_task_info(task_name)
        task_info.append({
            "name": task_name,
            "task": info["task"],
            "schedule": str(info["schedule"]),
            "args": info["args"],
            "kwargs": info["kwargs"]
        })

    return {"tasks": task_info}

Task Monitoring and Management

1. A Task Monitoring Dashboard

from fastapi import Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import time

templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

@app.get("/admin/tasks", response_class=HTMLResponse)
async def task_monitor_dashboard(request: Request):
    """Task monitoring dashboard."""
    return templates.TemplateResponse("task_monitor.html", {"request": request})

@app.get("/admin/tasks/metrics")
async def get_task_metrics():
    """Get task metrics."""
    try:
        inspect = celery_app.control.inspect()

        # Worker status
        stats = inspect.stats()
        active = inspect.active()
        scheduled = inspect.scheduled()
        reserved = inspect.reserved()

        # Compute aggregate metrics
        total_workers = len(stats) if stats else 0
        total_active_tasks = sum(len(tasks) for tasks in active.values()) if active else 0
        total_scheduled_tasks = sum(len(tasks) for tasks in scheduled.values()) if scheduled else 0
        total_reserved_tasks = sum(len(tasks) for tasks in reserved.values()) if reserved else 0

        return {
            "workers": {
                "total": total_workers,
                "stats": stats
            },
            "tasks": {
                "active": total_active_tasks,
                "scheduled": total_scheduled_tasks,
                "reserved": total_reserved_tasks,
                "active_details": active,
                "scheduled_details": scheduled
            },
            "timestamp": time.time()
        }

    except Exception as e:
        logger.error(f"Failed to get task metrics: {e}")
        return {"error": str(e)}

@app.get("/admin/tasks/history")
async def get_task_history(limit: int = 100, task_name: str = None):
    """Get task execution history."""
    try:
        # Storing and querying task history needs to be implemented separately,
        # e.g. in a database or Redis (one approach is sketched after this listing).

        # Example response data
        history = [
            {
                "task_id": "12345",
                "task_name": "send_email_task",
                "status": "SUCCESS",
                "started_at": "2023-12-01T10:00:00Z",
                "completed_at": "2023-12-01T10:00:05Z",
                "duration": 5.2,
                "result": {"status": "sent", "recipient": "user@example.com"}
            }
        ]

        return {"history": history}

    except Exception as e:
        return {"error": str(e)}

2. Task Retries and Error Handling

import json
import time
from datetime import datetime
from typing import Any

from celery.signals import task_failure, task_retry, task_success

from tasks import send_email_task  # module name assumed, as above

# Note: combining autoretry_for=(Exception,) with the manual handling below
# would also retry validation errors, so retries are handled explicitly here.
@celery_app.task(bind=True, max_retries=3)
def robust_task(self, data: dict):
    """A task with defensive error handling."""
    task_id = self.request.id
    try:
        logger.info(f"Task {task_id}: Processing data")

        # Validate the input
        if not validate_task_data(data):
            raise ValueError("Invalid task data")

        # Run the main logic
        result = process_task_data(data)

        # Validate the result
        if not validate_task_result(result):
            raise ValueError("Invalid task result")

        logger.info(f"Task {task_id}: Completed successfully")
        return result

    except ValueError as e:
        # Validation errors are permanent; do not retry
        logger.error(f"Task {task_id}: Validation error: {e}")
        raise

    except ConnectionError as e:
        # Connection errors are transient; retry
        logger.warning(f"Task {task_id}: Connection error: {e}")
        raise self.retry(countdown=60, exc=e)

    except Exception as e:
        # For anything else, decide based on the retry count
        logger.error(f"Task {task_id}: Unexpected error: {e}")

        if self.request.retries >= self.max_retries:
            # Max retries reached; record the permanent failure
            log_task_failure(task_id, str(e))
            send_failure_notification(task_id, str(e))

        raise

# Task signal handlers (signals pass extra keyword arguments, hence **kwargs)
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """Record task failures and raise alerts."""
    logger.error(f"Task {task_id} failed: {exception}")

    # Record the failure
    failure_info = {
        "task_id": task_id,
        "task_name": sender.name if sender else "unknown",
        "exception": str(exception),
        "traceback": str(traceback),
        "timestamp": time.time()
    }

    # Persist the failure record
    save_task_failure(failure_info)

    # Send an alert if warranted
    if should_send_failure_alert(sender.name if sender else "unknown"):
        send_task_failure_alert(failure_info)

@task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs):
    """Record task retries."""
    logger.warning(f"Task {task_id} retrying: {reason}")

    # Record the retry
    retry_info = {
        "task_id": task_id,
        "task_name": sender.name if sender else "unknown",
        "reason": str(reason),
        "retry_count": sender.request.retries if sender else 0,
        "timestamp": time.time()
    }

    save_task_retry(retry_info)

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Record task successes."""
    # task_success does not pass task_id directly; read it from the sender
    task_id = sender.request.id if sender else "unknown"
    logger.info(f"Task {task_id} completed successfully")

    # Record the success
    success_info = {
        "task_id": task_id,
        "task_name": sender.name if sender else "unknown",
        "result": result,
        "timestamp": time.time()
    }

    save_task_success(success_info)

def validate_task_data(data: dict) -> bool:
    """Validate task input data."""
    required_fields = ["id", "type", "payload"]
    return all(field in data for field in required_fields)

def validate_task_result(result: Any) -> bool:
    """Validate the task result."""
    return result is not None

def process_task_data(data: dict) -> dict:
    """Process task data."""
    # Real processing logic would go here
    return {"processed": True, "data": data}

def log_task_failure(task_id: str, error: str):
    """Log a permanent task failure."""
    logger.error(f"Task {task_id} failed permanently: {error}")

def send_failure_notification(task_id: str, error: str):
    """Send a failure notification."""
    send_email_task.delay(
        "admin@example.com",
        f"Task failure notification - {task_id}",
        f"Task {task_id} failed: {error}"
    )

def save_task_failure(failure_info: dict):
    """Persist a task failure record."""
    # Save to a database or logging system
    logger.error(f"Task failure recorded: {json.dumps(failure_info)}")

def save_task_retry(retry_info: dict):
    """Persist a task retry record."""
    logger.warning(f"Task retry recorded: {json.dumps(retry_info)}")

def save_task_success(success_info: dict):
    """Persist a task success record."""
    logger.info(f"Task success recorded: {json.dumps(success_info)}")

def should_send_failure_alert(task_name: str) -> bool:
    """Decide whether a failure alert should be sent."""
    # Could be based on task type, failure frequency, etc.
    critical_tasks = ["backup_database", "send_payment_notification"]
    return task_name in critical_tasks

def send_task_failure_alert(failure_info: dict):
    """Send a task failure alert."""
    alert_message = f"""
Task failure alert

Task ID: {failure_info['task_id']}
Task name: {failure_info['task_name']}
Reason: {failure_info['exception']}
Failed at: {datetime.fromtimestamp(failure_info['timestamp'])}

Please investigate promptly!
"""

    send_email_task.delay(
        "admin@example.com",
        "Task failure alert",
        alert_message
    )
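
Celery can also handle retries declaratively: since 4.x, autoretry_for combines with retry_backoff for exponential backoff with jitter. A hedged sketch with a hypothetical fetch task:

import requests

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    retry_backoff=True,       # 1s, 2s, 4s, ... between retries
    retry_backoff_max=600,    # cap the delay at 10 minutes
    retry_jitter=True,        # randomize delays to avoid thundering herds
    max_retries=5,
)
def fetch_remote_resource(self, url: str) -> dict:
    """Hypothetical task: fetch a URL, retrying transient connection errors."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return {"url": url, "bytes": len(response.content)}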

Performance Optimization and Best Practices

1. Task Queue Optimization

# Task priority configuration
from typing import List

from celery import chain, group, chord
from kombu import Queue

# Route tasks to queues of different priorities
celery_app.conf.task_routes = {
    'tasks.critical_task': {'queue': 'critical', 'priority': 9},
    'tasks.high_priority_task': {'queue': 'high', 'priority': 7},
    'tasks.normal_task': {'queue': 'normal', 'priority': 5},
    'tasks.low_priority_task': {'queue': 'low', 'priority': 3},
}

celery_app.conf.task_queues = (
    Queue('critical', routing_key='critical'),
    Queue('high', routing_key='high'),
    Queue('normal', routing_key='normal'),
    Queue('low', routing_key='low'),
)

# Batch task processing
@celery_app.task
def batch_process_users(user_ids: List[int], batch_size: int = 100):
    """Process user data in batches."""
    total_users = len(user_ids)
    processed = 0

    for i in range(0, total_users, batch_size):
        batch = user_ids[i:i + batch_size]

        # Process the batch in parallel
        job = group(process_single_user.s(user_id) for user_id in batch)
        result = job.apply_async()

        # Wait for the batch to finish. Calling get() inside a task is
        # normally disallowed because it can deadlock, hence the explicit
        # opt-in; a chord (see the sketch after this listing) is the
        # cleaner pattern.
        result.get(disable_sync_subtasks=False)

        processed += len(batch)
        logger.info(f"Processed {processed}/{total_users} users")

    return {"total_processed": processed}

@celery_app.task
def process_single_user(user_id: int):
    """Process a single user."""
    # User processing logic
    return {"user_id": user_id, "processed": True}

# Task chains and workflows
@app.post("/workflows/user-onboarding")
async def start_user_onboarding(user_id: int):
    """Start the user onboarding workflow."""

    # Build the task chain
    workflow = chain(
        # Step 1: create the user account
        create_user_account.s(user_id),

        # Step 2: send the welcome email
        send_welcome_email.s(),

        # Step 3: set up default permissions
        setup_default_permissions.s(),

        # Step 4: send the completion notification
        send_onboarding_complete_notification.s()
    )

    result = workflow.apply_async()

    return {
        "workflow_id": result.id,
        "status": "started",
        "message": "User onboarding workflow started"
    }

@celery_app.task
def create_user_account(user_id: int):
    """Create the user account."""
    logger.info(f"Creating account for user {user_id}")
    # Account creation logic
    return {"user_id": user_id, "account_created": True}

@celery_app.task
def send_welcome_email(previous_result: dict):
    """Send the welcome email."""
    user_id = previous_result["user_id"]
    logger.info(f"Sending welcome email to user {user_id}")
    # Email sending logic
    return {**previous_result, "welcome_email_sent": True}

@celery_app.task
def setup_default_permissions(previous_result: dict):
    """Set up default permissions."""
    user_id = previous_result["user_id"]
    logger.info(f"Setting up permissions for user {user_id}")
    # Permission setup logic
    return {**previous_result, "permissions_set": True}

@celery_app.task
def send_onboarding_complete_notification(previous_result: dict):
    """Send the onboarding-complete notification."""
    user_id = previous_result["user_id"]
    logger.info(f"Onboarding completed for user {user_id}")
    # Completion notification logic
    return {**previous_result, "onboarding_complete": True}
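
The chord primitive imported above pairs a group with a callback that fires once every group member has finished; a minimal sketch of fanning out per-user work and aggregating the results (summarize_results and start_user_fanout are illustrative names):

@celery_app.task
def summarize_results(results: list) -> dict:
    """Callback: receives the list of results from the group header."""
    return {"processed": len(results)}

def start_user_fanout(user_ids: List[int]):
    # chord(header)(callback) schedules the callback after all header tasks
    header = [process_single_user.s(user_id) for user_id in user_ids]
    return chord(header)(summarize_results.s())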

2. Monitoring and Alerting

# Task performance monitoring
import json
import time
from functools import wraps

def monitor_task_performance(func):
    """Decorator that records task timing and flags slow tasks."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        task_name = func.__name__

        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time

            # Record performance metrics
            record_task_metrics(task_name, duration, "success")

            # Flag slow tasks
            if duration > 30:  # longer than 30 seconds
                logger.warning(f"Slow task detected: {task_name} took {duration:.2f}s")
                send_slow_task_alert(task_name, duration)

            return result

        except Exception:
            duration = time.time() - start_time
            record_task_metrics(task_name, duration, "failure")
            raise

    return wrapper

def record_task_metrics(task_name: str, duration: float, status: str):
    """Record task metrics."""
    metrics = {
        "task_name": task_name,
        "duration": duration,
        "status": status,
        "timestamp": time.time()
    }

    # Ship to a monitoring system (Prometheus, InfluxDB, etc.)
    logger.info(f"Task metrics: {json.dumps(metrics)}")

def send_slow_task_alert(task_name: str, duration: float):
    """Send a slow-task alert."""
    send_email_task.delay(
        "admin@example.com",
        "Slow task alert",
        f"Task {task_name} took too long to run: {duration:.2f}s"
    )

# Apply the monitoring decorator (note: @celery_app.task must stay outermost)
@celery_app.task
@monitor_task_performance
def monitored_task(data: dict):
    """A monitored task."""
    # Task logic
    time.sleep(5)  # simulate a slow operation
    return {"processed": True}

Summary

This article walked through a complete toolkit for background task processing in FastAPI:

Key Techniques

  1. FastAPI's built-in background tasks: suited to lightweight asynchronous operations
  2. Celery distributed task queues: suited to complex task-processing scenarios
  3. Scheduled tasks: implemented with Celery Beat
  4. Task monitoring and management: task status tracking and performance monitoring
  5. Error handling and retries: building a robust error-handling mechanism

Best Practices

  1. Task design: keep tasks idempotent and avoid side effects
  2. Queue management: design queue priorities and routing deliberately
  3. Monitoring and alerting: build thorough monitoring and alerting
  4. Performance: use batching and task chains to optimize throughput
  5. Error handling: implement intelligent retries and failure notifications

Which to Choose

  • Simple async operations: FastAPI's built-in BackgroundTasks
  • Complex task processing: Celery with Redis or RabbitMQ
  • Scheduled tasks: Celery Beat
  • High-concurrency scenarios: consider queue sharding and load balancing

Background task processing is a core part of any modern web application; choosing the right approach and implementing it well can significantly improve both performance and user experience.

What's your experience with background task processing, and what challenges have you run into? Share your thoughts in the comments!
