为了正常地执行上面的轮询动作,我们将在调度器里增加一个特殊的任务:
<?php
protected function ioPollTask() {
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
需要在某个地方注册这个任务,例如,你可以在run()方法的开始增加$this->newTask($this->ioPollTask())。然后就像其他 任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法)。ioPollTask将使用0秒的超时来调用ioPoll, 这意味着stream_select将立即返回(而不是等待)。
只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪。如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行,直到有新的连接建立。这将导致100%的CPU利用率。相反,让操作系统做这种等待会更有效。
现在编写服务器相对容易了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
while (true) {
yield waitForRead($socket);
$clientSocket = stream_socket_accept($socket, 0);
yield newTask(handleClient($clientSocket));
}
}
function handleClient($socket) {
yield waitForRead($socket);
$data = fread($socket, 8192);
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield waitForWrite($socket);
fwrite($socket, $response);
fclose($socket);
}
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();
这段代码将接收到localhost:8000上的连接,然后仅仅返回发送来的内容作为HTTP响应。要做“实际”的事情的话就爱哪个非常复杂(处理 HTTP请求可能已经超出了这篇文章的范围)。上面的代码片段只是演示了一般性的概念。
你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器。这条命令将向服务器发送10000个请求,并且其中100个请求将同时到达。使用这样的数目,我得到了处于中间的10毫秒的响应时间。不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,总的吞吐量应该更像是10000请求/秒)。调高并发数(比如 -c 500),服务器大多数运行良好,不过某些连接将抛出“连接被对方重置”的错误。由于我对低级别的socket资料了解的非常少,所以 我不能指出问题出在哪儿。
协程堆栈
如果你试图用我们的调度系统建立更大的系统的话,你将很快遇到问题:我们习惯了把代码分解为更小的函数,然后调用它们。然而, 如果使用了协程的话,就不能这么做了。例如,看下面代码:
<?php
function echoTimes($msg, $max) {
for ($i = 1; $i <= $max; ++$i) {
echo "$msg iteration $i\n";
yield;
}
}
function task() {
echoTimes('foo', 10); // print foo ten times
echo "---\n";
echoTimes('bar', 5); // print bar five times
yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它。然而它无法运行。正如在这篇文章的开始 所提到的,调用生成器(或者协程)将没有真正地做任何事情,它仅仅返回一个对象。这也出现在上面的例子里。echoTimes调用除了放回一个(无用的)协程对象外不做任何事情。
为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装。我们将调用它:“协程堆栈”。因为它将管理嵌套的协程调用堆栈。 这将是通过生成协程来调用子协程成为可能:
$retval = (yield someCoroutine($foo, $bar));
使用yield,子协程也能再次返回值:
yield retval("I'm a return value!");
retval函数除了返回一个值的封装外没有做任何其他事情。这个封装将表示它是一个返回值。
<?php
class CoroutineReturnValue {
protected $value;
public function __construct($value) {
$this->value = $value;
}
public function getValue() {
return $this->value;
}
}
function retval($value) {
return new CoroutineReturnValue($value);
}
为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
for (;;) {
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
$gen->send(yield $gen->key() => $value);
}
}
这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色。在$gen->send(yield $gen->key()=>$value);这行完成了代理功能。另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里。一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程。
为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);。
现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数。为了分组相关的 功能,我将使用下面类:
<?php
class CoSocket {
protected $socket;
public function __construct($socket) {
$this->socket = $socket;
}
public function accept() {
yield waitForRead($this->socket);
yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
}
public function read($size) {
yield waitForRead($this->socket);
yield retval(fread($this->socket, $size));
}
public function write($string) {
yield waitForWrite($this->socket);
fwrite($this->socket, $string);
}
public function close() {
@fclose($this->socket);
}
}
现在服务器可以编写的稍微简洁点了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
$socket = new CoSocket($socket);
while (true) {
yield newTask(
handleClient(yield $socket->accept())
);
}
}
function handleClient($socket) {
$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield $socket->write($response);
yield $socket->close();
}
错误处理
作为一个优秀的程序员,相信你已经察觉到上面的例子缺少错误处理。几乎所有的 socket 都是易出错的。我这样做的原因一方面固然是因为错误处理的乏味(特别是 socket!),另一方面也在于它很容易使代码体积膨胀。
不过,我仍然了一讲一下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误。尽管此方法还未在 PHP 中实现,但我很快就会提交它,就在今天。
throw() 方法接受一个 Exception,并将其抛出到协程的当前悬挂点,看看下面代码:
<?php
function gen() {
echo "Foo\n";
try {
yield;
} catch (Exception $e) {
echo "Exception: {$e->getMessage()}\n";
}
echo "Bar\n";
}
$gen = gen();
$gen->rewind(); // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
// and "Bar"
这非常棒,因为我们可以使用系统调用以及子协程调用异常抛出。对与系统调用,Scheduler::run() 方法需要一些小调整:
<?php
if ($retval instanceof SystemCall) {
try {
$retval($task, $this);
} catch (Exception $e) {
$task->setException($e);
$this->schedule($task);
}
continue;
}
Task 类也许要添加 throw 调用处理:
<?php
class Task {
// ...
protected $exception = null;
public function setException($exception) {
$this->exception = $exception;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} elseif ($this->exception) {
$retval = $this->coroutine->throw($this->exception);
$this->exception = null;
return $retval;
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
// ...
}
现在,我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
if ($scheduler->killTask($tid)) {
$scheduler->schedule($task);
} else {
throw new InvalidArgumentException('Invalid task ID!');
}
}
);
}
试试看:
<?php
function task() {
try {
yield killTask(500);
} catch (Exception $e) {
echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
}
}
这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常。要修复需要做些调整:
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
$exception = null;
for (;;) {
try {
if ($exception) {
$gen->throw($exception);
$exception = null;
continue;
}
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
try {
$sendValue = (yield $gen->key() => $value);
} catch (Exception $e) {
$gen->throw($e);
continue;
}
$gen->send($sendValue);
} catch (Exception $e) {
if ($stack->isEmpty()) {
throw $e;
}
$gen = $stack->pop();
$exception = $e;
}
}
}
结束语
在这篇文章里,我使用多任务协作构建了一个任务调度器,其中包括执行“系统调用”,做非阻塞操作和处理错误。所有这些里真正很酷的事情是任务的结果代码看起来完全同步,甚至任务正在执行大量的异步操作的时候也是这样。如果你打算从套接口读取数据的话,你将不需要传递某个回调函数或者注册一个事件侦听器。相反,你只要书写yield $socket->read()。这儿大部分都是你常常也要编写的,只在它的前面增加yield。
当我第一次听到所有这一切的时候,我发现这个概念完全令人折服,而且正是这个激励我在PHP中实现了它。同时我发现协程真正令人心慌。在令人敬畏的代码和很大一堆代码之间只有单薄的一行,我认为协程正好处在这一行上。讲讲使用上面所述的方法书写异步代码是否真的有益对我来说很难。
无论如何,我认为这是一个有趣的话题,而且我希望你也能找到它的乐趣。欢迎评论:)