PHP 8.1 纤程(Fibers)异步编程实战:协程的PHP实现
Orion K Lv6

引言

PHP 8.1引入的纤程(Fibers)是PHP异步编程的重大突破。作为一个长期使用Node.js和Go进行异步开发的程序员,我对PHP终于有了原生的协程支持感到非常兴奋。经过一年多的实践,我想分享一些纤程的实际应用经验和最佳实践。

什么是纤程

纤程是一种轻量级的协程实现,允许在单个线程中实现协作式多任务处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 基本的纤程示例
$fiber = new Fiber(function (): void {
echo "纤程开始执行\n";

// 暂停纤程,返回值给调用者
$value = Fiber::suspend('Hello from fiber');
echo "纤程恢复执行,接收到: $value\n";

// 再次暂停
Fiber::suspend('Second suspend');
echo "纤程最终完成\n";
});

// 启动纤程
$result1 = $fiber->start();
echo "主程序接收到: $result1\n";

// 恢复纤程执行
$result2 = $fiber->resume('Data from main');
echo "主程序接收到: $result2\n";

// 最终恢复
$fiber->resume('Final data');

// 输出:
// 纤程开始执行
// 主程序接收到: Hello from fiber
// 纤程恢复执行,接收到: Data from main
// 主程序接收到: Second suspend
// 纤程最终完成

传统同步 vs 纤程异步

传统同步方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 传统同步方式:阻塞执行
function fetchDataSync(): array {
$results = [];

// 每个请求都会阻塞
$results['user'] = file_get_contents('https://api.example.com/user/123');
$results['posts'] = file_get_contents('https://api.example.com/posts');
$results['comments'] = file_get_contents('https://api.example.com/comments');

return $results;
}

$start = microtime(true);
$data = fetchDataSync();
$end = microtime(true);
echo "同步执行时间: " . ($end - $start) . " 秒\n";
// 假设每个请求需要1秒,总时间约3秒

使用纤程的异步方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 异步HTTP客户端(简化版)
class AsyncHttpClient {
private array $fibers = [];

public function get(string $url): Fiber {
$fiber = new Fiber(function () use ($url) {
// 模拟异步HTTP请求
$context = stream_context_create([
'http' => [
'method' => 'GET',
'timeout' => 10
]
]);

// 在实际实现中,这里会使用非阻塞I/O
$result = file_get_contents($url, false, $context);
return $result;
});

$this->fibers[] = $fiber;
return $fiber;
}

public function awaitAll(): array {
$results = [];

// 启动所有纤程
foreach ($this->fibers as $index => $fiber) {
$fiber->start();
$results[$index] = null;
}

// 等待所有纤程完成
while (!empty($this->fibers)) {
foreach ($this->fibers as $index => $fiber) {
if ($fiber->isTerminated()) {
$results[$index] = $fiber->getReturn();
unset($this->fibers[$index]);
}
}

// 让出CPU时间
usleep(1000);
}

return $results;
}
}

// 使用异步方式
function fetchDataAsync(): array {
$client = new AsyncHttpClient();

// 并发启动多个请求
$userFiber = $client->get('https://api.example.com/user/123');
$postsFiber = $client->get('https://api.example.com/posts');
$commentsFiber = $client->get('https://api.example.com/comments');

// 等待所有请求完成
$results = $client->awaitAll();

return [
'user' => $results[0],
'posts' => $results[1],
'comments' => $results[2]
];
}

$start = microtime(true);
$data = fetchDataAsync();
$end = microtime(true);
echo "异步执行时间: " . ($end - $start) . " 秒\n";
// 并发执行,总时间约1秒

实际应用场景

1. 数据库连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class DatabaseConnectionPool {
private array $connections = [];
private array $available = [];
private int $maxConnections;

public function __construct(int $maxConnections = 10) {
$this->maxConnections = $maxConnections;
}

public function getConnection(): PDO {
$fiber = new Fiber(function () {
while (true) {
// 检查是否有可用连接
if (!empty($this->available)) {
return array_pop($this->available);
}

// 如果连接数未达到上限,创建新连接
if (count($this->connections) < $this->maxConnections) {
$connection = new PDO(
'mysql:host=localhost;dbname=test',
'username',
'password'
);
$this->connections[] = $connection;
return $connection;
}

// 等待连接释放
Fiber::suspend();
}
});

return $fiber->start();
}

public function releaseConnection(PDO $connection): void {
$this->available[] = $connection;

// 通知等待的纤程
$this->notifyWaitingFibers();
}

private function notifyWaitingFibers(): void {
// 在实际实现中,这里会恢复等待的纤程
}
}

// 使用示例
class UserService {
private DatabaseConnectionPool $pool;

public function __construct(DatabaseConnectionPool $pool) {
$this->pool = $pool;
}

public function getUser(int $id): ?array {
$fiber = new Fiber(function () use ($id) {
$connection = $this->pool->getConnection();

try {
$stmt = $connection->prepare('SELECT * FROM users WHERE id = ?');
$stmt->execute([$id]);
$result = $stmt->fetch(PDO::FETCH_ASSOC);

return $result ?: null;
} finally {
$this->pool->releaseConnection($connection);
}
});

return $fiber->start();
}
}

2. 异步任务队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class AsyncTaskQueue {
private array $tasks = [];
private array $running = [];
private int $maxConcurrent;

public function __construct(int $maxConcurrent = 5) {
$this->maxConcurrent = $maxConcurrent;
}

public function addTask(callable $task, array $args = []): void {
$this->tasks[] = ['task' => $task, 'args' => $args];
}

public function process(): array {
$results = [];

while (!empty($this->tasks) || !empty($this->running)) {
// 启动新任务
while (count($this->running) < $this->maxConcurrent && !empty($this->tasks)) {
$taskData = array_shift($this->tasks);
$fiber = new Fiber(function () use ($taskData) {
return call_user_func_array($taskData['task'], $taskData['args']);
});

$this->running[] = $fiber;
$fiber->start();
}

// 检查完成的任务
foreach ($this->running as $index => $fiber) {
if ($fiber->isTerminated()) {
$results[] = $fiber->getReturn();
unset($this->running[$index]);
}
}

// 重新索引数组
$this->running = array_values($this->running);

// 短暂休眠,避免CPU占用过高
usleep(1000);
}

return $results;
}
}

// 使用示例
$queue = new AsyncTaskQueue(3);

// 添加任务
$queue->addTask(function ($url) {
return file_get_contents($url);
}, ['https://api.example.com/data1']);

$queue->addTask(function ($data) {
return json_encode($data);
}, [['key' => 'value']]);

$queue->addTask(function ($number) {
return $number * $number;
}, [42]);

// 处理所有任务
$results = $queue->process();
print_r($results);

3. 实时数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class RealTimeDataProcessor {
private array $processors = [];
private bool $running = false;

public function addProcessor(string $name, callable $processor): void {
$this->processors[$name] = $processor;
}

public function start(): void {
$this->running = true;

$fibers = [];

// 为每个处理器创建纤程
foreach ($this->processors as $name => $processor) {
$fibers[$name] = new Fiber(function () use ($name, $processor) {
while ($this->running) {
// 获取数据
$data = $this->fetchData($name);

if ($data !== null) {
// 处理数据
$result = $processor($data);

// 保存结果
$this->saveResult($name, $result);
}

// 让出控制权
Fiber::suspend();
}
});
}

// 启动所有纤程
foreach ($fibers as $fiber) {
$fiber->start();
}

// 主循环
while ($this->running) {
foreach ($fibers as $name => $fiber) {
if (!$fiber->isTerminated()) {
$fiber->resume();
}
}

usleep(10000); // 10ms
}
}

public function stop(): void {
$this->running = false;
}

private function fetchData(string $source): ?array {
// 模拟从不同数据源获取数据
static $counter = 0;
$counter++;

if ($counter % 10 === 0) {
return ['source' => $source, 'data' => rand(1, 100), 'timestamp' => time()];
}

return null;
}

private function saveResult(string $source, mixed $result): void {
// 保存处理结果
error_log("[$source] Result: " . json_encode($result));
}
}

// 使用示例
$processor = new RealTimeDataProcessor();

$processor->addProcessor('sensor1', function ($data) {
return ['processed' => $data['data'] * 2, 'timestamp' => $data['timestamp']];
});

$processor->addProcessor('sensor2', function ($data) {
return ['average' => $data['data'] / 2, 'timestamp' => $data['timestamp']];
});

// 启动处理器(在实际应用中会在后台运行)
// $processor->start();

4. 异步文件处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class AsyncFileProcessor {
private int $maxConcurrent;

public function __construct(int $maxConcurrent = 5) {
$this->maxConcurrent = $maxConcurrent;
}

public function processFiles(array $files, callable $processor): array {
$results = [];
$fibers = [];
$completed = 0;

foreach ($files as $index => $file) {
$fiber = new Fiber(function () use ($file, $processor) {
return $processor($file);
});

$fibers[$index] = [
'fiber' => $fiber,
'file' => $file,
'started' => false,
'completed' => false
];
}

while ($completed < count($files)) {
$running = 0;

// 启动新的纤程
foreach ($fibers as $index => &$fiberData) {
if (!$fiberData['started'] && $running < $this->maxConcurrent) {
$fiberData['fiber']->start();
$fiberData['started'] = true;
$running++;
}

if ($fiberData['started'] && !$fiberData['completed']) {
$running++;
}
}

// 检查完成的纤程
foreach ($fibers as $index => &$fiberData) {
if ($fiberData['started'] && !$fiberData['completed'] && $fiberData['fiber']->isTerminated()) {
$results[$index] = [
'file' => $fiberData['file'],
'result' => $fiberData['fiber']->getReturn()
];
$fiberData['completed'] = true;
$completed++;
}
}

usleep(1000);
}

return $results;
}
}

// 使用示例
$processor = new AsyncFileProcessor(3);

$files = [
'/path/to/file1.txt',
'/path/to/file2.txt',
'/path/to/file3.txt',
'/path/to/file4.txt',
'/path/to/file5.txt'
];

$results = $processor->processFiles($files, function ($file) {
// 模拟文件处理
$content = file_get_contents($file);
$wordCount = str_word_count($content);

return [
'word_count' => $wordCount,
'size' => filesize($file),
'processed_at' => date('Y-m-d H:i:s')
];
});

foreach ($results as $result) {
echo "文件: {$result['file']}, 字数: {$result['result']['word_count']}\n";
}

纤程调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
class FiberScheduler {
private array $fibers = [];
private array $waiting = [];
private bool $running = false;

public function add(Fiber $fiber, string $name = null): void {
$name = $name ?? uniqid('fiber_');
$this->fibers[$name] = [
'fiber' => $fiber,
'status' => 'ready'
];
}

public function run(): void {
$this->running = true;

while ($this->running && !empty($this->fibers)) {
foreach ($this->fibers as $name => $fiberData) {
$fiber = $fiberData['fiber'];

if ($fiber->isTerminated()) {
unset($this->fibers[$name]);
continue;
}

if ($fiberData['status'] === 'ready') {
if (!$fiber->isStarted()) {
$fiber->start();
} else {
$fiber->resume();
}

$this->fibers[$name]['status'] = 'running';
}
}

// 检查等待条件
$this->checkWaitingConditions();

usleep(1000);
}
}

public function stop(): void {
$this->running = false;
}

public function sleep(float $seconds): void {
$wakeTime = microtime(true) + $seconds;
$this->waiting[] = [
'type' => 'sleep',
'wake_time' => $wakeTime,
'fiber' => Fiber::getCurrent()
];

Fiber::suspend();
}

public function waitFor(callable $condition): void {
$this->waiting[] = [
'type' => 'condition',
'condition' => $condition,
'fiber' => Fiber::getCurrent()
];

Fiber::suspend();
}

private function checkWaitingConditions(): void {
$currentTime = microtime(true);

foreach ($this->waiting as $index => $wait) {
$shouldResume = false;

if ($wait['type'] === 'sleep' && $currentTime >= $wait['wake_time']) {
$shouldResume = true;
} elseif ($wait['type'] === 'condition' && $wait['condition']()) {
$shouldResume = true;
}

if ($shouldResume) {
$wait['fiber']->resume();
unset($this->waiting[$index]);
}
}

$this->waiting = array_values($this->waiting);
}
}

// 使用示例
$scheduler = new FiberScheduler();

// 添加纤程
$scheduler->add(new Fiber(function () use ($scheduler) {
echo "纤程1开始\n";
$scheduler->sleep(1.0);
echo "纤程1恢复\n";
$scheduler->sleep(0.5);
echo "纤程1完成\n";
}), 'fiber1');

$scheduler->add(new Fiber(function () use ($scheduler) {
echo "纤程2开始\n";
$scheduler->sleep(0.5);
echo "纤程2恢复\n";
$scheduler->sleep(1.0);
echo "纤程2完成\n";
}), 'fiber2');

// 运行调度器
$scheduler->run();

错误处理和异常管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class SafeFiberExecutor {
public static function execute(callable $task, array $args = []): mixed {
$fiber = new Fiber(function () use ($task, $args) {
try {
return call_user_func_array($task, $args);
} catch (Throwable $e) {
// 记录错误
error_log("纤程执行错误: " . $e->getMessage());

// 返回错误信息
return [
'error' => true,
'message' => $e->getMessage(),
'code' => $e->getCode(),
'trace' => $e->getTraceAsString()
];
}
});

try {
return $fiber->start();
} catch (FiberError $e) {
return [
'error' => true,
'message' => '纤程执行失败: ' . $e->getMessage(),
'code' => $e->getCode()
];
}
}

public static function executeMultiple(array $tasks): array {
$results = [];
$fibers = [];

// 创建所有纤程
foreach ($tasks as $index => $taskData) {
$task = $taskData['task'];
$args = $taskData['args'] ?? [];

$fibers[$index] = new Fiber(function () use ($task, $args) {
try {
return [
'success' => true,
'result' => call_user_func_array($task, $args)
];
} catch (Throwable $e) {
return [
'success' => false,
'error' => $e->getMessage(),
'code' => $e->getCode()
];
}
});
}

// 启动所有纤程
foreach ($fibers as $index => $fiber) {
try {
$results[$index] = $fiber->start();
} catch (FiberError $e) {
$results[$index] = [
'success' => false,
'error' => '纤程启动失败: ' . $e->getMessage()
];
}
}

return $results;
}
}

// 使用示例
$tasks = [
[
'task' => function ($x, $y) {
if ($y === 0) {
throw new DivisionByZeroError('除零错误');
}
return $x / $y;
},
'args' => [10, 2]
],
[
'task' => function ($x, $y) {
if ($y === 0) {
throw new DivisionByZeroError('除零错误');
}
return $x / $y;
},
'args' => [10, 0] // 这会导致错误
],
[
'task' => function ($str) {
return strtoupper($str);
},
'args' => ['hello world']
]
];

$results = SafeFiberExecutor::executeMultiple($tasks);

foreach ($results as $index => $result) {
if ($result['success']) {
echo "任务 $index 成功: {$result['result']}\n";
} else {
echo "任务 $index 失败: {$result['error']}\n";
}
}

最佳实践

1. 纤程池管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class FiberPool {
private array $pool = [];
private int $maxSize;
private int $currentSize = 0;

public function __construct(int $maxSize = 100) {
$this->maxSize = $maxSize;
}

public function acquire(callable $task): Fiber {
if (!empty($this->pool)) {
$fiber = array_pop($this->pool);
// 重置纤程状态
return $this->resetFiber($fiber, $task);
}

if ($this->currentSize < $this->maxSize) {
$this->currentSize++;
return new Fiber($task);
}

throw new RuntimeException('纤程池已满');
}

public function release(Fiber $fiber): void {
if ($fiber->isTerminated()) {
$this->pool[] = $fiber;
}
}

private function resetFiber(Fiber $fiber, callable $task): Fiber {
// 在实际实现中,这里需要重置纤程状态
return new Fiber($task);
}

public function getStats(): array {
return [
'pool_size' => count($this->pool),
'current_size' => $this->currentSize,
'max_size' => $this->maxSize
];
}
}

2. 资源清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class ResourceAwareFiber {
private array $resources = [];

public function addResource(string $name, mixed $resource, callable $cleanup = null): void {
$this->resources[$name] = [
'resource' => $resource,
'cleanup' => $cleanup
];
}

public function execute(callable $task): mixed {
$fiber = new Fiber(function () use ($task) {
try {
return $task($this->resources);
} finally {
$this->cleanup();
}
});

return $fiber->start();
}

private function cleanup(): void {
foreach ($this->resources as $name => $resourceData) {
if ($resourceData['cleanup']) {
try {
$resourceData['cleanup']($resourceData['resource']);
} catch (Throwable $e) {
error_log("清理资源 '$name' 时出错: " . $e->getMessage());
}
}
}

$this->resources = [];
}
}

// 使用示例
$resourceFiber = new ResourceAwareFiber();

$resourceFiber->addResource('file', fopen('/tmp/test.txt', 'w'), function ($file) {
fclose($file);
});

$resourceFiber->addResource('curl', curl_init(), function ($curl) {
curl_close($curl);
});

$result = $resourceFiber->execute(function ($resources) {
fwrite($resources['file']['resource'], 'Hello World');

curl_setopt($resources['curl']['resource'], CURLOPT_URL, 'https://api.example.com');
curl_setopt($resources['curl']['resource'], CURLOPT_RETURNTRANSFER, true);

return curl_exec($resources['curl']['resource']);
});

注意事项和限制

1. 内存管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 避免内存泄漏
class MemoryAwareFiber {
public static function execute(callable $task, int $memoryLimit = 128 * 1024 * 1024): mixed {
$fiber = new Fiber(function () use ($task, $memoryLimit) {
$startMemory = memory_get_usage(true);

try {
$result = $task();

$currentMemory = memory_get_usage(true);
if ($currentMemory - $startMemory > $memoryLimit) {
trigger_error('纤程内存使用超出限制', E_USER_WARNING);
}

return $result;
} finally {
// 强制垃圾回收
gc_collect_cycles();
}
});

return $fiber->start();
}
}

2. 异常传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ExceptionHandlingFiber {
public static function safeExecute(callable $task): array {
$fiber = new Fiber(function () use ($task) {
try {
return ['success' => true, 'result' => $task()];
} catch (Throwable $e) {
return [
'success' => false,
'error' => $e->getMessage(),
'type' => get_class($e),
'file' => $e->getFile(),
'line' => $e->getLine()
];
}
});

try {
return $fiber->start();
} catch (FiberError $e) {
return [
'success' => false,
'error' => '纤程错误: ' . $e->getMessage(),
'type' => 'FiberError'
];
}
}
}

总结

PHP 8.1的纤程为PHP带来了强大的异步编程能力,它让我们能够:

  1. 提高并发性能:在I/O密集型任务中显著提升性能
  2. 简化异步代码:相比回调和Promise更直观
  3. 资源高效利用:单线程内实现协作式多任务
  4. 灵活的控制流:可以在任意点暂停和恢复执行

最适用的场景:

  • HTTP客户端和API调用
  • 数据库操作
  • 文件I/O处理
  • 实时数据处理
  • 任务队列系统

使用建议:

  • 合理管理纤程生命周期
  • 注意异常处理和资源清理
  • 避免在CPU密集型任务中使用
  • 考虑内存使用和性能监控

纤程让PHP在异步编程领域迈出了重要一步,为构建高性能应用提供了新的可能性!

本站由 提供部署服务