
| from celery import current_task from celery.exceptions import Retry import requests import time from typing import Dict, List, Optional import logging
# Module-level logger named after this module; the tasks and helpers in this
# file write their progress and error messages through it.
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(
    self,
    recipient: str,
    subject: str,
    content: str,
    template: Optional[str] = None,
):
    """Send an email, retrying on failure.

    Args:
        recipient: Address the message is sent to.
        subject: Subject line of the message.
        content: Message body. When ``template`` is given, this value is
            passed to the template as the ``content`` placeholder instead of
            being sent verbatim.
        template: Optional ``str.format``-style template; when truthy the
            body is rendered with ``render_email_template``.

    Returns:
        ``{"status": "sent", "recipient": recipient}`` on success, or
        ``{"status": "failed", "error": <message>}`` once the retry budget
        (``max_retries``) is exhausted. The final failure is reported through
        the return value rather than by raising.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` to reschedule the
            task 60 seconds later while retries remain.
    """
    task_id = self.request.id
    try:
        logger.info("Task %s: Sending email to %s", task_id, recipient)

        if template:
            content = render_email_template(template, {"content": content})

        if not send_smtp_email(recipient, subject, content):
            raise RuntimeError("SMTP send failed")

        logger.info("Task %s: Email sent successfully", task_id)
        return {"status": "sent", "recipient": recipient}
    except Exception as exc:
        # Broad catch is deliberate: any failure (rendering or sending) is
        # either retried or converted into a "failed" result below.
        logger.error("Task %s: Email send failed: %s", task_id, exc)

        if self.request.retries < self.max_retries:
            logger.info("Task %s: Retrying in 60 seconds...", task_id)
            raise self.retry(countdown=60, exc=exc)

        logger.error("Task %s: Max retries exceeded", task_id)
        return {"status": "failed", "error": str(exc)}
@celery_app.task(bind=True)
def process_image_task(self, image_url: str, operations: List[Dict]):
    """Download an image, apply a list of operations, and store the result.

    Progress is published through ``self.update_state`` with the custom
    ``PROGRESS`` state: once before the download (``current`` = 0) and once
    before each operation (``current`` = 1..len(operations)).

    Args:
        image_url: URL the source image is downloaded from.
        operations: Operations applied in order. Each entry must contain a
            ``"type"`` key, which is shown in the progress status text.

    Returns:
        A dict with ``status`` ("completed"), ``original_url``,
        ``result_url`` and ``operations_applied``.

    Raises:
        Exception: Any error raised while downloading, processing or saving
            is logged and re-raised unchanged.
    """
    # Bound before the ``try`` so the error handler can always reference it.
    task_id = self.request.id

    try:
        logger.info(f"Task {task_id}: Processing image {image_url}")

        total = len(operations)
        self.update_state(
            state="PROGRESS",
            meta={"current": 0, "total": total, "status": "下载图片中..."},
        )

        processed_image = download_image(image_url)

        for step, operation in enumerate(operations, start=1):
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": step,
                    "total": total,
                    "status": f"执行操作: {operation['type']}",
                },
            )
            processed_image = apply_image_operation(processed_image, operation)
            # NOTE(review): fixed one-second pause per operation kept from the
            # original; presumably simulated work or throttling — confirm.
            time.sleep(1)

        result_url = save_processed_image(processed_image)

        return {
            "status": "completed",
            "original_url": image_url,
            "result_url": result_url,
            "operations_applied": total,
        }
    except Exception as exc:
        logger.error(f"Task {task_id}: Image processing failed: {exc}")
        # No manual ``update_state(state="FAILURE", ...)`` here: re-raising
        # lets Celery record the failure together with the real exception,
        # and avoids storing failure metadata that lacks the exception type.
        raise
@celery_app.task(bind=True)
def generate_report_task(self, report_type: str, filters: Dict, user_id: int):
    """Generate a report in five steps and notify the requesting user.

    The steps run in order: query data, process data, generate charts,
    create the PDF, upload the file. Before each step the custom
    ``PROGRESS`` state is published with the step number and its label.
    After the upload, ``send_report_notification`` is queued for the user.

    Args:
        report_type: Kind of report to build; passed to the data query and
            to the notification.
        filters: Filters passed to the data query.
        user_id: Identifier of the user to notify when the report is ready.

    Returns:
        A dict with ``status`` ("completed"), ``report_type``, ``file_url``
        and ``generated_at`` (``time.time()`` at completion).

    Raises:
        Exception: Any error raised by a step is logged and re-raised.
    """
    # Bound before the ``try`` so the error handler can always reference it.
    task_id = self.request.id
    total_steps = 5

    def run_step(number, label, func, *args):
        """Publish progress for one step, run it, and return its result."""
        self.update_state(
            state="PROGRESS",
            meta={"current": number, "total": total_steps, "status": label},
        )
        result = func(*args)
        # NOTE(review): fixed two-second pause after every step kept from the
        # original; presumably simulated work — confirm before removing.
        time.sleep(2)
        return result

    try:
        logger.info(
            f"Task {task_id}: Generating {report_type} report for user {user_id}"
        )

        data = run_step(1, "查询数据", query_report_data, report_type, filters)
        processed_data = run_step(2, "数据处理", process_report_data, data)
        charts = run_step(3, "生成图表", generate_charts, processed_data)
        pdf_path = run_step(4, "创建PDF", create_pdf_report, processed_data, charts)
        file_url = run_step(5, "上传文件", upload_report_file, pdf_path)

        send_report_notification.delay(user_id, file_url, report_type)

        return {
            "status": "completed",
            "report_type": report_type,
            "file_url": file_url,
            "generated_at": time.time(),
        }
    except Exception as exc:
        logger.error(f"Task {task_id}: Report generation failed: {exc}")
        raise
@celery_app.task
def send_report_notification(user_id: int, file_url: str, report_type: str):
    """Queue an email telling the user that their report is ready.

    Looks up the user's address with ``get_user_email`` and enqueues
    ``send_email_task`` with a subject naming the report type and a body
    containing the download URL.

    Args:
        user_id: Identifier of the user to notify.
        file_url: URL of the generated report, embedded in the message body.
        report_type: Report kind, embedded in the subject line.
    """
    recipient = get_user_email(user_id)
    subject = f"{report_type}报告已生成"
    body = f"您的报告已生成完成,请点击下载:{file_url}"
    send_email_task.delay(recipient, subject, body)
def render_email_template(template: str, context: Dict) -> str:
    """Render an email template.

    Args:
        template: ``str.format``-style template text.
        context: Mapping of placeholder names to values; expanded as keyword
            arguments to ``str.format``.

    Returns:
        The template with its placeholders substituted.
    """
    rendered = template.format(**context)
    return rendered
def send_smtp_email(recipient: str, subject: str, content: str) -> bool:
    """Send an email over SMTP.

    As written this is a placeholder: it only logs the recipient and reports
    success; ``subject`` and ``content`` are not used.

    Returns:
        Always ``True``.
    """
    message = f"Sending SMTP email to {recipient}"
    logger.info(message)
    delivered = True
    return delivered
def download_image(url: str, timeout: float = 30.0) -> bytes:
    """Download an image and return its raw bytes.

    Args:
        url: Address the image is fetched from with an HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the calling worker
            indefinitely.

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so
            an error page is never treated as image data.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content
def apply_image_operation(image_data: bytes, operation: Dict) -> bytes:
    """Apply one operation to an image.

    As written this is a placeholder: ``operation`` is ignored and the input
    bytes are handed back unchanged.

    Returns:
        The same ``image_data`` object that was passed in.
    """
    result = image_data
    return result
def save_processed_image(image_data: bytes) -> str:
    """Save a processed image and return its URL.

    As written this is a placeholder: ``image_data`` is not stored anywhere
    and a fixed URL is returned.
    """
    stored_url = "https://example.com/processed_image.jpg"
    return stored_url
def query_report_data(report_type: str, filters: Dict) -> List[Dict]:
    """Query the data for a report.

    As written this is a placeholder: both arguments are ignored and a
    single fixed row is returned in a new list.
    """
    row = {"id": 1, "value": 100}
    rows = [row]
    return rows
def process_report_data(data: List[Dict]) -> Dict:
    """Summarise the queried report rows.

    Returns:
        A dict with ``processed`` set to ``True`` and ``count`` set to the
        number of rows in ``data``.
    """
    row_count = len(data)
    return dict(processed=True, count=row_count)
def generate_charts(data: Dict) -> List[str]:
    """Generate the charts for a report.

    As written this is a placeholder: ``data`` is ignored and a new list of
    two fixed chart file names is returned.
    """
    chart_files = []
    chart_files.append("chart1.png")
    chart_files.append("chart2.png")
    return chart_files
def create_pdf_report(data: Dict, charts: List[str]) -> str:
    """Create the PDF for a report and return its path.

    As written this is a placeholder: no file is written and both arguments
    are ignored; a fixed path is returned.
    """
    pdf_path = "/tmp/report.pdf"
    return pdf_path
def upload_report_file(file_path: str) -> str:
    """Upload a report file and return its public URL.

    As written this is a placeholder: ``file_path`` is not read and a fixed
    URL is returned.
    """
    uploaded_url = "https://example.com/reports/report.pdf"
    return uploaded_url
def get_user_email(user_id: int) -> str:
    """Return the email address for a user.

    As written this is a placeholder: the address is derived from the id as
    ``user<id>@example.com`` without any lookup.
    """
    address = "user{}@example.com".format(user_id)
    return address
|