PHP
Asynkron programmering
Sök…
Generatorernas fördelar
PHP 5.5 introducerar Generatorer och nyckelordet som ger oss möjlighet att skriva asynkron kod som ser mer ut som synkron kod.
yield
är ansvarigt för att ge kontroll tillbaka ringningskoden och tillhandahålla en återupptagningspunkt på den platsen. Man kan skicka ett värde längs yield
. Returvärdet för detta uttryck är antingen null
eller värdet som skickades till Generator::send()
.
function reverse_range($i) {
// the mere presence of the yield keyword in this function makes this a Generator
do {
// $i is retained between resumptions
print yield $i;
} while (--$i > 0);
}
$gen = reverse_range(5);
print $gen->current();
$gen->send("injected!"); // send also resumes the Generator
foreach ($gen as $val) { // loops over the Generator, resuming it upon each iteration
echo $val;
}
// Output: 5injected!4321
Den här mekanismen kan användas av en koroutinimplementering för att vänta på de väntningar som genereras av generatorn (genom att registrera sig själv som en återuppringning för upplösning) och fortsätta exekveringen av generatorn så snart väntan är löst.
Använda Icicle-händels loop
Icicle använder Awaitables och Generators för att skapa Coroutines.
require __DIR__ . '/vendor/autoload.php';
use Icicle\Awaitable;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
$generator = function (float $time) {
try {
// Sets $start to the value returned by microtime() after approx. $time seconds.
$start = yield Awaitable\resolve(microtime(true))->delay($time);
echo "Sleep time: ", microtime(true) - $start, "\n";
// Throws the exception from the rejected awaitable into the coroutine.
return yield Awaitable\reject(new Exception('Rejected awaitable'));
} catch (Throwable $e) { // Catches awaitable rejection reason.
echo "Caught exception: ", $e->getMessage(), "\n";
}
return yield Awaitable\resolve('Coroutine completed');
};
// Coroutine sleeps for 1.2 seconds, then will resolve with a string.
$coroutine = new Coroutine($generator(1.2));
$coroutine->done(function (string $data) {
echo $data, "\n";
});
Loop\run();
Använda Amp-händelsslinga
Amp utnyttjar löften [ett annat namn för väntande] och generatorer för skapande av koroutin.
require __DIR__ . '/vendor/autoload.php';
use Amp\Dns;
// Try our system defined resolver or googles, whichever is fastest
function queryStackOverflow($recordtype) {
$requests = [
Dns\query("stackoverflow.com", $recordtype),
Dns\query("stackoverflow.com", $recordtype, ["server" => "8.8.8.8"]),
];
// returns a Promise resolving when the first one of the requests resolves
return yield Amp\first($request);
}
\Amp\run(function() { // main loop, implicitly a coroutine
try {
// convert to coroutine with Amp\resolve()
$promise = Amp\resolve(queryStackOverflow(Dns\Record::NS));
list($ns, $type, $ttl) = // we need only one NS result, not all
current(yield Amp\timeout($promise, 2000 /* milliseconds */));
echo "The result of the fastest server to reply to our query was $ns";
} catch (Amp\TimeoutException $e) {
echo "We've heard no answer for 2 seconds! Bye!";
} catch (Dns\NoRecordException $e) {
echo "No NS records there? Stupid DNS nameserver!";
}
});
Lektar icke-blockerande processer med proc_open ()
PHP har inget stöd för att köra kod samtidigt om du inte installerar tillägg som pthread
. Detta kan ibland kringgås genom att använda proc_open()
och stream_set_blocking()
och läsa deras utgång asynkront.
Om vi delar upp koden i mindre bitar kan vi köra den som flera suprosesser. Sedan med hjälp av stream_set_blocking()
-funktionen kan vi göra att varje delprocess också blockeras. Detta innebär att vi kan leka flera delprocesser och sedan kontrollera om deras utdata i en slinga (på liknande sätt som en jämn slinga) och vänta tills alla avslutas.
Som exempel kan vi ha en liten delprocess som bara kör en slinga och i varje iteration sover slumpmässigt i 100 - 1000 ms (notera, förseningen är alltid densamma för en delprocess).
<?php
// subprocess.php
$name = $argv[1];
$delay = rand(1, 10) * 100;
printf("$name delay: ${delay}ms\n");
for ($i = 0; $i < 5; $i++) {
usleep($delay * 1000);
printf("$name: $i\n");
}
Sedan kommer huvudprocessen att leka delprocesser och läsa deras utgång. Vi kan dela upp det i mindre block:
- Spawn delprocesser med proc_open () .
- Gör att varje delprocess inte blockerar med
stream_set_blocking()
. - Kör en slinga tills alla delprocesser slutar med
proc_get_status()
. - Stäng
fclose()
korrekt medfclose()
för varje delprocess medfclose()
och stäng processhandtag medproc_close()
.
<?php
// non-blocking-proc_open.php
// File descriptors for each subprocess.
$descriptors = [
0 => ['pipe', 'r'], // stdin
1 => ['pipe', 'w'], // stdout
];
$pipes = [];
$processes = [];
foreach (range(1, 3) as $i) {
// Spawn a subprocess.
$proc = proc_open('php subprocess.php proc' . $i, $descriptors, $procPipes);
$processes[$i] = $proc;
// Make the subprocess non-blocking (only output pipe).
stream_set_blocking($procPipes[1], 0);
$pipes[$i] = $procPipes;
}
// Run in a loop until all subprocesses finish.
while (array_filter($processes, function($proc) { return proc_get_status($proc)['running']; })) {
foreach (range(1, 3) as $i) {
usleep(10 * 1000); // 100ms
// Read all available output (unread output is buffered).
$str = fread($pipes[$i][1], 1024);
if ($str) {
printf($str);
}
}
}
// Close all pipes and processes.
foreach (range(1, 3) as $i) {
fclose($pipes[$i][1]);
proc_close($processes[$i]);
}
Utgången innehåller sedan blandning från alla tre delprocesser när de läses av fread () (notera att i detta fall slutade proc1
mycket tidigare än de andra två):
$ php non-blocking-proc_open.php
proc1 delay: 200ms
proc2 delay: 1000ms
proc3 delay: 800ms
proc1: 0
proc1: 1
proc1: 2
proc1: 3
proc3: 0
proc1: 4
proc2: 0
proc3: 1
proc2: 1
proc3: 2
proc2: 2
proc3: 3
proc2: 3
proc3: 4
proc2: 4
Läser serieport med Event och DIO
DIO- strömmar känner för närvarande inte igen av evenemangstillägget . Det finns inget rent sätt att få filbeskrivningen inkapslad i DIO-resursen. Men det finns en lösning:
- öppen ström för porten med
fopen()
; - gör att strömmen inte blockeras med
stream_set_blocking()
; - få numerisk filbeskrivning från strömmen med
EventUtil::getSocketFd()
; - vidarebefordra den numeriska filbeskrivningen till
dio_fdopen()
(för närvarande icke-dokumenterad) och få DIO-resursen; - lägg till en
Event
med en återuppringning för att lyssna på de lästa händelserna i filbeskrivningen; - dränera tillgängliga data i återuppringningen och bearbeta dem enligt din applikations logik.
dio.php
<?php
class Scanner {
protected $port; // port path, e.g. /dev/pts/5
protected $fd; // numeric file descriptor
protected $base; // EventBase
protected $dio; // dio resource
protected $e_open; // Event
protected $e_read; // Event
public function __construct ($port) {
$this->port = $port;
$this->base = new EventBase();
}
public function __destruct() {
$this->base->exit();
if ($this->e_open)
$this->e_open->free();
if ($this->e_read)
$this->e_read->free();
if ($this->dio)
dio_close($this->dio);
}
public function run() {
$stream = fopen($this->port, 'rb');
stream_set_blocking($stream, false);
$this->fd = EventUtil::getSocketFd($stream);
if ($this->fd < 0) {
fprintf(STDERR, "Failed attach to port, events: %d\n", $events);
return;
}
$this->e_open = new Event($this->base, $this->fd, Event::WRITE, [$this, '_onOpen']);
$this->e_open->add();
$this->base->dispatch();
fclose($stream);
}
public function _onOpen($fd, $events) {
$this->e_open->del();
$this->dio = dio_fdopen($this->fd);
// Call other dio functions here, e.g.
dio_tcsetattr($this->dio, [
'baud' => 9600,
'bits' => 8,
'stop' => 1,
'parity' => 0
]);
$this->e_read = new Event($this->base, $this->fd, Event::READ | Event::PERSIST,
[$this, '_onRead']);
$this->e_read->add();
}
public function _onRead($fd, $events) {
while ($data = dio_read($this->dio, 1)) {
var_dump($data);
}
}
}
// Change the port argument
$scanner = new Scanner('/dev/pts/5');
$scanner->run();
Testning
Kör följande kommando i terminal A:
$ socat -d -d pty,raw,echo=0 pty,raw,echo=0
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/5
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/8
2016/12/01 18:04:06 socat[16750] N starting data transfer loop with FDs [5,5] and [7,7]
Utgången kan vara annorlunda. Använd PTY: erna från de första par raderna ( /dev/pts/5
och /dev/pts/8
, i synnerhet).
I terminal B kör ovan nämnda skript. Du kanske behöver root-privilegier:
$ sudo php dio.php
I terminal C skicka en sträng till den första PTY:
$ echo test > /dev/pts/8
Produktion
string(1) "t"
string(1) "e"
string(1) "s"
string(1) "t"
string(1) "
"
HTTP-klient baserat på eventförlängning
Detta är ett exempel på HTTP-klientklass baserat på Event- förlängning.
Klassen tillåter att schemalägga ett antal HTTP-förfrågningar och sedan köra dem asynkront.
http-client.php
<?php
class MyHttpClient {
/// @var EventBase
protected $base;
/// @var array Instances of EventHttpConnection
protected $connections = [];
public function __construct() {
$this->base = new EventBase();
}
/**
* Dispatches all pending requests (events)
*
* @return void
*/
public function run() {
$this->base->dispatch();
}
public function __destruct() {
// Destroy connection objects explicitly, don't wait for GC.
// Otherwise, EventBase may be free'd earlier.
$this->connections = null;
}
/**
* @brief Adds a pending HTTP request
*
* @param string $address Hostname, or IP
* @param int $port Port number
* @param array $headers Extra HTTP headers
* @param int $cmd A EventHttpRequest::CMD_* constant
* @param string $resource HTTP request resource, e.g. '/page?a=b&c=d'
*
* @return EventHttpRequest|false
*/
public function addRequest($address, $port, array $headers,
$cmd = EventHttpRequest::CMD_GET, $resource = '/')
{
$conn = new EventHttpConnection($this->base, null, $address, $port);
$conn->setTimeout(5);
$req = new EventHttpRequest([$this, '_requestHandler'], $this->base);
foreach ($headers as $k => $v) {
$req->addHeader($k, $v, EventHttpRequest::OUTPUT_HEADER);
}
$req->addHeader('Host', $address, EventHttpRequest::OUTPUT_HEADER);
$req->addHeader('Connection', 'close', EventHttpRequest::OUTPUT_HEADER);
if ($conn->makeRequest($req, $cmd, $resource)) {
$this->connections []= $conn;
return $req;
}
return false;
}
/**
* @brief Handles an HTTP request
*
* @param EventHttpRequest $req
* @param mixed $unused
*
* @return void
*/
public function _requestHandler($req, $unused) {
if (is_null($req)) {
echo "Timed out\n";
} else {
$response_code = $req->getResponseCode();
if ($response_code == 0) {
echo "Connection refused\n";
} elseif ($response_code != 200) {
echo "Unexpected response: $response_code\n";
} else {
echo "Success: $response_code\n";
$buf = $req->getInputBuffer();
echo "Body:\n";
while ($s = $buf->readLine(EventBuffer::EOL_ANY)) {
echo $s, PHP_EOL;
}
}
}
}
}
$address = "my-host.local";
$port = 80;
$headers = [ 'User-Agent' => 'My-User-Agent/1.0', ];
$client = new MyHttpClient();
// Add pending requests
for ($i = 0; $i < 10; $i++) {
$client->addRequest($address, $port, $headers,
EventHttpRequest::CMD_GET, '/test.php?a=' . $i);
}
// Dispatch pending requests
$client->run();
test.php
Detta är ett exempelskript på serversidan.
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
echo 'User-Agent: ', $_SERVER['HTTP_USER_AGENT'] ?? '(none)', PHP_EOL;
Användande
php http-client.php
Provutgång
Success: 200
Body:
GET: array (
'a' => '1',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '0',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '3',
)
...
(Trimmad.)
Observera att koden är utformad för långvarig behandling i CLI SAPI .
HTTP-klient baserat på Ev-förlängning
Detta är ett exempel på HTTP-klient baserat på Ev- förlängning.
Ev-förlängning implementerar en enkel men kraftfull händelsslinga för allmänt bruk. Det tillhandahåller inte nätverksspecifika tittare, men dess I / O-bevakare kan användas för asynkron behandling av uttag .
Följande kod visar hur HTTP-förfrågningar kan schemaläggas för parallellbehandling.
http-client.php
<?php
class MyHttpRequest {
/// @var MyHttpClient
private $http_client;
/// @var string
private $address;
/// @var string HTTP resource such as /page?get=param
private $resource;
/// @var string HTTP method such as GET, POST etc.
private $method;
/// @var int
private $service_port;
/// @var resource Socket
private $socket;
/// @var double Connection timeout in seconds.
private $timeout = 10.;
/// @var int Chunk size in bytes for socket_recv()
private $chunk_size = 20;
/// @var EvTimer
private $timeout_watcher;
/// @var EvIo
private $write_watcher;
/// @var EvIo
private $read_watcher;
/// @var EvTimer
private $conn_watcher;
/// @var string buffer for incoming data
private $buffer;
/// @var array errors reported by sockets extension in non-blocking mode.
private static $e_nonblocking = [
11, // EAGAIN or EWOULDBLOCK
115, // EINPROGRESS
];
/**
* @param MyHttpClient $client
* @param string $host Hostname, e.g. google.co.uk
* @param string $resource HTTP resource, e.g. /page?a=b&c=d
* @param string $method HTTP method: GET, HEAD, POST, PUT etc.
* @throws RuntimeException
*/
public function __construct(MyHttpClient $client, $host, $resource, $method) {
$this->http_client = $client;
$this->host = $host;
$this->resource = $resource;
$this->method = $method;
// Get the port for the WWW service
$this->service_port = getservbyname('www', 'tcp');
// Get the IP address for the target host
$this->address = gethostbyname($this->host);
// Create a TCP/IP socket
$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!$this->socket) {
throw new RuntimeException("socket_create() failed: reason: " .
socket_strerror(socket_last_error()));
}
// Set O_NONBLOCK flag
socket_set_nonblock($this->socket);
$this->conn_watcher = $this->http_client->getLoop()
->timer(0, 0., [$this, 'connect']);
}
public function __destruct() {
$this->close();
}
private function freeWatcher(&$w) {
if ($w) {
$w->stop();
$w = null;
}
}
/**
* Deallocates all resources of the request
*/
private function close() {
if ($this->socket) {
socket_close($this->socket);
$this->socket = null;
}
$this->freeWatcher($this->timeout_watcher);
$this->freeWatcher($this->read_watcher);
$this->freeWatcher($this->write_watcher);
$this->freeWatcher($this->conn_watcher);
}
/**
* Initializes a connection on socket
* @return bool
*/
public function connect() {
$loop = $this->http_client->getLoop();
$this->timeout_watcher = $loop->timer($this->timeout, 0., [$this, '_onTimeout']);
$this->write_watcher = $loop->io($this->socket, Ev::WRITE, [$this, '_onWritable']);
return socket_connect($this->socket, $this->address, $this->service_port);
}
/**
* Callback for timeout (EvTimer) watcher
*/
public function _onTimeout(EvTimer $w) {
$w->stop();
$this->close();
}
/**
* Callback which is called when the socket becomes wriable
*/
public function _onWritable(EvIo $w) {
$this->timeout_watcher->stop();
$w->stop();
$in = implode("\r\n", [
"{$this->method} {$this->resource} HTTP/1.1",
"Host: {$this->host}",
'Connection: Close',
]) . "\r\n\r\n";
if (!socket_write($this->socket, $in, strlen($in))) {
trigger_error("Failed writing $in to socket", E_USER_ERROR);
return;
}
$loop = $this->http_client->getLoop();
$this->read_watcher = $loop->io($this->socket,
Ev::READ, [$this, '_onReadable']);
// Continue running the loop
$loop->run();
}
/**
* Callback which is called when the socket becomes readable
*/
public function _onReadable(EvIo $w) {
// recv() 20 bytes in non-blocking mode
$ret = socket_recv($this->socket, $out, 20, MSG_DONTWAIT);
if ($ret) {
// Still have data to read. Append the read chunk to the buffer.
$this->buffer .= $out;
} elseif ($ret === 0) {
// All is read
printf("\n<<<<\n%s\n>>>>", rtrim($this->buffer));
fflush(STDOUT);
$w->stop();
$this->close();
return;
}
// Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK
if (in_array(socket_last_error(), static::$e_nonblocking)) {
return;
}
$w->stop();
$this->close();
}
}
/////////////////////////////////////
class MyHttpClient {
/// @var array Instances of MyHttpRequest
private $requests = [];
/// @var EvLoop
private $loop;
public function __construct() {
// Each HTTP client runs its own event loop
$this->loop = new EvLoop();
}
public function __destruct() {
$this->loop->stop();
}
/**
* @return EvLoop
*/
public function getLoop() {
return $this->loop;
}
/**
* Adds a pending request
*/
public function addRequest(MyHttpRequest $r) {
$this->requests []= $r;
}
/**
* Dispatches all pending requests
*/
public function run() {
$this->loop->run();
}
}
/////////////////////////////////////
// Usage
$client = new MyHttpClient();
foreach (range(1, 10) as $i) {
$client->addRequest(new MyHttpRequest($client, 'my-host.local', '/test.php?a=' . $i, 'GET'));
}
$client->run();
Testning
Anta att http://my-host.local/test.php
skriver ut dumpningen på $_GET
:
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
Då kommer utdata från kommandot php http-client.php
att likna följande:
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '3',
)
0
>>>>
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '2',
)
0
>>>>
...
(trimmas)
Observera i PHP 5 Uttagen förlängningen kan logga varningar för EINPROGRESS
, EAGAIN
och EWOULDBLOCK
errno
värden. Det är möjligt att stänga av stockarna med
error_reporting(E_ERROR);