Suche…


Vorteile von Generatoren

PHP 5.5 führt Generatoren und das Flows-Schlüsselwort ein, mit dem wir asynchronen Code schreiben können, der eher wie synchroner Code aussieht.

Der yield ist dafür verantwortlich, dem aufrufenden Code wieder die Kontrolle zu geben und an dieser Stelle einen Wiederaufnahmepunkt bereitzustellen. Sie können einen Wert entlang der yield senden. Der Rückgabewert dieses Ausdrucks ist entweder null oder der Wert, der an Generator::send() .

function reverse_range($i) {
    // the mere presence of the yield keyword in this function makes this a Generator
    do {
        // $i is retained between resumptions
        print yield $i;
    } while (--$i > 0);
}

$gen = reverse_range(5);
print $gen->current();
$gen->send("injected!"); // send also resumes the Generator

foreach ($gen as $val) { // loops over the Generator, resuming it upon each iteration
    echo $val;
}

// Output: 5injected!4321

Dieser Mechanismus kann von einer Coroutine-Implementierung verwendet werden, um auf vom Generator generierte Awaitables zu warten (indem er sich selbst als Rückruf für die Auflösung registriert) und die Ausführung des Generators fortzusetzen, sobald das Awaitable aufgelöst ist.

Icicle-Ereignisschleife verwenden

Icicle verwendet Awaitables und Generatoren, um Coroutines zu erstellen.

require __DIR__ . '/vendor/autoload.php';

use Icicle\Awaitable;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;

$generator = function (float $time) {
    try {
        // Sets $start to the value returned by microtime() after approx. $time seconds.
        $start = yield Awaitable\resolve(microtime(true))->delay($time);

        echo "Sleep time: ", microtime(true) - $start, "\n";

        // Throws the exception from the rejected awaitable into the coroutine.
        return yield Awaitable\reject(new Exception('Rejected awaitable'));
    } catch (Throwable $e) { // Catches awaitable rejection reason.
        echo "Caught exception: ", $e->getMessage(), "\n";
    }

    return yield Awaitable\resolve('Coroutine completed');
};

// Coroutine sleeps for 1.2 seconds, then will resolve with a string.
$coroutine = new Coroutine($generator(1.2));
$coroutine->done(function (string $data) {
    echo $data, "\n";
});

Loop\run();

Verwenden der Amp-Ereignisschleife

Amp- Kabelbäume Versprechen [ein anderer Name für Awaitables] und Generatoren für Coroutine-Erstellung

require __DIR__ . '/vendor/autoload.php';

use Amp\Dns;

// Try our system defined resolver or googles, whichever is fastest
function queryStackOverflow($recordtype) {
    $requests = [
        Dns\query("stackoverflow.com", $recordtype),
        Dns\query("stackoverflow.com", $recordtype, ["server" => "8.8.8.8"]),
    ];
    // returns a Promise resolving when the first one of the requests resolves
    return yield Amp\first($request);
}

\Amp\run(function() { // main loop, implicitly a coroutine
    try {
        // convert to coroutine with Amp\resolve()
        $promise = Amp\resolve(queryStackOverflow(Dns\Record::NS));
        list($ns, $type, $ttl) = // we need only one NS result, not all
            current(yield Amp\timeout($promise, 2000 /* milliseconds */));
        echo "The result of the fastest server to reply to our query was $ns";
    } catch (Amp\TimeoutException $e) {
        echo "We've heard no answer for 2 seconds! Bye!";
    } catch (Dns\NoRecordException $e) {
        echo "No NS records there? Stupid DNS nameserver!";
    }
});

Nicht blockierende Prozesse mit proc_open () starten

PHP unterstützt keine gleichzeitige Ausführung von Code, es sei denn, Sie installieren Erweiterungen wie pthread . Dies kann manchmal umgangen werden, indem proc_open() und stream_set_blocking() und deren Ausgabe asynchron gelesen wird.

Wenn wir Code in kleinere Abschnitte aufteilen, können wir ihn als mehrere Übergänge ausführen. Mit der Funktion stream_set_blocking() können wir jeden Teilprozess auch blockieren. Das heißt, wir können mehrere Unterprozesse erzeugen und dann in einer Schleife (ähnlich einer geraden Schleife) auf ihre Ausgabe prüfen und warten, bis alle beendet sind.

Als Beispiel können wir einen kleinen Subprozess haben, der nur eine Schleife ausführt und in jeder Iteration zufällig für 100 - 1000 ms schläft (beachten Sie, dass die Verzögerung für einen Subprozess immer gleich ist).

<?php
// subprocess.php
$name = $argv[1];
$delay = rand(1, 10) * 100;
printf("$name delay: ${delay}ms\n");

for ($i = 0; $i < 5; $i++) {
    usleep($delay * 1000);
    printf("$name: $i\n");
}

Dann wird der Hauptprozess Unterprozesse erzeugen und deren Ausgabe lesen. Wir können es in kleinere Blöcke aufteilen:

<?php
// non-blocking-proc_open.php
// File descriptors for each subprocess.
$descriptors = [
    0 => ['pipe', 'r'], // stdin
    1 => ['pipe', 'w'], // stdout
];

$pipes = [];
$processes = [];
foreach (range(1, 3) as $i) {
    // Spawn a subprocess.
    $proc = proc_open('php subprocess.php proc' . $i, $descriptors, $procPipes);
    $processes[$i] = $proc;
    // Make the subprocess non-blocking (only output pipe).
    stream_set_blocking($procPipes[1], 0);
    $pipes[$i] = $procPipes;
}

// Run in a loop until all subprocesses finish.
while (array_filter($processes, function($proc) { return proc_get_status($proc)['running']; })) {
    foreach (range(1, 3) as $i) {
        usleep(10 * 1000); // 100ms
        // Read all available output (unread output is buffered).
        $str = fread($pipes[$i][1], 1024);
        if ($str) {
            printf($str);
        }
    }
}

// Close all pipes and processes.
foreach (range(1, 3) as $i) {
    fclose($pipes[$i][1]);
    proc_close($processes[$i]);
}

Die Ausgabe enthält dann eine Mischung aus allen drei Unterprozessen, wie sie von fread () gelesen werden (beachten Sie, dass proc1 in diesem Fall viel früher beendet wurde als die beiden anderen):

$ php non-blocking-proc_open.php 
proc1 delay: 200ms
proc2 delay: 1000ms
proc3 delay: 800ms
proc1: 0
proc1: 1
proc1: 2
proc1: 3
proc3: 0
proc1: 4
proc2: 0
proc3: 1
proc2: 1
proc3: 2
proc2: 2
proc3: 3
proc2: 3
proc3: 4
proc2: 4

Serielle Schnittstelle mit Event und DIO lesen

DIO- Streams werden derzeit von der Ereigniserweiterung nicht erkannt. Es gibt keine saubere Möglichkeit, den in der DIO-Ressource eingeschlossenen Dateideskriptor zu erhalten. Es gibt jedoch eine Problemumgehung:

  • offener Stream für den Port mit fopen() ;
  • machen Sie den Stream mit stream_set_blocking() nicht blockierend;
  • numerische Dateideskriptoren aus dem Stream mit EventUtil::getSocketFd() ;
  • dio_fdopen() den numerischen Dateideskriptor an dio_fdopen() (derzeit undokumentiert) und dio_fdopen() Sie die DIO-Ressource ab.
  • Hinzufügen eines Event mit einem Rückruf zum Abhören der Leseereignisse im Dateideskriptor;
  • Im Callback-Drain werden die verfügbaren Daten entladen und entsprechend der Logik Ihrer Anwendung verarbeitet.

dio.php

<?php
class Scanner {
  protected $port; // port path, e.g. /dev/pts/5
  protected $fd; // numeric file descriptor
  protected $base; // EventBase
  protected $dio; // dio resource
  protected $e_open; // Event
  protected $e_read; // Event

  public function __construct ($port) {
    $this->port = $port;
    $this->base = new EventBase();
  }

  public function __destruct() {
    $this->base->exit();

    if ($this->e_open)
      $this->e_open->free();
    if ($this->e_read)
      $this->e_read->free();
    if ($this->dio)
      dio_close($this->dio);
  }

  public function run() {
    $stream = fopen($this->port, 'rb');
    stream_set_blocking($stream, false);

    $this->fd = EventUtil::getSocketFd($stream);
    if ($this->fd < 0) {
      fprintf(STDERR, "Failed attach to port, events: %d\n", $events);
      return;
    }

    $this->e_open = new Event($this->base, $this->fd, Event::WRITE, [$this, '_onOpen']);
    $this->e_open->add();
    $this->base->dispatch();

    fclose($stream);
  }

  public function _onOpen($fd, $events) {
    $this->e_open->del();

    $this->dio = dio_fdopen($this->fd);
    // Call other dio functions here, e.g.
    dio_tcsetattr($this->dio, [
      'baud' => 9600,
      'bits' => 8,
      'stop'  => 1,
      'parity' => 0
    ]);

    $this->e_read = new Event($this->base, $this->fd, Event::READ | Event::PERSIST,
      [$this, '_onRead']);
    $this->e_read->add();
  }

  public function _onRead($fd, $events) {
    while ($data = dio_read($this->dio, 1)) {
      var_dump($data);
    }
  }
}

// Change the port argument
$scanner = new Scanner('/dev/pts/5');
$scanner->run();

Testen

Führen Sie den folgenden Befehl in Terminal A aus:

$ socat -d -d pty,raw,echo=0 pty,raw,echo=0
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/5
2016/12/01 18:04:06 socat[16750] N PTY is /dev/pts/8
2016/12/01 18:04:06 socat[16750] N starting data transfer loop with FDs [5,5] and [7,7]

Die Ausgabe kann unterschiedlich sein. Verwenden Sie die PTYs aus den ersten Zeilen (insbesondere /dev/pts/5 und /dev/pts/8 ).

Führen Sie in Terminal B das oben genannte Skript aus. Möglicherweise benötigen Sie Root-Berechtigungen:

$ sudo php dio.php

Senden Sie in Terminal C eine Zeichenfolge an den ersten PTY:

$ echo test > /dev/pts/8

Ausgabe

string(1) "t"
string(1) "e"
string(1) "s"
string(1) "t"
string(1) "
"

HTTP-Client basierend auf Ereigniserweiterung

Dies ist ein Beispiel für eine HTTP-Clientklasse, die auf der Ereigniserweiterung basiert.

Die Klasse ermöglicht es, eine Reihe von HTTP-Anforderungen zu planen und dann asynchron auszuführen.

http-client.php

<?php
class MyHttpClient {
  /// @var EventBase
  protected $base;
  /// @var array Instances of EventHttpConnection
  protected $connections = [];

  public function __construct() {
    $this->base = new EventBase();
  }

  /**
   * Dispatches all pending requests (events)
   *
   * @return void
   */
  public function run() {
    $this->base->dispatch();
  }

  public function __destruct() {
    // Destroy connection objects explicitly, don't wait for GC.
    // Otherwise, EventBase may be free'd earlier.
    $this->connections = null;
  }

  /**
   * @brief Adds a pending HTTP request
   *
   * @param string $address Hostname, or IP
   * @param int $port Port number
   * @param array $headers Extra HTTP headers
   * @param int $cmd A EventHttpRequest::CMD_* constant
   * @param string $resource HTTP request resource, e.g. '/page?a=b&c=d'
   *
   * @return EventHttpRequest|false
   */
  public function addRequest($address, $port, array $headers,
    $cmd = EventHttpRequest::CMD_GET, $resource = '/')
  {
    $conn = new EventHttpConnection($this->base, null, $address, $port);
    $conn->setTimeout(5);

    $req = new EventHttpRequest([$this, '_requestHandler'], $this->base);

    foreach ($headers as $k => $v) {
      $req->addHeader($k, $v, EventHttpRequest::OUTPUT_HEADER);
    }
    $req->addHeader('Host', $address, EventHttpRequest::OUTPUT_HEADER);
    $req->addHeader('Connection', 'close', EventHttpRequest::OUTPUT_HEADER);
    if ($conn->makeRequest($req, $cmd, $resource)) {
      $this->connections []= $conn;
      return $req;
    }

    return false;
  }


  /**
   * @brief Handles an HTTP request
   *
   * @param EventHttpRequest $req
   * @param mixed $unused
   *
   * @return void
   */
  public function _requestHandler($req, $unused) {
    if (is_null($req)) {
      echo "Timed out\n";
    } else {
      $response_code = $req->getResponseCode();

      if ($response_code == 0) {
        echo "Connection refused\n";
      } elseif ($response_code != 200) {
        echo "Unexpected response: $response_code\n";
      } else {
        echo "Success: $response_code\n";
        $buf = $req->getInputBuffer();
        echo "Body:\n";
        while ($s = $buf->readLine(EventBuffer::EOL_ANY)) {
          echo $s, PHP_EOL;
        }
      }
    }
  }
}


$address = "my-host.local";
$port = 80;
$headers = [ 'User-Agent' => 'My-User-Agent/1.0', ];

$client = new MyHttpClient();

// Add pending requests
for ($i = 0; $i < 10; $i++) {
  $client->addRequest($address, $port, $headers,
    EventHttpRequest::CMD_GET, '/test.php?a=' . $i);
}

// Dispatch pending requests
$client->run();

test.php

Dies ist ein Beispielskript auf der Serverseite.

<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;
echo 'User-Agent: ', $_SERVER['HTTP_USER_AGENT'] ?? '(none)', PHP_EOL;

Verwendungszweck

php http-client.php

Beispielausgabe

Success: 200
Body:
GET: array (
  'a' => '1',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
  'a' => '0',
)
User-Agent: My-User-Agent/1.0
Success: 200
Body:
GET: array (
  'a' => '3',
)
...

(Getrimmt.)

Beachten Sie, dass der Code für die Langzeitverarbeitung im CLI SAPI ausgelegt ist .

HTTP-Client basierend auf Ev Extension

Dies ist ein Beispiel für einen HTTP-Client, der auf der Erweiterung Ev basiert.

Die Erweiterung "ev" implementiert eine einfache, aber leistungsfähige Ereignisschleife für allgemeine Zwecke. Es bietet keine netzwerkspezifischen Watcher, aber der E / A-Watcher kann für die asynchrone Verarbeitung von Sockets verwendet werden .

Der folgende Code zeigt, wie HTTP-Anforderungen für die parallele Verarbeitung geplant werden können.

http-client.php

<?php
class MyHttpRequest {
  /// @var MyHttpClient
  private $http_client;
  /// @var string
  private $address;
  /// @var string HTTP resource such as /page?get=param
  private $resource;
  /// @var string HTTP method such as GET, POST etc.
  private $method;
  /// @var int
  private $service_port;
  /// @var resource Socket
  private $socket;
  /// @var double Connection timeout in seconds.
  private $timeout = 10.;
  /// @var int Chunk size in bytes for socket_recv()
  private $chunk_size = 20;
  /// @var EvTimer
  private $timeout_watcher;
  /// @var EvIo
  private $write_watcher;
  /// @var EvIo
  private $read_watcher;
  /// @var EvTimer
  private $conn_watcher;
  /// @var string buffer for incoming data
  private $buffer;
  /// @var array errors reported by sockets extension in non-blocking mode.
  private static $e_nonblocking = [
    11, // EAGAIN or EWOULDBLOCK
    115, // EINPROGRESS
  ];

  /**
   * @param MyHttpClient $client
   * @param string $host Hostname, e.g. google.co.uk
   * @param string $resource HTTP resource, e.g. /page?a=b&c=d
   * @param string $method HTTP method: GET, HEAD, POST, PUT etc.
   * @throws RuntimeException
   */
  public function __construct(MyHttpClient $client, $host, $resource, $method) {
    $this->http_client = $client;
    $this->host        = $host;
    $this->resource    = $resource;
    $this->method      = $method;

    // Get the port for the WWW service
    $this->service_port = getservbyname('www', 'tcp');

    // Get the IP address for the target host
    $this->address = gethostbyname($this->host);

    // Create a TCP/IP socket
    $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    if (!$this->socket) {
      throw new RuntimeException("socket_create() failed: reason: " .
        socket_strerror(socket_last_error()));
    }

    // Set O_NONBLOCK flag
    socket_set_nonblock($this->socket);

    $this->conn_watcher = $this->http_client->getLoop()
      ->timer(0, 0., [$this, 'connect']);
  }

  public function __destruct() {
    $this->close();
  }

  private function freeWatcher(&$w) {
    if ($w) {
      $w->stop();
      $w = null;
    }
  }

  /**
   * Deallocates all resources of the request
   */
  private function close() {
    if ($this->socket) {
      socket_close($this->socket);
      $this->socket = null;
    }

    $this->freeWatcher($this->timeout_watcher);
    $this->freeWatcher($this->read_watcher);
    $this->freeWatcher($this->write_watcher);
    $this->freeWatcher($this->conn_watcher);
  }

  /**
   * Initializes a connection on socket
   * @return bool
   */
  public function connect() {
    $loop = $this->http_client->getLoop();

    $this->timeout_watcher = $loop->timer($this->timeout, 0., [$this, '_onTimeout']);
    $this->write_watcher = $loop->io($this->socket, Ev::WRITE, [$this, '_onWritable']);

    return socket_connect($this->socket, $this->address, $this->service_port);
  }

  /**
   * Callback for timeout (EvTimer) watcher
   */
  public function _onTimeout(EvTimer $w) {
    $w->stop();
    $this->close();
  }

  /**
   * Callback which is called when the socket becomes wriable
   */
  public function _onWritable(EvIo $w) {
    $this->timeout_watcher->stop();
    $w->stop();

    $in = implode("\r\n", [
      "{$this->method} {$this->resource} HTTP/1.1",
      "Host: {$this->host}",
      'Connection: Close',
    ]) . "\r\n\r\n";

    if (!socket_write($this->socket, $in, strlen($in))) {
      trigger_error("Failed writing $in to socket", E_USER_ERROR);
      return;
    }

    $loop = $this->http_client->getLoop();
    $this->read_watcher = $loop->io($this->socket,
      Ev::READ, [$this, '_onReadable']);

    // Continue running the loop
    $loop->run();
  }

  /**
   * Callback which is called when the socket becomes readable
   */
  public function _onReadable(EvIo $w) {
    // recv() 20 bytes in non-blocking mode
    $ret = socket_recv($this->socket, $out, 20, MSG_DONTWAIT);

    if ($ret) {
      // Still have data to read. Append the read chunk to the buffer.
      $this->buffer .= $out;
    } elseif ($ret === 0) {
      // All is read
      printf("\n<<<<\n%s\n>>>>", rtrim($this->buffer));
      fflush(STDOUT);
      $w->stop();
      $this->close();
      return;
    }

    // Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK
    if (in_array(socket_last_error(), static::$e_nonblocking)) {
      return;
    }

    $w->stop();
    $this->close();
  }
}

/////////////////////////////////////
class MyHttpClient {
  /// @var array Instances of MyHttpRequest
  private $requests = [];
  /// @var EvLoop
  private $loop;

  public function __construct() {
    // Each HTTP client runs its own event loop
    $this->loop = new EvLoop();
  }

  public function __destruct() {
    $this->loop->stop();
  }

  /**
   * @return EvLoop
   */
  public function getLoop() {
    return $this->loop;
  }

  /**
   * Adds a pending request
   */
  public function addRequest(MyHttpRequest $r) {
    $this->requests []= $r;
  }

  /**
   * Dispatches all pending requests
   */
  public function run() {
    $this->loop->run();
  }
}


/////////////////////////////////////
// Usage
$client = new MyHttpClient();
foreach (range(1, 10) as $i) {
  $client->addRequest(new MyHttpRequest($client, 'my-host.local', '/test.php?a=' . $i, 'GET'));
}
$client->run();

Testen

Angenommen, das http://my-host.local/test.php den $_GET von $_GET :

<?php
echo 'GET: ', var_export($_GET, true), PHP_EOL;

Die Ausgabe des php http-client.php Befehls php http-client.php dann wie folgt aus:

<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo

1d
GET: array (
  'a' => '3',
)

0
>>>>
<<<<
HTTP/1.1 200 OK
Server: nginx/1.10.1
Date: Fri, 02 Dec 2016 12:39:54 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/7.0.13-pl0-gentoo

1d
GET: array (
  'a' => '2',
)

0
>>>>
...

(getrimmt)

Beachten Sie, dass die Sockets- Erweiterung in PHP 5 möglicherweise Warnungen für die EINPROGRESS , EAGAIN und EWOULDBLOCK errno . Es ist möglich, die Protokolle mit zu deaktivieren

error_reporting(E_ERROR);


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow