Laravel 队列系统完全指南:异步处理与性能优化
Orion K Lv6

Laravel 队列系统完全指南

Laravel 队列系统为开发者提供了一个统一的 API 来处理各种后台任务,支持多种队列驱动如 Redis、数据库、Amazon SQS 等。通过将耗时任务移到队列中异步处理,可以显著提升应用程序的响应速度和用户体验。1

1. 队列系统简介

什么是队列系统

队列系统是一种异步处理机制,由三个核心组件组成:5

  • 队列(Queue):存储待处理任务的数据结构
  • 消息(Message):推送到队列中的任务数据
  • 处理进程(Worker):消费队列中任务的后台进程

队列系统的优势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 传统同步处理
public function sendEmail(User $user)
{
// 发送邮件可能需要 2-3 秒
Mail::to($user)->send(new WelcomeEmail());

// 用户需要等待邮件发送完成
return response()->json(['message' => '注册成功']);
}

// 使用队列异步处理
public function sendEmail(User $user)
{
// 立即将任务推入队列
SendWelcomeEmail::dispatch($user);

// 立即返回响应
return response()->json(['message' => '注册成功']);
}

2. 队列配置

基础配置

队列配置文件位于 config/queue.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php

return [
// 默认队列连接
'default' => env('QUEUE_CONNECTION', 'sync'),

// 队列连接配置
'connections' => [
// 同步队列(开发环境)
'sync' => [
'driver' => 'sync',
],

// 数据库队列
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
'after_commit' => false,
],

// Redis 队列
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
'after_commit' => false,
],

// Amazon SQS
'sqs' => [
'driver' => 'sqs',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
'queue' => env('SQS_QUEUE', 'default'),
'suffix' => env('SQS_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'after_commit' => false,
],
],

// 失败任务配置
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'database'),
'database' => env('DB_CONNECTION', 'mysql'),
'table' => 'failed_jobs',
],
];

环境配置

.env 文件中配置队列:

1
2
3
4
5
6
7
8
# 队列驱动
QUEUE_CONNECTION=redis

# Redis 队列配置
REDIS_QUEUE=default

# 数据库队列配置
DB_CONNECTION=mysql

数据库队列设置

1
2
3
4
5
6
7
8
# 创建队列表迁移
php artisan queue:table

# 创建失败任务表迁移
php artisan queue:failed-table

# 运行迁移
php artisan migrate

3. 创建队列任务

生成任务类

1
2
3
4
5
# 创建基础任务
php artisan make:job ProcessPodcast

# 创建同步任务(不进入队列)
php artisan make:job ProcessPodcast --sync

任务类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Exception;

class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* 任务最大尝试次数
*
* @var int
*/
public $tries = 3;

/**
* 任务超时时间(秒)
*
* @var int
*/
public $timeout = 120;

/**
* 任务失败前的最大等待时间(秒)
*
* @var int
*/
public $retryAfter = 60;

/**
* 是否在数据库事务提交后分发任务
*
* @var bool
*/
public $afterCommit = true;

/**
* Podcast 实例
*
* @var \App\Models\Podcast
*/
protected $podcast;

/**
* 创建新的任务实例
*
* @param \App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}

/**
* 执行任务
*
* @param \App\Services\AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
Log::info('开始处理播客', ['podcast_id' => $this->podcast->id]);

try {
// 处理音频文件
$processor->process($this->podcast->audio_path);

// 更新播客状态
$this->podcast->update([
'status' => 'processed',
'processed_at' => now(),
]);

Log::info('播客处理完成', ['podcast_id' => $this->podcast->id]);

} catch (Exception $e) {
Log::error('播客处理失败', [
'podcast_id' => $this->podcast->id,
'error' => $e->getMessage(),
]);

// 重新抛出异常以触发重试机制
throw $e;
}
}

/**
* 任务失败时的处理
*
* @param \Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
Log::error('播客处理最终失败', [
'podcast_id' => $this->podcast->id,
'error' => $exception->getMessage(),
]);

// 更新播客状态为失败
$this->podcast->update([
'status' => 'failed',
'error_message' => $exception->getMessage(),
]);

// 发送失败通知
// Notification::send($this->podcast->user, new PodcastProcessingFailed($this->podcast));
}

/**
* 计算重试延迟时间
*
* @return array
*/
public function backoff()
{
// 指数退避:1分钟、4分钟、16分钟
return [60, 240, 960];
}

/**
* 确定任务是否应该重试
*
* @param \Exception $exception
* @return bool
*/
public function shouldRetry(Exception $exception)
{
// 某些异常不应该重试
if ($exception instanceof \InvalidArgumentException) {
return false;
}

return true;
}
}

唯一任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessUniqueTask implements ShouldQueue, ShouldBeUnique
{
use Dispatchable, InteractsWithQueue, SerializesModels;

public $userId;

/**
* 唯一任务的锁定时间(秒)
*
* @var int
*/
public $uniqueFor = 3600;

public function __construct($userId)
{
$this->userId = $userId;
}

/**
* 获取任务的唯一ID
*
* @return string
*/
public function uniqueId()
{
return $this->userId;
}

public function handle()
{
// 处理任务逻辑
}
}

4. 分发任务

基础分发

1
2
3
4
5
6
7
8
9
10
11
12
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;

// 立即分发任务
ProcessPodcast::dispatch($podcast);

// 条件分发
ProcessPodcast::dispatchIf($condition, $podcast);
ProcessPodcast::dispatchUnless($condition, $podcast);

// 同步分发(不进入队列)
ProcessPodcast::dispatchSync($podcast);

延迟分发

1
2
3
4
5
6
7
8
// 延迟 10 分钟后执行
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

// 指定具体时间执行
ProcessPodcast::dispatch($podcast)->delay(now()->addHour());

// 使用 Carbon 实例
ProcessPodcast::dispatch($podcast)->delay(\Carbon\Carbon::parse('2024-01-01 12:00:00'));

指定队列和连接

1
2
3
4
5
6
7
8
9
10
// 指定队列
ProcessPodcast::dispatch($podcast)->onQueue('processing');

// 指定连接
ProcessPodcast::dispatch($podcast)->onConnection('redis');

// 同时指定连接和队列
ProcessPodcast::dispatch($podcast)
->onConnection('redis')
->onQueue('high-priority');

任务链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Illuminate\Support\Facades\Bus;

// 创建任务链
Bus::chain([
new ProcessPodcast($podcast),
new OptimizePodcast($podcast),
new ReleasePodcast($podcast),
])->dispatch();

// 带错误处理的任务链
Bus::chain([
new ProcessPodcast($podcast),
new OptimizePodcast($podcast),
new ReleasePodcast($podcast),
])->catch(function (Throwable $e) {
// 处理链中任何任务的失败
Log::error('任务链执行失败', ['error' => $e->getMessage()]);
})->dispatch();

任务批处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
new ProcessPodcast($podcast1),
new ProcessPodcast($podcast2),
new ProcessPodcast($podcast3),
])->then(function (Batch $batch) {
// 所有任务成功完成
Log::info('批处理任务全部完成');
})->catch(function (Batch $batch, Throwable $e) {
// 第一个失败的任务
Log::error('批处理任务失败', ['error' => $e->getMessage()]);
})->finally(function (Batch $batch) {
// 批处理完成(无论成功或失败)
Log::info('批处理任务结束');
})->dispatch();

// 获取批处理ID
$batchId = $batch->id;

// 查询批处理状态
$batch = Bus::findBatch($batchId);
if ($batch->finished()) {
// 批处理已完成
}

5. 运行队列工作进程

基础命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 启动队列工作进程
php artisan queue:work

# 指定连接
php artisan queue:work redis

# 指定队列
php artisan queue:work --queue=high,default

# 处理单个任务后退出
php artisan queue:work --once

# 设置内存限制
php artisan queue:work --memory=512

# 设置超时时间
php artisan queue:work --timeout=60

# 设置睡眠时间
php artisan queue:work --sleep=3

# 设置重试次数
php artisan queue:work --tries=3

# 强制运行(即使在维护模式下)
php artisan queue:work --force

队列优先级

1
2
# 按优先级处理队列
php artisan queue:work --queue=high,medium,low

队列监听

1
2
3
4
5
# 监听队列(代码更改时自动重启)
php artisan queue:listen

# 指定连接和队列
php artisan queue:listen redis --queue=emails

6. 任务中间件

速率限制中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
/**
* 处理队列任务
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 获得锁,执行任务
$next($job);
}, function () use ($job) {
// 无法获得锁,释放任务回队列
$job->release(5);
});
}
}

防止任务重叠中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Cache;

class WithoutOverlapping
{
protected $key;

public function __construct($key)
{
$this->key = $key;
}

public function handle($job, $next)
{
$lock = Cache::lock($this->key, 60);

if ($lock->get()) {
try {
$next($job);
} finally {
$lock->release();
}
} else {
// 无法获得锁,释放任务
$job->release(10);
}
}
}

在任务中使用中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use App\Jobs\Middleware\RateLimited;
use App\Jobs\Middleware\WithoutOverlapping;

class ProcessPodcast implements ShouldQueue
{
// ...

/**
* 获取任务应该通过的中间件
*
* @return array
*/
public function middleware()
{
return [
new RateLimited,
new WithoutOverlapping($this->podcast->id),
];
}
}

7. 失败任务处理

查看失败任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 列出所有失败任务
php artisan queue:failed

# 重试失败任务
php artisan queue:retry 5

# 重试所有失败任务
php artisan queue:retry all

# 删除失败任务
php artisan queue:forget 5

# 清空所有失败任务
php artisan queue:flush

失败任务事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\Facades\Event;

// 在 EventServiceProvider 中注册
Event::listen(JobFailed::class, function (JobFailed $event) {
Log::error('任务失败', [
'connection' => $event->connectionName,
'queue' => $event->job->getQueue(),
'exception' => $event->exception->getMessage(),
]);

// 发送通知
// Notification::route('mail', 'admin@example.com')
// ->notify(new JobFailedNotification($event));
});

自定义失败任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 在任务类中定义
public function failed(Exception $exception)
{
// 清理资源
$this->cleanup();

// 记录错误
Log::error('任务执行失败', [
'job' => static::class,
'exception' => $exception->getMessage(),
]);

// 发送通知
// ...
}

8. 监控和调试

Laravel Horizon

Horizon 是 Laravel 为 Redis 队列提供的美观仪表板:3

1
2
3
4
5
6
7
8
# 安装 Horizon
composer require laravel/horizon

# 发布配置文件
php artisan horizon:install

# 启动 Horizon
php artisan horizon

队列监控命令

1
2
3
4
5
6
7
8
9
# 监控队列状态
php artisan queue:monitor redis:default,redis:high --max=100

# 清空队列
php artisan queue:clear redis
php artisan queue:clear redis --queue=emails

# 重启所有队列工作进程
php artisan queue:restart

自定义监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Event;

// 监控任务处理时间
Event::listen(JobProcessing::class, function (JobProcessing $event) {
$event->job->startTime = microtime(true);
});

Event::listen(JobProcessed::class, function (JobProcessed $event) {
$duration = microtime(true) - $event->job->startTime;

Log::info('任务处理完成', [
'job' => $event->job->resolveName(),
'duration' => $duration,
'memory' => memory_get_peak_usage(true),
]);
});

9. 性能优化

队列配置优化

1
2
3
4
5
6
7
8
9
// config/queue.php
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => 5, // 阻塞等待时间
'after_commit' => false,
],

批量处理优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ProcessLargeDataset implements ShouldQueue
{
public function handle()
{
// 分批处理大量数据
User::chunk(1000, function ($users) {
foreach ($users as $user) {
// 处理单个用户
$this->processUser($user);
}
});
}

private function processUser(User $user)
{
// 处理逻辑
}
}

内存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MemoryEfficientJob implements ShouldQueue
{
public function handle()
{
// 使用游标避免内存溢出
foreach (User::cursor() as $user) {
$this->processUser($user);

// 定期检查内存使用
if (memory_get_usage() > 100 * 1024 * 1024) { // 100MB
Log::warning('内存使用过高,释放任务');
$this->release(30);
return;
}
}
}
}

数据库连接优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DatabaseOptimizedJob implements ShouldQueue
{
public function handle()
{
// 在长时间运行的任务中重新连接数据库
DB::reconnect();

// 处理任务
$this->processData();

// 断开连接释放资源
DB::disconnect();
}
}

10. 实际应用示例

邮件发送队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?php

namespace App\Jobs;

use App\Models\User;
use App\Mail\WelcomeEmail;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;
use Exception;

class SendWelcomeEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

public $tries = 3;
public $timeout = 30;

protected $user;

public function __construct(User $user)
{
$this->user = $user;
}

public function handle()
{
try {
Mail::to($this->user)->send(new WelcomeEmail($this->user));

// 更新用户状态
$this->user->update(['welcome_email_sent' => true]);

} catch (Exception $e) {
// 记录错误但不重试某些异常
if ($e instanceof \Swift_TransportException) {
$this->fail($e);
return;
}

throw $e;
}
}

public function failed(Exception $exception)
{
// 标记邮件发送失败
$this->user->update(['welcome_email_failed' => true]);
}
}

图片处理队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php

namespace App\Jobs;

use App\Models\Image;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Storage;
use Intervention\Image\Facades\Image as ImageProcessor;

class ProcessImageUpload implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

public $tries = 2;
public $timeout = 300; // 5分钟

protected $image;
protected $sizes = [
'thumbnail' => [150, 150],
'medium' => [300, 300],
'large' => [800, 600],
];

public function __construct(Image $image)
{
$this->image = $image;
}

public function handle()
{
$originalPath = $this->image->path;

if (!Storage::exists($originalPath)) {
throw new \Exception('原始图片文件不存在');
}

$processedPaths = [];

foreach ($this->sizes as $size => $dimensions) {
$processedPath = $this->processImage($originalPath, $size, $dimensions);
$processedPaths[$size] = $processedPath;
}

// 更新数据库记录
$this->image->update([
'processed_paths' => $processedPaths,
'status' => 'processed',
'processed_at' => now(),
]);
}

private function processImage($originalPath, $size, $dimensions)
{
$image = ImageProcessor::make(Storage::get($originalPath));

// 调整大小
$image->fit($dimensions[0], $dimensions[1]);

// 生成新文件名
$filename = pathinfo($originalPath, PATHINFO_FILENAME);
$extension = pathinfo($originalPath, PATHINFO_EXTENSION);
$newPath = "images/processed/{$filename}_{$size}.{$extension}";

// 保存处理后的图片
Storage::put($newPath, $image->encode());

return $newPath;
}

public function failed(Exception $exception)
{
$this->image->update([
'status' => 'failed',
'error_message' => $exception->getMessage(),
]);
}
}

11. 测试队列任务

基础测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?php

namespace Tests\Feature;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class PodcastProcessingTest extends TestCase
{
use RefreshDatabase;

public function test_podcast_processing_job_is_dispatched()
{
Queue::fake();

$podcast = Podcast::factory()->create();

// 触发任务分发
ProcessPodcast::dispatch($podcast);

// 断言任务被分发
Queue::assertPushed(ProcessPodcast::class, function ($job) use ($podcast) {
return $job->podcast->id === $podcast->id;
});
}

public function test_podcast_processing_job_handles_correctly()
{
$podcast = Podcast::factory()->create(['status' => 'pending']);

// 直接执行任务
$job = new ProcessPodcast($podcast);
$job->handle(new \App\Services\AudioProcessor());

// 断言状态更新
$this->assertEquals('processed', $podcast->fresh()->status);
}

public function test_failed_podcast_processing()
{
Queue::fake();

$podcast = Podcast::factory()->create();

// 模拟任务失败
$job = new ProcessPodcast($podcast);
$job->failed(new \Exception('Processing failed'));

// 断言失败处理
$this->assertEquals('failed', $podcast->fresh()->status);
}
}

批处理测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public function test_batch_processing()
{
Queue::fake();

$podcasts = Podcast::factory()->count(3)->create();

Bus::batch([
new ProcessPodcast($podcasts[0]),
new ProcessPodcast($podcasts[1]),
new ProcessPodcast($podcasts[2]),
])->dispatch();

Queue::assertPushed(ProcessPodcast::class, 3);
}

12. 最佳实践

任务设计原则

  1. 幂等性:任务应该可以安全地重复执行
  2. 原子性:任务应该是不可分割的操作单元
  3. 容错性:任务应该能够处理各种异常情况
  4. 可监控:任务应该提供足够的日志和监控信息

错误处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class RobustJob implements ShouldQueue
{
public function handle()
{
try {
$this->processData();
} catch (\Exception $e) {
// 记录详细错误信息
Log::error('任务处理失败', [
'job' => static::class,
'attempts' => $this->attempts(),
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
]);

// 根据异常类型决定是否重试
if ($this->shouldRetry($e)) {
throw $e; // 重新抛出以触发重试
} else {
$this->fail($e); // 标记为失败
}
}
}

private function shouldRetry(\Exception $e)
{
// 网络错误可以重试
if ($e instanceof \GuzzleHttp\Exception\ConnectException) {
return true;
}

// 业务逻辑错误不应重试
if ($e instanceof \InvalidArgumentException) {
return false;
}

return true;
}
}

生产环境部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 使用 Supervisor 管理队列进程
# /etc/supervisor/conf.d/laravel-worker.conf
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/project/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/path/to/your/project/storage/logs/worker.log
stopwaitsecs=3600

总结

Laravel 队列系统是构建高性能 Web 应用的重要工具,它通过异步处理机制显著提升了应用的响应速度和用户体验。4 通过合理的队列配置、任务设计和监控机制,可以构建稳定可靠的异步处理系统。

掌握队列系统的使用不仅能够解决性能瓶颈问题,还能为应用提供更好的可扩展性和容错能力。在实际项目中,应该根据具体需求选择合适的队列驱动和配置策略,确保系统的稳定性和可维护性。

本站由 提供部署服务