ThinkPHP6/8 队列任务处理实战:从入门到生产环境部署
Orion K Lv6

ThinkPHP的队列系统是处理异步任务的强大工具,特别适用于邮件发送、订单处理、数据统计等耗时操作。本文将深入探讨think-queue的配置、使用技巧和生产环境部署方案,帮助开发者构建高效的异步任务处理系统。

队列系统概述

什么是消息队列

消息队列是一种异步通信机制,允许应用程序将任务放入队列中,由后台进程异步处理。这种模式具有以下优势:

  • 解耦合:生产者和消费者独立运行
  • 高并发:支持大量任务并发处理
  • 可靠性:任务持久化存储,支持重试机制
  • 扩展性:可水平扩展处理能力

think-queue特性

1 2

  • 支持多种驱动:Redis、Database、Sync等
  • 任务重试机制和失败处理
  • 延迟任务执行
  • 多队列支持
  • 内存限制和超时控制
  • 进程守护和监控

安装与配置

安装think-queue

1
2
3
4
5
# ThinkPHP6/8 安装命令
composer require topthink/think-queue

# 如果是ThinkPHP6专用版本
composer require thinkphp6/think-queue:dev-main

队列配置详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<?php
// config/queue.php
return [
// 默认队列驱动
'default' => env('queue.driver', 'redis'),

// 队列连接配置
'connections' => [
// 同步执行(开发调试用)
'sync' => [
'type' => 'sync',
],

// Redis驱动配置
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('redis.host', '127.0.0.1'),
'port' => env('redis.port', 6379),
'password' => env('redis.password', ''),
'select' => env('redis.select', 0),
'timeout' => 0,
'persistent' => false,
'prefix' => 'queue:',
'serialize' => ['serialize', 'unserialize'],
],

// 数据库驱动配置
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],

// 高可用Redis集群配置
'redis_cluster' => [
'type' => 'redis',
'queue' => 'default',
'host' => [
'192.168.1.100:7000',
'192.168.1.101:7000',
'192.168.1.102:7000',
],
'password' => env('redis.password', ''),
'timeout' => 0,
'persistent' => false,
'prefix' => 'cluster:queue:',
],
],

// 失败任务配置
'failed' => [
'type' => 'database',
'table' => 'failed_jobs',
],
];

数据库表结构

如果使用Database驱动,需要创建相应的数据表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- 任务队列表
CREATE TABLE `jobs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`queue` varchar(255) NOT NULL COMMENT '队列名称',
`payload` longtext NOT NULL COMMENT '任务数据',
`attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '尝试次数',
`reserve_time` int(10) unsigned DEFAULT NULL COMMENT '保留时间',
`available_time` int(10) unsigned NOT NULL COMMENT '可用时间',
`create_time` int(10) unsigned NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `jobs_queue_index` (`queue`),
KEY `jobs_available_time_index` (`available_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务队列表';

-- 失败任务表
CREATE TABLE `failed_jobs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`connection` text NOT NULL COMMENT '连接名称',
`queue` text NOT NULL COMMENT '队列名称',
`payload` longtext NOT NULL COMMENT '任务数据',
`exception` longtext NOT NULL COMMENT '异常信息',
`failed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失败时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='失败任务表';

任务类开发

单任务处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
<?php
namespace app\job;

use think\queue\Job;
use think\facade\Log;
use think\facade\Db;
use think\facade\Mail;

/**
* 邮件发送任务类
*/
class EmailJob
{
/**
* 执行邮件发送任务
* @param Job $job 当前任务对象
* @param array $data 任务数据
* @return void
*/
public function fire(Job $job, $data)
{
try {
// 记录任务开始
Log::info('邮件发送任务开始', $data);

// 检查重试次数
if ($job->attempts() > 3) {
Log::error('邮件发送任务重试次数超限', [
'attempts' => $job->attempts(),
'data' => $data
]);
$job->delete();
return;
}

// 验证必要参数
if (!isset($data['email']) || !isset($data['subject']) || !isset($data['content'])) {
Log::error('邮件发送任务参数不完整', $data);
$job->delete();
return;
}

// 执行邮件发送
$result = $this->sendEmail($data);

if ($result) {
// 更新发送状态
if (isset($data['email_log_id'])) {
Db::name('email_logs')
->where('id', $data['email_log_id'])
->update([
'status' => 1,
'sent_at' => time(),
'attempts' => $job->attempts()
]);
}

Log::info('邮件发送成功', $data);
$job->delete(); // 删除任务
} else {
// 发送失败,延迟重试
$delay = min(60 * pow(2, $job->attempts()), 3600); // 指数退避,最大1小时
Log::warning('邮件发送失败,将在' . $delay . '秒后重试', $data);
$job->release($delay);
}

} catch (\Exception $e) {
Log::error('邮件发送任务异常', [
'error' => $e->getMessage(),
'data' => $data,
'attempts' => $job->attempts()
]);

// 异常情况下延迟重试
if ($job->attempts() < 3) {
$job->release(300); // 5分钟后重试
} else {
$job->delete(); // 超过重试次数,删除任务
}
}
}

/**
* 任务失败处理
* @param array $data 任务数据
* @return void
*/
public function failed($data)
{
Log::error('邮件发送任务最终失败', $data);

// 更新数据库状态
if (isset($data['email_log_id'])) {
Db::name('email_logs')
->where('id', $data['email_log_id'])
->update([
'status' => 2, // 失败状态
'failed_at' => time()
]);
}

// 发送告警通知
$this->sendFailureAlert($data);
}

/**
* 发送邮件
* @param array $data 邮件数据
* @return bool
*/
private function sendEmail(array $data): bool
{
try {
// 这里实现具体的邮件发送逻辑
// 可以使用PHPMailer、SwiftMailer等

// 模拟邮件发送
$success = Mail::send([
'to' => $data['email'],
'subject' => $data['subject'],
'content' => $data['content'],
'template' => $data['template'] ?? 'default'
]);

return $success;

} catch (\Exception $e) {
Log::error('邮件发送底层异常', [
'error' => $e->getMessage(),
'data' => $data
]);
return false;
}
}

/**
* 发送失败告警
* @param array $data 任务数据
* @return void
*/
private function sendFailureAlert(array $data): void
{
// 发送钉钉、企业微信等告警通知
// 这里可以实现具体的告警逻辑
}
}

多任务处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
<?php
namespace app\job;

use think\queue\Job;
use think\facade\Log;
use think\facade\Db;

/**
* 订单处理任务类
*/
class OrderJob
{
/**
* 订单超时取消任务
* @param Job $job 当前任务对象
* @param array $data 任务数据
* @return void
*/
public function cancelExpiredOrder(Job $job, $data)
{
try {
$orderId = $data['order_id'];

// 检查订单状态
$order = Db::name('orders')->where('id', $orderId)->find();

if (!$order) {
Log::warning('订单不存在', ['order_id' => $orderId]);
$job->delete();
return;
}

// 只处理未支付订单
if ($order['status'] != 0) {
Log::info('订单状态已变更,无需取消', [
'order_id' => $orderId,
'status' => $order['status']
]);
$job->delete();
return;
}

// 检查是否超时
$expireTime = $order['create_time'] + ($data['timeout'] ?? 1800); // 默认30分钟
if (time() < $expireTime) {
Log::info('订单未超时,延迟处理', [
'order_id' => $orderId,
'expire_time' => date('Y-m-d H:i:s', $expireTime)
]);
$job->release($expireTime - time());
return;
}

// 执行订单取消
$this->cancelOrder($orderId);

Log::info('订单超时取消成功', ['order_id' => $orderId]);
$job->delete();

} catch (\Exception $e) {
Log::error('订单取消任务异常', [
'error' => $e->getMessage(),
'data' => $data
]);

if ($job->attempts() < 3) {
$job->release(60); // 1分钟后重试
} else {
$job->delete();
}
}
}

/**
* 订单自动确认收货任务
* @param Job $job 当前任务对象
* @param array $data 任务数据
* @return void
*/
public function autoConfirmOrder(Job $job, $data)
{
try {
$orderId = $data['order_id'];

// 检查订单状态
$order = Db::name('orders')->where('id', $orderId)->find();

if (!$order || $order['status'] != 3) { // 3表示已发货
$job->delete();
return;
}

// 检查是否到达自动确认时间
$autoConfirmTime = $order['ship_time'] + ($data['auto_days'] ?? 7) * 86400;
if (time() < $autoConfirmTime) {
$job->release($autoConfirmTime - time());
return;
}

// 执行自动确认收货
$this->confirmOrder($orderId);

Log::info('订单自动确认收货成功', ['order_id' => $orderId]);
$job->delete();

} catch (\Exception $e) {
Log::error('订单自动确认任务异常', [
'error' => $e->getMessage(),
'data' => $data
]);

if ($job->attempts() < 3) {
$job->release(3600); // 1小时后重试
} else {
$job->delete();
}
}
}

/**
* 订单数据统计任务
* @param Job $job 当前任务对象
* @param array $data 任务数据
* @return void
*/
public function statisticsOrder(Job $job, $data)
{
try {
$date = $data['date'] ?? date('Y-m-d');

// 统计当日订单数据
$stats = $this->calculateOrderStats($date);

// 保存统计结果
Db::name('order_statistics')->replace(true)->insert([
'date' => $date,
'total_orders' => $stats['total_orders'],
'total_amount' => $stats['total_amount'],
'paid_orders' => $stats['paid_orders'],
'paid_amount' => $stats['paid_amount'],
'cancel_orders' => $stats['cancel_orders'],
'refund_orders' => $stats['refund_orders'],
'created_at' => time(),
'updated_at' => time()
]);

Log::info('订单统计任务完成', ['date' => $date, 'stats' => $stats]);
$job->delete();

} catch (\Exception $e) {
Log::error('订单统计任务异常', [
'error' => $e->getMessage(),
'data' => $data
]);

if ($job->attempts() < 3) {
$job->release(300); // 5分钟后重试
} else {
$job->delete();
}
}
}

/**
* 取消订单
* @param int $orderId 订单ID
* @return bool
*/
private function cancelOrder(int $orderId): bool
{
Db::startTrans();
try {
// 更新订单状态
Db::name('orders')
->where('id', $orderId)
->update([
'status' => 5, // 已取消
'cancel_time' => time(),
'cancel_reason' => '订单超时自动取消'
]);

// 恢复库存
$orderItems = Db::name('order_items')->where('order_id', $orderId)->select();
foreach ($orderItems as $item) {
Db::name('products')
->where('id', $item['product_id'])
->inc('stock', $item['quantity']);
}

// 释放优惠券
Db::name('user_coupons')
->where('order_id', $orderId)
->update(['status' => 0, 'used_time' => 0]);

Db::commit();
return true;

} catch (\Exception $e) {
Db::rollback();
Log::error('取消订单失败', [
'order_id' => $orderId,
'error' => $e->getMessage()
]);
return false;
}
}

/**
* 确认收货
* @param int $orderId 订单ID
* @return bool
*/
private function confirmOrder(int $orderId): bool
{
Db::startTrans();
try {
// 更新订单状态
Db::name('orders')
->where('id', $orderId)
->update([
'status' => 4, // 已完成
'confirm_time' => time(),
'confirm_type' => 2 // 自动确认
]);

// 结算佣金
$this->settleCommission($orderId);

Db::commit();
return true;

} catch (\Exception $e) {
Db::rollback();
Log::error('确认收货失败', [
'order_id' => $orderId,
'error' => $e->getMessage()
]);
return false;
}
}

/**
* 计算订单统计数据
* @param string $date 日期
* @return array
*/
private function calculateOrderStats(string $date): array
{
$startTime = strtotime($date);
$endTime = $startTime + 86400;

return [
'total_orders' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->count(),
'total_amount' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->sum('total_amount'),
'paid_orders' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->where('status', '>=', 1)
->count(),
'paid_amount' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->where('status', '>=', 1)
->sum('total_amount'),
'cancel_orders' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->where('status', 5)
->count(),
'refund_orders' => Db::name('orders')
->where('create_time', 'between', [$startTime, $endTime])
->where('status', 6)
->count(),
];
}

/**
* 结算佣金
* @param int $orderId 订单ID
* @return void
*/
private function settleCommission(int $orderId): void
{
// 实现佣金结算逻辑
}
}

任务发布与管理

任务发布服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
<?php
namespace app\service;

use think\facade\Queue;
use think\facade\Log;
use think\facade\Db;

/**
* 队列任务发布服务
*/
class QueueService
{
/**
* 发布邮件发送任务
* @param string $email 邮箱地址
* @param string $subject 邮件主题
* @param string $content 邮件内容
* @param string $template 邮件模板
* @param int $delay 延迟时间(秒)
* @return bool
*/
public function sendEmail(string $email, string $subject, string $content, string $template = 'default', int $delay = 0): bool
{
try {
// 记录邮件发送日志
$logId = Db::name('email_logs')->insertGetId([
'email' => $email,
'subject' => $subject,
'template' => $template,
'status' => 0, // 待发送
'created_at' => time()
]);

$jobData = [
'email_log_id' => $logId,
'email' => $email,
'subject' => $subject,
'content' => $content,
'template' => $template,
'created_at' => time()
];

// 发布任务
if ($delay > 0) {
$result = Queue::later($delay, 'app\\job\\EmailJob', $jobData, 'email');
} else {
$result = Queue::push('app\\job\\EmailJob', $jobData, 'email');
}

if ($result !== false) {
Log::info('邮件发送任务已发布', $jobData);
return true;
}

return false;

} catch (\Exception $e) {
Log::error('邮件发送任务发布失败', [
'error' => $e->getMessage(),
'email' => $email,
'subject' => $subject
]);
return false;
}
}

/**
* 发布订单超时取消任务
* @param int $orderId 订单ID
* @param int $timeout 超时时间(秒)
* @return bool
*/
public function cancelExpiredOrder(int $orderId, int $timeout = 1800): bool
{
try {
$jobData = [
'order_id' => $orderId,
'timeout' => $timeout,
'created_at' => time()
];

// 延迟执行取消任务
$result = Queue::later($timeout, 'app\\job\\OrderJob@cancelExpiredOrder', $jobData, 'order');

if ($result !== false) {
Log::info('订单超时取消任务已发布', $jobData);
return true;
}

return false;

} catch (\Exception $e) {
Log::error('订单超时取消任务发布失败', [
'error' => $e->getMessage(),
'order_id' => $orderId
]);
return false;
}
}

/**
* 发布订单自动确认收货任务
* @param int $orderId 订单ID
* @param int $autoDays 自动确认天数
* @return bool
*/
public function autoConfirmOrder(int $orderId, int $autoDays = 7): bool
{
try {
$jobData = [
'order_id' => $orderId,
'auto_days' => $autoDays,
'created_at' => time()
];

// 延迟执行自动确认任务
$delay = $autoDays * 86400; // 转换为秒
$result = Queue::later($delay, 'app\\job\\OrderJob@autoConfirmOrder', $jobData, 'order');

if ($result !== false) {
Log::info('订单自动确认任务已发布', $jobData);
return true;
}

return false;

} catch (\Exception $e) {
Log::error('订单自动确认任务发布失败', [
'error' => $e->getMessage(),
'order_id' => $orderId
]);
return false;
}
}

/**
* 批量发布任务
* @param array $jobs 任务列表
* @return array
*/
public function batchPush(array $jobs): array
{
$results = [];

foreach ($jobs as $index => $job) {
try {
$jobClass = $job['class'];
$jobData = $job['data'];
$queueName = $job['queue'] ?? 'default';
$delay = $job['delay'] ?? 0;

if ($delay > 0) {
$result = Queue::later($delay, $jobClass, $jobData, $queueName);
} else {
$result = Queue::push($jobClass, $jobData, $queueName);
}

$results[$index] = [
'success' => $result !== false,
'job_id' => $result,
'error' => null
];

} catch (\Exception $e) {
$results[$index] = [
'success' => false,
'job_id' => null,
'error' => $e->getMessage()
];
}
}

return $results;
}

/**
* 发布定时统计任务
* @param string $date 统计日期
* @return bool
*/
public function scheduleStatistics(string $date = null): bool
{
$date = $date ?: date('Y-m-d');

try {
$jobData = [
'date' => $date,
'created_at' => time()
];

// 在当天23:59执行统计任务
$executeTime = strtotime($date . ' 23:59:00');
$delay = max(0, $executeTime - time());

$result = Queue::later($delay, 'app\\job\\OrderJob@statisticsOrder', $jobData, 'statistics');

if ($result !== false) {
Log::info('统计任务已发布', $jobData);
return true;
}

return false;

} catch (\Exception $e) {
Log::error('统计任务发布失败', [
'error' => $e->getMessage(),
'date' => $date
]);
return false;
}
}
}

生产环境部署

Supervisor进程守护

4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
; /etc/supervisor/conf.d/thinkphp-queue.conf
[group:thinkphp-queue]
programs=queue-email,queue-order,queue-statistics

; 邮件队列进程
[program:queue-email]
command=php /var/www/html/think queue:listen --queue=email --memory=512 --timeout=60
directory=/var/www/html
user=www-data
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/queue-email.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=2
process_name=%(program_name)s_%(process_num)02d

; 订单队列进程
[program:queue-order]
command=php /var/www/html/think queue:listen --queue=order --memory=512 --timeout=300
directory=/var/www/html
user=www-data
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/queue-order.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=3
process_name=%(program_name)s_%(process_num)02d

; 统计队列进程
[program:queue-statistics]
command=php /var/www/html/think queue:listen --queue=statistics --memory=256 --timeout=600
directory=/var/www/html
user=www-data
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/queue-statistics.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
numprocs=1
process_name=%(program_name)s_%(process_num)02d

启动和管理命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 重新加载配置
sudo supervisorctl reread
sudo supervisorctl update

# 启动所有队列进程
sudo supervisorctl start thinkphp-queue:*

# 停止所有队列进程
sudo supervisorctl stop thinkphp-queue:*

# 重启所有队列进程
sudo supervisorctl restart thinkphp-queue:*

# 查看进程状态
sudo supervisorctl status

# 查看日志
sudo supervisorctl tail -f queue-email

Docker容器化部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Dockerfile
FROM php:8.1-cli

# 安装必要的扩展
RUN apt-get update && apt-get install -y \
libzip-dev \
zip \
unzip \
supervisor \
&& docker-php-ext-install zip pdo_mysql \
&& pecl install redis \
&& docker-php-ext-enable redis

# 安装Composer
COPY --from=composer:latest /usr/bin/composer /usr/bin/composer

# 设置工作目录
WORKDIR /var/www/html

# 复制项目文件
COPY . .

# 安装依赖
RUN composer install --no-dev --optimize-autoloader

# 复制Supervisor配置
COPY docker/supervisor.conf /etc/supervisor/conf.d/

# 设置权限
RUN chown -R www-data:www-data /var/www/html \
&& chmod -R 755 /var/www/html

# 启动Supervisor
CMD ["/usr/bin/supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# docker-compose.yml
version: '3.8'

services:
app:
build: .
container_name: thinkphp-app
volumes:
- ./:/var/www/html
- ./storage/logs:/var/www/html/runtime/log
environment:
- APP_ENV=production
- REDIS_HOST=redis
- DB_HOST=mysql
depends_on:
- redis
- mysql
networks:
- app-network

queue:
build: .
container_name: thinkphp-queue
command: php think queue:listen --memory=512
volumes:
- ./:/var/www/html
environment:
- APP_ENV=production
- REDIS_HOST=redis
- DB_HOST=mysql
depends_on:
- redis
- mysql
networks:
- app-network
deploy:
replicas: 3

redis:
image: redis:7-alpine
container_name: thinkphp-redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- app-network

mysql:
image: mysql:8.0
container_name: thinkphp-mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: thinkphp
volumes:
- mysql_data:/var/lib/mysql
networks:
- app-network

volumes:
redis_data:
mysql_data:

networks:
app-network:
driver: bridge

监控与运维

队列监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
<?php
namespace app\service;

use think\facade\Cache;
use think\facade\Db;
use think\facade\Log;

/**
* 队列监控服务
*/
class QueueMonitorService
{
/**
* 获取队列状态
* @return array
*/
public function getQueueStatus(): array
{
$status = [];

// Redis队列状态
if (config('queue.default') === 'redis') {
$redis = Cache::store('redis')->handler();

$queues = ['email', 'order', 'statistics', 'default'];
foreach ($queues as $queue) {
$waitingKey = 'queue:' . $queue;
$processingKey = 'queue:' . $queue . ':processing';
$failedKey = 'queue:' . $queue . ':failed';

$status[$queue] = [
'waiting' => $redis->llen($waitingKey),
'processing' => $redis->llen($processingKey),
'failed' => $redis->llen($failedKey)
];
}
}

// 数据库队列状态
if (config('queue.default') === 'database') {
$queues = Db::name('jobs')
->field('queue, count(*) as total')
->group('queue')
->select()
->toArray();

foreach ($queues as $queue) {
$status[$queue['queue']] = [
'waiting' => $queue['total'],
'processing' => 0,
'failed' => 0
];
}
}

return $status;
}

/**
* 获取队列性能指标
* @return array
*/
public function getPerformanceMetrics(): array
{
$metrics = [
'processed_jobs' => 0,
'failed_jobs' => 0,
'avg_processing_time' => 0,
'memory_usage' => 0,
'cpu_usage' => 0
];

// 从缓存或数据库获取性能指标
$todayKey = 'queue_metrics_' . date('Y-m-d');
$todayMetrics = Cache::get($todayKey, []);

if ($todayMetrics) {
$metrics = array_merge($metrics, $todayMetrics);
}

return $metrics;
}

/**
* 记录任务执行指标
* @param string $queue 队列名称
* @param float $processingTime 处理时间
* @param bool $success 是否成功
* @return void
*/
public function recordJobMetrics(string $queue, float $processingTime, bool $success = true): void
{
$todayKey = 'queue_metrics_' . date('Y-m-d');
$metrics = Cache::get($todayKey, [
'processed_jobs' => 0,
'failed_jobs' => 0,
'total_processing_time' => 0,
'queues' => []
]);

// 更新总体指标
if ($success) {
$metrics['processed_jobs']++;
} else {
$metrics['failed_jobs']++;
}

$metrics['total_processing_time'] += $processingTime;

// 更新队列指标
if (!isset($metrics['queues'][$queue])) {
$metrics['queues'][$queue] = [
'processed' => 0,
'failed' => 0,
'total_time' => 0
];
}

if ($success) {
$metrics['queues'][$queue]['processed']++;
} else {
$metrics['queues'][$queue]['failed']++;
}

$metrics['queues'][$queue]['total_time'] += $processingTime;

// 计算平均处理时间
$totalJobs = $metrics['processed_jobs'] + $metrics['failed_jobs'];
if ($totalJobs > 0) {
$metrics['avg_processing_time'] = $metrics['total_processing_time'] / $totalJobs;
}

Cache::set($todayKey, $metrics, 86400);
}

/**
* 检查队列健康状态
* @return array
*/
public function healthCheck(): array
{
$health = [
'status' => 'healthy',
'issues' => [],
'recommendations' => []
];

$queueStatus = $this->getQueueStatus();

foreach ($queueStatus as $queue => $status) {
// 检查等待任务数量
if ($status['waiting'] > 1000) {
$health['issues'][] = "队列 {$queue} 等待任务过多 ({$status['waiting']})";
$health['recommendations'][] = "增加 {$queue} 队列的消费者进程";
$health['status'] = 'warning';
}

// 检查失败任务数量
if ($status['failed'] > 100) {
$health['issues'][] = "队列 {$queue} 失败任务过多 ({$status['failed']})";
$health['recommendations'][] = "检查 {$queue} 队列的任务处理逻辑";
$health['status'] = 'critical';
}
}

// 检查Redis连接
try {
Cache::store('redis')->get('health_check');
} catch (\Exception $e) {
$health['issues'][] = 'Redis连接异常: ' . $e->getMessage();
$health['status'] = 'critical';
}

return $health;
}

/**
* 清理失败任务
* @param string $queue 队列名称
* @param int $days 保留天数
* @return int
*/
public function cleanFailedJobs(string $queue = null, int $days = 7): int
{
$cleanedCount = 0;

if (config('queue.default') === 'database') {
$query = Db::name('failed_jobs')
->where('failed_at', '<', date('Y-m-d H:i:s', time() - $days * 86400));

if ($queue) {
$query->where('queue', $queue);
}

$cleanedCount = $query->delete();
}

Log::info('清理失败任务完成', [
'queue' => $queue,
'days' => $days,
'cleaned_count' => $cleanedCount
]);

return $cleanedCount;
}
}

告警通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 队列告警服务
*/
class QueueAlertService
{
/**
* 发送告警通知
* @param string $level 告警级别
* @param string $message 告警消息
* @param array $data 附加数据
* @return void
*/
public function sendAlert(string $level, string $message, array $data = []): void
{
$alert = [
'level' => $level,
'message' => $message,
'data' => $data,
'timestamp' => date('Y-m-d H:i:s'),
'server' => gethostname()
];

// 发送到钉钉
$this->sendToDingTalk($alert);

// 发送到企业微信
$this->sendToWeWork($alert);

// 记录告警日志
Log::channel('alert')->write($level, $message, $data);
}

/**
* 发送到钉钉
* @param array $alert 告警信息
* @return void
*/
private function sendToDingTalk(array $alert): void
{
$webhook = config('alert.dingtalk.webhook');
if (!$webhook) return;

$content = "**队列告警通知**\n\n";
$content .= "**级别**: {$alert['level']}\n";
$content .= "**消息**: {$alert['message']}\n";
$content .= "**时间**: {$alert['timestamp']}\n";
$content .= "**服务器**: {$alert['server']}\n";

if ($alert['data']) {
$content .= "**详情**: " . json_encode($alert['data'], JSON_UNESCAPED_UNICODE) . "\n";
}

$payload = [
'msgtype' => 'markdown',
'markdown' => [
'title' => '队列告警',
'text' => $content
]
];

// 发送HTTP请求
$this->sendHttpRequest($webhook, $payload);
}

/**
* 发送HTTP请求
* @param string $url 请求URL
* @param array $data 请求数据
* @return void
*/
private function sendHttpRequest(string $url, array $data): void
{
try {
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_TIMEOUT, 10);

curl_exec($ch);
curl_close($ch);
} catch (\Exception $e) {
Log::error('发送告警通知失败', ['error' => $e->getMessage()]);
}
}
}

最佳实践总结

1. 任务设计原则

  • 幂等性:任务可以安全地重复执行
  • 原子性:任务要么完全成功,要么完全失败
  • 超时控制:设置合理的任务执行超时时间
  • 错误处理:完善的异常处理和重试机制

2. 性能优化建议

  • 合理分队列:按业务类型和优先级分配队列
  • 批量处理:对于大量相似任务,考虑批量处理
  • 内存管理:定期重启消费者进程,避免内存泄漏
  • 监控告警:建立完善的监控和告警机制

3. 运维注意事项

  • 进程守护:使用Supervisor等工具保证进程常驻
  • 日志管理:合理配置日志轮转和清理策略
  • 资源监控:监控CPU、内存、磁盘等资源使用情况
  • 备份恢复:定期备份队列数据和配置文件

ThinkPHP的队列系统为异步任务处理提供了强大的支持,通过合理的设计和配置,可以显著提升应用的性能和用户体验。在生产环境中,需要特别注意监控、告警和运维管理,确保队列系统的稳定运行。

本站由 提供部署服务