PHP
Asynchroon programmeren
Zoeken…
Voordelen van generatoren
PHP 5.5 introduceert Generators en het yield-trefwoord, waarmee we asynchrone code kunnen schrijven die meer op synchrone code lijkt.
De yield
is verantwoordelijk voor het teruggeven van de controle aan de roepende code en het verschaffen van een hervattingspunt op die plaats. Men kan een waarde verzenden langs de yield
. De retourwaarde van deze expressie is null
of de waarde die is doorgegeven aan Generator::send()
.
function reverse_range($i) {
// the mere presence of the yield keyword in this function makes this a Generator
do {
// $i is retained between resumptions
print yield $i;
} while (--$i > 0);
}
$gen = reverse_range(5);
print $gen->current();
$gen->send("injected!"); // send also resumes the Generator
foreach ($gen as $val) { // loops over the Generator, resuming it upon each iteration
echo $val;
}
// Output: 5injected!4321
Dit mechanisme kan door een coroutine-implementatie worden gebruikt om te wachten op Awaitables van de Generator (door zichzelf te registreren als een callback voor resolutie) en de uitvoering van de Generator voort te zetten zodra Awaitable is opgelost.
Icicle-gebeurtenislus gebruiken
Icicle gebruikt Awaitables en Generators om Coroutines te maken.
require __DIR__ . '/vendor/autoload.php';
use Icicle\Awaitable;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
$generator = function (float $time) {
try {
// Sets $start to the value returned by microtime() after approx. $time seconds.
$start = yield Awaitable\resolve(microtime(true))->delay($time);
echo "Sleep time: ", microtime(true) - $start, "\n";
// Throws the exception from the rejected awaitable into the coroutine.
return yield Awaitable\reject(new Exception('Rejected awaitable'));
} catch (Throwable $e) { // Catches awaitable rejection reason.
echo "Caught exception: ", $e->getMessage(), "\n";
}
return yield Awaitable\resolve('Coroutine completed');
};
// Coroutine sleeps for 1.2 seconds, then will resolve with a string.
$coroutine = new Coroutine($generator(1.2));
$coroutine->done(function (string $data) {
echo $data, "\n";
});
Loop\run();
Amp-gebeurtenislus gebruiken
Amp maakt gebruik van beloften [een andere naam voor Awaitables] en generatoren voor het maken van coroutines.
require __DIR__ . '/vendor/autoload.php';
use Amp\Dns;
// Try our system defined resolver or googles, whichever is fastest
function queryStackOverflow($recordtype) {
$requests = [
Dns\query("stackoverflow.com", $recordtype),
Dns\query("stackoverflow.com", $recordtype, ["server" => "8.8.8.8"]),
];
// returns a Promise resolving when the first one of the requests resolves
return yield Amp\first($request);
}
\Amp\run(function() { // main loop, implicitly a coroutine
try {
// convert to coroutine with Amp\resolve()
$promise = Amp\resolve(queryStackOverflow(Dns\Record::NS));
list($ns, $type, $ttl) = // we need only one NS result, not all
current(yield Amp\timeout($promise, 2000 /* milliseconds */));
echo "The result of the fastest server to reply to our query was $ns";
} catch (Amp\TimeoutException $e) {
echo "We've heard no answer for 2 seconds! Bye!";
} catch (Dns\NoRecordException $e) {
echo "No NS records there? Stupid DNS nameserver!";
}
});
Niet-blokkerende processen voortzetten met proc_open ()
PHP biedt geen ondersteuning voor het gelijktijdig uitvoeren van code, tenzij u extensies zoals pthread
installeert. Dit kan soms worden omzeild door proc_open()
en stream_set_blocking()
en hun uitvoer asynchroon te lezen.
Als we code in kleinere delen splitsen, kunnen we deze als meerdere suprocessen uitvoeren. Vervolgens kunnen we met de functie stream_set_blocking()
elk subproces ook niet-blokkeren. Dit betekent dat we meerdere subprocessen kunnen spawnen en vervolgens hun output in een lus kunnen controleren (vergelijkbaar met een even lus) en wachten tot ze allemaal zijn voltooid.
Als een voorbeeld kunnen we een klein subproces hebben dat gewoon een lus uitvoert en in elke iteratie willekeurig 100 - 1000ms slaapt (let op, de vertraging is altijd hetzelfde voor één subproces).
<?php
// subprocess.php
$name = $argv[1];
$delay = rand(1, 10) * 100;
printf("$name delay: ${delay}ms\n");
for ($i = 0; $i < 5; $i++) {
usleep($delay * 1000);
printf("$name: $i\n");
}
Vervolgens zal het hoofdproces subprocessen spawnen en hun uitvoer lezen. We kunnen het in kleinere blokken splitsen:
- Subprocessen uitzetten met proc_open () .
- Maak elk subproces niet-blokkerend met
stream_set_blocking()
. - Voer een lus uit totdat alle subprocessen klaar zijn met
proc_get_status()
. - Sluit bestandshandvatten goed met de uitvoerpijp voor elk subproces met behulp van
fclose()
en sluitproc_close()
metproc_close()
.
<?php
// non-blocking-proc_open.php
// File descriptors for each subprocess.
$descriptors = [
0 => ['pipe', 'r'], // stdin
1 => ['pipe', 'w'], // stdout
];
$pipes = [];
$processes = [];
foreach (range(1, 3) as $i) {
// Spawn a subprocess.
$proc = proc_open('php subprocess.php proc' . $i, $descriptors, $procPipes);
$processes[$i] = $proc;
// Make the subprocess non-blocking (only output pipe).
stream_set_blocking($procPipes[1], 0);
$pipes[$i] = $procPipes;
}
// Run in a loop until all subprocesses finish.
while (array_filter($processes, function($proc) { return proc_get_status($proc)['running']; })) {
foreach (range(1, 3) as $i) {
usleep(10 * 1000); // 100ms
// Read all available output (unread output is buffered).
$str = fread($pipes[$i][1], 1024);
if ($str) {
printf($str);
}
}
}
// Close all pipes and processes.
foreach (range(1, 3) as $i) {
fclose($pipes[$i][1]);
proc_close($processes[$i]);
}
De uitvoer bevat dan een mengsel van alle drie de subprocessen zoals ze worden gelezen door fread () (merk op dat proc1
in dit geval veel eerder eindigde dan de andere twee):
$ php non-blocking-proc_open.php
proc1 delay: 200ms
proc2 delay: 1000ms
proc3 delay: 800ms
proc1: 0
proc1: 1
proc1: 2
proc1: 3
proc3: 0
proc1: 4
proc2: 0
proc3: 1
proc2: 1
proc3: 2
proc2: 2
proc3: 3
proc2: 3
proc3: 4
proc2: 4
Seriële poort lezen met Event en DIO
DIO- streams worden momenteel niet herkend door de Event- extensie. Er is geen schone manier om de bestandsdescriptor te verkrijgen die is ingekapseld in de DIO-bron. Maar er is een oplossing:
- open stream voor de poort met
fopen()
; - zorg dat de stream niet blokkeert met
stream_set_blocking()
; - verkrijg numerieke bestandsdescriptor uit de stream met
EventUtil::getSocketFd()
; - geef de numerieke bestandsdescriptor door aan
dio_fdopen()
(momenteel niet gedocumenteerd) en verkrijg de DIO-bron; - voeg een
Event
met een callback voor het luisteren naar de gelezen gebeurtenissen op de bestandsdescriptor; - in de callback de beschikbare gegevens leegmaken en verwerken volgens de logica van uw toepassing.
dio.php
<?php
class Scanner {
protected $port; // port path, e.g. /dev/pts/5
protected $fd; // numeric file descriptor
protected $base; // EventBase
protected $dio; // dio resource
protected $e_open; // Event
protected $e_read; // Event
public function __construct ($port) {
$this->port = $port;
$this->base = new EventBase();
}
public function __destruct() {
$this->base->exit();
if ($this->e_open)
$this->e_open->free();
if ($this->e_read)
$this->e_read->free();
if ($this->dio)
dio_close($this->dio);
}
public function run() {
$stream = fopen($this->port, 'rb');
stream_set_blocking($stream, false);
$this->fd = EventUtil::getSocketFd($stream);
if ($this->fd < 0) {
fprintf(STDERR, "Failed attach to port, events: %d\n", $events);
return;
}
$this->e_open = new Event($this->base, $this->fd, Event::WRITE, [$this, '_onOpen']);
$this->e_open->add();
$this->base->dispatch();
fclose($stream);
}
public function _onOpen($fd, $events) {
$this->e_open->del();
$this->dio = dio_fdopen($this->fd);
// Call other dio functions here, e.g.
dio_tcsetattr($this->dio, [
'baud' => 9600,
'bits' => 8,
'stop' => 1,
'parity' => 0
]);
$this->e_read = new Event($this->base, $this->fd, Event::READ | Event::PERSIST,
[$this, '_onRead']);
$this->e_read->add();
}
public function _onRead($fd, $events) {
while ($data = dio_read($this->dio, 1)) {
var_dump($data);
}
}
}
// Change the port argument
$scanner = new Scanner('/dev/pts/5');
$scanner->run();
testen
Voer de volgende opdracht uit in terminal A:
$ socat -d -d pty,raw,echo=0 pty,raw,echo=0
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/5
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/8
2016/12/01 18:04:06 socat[16750] N starting data transfer loop with FDs [5,5] and [7,7]
De uitvoer kan verschillen. Gebruik de PTY's uit de eerste paar rijen (met name /dev/pts/5
en /dev/pts/8
).
Voer in terminal B het bovengenoemde script uit. Mogelijk hebt u rootrechten nodig:
$ sudo php dio.php
Stuur in terminal C een string naar de eerste PTY:
$ echo test > /dev/pts/8
uitgang
string(1) "t"
string(1) "e"
string(1) "s"
string(1) "t"
string(1) "
"
HTTP-client op basis van gebeurtenisuitbreiding
Dit is een voorbeeld van een HTTP-clientklasse op basis van Event- extensie.
Met de klasse kan een aantal HTTP-aanvragen worden gepland en vervolgens asynchroon worden uitgevoerd.
http-client.php
<?php
class MyHttpClient {
/// @var EventBase
protected $base;
/// @var array Instances of EventHttpConnection
protected $connections = [];
public function __construct() {
$this->base = new EventBase();
}
/**
* Dispatches all pending requests (events)
*
* @return void
*/
public function run() {
$this->base->dispatch();
}
public function __destruct() {
// Destroy connection objects explicitly, don't wait for GC.
// Otherwise, EventBase may be free'd earlier.
$this->connections = null;
}
/**
* @brief Adds a pending HTTP request
*
* @param string $address Hostname, or IP
* @param int $port Port number
* @param array $headers Extra HTTP headers
* @param int $cmd A EventHttpRequest::CMD_* constant
* @param string $resource HTTP request resource, e.g. '/page?a=b&c=d'
*
* @return EventHttpRequest|false
*/
public function addRequest($address, $port, array $headers,
$cmd = EventHttpRequest::CMD_GET, $resource = '/')
{
$conn = new EventHttpConnection($this->base, null, $address, $port);
$conn->setTimeout(5);
$req = new EventHttpRequest([$this, '_requestHandler'], $this->base);
foreach ($headers as $k => $v) {
$req->addHeader($k, $v, EventHttpRequest::OUTPUT_HEADER);
}
$req->addHeader('Host', $address, EventHttpRequest::OUTPUT_HEADER);
$req->addHeader('Connection', 'close', EventHttpRequest::OUTPUT_HEADER);
if ($conn->makeRequest($req, $cmd, $resource)) {
$this->connections []= $conn;
return $req;
}
return false;
}
/**
* @brief Handles an HTTP request
*
* @param EventHttpRequest $req
* @param mixed $unused
*
* @return void
*/
public function _requestHandler($req, $unused) {
if (is_null($req)) {
echo "Timed out\n";
} else {
$response_code = $req->getResponseCode();
if ($response_code == 0) {
echo "Connection refused\n";
} elseif ($response_code != 200) {
echo "Unexpected response: $response_code\n";
} else {
echo "Success: $response_code\n";
$buf = $req->getInputBuffer();
echo "Body:\n";
while ($s = $buf->readLine(EventBuffer::EOL_ANY)) {
echo $s, PHP_EOL;
}
}
}
}
}
$address = "my-host.local";
$port = 80;
$headers = [ 'User-Agent' => 'My-User-Agent/1.0', ];
$client = new MyHttpClient();
// Add pending requests
for ($i = 0; $i < 10; $i++) {
$client->addRequest($address, $port, $headers,
EventHttpRequest::CMD_GET, '/test.php?a=' . $i);
}
// Dispatch pending requests
$client->run();
test.php
Dit is een voorbeeldscript aan de serverzijde.
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
echo 'User-Agent: ', $_SERVER['HTTP_USER_AGENT'] ?? '(none)', PHP_EOL;
Gebruik
php http-client.php
Voorbeelduitvoer
Success: 200
Body:
GET: array (
'a' => '1',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '0',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
'a' => '3',
)
...
(Getrimd).
Let op, de code is ontworpen voor langdurige verwerking in de CLI SAPI .
HTTP-client op basis van Ev-extensie
Dit is een voorbeeld van een HTTP-client op basis van de Ev- extensie.
Ev extensie implementeert een eenvoudige maar krachtige event-loop voor algemene doeleinden. Het biedt geen netwerkspecifieke watchers, maar de I / O-watcher kan worden gebruikt voor asynchrone verwerking van sockets .
De volgende code laat zien hoe HTTP-aanvragen kunnen worden gepland voor parallelle verwerking.
http-client.php
<?php
class MyHttpRequest {
/// @var MyHttpClient
private $http_client;
/// @var string
private $address;
/// @var string HTTP resource such as /page?get=param
private $resource;
/// @var string HTTP method such as GET, POST etc.
private $method;
/// @var int
private $service_port;
/// @var resource Socket
private $socket;
/// @var double Connection timeout in seconds.
private $timeout = 10.;
/// @var int Chunk size in bytes for socket_recv()
private $chunk_size = 20;
/// @var EvTimer
private $timeout_watcher;
/// @var EvIo
private $write_watcher;
/// @var EvIo
private $read_watcher;
/// @var EvTimer
private $conn_watcher;
/// @var string buffer for incoming data
private $buffer;
/// @var array errors reported by sockets extension in non-blocking mode.
private static $e_nonblocking = [
11, // EAGAIN or EWOULDBLOCK
115, // EINPROGRESS
];
/**
* @param MyHttpClient $client
* @param string $host Hostname, e.g. google.co.uk
* @param string $resource HTTP resource, e.g. /page?a=b&c=d
* @param string $method HTTP method: GET, HEAD, POST, PUT etc.
* @throws RuntimeException
*/
public function __construct(MyHttpClient $client, $host, $resource, $method) {
$this->http_client = $client;
$this->host = $host;
$this->resource = $resource;
$this->method = $method;
// Get the port for the WWW service
$this->service_port = getservbyname('www', 'tcp');
// Get the IP address for the target host
$this->address = gethostbyname($this->host);
// Create a TCP/IP socket
$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!$this->socket) {
throw new RuntimeException("socket_create() failed: reason: " .
socket_strerror(socket_last_error()));
}
// Set O_NONBLOCK flag
socket_set_nonblock($this->socket);
$this->conn_watcher = $this->http_client->getLoop()
->timer(0, 0., [$this, 'connect']);
}
public function __destruct() {
$this->close();
}
private function freeWatcher(&$w) {
if ($w) {
$w->stop();
$w = null;
}
}
/**
* Deallocates all resources of the request
*/
private function close() {
if ($this->socket) {
socket_close($this->socket);
$this->socket = null;
}
$this->freeWatcher($this->timeout_watcher);
$this->freeWatcher($this->read_watcher);
$this->freeWatcher($this->write_watcher);
$this->freeWatcher($this->conn_watcher);
}
/**
* Initializes a connection on socket
* @return bool
*/
public function connect() {
$loop = $this->http_client->getLoop();
$this->timeout_watcher = $loop->timer($this->timeout, 0., [$this, '_onTimeout']);
$this->write_watcher = $loop->io($this->socket, Ev::WRITE, [$this, '_onWritable']);
return socket_connect($this->socket, $this->address, $this->service_port);
}
/**
* Callback for timeout (EvTimer) watcher
*/
public function _onTimeout(EvTimer $w) {
$w->stop();
$this->close();
}
/**
* Callback which is called when the socket becomes wriable
*/
public function _onWritable(EvIo $w) {
$this->timeout_watcher->stop();
$w->stop();
$in = implode("\r\n", [
"{$this->method} {$this->resource} HTTP/1.1",
"Host: {$this->host}",
'Connection: Close',
]) . "\r\n\r\n";
if (!socket_write($this->socket, $in, strlen($in))) {
trigger_error("Failed writing $in to socket", E_USER_ERROR);
return;
}
$loop = $this->http_client->getLoop();
$this->read_watcher = $loop->io($this->socket,
Ev::READ, [$this, '_onReadable']);
// Continue running the loop
$loop->run();
}
/**
* Callback which is called when the socket becomes readable
*/
public function _onReadable(EvIo $w) {
// recv() 20 bytes in non-blocking mode
$ret = socket_recv($this->socket, $out, 20, MSG_DONTWAIT);
if ($ret) {
// Still have data to read. Append the read chunk to the buffer.
$this->buffer .= $out;
} elseif ($ret === 0) {
// All is read
printf("\n<<<<\n%s\n>>>>", rtrim($this->buffer));
fflush(STDOUT);
$w->stop();
$this->close();
return;
}
// Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK
if (in_array(socket_last_error(), static::$e_nonblocking)) {
return;
}
$w->stop();
$this->close();
}
}
/////////////////////////////////////
class MyHttpClient {
/// @var array Instances of MyHttpRequest
private $requests = [];
/// @var EvLoop
private $loop;
public function __construct() {
// Each HTTP client runs its own event loop
$this->loop = new EvLoop();
}
public function __destruct() {
$this->loop->stop();
}
/**
* @return EvLoop
*/
public function getLoop() {
return $this->loop;
}
/**
* Adds a pending request
*/
public function addRequest(MyHttpRequest $r) {
$this->requests []= $r;
}
/**
* Dispatches all pending requests
*/
public function run() {
$this->loop->run();
}
}
/////////////////////////////////////
// Usage
$client = new MyHttpClient();
foreach (range(1, 10) as $i) {
$client->addRequest(new MyHttpRequest($client, 'my-host.local', '/test.php?a=' . $i, 'GET'));
}
$client->run();
testen
Stel dat het http://my-host.local/test.php
script de dump van $_GET
afdrukt:
<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
De uitvoer van de opdracht php http-client.php
is dan ongeveer als volgt:
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '3',
)
0
>>>>
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo
1d
GET: array (
'a' => '2',
)
0
>>>>
...
(bijgesneden)
Let op, in PHP 5 kan de sockets extension waarschuwingen registreren voor EINPROGRESS
, EAGAIN
en EWOULDBLOCK
errno
waarden. Het is mogelijk om de logboeken uit te schakelen met
error_reporting(E_ERROR);