PHP
비동기 프로그래밍
수색…
발전기의 장점
PHP 5.5에서는 Generators와 yield 키워드를 도입하여 동기 코드와 비슷한 비동기 코드를 작성할 수 있습니다.
yield
표현식은 호출 코드를 다시 제어하고 해당 위치에서 다시 시작 지점을 제공합니다. yield
명령을 따라 값을 보낼 수 있습니다. 이 표현식의 반환 값은 null
이거나 Generator::send()
에 전달 된 값입니다.
function reverse_range($i) {
// the mere presence of the yield keyword in this function makes this a Generator
do {
// $i is retained between resumptions
print yield $i;
} while (--$i > 0);
}
$gen = reverse_range(5);
print $gen->current();
$gen->send("injected!"); // send also resumes the Generator
foreach ($gen as $val) { // loops over the Generator, resuming it upon each iteration
echo $val;
}
// Output: 5injected!4321
이 메커니즘은 코 루틴 구현에 의해 생성자가 생성 한 Awaitables를 기다리고 (해결을위한 콜백으로 등록하여) Awaitable이 해결 되 자마자 Generator 실행을 계속할 수 있습니다.
고드름 이벤트 루프 사용하기
Icicle 은 Awaitables 및 Generators를 사용하여 Coroutines를 만듭니다.
require __DIR__ . '/vendor/autoload.php';
use Icicle\Awaitable;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
$generator = function (float $time) {
try {
// Sets $start to the value returned by microtime() after approx. $time seconds.
$start = yield Awaitable\resolve(microtime(true))->delay($time);
echo "Sleep time: ", microtime(true) - $start, "\n";
// Throws the exception from the rejected awaitable into the coroutine.
return yield Awaitable\reject(new Exception('Rejected awaitable'));
} catch (Throwable $e) { // Catches awaitable rejection reason.
echo "Caught exception: ", $e->getMessage(), "\n";
}
return yield Awaitable\resolve('Coroutine completed');
};
// Coroutine sleeps for 1.2 seconds, then will resolve with a string.
$coroutine = new Coroutine($generator(1.2));
$coroutine->done(function (string $data) {
echo $data, "\n";
});
Loop\run();
앰프 이벤트 루프 사용
앰프 하네스는 코 루틴 생성을 위해 [Awaitables의 다른 이름] 및 생성기를 약속합니다.
require __DIR__ . '/vendor/autoload.php';
use Amp\Dns;
// Try our system defined resolver or googles, whichever is fastest
function queryStackOverflow($recordtype) {
$requests = [
Dns\query("stackoverflow.com", $recordtype),
Dns\query("stackoverflow.com", $recordtype, ["server" => "8.8.8.8"]),
];
// returns a Promise resolving when the first one of the requests resolves
return yield Amp\first($request);
}
\Amp\run(function() { // main loop, implicitly a coroutine
try {
// convert to coroutine with Amp\resolve()
$promise = Amp\resolve(queryStackOverflow(Dns\Record::NS));
list($ns, $type, $ttl) = // we need only one NS result, not all
current(yield Amp\timeout($promise, 2000 /* milliseconds */));
echo "The result of the fastest server to reply to our query was $ns";
} catch (Amp\TimeoutException $e) {
echo "We've heard no answer for 2 seconds! Bye!";
} catch (Dns\NoRecordException $e) {
echo "No NS records there? Stupid DNS nameserver!";
}
});
proc_open ()을 사용하여 비 블로킹 프로세스 생성
PHP는 pthread
와 같은 확장 기능을 설치하지 않으면 코드를 동시에 실행할 수 없습니다. 이것은 때때로 proc_open()
및 stream_set_blocking()
하고 출력을 비동기 적으로 읽음으로써 무시할 수 있습니다.
코드를 작은 덩어리로 분할하면 다중 suprocess로 실행할 수 있습니다. 그런 다음 stream_set_blocking()
함수를 사용하여 각 하위 프로세스를 비 차단으로 만들 수 있습니다. 이는 여러 개의 하위 프로세스를 생성 한 다음 루프에서 출력을 확인 (짝수 루프와 유사하게)하고 모두 완료 될 때까지 대기 할 수 있음을 의미합니다.
예를 들어, 루프를 실행하는 작은 하위 프로세스가있을 수 있으며 각 반복마다 100 - 1000ms 동안 무작위로 잠자기 상태가됩니다 (참고로 지연은 항상 한 하위 프로세스에서 동일합니다).
<?php
// subprocess.php
$name = $argv[1];
$delay = rand(1, 10) * 100;
printf("$name delay: ${delay}ms\n");
for ($i = 0; $i < 5; $i++) {
usleep($delay * 1000);
printf("$name: $i\n");
}
그러면 주 프로세스가 하위 프로세스를 생성하고 출력을 읽습니다. 우리는 그것을 더 작은 블록으로 나눌 수 있습니다 :
- proc_open ()으로 하위 프로세스를 생성하십시오.
-
stream_set_blocking()
하여 각 서브 프로세스를 비 블로킹으로 만듭니다. -
proc_get_status()
사용하여 모든 서브 프로세스가 끝날 때까지 루프를 실행하십시오. -
fclose()
사용하여 각 하위 프로세스의 출력 파이프로 파일 핸들을 올바르게 닫고proc_close()
사용하여 프로세스 핸들을 닫습니다.
<?php
// non-blocking-proc_open.php
// File descriptors for each subprocess.
$descriptors = [
0 => ['pipe', 'r'], // stdin
1 => ['pipe', 'w'], // stdout
];
$pipes = [];
$processes = [];
foreach (range(1, 3) as $i) {
// Spawn a subprocess.
$proc = proc_open('php subprocess.php proc' . $i, $descriptors, $procPipes);
$processes[$i] = $proc;
// Make the subprocess non-blocking (only output pipe).
stream_set_blocking($procPipes[1], 0);
$pipes[$i] = $procPipes;
}
// Run in a loop until all subprocesses finish.
while (array_filter($processes, function($proc) { return proc_get_status($proc)['running']; })) {
foreach (range(1, 3) as $i) {
usleep(10 * 1000); // 100ms
// Read all available output (unread output is buffered).
$str = fread($pipes[$i][1], 1024);
if ($str) {
printf($str);
}
}
}
// Close all pipes and processes.
foreach (range(1, 3) as $i) {
fclose($pipes[$i][1]);
proc_close($processes[$i]);
}
출력은 fread ()에 의해 읽혀지는 세 가지 하위 프로세스의 혼합을 포함합니다 ( 이 경우 proc1
은 다른 두 프로세스보다 훨씬 proc1
).
$ php non-blocking-proc_open.php
proc1 delay: 200ms
proc2 delay: 1000ms
proc3 delay: 800ms
proc1: 0
proc1: 1
proc1: 2
proc1: 3
proc3: 0
proc1: 4
proc2: 0
proc3: 1
proc2: 1
proc3: 2
proc2: 2
proc3: 3
proc2: 3
proc3: 4
proc2: 4
이벤트 및 DIO로 직렬 포트 읽기
DIO 스트림은 현재 이벤트 확장에 의해 인식되지 않습니다. DIO 자원에 캡슐화 된 파일 설명자를 얻는 명확한 방법은 없습니다. 하지만 해결 방법이 있습니다.
-
fopen()
을 사용하여 포트에 대한 스트림을 엽니 다. -
stream_set_blocking()
스트림을 non-blocking으로 만든다; -
EventUtil::getSocketFd()
를 사용하여 스트림에서 숫자 파일 설명자를 가져옵니다. - 숫자 파일 기술자를
dio_fdopen()
(현재 문서화되지 않음)에 전달하고 DIO 리소스를 얻습니다. - 파일 디스크립터의 읽기 이벤트를 청취하기위한
Event
를 콜백과 함께 추가한다. - 콜백에서 사용 가능한 데이터를 비우고 응용 프로그램의 논리에 따라 처리합니다.
dio.php
<?php
class Scanner {
protected $port; // port path, e.g. /dev/pts/5
protected $fd; // numeric file descriptor
protected $base; // EventBase
protected $dio; // dio resource
protected $e_open; // Event
protected $e_read; // Event
public function __construct ($port) {
$this->port = $port;
$this->base = new EventBase();
}
public function __destruct() {
$this->base->exit();
if ($this->e_open)
$this->e_open->free();
if ($this->e_read)
$this->e_read->free();
if ($this->dio)
dio_close($this->dio);
}
public function run() {
$stream = fopen($this->port, 'rb');
stream_set_blocking($stream, false);
$this->fd = EventUtil::getSocketFd($stream);
if ($this->fd < 0) {
fprintf(STDERR, "Failed attach to port, events: %d\n", $events);
return;
}
$this->e_open = new Event($this->base, $this->fd, Event::WRITE, [$this, '_onOpen']);
$this->e_open->add();
$this->base->dispatch();
fclose($stream);
}
public function _onOpen($fd, $events) {
$this->e_open->del();
$this->dio = dio_fdopen($this->fd);
// Call other dio functions here, e.g.
dio_tcsetattr($this->dio, [
'baud' => 9600,
'bits' => 8,
'stop' => 1,
'parity' => 0
]);
$this->e_read = new Event($this->base, $this->fd, Event::READ | Event::PERSIST,
[$this, '_onRead']);
$this->e_read->add();
}
public function _onRead($fd, $events) {
while ($data = dio_read($this->dio, 1)) {
var_dump($data);
}
}
}
// Change the port argument
$scanner = new Scanner('/dev/pts/5');
$scanner->run();
테스트
터미널 A에서 다음 명령을 실행하십시오.
$ socat -d -d pty,raw,echo=0 pty,raw,echo=0
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/5
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/8
2016/12/01 18:04:06 socat[16750] N starting data transfer loop with FDs [5,5] and [7,7]
출력은 다를 수 있습니다. 첫 번째 행 (특히 /dev/pts/5
및 /dev/pts/8
)의 PTY를 사용하십시오.
터미널 B에서 위에서 언급 한 스크립트를 실행하십시오. 루트 권한이 필요할 수 있습니다.
$ sudo php dio.php
터미널 C에서 문자열을 첫 번째 PTY로 보냅니다.
$ echo test > /dev/pts/8
산출
string(1) "t"
string(1) "e"
string(1) "s"
string(1) "t"
string(1) "
"
이벤트 확장을 기반으로 한 HTTP 클라이언트
이것은 이벤트 확장을 기반으로하는 샘플 HTTP 클라이언트 클래스입니다.
이 클래스는 많은 수의 HTTP 요청을 스케줄링 한 다음 비동기로 실행합니다.
http-client.php
<?php
class MyHttpClient {
/// @var EventBase
protected $base;
/// @var array Instances of EventHttpConnection
protected $connections = [];
public function __construct() {
$this->base = new EventBase();
}
/**
* Dispatches all pending requests (events)
*
* @return void
*/
public function run() {
$this->base->dispatch();
}
public function __destruct() {
// Destroy connection objects explicitly, don't wait for GC.
// Otherwise, EventBase may be free'd earlier.
$this->connections = null;
}
/**
* @brief Adds a pending HTTP request
*
* @param string $address Hostname, or IP
* @param int $port Port number
* @param array $headers Extra HTTP headers
* @param int $cmd A EventHttpRequest::CMD_* constant
* @param string $resource HTTP request resource, e.g. '/page?a=b&c=d'
*
* @return EventHttpRequest|false
*/
public function addRequest($address, $port, array $headers,
$cmd = EventHttpRequest::CMD_GET, $resource = '/')
{
$conn = new EventHttpConnection($this->base, null, $address, $port);
$conn->setTimeout(5);
$req = new EventHttpRequest([$this, '_requestHandler'], $this->base);
foreach ($headers as $k => $v) {
$req->addHeader($k, $v, EventHttpRequest::OUTPUT_HEADER);
}
$req->addHeader('Host', $address, EventHttpRequest::OUTPUT_HEADER);
$req->addHeader('Connection', 'close', EventHttpRequest::OUTPUT_HEADER);
if ($conn->makeRequest($req, $cmd, $resource)) {
$this->connections []= $conn;
return $req;
}
return false;
}
/**
* @brief Handles an HTTP request
*
* @param EventHttpRequest $req
* @param mixed $unused
*
* @return void
*/
public function _requestHandler($req, $unused) {
if (is_null($req)) {
echo "Timed out\n";
} else {
$response_code = $req->getResponseCode();
if ($response_code == 0) {
echo "Connection refused\n";
} elseif ($response_code != 200) {
echo "Unexpected response: $response_code\n";
} else {
echo "Success: $response_code\n";
$buf = $req->getInputBuffer();
echo "Body:\n";
while ($s = $buf->readLine(EventBuffer::EOL_ANY)) {
echo $s, PHP_EOL;
}
}
}
}
}
$address = "my-host.local";
$port = 80;
$headers = [ 'User-Agent' => 'My-User-Agent/1.0', ];
$client = new MyHttpClient();
// Add pending requests
for ($i = 0; $i < 10; $i++) {
$client->addRequest($address, $port, $headers,
EventHttpRequest::CMD_GET, '/test.php?a=' . $i);
}
// Dispatch pending requests
$client->run();
test.php
이것은 서버 측의 샘플 스크립트입니다.
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
echo 'User-Agent: ', $_SERVER['HTTP_USER_AGENT'] ?? '(none)', PHP_EOL;
용법
php http-client.php
샘플 출력
Success: 200
Body:
GET: array (
'a' => '1',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '0',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '3',
)
...
(잘라 내기.)
이 코드는 CLI SAPI 에서 장기적인 처리를 위해 설계된 것입니다.
Ev 확장자에 기반한 HTTP 클라이언트
Ev 확장자를 기반으로 한 샘플 HTTP 클라이언트입니다.
Ev 확장은 간단하면서도 강력한 범용 이벤트 루프를 구현합니다. 네트워크 관련 관찰자는 제공하지 않지만 I / O 감시자 는 소켓의 비동기 처리에 사용할 수 있습니다.
다음 코드는 HTTP 요청을 병렬 처리하도록 예약하는 방법을 보여줍니다.
http-client.php
<?php
class MyHttpRequest {
/// @var MyHttpClient
private $http_client;
/// @var string
private $address;
/// @var string HTTP resource such as /page?get=param
private $resource;
/// @var string HTTP method such as GET, POST etc.
private $method;
/// @var int
private $service_port;
/// @var resource Socket
private $socket;
/// @var double Connection timeout in seconds.
private $timeout = 10.;
/// @var int Chunk size in bytes for socket_recv()
private $chunk_size = 20;
/// @var EvTimer
private $timeout_watcher;
/// @var EvIo
private $write_watcher;
/// @var EvIo
private $read_watcher;
/// @var EvTimer
private $conn_watcher;
/// @var string buffer for incoming data
private $buffer;
/// @var array errors reported by sockets extension in non-blocking mode.
private static $e_nonblocking = [
11, // EAGAIN or EWOULDBLOCK
115, // EINPROGRESS
];
/**
* @param MyHttpClient $client
* @param string $host Hostname, e.g. google.co.uk
* @param string $resource HTTP resource, e.g. /page?a=b&c=d
* @param string $method HTTP method: GET, HEAD, POST, PUT etc.
* @throws RuntimeException
*/
public function __construct(MyHttpClient $client, $host, $resource, $method) {
$this->http_client = $client;
$this->host = $host;
$this->resource = $resource;
$this->method = $method;
// Get the port for the WWW service
$this->service_port = getservbyname('www', 'tcp');
// Get the IP address for the target host
$this->address = gethostbyname($this->host);
// Create a TCP/IP socket
$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!$this->socket) {
throw new RuntimeException("socket_create() failed: reason: " .
socket_strerror(socket_last_error()));
}
// Set O_NONBLOCK flag
socket_set_nonblock($this->socket);
$this->conn_watcher = $this->http_client->getLoop()
->timer(0, 0., [$this, 'connect']);
}
public function __destruct() {
$this->close();
}
private function freeWatcher(&$w) {
if ($w) {
$w->stop();
$w = null;
}
}
/**
* Deallocates all resources of the request
*/
private function close() {
if ($this->socket) {
socket_close($this->socket);
$this->socket = null;
}
$this->freeWatcher($this->timeout_watcher);
$this->freeWatcher($this->read_watcher);
$this->freeWatcher($this->write_watcher);
$this->freeWatcher($this->conn_watcher);
}
/**
* Initializes a connection on socket
* @return bool
*/
public function connect() {
$loop = $this->http_client->getLoop();
$this->timeout_watcher = $loop->timer($this->timeout, 0., [$this, '_onTimeout']);
$this->write_watcher = $loop->io($this->socket, Ev::WRITE, [$this, '_onWritable']);
return socket_connect($this->socket, $this->address, $this->service_port);
}
/**
* Callback for timeout (EvTimer) watcher
*/
public function _onTimeout(EvTimer $w) {
$w->stop();
$this->close();
}
/**
* Callback which is called when the socket becomes wriable
*/
public function _onWritable(EvIo $w) {
$this->timeout_watcher->stop();
$w->stop();
$in = implode("\r\n", [
"{$this->method} {$this->resource} HTTP/1.1",
"Host: {$this->host}",
'Connection: Close',
]) . "\r\n\r\n";
if (!socket_write($this->socket, $in, strlen($in))) {
trigger_error("Failed writing $in to socket", E_USER_ERROR);
return;
}
$loop = $this->http_client->getLoop();
$this->read_watcher = $loop->io($this->socket,
Ev::READ, [$this, '_onReadable']);
// Continue running the loop
$loop->run();
}
/**
* Callback which is called when the socket becomes readable
*/
public function _onReadable(EvIo $w) {
// recv() 20 bytes in non-blocking mode
$ret = socket_recv($this->socket, $out, 20, MSG_DONTWAIT);
if ($ret) {
// Still have data to read. Append the read chunk to the buffer.
$this->buffer .= $out;
} elseif ($ret === 0) {
// All is read
printf("\n<<<<\n%s\n>>>>", rtrim($this->buffer));
fflush(STDOUT);
$w->stop();
$this->close();
return;
}
// Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK
if (in_array(socket_last_error(), static::$e_nonblocking)) {
return;
}
$w->stop();
$this->close();
}
}
/////////////////////////////////////
class MyHttpClient {
/// @var array Instances of MyHttpRequest
private $requests = [];
/// @var EvLoop
private $loop;
public function __construct() {
// Each HTTP client runs its own event loop
$this->loop = new EvLoop();
}
public function __destruct() {
$this->loop->stop();
}
/**
* @return EvLoop
*/
public function getLoop() {
return $this->loop;
}
/**
* Adds a pending request
*/
public function addRequest(MyHttpRequest $r) {
$this->requests []= $r;
}
/**
* Dispatches all pending requests
*/
public function run() {
$this->loop->run();
}
}
/////////////////////////////////////
// Usage
$client = new MyHttpClient();
foreach (range(1, 10) as $i) {
$client->addRequest(new MyHttpRequest($client, 'my-host.local', '/test.php?a=' . $i, 'GET'));
}
$client->run();
테스트
http://my-host.local/test.php
스크립트가 $_GET
의 덤프를 인쇄하고 있다고 가정합니다.
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
그러면 php http-client.php
명령의 출력은 다음과 유사합니다.
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '3',
)
0
>>>>
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '2',
)
0
>>>>
...
(손질 된)
PHP 5에서 소켓 확장은 EINPROGRESS
, EAGAIN
및 EWOULDBLOCK
errno
값에 대한 경고를 기록 할 수 있습니다. 로그를 해제 할 수 있습니다.
error_reporting(E_ERROR);