ThinkPHP8 事件系统与队列处理实战指南:高效异步处理解决方案
Orion K Lv6

ThinkPHP8继承了强大的事件系统和队列处理机制,为开发者提供了高效的异步处理解决方案 1。本文将深入探讨ThinkPHP8中事件系统的使用方法、队列处理的最佳实践以及定时任务的实现技巧,帮助开发者构建高性能的异步处理系统。

事件系统深度解析

事件系统优势

新版的事件系统可以看成是5.1版本行为系统的升级版,事件系统相比行为系统强大的地方在于事件本身可以是一个类,并且可以更好的支持事件订阅者 1

事件相比较中间件的优势是事件比中间件更加精准定位(或者说粒度更细),并且更适合一些业务场景的扩展。例如,我们通常会遇到用户注册或者登录后需要做一系列操作,通过事件系统可以做到不侵入原有代码完成登录的操作扩展,降低系统的耦合性的同时,也降低了BUG的可能性 1

事件监听实现

1. 创建事件监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
<?php
declare(strict_types=1);

namespace app\listener;

use think\facade\Log;
use think\facade\Cache;
use think\facade\Db;

/**
* 用户注册事件监听器
* 处理用户注册后的相关业务逻辑
*/
class UserRegisterListener
{
/**
* 处理用户注册事件
* @param array $user 用户信息
* @return void
*/
public function handle(array $user): void
{
try {
// 1. 发送欢迎邮件
$this->sendWelcomeEmail($user);

// 2. 初始化用户配置
$this->initUserConfig($user['id']);

// 3. 记录注册日志
$this->logUserRegister($user);

// 4. 更新统计数据
$this->updateRegisterStats();

// 5. 触发积分奖励
$this->giveRegisterReward($user['id']);

} catch (\Exception $e) {
Log::error('用户注册事件处理失败', [
'user_id' => $user['id'] ?? 0,
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
}
}

/**
* 发送欢迎邮件
* @param array $user 用户信息
* @return void
*/
private function sendWelcomeEmail(array $user): void
{
// 将邮件发送任务加入队列
\think\facade\Queue::push('app\\job\\SendEmailJob', [
'type' => 'welcome',
'email' => $user['email'],
'username' => $user['username']
], 'email');
}

/**
* 初始化用户配置
* @param int $userId 用户ID
* @return void
*/
private function initUserConfig(int $userId): void
{
$defaultConfig = [
'user_id' => $userId,
'theme' => 'default',
'language' => 'zh-cn',
'timezone' => 'Asia/Shanghai',
'notification_email' => 1,
'notification_sms' => 0,
'created_at' => date('Y-m-d H:i:s')
];

Db::name('user_config')->insert($defaultConfig);
}

/**
* 记录注册日志
* @param array $user 用户信息
* @return void
*/
private function logUserRegister(array $user): void
{
Log::info('用户注册成功', [
'user_id' => $user['id'],
'username' => $user['username'],
'email' => $user['email'],
'register_ip' => request()->ip(),
'register_time' => date('Y-m-d H:i:s')
]);
}

/**
* 更新注册统计
* @return void
*/
private function updateRegisterStats(): void
{
$today = date('Y-m-d');
$cacheKey = 'register_stats_' . $today;

Cache::inc($cacheKey);
Cache::expire($cacheKey, 86400); // 缓存24小时
}

/**
* 给予注册奖励
* @param int $userId 用户ID
* @return void
*/
private function giveRegisterReward(int $userId): void
{
// 将积分奖励任务加入队列
\think\facade\Queue::push('app\\job\\RewardJob', [
'user_id' => $userId,
'type' => 'register',
'points' => 100,
'reason' => '注册奖励'
], 'reward');
}
}

/**
* 用户登录事件监听器
* 处理用户登录后的相关业务逻辑
*/
class UserLoginListener
{
/**
* 处理用户登录事件
* @param array $user 用户信息
* @return void
*/
public function handle(array $user): void
{
try {
// 1. 更新最后登录时间
$this->updateLastLoginTime($user['id']);

// 2. 记录登录日志
$this->logUserLogin($user);

// 3. 检查安全风险
$this->checkSecurityRisk($user);

// 4. 更新在线状态
$this->updateOnlineStatus($user['id']);

// 5. 处理连续登录奖励
$this->handleContinuousLoginReward($user['id']);

} catch (\Exception $e) {
Log::error('用户登录事件处理失败', [
'user_id' => $user['id'],
'error' => $e->getMessage()
]);
}
}

/**
* 更新最后登录时间
* @param int $userId 用户ID
* @return void
*/
private function updateLastLoginTime(int $userId): void
{
Db::name('users')
->where('id', $userId)
->update([
'last_login_time' => date('Y-m-d H:i:s'),
'last_login_ip' => request()->ip()
]);
}

/**
* 记录登录日志
* @param array $user 用户信息
* @return void
*/
private function logUserLogin(array $user): void
{
$loginLog = [
'user_id' => $user['id'],
'username' => $user['username'],
'login_ip' => request()->ip(),
'user_agent' => request()->header('User-Agent'),
'login_time' => date('Y-m-d H:i:s')
];

Db::name('login_logs')->insert($loginLog);
}

/**
* 检查安全风险
* @param array $user 用户信息
* @return void
*/
private function checkSecurityRisk(array $user): void
{
$currentIp = request()->ip();
$lastLoginIp = $user['last_login_ip'] ?? '';

// 检查IP变化
if (!empty($lastLoginIp) && $currentIp !== $lastLoginIp) {
// 发送安全提醒
\think\facade\Queue::push('app\\job\\SecurityAlertJob', [
'user_id' => $user['id'],
'type' => 'ip_change',
'current_ip' => $currentIp,
'last_ip' => $lastLoginIp
], 'security');
}

// 检查异常登录频率
$recentLogins = Db::name('login_logs')
->where('user_id', $user['id'])
->where('login_time', '>', date('Y-m-d H:i:s', time() - 3600))
->count();

if ($recentLogins > 10) {
// 触发安全警告
\think\facade\Queue::push('app\\job\\SecurityAlertJob', [
'user_id' => $user['id'],
'type' => 'frequent_login',
'count' => $recentLogins
], 'security');
}
}

/**
* 更新在线状态
* @param int $userId 用户ID
* @return void
*/
private function updateOnlineStatus(int $userId): void
{
Cache::set('user_online_' . $userId, time(), 1800); // 30分钟过期
}

/**
* 处理连续登录奖励
* @param int $userId 用户ID
* @return void
*/
private function handleContinuousLoginReward(int $userId): void
{
$today = date('Y-m-d');
$yesterday = date('Y-m-d', strtotime('-1 day'));

// 检查昨天是否登录
$yesterdayLogin = Db::name('login_logs')
->where('user_id', $userId)
->where('login_time', 'between', [$yesterday . ' 00:00:00', $yesterday . ' 23:59:59'])
->find();

// 检查今天是否首次登录
$todayFirstLogin = Db::name('login_logs')
->where('user_id', $userId)
->where('login_time', 'between', [$today . ' 00:00:00', $today . ' 23:59:59'])
->count() === 1;

if ($yesterdayLogin && $todayFirstLogin) {
// 连续登录,给予奖励
\think\facade\Queue::push('app\\job\\RewardJob', [
'user_id' => $userId,
'type' => 'continuous_login',
'points' => 10,
'reason' => '连续登录奖励'
], 'reward');
}
}
}

2. 事件订阅器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
<?php
declare(strict_types=1);

namespace app\subscribe;

use think\facade\Log;
use think\facade\Queue;

/**
* 订单事件订阅器
* 在一个监听器中监听多个事件
*/
class OrderSubscriber
{
/**
* 订单创建事件处理
* @param array $order 订单信息
* @return void
*/
public function onOrderCreated(array $order): void
{
try {
Log::info('订单创建事件触发', ['order_id' => $order['id']]);

// 1. 发送订单确认邮件
Queue::push('app\\job\\SendEmailJob', [
'type' => 'order_created',
'order_id' => $order['id'],
'user_id' => $order['user_id']
], 'email');

// 2. 更新库存
Queue::push('app\\job\\UpdateStockJob', [
'order_id' => $order['id'],
'action' => 'decrease'
], 'stock');

// 3. 记录销售统计
$this->updateSalesStats($order);

} catch (\Exception $e) {
Log::error('订单创建事件处理失败', [
'order_id' => $order['id'],
'error' => $e->getMessage()
]);
}
}

/**
* 订单支付事件处理
* @param array $order 订单信息
* @return void
*/
public function onOrderPaid(array $order): void
{
try {
Log::info('订单支付事件触发', ['order_id' => $order['id']]);

// 1. 发送支付成功通知
Queue::push('app\\job\\SendEmailJob', [
'type' => 'payment_success',
'order_id' => $order['id'],
'user_id' => $order['user_id']
], 'email');

// 2. 触发发货流程
Queue::push('app\\job\\ShippingJob', [
'order_id' => $order['id']
], 'shipping');

// 3. 给予积分奖励
Queue::push('app\\job\\RewardJob', [
'user_id' => $order['user_id'],
'type' => 'order_payment',
'points' => intval($order['amount']),
'reason' => '订单支付奖励'
], 'reward');

} catch (\Exception $e) {
Log::error('订单支付事件处理失败', [
'order_id' => $order['id'],
'error' => $e->getMessage()
]);
}
}

/**
* 订单取消事件处理
* @param array $order 订单信息
* @return void
*/
public function onOrderCancelled(array $order): void
{
try {
Log::info('订单取消事件触发', ['order_id' => $order['id']]);

// 1. 恢复库存
Queue::push('app\\job\\UpdateStockJob', [
'order_id' => $order['id'],
'action' => 'increase'
], 'stock');

// 2. 处理退款
if ($order['status'] === 'paid') {
Queue::push('app\\job\\RefundJob', [
'order_id' => $order['id'],
'amount' => $order['amount']
], 'refund');
}

// 3. 发送取消通知
Queue::push('app\\job\\SendEmailJob', [
'type' => 'order_cancelled',
'order_id' => $order['id'],
'user_id' => $order['user_id']
], 'email');

} catch (\Exception $e) {
Log::error('订单取消事件处理失败', [
'order_id' => $order['id'],
'error' => $e->getMessage()
]);
}
}

/**
* 更新销售统计
* @param array $order 订单信息
* @return void
*/
private function updateSalesStats(array $order): void
{
$date = date('Y-m-d');
$statsKey = 'sales_stats_' . $date;

$stats = \think\facade\Cache::get($statsKey, [
'order_count' => 0,
'total_amount' => 0
]);

$stats['order_count']++;
$stats['total_amount'] += $order['amount'];

\think\facade\Cache::set($statsKey, $stats, 86400);
}
}

3. 事件配置与注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
// config/event.php
return [
'bind' => [
// 事件绑定
],

'listen' => [
// 系统事件
'AppInit' => [],
'HttpRun' => [],
'HttpEnd' => [],
'LogLevel' => [],
'LogWrite' => [],

// 自定义事件监听
'UserRegister' => ['app\\listener\\UserRegisterListener'],
'UserLogin' => ['app\\listener\\UserLoginListener'],
],

'subscribe' => [
// 事件订阅
'app\\subscribe\\OrderSubscriber',
],
];

事件触发与使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
<?php
declare(strict_types=1);

namespace app\controller;

use think\facade\Event;
use think\facade\Db;
use think\Response;

/**
* 用户控制器
* 演示事件系统的使用
*/
class User
{
/**
* 用户注册
* @return Response JSON响应
*/
public function register(): Response
{
$data = request()->post();

// 开启事务
Db::startTrans();

try {
// 1. 创建用户
$userId = Db::name('users')->insertGetId([
'username' => $data['username'],
'email' => $data['email'],
'password' => password_hash($data['password'], PASSWORD_DEFAULT),
'created_at' => date('Y-m-d H:i:s')
]);

$user = [
'id' => $userId,
'username' => $data['username'],
'email' => $data['email']
];

// 2. 触发用户注册事件
Event::trigger('UserRegister', $user);

Db::commit();

return json([
'code' => 200,
'message' => '注册成功',
'data' => ['user_id' => $userId]
]);

} catch (\Exception $e) {
Db::rollback();

return json([
'code' => 500,
'message' => '注册失败:' . $e->getMessage()
]);
}
}

/**
* 用户登录
* @return Response JSON响应
*/
public function login(): Response
{
$data = request()->post();

try {
// 1. 验证用户
$user = Db::name('users')
->where('username', $data['username'])
->find();

if (!$user || !password_verify($data['password'], $user['password'])) {
return json([
'code' => 401,
'message' => '用户名或密码错误'
]);
}

// 2. 触发用户登录事件
Event::trigger('UserLogin', $user);

// 3. 设置登录状态
session('user_id', $user['id']);
session('username', $user['username']);

return json([
'code' => 200,
'message' => '登录成功',
'data' => [
'user_id' => $user['id'],
'username' => $user['username']
]
]);

} catch (\Exception $e) {
return json([
'code' => 500,
'message' => '登录失败:' . $e->getMessage()
]);
}
}
}

/**
* 订单控制器
* 演示事件订阅器的使用
*/
class Order
{
/**
* 创建订单
* @return Response JSON响应
*/
public function create(): Response
{
$data = request()->post();

Db::startTrans();

try {
// 1. 创建订单
$orderId = Db::name('orders')->insertGetId([
'order_number' => $this->generateOrderNumber(),
'user_id' => $data['user_id'],
'amount' => $data['amount'],
'status' => 'pending',
'created_at' => date('Y-m-d H:i:s')
]);

$order = [
'id' => $orderId,
'user_id' => $data['user_id'],
'amount' => $data['amount'],
'status' => 'pending'
];

// 2. 触发订单创建事件
Event::trigger('OrderCreated', $order);

Db::commit();

return json([
'code' => 200,
'message' => '订单创建成功',
'data' => ['order_id' => $orderId]
]);

} catch (\Exception $e) {
Db::rollback();

return json([
'code' => 500,
'message' => '订单创建失败:' . $e->getMessage()
]);
}
}

/**
* 支付订单
* @param int $orderId 订单ID
* @return Response JSON响应
*/
public function pay(int $orderId): Response
{
Db::startTrans();

try {
// 1. 更新订单状态
$order = Db::name('orders')->where('id', $orderId)->find();

if (!$order) {
return json(['code' => 404, 'message' => '订单不存在']);
}

if ($order['status'] !== 'pending') {
return json(['code' => 400, 'message' => '订单状态不正确']);
}

Db::name('orders')
->where('id', $orderId)
->update([
'status' => 'paid',
'paid_at' => date('Y-m-d H:i:s')
]);

$order['status'] = 'paid';

// 2. 触发订单支付事件
Event::trigger('OrderPaid', $order);

Db::commit();

return json([
'code' => 200,
'message' => '支付成功'
]);

} catch (\Exception $e) {
Db::rollback();

return json([
'code' => 500,
'message' => '支付失败:' . $e->getMessage()
]);
}
}

/**
* 生成订单号
* @return string 订单号
*/
private function generateOrderNumber(): string
{
return date('YmdHis') . mt_rand(1000, 9999);
}
}

队列处理系统

think-queue配置与使用

think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送 3

1. 安装与配置

1
2
# 安装think-queue
composer require topthink/think-queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?php
// config/queue.php
return [
// Redis驱动配置
'connector' => 'redis',
'expire' => 60, // 任务过期时间(秒)
'default' => 'default', // 默认队列名称
'host' => env('redis.host', '127.0.0.1'),
'port' => env('redis.port', 6379),
'password' => env('redis.password', ''),
'select' => 5, // Redis数据库索引
'timeout' => 0,
'persistent' => false,

// 队列配置
'queues' => [
'email' => [
'connector' => 'redis',
'queue' => 'email_queue',
'retry_after' => 90,
'block_for' => null,
],
'sms' => [
'connector' => 'redis',
'queue' => 'sms_queue',
'retry_after' => 60,
],
'image' => [
'connector' => 'redis',
'queue' => 'image_queue',
'retry_after' => 300,
]
]
];

2. 创建队列任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
<?php
declare(strict_types=1);

namespace app\job;

use think\facade\Log;
use think\queue\Job;

/**
* 邮件发送任务
* 处理各种类型的邮件发送
*/
class SendEmailJob
{
/**
* 执行任务
* @param Job $job 任务对象
* @param array $data 任务数据
* @return void
*/
public function fire(Job $job, array $data): void
{
try {
// 获取任务数据
$type = $data['type'] ?? '';
$email = $data['email'] ?? '';
$userId = $data['user_id'] ?? 0;

if (empty($email)) {
throw new \Exception('邮箱地址不能为空');
}

// 根据类型发送不同邮件
switch ($type) {
case 'welcome':
$this->sendWelcomeEmail($data);
break;
case 'order_created':
$this->sendOrderCreatedEmail($data);
break;
case 'payment_success':
$this->sendPaymentSuccessEmail($data);
break;
case 'order_cancelled':
$this->sendOrderCancelledEmail($data);
break;
default:
throw new \Exception('未知的邮件类型:' . $type);
}

// 记录发送日志
Log::info('邮件发送成功', [
'type' => $type,
'email' => $email,
'user_id' => $userId
]);

// 删除任务
$job->delete();

} catch (\Exception $e) {
Log::error('邮件发送失败', [
'error' => $e->getMessage(),
'data' => $data,
'attempts' => $job->attempts()
]);

// 重试机制
if ($job->attempts() < 3) {
$job->release(60); // 60秒后重试
} else {
$job->delete(); // 超过重试次数,删除任务
$this->handleFailedEmail($data, $e->getMessage());
}
}
}

/**
* 发送欢迎邮件
* @param array $data 邮件数据
* @return void
*/
private function sendWelcomeEmail(array $data): void
{
$subject = '欢迎注册我们的网站';
$content = "亲爱的 {$data['username']},欢迎您注册我们的网站!";

$this->sendEmail($data['email'], $subject, $content);
}

/**
* 发送订单创建邮件
* @param array $data 邮件数据
* @return void
*/
private function sendOrderCreatedEmail(array $data): void
{
$orderId = $data['order_id'];
$order = \think\facade\Db::name('orders')->where('id', $orderId)->find();

if (!$order) {
throw new \Exception('订单不存在');
}

$user = \think\facade\Db::name('users')->where('id', $order['user_id'])->find();

$subject = '订单创建成功通知';
$content = "亲爱的 {$user['username']},您的订单 {$order['order_number']} 已创建成功!";

$this->sendEmail($user['email'], $subject, $content);
}

/**
* 发送支付成功邮件
* @param array $data 邮件数据
* @return void
*/
private function sendPaymentSuccessEmail(array $data): void
{
$orderId = $data['order_id'];
$order = \think\facade\Db::name('orders')->where('id', $orderId)->find();
$user = \think\facade\Db::name('users')->where('id', $order['user_id'])->find();

$subject = '支付成功通知';
$content = "亲爱的 {$user['username']},您的订单 {$order['order_number']} 支付成功,金额:¥{$order['amount']}";

$this->sendEmail($user['email'], $subject, $content);
}

/**
* 发送订单取消邮件
* @param array $data 邮件数据
* @return void
*/
private function sendOrderCancelledEmail(array $data): void
{
$orderId = $data['order_id'];
$order = \think\facade\Db::name('orders')->where('id', $orderId)->find();
$user = \think\facade\Db::name('users')->where('id', $order['user_id'])->find();

$subject = '订单取消通知';
$content = "亲爱的 {$user['username']},您的订单 {$order['order_number']} 已取消。";

$this->sendEmail($user['email'], $subject, $content);
}

/**
* 实际发送邮件
* @param string $email 邮箱地址
* @param string $subject 邮件主题
* @param string $content 邮件内容
* @return void
*/
private function sendEmail(string $email, string $subject, string $content): void
{
// 这里实现实际的邮件发送逻辑
// 可以使用PHPMailer、SwiftMailer等邮件库

// 模拟邮件发送
sleep(1); // 模拟发送耗时

// 实际项目中应该调用邮件服务
// $mailer = new PHPMailer();
// $mailer->setFrom('noreply@example.com');
// $mailer->addAddress($email);
// $mailer->Subject = $subject;
// $mailer->Body = $content;
// $mailer->send();
}

/**
* 处理失败的邮件
* @param array $data 邮件数据
* @param string $error 错误信息
* @return void
*/
private function handleFailedEmail(array $data, string $error): void
{
// 记录失败的邮件到数据库
\think\facade\Db::name('failed_emails')->insert([
'type' => $data['type'],
'email' => $data['email'],
'data' => json_encode($data),
'error' => $error,
'created_at' => date('Y-m-d H:i:s')
]);
}
}

/**
* 图片处理任务
* 处理图片压缩、缩略图生成等
*/
class ImageProcessJob
{
/**
* 执行任务
* @param Job $job 任务对象
* @param array $data 任务数据
* @return void
*/
public function fire(Job $job, array $data): void
{
try {
$imagePath = $data['image_path'] ?? '';
$operations = $data['operations'] ?? [];

if (empty($imagePath) || !file_exists($imagePath)) {
throw new \Exception('图片文件不存在');
}

foreach ($operations as $operation) {
switch ($operation['type']) {
case 'resize':
$this->resizeImage($imagePath, $operation['width'], $operation['height']);
break;
case 'thumbnail':
$this->generateThumbnail($imagePath, $operation['size']);
break;
case 'watermark':
$this->addWatermark($imagePath, $operation['watermark']);
break;
}
}

Log::info('图片处理成功', ['image_path' => $imagePath]);
$job->delete();

} catch (\Exception $e) {
Log::error('图片处理失败', [
'error' => $e->getMessage(),
'data' => $data
]);

if ($job->attempts() < 2) {
$job->release(30);
} else {
$job->delete();
}
}
}

/**
* 调整图片尺寸
* @param string $imagePath 图片路径
* @param int $width 宽度
* @param int $height 高度
* @return void
*/
private function resizeImage(string $imagePath, int $width, int $height): void
{
// 实现图片尺寸调整逻辑
// 可以使用GD库或ImageMagick
}

/**
* 生成缩略图
* @param string $imagePath 图片路径
* @param int $size 缩略图尺寸
* @return void
*/
private function generateThumbnail(string $imagePath, int $size): void
{
// 实现缩略图生成逻辑
}

/**
* 添加水印
* @param string $imagePath 图片路径
* @param string $watermark 水印文件路径
* @return void
*/
private function addWatermark(string $imagePath, string $watermark): void
{
// 实现水印添加逻辑
}
}

/**
* 积分奖励任务
* 处理用户积分奖励
*/
class RewardJob
{
/**
* 执行任务
* @param Job $job 任务对象
* @param array $data 任务数据
* @return void
*/
public function fire(Job $job, array $data): void
{
try {
$userId = $data['user_id'] ?? 0;
$type = $data['type'] ?? '';
$points = $data['points'] ?? 0;
$reason = $data['reason'] ?? '';

if ($userId <= 0 || $points <= 0) {
throw new \Exception('用户ID或积分数量无效');
}

// 开启事务
\think\facade\Db::startTrans();

// 1. 更新用户积分
\think\facade\Db::name('users')
->where('id', $userId)
->inc('points', $points);

// 2. 记录积分变动
\think\facade\Db::name('point_logs')->insert([
'user_id' => $userId,
'type' => $type,
'points' => $points,
'reason' => $reason,
'created_at' => date('Y-m-d H:i:s')
]);

\think\facade\Db::commit();

Log::info('积分奖励成功', [
'user_id' => $userId,
'points' => $points,
'type' => $type
]);

$job->delete();

} catch (\Exception $e) {
\think\facade\Db::rollback();

Log::error('积分奖励失败', [
'error' => $e->getMessage(),
'data' => $data
]);

if ($job->attempts() < 3) {
$job->release(30);
} else {
$job->delete();
}
}
}
}

定时任务系统

使用easy-task创建定时任务

在ThinkPHP8中,可以使用easy-task扩展包来创建定时任务 2

1. 安装与配置

1
2
3
4
5
# 安装easy-task
composer require easy-task/easy-task

# 创建命令行处理类
php think make:command Task task

2. 定时任务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
<?php
declare(strict_types=1);

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;
use think\facade\Db;
use think\facade\Log;
use think\facade\Cache;

/**
* 定时任务命令
* 处理各种定时任务
*/
class Task extends Command
{
/**
* 配置命令
* @return void
*/
protected function configure(): void
{
$this->setName('task')
->addArgument('action', Argument::OPTIONAL, 'action', '')
->addArgument('force', Argument::OPTIONAL, 'force', '')
->setDescription('定时任务管理命令');
}

/**
* 执行命令
* @param Input $input 输入对象
* @param Output $output 输出对象
* @return void
*/
protected function execute(Input $input, Output $output): void
{
$action = trim($input->getArgument('action'));
$force = trim($input->getArgument('force'));

$task = new \EasyTask\Task();
$task->setRunTimePath('./runtime/');

// 添加定时任务
$this->addTasks($task);

switch ($action) {
case 'start':
$output->writeln('启动定时任务...');
$task->start();
break;
case 'status':
$task->status();
break;
case 'stop':
$force = ($force === 'force');
$output->writeln('停止定时任务...');
$task->stop($force);
break;
default:
$output->writeln('可用命令:start, status, stop');
break;
}
}

/**
* 添加定时任务
* @param \EasyTask\Task $task 任务对象
* @return void
*/
private function addTasks(\EasyTask\Task $task): void
{
// 1. 清理过期数据任务(每小时执行)
$task->addFunc(function() {
$this->cleanExpiredData();
}, 'clean_expired_data', 3600, 1);

// 2. 统计数据任务(每天凌晨执行)
$task->addFunc(function() {
$this->generateDailyStats();
}, 'daily_stats', 86400, 1);

// 3. 发送提醒邮件任务(每30分钟执行)
$task->addFunc(function() {
$this->sendReminderEmails();
}, 'reminder_emails', 1800, 1);

// 4. 备份数据库任务(每天凌晨2点执行)
$task->addFunc(function() {
$this->backupDatabase();
}, 'backup_database', 86400, 1);

// 5. 处理失败队列任务(每10分钟执行)
$task->addFunc(function() {
$this->processFailedJobs();
}, 'process_failed_jobs', 600, 1);
}

/**
* 清理过期数据
* @return void
*/
private function cleanExpiredData(): void
{
try {
$expiredTime = date('Y-m-d H:i:s', time() - 86400 * 30); // 30天前

// 清理过期日志
$deletedLogs = Db::name('logs')
->where('created_at', '<', $expiredTime)
->delete();

// 清理过期会话
$deletedSessions = Db::name('sessions')
->where('last_activity', '<', time() - 86400 * 7) // 7天前
->delete();

// 清理过期缓存文件
$this->cleanExpiredCacheFiles();

Log::info('清理过期数据完成', [
'deleted_logs' => $deletedLogs,
'deleted_sessions' => $deletedSessions
]);

} catch (\Exception $e) {
Log::error('清理过期数据失败', ['error' => $e->getMessage()]);
}
}

/**
* 生成每日统计
* @return void
*/
private function generateDailyStats(): void
{
try {
$yesterday = date('Y-m-d', strtotime('-1 day'));

// 统计用户注册数
$registerCount = Db::name('users')
->whereTime('created_at', 'between', [$yesterday . ' 00:00:00', $yesterday . ' 23:59:59'])
->count();

// 统计订单数和销售额
$orderStats = Db::name('orders')
->field('COUNT(*) as order_count, SUM(amount) as total_amount')
->whereTime('created_at', 'between', [$yesterday . ' 00:00:00', $yesterday . ' 23:59:59'])
->where('status', 'paid')
->find();

// 统计活跃用户数
$activeUsers = Db::name('login_logs')
->whereTime('login_time', 'between', [$yesterday . ' 00:00:00', $yesterday . ' 23:59:59'])
->group('user_id')
->count();

// 保存统计数据
Db::name('daily_stats')->insert([
'date' => $yesterday,
'register_count' => $registerCount,
'order_count' => $orderStats['order_count'] ?? 0,
'total_amount' => $orderStats['total_amount'] ?? 0,
'active_users' => $activeUsers,
'created_at' => date('Y-m-d H:i:s')
]);

Log::info('每日统计生成完成', [
'date' => $yesterday,
'register_count' => $registerCount,
'order_count' => $orderStats['order_count'] ?? 0,
'active_users' => $activeUsers
]);

} catch (\Exception $e) {
Log::error('生成每日统计失败', ['error' => $e->getMessage()]);
}
}

/**
* 发送提醒邮件
* @return void
*/
private function sendReminderEmails(): void
{
try {
// 查找需要提醒的订单(未支付超过1小时)
$unpaidOrders = Db::name('orders')
->alias('o')
->join('users u', 'o.user_id = u.id')
->field('o.id, o.order_number, o.amount, u.email, u.username')
->where('o.status', 'pending')
->where('o.created_at', '<', date('Y-m-d H:i:s', time() - 3600))
->where('o.reminder_sent', 0)
->limit(100)
->select()
->toArray();

foreach ($unpaidOrders as $order) {
// 发送提醒邮件到队列
\think\facade\Queue::push('app\\job\\SendEmailJob', [
'type' => 'payment_reminder',
'email' => $order['email'],
'order_id' => $order['id'],
'order_number' => $order['order_number'],
'amount' => $order['amount']
], 'email');

// 标记已发送提醒
Db::name('orders')
->where('id', $order['id'])
->update(['reminder_sent' => 1]);
}

if (!empty($unpaidOrders)) {
Log::info('发送提醒邮件完成', ['count' => count($unpaidOrders)]);
}

} catch (\Exception $e) {
Log::error('发送提醒邮件失败', ['error' => $e->getMessage()]);
}
}

/**
* 备份数据库
* @return void
*/
private function backupDatabase(): void
{
try {
$backupPath = './backup/';
if (!is_dir($backupPath)) {
mkdir($backupPath, 0755, true);
}

$filename = 'backup_' . date('Y-m-d_H-i-s') . '.sql';
$filepath = $backupPath . $filename;

// 获取数据库配置
$config = config('database.connections.mysql');

// 执行备份命令
$command = sprintf(
'mysqldump -h%s -P%s -u%s -p%s %s > %s',
$config['hostname'],
$config['hostport'],
$config['username'],
$config['password'],
$config['database'],
$filepath
);

exec($command, $output, $returnCode);

if ($returnCode === 0) {
Log::info('数据库备份成功', ['file' => $filename]);

// 清理旧备份文件(保留最近7天)
$this->cleanOldBackups($backupPath);
} else {
Log::error('数据库备份失败', ['command' => $command, 'output' => $output]);
}

} catch (\Exception $e) {
Log::error('数据库备份异常', ['error' => $e->getMessage()]);
}
}

/**
* 处理失败的队列任务
* @return void
*/
private function processFailedJobs(): void
{
try {
// 重新处理失败的邮件
$failedEmails = Db::name('failed_emails')
->where('retry_count', '<', 3)
->where('created_at', '>', date('Y-m-d H:i:s', time() - 86400))
->limit(50)
->select()
->toArray();

foreach ($failedEmails as $email) {
$data = json_decode($email['data'], true);

// 重新加入队列
\think\facade\Queue::push('app\\job\\SendEmailJob', $data, 'email');

// 更新重试次数
Db::name('failed_emails')
->where('id', $email['id'])
->inc('retry_count');
}

if (!empty($failedEmails)) {
Log::info('重新处理失败任务', ['count' => count($failedEmails)]);
}

} catch (\Exception $e) {
Log::error('处理失败任务异常', ['error' => $e->getMessage()]);
}
}

/**
* 清理过期缓存文件
* @return void
*/
private function cleanExpiredCacheFiles(): void
{
$cacheDir = './runtime/cache/';
if (!is_dir($cacheDir)) {
return;
}

$iterator = new \RecursiveIteratorIterator(
new \RecursiveDirectoryIterator($cacheDir)
);

$expiredTime = time() - 86400 * 7; // 7天前
$deletedCount = 0;

foreach ($iterator as $file) {
if ($file->isFile() && $file->getMTime() < $expiredTime) {
unlink($file->getPathname());
$deletedCount++;
}
}

if ($deletedCount > 0) {
Log::info('清理过期缓存文件', ['count' => $deletedCount]);
}
}

/**
* 清理旧备份文件
* @param string $backupPath 备份目录
* @return void
*/
private function cleanOldBackups(string $backupPath): void
{
$files = glob($backupPath . 'backup_*.sql');

if (count($files) > 7) {
// 按修改时间排序
usort($files, function($a, $b) {
return filemtime($a) - filemtime($b);
});

// 删除最旧的文件
$filesToDelete = array_slice($files, 0, count($files) - 7);
foreach ($filesToDelete as $file) {
unlink($file);
}

Log::info('清理旧备份文件', ['count' => count($filesToDelete)]);
}
}
}

3. 配置命令

1
2
3
4
5
6
7
<?php
// config/console.php
return [
'commands' => [
'task' => 'app\\command\\Task',
],
];

4. 执行定时任务

1
2
3
4
5
6
7
8
9
10
11
# 启动定时任务
php think task start

# 查看任务状态
php think task status

# 停止定时任务
php think task stop

# 强制停止定时任务
php think task stop force

队列监控与管理

队列监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
<?php
declare(strict_types=1);

namespace app\service;

use think\facade\Cache;
use think\facade\Db;
use think\facade\Log;

/**
* 队列监控服务
* 监控队列状态和性能
*/
class QueueMonitorService
{
/**
* 获取队列状态
* @return array 队列状态信息
*/
public function getQueueStatus(): array
{
$redis = Cache::store('redis')->handler();

$queues = ['default', 'email', 'sms', 'image', 'reward'];
$status = [];

foreach ($queues as $queue) {
$waitingKey = 'queue:' . $queue;
$processingKey = 'queue:' . $queue . ':processing';
$failedKey = 'queue:' . $queue . ':failed';

$status[$queue] = [
'waiting' => $redis->llen($waitingKey),
'processing' => $redis->llen($processingKey),
'failed' => $redis->llen($failedKey),
'total_processed' => $this->getTotalProcessed($queue),
'avg_processing_time' => $this->getAvgProcessingTime($queue)
];
}

return $status;
}

/**
* 获取队列性能统计
* @param string $queue 队列名称
* @param string $period 统计周期(hour/day/week)
* @return array 性能统计
*/
public function getQueueStats(string $queue, string $period = 'hour'): array
{
$cacheKey = "queue_stats_{$queue}_{$period}";

return Cache::remember($cacheKey, function() use ($queue, $period) {
$timeRange = $this->getTimeRange($period);

$stats = Db::name('queue_logs')
->field('COUNT(*) as total_jobs, AVG(processing_time) as avg_time, SUM(CASE WHEN status="failed" THEN 1 ELSE 0 END) as failed_jobs')
->where('queue', $queue)
->where('created_at', 'between', $timeRange)
->find();

return [
'total_jobs' => $stats['total_jobs'] ?? 0,
'avg_processing_time' => round($stats['avg_time'] ?? 0, 2),
'failed_jobs' => $stats['failed_jobs'] ?? 0,
'success_rate' => $stats['total_jobs'] > 0 ?
round((($stats['total_jobs'] - $stats['failed_jobs']) / $stats['total_jobs']) * 100, 2) : 0
];
}, 300);
}

/**
* 获取失败任务详情
* @param string $queue 队列名称
* @param int $limit 限制数量
* @return array 失败任务列表
*/
public function getFailedJobs(string $queue = '', int $limit = 50): array
{
$query = Db::name('failed_jobs')
->field('id,queue,payload,exception,failed_at')
->order('failed_at', 'desc')
->limit($limit);

if (!empty($queue)) {
$query->where('queue', $queue);
}

return $query->select()->toArray();
}

/**
* 重试失败任务
* @param int $jobId 任务ID
* @return bool 重试结果
*/
public function retryFailedJob(int $jobId): bool
{
try {
$job = Db::name('failed_jobs')->where('id', $jobId)->find();

if (!$job) {
return false;
}

$payload = json_decode($job['payload'], true);

// 重新加入队列
\think\facade\Queue::push($payload['job'], $payload['data'], $job['queue']);

// 删除失败记录
Db::name('failed_jobs')->where('id', $jobId)->delete();

Log::info('重试失败任务成功', ['job_id' => $jobId]);

return true;
} catch (\Exception $e) {
Log::error('重试失败任务异常', [
'job_id' => $jobId,
'error' => $e->getMessage()
]);

return false;
}
}

/**
* 获取总处理数量
* @param string $queue 队列名称
* @return int 总处理数量
*/
private function getTotalProcessed(string $queue): int
{
return Cache::get("queue_total_processed_{$queue}", 0);
}

/**
* 获取平均处理时间
* @param string $queue 队列名称
* @return float 平均处理时间(秒)
*/
private function getAvgProcessingTime(string $queue): float
{
return Cache::get("queue_avg_time_{$queue}", 0.0);
}

/**
* 获取时间范围
* @param string $period 周期
* @return array 时间范围
*/
private function getTimeRange(string $period): array
{
switch ($period) {
case 'hour':
return [
date('Y-m-d H:00:00'),
date('Y-m-d H:59:59')
];
case 'day':
return [
date('Y-m-d 00:00:00'),
date('Y-m-d 23:59:59')
];
case 'week':
return [
date('Y-m-d 00:00:00', strtotime('monday this week')),
date('Y-m-d 23:59:59', strtotime('sunday this week'))
];
default:
return [
date('Y-m-d H:00:00'),
date('Y-m-d H:59:59')
];
}
}
}

## 事务与队列的协同处理

### 事务中的事件处理

事件可以被主方法捕获异常!主方法开启事务后,事件中若出现数据库错误,主方法可以捕获该异常并进行回滚等操作 <mcreference link="https://blog.csdn.net/u010713053/article/details/105152031" index="1">1</mcreference>。

```php
<?php
declare(strict_types=1);

namespace app\service;

use think\facade\Db;
use think\facade\Event;
use think\facade\Queue;
use think\facade\Log;

/**
* 事务事件协同服务
* 处理事务中的事件和队列协同
*/
class TransactionEventService
{
/**
* 创建订单并处理相关业务
* @param array $orderData 订单数据
* @return array 处理结果
*/
public function createOrderWithEvents(array $orderData): array
{
// 开启事务
Db::startTrans();

try {
// 1. 创建订单
$orderId = Db::name('orders')->insertGetId([
'order_number' => $this->generateOrderNumber(),
'user_id' => $orderData['user_id'],
'amount' => $orderData['amount'],
'status' => 'pending',
'created_at' => date('Y-m-d H:i:s')
]);

// 2. 创建订单详情
foreach ($orderData['items'] as $item) {
Db::name('order_items')->insert([
'order_id' => $orderId,
'product_id' => $item['product_id'],
'quantity' => $item['quantity'],
'price' => $item['price'],
'created_at' => date('Y-m-d H:i:s')
]);
}

// 3. 更新库存(在事务中)
foreach ($orderData['items'] as $item) {
$result = Db::name('products')
->where('id', $item['product_id'])
->where('stock', '>=', $item['quantity'])
->dec('stock', $item['quantity']);

if (!$result) {
throw new \Exception("商品库存不足:{$item['product_id']}");
}
}

// 4. 触发订单创建事件(在事务中)
$order = [
'id' => $orderId,
'user_id' => $orderData['user_id'],
'amount' => $orderData['amount'],
'status' => 'pending'
];

Event::trigger('OrderCreated', $order);

// 5. 提交事务
Db::commit();

// 6. 事务成功后,处理异步任务
$this->handlePostTransactionTasks($order);

return [
'success' => true,
'order_id' => $orderId,
'message' => '订单创建成功'
];

} catch (\Exception $e) {
// 回滚事务
Db::rollback();

Log::error('订单创建失败', [
'error' => $e->getMessage(),
'order_data' => $orderData
]);

return [
'success' => false,
'message' => '订单创建失败:' . $e->getMessage()
];
}
}

/**
* 处理事务后的异步任务
* @param array $order 订单信息
* @return void
*/
private function handlePostTransactionTasks(array $order): void
{
// 发送订单确认邮件(异步)
Queue::push('app\\job\\SendEmailJob', [
'type' => 'order_created',
'order_id' => $order['id'],
'user_id' => $order['user_id']
], 'email');

// 发送短信通知(异步)
Queue::push('app\\job\\SendSmsJob', [
'type' => 'order_created',
'order_id' => $order['id'],
'user_id' => $order['user_id']
], 'sms');

// 更新统计数据(异步)
Queue::push('app\\job\\UpdateStatsJob', [
'type' => 'order_created',
'order_id' => $order['id'],
'amount' => $order['amount']
], 'stats');
}

/**
* 生成订单号
* @return string 订单号
*/
private function generateOrderNumber(): string
{
return date('YmdHis') . mt_rand(1000, 9999);
}
}

最佳实践与性能优化

1. 事件系统最佳实践

  • 合理使用事件:事件适合处理业务扩展,不要滥用
  • 异常处理:事件监听器中要有完善的异常处理
  • 性能考虑:避免在事件中执行耗时操作
  • 事务协调:注意事件与事务的协调关系

2. 队列系统最佳实践

  • 任务设计:保持任务的幂等性和原子性
  • 重试机制:设置合理的重试次数和间隔
  • 监控告警:建立完善的队列监控体系
  • 资源管理:合理配置队列工作进程数量

3. 定时任务最佳实践

  • 任务分离:将不同类型的任务分开执行
  • 错误处理:完善的错误处理和日志记录
  • 性能优化:避免在定时任务中执行过重的操作
  • 监控管理:建立任务执行状态监控

总结

ThinkPHP8的事件系统和队列处理为开发者提供了强大的异步处理能力。通过合理使用事件系统,可以实现业务逻辑的解耦和扩展;通过队列系统,可以处理耗时任务和提高系统响应速度;通过定时任务,可以自动化处理各种周期性任务。

在实际项目中,需要根据业务需求选择合适的处理方式,建立完善的监控和错误处理机制,确保系统的稳定性和可靠性。掌握这些技术,能够显著提升ThinkPHP8应用的性能和用户体验。

本站由 提供部署服务