ThinkPHP6/8 队列系统与异步任务处理实战指南

在现代Web应用开发中,队列系统是处理异步任务、提高系统性能和用户体验的重要技术。ThinkPHP6/8提供了强大的think-queue扩展包,支持多种驱动方式,能够轻松实现异步任务处理、延迟执行、任务重试等功能。本文将详细介绍ThinkPHP队列系统的配置、使用和最佳实践。

队列系统概述

什么是消息队列

think-queue是ThinkPHP官方提供的消息队列服务,专门支持队列服务的扩展包。它适用于:

  • 大并发场景下的任务处理
  • 返回结果时间较长的第三方接口调用
  • 批量操作任务(短信发送、邮件发送、APP推送)
  • 需要异步处理的业务逻辑

队列系统特性

  • 消息管理:发布、获取、执行、删除、重发、失败处理
  • 延迟执行:支持延迟任务和定时任务
  • 超时控制:任务执行超时处理
  • 多队列支持:支持多个队列并行处理
  • 内存限制:防止内存溢出
  • 进程管理:启动、停止、守护进程
  • 降级机制:可降级为同步执行

安装与配置

安装think-queue扩展

1
2
3
4
5
# ThinkPHP6安装
composer require topthink/think-queue

# ThinkPHP8安装
composer require thinkphp6/think-queue:dev-main

队列配置

安装完成后,在config/queue.php中配置队列驱动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?php
/**
* 队列配置文件
* 支持驱动:sync(同步)、database(数据库)、redis(Redis)
*/
return [
// 默认驱动
'default' => 'redis',

// 驱动配置
'connections' => [
// 同步执行(调试用)
'sync' => [
'type' => 'sync',
],

// 数据库驱动
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],

// Redis驱动
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
'expire' => 60, // 任务过期时间(秒)
],
],

// 失败任务配置
'failed' => [
'type' => 'database',
'table' => 'failed_jobs',
],
];

数据库驱动表结构

如果使用数据库驱动,需要创建jobs表:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `jobs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`queue` varchar(255) NOT NULL DEFAULT '' COMMENT '队列名称',
`payload` longtext NOT NULL COMMENT '有效负载',
`attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
`reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订阅次数',
`reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '订阅时间',
`available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效时间',
`created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列';

创建任务类

单任务处理类

创建app/job/EmailJob.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
namespace app\job;

use think\queue\Job;
use think\facade\Log;
use think\facade\Mail;

/**
* 邮件发送任务类
*/
class EmailJob
{
/**
* 执行任务
* @param Job $job 当前任务对象
* @param array $data 任务数据
*/
public function fire(Job $job, $data)
{
try {
// 检查任务是否仍需执行
if (!$this->checkJob($data)) {
$job->delete();
return;
}

// 执行邮件发送
$result = $this->sendEmail($data);

if ($result) {
// 任务执行成功,删除任务
$job->delete();
Log::info('邮件发送成功', $data);
} else {
// 任务执行失败,检查重试次数
if ($job->attempts() > 3) {
Log::error('邮件发送失败,超过最大重试次数', $data);
$job->delete();
} else {
// 延迟重试
$job->release(60); // 60秒后重试
}
}
} catch (\Exception $e) {
Log::error('邮件任务执行异常:' . $e->getMessage(), $data);

if ($job->attempts() > 3) {
$job->delete();
} else {
$job->release(60);
}
}
}

/**
* 任务失败处理
* @param array $data 任务数据
*/
public function failed($data)
{
Log::error('邮件任务最终失败', $data);
// 可以在这里进行失败后的处理,如通知管理员
}

/**
* 检查任务是否需要执行
* @param array $data 任务数据
* @return bool
*/
private function checkJob($data)
{
// 检查邮件地址是否有效
if (empty($data['email']) || !filter_var($data['email'], FILTER_VALIDATE_EMAIL)) {
return false;
}

return true;
}

/**
* 发送邮件
* @param array $data 邮件数据
* @return bool
*/
private function sendEmail($data)
{
try {
// 实际的邮件发送逻辑
$result = Mail::to($data['email'])
->subject($data['subject'])
->html($data['content'])
->send();

return $result;
} catch (\Exception $e) {
Log::error('邮件发送异常:' . $e->getMessage());
return false;
}
}
}

多任务处理类

创建app/job/NotificationJob.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<?php
namespace app\job;

use think\queue\Job;
use think\facade\Log;

/**
* 通知任务类(支持多种通知方式)
*/
class NotificationJob
{
/**
* 发送短信通知
* @param Job $job
* @param array $data
*/
public function sendSms(Job $job, $data)
{
$result = $this->processSms($data);
$this->handleJobResult($job, $result, $data, '短信');
}

/**
* 发送邮件通知
* @param Job $job
* @param array $data
*/
public function sendEmail(Job $job, $data)
{
$result = $this->processEmail($data);
$this->handleJobResult($job, $result, $data, '邮件');
}

/**
* 发送APP推送
* @param Job $job
* @param array $data
*/
public function sendPush(Job $job, $data)
{
$result = $this->processPush($data);
$this->handleJobResult($job, $result, $data, 'APP推送');
}

/**
* 处理任务结果
* @param Job $job
* @param bool $result
* @param array $data
* @param string $type
*/
private function handleJobResult(Job $job, $result, $data, $type)
{
if ($result) {
$job->delete();
Log::info($type . '发送成功', $data);
} else {
if ($job->attempts() > 3) {
Log::error($type . '发送失败,超过最大重试次数', $data);
$job->delete();
} else {
$job->release(30);
}
}
}

/**
* 处理短信发送
* @param array $data
* @return bool
*/
private function processSms($data)
{
// 短信发送逻辑
sleep(2); // 模拟耗时操作
return true;
}

/**
* 处理邮件发送
* @param array $data
* @return bool
*/
private function processEmail($data)
{
// 邮件发送逻辑
sleep(3); // 模拟耗时操作
return true;
}

/**
* 处理APP推送
* @param array $data
* @return bool
*/
private function processPush($data)
{
// APP推送逻辑
sleep(1); // 模拟耗时操作
return true;
}

/**
* 任务失败处理
* @param array $data
*/
public function failed($data)
{
Log::error('通知任务最终失败', $data);
}
}

发布任务

队列服务封装

创建app/service/QueueService.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
namespace app\service;

use think\facade\Queue;
use think\facade\Log;

/**
* 队列服务类
*/
class QueueService
{
/**
* 发布邮件任务
* @param string $email 邮箱地址
* @param string $subject 邮件主题
* @param string $content 邮件内容
* @param int $delay 延迟时间(秒)
* @return mixed
*/
public static function pushEmailJob($email, $subject, $content, $delay = 0)
{
$data = [
'email' => $email,
'subject' => $subject,
'content' => $content,
'created_at' => time(),
];

$jobClass = 'app\\job\\EmailJob';
$queueName = 'email_queue';

if ($delay > 0) {
return Queue::later($delay, $jobClass, $data, $queueName);
} else {
return Queue::push($jobClass, $data, $queueName);
}
}

/**
* 发布通知任务
* @param string $type 通知类型(sms/email/push)
* @param array $data 通知数据
* @param string $queueName 队列名称
* @return mixed
*/
public static function pushNotificationJob($type, $data, $queueName = 'notification_queue')
{
$jobClass = 'app\\job\\NotificationJob@send' . ucfirst($type);

$taskData = array_merge($data, [
'type' => $type,
'created_at' => time(),
]);

return Queue::push($jobClass, $taskData, $queueName);
}

/**
* 批量发布任务
* @param array $jobs 任务列表
* @return array
*/
public static function pushBatchJobs($jobs)
{
$results = [];

foreach ($jobs as $job) {
$result = Queue::push(
$job['class'],
$job['data'],
$job['queue'] ?? 'default'
);

$results[] = [
'job' => $job,
'result' => $result,
];
}

return $results;
}

/**
* 获取队列状态
* @param string $queueName 队列名称
* @return array
*/
public static function getQueueStatus($queueName = 'default')
{
// 这里可以根据不同驱动实现队列状态查询
// Redis驱动示例
$redis = app('redis');

return [
'waiting' => $redis->llen('queue:' . $queueName),
'processing' => $redis->llen('queue:' . $queueName . ':processing'),
'failed' => $redis->llen('queue:' . $queueName . ':failed'),
];
}
}

控制器中使用队列

创建app/controller/QueueController.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<?php
namespace app\controller;

use app\BaseController;
use app\service\QueueService;
use think\Response;

/**
* 队列控制器
*/
class QueueController extends BaseController
{
/**
* 发送邮件
* @return Response
*/
public function sendEmail()
{
$email = $this->request->param('email');
$subject = $this->request->param('subject', '测试邮件');
$content = $this->request->param('content', '这是一封测试邮件');
$delay = $this->request->param('delay', 0);

if (empty($email)) {
return json(['code' => 400, 'msg' => '邮箱地址不能为空']);
}

$result = QueueService::pushEmailJob($email, $subject, $content, $delay);

if ($result !== false) {
return json(['code' => 200, 'msg' => '邮件任务已加入队列', 'data' => $result]);
} else {
return json(['code' => 500, 'msg' => '邮件任务加入队列失败']);
}
}

/**
* 发送通知
* @return Response
*/
public function sendNotification()
{
$type = $this->request->param('type'); // sms/email/push
$data = $this->request->param('data', []);

if (empty($type) || !in_array($type, ['sms', 'email', 'push'])) {
return json(['code' => 400, 'msg' => '通知类型无效']);
}

$result = QueueService::pushNotificationJob($type, $data);

if ($result !== false) {
return json(['code' => 200, 'msg' => '通知任务已加入队列', 'data' => $result]);
} else {
return json(['code' => 500, 'msg' => '通知任务加入队列失败']);
}
}

/**
* 批量发送任务
* @return Response
*/
public function batchSend()
{
$users = [
['email' => 'user1@example.com', 'name' => '用户1'],
['email' => 'user2@example.com', 'name' => '用户2'],
['email' => 'user3@example.com', 'name' => '用户3'],
];

$jobs = [];
foreach ($users as $user) {
$jobs[] = [
'class' => 'app\\job\\EmailJob',
'data' => [
'email' => $user['email'],
'subject' => '批量邮件通知',
'content' => '您好,' . $user['name'] . '!这是一封批量邮件。',
],
'queue' => 'batch_email_queue',
];
}

$results = QueueService::pushBatchJobs($jobs);

return json([
'code' => 200,
'msg' => '批量任务已加入队列',
'data' => $results
]);
}

/**
* 获取队列状态
* @return Response
*/
public function getStatus()
{
$queueName = $this->request->param('queue', 'default');
$status = QueueService::getQueueStatus($queueName);

return json([
'code' => 200,
'msg' => '获取队列状态成功',
'data' => $status
]);
}
}

队列监听与管理

启动队列监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# work方式(单进程)
php think queue:work --queue email_queue

# listen方式(推荐,支持代码热更新)
php think queue:listen --queue email_queue

# 监听多个队列
php think queue:listen --queue email_queue,notification_queue

# 设置内存限制
php think queue:listen --queue email_queue --memory=512

# 设置超时时间
php think queue:listen --queue email_queue --timeout=60

# 设置最大任务数
php think queue:listen --queue email_queue --max-jobs=1000

队列管理命令

1
2
3
4
5
6
7
8
9
10
11
# 重启队列
php think queue:restart

# 清除失败任务
php think queue:flush

# 重试失败任务
php think queue:retry all

# 查看失败任务
php think queue:failed

进程守护脚本

创建queue_daemon.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/bin/bash

# 队列守护脚本
QUEUE_NAME="email_queue,notification_queue"
PHP_PATH="/usr/bin/php"
PROJECT_PATH="/var/www/html/your_project"
LOG_PATH="$PROJECT_PATH/runtime/queue.log"

cd $PROJECT_PATH

while true; do
echo "$(date): Starting queue listener..." >> $LOG_PATH
$PHP_PATH think queue:listen --queue=$QUEUE_NAME >> $LOG_PATH 2>&1

if [ $? -ne 0 ]; then
echo "$(date): Queue listener crashed, restarting in 5 seconds..." >> $LOG_PATH
sleep 5
fi
done

性能优化与监控

队列性能监控服务

创建app/service/QueueMonitorService.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
<?php
namespace app\service;

use think\facade\Cache;
use think\facade\Log;

/**
* 队列监控服务
*/
class QueueMonitorService
{
/**
* 记录队列性能指标
* @param string $queueName 队列名称
* @param string $jobClass 任务类
* @param float $executeTime 执行时间
* @param bool $success 是否成功
*/
public static function recordMetrics($queueName, $jobClass, $executeTime, $success)
{
$date = date('Y-m-d');
$hour = date('H');

// 记录每小时统计
$key = "queue_metrics:{$queueName}:{$date}:{$hour}";
$metrics = Cache::get($key, [
'total' => 0,
'success' => 0,
'failed' => 0,
'avg_time' => 0,
'max_time' => 0,
]);

$metrics['total']++;
if ($success) {
$metrics['success']++;
} else {
$metrics['failed']++;
}

// 计算平均执行时间
$metrics['avg_time'] = ($metrics['avg_time'] * ($metrics['total'] - 1) + $executeTime) / $metrics['total'];
$metrics['max_time'] = max($metrics['max_time'], $executeTime);

Cache::set($key, $metrics, 86400); // 缓存24小时

// 记录慢任务
if ($executeTime > 10) { // 超过10秒的任务
Log::warning('慢任务检测', [
'queue' => $queueName,
'job' => $jobClass,
'execute_time' => $executeTime,
]);
}
}

/**
* 获取队列性能报告
* @param string $queueName 队列名称
* @param string $date 日期
* @return array
*/
public static function getPerformanceReport($queueName, $date = null)
{
$date = $date ?: date('Y-m-d');
$report = [];

for ($hour = 0; $hour < 24; $hour++) {
$key = "queue_metrics:{$queueName}:{$date}:" . sprintf('%02d', $hour);
$metrics = Cache::get($key, [
'total' => 0,
'success' => 0,
'failed' => 0,
'avg_time' => 0,
'max_time' => 0,
]);

$report[$hour] = $metrics;
}

return $report;
}

/**
* 检查队列健康状态
* @param string $queueName 队列名称
* @return array
*/
public static function checkQueueHealth($queueName)
{
$redis = app('redis');

// 获取队列长度
$waitingJobs = $redis->llen("queue:{$queueName}");
$processingJobs = $redis->llen("queue:{$queueName}:processing");
$failedJobs = $redis->llen("queue:{$queueName}:failed");

// 计算健康分数
$healthScore = 100;

// 等待任务过多扣分
if ($waitingJobs > 1000) {
$healthScore -= 30;
} elseif ($waitingJobs > 500) {
$healthScore -= 15;
}

// 失败任务过多扣分
if ($failedJobs > 100) {
$healthScore -= 40;
} elseif ($failedJobs > 50) {
$healthScore -= 20;
}

// 处理中任务过多扣分
if ($processingJobs > 50) {
$healthScore -= 20;
}

return [
'queue_name' => $queueName,
'waiting_jobs' => $waitingJobs,
'processing_jobs' => $processingJobs,
'failed_jobs' => $failedJobs,
'health_score' => max(0, $healthScore),
'status' => $healthScore >= 80 ? 'healthy' : ($healthScore >= 60 ? 'warning' : 'critical'),
];
}
}

最佳实践

队列设计原则

  1. 任务幂等性:确保任务可以安全重试
  2. 任务原子性:每个任务应该是独立的
  3. 合理的重试机制:设置适当的重试次数和延迟时间
  4. 任务超时处理:避免任务长时间占用资源
  5. 错误处理:完善的异常处理和日志记录

性能优化建议

  1. 选择合适的驱动

    • 开发环境:sync(同步执行,便于调试)
    • 生产环境:redis(高性能,支持持久化)
    • 大规模应用:考虑RabbitMQ等专业消息队列
  2. 队列分离

    • 按业务类型分离队列
    • 按优先级分离队列
    • 按执行时间分离队列
  3. 监控和告警

    • 监控队列长度
    • 监控任务执行时间
    • 监控失败率
    • 设置告警阈值
  4. 资源管理

    • 设置内存限制
    • 控制并发数量
    • 定期清理失败任务

总结

ThinkPHP6/8的队列系统为开发者提供了强大而灵活的异步任务处理能力。通过合理的配置和使用,可以显著提升应用性能和用户体验。在实际项目中,需要根据业务需求选择合适的驱动方式,设计合理的任务类,并建立完善的监控和管理机制。

关键要点:

  • 选择合适的队列驱动(Redis推荐用于生产环境)
  • 设计幂等和原子的任务类
  • 实现完善的错误处理和重试机制
  • 建立队列监控和性能分析
  • 使用进程守护确保队列服务稳定运行

通过本文的实战指南,相信你已经掌握了ThinkPHP队列系统的核心技术,能够在项目中灵活运用队列来处理各种异步任务需求。

本站由 提供部署服务