1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
| from celery import current_task from celery.exceptions import Retry import requests import time from typing import Dict, List, Optional import logging
# Module-level logger named after this module; the tasks and helpers in this
# file write their progress and error messages through it.
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, recipient: str, subject: str, content: str, template: str = None):
    """Send one email, retrying on failure.

    Args:
        recipient: Address handed to ``send_smtp_email``.
        subject: Subject line handed to ``send_smtp_email``.
        content: Message body; when ``template`` is truthy it is first
            rendered into the template as the ``content`` field.
        template: Optional format string passed to ``render_email_template``.

    Returns:
        ``{"status": "sent", "recipient": ...}`` on success, or
        ``{"status": "failed", "error": ...}`` once the retry budget is spent.

    Raises:
        Whatever ``self.retry`` produces while the number of retries so far
        is still below ``max_retries``; the retry is scheduled with a
        60-second countdown.
    """
    task_id = self.request.id
    try:
        logger.info(f"Task {task_id}: Sending email to {recipient}")

        body = render_email_template(template, {"content": content}) if template else content

        delivered = send_smtp_email(recipient, subject, body)
        if not delivered:
            raise Exception("SMTP send failed")

        logger.info(f"Task {task_id}: Email sent successfully")
        return {"status": "sent", "recipient": recipient}
    except Exception as exc:
        logger.error(f"Task {task_id}: Email send failed: {exc}")

        # Out of retries: report the failure as a normal return value.
        if self.request.retries >= self.max_retries:
            logger.error(f"Task {task_id}: Max retries exceeded")
            return {"status": "failed", "error": str(exc)}

        logger.info(f"Task {task_id}: Retrying in 60 seconds...")
        raise self.retry(countdown=60, exc=exc)
@celery_app.task(bind=True)
def process_image_task(self, image_url: str, operations: List[Dict]):
    """Download an image, apply ``operations`` in order, and store the result.

    Progress is published through ``self.update_state`` using the custom
    ``PROGRESS`` state: once before the download (``current`` = 0) and once
    before each operation (``current`` = 1-based position of the operation).

    Args:
        image_url: Location passed to ``download_image``.
        operations: Operation descriptors. Each must contain a ``"type"`` key
            (shown in the progress message) and is handed unchanged to
            ``apply_image_operation``.

    Returns:
        A dict with ``status`` (``"completed"``), ``original_url``,
        ``result_url`` and ``operations_applied``.

    Raises:
        Exception: Whatever the download, an operation, or the save step
            raised; it is logged and re-raised unchanged.
    """
    # Bound before the ``try`` so the handler below can always name the task.
    task_id = self.request.id
    try:
        logger.info(f"Task {task_id}: Processing image {image_url}")

        total = len(operations)
        self.update_state(
            state="PROGRESS",
            meta={"current": 0, "total": total, "status": "下载图片中..."},
        )

        processed_image = download_image(image_url)

        for position, operation in enumerate(operations, start=1):
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": position,
                    "total": total,
                    "status": f"执行操作: {operation['type']}",
                },
            )
            processed_image = apply_image_operation(processed_image, operation)
            # Delay kept from the original code.
            # NOTE(review): presumably simulates work or throttles updates;
            # confirm before removing.
            time.sleep(1)

        result_url = save_processed_image(processed_image)

        return {
            "status": "completed",
            "original_url": image_url,
            "result_url": result_url,
            "operations_applied": total,
        }
    except Exception as exc:
        logger.error(f"Task {task_id}: Image processing failed: {exc}")
        # No manual update_state(state="FAILURE") here: re-raising lets the
        # worker record the failure together with the real exception. A
        # hand-written FAILURE meta that lacks the exception fields can leave
        # a result that clients fail to decode.
        raise
@celery_app.task(bind=True)
def generate_report_task(self, report_type: str, filters: Dict, user_id: int):
    """Build a report in five stages and queue a notification for the user.

    Each stage first publishes a ``PROGRESS`` state (``current`` is the
    1-based stage number, ``status`` the stage label), then does its work,
    then sleeps for two seconds. The stages are: query data, process data,
    generate charts, create the PDF, upload the file.

    Args:
        report_type: Passed to ``query_report_data`` and echoed in the result.
        filters: Passed to ``query_report_data``.
        user_id: Passed on to ``send_report_notification``.

    Returns:
        A dict with ``status`` (``"completed"``), ``report_type``,
        ``file_url`` and ``generated_at`` (``time.time()`` at completion).

    Raises:
        Exception: Any error from a stage is logged and re-raised unchanged.
    """
    try:
        task_id = self.request.id
        logger.info(f"Task {task_id}: Generating {report_type} report for user {user_id}")

        stage_labels = ("查询数据", "数据处理", "生成图表", "创建PDF", "上传文件")
        stage_count = len(stage_labels)

        def run_stage(position, work, *work_args):
            # Publish progress, do the stage's work, then pause, in that order.
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": position,
                    "total": stage_count,
                    "status": stage_labels[position - 1],
                },
            )
            outcome = work(*work_args)
            time.sleep(2)
            return outcome

        rows = run_stage(1, query_report_data, report_type, filters)
        summary = run_stage(2, process_report_data, rows)
        chart_files = run_stage(3, generate_charts, summary)
        pdf_location = run_stage(4, create_pdf_report, summary, chart_files)
        file_url = run_stage(5, upload_report_file, pdf_location)

        send_report_notification.delay(user_id, file_url, report_type)

        return {
            "status": "completed",
            "report_type": report_type,
            "file_url": file_url,
            "generated_at": time.time(),
        }
    except Exception as exc:
        logger.error(f"Task {task_id}: Report generation failed: {exc}")
        raise
@celery_app.task
def send_report_notification(user_id: int, file_url: str, report_type: str):
    """Queue an email telling the user that their report is ready.

    Looks up the address with ``get_user_email`` and enqueues
    ``send_email_task`` with a subject naming the report type and a body
    containing the download link. Returns nothing.
    """
    recipient = get_user_email(user_id)
    subject = f"{report_type}报告已生成"
    body = f"您的报告已生成完成,请点击下载:{file_url}"
    send_email_task.delay(recipient, subject, body)
def render_email_template(template: str, context: Dict) -> str:
    """Fill the ``{name}`` fields of ``template`` from ``context``.

    Args:
        template: A ``str.format``-style format string.
        context: Mapping whose items are supplied as keyword arguments.

    Returns:
        The formatted string.
    """
    rendered = str.format(template, **context)
    return rendered
def send_smtp_email(recipient: str, subject: str, content: str) -> bool:
    """Log the send attempt and report success.

    This implementation only writes a log line naming ``recipient`` and then
    returns ``True``; ``subject`` and ``content`` are not used here.
    """
    message = f"Sending SMTP email to {recipient}"
    logger.info(message)
    return True
def download_image(url: str, timeout: float = 30.0) -> bytes:
    """Fetch ``url`` over HTTP and return the response body as bytes.

    Args:
        url: Address of the image to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, a stalled server would block the calling worker
            indefinitely.

    Returns:
        The raw response body.

    Raises:
        requests.RequestException: On connection problems, a timeout, or a
            4xx/5xx response, so an error page is never returned as if it
            were image data.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content
def apply_image_operation(image_data: bytes, operation: Dict) -> bytes:
    """Apply one operation to the image bytes.

    Currently a pass-through: ``operation`` is not inspected and
    ``image_data`` is returned unchanged.
    """
    unchanged = image_data
    return unchanged
def save_processed_image(image_data: bytes) -> str:
    """Return the URL of the stored, processed image.

    Currently a stub: ``image_data`` is not used and the same fixed URL is
    returned for every call.
    """
    stored_url = "https://example.com/processed_image.jpg"
    return stored_url
def query_report_data(report_type: str, filters: Dict) -> List[Dict]:
    """Return the rows that feed a report.

    Currently a stub: both arguments are ignored and a new one-row list
    (``id`` 1, ``value`` 100) is returned on every call.
    """
    sample_row = {"id": 1, "value": 100}
    return [sample_row]
def process_report_data(data: List[Dict]) -> Dict:
    """Summarise the queried rows.

    Returns:
        A dict with ``processed`` set to ``True`` and ``count`` set to the
        number of rows in ``data``.
    """
    row_count = len(data)
    summary = dict(processed=True, count=row_count)
    return summary
def generate_charts(data: Dict) -> List[str]:
    """Return the file names of the charts for a report.

    Currently a stub: ``data`` is not used and the same two file names are
    returned, in a new list, on every call.
    """
    chart_names = ("chart1.png", "chart2.png")
    return list(chart_names)
def create_pdf_report(data: Dict, charts: List[str]) -> str:
    """Return the path of the PDF built for a report.

    Currently a stub: neither argument is used and the same fixed path is
    returned for every call.
    """
    pdf_location = "/tmp/report.pdf"
    return pdf_location
def upload_report_file(file_path: str) -> str:
    """Return the download URL of an uploaded report.

    Currently a stub: ``file_path`` is not used and the same fixed URL is
    returned for every call.
    """
    download_url = "https://example.com/reports/report.pdf"
    return download_url
def get_user_email(user_id: int) -> str:
    """Return the email address for ``user_id``.

    The address is derived from the id alone, as
    ``user<user_id>@example.com``; nothing is looked up.
    """
    address = f"user{user_id}@example.com"
    return address
|