Recherche…


Avantages des générateurs

PHP 5.5 introduit Generators et le mot-clé yield, ce qui nous permet d’écrire un code asynchrone qui ressemble plus à du code synchrone.

L'expression de yield est chargée de redonner le contrôle au code d'appel et de fournir un point de reprise à cet endroit. On peut envoyer une valeur le long de l'instruction de yield . La valeur de retour de cette expression est null ou la valeur qui a été transmise à Generator::send() .

function reverse_range($i) {
    // the mere presence of the yield keyword in this function makes this a Generator
    do {
        // $i is retained between resumptions
        print yield $i;
    } while (--$i > 0);
}

$gen = reverse_range(5);
print $gen->current();
$gen->send("injected!"); // send also resumes the Generator

foreach ($gen as $val) { // loops over the Generator, resuming it upon each iteration
    echo $val;
}

// Output: 5injected!4321

Ce mécanisme peut être utilisé par une implémentation de coroutine pour attendre les Awaitables générés par le générateur (en s'enregistrant en tant que rappel pour la résolution) et poursuivre l'exécution du générateur dès que le fichier en attente est résolu.

Utilisation de la boucle d'événement Icicle

Icicle utilise Awaitables et Generators pour créer des Coroutines.

require __DIR__ . '/vendor/autoload.php';

use Icicle\Awaitable;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;

$generator = function (float $time) {
    try {
        // Sets $start to the value returned by microtime() after approx. $time seconds.
        $start = yield Awaitable\resolve(microtime(true))->delay($time);

        echo "Sleep time: ", microtime(true) - $start, "\n";

        // Throws the exception from the rejected awaitable into the coroutine.
        return yield Awaitable\reject(new Exception('Rejected awaitable'));
    } catch (Throwable $e) { // Catches awaitable rejection reason.
        echo "Caught exception: ", $e->getMessage(), "\n";
    }

    return yield Awaitable\resolve('Coroutine completed');
};

// Coroutine sleeps for 1.2 seconds, then will resolve with a string.
$coroutine = new Coroutine($generator(1.2));
$coroutine->done(function (string $data) {
    echo $data, "\n";
});

Loop\run();

Utilisation de la boucle d'événement Amp

Amp exploite Promises [un autre nom pour Awaitables] et Generators pour la création de coroutines.

require __DIR__ . '/vendor/autoload.php';

use Amp\Dns;

// Try our system defined resolver or googles, whichever is fastest
function queryStackOverflow($recordtype) {
    $requests = [
        Dns\query("stackoverflow.com", $recordtype),
        Dns\query("stackoverflow.com", $recordtype, ["server" => "8.8.8.8"]),
    ];
    // returns a Promise resolving when the first one of the requests resolves
    return yield Amp\first($request);
}

\Amp\run(function() { // main loop, implicitly a coroutine
    try {
        // convert to coroutine with Amp\resolve()
        $promise = Amp\resolve(queryStackOverflow(Dns\Record::NS));
        list($ns, $type, $ttl) = // we need only one NS result, not all
            current(yield Amp\timeout($promise, 2000 /* milliseconds */));
        echo "The result of the fastest server to reply to our query was $ns";
    } catch (Amp\TimeoutException $e) {
        echo "We've heard no answer for 2 seconds! Bye!";
    } catch (Dns\NoRecordException $e) {
        echo "No NS records there? Stupid DNS nameserver!";
    }
});

Création de processus non bloquants avec proc_open ()

PHP ne prend pas en charge l'exécution du code simultanément, sauf si vous installez des extensions telles que pthread . Cela peut parfois être contourné en utilisant proc_open() et stream_set_blocking() et en lisant leur sortie de manière asynchrone.

Si nous divisons le code en morceaux plus petits, nous pouvons l'exécuter sous la forme de plusieurs suprocesses. Ensuite, en utilisant la fonction stream_set_blocking() , nous pouvons également rendre chaque sous-processus non bloquant. Cela signifie que nous pouvons générer plusieurs sous-processus, puis vérifier leur sortie dans une boucle (similaire à une boucle paire) et attendre qu'ils soient tous terminés.

Par exemple, nous pouvons avoir un petit sous-processus qui exécute une boucle et qui, dans chaque itération, dort aléatoirement pendant 100 à 1000 ms (notez que le délai est toujours le même pour un sous-processus).

<?php
// subprocess.php
$name = $argv[1];
$delay = rand(1, 10) * 100;
printf("$name delay: ${delay}ms\n");

for ($i = 0; $i < 5; $i++) {
    usleep($delay * 1000);
    printf("$name: $i\n");
}

Ensuite, le processus principal va générer des sous-processus et lire leur sortie. Nous pouvons le diviser en blocs plus petits:

<?php
// non-blocking-proc_open.php
// File descriptors for each subprocess.
$descriptors = [
    0 => ['pipe', 'r'], // stdin
    1 => ['pipe', 'w'], // stdout
];

$pipes = [];
$processes = [];
foreach (range(1, 3) as $i) {
    // Spawn a subprocess.
    $proc = proc_open('php subprocess.php proc' . $i, $descriptors, $procPipes);
    $processes[$i] = $proc;
    // Make the subprocess non-blocking (only output pipe).
    stream_set_blocking($procPipes[1], 0);
    $pipes[$i] = $procPipes;
}

// Run in a loop until all subprocesses finish.
while (array_filter($processes, function($proc) { return proc_get_status($proc)['running']; })) {
    foreach (range(1, 3) as $i) {
        usleep(10 * 1000); // 100ms
        // Read all available output (unread output is buffered).
        $str = fread($pipes[$i][1], 1024);
        if ($str) {
            printf($str);
        }
    }
}

// Close all pipes and processes.
foreach (range(1, 3) as $i) {
    fclose($pipes[$i][1]);
    proc_close($processes[$i]);
}

La sortie contient alors un mélange des trois sous-processus tels qu'ils sont lus par fread () (notez que dans ce cas, proc1 s'est terminé beaucoup plus tôt que les deux autres):

$ php non-blocking-proc_open.php 
proc1 delay: 200ms
proc2 delay: 1000ms
proc3 delay: 800ms
proc1: 0
proc1: 1
proc1: 2
proc1: 3
proc3: 0
proc1: 4
proc2: 0
proc3: 1
proc2: 1
proc3: 2
proc2: 2
proc3: 3
proc2: 3
proc3: 4
proc2: 4

Lecture du port série avec Event et DIO

Les flux DIO ne sont actuellement pas reconnus par l'extension d' événement . Il n'y a pas de moyen propre d'obtenir le descripteur de fichier encapsulé dans la ressource DIO. Mais il existe une solution de contournement:

  • ouvrir le flux pour le port avec fopen() ;
  • rendre le flux non bloquant avec stream_set_blocking() ;
  • obtenir le descripteur de fichier numérique du flux avec EventUtil::getSocketFd() ;
  • transmettez le descripteur de fichier numérique à dio_fdopen() (actuellement non documenté) et récupérez la ressource DIO;
  • ajouter un Event avec un rappel pour écouter les événements de lecture sur le descripteur de fichier;
  • dans le rappel, videz les données disponibles et traitez-les selon la logique de votre application.

dio.php

<?php
class Scanner {
  protected $port; // port path, e.g. /dev/pts/5
  protected $fd; // numeric file descriptor
  protected $base; // EventBase
  protected $dio; // dio resource
  protected $e_open; // Event
  protected $e_read; // Event

  public function __construct ($port) {
    $this->port = $port;
    $this->base = new EventBase();
  }

  public function __destruct() {
    $this->base->exit();

    if ($this->e_open)
      $this->e_open->free();
    if ($this->e_read)
      $this->e_read->free();
    if ($this->dio)
      dio_close($this->dio);
  }

  public function run() {
    $stream = fopen($this->port, 'rb');
    stream_set_blocking($stream, false);

    $this->fd = EventUtil::getSocketFd($stream);
    if ($this->fd < 0) {
      fprintf(STDERR, "Failed attach to port, events: %d\n", $events);
      return;
    }

    $this->e_open = new Event($this->base, $this->fd, Event::WRITE, [$this, '_onOpen']);
    $this->e_open->add();
    $this->base->dispatch();

    fclose($stream);
  }

  public function _onOpen($fd, $events) {
    $this->e_open->del();

    $this->dio = dio_fdopen($this->fd);
    // Call other dio functions here, e.g.
    dio_tcsetattr($this->dio, [
      'baud' => 9600,
      'bits' => 8,
      'stop'  => 1,
      'parity' => 0
    ]);

    $this->e_read = new Event($this->base, $this->fd, Event::READ | Event::PERSIST,
      [$this, '_onRead']);
    $this->e_read->add();
  }

  public function _onRead($fd, $events) {
    while ($data = dio_read($this->dio, 1)) {
      var_dump($data);
    }
  }
}

// Change the port argument
$scanner = new Scanner('/dev/pts/5');
$scanner->run();

Essai

Exécutez la commande suivante dans le terminal A:

$ socat -d -d pty,raw,echo=0 pty,raw,echo=0
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/5
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/8
2016/12/01 18:04:06 socat[16750] N starting data transfer loop with FDs [5,5] and [7,7]

La sortie peut être différente. Utilisez les PTY des deux premières lignes ( /dev/pts/5 et /dev/pts/8 en particulier).

Dans le terminal B, exécutez le script susmentionné. Vous pouvez avoir besoin des privilèges root:

$ sudo php dio.php

Au terminal C, envoyez une chaîne au premier PTY:

$ echo test > /dev/pts/8

Sortie

string(1) "t"
string(1) "e"
string(1) "s"
string(1) "t"
string(1) "
"

Client HTTP basé sur l'extension d'événement

Ceci est un exemple de classe de client HTTP basée sur l'extension d' événement .

La classe permet de planifier un certain nombre de requêtes HTTP, puis de les exécuter de manière asynchrone.

http-client.php

<?php
class MyHttpClient {
  /// @var EventBase
  protected $base;
  /// @var array Instances of EventHttpConnection
  protected $connections = [];

  public function __construct() {
    $this->base = new EventBase();
  }

  /**
   * Dispatches all pending requests (events)
   *
   * @return void
   */
  public function run() {
    $this->base->dispatch();
  }

  public function __destruct() {
    // Destroy connection objects explicitly, don't wait for GC.
    // Otherwise, EventBase may be free'd earlier.
    $this->connections = null;
  }

  /**
   * @brief Adds a pending HTTP request
   *
   * @param string $address Hostname, or IP
   * @param int $port Port number
   * @param array $headers Extra HTTP headers
   * @param int $cmd A EventHttpRequest::CMD_* constant
   * @param string $resource HTTP request resource, e.g. '/page?a=b&c=d'
   *
   * @return EventHttpRequest|false
   */
  public function addRequest($address, $port, array $headers,
    $cmd = EventHttpRequest::CMD_GET, $resource = '/')
  {
    $conn = new EventHttpConnection($this->base, null, $address, $port);
    $conn->setTimeout(5);

    $req = new EventHttpRequest([$this, '_requestHandler'], $this->base);

    foreach ($headers as $k => $v) {
      $req->addHeader($k, $v, EventHttpRequest::OUTPUT_HEADER);
    }
    $req->addHeader('Host', $address, EventHttpRequest::OUTPUT_HEADER);
    $req->addHeader('Connection', 'close', EventHttpRequest::OUTPUT_HEADER);
    if ($conn->makeRequest($req, $cmd, $resource)) {
      $this->connections []= $conn;
      return $req;
    }

    return false;
  }


  /**
   * @brief Handles an HTTP request
   *
   * @param EventHttpRequest $req
   * @param mixed $unused
   *
   * @return void
   */
  public function _requestHandler($req, $unused) {
    if (is_null($req)) {
      echo "Timed out\n";
    } else {
      $response_code = $req->getResponseCode();

      if ($response_code == 0) {
        echo "Connection refused\n";
      } elseif ($response_code != 200) {
        echo "Unexpected response: $response_code\n";
      } else {
        echo "Success: $response_code\n";
        $buf = $req->getInputBuffer();
        echo "Body:\n";
        while ($s = $buf->readLine(EventBuffer::EOL_ANY)) {
          echo $s, PHP_EOL;
        }
      }
    }
  }
}


$address = "my-host.local";
$port = 80;
$headers = [ 'User-Agent' => 'My-User-Agent/1.0', ];

$client = new MyHttpClient();

// Add pending requests
for ($i = 0; $i < 10; $i++) {
  $client->addRequest($address, $port, $headers,
    EventHttpRequest::CMD_GET, '/test.php?a=' . $i);
}

// Dispatch pending requests
$client->run();

test.php

Ceci est un exemple de script côté serveur.

<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
echo 'User-Agent: ', $_SERVER['HTTP_USER_AGENT'] ?? '(none)', PHP_EOL;

Usage

php http-client.php

Échantillon sortie

Success: 200
Body:
GET: array (
  'a' => '1',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
  'a' => '0',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
  'a' => '3',
)
...

(Coupé.)

Notez que le code est conçu pour un traitement à long terme dans le CLI SAPI .

Client HTTP basé sur l'extension Ev

Ceci est un exemple de client HTTP basé sur l'extension Ev .

L'extension Ev implémente une boucle d'événement simple mais puissante. Il ne fournit pas d'observateurs spécifiques au réseau, mais son observateur d'E / S peut être utilisé pour le traitement asynchrone des sockets .

Le code suivant montre comment les requêtes HTTP peuvent être planifiées pour un traitement parallèle.

http-client.php

<?php
class MyHttpRequest {
  /// @var MyHttpClient
  private $http_client;
  /// @var string
  private $address;
  /// @var string HTTP resource such as /page?get=param
  private $resource;
  /// @var string HTTP method such as GET, POST etc.
  private $method;
  /// @var int
  private $service_port;
  /// @var resource Socket
  private $socket;
  /// @var double Connection timeout in seconds.
  private $timeout = 10.;
  /// @var int Chunk size in bytes for socket_recv()
  private $chunk_size = 20;
  /// @var EvTimer
  private $timeout_watcher;
  /// @var EvIo
  private $write_watcher;
  /// @var EvIo
  private $read_watcher;
  /// @var EvTimer
  private $conn_watcher;
  /// @var string buffer for incoming data
  private $buffer;
  /// @var array errors reported by sockets extension in non-blocking mode.
  private static $e_nonblocking = [
    11, // EAGAIN or EWOULDBLOCK
    115, // EINPROGRESS
  ];

  /**
   * @param MyHttpClient $client
   * @param string $host Hostname, e.g. google.co.uk
   * @param string $resource HTTP resource, e.g. /page?a=b&c=d
   * @param string $method HTTP method: GET, HEAD, POST, PUT etc.
   * @throws RuntimeException
   */
  public function __construct(MyHttpClient $client, $host, $resource, $method) {
    $this->http_client = $client;
    $this->host        = $host;
    $this->resource    = $resource;
    $this->method      = $method;

    // Get the port for the WWW service
    $this->service_port = getservbyname('www', 'tcp');

    // Get the IP address for the target host
    $this->address = gethostbyname($this->host);

    // Create a TCP/IP socket
    $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    if (!$this->socket) {
      throw new RuntimeException("socket_create() failed: reason: " .
        socket_strerror(socket_last_error()));
    }

    // Set O_NONBLOCK flag
    socket_set_nonblock($this->socket);

    $this->conn_watcher = $this->http_client->getLoop()
      ->timer(0, 0., [$this, 'connect']);
  }

  public function __destruct() {
    $this->close();
  }

  private function freeWatcher(&$w) {
    if ($w) {
      $w->stop();
      $w = null;
    }
  }

  /**
   * Deallocates all resources of the request
   */
  private function close() {
    if ($this->socket) {
      socket_close($this->socket);
      $this->socket = null;
    }

    $this->freeWatcher($this->timeout_watcher);
    $this->freeWatcher($this->read_watcher);
    $this->freeWatcher($this->write_watcher);
    $this->freeWatcher($this->conn_watcher);
  }

  /**
   * Initializes a connection on socket
   * @return bool
   */
  public function connect() {
    $loop = $this->http_client->getLoop();

    $this->timeout_watcher = $loop->timer($this->timeout, 0., [$this, '_onTimeout']);
    $this->write_watcher = $loop->io($this->socket, Ev::WRITE, [$this, '_onWritable']);

    return socket_connect($this->socket, $this->address, $this->service_port);
  }

  /**
   * Callback for timeout (EvTimer) watcher
   */
  public function _onTimeout(EvTimer $w) {
    $w->stop();
    $this->close();
  }

  /**
   * Callback which is called when the socket becomes wriable
   */
  public function _onWritable(EvIo $w) {
    $this->timeout_watcher->stop();
    $w->stop();

    $in = implode("\r\n", [
      "{$this->method} {$this->resource} HTTP/1.1",
      "Host: {$this->host}",
      'Connection: Close',
    ]) . "\r\n\r\n";

    if (!socket_write($this->socket, $in, strlen($in))) {
      trigger_error("Failed writing $in to socket", E_USER_ERROR);
      return;
    }

    $loop = $this->http_client->getLoop();
    $this->read_watcher = $loop->io($this->socket,
      Ev::READ, [$this, '_onReadable']);

    // Continue running the loop
    $loop->run();
  }

  /**
   * Callback which is called when the socket becomes readable
   */
  public function _onReadable(EvIo $w) {
    // recv() 20 bytes in non-blocking mode
    $ret = socket_recv($this->socket, $out, 20, MSG_DONTWAIT);

    if ($ret) {
      // Still have data to read. Append the read chunk to the buffer.
      $this->buffer .= $out;
    } elseif ($ret === 0) {
      // All is read
      printf("\n<<<<\n%s\n>>>>", rtrim($this->buffer));
      fflush(STDOUT);
      $w->stop();
      $this->close();
      return;
    }

    // Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK
    if (in_array(socket_last_error(), static::$e_nonblocking)) {
      return;
    }

    $w->stop();
    $this->close();
  }
}

/////////////////////////////////////
class MyHttpClient {
  /// @var array Instances of MyHttpRequest
  private $requests = [];
  /// @var EvLoop
  private $loop;

  public function __construct() {
    // Each HTTP client runs its own event loop
    $this->loop = new EvLoop();
  }

  public function __destruct() {
    $this->loop->stop();
  }

  /**
   * @return EvLoop
   */
  public function getLoop() {
    return $this->loop;
  }

  /**
   * Adds a pending request
   */
  public function addRequest(MyHttpRequest $r) {
    $this->requests []= $r;
  }

  /**
   * Dispatches all pending requests
   */
  public function run() {
    $this->loop->run();
  }
}


/////////////////////////////////////
// Usage
$client = new MyHttpClient();
foreach (range(1, 10) as $i) {
  $client->addRequest(new MyHttpRequest($client, 'my-host.local', '/test.php?a=' . $i, 'GET'));
}
$client->run();

Essai

Supposons que le script http://my-host.local/test.php imprime le vidage de $_GET :

<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;

La sortie de la commande php http-client.php sera similaire à la suivante:

<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo

1d
GET: array (
  'a' => '3',
)

0
>>>>
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo

1d
GET: array (
  'a' => '2',
)

0
>>>>
...

(coupé)

Remarque, en PHP 5 l'extension sockets peut connecter des avertissements pour EINPROGRESS , EAGAIN et EWOULDBLOCK errno valeurs. Il est possible de désactiver les journaux avec

error_reporting(E_ERROR);


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow